Python NetWorkSpaces(NWS)と並列プログラミング

この文章について

オリジナルは ここ 。これを自分(kawasaq)のために適当にメモしたものです。 内容については責任をもちませんので、あしからず。 でも、もし訳が間違ってるのに気づいたら教えてください。 また、もっとわかりやすい説明があったら教えてください。

ちなみに、 NWSはここからダウンロードできます

NetWorkSpaces を使うと、並列プログラムが簡単に作れる

NetWorkSpaces というのは、並列プログラムを簡単に書くためのライブラリ。 いわゆる「ネームスペース」をシェアする形で、複数のプログラム間でデータをやりとりする。

たとえば、こんなふうに。

ws.store('x', ws.fetch('y'))
ws.x = ws.y

この2行は同じだ。 ws というのがネームスペースを表す識別子で、 そのネームスペース上の y に関連付けられているデータを x に関連付ける。

もうちょっと詳しく書くと、たとえば Python だったらこんな感じ。

>>> from nws.client import NetWorkSpace
>>> ws=NetWorkSpace('test')
>>> l=['a','b','c']
>>> t=(1,2,3)
>>> d={'list':l, 'tuple':t}
>>> ws.store('dict example', d)
>>> ws.fetch('dict example')
{'list': ['a', 'b', 'c'], 'tuple': (1, 2, 3)}

NetWorkSpaces というのは、別に Python だけのものってわけじゃなくて、 たとえば R でも使える。特にデータがASCII文字だったら、次の例のように、言語が違うプログラムでデータをやりとりできる。

Python で書いた例

>>> from nws.client import NetWorkSpace
>>> ws=NetWorkSpace('tickets')
>>> ws.store('ticket', 'ticket string')

R で書いた例

> library(nws)
> ws<-netWorkSpace('tickets')
> nwsFetch(ws, 'ticket')

[1] "ticket string"

ところで、最初の例で示したws.x, ws.y などは、代入で上書きされる普通の変数ではなく FIFOのキューだ。ws.y というキューにデータが入っていないときに、ws.fetch("y")を実行すると、単に待たされる。 これを理解するためには、次のようなコードを試してみるとよい。

"worker"として

from nws.client import NetWorkSpace
def f(x): return x*x*x
ws = NetWorkSpace('table')
while True:
  ws.store('r', f(ws.fetch('x')))

というプログラムを動かして、同時に"Master"として

ws = NetWorkSpace('table')
for x in range(100): ws.store('x', x)
for x in range(100): print 'f(%d) = %d'%(x, ws.fetch('r'))

というプログラムを動かす。 すると、1ずつ増えていく数とその数を3乗した数の対のリストが得られるだろう。

こんなふうに、workerとmasterは別々に動く。workerが複数いるときは、 workerのプログラムを若干書きなおさなくてはいけないけれど、masterの方は同じでよい。 ああ、なんてフレキシブル。

次に、y = f(x) の最大値を求めるために、複数のプログラムが x の候補となるリストを持っている場合を考える。 たとえば、こんなプログラムを書いて並列で実行するとどうなるだろう。

for x in MyCandidateList:
    currentMax = ws.fetch('max')
    y = f(x)
    if y > currentMax: ws.store('max', y)

これはもちろん、誤ったプログラムだ。fetch したままで値を store しない場合がある。 こんなときのために、キューを変更しないで値だけ見るための関数 find が用意されている。

じゃぁ find を使ってこんなプログラムを書いたらどうだろう。

for x in MyCandidateList:
    currentMax = ws.find('max')
    y = f(x)
    if y > currentMax: ws.store('max', y)

これも誤りだ。storeされるばっかりでキューが長くなって、find で参照される値が最新のものでなくなってしまう。 というわけで、こういうふうに書くのが正しい。

for x in MyCandidateList:
    currentMax = ws.find('max')
    y = f(x)
    if y > currentMax:
        currentMax = ws.fetch('max')
        if y > currentMax: currentMax = y
        ws.store('max', currentMax)

一見冗長に見えるかもしれないけれど、これが正しい。fetch と store が対になっているので、原子性も保証される。 currentMax は毎回 find を使わなくても、10回に1回 find したり、あるいは最初に適当な値を入れておいて、 1つ目のif文が成立したときだけに書き換えるようにしてもうまくいく。

ところで、このようにキューを破壊しないでデータを参照する find の使い道として、 最初に初期化として値を与えておいて、あとはそれを定数として参照するだけ、という使い方がある。

その他、NWS では次のようなモデルをサポートしている

  • FIFO (default)
  • LIFO
  • 非決定的(ランダムではなくて、任意に)
  • Single (ただ1つの値を保持して、store関数はその値を上書きする)

Singleというモデルは、たとえばステータスを渡すときに役に立つ。いちいち以前の値をfetchして消さなくていいしね。 サンプルコードはこんな感じ。

from nws.client import SINGLE
ws.declare('status', SINGLE)
while True:
  time.sleep(60)
  status=...
  ws.store('status', status)

NWS Iterator

NWS はデータをキューとして格納するんだけれども、イテレータとして使うこともできる。 ifind という関数がそれだ。対応づけられている値を順に返す。たとえば次のようなコードを考えてみる。

for v in range(5): ws.store('v', v)
for v in ws.ifind('v'): print v

このコードは、2つめのループで値を5つ表示したところで止まる。 ifind関数が次の値を待つからだ。だから、別のプロセスで値がstoreされると、このプロセスは再び動き出す。 このプロセスを止めるには、

ws.deleteVar('v')

を実行する。

他にもこれを実現する方法はある。たとえば ifindTry という関数は、値を一通り返し終わったら終了する。 ほかにも ifetch や ifetchTry という関数があって、これはキューを破壊しながらデータにアクセスする以外は、 それぞれ ifind や ifindTry と同じだ。 NWS をイテレータとして使うには、FIFOかSingleモードの変数でなければならない。

Managing WorkSpaces

NWS は複数のワークスペースを管理することができて、コンストラクタに最初の引数として、 ワークスペースの名前を渡すことになっている。ホストサーバを特定するために、ホスト名とポート番号を渡すこともできる。 (っていうか、こういう仕組みは絶対必要だよね)たとえばこんなふうに。

ws = NetWorkSpace('example', serverHost='myhost.mycorp.com')

こうすれば、複数のプロセスで同じmyhost.mycorp.com の標準ポートで管理されるワークスペースを使うことができる。

でも、ワークスペースは誰のものだろう。それは、普通、最初にワークスペースのことをサーバに言及したプロセスのものだ。 これはクリーンアップするときに重要になる。ワークスペースとその中のデータは、永続的で、 しっかりと消去されるまで残る。実際、これはややこしいので、デフォルトではデータは一時的なものだ。 ワークスペースの持ち主のプロセスが終了したら、ワークスペースもなくなる。 でも、永続性は便利なときもあるので、コンストラクタで設定すれば永続性を持たせることもできる。

ようするに、普通は、最初にワークスペースを使ったプロセスがワークスペースの持ち主。 他のプロセスはワークスペースを勝手に起動して、勝手に使って、勝手に逝ってよし。 そんで、持ち主のプロセスが死んだら、ワークスペースもなくなる。

でもまぁ、ワークスペースのオーナーにならないで使いたいってこともある。 そんなときは、コンストラクタで useUse=True とすればよい。こんなふうに。

ws = NetWorkSpace('not mine', useUse=True)

ところで、このワークスペースの名前がぶつかっちゃったらどうするの?って心配するかもしれないけれど、大丈夫。 NWS には mkdtempWs っていうメソッドがあって、これはLinux の mkdtemp がユニークな名前のディレクトリを作るみたいに、 サーバーの中でぶつからないワークスペースの名前を返してくれるんだ。

The Web Interface

NWS には、なんとウェブのインターフェースもある。 そのサーバーにあるワークスペースの一覧と、それぞれのワークスペースに含まれる変数の値を確認できる。 また、これらを削除したり変更したりもできる。

それぞれの変数の値は、シリアライズされていて一見しただけではよく分からないけれど、 Babelfish を使うと、それらをデシリアライズ(シリアライズの逆変換)して表示してくれる。 Babelfishっていうのは、データを変換して返すだけの、ただのクライアントだ。

これを使うと、全体のシステムの初期値をブラウザから与えて実行するってこともできたりするよ。 スレイ(Sleigh:馬車の意味)を使えば、の話だけどね。

Sleigh

Sleigh を使えば、たくさんのプロセスの管理を簡単にやることができる。 これを使えば、master/worker スタイルの管理を、直接NWS を触らなくてできるようになる。

Sleigh はデフォルトで、ワーカーをローカルマシンに3つ作る。 作られたワーカーは作業を渡されるのを待っている状態になっている。それは、こんなふうに書く。

>>> from nws.sleigh import Sleigh
>>> s = Sleigh()

eachWorkerというメソッドは、引数で与えられた関数をそれぞれのワーカーに対して実行して、 その結果をリストにして返す。こんなふうに。

>>> from socket import gethostname
>>> s.eachWorker(gethostname)
['newt', 'newt', 'newt']

gethostname という関数は、ワーカーが動いているホスト名を返す。 つまり作られたワーカーは、'newt'という名前のマシンで動いていることを表している。

ワーカーを1台のマシンでいくつも動かすのは、デバッグの時には有効だけれど、 普通は複数のマシンで1つずつ動かしたいと思うだろう。そんなときはこうする。

>>> s.stop()
>>> from nws.sleigh import sshcmd
>>> s = Sleigh(nodeList=['hippo', 'newt', 'python', 'rhino'], launch=sshcmd)
>>> s.eachWorker(gethostname)
['rhino', 'hippo', 'newt', 'python']

まず、最初の行で、今まで動いていたワーカーを止める。 そして3行めで、新しいワーカーを4台のホストで1つずつ動かして、それぞれsshで連携させている。 これは一応、ユーザがパスワード無しでログインできるように、うまく設定されたsshサーバを持ってることが前提だ。

eachWorkerは主に初期化に使って、実際に計算するときには、eachElemを使う。 これは関数とリストを引数に要求する関数だ。与えられた引数の各要素を、与えられた関数で評価して、 その戻り値をリストにまとめて返してくれる。 たとえば前に紹介した、値の3乗のリストを求める計算は、こんなふうに書く。

>>> r = s.eachElem(lambda x: x*x*x, range(100))
>>> len(r)
100
>>> r[2:5]
[8, 27, 64]

eachElemに渡すリストは、当然、リストのリストとかでもOKだ。

デフォルトでは、eachElemメソッドは結果のリストが完成するまで値を返さない。 しかし、オプションを設定すれば、eachElem関数が結果のリストではなく、 SleighPendingオブジェクトを返させることもできる。そうすればeachElemはブロックしない。 SleighPendingオブジェクトは計算結果を問い合わせて、最終結果を得るためのメソッドを持っている。

Put It All Together

今まで紹介したことをまとめて、素数を見つけるプログラムを作る。 素数を見つけるアルゴリズムは、その数の平方根以下の全ての素数で割ってみて、 全てで割り切れなかったら素数に追加する、という方式だ。 それを書くと、こんな風になる。

import sys
from nws.sleigh import Sleigh

def initPrimes():
  global chunk, chunks, limit
  limit, chunk = SleighUserNws.find('prime parameters')
  chunks = {}

def findPrimes(first):
  last = min(first+chunk, limit)
  # we need to know all the primes up to the smaller of the start of
  # this chunk or the square root of its last element.
  need = min(first-2, int(.5+(last-1)**.5))

  myPrimes = []

  for c in xrange(3, need+1, chunk):
    if not c in chunks: chunks[c] = SleighUserNws.find('primes%d'%c)
    myPrimes += chunks[c]
    newx = len(myPrimes)
    for x in xrange(first, last, 2):
      for p in myPrimes:
        if x%p == 0: break
      else: myPrimes.append(x)
  if first**2 < limit: SleighUserNws.store('primes%d'%first, myPrimes[newx:])

  return myPrimes[newx:]

def master(workers, limit, chunk):
  s = Sleigh(workerCount=workers)
  s.userNws.store('prime parameters', (limit, chunk))
  s.eachWorker(initPrimes)
  primes = [2]
  map(primes.extend, s.eachElem(findPrimes, range(3, limit, chunk)))

  return primes

if __name__=='__main__':
  primes = master(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]))
  print len(primes), primes[:3], '...', primes[-3:]

まず、マスターがSleighのインスタンスを作って、eachWorkerを使ってワーカーを初期化する。 それからタスクのリストを作って eachElemで計算を始める。2つのパラメータがNWSに固定されて、 eachWorker経由で参照される。それぞれのタスクは、素数かどうか調べる領域の最初の数で表現されている。 素数かどうか調べるときに必要な素数のリストに、ワーカーが知らないものがあったら、 ワーカーはfind関数を呼び出して調べる。ただ、ワーカーはそれぞれ自分が一度調べたものは記憶しておいて、 find関数は呼び出さない。ワーカーはSleighUserNwsというNWSを使っているが、 これは名前がぶつからないようにSleighが作ったネームスペースだ。

Conclusion

これのすごいところは、並列プログラムをツールとかハードウエアなしに実現してしまうところ。 マルチコアのCPUやたくさんのコンピュータがある環境でやるとよい。 また、計算と中にネームスペースのデータをWeb経由で参照できるので、デバッグがやりやすい。 しかも言語に依存しないので、いろんなプログラム言語で動かすことができる。

kawasaqの追記。 適当翻訳なので、わたし自身よくわかってないところもあります。 自分用のメモとして書きました。