BigQueryのpython client(google-cloud-bigquery)の使い方をメモっていきます。
インストール
- サービスアカウントキーファイルを環境変数に設定
-
こちらを参考に、サービスアカウント+キーファイルを作成し、そのキーファイルのパスを環境変数に設定します。
export GOOGLE_APPLICATION_CREDENTIALS="/hoge/fuga/key_file.json"
-
こちらを参考に、サービスアカウント+キーファイルを作成し、そのキーファイルのパスを環境変数に設定します。
- ライブラリインストール
pip install google-cloud-bigquery
- クライアント生成
from google.cloud.bigquery import Client client: Client = Client() # 環境変数 GOOGLE_APPLICATION_CREDENTIALSが設定されている前提。引数で渡すこともできる
データの取得
クエリを使う
from google.cloud.bigquery.table import RowIterator, Row
from google.cloud.bigquery.job import QueryJob
import itertools
# クエリ定義
query: str = "SELECT item1, item2 FROM HOGE;"
# JOB実行
query_job: QueryJob = client.query(query)
rows: RowIterator = query_job.result() # APIリクエスト発生+結果待ち合わせ
# 全件利用
print(f"total_rows: {rows.num_results}")
for row in rows:
print(f"item1: {row.item1}, item2: {row.item2}")
# 1件だけ実行(必ずしもこうする必要は全くないけど)
row: Optional[table.Row] = next(itertools.islice(rows, 1), None)
if row is not None:
print(f"item1: {item1}, item2: {item2}")
テーブル作成
クエリを使う
# クエリ定義(SELECTで既存テーブルを元にテーブル作成)
query: str = """
CREATE OR REPLACE TABLE dataset_name.hoge
OPTIONS (expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)) -- 3時間で削除(テンポラリテーブル扱い)
AS
SELECT
f.*
, CAST(NULL AS INT64) AS item3 -- 空のカラムを追加したい場合はCASTが必要
FROM
dataset_name.fuga f
-- データなしのテーブルを作りたければ、以下を追加
-- LIMIT 0
"""
# クエリ定義(スキーマ定義をしてテーブル作成)
query: str = """
CREATE OR REPLACE TABLE dataset_name.hoge (
item1 STRING NOT NULL
, item2 STRING NOT NULL
, item3 INT64
)
OPTIONS (expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR)) -- 3時間で削除(テンポラリテーブル扱い)
"""
# クエリ実行
query_job: QueryJob = client.query(query)
query_job.result() # APIリクエスト
データの一括アップロード
client.insert_rowsを使う
これは、ストリーミングインサートを行う関数なのですが、こちらにあるように、データを利用可能になるまで数秒程度の遅延が発生します(実際にはもっと遅延が発生することもあります)。そのため、インサート結果を次の処理ですぐに使うようなケースでは利用できません。なお、遅延が発生している場合でもエラーにはなりません。
from more_itertools import chunked
from typing import Tuple, List
from google.cloud.bigquery.table import Table
table: Table = client.get_table("dataset_name.hoge") # APIでテーブル定義を取得
rows: List[Tuple[str, str, str]] = [("aa1", "bb1", "cc1"), ("aa2", "bb2", "cc2")]
# client.insert_rowsで一度に処理できる最大件数が1万件なので、1万件ずつ処理
for chuncked_rows in list(chunked(rows, 10000)):
errors = client.insert_rows(table, chuncked_rows) # APIリクエスト
print(f"errors: {errors}")
client.load_table_from_dataframeを使う
こちらは遅延は発生せず、同期的にデータロードされるため、安心です。
しかし、NULL(None)
をinsertしようとすると、pyarrow.lib.ArrowTypeError: Expected a string or bytes object, got a 'int' object
が発生するのですが、これの回避方法が分からず困っています(LoadJobConfig
のnull_marker
はCSVでしか使えないし、こちらを見ると現状ではDataFrame側の設定でもダメそうです)。なので、どうしてもNULL
をinsertしたい場合は、client.load_table_from_csv
を使うほうがいいかもしれません。
なお、import不要ですが、事前にpip install pyarrow
する必要があります。
import pandas as pd
from pandas.core.frame import DataFrame
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.job import LoadJob
from google.cloud.bigquery import LoadJobConfig
from typing import List
# テーブル定義取得
table: Table = client.get_table("dataset_name.hoge") # APIリクエスト
# DataFrame生成
columns: List[str] = list(map(lambda field: field.name, table.schema))
df: DataFrame = pd.DataFrame(columns=columns)
for row in [["aa1", "bb1", "cc1"], ["aa2", "bb2", "cc2"]]:
df = df.append(pd.Series(row, index=df.columns), ignore_index=True)
# BigQueryにロード
job: LoadJob = client.load_table_from_dataframe(df, table, job_config=LoadJobConfig(schema=table.schema))
job.result() # APIリクエスト
client. load_table_from_jsonを使う(STRUCTを含むデータをロードする場合はこれ一択)
上記のclient.load_table_from_dataframe
を使ってSTRUCTや配列を含むデータをロードしようとしたところ、以下のエラーが発生してできませんでした。
ValueError: Uploading dataframes with struct (record) column types is not supported. See: https://github.com/googleapis/google-cloud-python/issues/8191
こちらを見ると、
google-cloud-bigquery
DataFrame を Parquet 形式に変換して API に送信します。ネストした値や配列値をサポートします。DataFrame をテーブルに読み込むには、pyarrow(DataFrame のデータを BigQuery API へ送信するのに使う parquet エンジン)をインストールする必要があります。
と書いてあるのにひどいです。なのでそのようなケースでは、load_table_from_json
を使います。
from typing import List, Dict, Any
from google.cloud.bigquery import Client, LoadJobConfig
from google.cloud.bigquery.job import QueryJob, LoadJob
from google.cloud.bigquery.table import Table
# テーブル作成
query: str = """
CREATE OR REPLACE TABLE dataset_name.hoge (
id STRING NOT NULL
, vector ARRAY<STRUCT<
id INT64 NOT NULL,
value FLOAT64 NOT NULL
>>
, keywords ARRAY<STRING>
)
"""
client: Client = Client()
job: QueryJob = client.query(query)
job.result()
# 対象テーブルを取得
table: Table = client.get_table("dataset_name.hoge") # APIでテーブル定義を取得
# 投入データを設定
json_rows: List[Dict[str, Any]] = [
{
'id': 1,
'vector': [{'id': 1, 'value': 0.13458}, {'id': 2, 'value': 0.981345}, {'id': 3, 'value': 0.013473}],
'keywords': ['this', 'pen']
},
{
'id': 2,
'vector': [{'id': 1, 'value': 0.234956}, {'id': 2, 'value': 0.62164}, {'id': 3, 'value': 0.856725}],
'keywords': ['that', 'window']
}
]
# データロード
job: LoadJob = client.load_table_from_json(
json_rows, table, job_config=LoadJobConfig(schema=table.schema))
job.result()
ユーザ定義関数(UDF)
後で書くかも