Welcome to underground

index

MultiprocessingとMultithreading?

ver2.6から追加された multiprocessingモジュールを使って並列処理させる. threadingを使う方法もあるが,クラス継承しないと行けないとか ちょっと遅いとか上手くCPU使えてないとかちょっと不満があるので,multiprocessingを使って話をする. これはthreadingと似たようなAPIを提供するのだが,GILを効率的に回避する.(theadではなくsubprocessを使う)

何はともあれサンプルを示す.単純にカウントダウンするという面白みもなんともないもの. 1Gをカウントダウンする.この場合4スレッドに分散させるので250Mをカウントしていることになるが….
threading01.py

import threading
import time


def countDown(n):
    while n > 0:
        n -= 1

class TestThread(threading.Thread):
    def __init__(self, n):
        threading.Thread.__init__(self)
        self._n = n
    def run(self):
        countDown(self._n)


n = int(1e9)
n1, n2, n3, n4 = int(n/4), int(n/4), int(n/4), int(n/4)


jobs = [TestThread(n1), TestThread(n2), TestThread(n3), TestThread(n4)]

start_time = time.time()
for j in jobs:
    j.start()

for j in jobs:
    j.join()
finish_time = time.time()
print(finish_time - start_time)

やることは単純で,threading.threadを継承するクラスを作ってrunメソッドをoverwrite(上書き)するだけ. で,あとはインスタンスを作ってstartとしてjoinする.

multiprocessing

次.multiprocessingを使った場合.こちらはクラスを継承する必要がなく,簡単にスレッドを実装できる. 上と同じでカウントダウンをさせましょうか.
multiprocessing01.py

from multiprocessing import Process
import time

def countDown(n):
    while n > 0:
        n -= 1


n = int(1e9)
n1, n2, n3, n4 = int(n/4), int(n/4), int(n/4), int(n/4)

jobs = [
    Process(target=countDown, args=(n1,)),
    Process(target=countDown, args=(n2,)),
    Process(target=countDown, args=(n3,)),
    Process(target=countDown, args=(n4,)),
    ]

start_time = time.time()
for j in jobs:
    j.start()

for j in jobs:
    j.join()

finish_time = time.time()
print(finish_time - start_time)

こちらのほうが簡単だね.関数を作ってProcessに投げる.後はstartしてjoinするだけ. 実行すると気づくと思うが,multiprocessingはちゃんと子プロセスを作る. topやらで見ると確かにProcessの数だけpythonが動いている.
一方でthreadingの方はそうではない.プロセスが一つだけ動くだけ. これや実行速度やCPU占有時間にも現れている. threadingは一つプロセスの中で複数のスレッドを立てて実行しているのに対して, multiprocessingは作ったスレッド分だけ子プロセスを作ってそれを制御する. (GILが大きなネックになっている証拠?) という事で,今回はmultiprocessingを中心に書く.

もう少しmultiprocessingについて

一方のプロセスでデータを取って片方で処理する場合,データ取り終わって処理開始させるよりはデータは連続して取って処理は別に動かすというのが普通である. だがしかし,これが結構難しい. threadingは一つのプロセスだけで処理するから通信が楽であるが,独立したプロセスを作ってそれの通信と言うのは難しい. C++での実装の際にも問題になる(らしい)のだが,どうやって解決するか?
いくつかの方法が供給されている.

  1. shared memoryを使う
  2. server processを使う

1は一般的な共有メモリを使うという方法.型は,arrayモジュールの書式に従う(ctypeと言うべきか). 大きく分けて2つあり,ArrayとValueである. 公式サンプル (ctypeについてはいつか…いつの日にか….cのsharedが使えるようにする,早い話)

2は上記ctypeでは記述が難しいlist,dictionary, Namespace, Lock, etcを扱える. サーバとクライアントの仕組みを使ってpythonオブジェクトプロキシでやり取りする。
multiproc_manager01.py

#!/usr/bin/env python
#coding:utf-8

from multiprocessing import Process, Manager

def func(l, d):
    l.append(1)
    d.update({2:"hello"})
    d[10] = "world"

if __name__ == "__main__":
    manager = Manager()
    l = manager.list(range(2))
    d = manager.dict()

    p = Process(target=func, args=(l,d))
    p.start()
    p.join()

    print(l)
    print(d)
$ python multiproc01.py
[0, 1, 1]
{2:"hello", 10:"world"}

と通信が出来る.これ以上は骨が折れるので,簡単なサンプルを示す.
dstat-modoki.py

#!/usr/bin/env python
#coding:utf-8

import time
from multiprocessing import Process, Manager, Array


def getCPUInfo():
    
    with open("/proc/stat") as f:
        lines = [[float(elm) for elm in v.split()[1:]] 
                 for v in f.readlines() if v[:3] == "cpu"]
    return lines

    
def getCPUUsage(interval=1.):

    s1 = getCPUInfo()
    time.sleep(interval)
    s2 = getCPUInfo()

    num_of_cpu = len(s1) - 1
    cpu_total = [map(lambda x,y:(y-x)/num_of_cpu/interval/100. , s1.pop(0), s2.pop(0))]

    return cpu_total + [[(s2[i][j] - s1[i][j])/interval/100. for j,eml in enumerate(v)] \
                            for i,v in enumerate(s1)]




def getMEMInfo():
    
    with open("/proc/meminfo") as f:
        lines = [ v.strip().split(":") for v in f.readlines()]
    
    mem_dict = {}
    for l in lines:
        mem_dict[l[0]] = float(l[1].strip().split()[0])
    return mem_dict



def getMEMUsage():
    memory_usage = getMEMInfo()
    return ((memory_usage["MemFree"] + memory_usage["Buffers"] + memory_usage["Cached"]), \
                memory_usage["MemTotal"])


def getLoadAvgInfo():
    with open("/proc/loadavg") as f:
        return [float(v) for v in f.read().split()[:3]]



def getNETInfo():
    """
    Inter -| Receive   | Transmit
    face | bytes packets errs drop fifo frame compressed multicast| bytes ....
    lo: 0 0 0 ...
    """
    with open("/proc/net/dev") as f:
        lines = [v.split(":") for v in f.readlines()[2:]]

    net_dict = {}
    for l in lines:
        net_dict[l[0].strip()] = [int(v) for v in l[1].split()]
    return net_dict



def getNetUsage(interval=1., total=False):
    """
    dictionary of list
    receive's bytes, packets, errs, drop, fifo, compressed, multicast, 
    transmit's bytes, packets, errs, drop, fifo, compressed, multicast

    total 16 elements
    """
    net_dict1 = getNETInfo()
    time.sleep(interval)
    net_dict2 = getNETInfo()
    
    net_usage = {}
    for k,v in net_dict1.items():
        net_usage[k] = [0.]*16
        for i, elm in enumerate(v):
            net_usage[k][i] = (net_dict2[k][i] - elm)/interval

    if total:
        return (net_usage, net_dict2)
    else:
        return net_usage



def getCPU(l, interval=1):
    l += getCPUUsage(interval)

def getMEM(arr,):
    arr[0], arr[1] = getMEMUsage()

def getNET(d, interval=1):
    d.update(**getNetUsage(interval))

def getCPUStr(arr, progbar=False, bar="|"):
    if progbar:
        return "{0:6.1%} [{1:10s}]".format(arr[0][0], bar*int(arr[0][0]*10))
    else:
        return "C:{0:6.1%}".format(arr[0][0])

def getMEMStr(arr):
    return "M:{0:4.1f}/{1:4.1f}GB".format((arr[1] - arr[0])/1024/1024, arr[1]/1024/1024)

def getNETStr(dic, key):
    return "D:{0:7.3f}MB U:{1:7.3f}MB".format(dic[key][0]/1024/1024, dic[key][8]/1024/1024)



if __name__ == '__main__':

    ## ネットワークデバイス名を指定.eth0とかeth1とか
    ## intervalで更新時間を表示できる
    net_dev = "eth1"
    interval = 1.

    manager = Manager()
    cpu_usage = manager.list()
    mem_usage = Array("d", (0., 0.))
    net_usage = manager.dict()


    jobs = [Process(target=getCPU, args=(cpu_usage, interval)),
            Process(target=getMEM, args=(mem_usage, )),
            Process(target=getNET, args=(net_usage, interval))]

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()


    
    message_cpu = getCPUStr(cpu_usage, False)
    message_net = getNETStr(net_usage, net_dev)
    message_mem = getMEMStr(mem_usage)


    print("{0} {1} {2}".format(message_cpu, message_net, message_mem))

dstatの代わり.tmuxのために作った.
CPU使用量(/proc/stat),物理メモリ(/proc/meminfo),ネットワーク(/proc/net/dev),ロードアベレージ(/proc/loadavg)を並列して取得する.
(実際はロードアベレージ以外を取得する)
並行でやることもないようなものだが,サンプルとしては丁度よいか.

中味はただパースして数値に変換しているだけ.難しい処理は行なっていない. できるだけワンラインで書けるように,処理を細かい処理で構成するようにした. 各関数で処理した値は,Managerクラスを使って渡している. 何が何であるのかはっきりしているのであればArrayを使っても構わない.

Queue/Pipe

次にQueue,Pipeを使っての処理.Queueはいわゆる生産者/消費者モデルの簡単な例である. Pipeも似たものであるが,双方向の通信が可能.Queueは一方通行という感じか.
例のごとく公式のサンプルを張り付けると,

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

別プロセスを走らせる.そのプロセス内でQueueインスタンスのputメソッドを呼び出してデータ[42, None, 'hello']を"put"する. メインのプロセスでgetメソッドを使ってそれを取り出す.
時間のかかる処理やかかる時間がまちまちな処理を走らせデータをputする. queueに置かれたデータをもう一方のプロセスで処理する.例えばファイルやDBに書き出す様な. そういった用途に使える.

Pipeであるが,Queueの発展のようなもので相互でのやり取りが可能である.

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

multiproc_pipe01.py

# 上のサンプルが物足りないので通信
# threadingのLockも利用して表示のロックを図る
# これをしないとスレッドが別々に非同期に動いてしまう.


from multiprocessing import Pipe, Process, Lock
import time
import sys



def func(conn, lc):
    conn.send("<< hello")

    lc.acquire()
    while True:
        if conn.poll():
            break
        else:
            sys.stdout.write(".")
            sys.stdout.flush()
        time.sleep(1.)
    sys.stdout.write("\n")
    print(conn.recv())
    lc.release()

    conn.close()



if __name__ == '__main__':
    lc = Lock()
    parent_conn, child_conn = Pipe()
    p = Process(target=func, args=(child_conn, lc))
    
    p.start()

    ## 画面上に表示させるまでスレッドをロック
    lc.acquire()
    print(parent_conn.recv())
    lc.release()  # ロック解除


    time.sleep(3.)
    parent_conn.send(">> hi! what's up?")


    p.join()  

Pipeでは子プロセスを殺す時,Queueよりは簡単に組める.
multiproc_pipe02.py

from multiprocessing import Pipe, Process
import time
import datetime
import random
import math


def genRandom(conn):

    while True:
        if conn.poll():
            if conn.recv() is None:
                conn.close()
                break
        else:
            time.sleep(random.randint(1,15)*0.01)
            conn.send(random.normalvariate(10, 0.6))
    print("connection closed")
    conn.close()


def cleanUp(arr):
    return filter(lambda x: x is not None, arr)


def getMean(arr):
    return 1.*sum(arr)/len(arr)


def getStd(arr):
    m = getMean(arr)
    return math.sqrt(sum(map(lambda x: (x - m)*(x - m), arr))/len(arr))


if __name__ == '__main__':

    ## create Pipe instance and Process instance
    ## Pipe returns two instance; parent and child
    ##     child will run in subprocess
    ##     parent will manage child
    parent_conn, child_conn = Pipe()
    p = Process(target=genRandom, args=(child_conn, ))
    p.daemon = True

    ## check current date
    start_time = datetime.datetime.now()
    p.start()  # process start


    timeinterval = 10     # sec
    timeinterval = datetime.timedelta(seconds=timeinterval)
    sample_size = 1e2

    i = 0
    data_array = [None] * int(sample_size)
    while i < int(sample_size):
        print(p.is_alive())
        current_time = datetime.datetime.now()
        if (current_time - start_time) > timeinterval:   # elapsed setting value, break loop
            print("time out!")
            break
        else:
            res = parent_conn.recv()
            data_array[i] = res
            if i % 10 == 0 and i != 0:
                print("{0:%x %X}: {1:.2f}".format(datetime.datetime.now(), res))

        i += 1
    
    ## at first send "Signal" to child
    parent_conn.send(None)
    p.join()   # join Process

    data_array = cleanUp(data_array)

    print("total event: {0}".format(i))
    print("mean: {0}, std: {1}".format(getMean(data_array), getStd(data_array)))
    print("done")

子プロセスはランダム秒待って乱数を親プロセスに送る. 親プロセスはひたすらそれを集める.
もし子プロセスが一定時間経過した,あるいは一定数のイベント集め終わったならば 子プロセスを終了し集めたデータの平均値と偏差を出力する.
子プロセスを親は直接殺さない. 終了のシグナルとしてNoneを子プロセスに送り,子プロセスは読み取るべきデータがあり(pollメソッドで判断) かつそれがNoneで有るときにプロセスを終了する.
子プロセスが終了するとProcessインスタンスが勝手にterminateするのでjoin出来る. 後は後処理で,平均値・偏差を求めて出力する.

恐らく,server/clientモデルで作った方がプロセスの終了などを管理しやすくなると思うのだが… Pipeを使ってもまぁまぁなものが作れるので,それはそれで便利かと思われる.

append

例のエキスパートPythonプログラミングによると, multiprocessingはthreadingのラップではなくos.fork+subprocessの組み合わせのようなものらしい…. プロセスが作られた瞬間自動的にforkしてプロセスを走らせるとのこと. また, 共有メモリスペースを提供するmultiprocessing.Arrayとmultiprocessing.Valueクラスは極力使わないほうが良い (pp365)と書かれている. 並列化のボトルネックやコードの複雑化を引き起こすから,だそうだ. もしデータをやりとりしたければ,ManagerのValue, dictやlist,あるいはctypeを使うべきだ,という事だろう.
(multiprocessingのValueではなく,managerクラスのValueやArrayを使うべきである.後者はProxyオブジェクトなので 安全に共有メモリにアクセスでき,必要であればロックを掛けることが出来る.)

実は最後のサンプルには落とし穴があった.処理に時間がかかる場合,L59のwhileループで詰まる. ここではサンプルサイズと経過時間をチェックしているのだが,elseの中つまり処理に時間を取られるとここで滞ってしまう. 従って目的の時間を大幅に過ぎてからループを抜ける可能性があるのだ.
そこでこれを回避するために,もうひとつ処理専門のプロセスを作りデータを投げてやるのだ.
それがこれ.multiproc_pipe03.py (要pyROOT)

メインのスクリプトでは,単にプロセスの管理だけを行なっている. 一定時間経過したあるいは一定のイベント数に到達したかを判断し,それに応じてプロセスに司令を与える.

で,コレで何が出来るかというと仕事の分担が出来る. 要するにデータ収集とデータ処理を並列しておこなうとき楽なのだ. 処理に時間がかかる場合,1つのスレッドで仕事をさせる場合DAQにデッドタイムが生じてしまう. 結局そんなシビアな条件を課すんだったら,Cppで書いちゃうけどね….