前のページ

言語編

次のページ


アイソレート (Iaolates)

DartJavaと違って単一スレッドである。しかしながら並行処理はアイソレート(isolate)を使って実現できる。これは通信の世界から生まれたEarlang言語の影響を強く受けたものである。アイソレートはマルチコアやマルチCPU環境にも拡大できる。現在総てのコンピュータ(例えモバイルのプラットホームにおいても)マルチコアのCPUが使われている。アイソレートは他のアイソレートとは資源(メモリ)を共有しないので、マルチ・スレッドに関わる複雑な競合問題(スレッド安全性問題)やコードの複雑化は存在しない。

スレッドの代わりに、Dartでは総てのコードはあるアイソレートの中で走る。しかしアイソレートの場合は各アイソレートは各々のメモリ・ヒープを持っており、アイソレートの状態が他のアイソレートからアクセスすることは出来ない。

しかしながらアイソレートをブラウザ上で実現するには、web workersを使用することになり、Dartのアイソレートを使ったコードをブラウザ上で走らせるとその分重くなるとともにメモリも消費する。Dartがその為のVMをブラウザに実装することをあきらめた現在、むしろそれはdart:htmlWorkerクラスを使ったほうが有利になっている。従ってdart:isolateライブラリは現在VMでのみ使用可能となっている。

次のコードはCPUの各コアにサーバのアイソレートを割り当てている簡単なベンチマーク用のプログラムである:

import 'dart:async';
import 'dart:io';
import 'dart:isolate';
const String _HOST = '0.0.0.0';
const String _GREET = 'Hello World';
_startServer(arg) async {
  var server = await HttpServer.bind(_HOST, 9292, shared: true);
  await for (HttpRequest request in server) {
    request.response
      ..write(_GREET)
      ..close();
  }
}
void main() {
  for (int i = 0; i < Platform.numberOfProcessors; i++)
    Isolate.spawn(_startServer, null);
  _startServer(null);
}

このコードはIsolate.spawnメソッドでHttpServerのアイソレートを産み付け(spawn)たのちそのアイソレートを起動させている。詳細はこの章を一読した後で追っていただければよい。このベンチマークではDartはとの言語に比べて芳しくない成果ではあるが、これはHttpServerAPIのなかのHTTP要求処理の差が要因のひとつと考えられ、実際の運用アプリでは異なった結果となろう。

注意:本章で示されているコードはDartPadで試すことはできない。読者はIntelliJ IdeaのようなIDEを使う必要がある。IDEに関してはIDE」の章を参照のこと。この章に従うと、IDE上ではC:\dart2_code_samples\isolatesのフォルダに本章で記されているプログラムが収容される。



Isolateクラス (class Isolate)

Isolateクラスは新規アイソレートを生成する為のテンプレートである。即ちこのクラスのサブクラスのオブジェクトのspawn(またはspawnUri)というstaticメソッドを呼ぶことで新しいアイソレートが産み付け(spawn)られる。アイソレートはこのspawnメソッドの引数で指定された関数(エントリ・ポイント)から開始する(spawnUriではmain関数がエントリ・ポイントとなる)。アイソレート間はポート(ReceivePort, SendPort)を介して値たちを送信することで通信が行われる。

spawn(またはspawnUri)メソッドの引数には最初に親から産み付けた子に渡すメッセージが含まれている。通常このメッセージには親に送信するための送信ポートを含めることになる。そうしないと子は親に送信するための手段が無くなる。親はその産み付けが成功したときはその新規の子供を表現するIsolate型のオブジェクトを受信する。

反対に子から親への最初の送信メッセージには親が子に送信するための送信ポートの情報を含めねばならない。そうすることで両者間の通信の手段が確立する。

spawn(またはspawnUri)メソッドはFuture<Isolate>を返すので、await等の非同期処理が使える。

アイソレートたちはそれ自身のイベント・ループの中でコードを走らせ、各イベントはネストされたマイクロタスクの待ち行列のなかで走ることになる。

controlPort識別子と該アイソレートを制御するためのアクセスを与えること、及びpauseCapabilityterminateCapabilityにより、何らかの制御操作へのアクセスをガードしている。例えば、pauseCapabilityなしで作られたアイソレートに対しポーズをかけてもそれは無視される。

spawn操作で作られたIsolateオブジェクトはそのアイソレートを制御するために必要な制御ポート(control port)とケーパビリティ(capabilities)を持つ。

必要ならIsolate.Isolateコンストラクタを使ってこれらの機能の一部を有さない新規のアイソレートのオブジェクトを作ることも可能である。

アイソレートのオブジェクトはSendPortを介して送信することはできないが、制御ポートとcapabilityは送信でき、受信側のポートのアイソレートの中に新たに機能するIsolateオブジェクトを生成することができる。



SendPortReceivePort

これまでの説明で送信及び受信ポートのイメージが掴めたかと思う。ポートには受信ポートであるReceivePortとそのポートに送信する為の送信ポートであるSendPortとがある。これらはともにアイソレート間の通信の為の唯一の手段である。ReceivePortsendPortというゲッタを持ち、これがSendPortを返す。このSendPortを通過して送信されるメッセージは、その送信ポートを生成したReceivePortに渡される。そこでは、それらはその受信ポートに登録されているコールバック関数に渡される。メッセージには総てのオブジェクトが含まれる。但しアイソレートをメッセージとすることはできない。

ReceivePortは複数のSendPortを持つことができる。

下図はそのイメージである:




この図では送信側はメッセージの受信をしていないが、これは送受ポートの概念を示すためのもので、送信側が受信が出来ない訳ではない。メッセージ通信を介して送信側のReceivePortに送信するためのSendPortオブジェクトをもらい、受信側でその送信ポートを使って送信側にメッセージを送信することも無論可能である。

ポートだけを使うことは無いだろうが、以下はポートの動作を知るための簡単なコードで、アイソレートは使用していない:

import 'dart:isolate';
class Sender {
  SendPort sendPort;
  String senderId;
  Sender(this.sendPort, this.senderId);
  run() {
    sendPort.send('received message from $senderId');
  }
}
main() {
  var receivePort = new ReceivePort();
  receivePort.forEach((msg) {
    print(msg);
    if(msg.endsWith('#2')) {
      receivePort.close();
    }
  });
  new Sender(receivePort.sendPort, 'sender #1').run();
  new Sender(receivePort.sendPort, 'sender #2').run();
  new Sender(receivePort.sendPort, 'sender #3').run();
}
/*
received message from sender #1
received message from sender #2
 */

ここでは送信側をSenderというクラスで表現している。このクラスのコンストラクタには受信側への送信ポート(sendPort)と、そのオブジェクトの識別のための文字列(senderId)を引数として渡している。このオブジェクトのrunメソッドはその送信ポートから識別文字列を含んだメッセージを送信するだけである。

受信側はmain()メソッドであり、

  1. 受信ポートを用意する。

  2. その受信ポートを使って、メッセージが受かった時の処理のコールバック関数を登録している。

  3. コールバックの中では、受信したメッセージ毎にそれを取り出してコンソールに表示する。

  4. もし#2の送信側からのメッセージが受かっているときは、受信ポートをクローズする。

  5. 準備ができたら、送信側を3個生成し、実行させる。

forEachというメソッドは、StreamであるReceivePortに受信されたメッセージを順番に取り出すもので、未だ受信メッセージが存在しないときは、それが到来するまで待つ。したがって、このメソッドはFutureオブジェクトを返す。メッセージを受信したら、コールバックでそのメッセージを処理する。

このコードを実行させると、

received messaage from sender #1

received messaage from sender #2

とのみ表示される。これは2番目の送信側からのメッセージを受けたことで受信ポートが閉じたためである。

受信ポートを閉じると、その後の受信は廃棄される。受信ポートを閉じないと、このプログラムはいつまでも受信待ち状態を継続する。



アイソレートの産み付け (Spawning isolates)

新規のアイソレートを産み付けるにはIsolateクラスのspawn及びspawnUri2つのメソッドが使える。spawnは現在のアイソレートと同じソース・コードを使う新規アイソレートを生成し、spawnUri は独立して(つまり別のファイルとして)書かれたアイソレートを産み付けることができる。

下表はAPIに記載されているspawn及びspawnUri2つのstaticメソッドの記述である:

Future<Isolate> spawn <T>(

void entryPoint(

T message

),

T message, {

bool paused: false,

bool errorsAreFatal,

SendPort onExit,

SendPort onError

})

現在走っているのアイソレートと同じコードを共有するアイソレートを生成し産み付ける。

引数のentryPointは産み付けられたアイソレートの開始点を指定する。このエントリ・ポイント関数は新規アイソレート内で呼び出され、その唯一の引数はmessageである。この関数は単一の引数で呼べるトップ・レベルの関数またはstaticメソッドでなければならない。この引数は少なくともひとつの位置的パラメタとたかだかひとつの必須位置的パラメタを受け付けるコンパイル時常数の関数値である。この関数は単一の引数で呼ばれ得る限り任意の数のオプショナルなパラメタたちを受け付けてよい。この関数は関数式またはインスタンス・メソッドのティア・オフの値であってはならない。

産み付け側と産み付けられた側間での相互通信が可能になるよう、通常この初期メッセージにはSendPortのオブジェクトが含められる。

pausedパラメタがtrueのセットされているときは、このアイソレートはポーズした状態で立ち上がる、つまりエントリ・ポイント関数をmessageで呼び出す直前でポーズし、あたかもisolate.pause (isolate.pauseCapability)で初期コールされたかのようになる。このアイソレートを再開させるには、isolate.resume (isolate.pauseCapability)を呼び出す。

もしerrorsAreFatal, onExit及びまたはonErrorのパラメタたちが指定されているときは、該アイソレートはあたかも各々setErrorsFatal, addOnExitListener及びaddErrorListenerが対応しているパラメタで呼び出され、該アイソレートが走り出す前に処理されたかのの如く振る舞う。

もしerrorsAreFatalがオミットされているときは、このプラットホームはデフォルトの振る舞いを選択するかまたは現行のアイソレートの振る舞いを引き継ぐかの選択をする。

setErrorsFatal, addOnExitListener及びaddErrorListenerを返されたアイソレート上で呼び出すことは可能であるが、該アイソレートがpausedとして開始していない限り、これらのメソッドが完了する前に終了してしまっているかもしれない。

このメソッドは産み付けが成功すればIsolateのインスタンスで完了するFutureオブジェクトを返す。成功しなかった場合はエラーで完了する。

Future<Isolate> spawnUri (

Uri uri,

List<String> args,

dynamic message, {

bool paused: false,

SendPort onExit,

SendPort onError,

bool errorsAreFatal,

bool checked,

Map<String, String> environment,

@Deprecated('The packages/ dir is not supported in Dart 2') Uri packageRoot,

Uri packageConfig,

bool automaticPackageResolution: false

})

指定したURIのライブラリからのコードで実行するアイソレートを生成し産み付ける。

このアイソレートは指定されたURIのライブラリのトップ・レベルにあるmain関数の実行を開始する。

ターゲットとなるmainは次の3つのシグネチュアの一つをとる。例えば

  • main()

  • main(args)

  • main(args, message)

argsが存在するときは、それらはargsリストにセットして渡される。messageの引数が存在するときはそれは初期メッセージにセットされる。

pausedパラメタがtrueのセットされているときは、このアイソレートはポーズした状態で立ち上がる、つまりエントリ・ポイント関数をmessageで呼び出す直前でポーズし、あたかもisolate.pause (isolate.pauseCapability)で初期コールされたかのようになる。このアイソレートを再開させるには、isolate.resume (isolate.pauseCapability)を呼び出す。

もしerrorsAreFatal, onExit及びまたはonErrorのパラメタたちが指定されているときは、該アイソレートはあたかも各々setErrorsFatal, addOnExitListener及びaddErrorListenerが対応しているパラメタで呼び出され、該アイソレートが走り出す前に処理されたかのの如く振る舞う。

setErrorsFatal, addOnExitListener及びaddErrorListenerを返されたアイソレート上で呼び出すことは可能であるが、該アイソレートがpausedとして開始していない限り、これらのメソッドが完了する前に終了してしまっているかもしれない。

checkedパラメタがtrueまたはfalseにセットされているときは、可能であればtrueのときは新しいアイソレートはチェックド・モード(assertと型チェックを有効とする)で走り、falseのときは運用モード(assertと型チェックを無効とする)で走る。もしこのパラメタがオミットされているときは該アイソレートは現行のアイソレートからの値を引き継ぐ。

Dart2のストロング・モードではcheckedパラメタはassertをコントロールするだけとなる。

このcheckedパラメタは常に重視できるとは限らない。もしこのアイソレートのコードがあらかじめコンパイルされているときは、動的にこのチェックド・モードの設定を変更できないかもしれない。この場合はcheckedパラメタは無視される。

警告:このcheckedパラメタは未だ総てのプラットホームには実装されていない。

packageConfigパラメタが指定されているときは、これは産み付けられるアイソレートの為のパッケージ・解決設定ファイル(package resolution configuration file)の場所をさがすのに使われる。

automaticPackageResolutionパラメタが指定されているときは、これにより産み付けられるアイソレートの中のパッケージ・ソース(package sources)の場所が決まる。

environmentは文字列から文字列へのマッピングで、これは産み付けられたアイソレートがString.fromEnvironmentの値を検索するのに使用する。もしenvironmentがオミットされているときは、産み付けられたアイソレートは産み付けるアイソレートと同じ環境宣言を持つ。

警告:このenvironmentパラメタは未だ総てのプラットホームには実装されていない。

このメソッドは産み付けが成功すればIsolateのインスタンスで完了するFutureオブジェクトを返す。成功しなかった場合はエラーで完了する。



Spawnメソッドによるアイソレート産み付け例

Spawnメソッドによる産み付けは対象となるアイソレータが比較的小規模なアプリケーションに限られよう。

簡単なエコーバックの例を示す:

import 'dart:isolate';
import 'dart:async';
void remote(SendPort replyTo) async {
  var receivePort = new ReceivePort();
  replyTo.send(receivePort.sendPort);
  await for (var msg in receivePort) {
    print('remote received : $msg');
    replyTo.send('Echo : $msg');
    if (msg == 'bar') {
      receivePort.close();
    }
  }
  ;
}
main() async {
  var receivePort = new ReceivePort();
  var sendPort = receivePort.sendPort;
  var requestTo; // SendPort to request echo back
  await Isolate.spawn(remote, sendPort);
  await for (var msg in receivePort) {
    if (msg is SendPort) {
      requestTo = msg;
      requestTo.send('foo');
    } else {
      print('received : $msg');
      requestTo.send('bar');
      if (msg.endsWith('bar')) {
        receivePort.close();
      }
    }
  }
  print("end of main");
}
/*
end of main
remote received : foo
received : Echo : foo
remote received : bar
received : Echo : bar
 */

ここでは産み付けられる関数はトップ・レベルのremote(SendPort replyTo)である。産み付けはIsolate.spawn(remote, sendPort)で行われる。最初の引数のremoteは産み付ける関数であり、sendPortは親に送信するための送信ポートである。産み付けられたアイソレートはまず自分の受信ポートを用意し、その受信ポートにメッセージを送信するための送信ポートをその産み付ける際に与えられた送信ポートを使って親に送信する。そうすることで双方向の通信手段が確立されることになる。

このコードを読めば理解されるように、子供のアイソレータは親からのメッセージを'remote received : $msg'という文字列にして送り返している。

受信したメッセージが'bar'の場合はこれが最後のメッセージだと見做してその受信ポートを閉じる。受信ポートが閉じたということはそのストリーム元がなくなったということであり、処理はそのリスナのイベント待ちの状態から抜けることになる。この場合はそのアイソレートが終了することになる。このように子供のアイソレートが走り続けるのを避けるための手段が通常必要となろう。

なおイベント待ちの状態から抜けるには以下のようにその上のStreamSubscriptioncancel()を呼んでも同じ結果が得られる:

  var streamSubscription;
  streamSubscription = receivePort.listen((msg) {
    print('remote received : $msg');
    replyTo.send('Echo : $msg');
    if (msg == 'bar') {
      streamSubscription.cancel();
    }
});

この場合はawait forが使えないのですっきりしたコードにはなっていない。



spawnUriメソッドによる産み付け例

spawnUrlによりアイソレートの産み付けはアイソレートに引数を渡すことができる。またコードの管理がし易くなるので、より大規模なアプリケーションに適している。

ここでは複数のアイソレートを起動させ、その各々の状態を管理するための基本的な手法が含まれているサンプルを示す:

sample_mother_1.dart
import 'dart:isolate';
import 'dart:async' show Future, Stream, Completer;
import 'dart:io' as io;
class Child {
  String name;
  String status;
  Child(this.name);
}
void main() {
  final children = {
    'a': new Child('a'),
    'b': new Child('b'),
    'c': new Child('c')
  };
  children.forEach((name, child) async {
    var receivePort = new ReceivePort();
    var isolate = await Isolate.spawnUri(
        Uri.parse('sample_child_1.dart'), [name], receivePort.sendPort);
    isolate.addOnExitListener(receivePort.sendPort);
    receivePort.listen((message) {
      if (message is SendPort) {
        message.send('connected');
        child.status = 'running';
      } else if (message == null) {
        print("Child exited: ${child.name}");
        child.status = 'stopped';
      } else {
        print('Message: ${message}');
        child.status = 'running';
      }
      if (child.status == 'stopped') receivePort.close();
      print('status of the child $name : ${child.status}');
    });
  });
}
sample_child_1.dart
import 'dart:isolate';
void main(List<String> args, SendPort sendPort) async {
  var receivePort = new ReceivePort();
  sendPort.send(receivePort.sendPort);
  // 最初のメッセージ受信とその処理を終えるまではこのアイソレートは生き続ける
  await for (var msg in receivePort) {
    print('${args[0]} isolate received: $msg');
    receivePort.close();
  }
}

ここでは3個のアイソレートを産み付けている。子供のアイソレートは非常に簡単な形をしており、何らかのメッセージを受信したらそれに対し応答し、それで処理が終了する(receivePort.close()の実行により)。親のアイソレートは産み付けた各アイソレートに対する非同期の処理を用意する。その中で自分の子供に対しそれが終了したときに通知を受けるように指定する:

isolate.addOnExitListener(receivePort.sendPort);

ここでは送り返すオブジェクトを指定していないので終了時にはnullが返ってくる。そうすれば親はそのアイソレートに対する受信ポートを閉じる。最終的に産み付けたアイソレートに対する受信ポートの総てが終了するとこのプログラムは終了する。

以下はその実行例である:

status of the child a : running
a isolate received: connected
Child exited: a
status of the child a : stopped
status of the child c : running
status of the child b : running
c isolate received: connected
b isolate received: connected
Child exited: c
status of the child c : stopped
Child exited: b
status of the child b : stopped

子供たちのプロセスの進行が並行的に行われていることが確認されよう。

親はこれ以外にも子供のアイソレートに対しエラーに対する処理指定、一時停止と再開などのコントロールができる。



並行処理の確認

アイソレートで正しく並行処理がなされているかをFibonacci関数を使って試してみる。Fobonatti関数はその計算項数が増えると処理時間が大きくなるので、ベンチマークではよく使われる:

'fibonacci_mother.dart'
import 'dart:isolate';
// message queue
var logMessages = [];
main() async {
  // independent calculation
  log('start fib(40) calculation');
  fib(40);
  log('finished fib(40) calculation');
  // parallel calculation
  var receivePort = new ReceivePort();
  var isolate = await Isolate.spawnUri(
      Uri.parse('fibonacci_child.dart'), [''], receivePort.sendPort);
  isolate.addOnExitListener(receivePort.sendPort);
  receivePort.listen((message) {
    if (message is SendPort) {
      message.send('start');
      doCalc();
    } else if (message == null) {
      print("Child exited");
      receivePort.close();
    } else {
      print('Message: ${message}');
    }
  });
}
// log the message
void log(String msg) {
  String timestamp = new DateTime.now().toString().substring(11);
  msg = '$timestamp : $msg';
  print(msg);
  logMessages.add(msg);
}
// calculate fibonacci
doCalc(){
  log('start fib(40) parallel calculation in the parent');
  fib(40);
  log('finished fib(40) parallel calculation in the parent');
}
// Fibonacci function
int fib(int i) {
  if (i < 2) return i;
  return fib(i - 2) + fib(i - 1);
}
'fibonacci_child.dart'
import 'dart:isolate';
// message queue
var logMessages = [];
main(List<String> args, SendPort sendPort)  async {
  var receivePort = new ReceivePort();
  sendPort.send(receivePort.sendPort);
  await for (var msg in receivePort) {
    if (msg == 'start') doCalc();
    receivePort.close();
  }
}
// return log to the parent
void log(String msg) {
  String timestamp = new DateTime.now().toString().substring(11);
  msg = '$timestamp : $msg';
  print(msg);
  logMessages.add(msg);
}
// calculate fibonacci
doCalc(){
  log('start fib(40) parallel calculation in the child');
  fib(40);
  log('finished fib(40) parallel calculation in the child');
}
// Fibonacci function
int fib(int i) {
  if (i < 2) return i;
  return fib(i - 2) + fib(i - 1);
}

このプログラムは次のように進行する:

  1. 親のアイソレートは最初に独立してfib(40)を計算する

  2. 次に子供のアイソレートを産み付け、そのアイソレートに'start'を送信して計算開始を指示する

  3. 同時に自分も同じ計算を並行して行なう。各々の開始と終了時刻をプリントする

以下はその結果の例である。但し使ったコンピュータは古い機種の為計算時間そのものは参考にならない:

15:46:45.485387 : start fib(40) calculation
15:46:46.810458 : finished fib(40) calculation
15:46:47.063472 : start fib(40) parallel calculation in the parent
15:46:47.081473 : start fib(40) parallel calculation in the child
15:46:48.315543 : finished fib(40) parallel calculation in the child
15:46:48.351552 : finished fib(40) parallel calculation in the parent
Child exited

親の並行計算時間(1.288秒)と子の並行計算時間(1.234秒)が親が独立して計算した時間(1,325秒)よりも早くなってしまっている。





前のページ

次のページ