BigQueryのpython client(google-cloud-bigquery)の使い方をメモっていきます。

インストール

  • サービスアカウントキーファイルを環境変数に設定
    • こちらを参考に、サービスアカウント+キーファイルを作成し、そのキーファイルのパスを環境変数に設定します。
      export GOOGLE_APPLICATION_CREDENTIALS="/hoge/fuga/key_file.json"
  • ライブラリインストール
    pip install google-cloud-bigquery
  • クライアント生成
    from google.cloud.bigquery import Client
    client: Client = Client() # 環境変数 GOOGLE_APPLICATION_CREDENTIALSが設定されている前提。引数で渡すこともできる

データの取得

クエリを使う

from google.cloud.bigquery.table import RowIterator, Row
from google.cloud.bigquery.job import QueryJob
import itertools

# クエリ定義
query: str = "SELECT item1, item2 FROM HOGE;"
# JOB実行
query_job: QueryJob = client.query(query)
rows: RowIterator = query_job.result() # APIリクエスト発生+結果待ち合わせ
# 全件利用
print(f"total_rows: {rows.num_results}")
for row in rows:
    print(f"item1: {row.item1}, item2: {row.item2}")
# 1件だけ実行(必ずしもこうする必要は全くないけど)
row: Optional[table.Row] = next(itertools.islice(rows, 1), None)
if row is not None:
    print(f"item1: {item1}, item2: {item2}")

テーブル作成

クエリを使う

# クエリ定義(SELECTで既存テーブルを元にテーブル作成)
query: str = """
CREATE OR REPLACE TABLE dataset_name.hoge
OPTIONS (expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)) -- 3時間で削除(テンポラリテーブル扱い)
AS
SELECT
    f.*
    , CAST(NULL AS INT64) AS item3 -- 空のカラムを追加したい場合はCASTが必要
FROM
    dataset_name.fuga f
-- データなしのテーブルを作りたければ、以下を追加
-- LIMIT 0
"""
# クエリ定義(スキーマ定義をしてテーブル作成)
query: str = """
CREATE OR REPLACE TABLE dataset_name.hoge (
    item1 STRING NOT NULL
    , item2 STRING NOT NULL
    , item3 INT64
)
OPTIONS (expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)) -- 3時間で削除(テンポラリテーブル扱い)
"""
# クエリ実行
query_job: QueryJob = client.query(query)
query_job.result() # APIリクエスト

データの一括アップロード

client.insert_rowsを使う

これは、ストリーミングインサートを行う関数なのですが、こちらにあるように、データを利用可能になるまで数秒程度の遅延が発生します(実際にはもっと遅延が発生することもあります)。そのため、インサート結果を次の処理ですぐに使うようなケースでは利用できません。なお、遅延が発生している場合でもエラーにはなりません。

from more_itertools import chunked
from typing import Tuple, List
from google.cloud.bigquery.table import Table

table: Table = client.get_table("dataset_name.hoge") # APIでテーブル定義を取得
rows: List[Tuple[str, str, str]] = [("aa1", "bb1", "cc1"), ("aa2", "bb2", "cc2")]

# client.insert_rowsで一度に処理できる最大件数が1万件なので、1万件ずつ処理
for chuncked_rows in list(chunked(rows, 10000)):
     errors = client.insert_rows(table, chuncked_rows) # APIリクエスト
     print(f"errors: {errors}")

client.load_table_from_dataframeを使う

こちらは遅延は発生せず、同期的にデータロードされるため、安心です。
しかし、NULL(None)をinsertしようとすると、pyarrow.lib.ArrowTypeError: Expected a string or bytes object, got a 'int' objectが発生するのですが、これの回避方法が分からず困っています(LoadJobConfignull_markerはCSVでしか使えないし、こちらを見ると現状ではDataFrame側の設定でもダメそうです)。なので、どうしてもNULLをinsertしたい場合は、client.load_table_from_csvを使うほうがいいかもしれません。
なお、import不要ですが、事前にpip install pyarrowする必要があります。

import pandas as pd
from pandas.core.frame import DataFrame
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.job import LoadJob
from google.cloud.bigquery import LoadJobConfig
from typing import List

# テーブル定義取得
table: Table = client.get_table("dataset_name.hoge") # APIリクエスト

# DataFrame生成
columns: List[str] = list(map(lambda field: field.name, table.schema))
df: DataFrame = pd.DataFrame(columns=columns)
for row in [["aa1", "bb1", "cc1"], ["aa2", "bb2", "cc2"]]:
    df = df.append(pd.Series(row, index=df.columns), ignore_index=True)

# BigQueryにロード
job: LoadJob = client.load_table_from_dataframe(df, table, job_config=LoadJobConfig(schema=table.schema))
job.result() # APIリクエスト

client. load_table_from_jsonを使う(STRUCTを含むデータをロードする場合はこれ一択)

上記のclient.load_table_from_dataframeを使ってSTRUCTや配列を含むデータをロードしようとしたところ、以下のエラーが発生してできませんでした。

ValueError: Uploading dataframes with struct (record) column types is not supported. See: https://github.com/googleapis/google-cloud-python/issues/8191

こちらを見ると、

google-cloud-bigquery
DataFrame を Parquet 形式に変換して API に送信します。ネストした値や配列値をサポートします。DataFrame をテーブルに読み込むには、pyarrow(DataFrame のデータを BigQuery API へ送信するのに使う parquet エンジン)をインストールする必要があります。

と書いてあるのにひどいです。なのでそのようなケースでは、load_table_from_jsonを使います。

from typing import List, Dict, Any
from google.cloud.bigquery import Client, LoadJobConfig
from google.cloud.bigquery.job import QueryJob, LoadJob
from google.cloud.bigquery.table import Table

# テーブル作成
query: str = """
    CREATE OR REPLACE TABLE dataset_name.hoge (
        id STRING NOT NULL
        , vector ARRAY<STRUCT<
            id INT64 NOT NULL,
            value FLOAT64 NOT NULL
        >>
        , keywords ARRAY<STRING>
    )
"""
client: Client = Client()
job: QueryJob = client.query(query)
job.result()

# 対象テーブルを取得
table: Table = client.get_table("dataset_name.hoge") # APIでテーブル定義を取得

# 投入データを設定
json_rows: List[Dict[str, Any]] = [
    {
        'id': 1,
        'vector': [{'id': 1, 'value': 0.13458}, {'id': 2, 'value': 0.981345}, {'id': 3, 'value': 0.013473}],
        'keywords': ['this', 'pen']
    },
    {
        'id': 2,
        'vector': [{'id': 1, 'value': 0.234956}, {'id': 2, 'value': 0.62164}, {'id': 3, 'value': 0.856725}],
        'keywords': ['that', 'window']
    }
]

# データロード
job: LoadJob = client.load_table_from_json(
    json_rows, table, job_config=LoadJobConfig(schema=table.schema))
job.result()

ユーザ定義関数(UDF)

後で書くかも