前のページ

言語編

次のページ


本ページは非同期処理に関するより詳細な解説であり、Asynchronous Programming: Streamsの翻訳をベースにしている。読者は非同期対応 (Asynchrony support)の章を読んだあとでこの章に進むことが好ましい。Dart 2の概要をまず知りたい場合はこのページはスキップして構わない。

非同期処理-Stream (Asynchronous Programming: Streams)

Streamのポイントは以下のようである:

  • Streamはデータの非同期なシーケンスをもたらす

  • データのシーケンスにはユーザが発生させたイベント、及びファイルから読みだしたデータなどがある

  • Stream APIからのawait forまたはlisten()のいずれかを使ってストリームを処理できる

  • Streamはエラーに対処できる

  • 単一加入(single subscription)とブロードキャスト(broadcast)2種類のストリームがある

Dartにおける非同期プログラミングはFutureStreamのクラスたちによって特徴づけられている。

Futureは直ちに完了しない計算処理を表現している。通常の関数が結果を返すのに対し非同期関数はFutureを返し、このFutureは最終的には結果が入る。このfutureはその結果が得られたことを知らせてくれる。

Streamは非同期のイベントたちのシーケンスである。これは非同期のIterableのようなもので、取りに行ったときに次のイベントが得られる代わりに、このストリームは取り出し得るイベントがあることを知らせてくれる。ここで関数の4つの型を再掲する:

関数の4つの型


単一

複数

同期

T

Iterable<T>

非同期

Future<T>

Stream<T>



ストリームのイベントの受理 (Receiving stream events)

Streamはいろんな手段で生成され得る。このことは本章の対象ではないが、それらは同じやり方が使われている:非同期forループ(一般には単にawait forが呼ばれる)があるストリーム上でイベントたちに対し繰り返される。これはIterable上での繰り返しループと似ている。例えば:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

このコードは単にあるストリームからの整数の各イベントを受信し、それらを加算し、そしてその和を(futureの値として)返している。このループのボディ部が終了すれば、この関数は次のイベントが到来するかこのストリームが終了するまで待機状態になる。

この関数はasyncキーワードでマークされており、これはawait forループを使う際に必要である。

次の例はasync*を使って単純な整数のストリームを生成することで上記のコードをテストするものである:

注:赤いボタンをクリックしてコンソール出力でその結果を確認する。



エラーのイベント (Error events)

ストリームはこれ以上のイベントが無くなったとき終了し、イベントを受理していたコードはこのことを新しいイベントが到来した時と同じように受理する。await forループを使ってイベントを読みだしている際に、このストリームが終了したらこのループは停止する。

ある場合にはこのストリームが完了する前にエラーがたまたま発生し得る:遠隔サーバからファイルを取り出している途中でネットワークで支障が起きた、あるいはこのイベントを発生させているコードにバグがあったとか。しかしこのエラーはあ誰かがそのことを知る必要がある。

Streamはデータのイベントを渡していると同じようにエラーのイベントも渡してくれる。殆どストリームは最初のエラーで停止するが、ストリームに対しひとつ以上のエラーを渡させることも可能であり、またエラーのイベントの後で更なるデータのイベントを渡させることも可能である。この資料ではたかだか1個のエラーを渡すストリーム関して扱っている。

await forを使ってストリームを読んでいる際にそのエラーはloop文によってスローされる。このことがループの停止もさせる。以下の例ではループのイタレータが4になったらエラーをスローさせる:

注:赤いボタンをクリックしてコンソール出力でその結果を確認する。



ストリームを使い込む (Working with streams)

Streamクラスには一連のヘルパ・メソッドたちがあり、Iterable上のメソッドたちと似た一般的な操作がストリーム上で行える。例えば、 Stream APIlastWhere()を使うとストリーム内の最後の正の整数を見つけることができる。

Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);



2種類のストリーム (Two kinds of streams)

2種類のストリームがある。より詳細はSingle-Subscription vs. Broadcast Streamsという資料を参照されたい。

単一加入のストリーム

最も一般的なストリームで、全体としての要素であるイベントたちのシーケンスを含む。イベントたちは正しい順序で且つ欠損なしでもたらされねばならない。これはファイルを読み出したりウェブのリクエストを受信したりする際に使われるストリームのたぐいである。

そのようなストリームは一回ごとにのみリスンできる。後から再度リスンすることは当初のイベントたちを見落としてしまい、このストリームの残りを意味のないものにしてしまう可能性があることを意味する。リスンを開始したときにデータはかたまり(チャンク)毎として捕捉され渡される。



放送のストリーム (Broadcast streams)

もう一種のストリームは一度に扱える個々のメッセージたちを意図したものである。この種のストリームは例えばブラウザ上でのマウスのイベントに使える。

そのようなストリームは何時でもリスン開始が可能であり、リスン中に生じたイベントを取得できる。同時にひとつ以上のリスナがリスンでき、以前のリスンをキャンセルした後でも再度リスンできる。



ストリームの処理のメソッド (Methods that process a stream)

Stream<T>の以下のメソッドたちはストリームを処理し結果を返す:

Future<T> get first;

Future<bool> get isEmpty;

Future<T> get last;

Future<int> get length;

Future<T> get single;

Future<bool> any(bool Function(T element) test);

Future<bool> contains(Object needle);

Future<E> drain<E>([E futureValue]);

Future<T> elementAt(int index);

Future<bool> every(bool Function(T element) test);

Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});

Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);

Future forEach(void Function(T element) action);

Future<String> join([String separator = ""]);

Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});

Future pipe(StreamConsumer<T> streamConsumer);

Future<T> reduce(T Function(T previous, T element) combine);

Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});

Future<List<T>> toList();

Future<Set<T>> toSet();

drain()pipe()を除くすべての関数はIterableの同じような関数に対応している。どれもがawait forループつきのasync関数を使って(或いは単に他のメソッドのひとつを使って)簡単に書ける。例えば、幾つかの適用は次のようなものになろう:

Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

(実際の実装では少し複雑なものになっているが、それは主として歴史的な理由による。)



ストリームを改善するメソッド (Methods that modify a stream)

Streamの以下のメソッドたちはオリジナルのストリームをベースにした新規のストリームを返す。各々はオリジナルでリスンする前に新規のストリーム上で誰かがリスンするまで待つ。

Stream<R> cast<R>();

Stream<S> expand<S>(Iterable<S> Function(T element) convert);

Stream<S> map<S>(S Function(T event) convert);

Stream<R> retype<R>();

Stream<T> skip(int count);

Stream<T> skipWhile(bool Function(T element) test);

Stream<T> take(int count);

Stream<T> takeWhile(bool Function(T element) test);

Stream<T> where(bool Function(T event) test);

これらすべての関数はIterableの同じようなiterableを別のiterableに変換する関数に対応している。どれもがawait forループつきのasync関数を使って簡単に書ける。

Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);

Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);

Stream<T> distinct([bool Function(T previous, T next) equals]);

asyncExpand()asyncMap()関数はexpand()map()に似たものであるが、これらの関数の引数は非同期関数となっている。distinct()関数はIterableには存在しないが、あった筈のものである。

Stream<T> handleError(Function onError, {bool test(error)});

Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout});

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最後の三つの関数はもっと特別なものである。これらはawait forループができないエラー処理(ループに達した最初のエラーがそのループとそのストリーム上の加入を終了させる。)に関わっている。それから復旧させることができない。handleError()を使うとそのエラーが await forループの中で使われる前にエラーを除去することが可能になる。



transform()関数

transform()関数は単にエラー処理の為だけのものではない;これはストリームに対するより一般化された「マップ」である。通常のマップは到来イベントごとにひとつの値が必要である。しかしながら、特にI/Oストリームの場合は、ある出力イベントを作るのに幾つかの到来イベントを要することがある。 StreamTransformerはその為に機能する。例えば、 Utf8Decoderのようなデコーダがこれに該当する。トランスフォーマは唯一の関数bind()が必要で、これは容易にasync関数によって実装できる。

Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (var event in streamWithoutErrors) {
    yield convert(event);
  }
}



ファイルの読み出しとデコード

以下のコードはファイルを読み出し、2つのトランスフォーマを走らせている。最初はUTF8からのデータを変換しており、次にそれが LineSplitterに渡している。ハッシュタグ#で始まる行を除くすべての行がプリントされる。

import 'dart:convert';
import 'dart:io';

Future<void> main(List<String> args) async {
  var file = File(args[0]);
  var lines = file
      .openRead()
      .transform(utf8.decoder)
      .transform(LineSplitter());
  await for (var line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}



listen()メソッド (The listen() method)

Streamの最後のメソッドはlisten()である。これは「低レベル」のメソッドで、他の総てのstream関数はlisten()に関して定義されている。

StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

新規のStreamの型を生成するには、単にStreamクラスを継承しlisten()メソッドを実装すればよい:Streamの他の総てのメソッドは、機能する為にlisten()メソッドを呼び出している。

listen()メソッドはあるストリーム上でリスンを開始させる。そうするまでは、このストリームは不活性なオブジェクトで、どのイベントが欲しいのかを記述しているだけである。listenするとStreamSubscriptionオブジェクトが戻され、このオブジェクトがイベントたちを作り出すアクティブなストリームである。これは如何にIterableのオブジェクトが単にオブジェクトたちのコレクションであるが実際に繰り返しをしているのはイタレータであることと似ている。

stream subscriptionでは加入の一旦停止(pause)、一旦停止後の再開、及び完全なキャンセルが可能である。各データ・イベントまたはエラーにイベントごとに、そしてそのストリームが閉じたときに呼ばれるようにコールバックをセットできる。



関連資料 (Other resources)

Dartにおけるstreamの使用と非同期プログラミングに関するより詳細なドキュメンテーションを以下に挙げる:





前のページ

次のページ