Pythonでマルチプロセス処理を書いていて、サブプロセス内で同期的に変数を更新する(=状態を持つ)必要が出てきましたので試してみました。
公式ドキュメントを見てみると、マルチプロセスで状態を管理する方法は大きく二つあるようです。
共有メモリ(Shared Memory)
特徴
- メインプロセス内の共有メモリで変数を保持する
- 変数の型(入れ物)としては、
Value
,Array
のみが提供されている-
Value
は一つのデータの入れ物、Array
は複数のデータの入れ物 - 中に突っ込める型は、arrayモジュールで利用できる型・cypesの型のみ
- => 型の制約があり
-
-
concurrent.futures.ProcessPoolExecutor
では利用できない
実装サンプル
arrayモジュールで使える型(今回はint)を利用する実装サンプルはこんな感じです。
並列処理で、それぞれ数値を二乗して共有メモリ上の変数に格納するような例です。
from multiprocessing import Value, Array, Process
from typing import List
def parallel_func(number: int, squared_nums: Array, squared_nums_total: Value):
squared_num: int = number * number
squared_nums[number] = squared_num
squared_nums_total.value += squared_num
def main():
nums = [0, 1, 2, 3, 4, 5]
squared_nums: Array = Array('i', len(nums))
squared_nums_total: Value = Value('i', 0)
processes: List[Process] = []
for num in nums:
p: Process = Process(target=parallel_func, args=(num, squared_nums, squared_nums_total))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f'squared_nums: {squared_nums[:]}')
print(f'squared_nums_total: {squared_nums_total.value}')
if __name__ == "__main__":
main()
実行結果はこんな感じになります。
squared_nums: [0, 1, 4, 9, 16, 25]
squared_nums_total: 55
実装しててちょっと注意が必要だったポイントは以下です。
-
multiprocessing.Array
は標準のarrayモジュールとちがい、必要な関数がほとんど提供されてない。初期化時に必要なサイズを確保する必要がある - ValueやArrayは
lock
引数(デフォルトTrue)を持っているが、これをFalseにすると同期アクセスのためのロックオブジェクトが作成されずプロセスセーフじゃなくなるらしい
ちなみに、ctypesを使う実装サンプルはこちらを参照ください。
トラブル対応
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
concurrent.futures.ProcessPoolExecutor
に対して、共有メモリ上の変数を渡すとこのエラーが発生します。
色々調べたのですが、対処法はなさそうなので、実装サンプルのようにmultiprocessing.Process
を利用することになると思います。こちらはworker数を指定できないので不便です。。
サーバプロセス(Server Process)
特徴
- サーバプロセス(別プロセス)を管理するマネージャオブジェクトを作成し、サーバプロセス内で変数(Pythonオブジェクト)を管理する
- 変数の型(入れ物)としては、
list
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
,Array
- どのような型のオブジェクトでも扱えるらしい
- 独自クラスとかもOK
- C言語の拡張モジュールやマネージャーオブジェクト自身を含めことはできない(トラブル対応参照)
- とはいえ、共有メモリより柔軟
- どのような型のオブジェクトでも扱えるらしい
- サーバプロセス上の変数にアクセスする際は、プロキシ経由でアクセスする => ネットワーク経由なので共有メモリより遅い
-
concurrent.futures.ProcessPoolExecutor
でも使える
実装サンプル①
list
に独自クラスのオブジェクトを突っ込んで、ProcessPoolExecutor
でマルチプロセスを実行する実装サンプルはこんな感じです。List(ListProxy)は当然可変長だし、独自クラスのオブジェクトも問題なく追加することができました。
from multiprocessing import Manager, Value
from concurrent import futures
from concurrent.futures import Future, ProcessPoolExecutor
from typing import List
import dataclasses
MAX_WORKERS = 3
@dataclasses.dataclass
class SquaredNum:
num: int
squared_num: int
def parallel_func(num: int, squared_nums: List[SquaredNum], squared_nums_total: Value):
squared_num: int = num * num
squared_nums.append(SquaredNum(num, squared_num))
squared_nums_total.value += squared_num
def main():
nums: List[int] = [0, 1, 2, 3, 4, 5]
with Manager() as manager:
squared_nums: List[SquaredNum] = manager.list()
squared_nums_total: Value = manager.Value('i', 0)
future_list: List[Future] = []
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for num in nums:
future: Future = executor.submit(
parallel_func,
num=num,
squared_nums=squared_nums,
squared_nums_total=squared_nums_total,
)
future_list.append(future)
for future in futures.as_completed(fs=future_list):
future.result()
print(f'squared_nums: {squared_nums}')
print(f'squared_nums_total: {squared_nums_total.value}')
if __name__ == "__main__":
main()
実行結果はこんな感じになります。
squared_nums: [SquaredNum(num=0, squared_num=0), SquaredNum(num=1, squared_num=1), SquaredNum(num=3, squared_num=9), SquaredNum(num=2, squared_num=4), SquaredNum(num=4, squared_num=16), SquaredNum(num=5, squared_num=25)]
squared_nums_total: 55
実装サンプル②
次に、独自クラスのインスタンス変数としてサーバプロセスで管理する変数(Pythonオブジェクト)を管理するケースを実装してみます。
実際のプロジェクトでは、PySparkのクラスターを再利用するためのクラスタープールを作ったのですがそれを超シンプルにした感じです。それでもだいぶ長くなってしまいました...
興味がある人は読み込んでみてください。
from multiprocessing import Manager
from multiprocessing.managers import SyncManager
from concurrent import futures
from concurrent.futures import Future, ProcessPoolExecutor
from typing import List
import time
import os
import random
import string
MAX_WORKERS = 3
class Cluster:
def __init__(self):
self.name = ''.join(random.choices(string.ascii_lowercase, k=5))
def submit_pyspark_job(self, num: int):
squared_num: int = num * num
time.sleep(1) # 本当はここでPySparkのジョブを実行する
print(f'job finished. num: {num}, squared_num: {squared_num}, pid: {os.getpid()}, cluster_name: {self.name}')
return num * num # サンプルなので、numの二乗を返しておく
class ClusterPool:
def __init__(self, manager: SyncManager):
self.clusters = manager.list() # インスタンス変数にサーバプロセスで管理するListを保持
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
self.clusters[:] = [] # withブロックを抜ける時にサーバプロセスのListを空に
def pop_or_create_cluster(self) -> Cluster:
try:
return self.clusters.pop() # popしてサーバプロセスのListから削除
except IndexError:
return Cluster()
def append_cluster(self, cluster: Cluster):
self.clusters.append(cluster) # 最後にサーバプロセスのListに追加
def parallel_func(num: int, cluster_pool: ClusterPool) -> int:
cluster: Cluster = cluster_pool.pop_or_create_cluster()
result: int = cluster.submit_pyspark_job(num)
cluster_pool.append_cluster(cluster)
return result
def main():
nums: List[int] = [0, 1, 2, 3, 4, 5, 6, 7, 8]
with Manager() as manager:
with ClusterPool(manager) as cluster_pool:
future_list: List[Future] = []
squared_nums: List[int] = []
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for num in nums:
future: Future = executor.submit(
parallel_func,
num=num,
cluster_pool=cluster_pool,
)
future_list.append(future)
for future in futures.as_completed(fs=future_list):
squared_nums.append(future.result())
print(f'squared_nums: {squared_nums}')
if __name__ == "__main__":
main()
ポイントは以下の通りです。
- ClusterPoolクラスのインスタンス変数として、Clusterリスト(サーバプロセスで管理する変数)を保持する
- Clusterを利用する場合は、Clusterリストから
pop()
して要素を取得し、リストからは削除する - Clusterの利用が完了したら、Clusterリストに要素を
append()
する
実行ログは以下のようになりました。
14581
, 14582
, 14583
の三つのプロセスが立ち上がっていて、それらの中で、kvyjv
, uphtx
, asixt
の3つのclusterを使いまわしているのがわかると思います。
job finished. num: 0, squared_num: 0, pid: 14581, cluster_name: kvyjv
job finished. num: 1, squared_num: 1, pid: 14582, cluster_name: uphtx
job finished. num: 2, squared_num: 4, pid: 14583, cluster_name: asixt
job finished. num: 3, squared_num: 9, pid: 14581, cluster_name: kvyjv
job finished. num: 4, squared_num: 16, pid: 14582, cluster_name: asixt
job finished. num: 5, squared_num: 25, pid: 14583, cluster_name: uphtx
job finished. num: 6, squared_num: 36, pid: 14581, cluster_name: kvyjv
job finished. num: 7, squared_num: 49, pid: 14583, cluster_name: uphtx
job finished. num: 8, squared_num: 64, pid: 14582, cluster_name: asixt
squared_nums: [0, 1, 4, 9, 16, 25, 36, 64, 49]
ProcessPoolExecutor
を使って別プロセスで実行する関数parallel_func
の引数として、ClusterPool
のインスタンスを渡しているので、ClusterPool
内のインスタンス変数clusters
(サーバプロセスで管理するPythonオブジェクトへの参照)も別プロセスで実行するタイミングでコピーされて別物になってしまうかもしれないなぁと懸念していたのですが、実際やってみたところ、ちゃんと同一のサーバプロセス上の変数にアクセスしてくれることが分かりました。
トラブル対応
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
with Manager() as manger
のwithブロックの外側、つまりManagerオブジェクトを閉じた後に、サーバプロセス上の変数にアクセスすると上記のエラーが発生します。
Managerオブジェクトが存在する間に、サーバプロセス上の変数を取得しましょう。
サーバプロセス上のPythonオブジェクトを取得して、そのオブジェクト内のインスタンス変数を変更しても、サーバプロセス上のPythonオブジェクトには反映されない
@dataclasses.dataclass
class Num:
num: int
def main():
with Manager() as manager:
nums: List[Num] = manager.list()
nums.append(Num(1))
num_before = nums[0]
print(f'num_before: {num_before.num}')
num_before.num = 9
print(f'num_changed: {num_before.num}')
num_after = nums[0]
print(f'num_after: {num_after.num}')
実行ログはこんな感じです。
num_before: 1
num_changed: 9
num_after: 1
サーバプロセスからNum
オブジェクトを取得して、インスタンス変数のnum
を変更しても、サーバプロセスから再度取得したNum
オブジェクトは変更されていないことが分かります。サーバプロセスからPythonオブジェクトを取得するということは、つまり、オブジェクトをコピーするっていうことなんですね。
最初は、実装サンプル②は、Cluster
クラスにis_available
というインスタンス変数を使って、以下のようにwith
ブロックの入口と出口でTrue/Falseを更新するような作りにしていたのですが、インスタンス変数を変更しても無駄なことがわかったのでやめました。
class Cluster:
def __enter__(self):
self.is_available = False
return self
def __exit__(self, exc_type, exc_value, tb):
self.is_available = True
で、対応方法としては、大きく二つあります。
-
pop()
でサーバから削除しつつ取得 => 何か処理 =>append()
- 一旦サーバプロセス側から対象のオブジェクトを削除されるので、他のプロセスがこのオブジェクトにアクセスできない
- Lock => サーバから取得 => 何か処理 => サーバのオブジェクトを置換 => Lock解除
- Lock中は、他のプロセスはサーバプロセスのオブジェクトにアクセスできない
今回の実装サンプル②は、1.の方法になりますので、ここでは2.の方法についても実装だけ貼り付けておきます。詳細の解説はしませんので、興味がある人はソースコード上のコメントを参照ください。
from multiprocessing import Manager
from multiprocessing.managers import SyncManager
from concurrent import futures
from concurrent.futures import Future, ProcessPoolExecutor
from threading import Lock
from typing import List
import time
import os
import random
import string
MAX_WORKERS = 3
class Cluster:
def __init__(self):
self.name: str = ''.join(random.choices(string.ascii_lowercase, k=5))
self.is_available: bool = False
def __eq__(self, other):
return self.name == other.name
def submit_pyspark_job(self, num: int):
squared_num: int = num * num
time.sleep(1) # 本当はここでPySparkのジョブを実行する
print(f'job finished. num: {num}, squared_num: {squared_num}, pid: {os.getpid()}, cluster_name: {self.name}')
return num * num # サンプルなので、numの二乗を返しておく
class ClusterPool:
def __init__(self, manager: SyncManager):
self.clusters: List[Cluster] = manager.list()
self.lock: Lock = manager.Lock() # サーバプロセスで管理するロックオブジェクトを生成
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
self.clusters[:] = [] # withブロックを抜ける時にサーバプロセスのListを空に
def get_or_create_cluster(self) -> Cluster:
self.lock.acquire(blocking=True) # ここでロック取得。timeoutは指定してないので他のプロセスは無限に待つ
try:
# 利用可能な既存クラスタを探す
for i, existing_cluster in enumerate(self.clusters):
if existing_cluster.is_available:
existing_cluster.is_available = False # 利用可否を更新
self.clusters[i] = existing_cluster # サーバプロセスのリスト内のオブジェクトを置き換え
return existing_cluster
# 新規クラスタを生成する
new_cluster: Cluster = Cluster()
self.clusters.append(new_cluster) # サーバプロセスのリストにオブジェクトを追加
return new_cluster
finally:
self.lock.release()
def release_cluster(self, cluster: Cluster):
cluster.is_available = True
self.clusters[self.clusters.index(cluster)] = cluster
def parallel_func(num: int, cluster_pool: ClusterPool) -> int:
cluster: Cluster = cluster_pool.get_or_create_cluster() # クラスタ作成 or クラスタプールから取得
result: int = cluster.submit_pyspark_job(num)
cluster_pool.release_cluster(cluster) # クラスタプールにクラスタを返す
return result
def main():
nums: List[int] = [0, 1, 2, 3, 4, 5, 6, 7, 8]
with Manager() as manager:
with ClusterPool(manager) as cluster_pool:
future_list: List[Future] = []
squared_nums: List[int] = []
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for num in nums:
future: Future = executor.submit(
parallel_func,
num=num,
cluster_pool=cluster_pool,
)
future_list.append(future)
for future in futures.as_completed(fs=future_list):
squared_nums.append(future.result())
print(f'squared_nums: {squared_nums}')
上記の実装も悪くないのですが、Lockの待ち合わせが発生しない分、実装サンプル②が優れていると思ったので、実装サンプル②を採用しました。
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
Managerオブジェクトがマルチプロセスで実行する関数の引数に含まれるとこのエラーが発生します。今回は、ClusterPool
のwith
ブロックを抜ける時にManagerオブジェクトを閉じたかったのですが、そこは諦めました。
class ClusterPool:
def __init__(self):
self.manager = Manager()
self.clusters = self.manager.list()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
self.manager.__exit__(exc_type, exc_value, tb) # ここでManagerを閉じたかった
def main():
with ClusterPool() as cluster_pool:
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
future: Future = executor.submit(
parallel_func,
cluster_pool=cluster_pool,
)
future.result()
最終的には、実装サンプル②のようにClusterPoolの外でManagerオブジェクトを作成して、渡すようにしてあげることで解決しました。
TypeError: cannot pickle 'CompiledFFI' object
サーバプロセスで管理するPythonオブジェクトのインスタンス変数にgoogle.cloud.dataproc_v1.ClusterControllerClient
やgoogle.cloud.storage.Client
などのオブジェクトを含めた場合に発生します。
これは、credential用の暗号を高速計算するためのcryptographyモジュールがC言語の拡張モジュールであり、上記のオブジェクトの中に含まれるようでなのですが、C言語の拡張モジュールはpickleできないという問題があるみたいです。
対処方法としては、Clientオブジェクトを保持するのではなく、それを作成するために必要なクレデンシャルパスやプロジェクト名などをインスタンス変数として保持しておき、Clientオブジェクトが必要になったら毎回作成するようにしました。
if __name__ == "__main__":
の中でマルチプロセス処理を実行しないとエラー
以下のような長文のエラーメッセージが表示されます。
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
以下のような感じで、if __name__ == '__main__':
の中で実行するとエラーは解消されます。
def main():
# マルチプロセス処理
if __name__ == '__main__':
main()
所感
公式ドキュメントに以下のようにあります。
サーバープロセスのマネージャーオブジェクトは共有メモリのオブジェクトよりも柔軟であるといえます。それは、どのような型のオブジェクトでも使えるからです。
なので、サーバプロセスの方はなんの制約もないのかなぁと思っていたのですが、実際使ってみると思いの外いろんなところでエラーが発生して大変でした。。
とはいっても、丸一日格闘すれば大体のことは解決できる感じではありますので、それほど恐れる必要はないかな、と思います。