Thread
はじめに
Rubyではthread機能はコンパイル時に設定されるオプションです. 手元のrubyでthreadが使えるかどうか調べるためにはコマンドライ ンから以下のように入力してみてください.
% ruby -le 'print defined?(Thread)'この結果
class-constantと表示されればそのrubyインタプリタではthreadが使えます.もし,
nilと表示されたら,残念ながらそのrubyではthreadは使えません. Rubyのthread機能は移植性が高いように作られているので,再コン パイルだけで使えるようになる可能性が高いです.(もし自分で出 来なければ)管理者に頼んでみてください.
Threadとは
Threadとはひとつのプログラムの中で複数の制御の流れを扱うこと が出来る機能です.OSで提供されるプロセスとは違ってthreadで はメモリ空間が共有されます.
Rubyで使われているthreadはユーザレベルthreadと呼ばれるもので, rubyインタプリタ自身が自分でthreadの切替えを行っています.こ の方法は,OSで実装されているものよりも効率が低い,マルチCPU を活かすことが出来ない,というデメリットがありますが,その代 わり移植性が高いというメリットがあります.
Threadの生成
新しいThreadを作るためにはThread.startというメソッドを使いま す.使い方は以下の通りです.
Thread.start { .... }Thread.startは新しいthreadを作り,そのthreadでイテレータブロッ クを評価します.簡単なプログラムでthreadが動く様子を見てみま しょう.
1 Thread.start { 2 while TRUE 3 print "thread 1\n" 4 end 5 } 6 7 while TRUE 8 print "thread 2\n" 9 endこのプログラムを動かすと「thread 1」と「thread 2」が混じって 表示されるので,二つの無限ループが同時に動作しているのが分か ると思います.このプログラムを終了させるためにはCtrl-Cを押し てください.
Threadの操作
Threadクラスのメソッドは以下の通りです.
Thread.start {...}
Thread.new {...}
新しいthreadを生成し,その中でイテレータブロックを評価す る.新しく生成されたthreadオブジェクトを返す.newはstart の別名.
Thread.current
現在実行しているthreadオブジェクトを返す.
Thread.exit
現在実行しているthreadを終了させる.
Thread.join thread
指定したthreadの実行が終了するまで,現在のthreadを停止さ せる.
Thread.kill thread
指定したthreadの実行を終了させる.
Thread.pass
実行可能な他のthreadに明示的に制御を渡す.
Thread.stop
現在のtheadの実行を停止する.他のthreadがthread#runを実 行するまで停止し続ける
Thread#exit
レシーバのthreadの実行を終了させる.
Thread#run
レシーバの実行を再開させる.
Thread#stop
レシーバの実行を停止させる.
Thread#status
レシーバがまだ生きていれば真を返す.例外によってthreadが 終了していればその例外を発生させる.
Thread#value
レシーバのイテレータブロックを評価した結果を返す.まだイ テレータブロックの評価が終了していない時にはそのthreadが 終了するまで待つ.
またthreadが使える条件でコンパイルされたrubyではsleepが再定 義されていて,現在のthreadだけを一定時間停止させることが出来 る.またselectもthreadを扱えるように拡張されている.
Threadはメモリ空間を共有しているのでThread間のデータのやりと りは普通の変数を使って行うことができますが,動作するタイミン グを合わせるために同期を行う必要があります.この同期に失敗す ると,来るはずの無いデータを待って永遠に待ち続けるデッドロッ クと呼ばれる状態になったり,期待するのと違うデータを受け取っ て見付けにくいバグの元になったりします.
Rubyのthreadライブラリでは二つの同期方法を提供しています.ひ とつは同期だけを行うMutexとデータの受渡しも行うQueueです.こ れらのライブラリを使うためにはプログラムの先頭で
require "thread"
を呼び出しておく必要があります.
Mutexとはmutual-exclusion lock(相互排他ロック)の略です. Mutexをロックしようとした時にすでにロックされていれば, threadはロックが解除されるまで停止します.
並行アクセスから共有データを保護するためには以下のようなコー ドを用いて行います(ここでmをMutexのインスタンスとします).
begin m.lock # mで保護される共有データへのアクセス ensure m.unlock end
同じことをより簡単に行うためMutexにはsyncronizeというメソッ ドがあります.
m.syncronize { # mで保護される共有データへのアクセス }
例として簡単なプログラムを用意してみましょう.
1 require "thread" 2 3 m = Mutex.new 4 v = 0; # mで保護されるデータ 5 6 Thread.start { 7 m.syncronize { 8 v = v + 100 9 } 10 } 11 12 while TRUE 13 m.syncronize { 14 v = v - 33 15 } 16 end
このプログラムをMutexで保護しないと,タイミングによってはvの 値を取り出してから代入までの間に他のthreadによって値が変更さ れてしまう可能性があります.
Mutexのメソッドは以下の通りです.
Mutex.new
新しいロックを生成する
Mutex#lock
ロックする.すでにロックされている場合にはロックが解除さ れるまで待つ.
Mutex#unlock
ロックを解除する.ロックを待っている他のthreadがあればそ ちらを走らせる.
Mutex#syncronize
ロックの獲得から解除までを行うイテレータ.
Mutex#try_lock
ロックを獲得する.すでにロックされている場合には停止せず FALSEを返す.
Queueはデータを読み書きするパイプのようなものです.データを 提供するthreadは一方からデータを書き込み,読み出すthreadはも う一方からデータを取り出します.Queueに読み出すデータが残っ ていない時には読み出そうとしたthreadはデータが来るまで停止し ます.
Queueを使った簡単なプログラムは以下のようになります.
1 require "thread" 2 3 q = Queue.new 4 5 Thread.start { 6 while gets 7 q.push $_ 8 end 9 } 10 11 while TRUE 12 while line = q.pop 13 print line 14 end 15 end
このプログラムではひとつのthreadが読み込んだ行をもうひとつの
行が出力しています.3行目を「q = []
」などとして
配列に変えてみるとthread間の同期が取れず,正しく動かないこと
が分かるでしょう.
Queueのメソッドは以下の通りです.
Queue.new
新しいQueueを生成します.
Queue.empty?
Queueが空の時真を返します.
Queue.push value
Queueにvalueを追加します.
Queue.pop [non_block]
Queueからデータを取り出します.偽でない引数non_blockが与 えられた場合にはQueueが空の時に例外を発生させます.それ以 外の場合にはQueueが空の時にはQueueにデータが追加されるま で読み出したthreadを停止させます.
並列プログラミングの世界では昔から有名な「哲学者の食事」問題 を作ってみましょう.
「哲学者の食事」問題とは以下のような状況で哲学者がどうやって 同期をとるかという問題です.
N人の哲学者が丸いテーブルに座っています.テーブルの真中に は大きなスパゲティの皿が置いてあります.またN本のフォーク があって哲学者と哲学者の席の間に置いてあります.哲学者は思 索を続けていますが,お腹がすくと両側のフォークを取ってスパ ゲティを食べます.お腹が一杯になると食べるのを止めてフォー クを返します.哲学者は紳士ですから,お腹が空いていても両方 のフォークが手に入るまでは待ちます.
このプログラムを実行すると現在の状態を次々と表示します.各文 字の意味は以下の通りです.
哲学者が考えている時間と食事している時間は乱数で決めています.
1 # 2 # The Dining Philosophers - thread example 3 # 4 require "thread" 5 6 N=7 # number of philosophers 7 $forks = [] 8 for i in 0..N-1 9 $forks[i] = Mutex.new 10 end 11 $state = "-o"*N 12 13 def wait 14 sleep rand(20)/10.0 15 end 16 17 def think(n) 18 wait(); 19 end 20 21 def eat(n) 22 wait(); 23 end 24 25 def philosopher(n) 26 while TRUE 27 think n 28 $forks[n].lock 29 if not $forks[(n+1)%N].try_lock 30 $forks[n].unlock # avoid deadlock 31 next 32 end 33 $state[n*2] = ?|; 34 $state[(n+1)%N*2] = ?|; 35 $state[n*2+1] = ?*; 36 print $state, "\n" 37 eat(n) 38 $state[n*2] = ?-; 39 $state[(n+1)%N*2] = ?-; 40 $state[n*2+1] = ?o; 41 print $state, "\n" 42 $forks[n].unlock 43 $forks[(n+1)%N].unlock 44 end 45 end 46 47 for i in 0..N-1 48 Thread.start{philosopher(i)} 49 sleep 0.1 50 end 51 sleep 52 exit