サイトトップ

Director Flash 書籍 業務内容 プロフィール

HTML5テクニカルノート

RxJS入門 05: Subject


Subjectは、Observableであり、なおかつObserverです。Observableと違って、複数のObserverに値をマルチキャストできます。機能が加えられたサブクラスもいくつかあります。公式「Manual」の「Subject」を下じきに、サンプルコードや解説は改めました。

01 Subjectでマルチキャストする

RxJSのSubjectObservableを継承するサブクラスで、複数のObserverに値がマルチキャストできます。親のObservableは単一キャストで、実行したObserverが単独でオブジェクトを保持するのと異なる点です。Subjectは、Observerをイベントリスナーのように複数登録して扱います。

SubjectObservableです。Subjectを実行(サブスクライブ)してObserverが与えられると、Observableの場合と同じように値が送られます。Observerからは、実行されたのが単一キャストのObservableなのかマルチキャストのSubjectなのかはわかりません。Subjectのサブスクリプトは、内部的には実行して値を送るわけではありません。Observerを実行のリストに加えるだけです。イベントリスナーに似た仕組みといえます。

SubjectObserverでもあります。next()error()およびcomplete()の3つのメソッドを備えたオブジェクトです。next()に値を渡して呼び出せば、Subjectのリストに登録されたObserverすべてに値はマルチキャストで送られます。つぎのコードは、Observable.subscribe()メソッドでSubjectにふたつのObserverを加えたうえで、next()により値を送ります。


const subject = new Rx.Subject();
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
console.log('calling next()');
subject.next(1);
subject.next(2);

// コンソール出力
calling next()
observerA: 1
observerB: 1
observerA: 2
observerB: 2

SubjectObserverです。したがって、SubjectObservable.subscribe()メソッドの引数にして呼び出すこともできます。すると、単一キャストのObservableをマルチキャストにすることができるのです。つぎのコードは、ObservableSubjectを渡して実行することにより、登録された複数のObserverに値を送ります。なお、静的メソッドObservable.from()は、配列からつくったObservableに要素を値として送ります。


const observable = Rx.Observable.from([1, 2, 3]);
const subject = new Rx.Subject();
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
observable.subscribe(subject);

// コンソール出力
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

Subjectを継承するサブクラスには、あとに述べるBehaviorSubjectReplaySubject、あるいはAsyncSubjectなどがあります。

02 Observableをマルチキャストにする

Observableそのものは単一キャストで、ひとつのObserverにしか値を送れません。けれども、Subjectを介すれば、複数の通知先をもつマルチキャストのObservableにできます。Subjectにより、内部的に複数のObserverに同じObservableの実行を扱わせることができるのです。multicast()オペレータを使えば、複数のObserverSubjectに登録し、Observableから受け取ったデータが送れます。前掲のコードは、multicast()オペレータを使って以下のように書き替えられます。

multicast()が返すのは、Observableを継承するConnectableObservableのオブジェクトです(第2引数がない場合。「RxJS: multicast’s Secret」参照)。引数に渡したSubjectによりマルチキャストの機能が備わります(なお、ConnectableObservableについては「RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable」参照)。ConnectableObservable.connect()メソッドが、登録されたObservableの実行を始めます。つぎのコードでメソッドの呼び出しは、内部的にobservable.subscribe(subject)と同じ役割を果たすのです。メソッドはSubscriptionを返しますので、Subscription.unsubscribe()で実行は取り消せます。


const observable = Rx.Observable.from([1, 2, 3]);
const subject = new Rx.Subject();
const multicasted = observable.multicast(subject);
// subject.subscribe({
multicasted.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
// subject.subscribe({
multicasted.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
// observable.subscribe(subject);
multicasted.connect();

03 参照を調べる

マルチキャストにしたObservableに、複数のObserverで時間差の実行をしてみましょう。最初の実行でObserverが加えられたらconnect()を呼び出し、最後の実行を止めたときunsubscribe()で取り消します。そのために、connect()の戻り値は変数にとっておかなければなりません。


const observable = Rx.Observable.interval(500);
const subject = new Rx.Subject();
const multicasted = observable.multicast(subject);
let subscription1, subscription2, subscriptionConnect;
console.log('observerA subscribed');
subscription1 = multicasted.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subscriptionConnect = multicasted.connect();  // 実行開始
setTimeout(() => {
	console.log('observerB subscribed');
	subscription2 = multicasted.subscribe({
		next(v) {console.log('observerB: ' + v);}
	});
}, 600);
setTimeout(() => {
	console.log('observerA unsubscribed');
	subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
	console.log('observerB unsubscribed');
	subscription2.unsubscribe();
	subscriptionConnect.unsubscribe(); // 実行終了
}, 2000);

// コンソール出力
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

マルチキャストのObservablesubscribe()Observerが加わったら値の受け取りを始め、すべてunsubscribe()されてObserverがなくなったら止めてしまえると便利でしょう。登録されている参照数を確かめて実行・停止するのがConnectableObservable.refCount()です。Observableを返すので(新たなConnectableObservableをつくるのではありません)、そのオブジェクトにsubscribe()unsubscribe()を行います。参照数が0より増えれば、内部的にconnect()が呼び出されます。そして、0になれば実行を止めるのです。


const observable = Rx.Observable.interval(500);
const subject = new Rx.Subject();
// const multicasted = observable.multicast(subject);
const multicasted = observable.multicast(subject).refCount();
let subscription1, subscription2;  // , subscriptionConnect;
console.log('observerA subscribed');
subscription1 = multicasted.subscribe({  // 実行開始
	next(v) {console.log('observerA: ' + v);}
});
// subscriptionConnect = multicasted.connect();
setTimeout(() => {
	console.log('observerB subscribed');
	subscription2 = multicasted.subscribe({
		next(v) {console.log('observerB: ' + v);}
	});
}, 600);
setTimeout(() => {
	console.log('observerA unsubscribed');
	subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
	console.log('observerB unsubscribed');
	subscription2.unsubscribe(); // 実行終了
	// subscriptionConnect.unsubscribe();
}, 2000);

04 BehaviorSubject

SubjectのサブクラスのひとつBehaviorSubjectは、オブジェクトが現在値をもちます。直近に送られた値を納め、Observerが実行されると同時にBehaviorSubjectからその値を受け取るのです。つぎのコードでは、BehaviorSubjectに初期値0を与えて、オブジェクトがつくられます。すると、subscribe()で実行されたとき、ただちにObserverにはその値が送られるのです。つぎの実行でも、直近の値(2)を受け取ってから、next()で値が送られることになります。


const subject = new Rx.BehaviorSubject(0);  // 初期値
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.next(1);
subject.next(2);
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
subject.next(3);

// コンソール出力
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

05 ReplaySubject

ReplaySubjectは、送信済みの値がつぎの実行に送れるということはBehaviorSubjectと似ています。違いは、Observableに送った値を複数さかのぼって記録できることです。つぎのコードのように、ReplaySubjectはさかのぼる値の数(バッファーサイズ)を渡してつくります。


const subject = new Rx.ReplaySubject(3);  // さかのぼる値の数
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
subject.next(5);

// コンソール出力
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

ReplaySubjectに記録する値の数を定めるのに、第1引数のバッファーサイズに加え、第2引数でさかのぼるウィンドウの時間をミリ秒で与えることができます。つぎのコードでは、第1引数(100)はあえて大きくしたうえで、第2引数のミリ秒(500)を狭めました。すると、ふたつ目の実行でObserverは、直近のその時間に送られた値だけさかのぼって受け取るのです。


const subject = new Rx.ReplaySubject(100, 500);  // 第2引数はさかのぼるミリ秒
subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
	subject.subscribe({
		next(v) {console.log('observerB: ' + v);}
	});
}, 1000);
setTimeout(() => {
	subject.complete();
}, 1500);

// コンソール出力
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
observerA: 7
observerB: 7

06 AsyncSubject

もうひとつご紹介するSubjectのサブクラスAsyncSubjectは、Observableを実行して最後の値だけがObserverに送られます。値を受け取るのは、complete()で実行が終わったときです。


const subject = new Rx.AsyncSubject();

subject.subscribe({
	next(v) {console.log('observerA: ' + v);}
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe({
	next(v) {console.log('observerB: ' + v);}
});
subject.next(4);
subject.next(5);
subject.complete();

// コンソール出力
observerA: 5
observerB: 5

AsyncSubjectの働きは、last()オペレータと似ています。last()は実行が終わるのを待って、最後の値だけを送ります。

RxJS入門


作成者: 野中文雄
作成日: 2018年3月3日


Copyright © 2001-2018 Fumio Nonaka.  All rights reserved.