HTML5テクニカルノート
RxJS 6入門 07: Scheduler
- ID: FN1806005
- Technique: HTML5 / ECMAScript 2015
- Library: RxJS 6.2.1
Schedulerは、オペーレータがサブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。サブスクリプションや通知が、どのコンテキストで行われるのかをスケジュールできるのです。GitHubのReactiveX/rxjs「Scheduler」を下じきに、サンプルコードや解説は改めました。
01 Schedulerの機能
Schedulerは、サブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。構成要素となるのはつぎの3つです。
- データ構造: タスクをどのように保持し、キューに入れるのか、優先度やその他の基準にもとづいて定める。
- 実行コンテキスト: タスクが実行される場所と時期を決める。
- たとえば、直ちに実行するか、
setTimeout()やprocess.nextTick()などのコールバックの仕組みを使うか、あるいはアニメーションフレームにもとづくか。
- たとえば、直ちに実行するか、
- (仮想)クロック:
Schedulerのgetterメソッドnow()により、時間の概念が与えられる。Schedulerにスケジューリングされたタスクは、そのクロックの示す時間にのみしたがう。
Schedulerを用いることによって、どの実行コンテキストでObservableがObserverに通知を送るが決まるのです。まず、Schedulerは使わないコードの実行結果をみておきましょう。Observableは値を同期的に送ります。
const {Observable} = rxjs; const observable = Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }); console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe');// コンソール出力 just before subscribe got value 1 got value 2 got value 3 done just after subscribe
ObservableにobserveOn()オペレータでasyncSchedulerを定めると、値が非同期で送られるようになります。具体的には、observeOn()に渡したasyncSchedulerは、内部的にschedule()メソッドが呼び出され、Observerに実行の遅れ(delay)を与えます。デフォルト値は0です。けれど、待ち時間0はsetTimeout()やsetInterval()の仕組みで扱われ、イペントループのつぎの繰り返しで実行されるため、同期処理には遅れる結果となります。
const {Observable, asyncScheduler} = rxjs; const {observeOn} = rxjs.operators; const observable = Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }).pipe( observeOn(asyncScheduler) ); console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe');// コンソール出力 just before subscribe just after subscribe got value 1 got value 2 got value 3 done
observeOn(asyncScheduler)は、Observableと最終Observerとの間に、つぎのような構造のプロキシObserverをつくるのです。
const proxyObserver = { next(val) { asyncScheduler.schedule( (x) => finalObserver.next(x), 0, // 遅れ val // 送られる値 ); }, }
Schedulerのschedule()メソッドの引数となる遅れ(delay)は、自身の内部クロックが測る時間にもとづきます。現実の時計の時間が過ぎるのとは別です。時間に関わるオペレータは、遅れなど時間の経過をSchedulerのクロックにもとづいて扱います。同期的なタスクを実行する場合のテストでも、内部クロックにより時間が組み替えられるので便利です。
02 Schedulerの種類と使い方
02-01 Schedulerの種類
asyncSchedulerは、RxJSが提供する組み込みスケジューラのひとつです。こうしたスケジューラは、Schedulerオブジェクトの静的プロパティがつくって返します。
The async Scheduler is one of the built-in schedulers provided by RxJS. Each of these can be created and returned by using static properties of the Scheduler object.
表001■Schedulerオブジェクトの静的プロパティとその用途
| 静的プロパティ | 用途 |
|---|---|
null
|
Schedulerを渡さないと、通知が同期的かつ再帰的に送られる。 決まった時間間隔あるいは末尾再帰の操作に用いられる。 |
queueScheduler
|
現在のイベントフレーム(trampoline scheduler)のキューにスケジュールを加える(「RxJava の ImmediateScheduler と TrampolineScheduler の違い」参照)。繰り返し操作に用いられる。 |
asapScheduler
|
マイクロタスクキューにスケジュールを加える。Node.jsのprocess.nextTick()またはWeb WorkerのMessageChannelあるいはsetTimeout()など利用できる中からもっとも速い配信の仕組みが用いられる。
|
asyncScheduler
|
setInterval()によるスケジュールに加える。 時間軸にもとづく操作に用いられる。
|
animationFrameScheduler
|
ブラウザコンテンツが再描画される直前にスケジュールされる。スムーズなブラウザアニメーションに用いられる。 |
02-02 Schedulerを使う
RxJSコードでSchedulerをとくに定めなくても、実際には使われていることがあります。並行処理を扱うObservableのオペレータは、すべてSchedulerが選べるからです。Schedulerが与えてられていない場合、RxJSは最小並行性の原則にもとづいてデフォルトを決めます。 オペレータの求める並行処理をもっとも少なくするSchedulerが選ばれるということです。たとえば、メッセージの数が少ないObservableのオペレータには、RxJSはSchedulerを使いません。つまり、nullまたはundefinedです。逆に、大量あるいは回数のかぎられないメッセージを返すオペレータなら、queueSchedulerが用いられます。タイマーを使うオペレータが選ぶのはasyncSchedulerです。
RxJSは、並行性がもっとも少ないSchedulerを用います。パフォーマンスを考えて、並行処理を取り入れるために、別のSchedulerを選ぶことも可能です。Observableをつくる関数には、引数にSchedulerを指定できるものがあります。たとえば、from()関数は、第2引数にSchedulerが与えられます。
Schedulerを引数にとる関数
たとえば、from()関数は、つぎのような構文で、配列から変換された通知を送るときのSchedulerが定められます。通常、Schedulerは、関数の最後の引数とされます。以下に掲げたのは、Schedulerを引数にとる関数です。
from(array, scheduler)
bindCallback()bindNodeCallback()combineLatest()concat()empty()from()fromPromise()interval()merge()of()range()throw()timer()
subscribeOn()オペレータ
subscribeOn()オペレータは、subscribe()がどのコンテキストで呼び出されるのかをスケジュールするために用います。デフォルトでは、subscribe()の呼び出しは、同期的でただちに行われます。subscribeOn(()の引数にSchedulerを渡せば、実際のサブスクリプションは遅らせたり、スケジュールすることもできるのです。
observeOn()オペレータ
observeOn()オペレータは、前述01「Schedulerの機能」の例のように、通知がどのコンテキストで送られるのかをスケジュールするために用います。observeOn()は、もとのObservableと最終Observerとの中継のObserverを組み入れます。最終的なObserverは、引数のSchedulerにもとづいて呼び出されるのです。
Schedulerを引数にとるオペレータ
以下の時間に関わるオペレータは、最後の引数がSchedulerです。省くと、デフォルトのasyncSchedulerで処理を行います。
bufferTime()debounceTime()delay()auditTime()sampleTime()throttleTime()timeInterval()timeout()timeoutWith()windowTime()
他にSchedulerを引数にとる関数やオペレータは、以下のとおりです。
publishReplay()にSchedulerが与えられるのは、ReplaySubjectクラスを利用するためです。このクラスのコンストラクタには、最後の引数としてSchedulerが与えられます。ReplaySubjectは時間を扱うので、Schedulerのコンテキストにもとづくことになるのです。デフォルトでは、ReplaySubjectクラスがクロックを定めるのに用いるのはqueueSchedulerです。
GitHub「Scheduler」の「Using Schedulers」の項では、cache()オペレータを採り上げて説明しています。けれど、cache()はバージョン5.0.0-rc.1で除かれています。
RxJS 6入門
- RxJS 6入門 01: RxJSを使ってみる
- RxJS 6入門 02: Observable
- RxJS 6入門 03: Observer
- RxJS 6入門 04: Subscription
- RxJS 6入門 05: Subject
- RxJS 6入門 06: Observableをつくる関数とオペレータ
作成者: 野中文雄
作成日: 2018年6月24日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.