サイトトップ

Director Flash 書籍 業務内容 プロフィール

HTML5テクニカルノート

RxJS入門 07: Scheduler


Schedulerは、オペーレータがサブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。サブスクリプションや通知が、どのコンテキストで行われるのかをスケジュールできるのです。公式「Manual」の「Scheduler」を下じきに、サンプルコードや解説は改めました。

01 Schedulerの機能

Schedulerは、サブスクリプションをいつ開始し、通知はいつ送るのかコントロールします。構成要素となるのはつぎの3つです。

Schedulerを用いることによって、ObservableObserverに通知を送る実行コンテキストが定められるのです。まず、Schedulerは使わないコードの実行結果をみておきましょう。Observableは、値を同期的に送ります。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
})
console.log('just before subscribe');
observable.subscribe({
	next(x) {console.log('got value ' + x);},
	error(err) {console.error('something wrong occurred: ' + err);},
	complete() {console.log('done');}
});
console.log('just after subscribe');

/* コンソール出力
just before subscribe
got value 1
got value 2
got value 3
done
just after subscribe
*/

ObservableobserveOn()オペレータScheduler.asyncを定めると、値が非同期で送られるようになります。具体的には、Scheduler.asyncのもとでsubscribe()に渡されたObserverには、内部的に用いられるschedule()により実行の遅れ(delay)が与えられます。デフォルト値は0です。けれど、待ち時間0はsetTimeout()setInterval()と同じく、イペントループのつぎの繰り返しで実行されるため、同期の処理には遅れる結果となります。


const observable = Rx.Observable.create((observer) => {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
})
.observeOn(Rx.Scheduler.async);
console.log('just before subscribe');
observable.subscribe({
	next(x) {console.log('got value ' + x);},
	error(err) {console.error('something wrong occurred: ' + err);},
	complete() {console.log('done');}
});
console.log('just after subscribe');

/* コンソール出力
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
*/

Schedulerschedule()メソッドの引数となる遅れ(delay)は、自身のクロックが測る時間にもとづきます。現実の時計の時間が過ぎるのとは別です。同期的なタスクを実行する場合のテストでも、内部クロックにより時間が組み替えられるので便利です。

02 Schedulerの種類と使い方

02-01 Schedulerの種類

Scheduler.asyncは、RxJSが提供する組み込みスケジューラのひとつです。こうしたスケジューラは、Schedulerオブジェクトの静的プロパティがつくって返します。

表001■Schedulerオブジェクトの静的プロパティとその用途

静的プロパティ 用途
null Schedulerをまったくとおさず、通知が同期的かつ再帰的に送られる。 決まった時間間隔あるいは末尾再帰の操作に用いられる。
Scheduler.queue 現在のイベントフレームのキューにスケジュールを加える(trampoline scheduler)。繰り返し操作に用いられる。
Scheduler.asap マイクロタスクキューにスケジュールを加える。Node.jsのprocess.nextTick()あるいはWeb WorkerのMessageChannelまたはsetTimeout()など利用できる中からもっとも速い配信の仕組みが用いられる。
Scheduler.async setInterval()によるスケジュールに加える。 時間軸にもとづく操作に用いられる。

02-02 Schedulerを使う

RxJSコードでSchedulerをとくに定めなくても、実際には使われていることがあります。並列処理を扱うObservableのオペレータは、すべてSchedulerが選べるからです。Schedulerが与えてられていない場合、RxJSは最小並列性の原則にもとづいてデフォルトを決めます。 オペレータの求める並列処理をもっとも少なくするSchedulerが選ばれるということです。たとえば、メッセージの数が少ないObservableのオペレータには、RxJSはSchedulerを使いません。つまり、nullまたはundefinedです。逆に、量が多く回数もかぎられないメッセージを返すオペレータなら、Scheduler.queueが用いられます。タイマーを使うオペレータが選ぶのはScheduler.asyncです。

RxJSは、並列性がもっとも少ないSchedulerを用います。パフォーマンスを考えて、並列処理を取り入れるために、別のSchedulerを選ぶことも可能です。Schedulerを指定したいとき、オペレータメソッドにはそれができるものもあります。たとえば、Observable.from()には、第2引数にSchedulerが与えられます。

Schedulerを引数にとる静的生成オペレータ

静的生成オペレータは、基本的にSchedulerを引数にとります。たとえば、Observable.from()オペレータは、つぎのような構文で、配列から変換された通知を送るときのSchedulerが定められるのです。通常、Schedulerは、オペレータの最後の引数とされます。以下に掲げたのは、Schedulerを引数にとる静的生成オペレータです。


Rx.Observable.from(array, scheduler)

subscribeOn()オペレータ

subscribeOn()オペレータは、subscribe()がどのコンテキストで呼び出されるのかをスケジュールするために用います。デフォルトでは、subscribe()の呼び出しは、同期的でただちに行われます。インスタンスオペレータsubscribeOn(()の引数にSchedulerを渡せば、実際のサブスクリプションは遅らせたり、スケジュールすることもできるのです。

observeOn()オペレータ

observeOn()オペレータは、前述01「Schedulerの機能」の例のように、通知がどのコンテキストで送られるのかをスケジュールするために用います。インスタンスオペレータobserveOn()は、引数のSchedulerにもとづいて中継のObserverを組み入れます。最終的なObserverは、そのスケジュールにしたがって呼び出されるのです。

Schedulerを引数にとるインスタンスオペレータ

Scheduler引数にとるインスタンスオペレータもあります。以下の時間に関わるオペレータは、最後の引数がSchedulerです。あとの時間関連オペレータは、デフォルトのScheduler.asyncで処理を行います。

そのほかに、Schedulerを引数にとるインスタンスオペレータはつぎのとおりです。

cache()publishReplay()Schedulerが与えられるのは、ReplaySubjectクラスを利用するためです。このクラスのコンストラクタには、最後の引数としてSchedulerが与えられます。ReplaySubjectは時間を扱うので、Schedulerのコンテキストにもとづくことになるのです。デフォルトでは、ReplaySubjectクラスは、Scheduler.queueのクロックを用います。

RxJS入門


作成者: 野中文雄
作成日: 2018年4月8日


Copyright © 2001-2018 Fumio Nonaka.  All rights reserved.