Pythonで非同期処理を実装する場合、 asyncio モジュールを使うのが一般的だと思いますが、並行処理をさせたり、ネストしたりするケースは多少気をつけることもあったので、メモ的にやり方を残しておきます。

シーケンシャルな非同期処理

まずはシンプルなケースとして、3サイト(qiita, google, yahoo)の合計6ページについて、pyppeteer(chromiumを起動して非同期スクレイピングするライブラリ)を使って、HTMLを取得するケースを書いてみました。

from pyppeteer.browser import Browser
from pyppeteer.page import Page
from pyppeteer.launcher import launch
from typing import List
import datetime
from asyncio import AbstractEventLoop
import asyncio

URLS: List[str] = [
    'https://qiita.com/search?q=1',
    'https://qiita.com/search?q=2',
    'https://www.google.com/search?q=1',
    'https://www.google.com/search?q=2',
    'https://search.yahoo.co.jp/search?p=1',
    'https://search.yahoo.co.jp/search?p=2',
]

def log(message: str):
    print(f'{datetime.datetime.now()} {message}')

async def main():
    log('main started.')
    browser: Browser = await launch()
    try:
        htmls: List[str] = []
        for url in URLS:
            html: str = await scrape_one_page(browser, url)
            htmls.append(html)
    finally:
        await browser.close()
    log(f'htmls count: {len(htmls)}')
    log('main finished.')

async def scrape_one_page(browser: Browser, url: str) -> str:
    """ひとつのページをスクレイピングして、HTMLを返す"""
    log(f'scrape_one_page started. url: {url}')
    log(f'task count: {len(asyncio.all_tasks())}')
    page: Page = await browser.newPage()
    try:
        await page.goto(url)
        html: str = await page.content()
        return html
    finally:
        await page.close()
        log(f'scrape_one_page finished. url: {url}')

if __name__ == "__main__":
    event_loop: AbstractEventLoop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
  • get_event_loop()イベントループを取得して、run_until_complete()main()が完了するまで実行している
  • for文でシーケンシャルに1ページずつ処理を行なっている

実行ログは以下のようになりました。

2020-11-23 11:33:38.684331 main started.
2020-11-23 11:33:39.001588 scrape_one_page started. url: https://qiita.com/search?q=1
2020-11-23 11:33:39.001651 task count: 4
2020-11-23 11:33:41.051855 scrape_one_page finished. url: https://qiita.com/search?q=1
2020-11-23 11:33:41.051888 scrape_one_page started. url: https://qiita.com/search?q=2
2020-11-23 11:33:41.051932 task count: 4
2020-11-23 11:33:42.652274 scrape_one_page finished. url: https://qiita.com/search?q=2
2020-11-23 11:33:42.652317 scrape_one_page started. url: https://www.google.com/search?q=1
2020-11-23 11:33:42.652361 task count: 4
2020-11-23 11:33:43.849362 scrape_one_page finished. url: https://www.google.com/search?q=1
2020-11-23 11:33:43.849446 scrape_one_page started. url: https://www.google.com/search?q=2
2020-11-23 11:33:43.849474 task count: 4
2020-11-23 11:33:44.906207 scrape_one_page finished. url: https://www.google.com/search?q=2
2020-11-23 11:33:44.906264 scrape_one_page started. url: https://search.yahoo.co.jp/search?p=1
2020-11-23 11:33:44.906293 task count: 4
2020-11-23 11:33:46.705226 scrape_one_page finished. url: https://search.yahoo.co.jp/search?p=1
2020-11-23 11:33:46.705264 scrape_one_page started. url: https://search.yahoo.co.jp/search?p=2
2020-11-23 11:33:46.705292 task count: 4
2020-11-23 11:33:48.536774 scrape_one_page finished. url: https://search.yahoo.co.jp/search?p=2
2020-11-23 11:33:48.554774 htmls count: 6
2020-11-23 11:33:48.554819 main finished.

ログからは以下の点を確認できます。

  • シーケンシャルに1ページずつ開始・終了していて、トータルで10秒程度時間がかかっている
  • イベントループ内に存在する未完了タスクが常に4件で変化がない(つまり、同時に実行していない)
    • 4件の内訳はmain()Connection._recv_loop()WebSocketCommonProtocol.transfer_data()WebSocketCommonProtocol.close_connection()

並行で非同期処理

上記の例では、シーケンシャルに非同期処理を実行していたためトータルでけっこう実行時間がかかってしまっていましたが、asyncioでは同一のイベントループ内に複数のタスクがある場合、あるタスクがawaitで待機状態になった際に別のタスクを実行してくれます。つまり、非同期処理を並行実行することができます。
ここでは上の例を、並行処理する形にmain()関数を少し書き換えてみようと思います。

async def main():
    log('main started.')
    browser: Browser = await launch()
    try:
        coroutines: List[Coroutine] = []
        for url in URLS:
            coroutines.append(scrape_one_page(browser, url))
        htmls: List[str] = await asyncio.gather(*coroutines)
    finally:
        await browser.close()
    log(f'htmls count: {len(htmls)}')
    log('main finished.')
  • for文でコルーチンを複数生成して、asyncio.gather()で一つのイベントループにまとめてタスクとして登録して並行実行する
  • 結果(htmls)は、各コルーチンの戻り値を元のリストの順番に集めたものになっている

実行ログは以下のようになりました。

2020-11-23 14:53:33.015661 main started.
2020-11-23 14:53:33.361009 scrape_one_page started. url: https://qiita.com/search?q=1
2020-11-23 14:53:33.361079 task count: 10
2020-11-23 14:53:33.361134 scrape_one_page started. url: https://qiita.com/search?q=2
2020-11-23 14:53:33.361158 task count: 11
2020-11-23 14:53:33.361194 scrape_one_page started. url: https://www.google.com/search?q=1
2020-11-23 14:53:33.361216 task count: 12
2020-11-23 14:53:33.361249 scrape_one_page started. url: https://www.google.com/search?q=2
2020-11-23 14:53:33.361269 task count: 13
2020-11-23 14:53:33.361301 scrape_one_page started. url: https://search.yahoo.co.jp/search?p=1
2020-11-23 14:53:33.361333 task count: 14
2020-11-23 14:53:33.361366 scrape_one_page started. url: https://search.yahoo.co.jp/search?p=2
2020-11-23 14:53:33.361394 task count: 15
2020-11-23 14:53:34.861235 scrape_one_page finished. url: https://www.google.com/search?q=2
2020-11-23 14:53:35.778646 scrape_one_page finished. url: https://www.google.com/search?q=1
2020-11-23 14:53:35.870635 scrape_one_page finished. url: https://search.yahoo.co.jp/search?p=2
2020-11-23 14:53:36.026315 scrape_one_page finished. url: https://search.yahoo.co.jp/search?p=1
2020-11-23 14:53:36.875245 scrape_one_page finished. url: https://qiita.com/search?q=1
2020-11-23 14:53:36.880065 scrape_one_page finished. url: https://qiita.com/search?q=2
2020-11-23 14:53:36.903932 htmls count: 6
2020-11-23 14:53:36.903982 main finished.

ログからは以下の点を確認できると思います。

  • 元のコルーチンのリストの順でほぼ同時に開始しているが、終了はレスポンスが帰ってきた順に実行されている(ちゃんと並行実行されている)
  • 結果、トータルの実行時間が3秒程度まで縮まっている(もともと10秒程度だった)
  • 同一イベントループ内の未完了のタスクが、固定では増えていっている
    • 一つのscrape_one_pageの実行で、scrape_one_pate()Connection._async_send()の二つのタスクが登録されていた
    • 各タスクは依存先のタスクの情報も持っており、ちゃんと依存関係も意識したスケジューリングがされているようでした

イベントループをネスト

Pythonでは、イベントループの中でそのイベントループをもう一回実行したり、別のイベントループを実行したりすると以下のエラーが発生します。

RuntimeError: This event loop is already running

RuntimeError: Cannot run the event loop while another loop is running

イベントループをネストする必然性があるケースは稀だと思うのですが、非同期関数 => 同期関数 => 非同期関数のようなことをしたくなった時にこのケースに遭遇しました。回避策は比較的簡単で、nest-asyncioを導入してあげるだけです(実装上は以下の2行)。

import nest_asyncio
nest_asyncio.apply()

全く必然性がないのですが、イベントループをネストするサンプルを一応載せておきます。
非同期関数(main) => 同期関数(scrape_one_page) => 非同期関数(scrape_by_pyppeteer)のようになっていて、最初と最後の非同期関数を別のイベントループで実行しています。

from pyppeteer.browser import Browser
from pyppeteer.page import Page
from pyppeteer.launcher import launch
from typing import List
import datetime
import requests
from asyncio import AbstractEventLoop
import asyncio
import nest_asyncio
nest_asyncio.apply()

URLS: List[str] = [
    'https://qiita.com/search?q=1',
    'https://qiita.com/search?q=2',
    'https://www.google.com/search?q=1',
    'https://www.google.com/search?q=2',
    'https://search.yahoo.co.jp/search?p=1',
    'https://search.yahoo.co.jp/search?p=2',
]

def log(message: str):
    print(f'{datetime.datetime.now()} {message}')

async def main():
    log('main started.')
    htmls: List[str] = []
    for url in URLS:
        htmls.append(scrape_one_page(url))
    log(f'htmls count: {len(htmls)}')
    log('main finished.')

def scrape_one_page(url: str) -> str:
    """ひとつのページをpyppeteerもしくはrequestsでスクレイピングして、HTMLを返す"""
    if 'qiita' in url:
        # qiitaの場合は、pyppeteerを使って非同期処理でHTMLを取得
        event_loop: AbstractEventLoop = asyncio.new_event_loop()
        return event_loop.run_until_complete(scrape_by_pyppeteer(url))
    else:
        # それ以外は、requestsを使って同期処理でHTMLを取得
        return requests.get(url).text

async def scrape_by_pyppeteer(url: str) -> str:
    """pyppeteerでスクレイピングしてHTMLを返す"""
    try:
        browser: Browser = await launch()
        page: Page = await browser.newPage()
        await page.goto(url)
        html: str = await page.content()
        return html
    finally:
        await page.close()
        await browser.close()

if __name__ == "__main__":
    event_loop: AbstractEventLoop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

投げっぱなし(fire and forget)

例えば、通知したり、ファイルを出力したり、ポイント加算したり、非同期で投げっぱなしで処理を実行したいケースがあると思います。英語だと投げっぱなし=fire and forgetというみたいです。

AWSのSQSやGCPのGoogle Task Queueを使うという手もあると思いますが、そんな大げさな仕組みにしなくても、asyncioを使って簡単に実現できます。

run_in_executorを使って、指定した関数を実行するだけです。

import datetime
import asyncio
import time

def main():
    log('main started.')
    asyncio.get_event_loop().run_in_executor(None, wait, 10)
    log('main finished.')

def wait(second: int):
    """投げっぱなしにしたい処理"""
    log('wait started.')
    time.sleep(second)
    log('wait finished.')

def log(message: str):
    print(f'{datetime.datetime.now()} {message}')

if __name__ == "__main__":
    main()
  • 第一引数にNoneを指定しているので、default thread poolで実行している
  • 投げっぱなしにしたい処理はasyncをつけてコルーチンにする必要はない

実行ログは以下のようになります。

2020-11-28 20:02:19.608202 main started.
2020-11-28 20:02:19.611073 wait started.
2020-11-28 20:02:19.611128 main finished.
2020-11-28 20:02:29.611322 wait finished.

main()関数は、投げっぱなしの処理が終わるのを待たずに完了していることが確認できます。

また、デコレータを使って関数をラップすることで、その関数を投げっぱなしにすることもできます。
こちらの方が実装がスッキリするので、個人的にはこちらを利用してます。

import datetime
import asyncio
import time

def fire_and_forget(f):
    """対象関数を非同期で投げっぱなしにするためのデコレータ"""
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)
    return wrapped

def main():
    log('main started.')
    wait(10)  # 通常通り呼び出せば、デコレータが投げっぱなしにしてくれる
    log('main finished.')

@fire_and_forget
def wait(second: int):
    """投げっぱなしにしたい処理"""
    log('wait started.')
    time.sleep(second)
    log('wait finished.')

def log(message: str):
    print(f'{datetime.datetime.now()} {message}')

if __name__ == "__main__":
    main()