HTML5テクニカルノート
RxJS入門 07: Scheduler
- ID: FN1804002
- Technique: HTML5 / ECMAScript 2015
- Library: RxJS 5.5.8
Schedulerは、オペーレータがサブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。サブスクリプションや通知が、どのコンテキストで行われるのかをスケジュールできるのです。公式「Manual」の「Scheduler」を下じきに、サンプルコードや解説は改めました。
01 Schedulerの機能
Schedulerは、サブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。構成要素となるのはつぎの3つです。
- データ構造: タスクをどのように保持し、キューに入れるのか、優先度やその他の基準にもとづいて定める。
- 実行コンテキスト: タスクが実行される場所とタイミングを決める。
- たとえば、直ちに実行するか、
setTimeout()やprocess.nextTick()などのコールバックの仕組みを使うか、あるいはアニメーションフレームにもとづくか。
- たとえば、直ちに実行するか、
- (仮想)クロック:
Schedulerのgetterメソッドnow()により、時間の概念が与えられる。Schedulerにスケジューリングされたタスクは、そのクロックの示す時間にのみしたがう。
Schedulerを用いることによって、ObservableがObserverに通知を送る実行コンテキストが定められるのです。まず、Schedulerは使わないコードの実行結果をみておきましょう。Observableは、値を同期的に送ります。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }) console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe'); /* コンソール出力 just before subscribe got value 1 got value 2 got value 3 done just after subscribe */
ObservableにobserveOn()オペレータでScheduler.asyncを定めると、値が非同期で送られるようになります。具体的には、Scheduler.asyncのもとでsubscribe()に渡されたObserverには、内部的に用いられるschedule()により実行の遅れ(delay)が与えられます。デフォルト値は0です。けれど、待ち時間0はsetTimeout()やsetInterval()と同じく、イペントループのつぎの繰り返しで実行されるため、同期の処理には遅れる結果となります。
const observable = Rx.Observable.create((observer) => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }) .observeOn(Rx.Scheduler.async); console.log('just before subscribe'); observable.subscribe({ next(x) {console.log('got value ' + x);}, error(err) {console.error('something wrong occurred: ' + err);}, complete() {console.log('done');} }); console.log('just after subscribe'); /* コンソール出力 just before subscribe just after subscribe got value 1 got value 2 got value 3 done */
Schedulerのschedule()メソッドの引数となる遅れ(delay)は、自身のクロックが測る時間にもとづきます。現実の時計の時間が過ぎるのとは別です。同期的なタスクを実行する場合のテストでも、内部クロックにより時間が組み替えられるので便利です。
02 Schedulerの種類と使い方
02-01 Schedulerの種類
Scheduler.asyncは、RxJSが提供する組み込みスケジューラのひとつです。こうしたスケジューラは、Schedulerオブジェクトの静的プロパティがつくって返します。
表001■Schedulerオブジェクトの静的プロパティとその用途
| 静的プロパティ | 用途 |
|---|---|
null
|
Schedulerをまったくとおさず、通知が同期的かつ再帰的に送られる。 決まった時間間隔あるいは末尾再帰の操作に用いられる。 |
Scheduler.queue
|
現在のイベントフレームのキューにスケジュールを加える(trampoline scheduler)。繰り返し操作に用いられる。 |
Scheduler.asap
|
マイクロタスクキューにスケジュールを加える。Node.jsのprocess.nextTick()あるいはWeb WorkerのMessageChannelまたはsetTimeout()など利用できる中からもっとも速い配信の仕組みが用いられる。
|
Scheduler.async
|
setInterval()によるスケジュールに加える。 時間軸にもとづく操作に用いられる。
|
02-02 Schedulerを使う
RxJSコードでSchedulerをとくに定めなくても、実際には使われていることがあります。並列処理を扱うObservableのオペレータは、すべてSchedulerが選べるからです。Schedulerが与えてられていない場合、RxJSは最小並列性の原則にもとづいてデフォルトを決めます。 オペレータの求める並列処理をもっとも少なくするSchedulerが選ばれるということです。たとえば、メッセージの数が少ないObservableのオペレータには、RxJSはSchedulerを使いません。つまり、nullまたはundefinedです。逆に、量が多く回数もかぎられないメッセージを返すオペレータなら、Scheduler.queueが用いられます。タイマーを使うオペレータが選ぶのはScheduler.asyncです。
RxJSは、並列性がもっとも少ないSchedulerを用います。パフォーマンスを考えて、並列処理を取り入れるために、別のSchedulerを選ぶことも可能です。Schedulerを指定したいとき、オペレータメソッドにはそれができるものもあります。たとえば、Observable.from()には、第2引数にSchedulerが与えられます。
Schedulerを引数にとる静的生成オペレータ
静的生成オペレータは、基本的にSchedulerを引数にとります。たとえば、Observable.from()オペレータは、つぎのような構文で、配列から変換された通知を送るときのSchedulerが定められるのです。通常、Schedulerは、オペレータの最後の引数とされます。以下に掲げたのは、Schedulerを引数にとる静的生成オペレータです。
Rx.Observable.from(array, scheduler)
Observable.bindCallback()Observable.bindNodeCallback()Observable.combineLatestObservable.concatObservable.emptyObservable.fromObservable.fromPromiseObservable.intervalObservable.mergeObservable.ofObservable.rangeObservable.throwObservable.timer
subscribeOn()オペレータ
subscribeOn()オペレータは、subscribe()がどのコンテキストで呼び出されるのかをスケジュールするために用います。デフォルトでは、subscribe()の呼び出しは、同期的でただちに行われます。インスタンスオペレータsubscribeOn(()の引数にSchedulerを渡せば、実際のサブスクリプションは遅らせたり、スケジュールすることもできるのです。
observeOn()オペレータ
observeOn()オペレータは、前述01「Schedulerの機能」の例のように、通知がどのコンテキストで送られるのかをスケジュールするために用います。インスタンスオペレータobserveOn()は、引数のSchedulerにもとづいて中継のObserverを組み入れます。最終的なObserverは、そのスケジュールにしたがって呼び出されるのです。
Schedulerを引数にとるインスタンスオペレータ
Scheduler引数にとるインスタンスオペレータもあります。以下の時間に関わるオペレータは、最後の引数がSchedulerです。あとの時間関連オペレータは、デフォルトのScheduler.asyncで処理を行います。
bufferTime()debounceTime()delay()auditTime()sampleTime()throttleTime()timeInterval()timeout()timeoutWith()windowTime()
そのほかに、Schedulerを引数にとるインスタンスオペレータはつぎのとおりです。
cache()combineLatest()concat()expand()merge()publishReplay()startWith()
cache()とpublishReplay()にSchedulerが与えられるのは、ReplaySubjectクラスを利用するためです。このクラスのコンストラクタには、最後の引数としてSchedulerが与えられます。ReplaySubjectは時間を扱うので、Schedulerのコンテキストにもとづくことになるのです。デフォルトでは、ReplaySubjectクラスは、Scheduler.queueのクロックを用います。
RxJS入門
- RxJS入門 01: RxJSを使ってみる
- RxJS入門 02: Observable
- RxJS入門 03: Observer
- RxJS入門 04: Subscription
- RxJS入門 05: Subject
- RxJS入門 06: オペレータ
作成者: 野中文雄
作成日: 2018年4月8日
Copyright © 2001-2018 Fumio Nonaka. All rights reserved.