Pythonでマルチプロセス処理を書いていて、サブプロセス内で同期的に変数を更新する(=状態を持つ)必要が出てきましたので試してみました。
公式ドキュメントを見てみると、マルチプロセスで状態を管理する方法は大きく二つあるようです。

共有メモリ(Shared Memory)

特徴

  • メインプロセス内の共有メモリで変数を保持する
  • 変数の型(入れ物)としては、Value, Arrayのみが提供されている
  • concurrent.futures.ProcessPoolExecutorでは利用できない

実装サンプル

arrayモジュールで使える型(今回はint)を利用する実装サンプルはこんな感じです。
並列処理で、それぞれ数値を二乗して共有メモリ上の変数に格納するような例です。

from multiprocessing import Value, Array, Process
from typing import List

def parallel_func(number: int, squared_nums: Array, squared_nums_total: Value):
    squared_num: int = number * number
    squared_nums[number] = squared_num
    squared_nums_total.value += squared_num

def main():
    nums = [0, 1, 2, 3, 4, 5]
    squared_nums: Array = Array('i', len(nums))
    squared_nums_total: Value = Value('i', 0)

    processes: List[Process] = []
    for num in nums:
        p: Process = Process(target=parallel_func, args=(num, squared_nums, squared_nums_total))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f'squared_nums: {squared_nums[:]}')
    print(f'squared_nums_total: {squared_nums_total.value}')

if __name__ == "__main__":
    main()

実行結果はこんな感じになります。

squared_nums: [0, 1, 4, 9, 16, 25]
squared_nums_total: 55

実装しててちょっと注意が必要だったポイントは以下です。

  • multiprocessing.Arrayは標準のarrayモジュールとちがい、必要な関数がほとんど提供されてない。初期化時に必要なサイズを確保する必要がある
  • ValueやArrayはlock引数(デフォルトTrue)を持っているが、これをFalseにすると同期アクセスのためのロックオブジェクトが作成されずプロセスセーフじゃなくなるらしい

ちなみに、ctypesを使う実装サンプルはこちらを参照ください。

トラブル対応

RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

concurrent.futures.ProcessPoolExecutorに対して、共有メモリ上の変数を渡すとこのエラーが発生します。

色々調べたのですが、対処法はなさそうなので、実装サンプルのようにmultiprocessing.Processを利用することになると思います。こちらはworker数を指定できないので不便です。。

サーバプロセス(Server Process)

特徴

  • サーバプロセス(別プロセス)を管理するマネージャオブジェクトを作成し、サーバプロセス内で変数(Pythonオブジェクト)を管理する
  • 変数の型(入れ物)としては、list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array
    • どのような型のオブジェクトでも扱えるらしい
      • 独自クラスとかもOK
      • C言語の拡張モジュールやマネージャーオブジェクト自身を含めことはできない(トラブル対応参照)
      • とはいえ、共有メモリより柔軟
  • サーバプロセス上の変数にアクセスする際は、プロキシ経由でアクセスする => ネットワーク経由なので共有メモリより遅い
  • concurrent.futures.ProcessPoolExecutorでも使える

実装サンプル①

listに独自クラスのオブジェクトを突っ込んで、ProcessPoolExecutorでマルチプロセスを実行する実装サンプルはこんな感じです。List(ListProxy)は当然可変長だし、独自クラスのオブジェクトも問題なく追加することができました。

from multiprocessing import Manager, Value
from concurrent import futures
from concurrent.futures import Future, ProcessPoolExecutor
from typing import List
import dataclasses

MAX_WORKERS = 3

@dataclasses.dataclass
class SquaredNum:
    num: int
    squared_num: int

def parallel_func(num: int, squared_nums: List[SquaredNum], squared_nums_total: Value):
    squared_num: int = num * num
    squared_nums.append(SquaredNum(num, squared_num))
    squared_nums_total.value += squared_num

def main():
    nums: List[int] = [0, 1, 2, 3, 4, 5]

    with Manager() as manager:
        squared_nums: List[SquaredNum] = manager.list()
        squared_nums_total: Value = manager.Value('i', 0)
        future_list: List[Future] = []
        with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
            for num in nums:
                future: Future = executor.submit(
                    parallel_func,
                    num=num,
                    squared_nums=squared_nums,
                    squared_nums_total=squared_nums_total,
                )
                future_list.append(future)
            for future in futures.as_completed(fs=future_list):
                future.result()
        print(f'squared_nums: {squared_nums}')
        print(f'squared_nums_total: {squared_nums_total.value}')

if __name__ == "__main__":
    main()

実行結果はこんな感じになります。

squared_nums: [SquaredNum(num=0, squared_num=0), SquaredNum(num=1, squared_num=1), SquaredNum(num=3, squared_num=9), SquaredNum(num=2, squared_num=4), SquaredNum(num=4, squared_num=16), SquaredNum(num=5, squared_num=25)]
squared_nums_total: 55

実装サンプル②

次に、独自クラスのインスタンス変数としてサーバプロセスで管理する変数(Pythonオブジェクト)を管理するケースを実装してみます。
実際のプロジェクトでは、PySparkのクラスターを再利用するためのクラスタープールを作ったのですがそれを超シンプルにした感じです。それでもだいぶ長くなってしまいました...
興味がある人は読み込んでみてください。

from multiprocessing import Manager
from multiprocessing.managers import SyncManager
from concurrent import futures
from concurrent.futures import Future, ProcessPoolExecutor
from typing import List
import time
import os
import random
import string

MAX_WORKERS = 3

class Cluster:
    def __init__(self):
        self.name = ''.join(random.choices(string.ascii_lowercase, k=5))

    def submit_pyspark_job(self, num: int):
        squared_num: int = num * num
        time.sleep(1)  # 本当はここでPySparkのジョブを実行する
        print(f'job finished. num: {num}, squared_num: {squared_num}, pid: {os.getpid()}, cluster_name: {self.name}')
        return num * num  # サンプルなので、numの二乗を返しておく

class ClusterPool:
    def __init__(self, manager: SyncManager):
        self.clusters = manager.list()  # インスタンス変数にサーバプロセスで管理するListを保持

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.clusters[:] = []  # withブロックを抜ける時にサーバプロセスのListを空に

    def pop_or_create_cluster(self) -> Cluster:
        try:
            return self.clusters.pop()  # popしてサーバプロセスのListから削除
        except IndexError:
            return Cluster()

    def append_cluster(self, cluster: Cluster):
        self.clusters.append(cluster)  # 最後にサーバプロセスのListに追加

def parallel_func(num: int, cluster_pool: ClusterPool) -> int:
    cluster: Cluster = cluster_pool.pop_or_create_cluster()
    result: int = cluster.submit_pyspark_job(num)
    cluster_pool.append_cluster(cluster)
    return result

def main():
    nums: List[int] = [0, 1, 2, 3, 4, 5, 6, 7, 8]

    with Manager() as manager:
        with ClusterPool(manager) as cluster_pool:
            future_list: List[Future] = []
            squared_nums: List[int] = []
            with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
                for num in nums:
                    future: Future = executor.submit(
                        parallel_func,
                        num=num,
                        cluster_pool=cluster_pool,
                    )
                    future_list.append(future)
                for future in futures.as_completed(fs=future_list):
                    squared_nums.append(future.result())

        print(f'squared_nums: {squared_nums}')

if __name__ == "__main__":
    main()

ポイントは以下の通りです。

  • ClusterPoolクラスのインスタンス変数として、Clusterリスト(サーバプロセスで管理する変数)を保持する
  • Clusterを利用する場合は、Clusterリストからpop()して要素を取得し、リストからは削除する
  • Clusterの利用が完了したら、Clusterリストに要素をappend()する

実行ログは以下のようになりました。
14581, 14582, 14583の三つのプロセスが立ち上がっていて、それらの中で、kvyjv, uphtx, asixtの3つのclusterを使いまわしているのがわかると思います。

job finished. num: 0, squared_num: 0, pid: 14581, cluster_name: kvyjv
job finished. num: 1, squared_num: 1, pid: 14582, cluster_name: uphtx
job finished. num: 2, squared_num: 4, pid: 14583, cluster_name: asixt
job finished. num: 3, squared_num: 9, pid: 14581, cluster_name: kvyjv
job finished. num: 4, squared_num: 16, pid: 14582, cluster_name: asixt
job finished. num: 5, squared_num: 25, pid: 14583, cluster_name: uphtx
job finished. num: 6, squared_num: 36, pid: 14581, cluster_name: kvyjv
job finished. num: 7, squared_num: 49, pid: 14583, cluster_name: uphtx
job finished. num: 8, squared_num: 64, pid: 14582, cluster_name: asixt
squared_nums: [0, 1, 4, 9, 16, 25, 36, 64, 49]

ProcessPoolExecutorを使って別プロセスで実行する関数parallel_funcの引数として、ClusterPoolのインスタンスを渡しているので、ClusterPool内のインスタンス変数clusters(サーバプロセスで管理するPythonオブジェクトへの参照)も別プロセスで実行するタイミングでコピーされて別物になってしまうかもしれないなぁと懸念していたのですが、実際やってみたところ、ちゃんと同一のサーバプロセス上の変数にアクセスしてくれることが分かりました。

トラブル対応

AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

with Manager() as mangerのwithブロックの外側、つまりManagerオブジェクトを閉じた後に、サーバプロセス上の変数にアクセスすると上記のエラーが発生します。

Managerオブジェクトが存在する間に、サーバプロセス上の変数を取得しましょう。

サーバプロセス上のPythonオブジェクトを取得して、そのオブジェクト内のインスタンス変数を変更しても、サーバプロセス上のPythonオブジェクトには反映されない

@dataclasses.dataclass
class Num:
    num: int

def main():
    with Manager() as manager:
        nums: List[Num] = manager.list()
        nums.append(Num(1))
        num_before = nums[0]
        print(f'num_before: {num_before.num}')
        num_before.num = 9
        print(f'num_changed: {num_before.num}')
        num_after = nums[0]
        print(f'num_after: {num_after.num}')

実行ログはこんな感じです。

num_before: 1
num_changed: 9
num_after: 1

サーバプロセスからNumオブジェクトを取得して、インスタンス変数のnumを変更しても、サーバプロセスから再度取得したNumオブジェクトは変更されていないことが分かります。サーバプロセスからPythonオブジェクトを取得するということは、つまり、オブジェクトをコピーするっていうことなんですね。

最初は、実装サンプル②は、Clusterクラスにis_availableというインスタンス変数を使って、以下のようにwithブロックの入口と出口でTrue/Falseを更新するような作りにしていたのですが、インスタンス変数を変更しても無駄なことがわかったのでやめました。

class Cluster:
    def __enter__(self):
        self.is_available = False
        return self
    def __exit__(self, exc_type, exc_value, tb):
        self.is_available = True

で、対応方法としては、大きく二つあります。

  1. pop()でサーバから削除しつつ取得 => 何か処理 => append()
    • 一旦サーバプロセス側から対象のオブジェクトを削除されるので、他のプロセスがこのオブジェクトにアクセスできない
  2. Lock => サーバから取得 => 何か処理 => サーバのオブジェクトを置換 => Lock解除
    • Lock中は、他のプロセスはサーバプロセスのオブジェクトにアクセスできない

今回の実装サンプル②は、1.の方法になりますので、ここでは2.の方法についても実装だけ貼り付けておきます。詳細の解説はしませんので、興味がある人はソースコード上のコメントを参照ください。

from multiprocessing import Manager
from multiprocessing.managers import SyncManager
from concurrent import futures
from concurrent.futures import Future, ProcessPoolExecutor
from threading import Lock
from typing import List
import time
import os
import random
import string

MAX_WORKERS = 3

class Cluster:
    def __init__(self):
        self.name: str = ''.join(random.choices(string.ascii_lowercase, k=5))
        self.is_available: bool = False

    def __eq__(self, other):
        return self.name == other.name

    def submit_pyspark_job(self, num: int):
        squared_num: int = num * num
        time.sleep(1)  # 本当はここでPySparkのジョブを実行する
        print(f'job finished. num: {num}, squared_num: {squared_num}, pid: {os.getpid()}, cluster_name: {self.name}')
        return num * num  # サンプルなので、numの二乗を返しておく

class ClusterPool:
    def __init__(self, manager: SyncManager):
        self.clusters: List[Cluster] = manager.list()
        self.lock: Lock = manager.Lock()  # サーバプロセスで管理するロックオブジェクトを生成

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.clusters[:] = []  # withブロックを抜ける時にサーバプロセスのListを空に

    def get_or_create_cluster(self) -> Cluster:
        self.lock.acquire(blocking=True) # ここでロック取得。timeoutは指定してないので他のプロセスは無限に待つ
        try:
            # 利用可能な既存クラスタを探す
            for i, existing_cluster in enumerate(self.clusters):
                if existing_cluster.is_available:
                    existing_cluster.is_available = False # 利用可否を更新
                    self.clusters[i] = existing_cluster # サーバプロセスのリスト内のオブジェクトを置き換え
                    return existing_cluster

            # 新規クラスタを生成する
            new_cluster: Cluster = Cluster()
            self.clusters.append(new_cluster) # サーバプロセスのリストにオブジェクトを追加
            return new_cluster
        finally:
            self.lock.release()

    def release_cluster(self, cluster: Cluster):
        cluster.is_available = True
        self.clusters[self.clusters.index(cluster)] = cluster

def parallel_func(num: int, cluster_pool: ClusterPool) -> int:
    cluster: Cluster = cluster_pool.get_or_create_cluster() # クラスタ作成 or クラスタプールから取得
    result: int = cluster.submit_pyspark_job(num) 
    cluster_pool.release_cluster(cluster) # クラスタプールにクラスタを返す
    return result

def main():
    nums: List[int] = [0, 1, 2, 3, 4, 5, 6, 7, 8]

    with Manager() as manager:
        with ClusterPool(manager) as cluster_pool:
            future_list: List[Future] = []
            squared_nums: List[int] = []
            with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
                for num in nums:
                    future: Future = executor.submit(
                        parallel_func,
                        num=num,
                        cluster_pool=cluster_pool,
                    )
                    future_list.append(future)
                for future in futures.as_completed(fs=future_list):
                    squared_nums.append(future.result())

        print(f'squared_nums: {squared_nums}')

上記の実装も悪くないのですが、Lockの待ち合わせが発生しない分、実装サンプル②が優れていると思ったので、実装サンプル②を採用しました。

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

Managerオブジェクトがマルチプロセスで実行する関数の引数に含まれるとこのエラーが発生します。今回は、ClusterPoolwithブロックを抜ける時にManagerオブジェクトを閉じたかったのですが、そこは諦めました。

class ClusterPool:
    def __init__(self):
        self.manager = Manager()
        self.clusters = self.manager.list()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.manager.__exit__(exc_type, exc_value, tb)  # ここでManagerを閉じたかった

def main():
    with ClusterPool() as cluster_pool:
        with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future: Future = executor.submit(
                parallel_func,
                cluster_pool=cluster_pool,
            )
            future.result()

最終的には、実装サンプル②のようにClusterPoolの外でManagerオブジェクトを作成して、渡すようにしてあげることで解決しました。

TypeError: cannot pickle 'CompiledFFI' object

サーバプロセスで管理するPythonオブジェクトのインスタンス変数にgoogle.cloud.dataproc_v1.ClusterControllerClientgoogle.cloud.storage.Clientなどのオブジェクトを含めた場合に発生します。
これは、credential用の暗号を高速計算するためのcryptographyモジュールがC言語の拡張モジュールであり、上記のオブジェクトの中に含まれるようでなのですが、C言語の拡張モジュールはpickleできないという問題があるみたいです。

対処方法としては、Clientオブジェクトを保持するのではなく、それを作成するために必要なクレデンシャルパスやプロジェクト名などをインスタンス変数として保持しておき、Clientオブジェクトが必要になったら毎回作成するようにしました。

if __name__ == "__main__":の中でマルチプロセス処理を実行しないとエラー

以下のような長文のエラーメッセージが表示されます。

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

以下のような感じで、if __name__ == '__main__':の中で実行するとエラーは解消されます。


def main():
    # マルチプロセス処理

if __name__ == '__main__':
    main()

所感

公式ドキュメントに以下のようにあります。

サーバープロセスのマネージャーオブジェクトは共有メモリのオブジェクトよりも柔軟であるといえます。それは、どのような型のオブジェクトでも使えるからです。

なので、サーバプロセスの方はなんの制約もないのかなぁと思っていたのですが、実際使ってみると思いの外いろんなところでエラーが発生して大変でした。。

とはいっても、丸一日格闘すれば大体のことは解決できる感じではありますので、それほど恐れる必要はないかな、と思います。