concurrent.futures — 並行処理 in Python
概要
Global Interpreter Lock (GIL) の制約により、
1つのPythonインタープリタでは同時に1つのスレッドしかコードを実行できない。
したがってCPUバウンドなピュアPythonコードを
threading
でマルチスレッド化しても速くならない。
subprocess
による外部プログラム実行やI/OなどGIL外の処理を待つ場合には有効。
一方 multiprocessing
は新しいインタプリタを os.fork()
で立ち上げるので、
CPUバウンドなPythonコードもGILに邪魔されず並列処理できる。
ただし通信のため関数や返り値がpicklableでなければならない。
それらの低級ライブラリを使いやすくまとめたのが
concurrent.futures
(since 3.2) なので、とりあえずこれを使えばよい。
新しい asyncio
(since 3.4) は勝手が違いすぎてとっつきにくい。
並列化対象の関数の例:
import time
import random
def target_func(x):
time.sleep(random.uniform(0, 1))
return x + 1
concurrent.futures
import os
import concurrent.futures as confu
# 呼び出し順に拾う
with confu.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
futures = [executor.submit(target_func, x) for x in range(8)]
(done, notdone) = confu.wait(futures)
for future in futures:
print(future.result())
# 終わったやつから拾う
with confu.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
futures = [executor.submit(target_func, x) for x in range(8)]
for future in confu.as_completed(futures):
print(future.result())
デフォルトの max_workers=None
では 5 * os.cpu_count()
になるらしい。
multiprocessing
由来の ProcessPoolExecutor
も使い方は同じ。
こちらは os.cpu_count()
がデフォルト。
threading
target
を指定して作った Thread
インスタンスで start()
するのが基本。
import threading
threads = []
for i in range(8):
th = threading.Thread(target=target_func, args=[i])
th.start()
threads.append(th)
for th in threads:
th.join()
返り値を得たい場合などはクラスを継承していじる必要がある。
その場合は必ず run()
メソッドをoverrideする。
class Worker(threading.Thread):
def __init__(self, target, name=None, args=(), kwargs={}):
threading.Thread.__init__(self, None, target, name, args, kwargs)
self._return = None
def run(self):
self._return = self._target(*self._args, **self._kwargs)
def get(self, timeout=None):
self.join(timeout)
return self._return
threads = []
for i in range(8):
th = Worker(target=target_func, args=[i])
th.start()
threads.append(th)
for th in threads:
print(th.get())
スレッド数の上限値を設けたい場合は
threading.Semaphore
でうまくロックしてやる必要がある。
multiprocessing
threading.Thread
とほぼ同じインターフェイスの
Process
クラスも用意されているが、
Pool
を使ったほうが楽チン。
import multiprocessing as mp
with mp.Pool(processes=mp.cpu_count()) as pool:
results = [pool.apply_async(target_func, [x]) for x in range(8)]
for res in results:
print(res.get())
mp.cpu_count()
このためだけに multiprocessing
をimportするのは億劫だったが、
3.4で os.cpu_count()
が追加された。
Hyper-Threading (HT)が有効な場合は論理コア数が返ってくることに注意。
CPUを100%使い続ける数値計算とかだとそんなに並列化しても早くならない。
物理コア数を取得したい場合は
psutil
の psutil.cpu_count(logical=False)
を使う。
標準ライブラリではないが、広く使われてるらしい。