サイトトップ

Director Flash 書籍 業務内容 プロフィール

HTML5テクニカルノート

RxJS 6入門 06: Observableをつくる関数とオペレータ


RxJSの基礎となるのはObservableです。オペレータはObservableを操作するのにとても役立ちます。複雑な非同期の処理を、わかりやすいコードで組み立てられるのです。このRxJS 6入門シリーズは公式「Manual」を下じきに、サンプルコードや解説は書き改めるかたちで進めています。けれど、「Operators」についてはRxJS 6で内容が大きく変わりました。そのため、コードも解説も大幅に手直ししています。

01 Observableをつくる関数

Observableオブジェクトは、Observable.create()メソッドでつくれます。そのほかに、一定のふるまいのObservableを生成できる関数があります。たとえば、interval()関数です。戻り値のObservableは、引数に渡されたミリ秒の間隔で、0から始まる連番整数を送ります。


const {interval} = rxjs;
const observable = interval(400);
const subscription = observable.subscribe((x) => console.log(x));
setTimeout(() => 
	subscription.unsubscribe()
, 1000);

// コンソール出力
0
1

range()関数は連番整数のObservableオブジェクトをつくります。ふたつの引数は、はじまりと終わりの整数です。値を送るにはサブスクライブしなければなりません。


const {range} = rxjs;
const observable = range(2, 3)
const subscription = observable.subscribe((x) => console.log(x));

// コンソール出力
2
3
4
さらに、from()関数は配列からObservableオブジェクトがつくれます。引数に渡せるのは配列のほか、Promise反復可能(iterable)なオブジェクトです(「from」参照)。

const {from} = rxjs;
const observable = from([1, 2, 3, 5])
const subscription = observable.subscribe((x) => console.log(x));

// コンソール出力
1
2
3
5

複数の入力Observableを引数にしてひとつの出力にまとめるには、merge()関数を用います。


const {interval, merge} = rxjs;
const observable1 = interval(700);
const observable2 = interval(300);
const observables = merge(observable1, observable2)
const subscription = observables.subscribe((x) => console.log(x));
setTimeout(() => 
	subscription.unsubscribe()
, 2000);

// コンソール出力
0
1
0  <- observable1
2
3
1  <- observable1
4
5

複数のObservableをひとつにまとめる関数には、ほかにcombineLatest()あるいはconcat()などがあります。

02 オペレータを使う

オペレータはObservableオブジェクトに値の処理を加えて返す関数です。filter()map()あるいはscan()などが備わっています。オペレータを呼び出しても参照したインスタンスは変えません。Subscriptionの定めを受け継いだ新たなObservableが返されるのです。オペレータは、もとのObservableはそのままに、新たなObservableをつくる純粋な関数といえます。

オペレータを読み込む名前空間はrxjs.operatorsです。filter()は、引数のコールバックがtrueを返す値だけ送ります。オペレータはpipe()メソッドに引数として渡してください。


const {range} = rxjs;
const {filter} = rxjs.operators;
const observable = range(1, 5);
observable.pipe(
	filter((x) => x % 2 == 1)
)
.subscribe((x) => console.log(x));

// コンソール出力
1
3
5

pipe()メソッドには、オペレータをいくつでも引数に加えられます。Observableは引数の順にオペレータに入出力されるのです。map()オペレータは、受け取った値を引数のコールバックに渡し、その戻り値が送られます。


const {range} = rxjs;
const {filter, map} = rxjs.operators;
const observable = range(1, 5);
observable.pipe(
	filter((x) => x % 2 == 1),
	map((x) => x ** 2)
)
.subscribe((x) => console.log(x));

// コンソール出力
1
9
25

scan()オペレータは、引数のコールバックの戻り値を、つぎの値のコールバックが第1引数として受け取ります。Observableから送られる値は第2引数になるのです。scan()オペレータの第2引数には、最初のコールバックに渡す初期値を与えます。


const {range} = rxjs;
const {filter, map, scan} = rxjs.operators;
const observable = range(1, 5);
observable.pipe(
	filter((x) => x % 2 == 1),
	map((x) => x ** 2),
	scan((acc, x) => acc + x, 0)
)
.subscribe((x) => console.log(x));

// コンソール出力
1
10
35

もうひとつ、take()オペレータをご紹介しましょう。Observableから送られる値の数を引数値に制限します。interval()関数でつくられたObservableでも、送る値がその数で止まるのです。


const {interval} = rxjs;
const {take} = rxjs.operators;
const observable = interval(1000)
.{pipe(
	{take(5)
)
.subscribe((x) => console.log(x));

// コンソール出力
0
1
2
3
4

03 オペレータをつくる

オペレータは、基本的にひとつのObservableを入力として受け取り、もうひとつの新たなObservableをつくって返す関数です。出力Observableをサブスクライブすると、入力Observableもサブスクライブされます。つぎのコードで定めたカスタムオペレータ関数は、入力Observableから受け取る値をそれぞれ2乗します。出力オブジェクトに対してObservable.subscribe()メソッドを呼び出したとき、入力Observableもサブスクライブされることにご注目ください。 これを「オペレータサブスクリプションチェーン」と呼びます。


const {Observable, from} = rxjs;
function square(input) {
	return Observable.create((observer) => {
		input.subscribe({
			next(value) {observer.next(value ** 2);},
			error(err) {observer.error(err);},
			complete() {observer.complete();}
		});
	});
}
const input = from([1, 2, 3, 4]);
square(input)
.subscribe((x) => console.log(x));

// コンソール出力
1
4
9
16

前掲カスタムオペレータは、組み込み済みのオペレータと同じようには使えません。RxJS 6のオペレータは、Observableに対して呼び出すpipe()メソッドの引数に与えなければならないからです。カスタムオペレータをpipe()に渡すには、引数にObservableオブジェクトを受け取るコールバック関数の中から呼び出すようにします。


const input = from([1, 2, 3, 4]);
// square(input)
input.pipe(
	(input) => square(input)
)
.subscribe((x) => console.log(x));

つまり、つぎのようにカスタムオペレータから、引数にObservableを受け取るコールバックが返されれば、組み込み済みオペレータと同じようにpipe()メソッドの引数に渡せるのです。


const {Observable, from} = rxjs;
// function square(input) {
function square() {
	return (input) =>
		// return
		Observable.create((observer) => {
			input.subscribe({
				next(value) {observer.next(value ** 2);},
				error(err) {observer.error(err);},
				complete() {observer.complete();}
			});
		});
}
const input = from([1, 2, 3, 4]);
input.pipe(
	// (input) => square(input)
	square()
)
.subscribe((x) => console.log(x));

// コンソール出力
1
4
9
16

04 マーブルダイヤグラム

オペレータの働きを、文章だけで説明しようとするとかぎりがあります。オペレータの多くは時間に関わり、それぞれ異なったやり方で値を送ります。それらは文章より、図で説明するとわかりやすくなることが少なくありません。マーブルダイヤグラム(Marble Diagram)は、オペレータの働きを図で示したものです。その中には、入力Observableやオペレータとその引数、さらに出力Observableが含まれます。

つぎの図001が、マーブルダイヤグラムの例です。時間は左から右に流れ、マーブルで示された値がObservableの実行によりどのように送られるのかを表しています。RxJS 5.5公式Referenceでは、マーブルダイヤグラムでオペレータの仕組みを説明しています(たとえばmap()オペレータ)。ところが、本校執筆時RxJS 6公式Referenceにはマーブルダイヤグラムが掲げられていません。おそらく、サイトがまだベータだからと考えられ、各オペレータの解説ページ「Description」にリンク切れで示されている図に加えられるものと思われます(たとえばmap()オペレータ)。

図001■マーブルダイヤグラムとその説明

RxJS 6入門


作成者: 野中文雄
作成日: 2018年6月23日


Copyright © 2001-2018 Fumio Nonaka.  All rights reserved.