Lean Baseball

No Engineering, No Baseball.

Google CloudとPythonを使ったバッチ処理2024 - Cloud Run JobsとPrefectで実現するサーバレスなデータ処理

タイトルの通り, Google CloudとPythonで程よいバッチ処理(正確にはワークフロー処理)を作ったという話です, 2024年っぽさあるのでそういうタイトルにしています*1&こちらは個人開発事例であり, 業務および所属組織とは無関係です.

仕事ではなく, 個人開発(例のごとく野球データ分析, メジャーリーグです)で,

  1. 毎日データサイトからクローリングしてGoogle Cloud Storage(GCS)にCSV保存(Exporter)
  2. CSVファイルがGCSにあったらBigQueryのテーブルに保存(Importer*2
  3. シーズンが始まったらこれを毎日一回のバッチ処理として実行(Trigger*3

というデータ基盤的なプロダクトを作っています*4, お絵かき(Blueprint)は以下のとおりです.

今回作ったモノです.

CSV保存(Exporter)とBigQuery保存(Importer)はGoでちょっとしたWeb APIを作ってこれをPub/Sub経由で動かすという解決方法でシンプルにイケたものの(Importerについては前回ブログに書いています&Exporterも同じような手法*5で開発しています), 最後のバッチ処理をGoで書こうとしたら(主に私shinyorkeのGopherとしての実力不足ですが*6)あんまりいい感じに作れなかったので色々考えた結果,

  • AirflowとかLuigi, 今回の主人公Prefectといった「Pythonのワークフローなフレームワーク(=うっかりバッチフレームワークとして使いがち*7なやつ)」で作れそう.
  • スケジューラー(=crontab的なやつ)はCloud Run Jobs*8で程よく解決できそう(意訳・フレームワークのタイマーより信用できる*9
  • ワークフローなフレームワーク特有の癖さえ回避できたら全部サーバレスで行けるんじゃね?

と思いつきました.

ちょこっと手を付けてみた結果,

PrefectというPythonのワークフロー(もしくはETL/ELT)フレームワークとCloud Run Jobsで手を付けたら1日ちょっとで実用可能なバッチが作れました, Terraformのコード付きで.

という最高の成果が残ったのと, このアプローチ案外使えるのでは説あるので書き残したいと思います.

このエントリーについて

バッチ処理で使ったPrefectとCloud Run Jobsを解説しながら使い所などをメモとして残しています.

主人公であるPrefectとCloud Run Jobsの解説は厚めにしますが, 以下については解説しないので関連するドキュメント・書籍をご覧になるか別で学んで来てください.

  • Google Cloudの使い方, 他クラウドとの比較
  • Cloud Runそのものの説明
  • AirflowやLuigiといった他のPython 製ワークフローなツールの解説

「バックエンドエンジニアもしくはデータエンジニアの人でPythonとDocker使って開発したことがある」ぐらいの中級者なら読めるのではと.

TL;DR

crontabで設定するようなバッチ処理はCloud Run Jobsを使おう&何で書くか迷ったらPythonのワークフローなフレームワークが良いかも(小さいやつはPrefectがいいぞ)

Prefect is 何?

Prefectはデータを順序よく処理するワークフローをいい感じに作って運用するためのツール・フレームワーク, いわゆる「ワークフローエンジン」「ETLツール」と呼ばれるものですです.

docs.prefect.io

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines.

(Prefectはワークフロー・オーケストレーション・ツールで、開発者はデータ・パイプラインを構築、観察することで大抵の事に対応するやで(意訳))

と, 公式のサイトで言っています&OSS版以外にクラウドサービスとしての提供もあるみたいです*10.

ほとんどの方にとって, 馴染が無いツールだと思いますが実は日本での活用事例はそこそこ存在しています.

developers.cyberagent.co.jp

zenn.dev

実は私も, 前職のJX通信社にいた2年ちょっと前に, しれっとプロダクトコードの一部としてprefect製の簡易的なワークフローを実装・運用した経験があります*11.

tech.jxpress.net

tech.jxpress.net

一応の注意点として,

Prefectはバージョン1とバージョン2でだいぶ変わっているので注意が必要です(このエントリーは最新版の2の話です).

docs.prefect.io

私の前職ブログとCAさんのブログは1前提なので真似する時はご注意を.

とりあえず動かす

Poetryのインストールそのものはpipなりpoetryなり好きな手段を使うと良いかなと思います.

私はpoetryをよく使うのでpoetryで入れました.

$ poetry add prefect@latest

Hello World的なお試しはPrefectのGitHubのスニペット(GitHubリポジトリのstar数を出すサンプル)をそのまま貼って実行すると良いかなと思います, これが直感的かつ楽なので.

from prefect import flow, task
from typing import List
import httpx


@task(log_prints=True)
def get_stars(repo: str):
    url = f"https://api.github.com/repos/{repo}"
    count = httpx.get(url).json()["stargazers_count"]
    print(f"{repo} has {count} stars!")


@flow(name="GitHub Stars")
def github_stars(repos: List[str]):
    for repo in repos:
        get_stars(repo)


# run the flow!
if __name__=="__main__":
    github_stars(["PrefectHQ/Prefect"])

上記をmain.pyとかで保存してpython main.pyで動かせば最小限のワークフローの完成です.

参考までに, 私が実現したかった処理の実装はこんな雰囲気です(本物よりちょっと端折っています&読むのが面倒な方は上記のsample読んでくれたら問題ないです.).

私はこれをmain.pyで保存してこれをエントリーポイントにすることにしました.

コンテナ化してみる

コンテナ化は例の如くDockerfileで書きます.

/prefect_example
├── /app
│ │
│ └── main.py
│
├Dockerfile
├poetry.lock
├poetry.toml
├pyproject.toml

先ほどご登場のHello World的なお試しはPrefectのGitHubのスニペットを/app/main.pyとして保存, プロジェクトのroot(/prefect_example)にDockerfileをおいた場合のDockerfileはこんな感じです(サイズを少しでも縮めるためMulti-stage buildsにしています).

# ここはビルド用のコンテナ
FROM python:3.12-slim as builder

ENV APP_ROOT /opt/app
WORKDIR ${APP_ROOT}

RUN pip3 install poetry
COPY poetry.lock pyproject.toml poetry.toml ./
RUN poetry install --no-dev

# ここからは実行用コンテナの準備
FROM python:3.12-slim as runner

ENV APP_ROOT /opt/app
ENV PREFECT_HOME "/prefect"  # 【ここが重要】containerで動かす時のMust設定.
ENV PYTHONPATH "${PYTHONPATH}:${APP_ROOT}"
RUN useradd -r -s /bin/false appuser
WORKDIR ${APP_ROOT}
COPY --from=builder ${APP_ROOT}/.venv ${APP_ROOT}/.venv
COPY app .
USER appuser
ENTRYPOINT ["/opt/app/.venv/bin/python", "main.py"]

poetryでPythonを動かす時の比較的あるあるなDockerfileだと思いますが, Prefectを動かす観点で重要なのは以下のENVです.

ENV PREFECT_HOME "/prefect"  # 【ここが重要】containerで動かす時のMust設定.

Prefectはワークフローの実行履歴をDatabase(SQLite)で保持しているため, 明示的に出力先(SQLiteの置き場所)を指定しないとコンテナ実行時にエラーで動きません.

個人的には「まあ起こり得るだろうな*12」と思ったのでそんなに驚きはありませんでした.

docs.prefect.io

ただ, 初見で「あんたのコンテナじゃここにファイルを書き込めないよ!」って怒られるのは辛いと思うので解決策を提示すると,

  1. PREFECT_HOMEを明示的に指定. 適当なVolume名を振る(私のスニペットでは/prefectとしています)
  2. 適当に振ったVolume名「/prefect」に対して新規のVolumeを用意する.
  3. Dockerでいうところの docker container run --mount type=volume, src=prefect, dst=/prefect shinyorke/example_prefect:latest みたいなのをちゃんと用意する.

これで解決できます.

ここまで出来たら後はArtifact Registryにpushしてあげればいつでも使えます.

cloud.google.com

説明すると長いので, 知りたい方はQuickstartを御覧ください🙏*13

Cloud Run Jobsで動かす

ここまで来たら後は動かすだけです.

やることはたった2つで,

  • Terraformを書く・動かす
  • デプロイできたら画面からポチッと動かす

以上となります.

Terrafromで動かす

Cloud Run JobsのTerraformを使って書いてあげましょう.

# Create a Cloud Run job

# providerはgoogle-betaなので注意
provider "google-beta" {
  project = ${your google cloud project id}
  region  = asia-northeast1
}

# Cloud Run Job定義
resource "google_cloud_run_v2_job" "example_prefect" {
  provider     = google-beta
  name         = example_prefect
  location     = asia-northeast1
  launch_stage = "BETA"

  template {
    template {
      containers {
        image = "asia-northeast1-docker.pkg.dev/${your google cloud project id}/example/prefect:latest"
        resources {
            limits = {
            cpu    = "2"
            memory = "1024Mi"
            }
        }
        env {
            name  = "HOGE"
            value = "環境変数があればいれる"
        }
        volume_mounts {
          name = "prefect"
          mount_path = "/prefect"
        }
      }
      volumes {
        name = "prefect"
        empty_dir {
            medium = "MEMORY"
            size_limit = "128Mi"
        }
      }

    }
  }
}

Cloud Runにはインメモリ・ボリュームという仕組みがあるので, こちらを作って使うようにするのをお忘れなく.

        volume_mounts {
          name = "prefect"
          mount_path = "/prefect"
        }
      }
      volumes {
        name = "prefect"
        empty_dir {
            medium = "MEMORY"
            size_limit = "128Mi"
        }
      }

適当に振ったVolume名「/prefect」に対して新規のVolumeを用意する. ←これの設定です.

後はterrafrom applyとかしてあげたらとりあえず定義ができます.

動かしてみよう

https://console.cloud.google.com/run/jobs?project=your-google-cloud-project-id を開いて実行ボタンをポチり.

もちろんgcloud commandなどでやっても構いません.

成功するとこんな感じでログが残ります.

定期実行を行う

毎日とか毎時, 私のアプリだと「シーズンが始まったらこれを毎日一回のバッチ処理として実行」というのをやりたい時はCloud Scheduler*14を使います.

cloud.google.com

上記の例では通常のCloud Runを使っていますが, URIでJobを指定することで実行できそうです.

zenn.dev

blog.g-gen.co.jp

実はこの方式はまだ試していないので試して後日アップデートします(できるのは知ってるんだけど*15).

実践的な情報

気になる人だけ読んでもらえればOKです, 読まなくても理解はできるので.

Airflowとの違い

Airflowを語る前にまずこのスライドを紹介します.

エンタープライズ用途であれば(今のところ)Airflow, それもCloud Composer(Google Cloud)やAMWAA(Amazon Managed Workflows for Apache Airflow)といったマネジードサービスを使うのが鉄板*16(モノタロウさんの素晴らしいスライド*17に書いてあるとおり)なのですが, データ量がある程度計算できるスタートアップや個人開発であればCloud RunでPrefectやLuigi*18を動かすほうが良いと思います(断言).

Airflowは運用とDAGの実装が大変なのですが, PrefectやLuigiであれば,

  • とりあえずWorkflowの実装を覚えてDocker化する
  • volumeなどに気をつけてCloud Runで動かす

とかやれば割と気軽に作れてしまいますし, Cloud Runでやるとサーバレスでお安く済む(nodeゼロ台で待たせてもいい*19).

Loggingの実装

これはCloud Loggingの話です&デフォルト設定で使うわーって人は読まなくて大丈夫です.

cloud.google.com

cloud.google.com

PrefectでCloud Loggingを統合(デフォルトのPython Loggingを差し替える)と何故かスレッドエラーっぽい挙動で動かなくなるので, カスタムロガーは別に作った方が良いです(統合しちゃだめ).

私はこんな実装にしました.

# 定義はこれ
from google.cloud.logging import Client as LoggingClient

client: LoggingClient = LoggingClient()
logger = client.logger(name="custom-log-name")

# loggerをそのまま使って出力

logger.log("Hello Prefect")

Loggingに拘りたい方は注意しましょう.

結び

「Google CloudとPythonを使ったバッチ処理2024」と称して,

  • Prefect
  • Cloud Run Jobs

以上を紹介しました.

個人的には毎年70万レコード, 1GB/年いかない程度の「少なくはないが多くもないデータ量のバッチ処理」という, 積年のテーマに対して良い解法が見つかったこと, Google Cloudに限らずサーバレス系のクラウドがいい感じに発達していることに喜びを感じました.

これで好きな野球データ分析が捗りそうなので開幕したら頑張ろうと思います*20.

最後までお読みいただきありがとうございました.

Appendix

参考にしたやつをまとめました.

参考文献

参考文献をまとめて

developers.cyberagent.co.jp

zenn.dev

tech.jxpress.net

tech.jxpress.net

zenn.dev

blog.g-gen.co.jp

関連書籍

関連知識ってことで一応推しておきます.

*1:この手のやつは来年の今頃陳腐化していてもおかしくないので敢えてそうしています&2025年版を作るつもりがあるかと言われるとそれは無いですが笑

*2:今思えばLoaderって名前の方がいい気がするがもう引き返せないのでこれで行きます.

*3:これもJobで良かった気がする. ちなみにデータ処理そのものはELTパターン(TranslateはBigQuery側のViewなどで対処)だったりします.

*4:厳密に言うと作り直しています, 現行バージョンはCloud Functions第二世代とPythonで実装しており, その話は去年のデブサミ等でやったりしました.

*5:出力先がBigQueryじゃなくてGCSになったぐらいなので本当に一緒です

*6:具体的には並列処理にハマりましたってのと, 要件を考えたらこれLuigi(Python)でやるべきだろ!ってなって方針を変えました.

*7:バッチ的に使えるものと, バッチ専門のフレームワークは違うと思うんですよね. しらんけど.

*8:細かい話ですがCloud Run Jobsというのがどうやら正式名称っぽいです. が, TerraformのResourceはCloud Run Jobとなっているので表記揺れはありそうです. なお本ブログはGoogle Cloudのドキュメント(英語)に合わせてCloud Run Jobsにしています.

*9:Airflowはこの辺大変で, 使ったことある人ならわかると思いますが苦労の連続な気がします.

*10:今回は使いませんでしたが, 無料でも割と使えるっぽいので気になる方は使ってみるといいかもしれません.

*11:改めて2年ちょい前のブログを読みましたがこんなにブクマ集めてましたっけ?と驚いています笑

*12:AirflowもLuigiも同じく中身で状態を持っている為, 出力先を指定しないと同じ罠にハマります&この辺は比較的この手のフレームワークあるあるです.

*13:前にも書いた気がしますが, Google CloudのQuickstartはほんとちゃんとしているので真似するのオススメします.

*14:最初やる前はCloud Run Jobs単体でいけると思ったのですが, crontabっぽいパラメーターが無かったので調べたらScheduler使えとありました, 確かにこの方式はGoogle Cloudっぽさがある.

*15:単に時間切れになってやってないだけです, やるつもりです

*16:言い方を変えると, セルフホスティングは絶対やめたほうがいいです, 運用が難しすぎるのと手間がかかりすぎます.

*17:このスライドは本当に良いまとめだと思います.

*18:Luigiはかなり軽量に作れる(Prefectより軽いはず)のと, 今回のCloud Run Jobsノウハウで作れると思います. なおこっちもVolume指定しないと多分死にます.

*19:用途的に動かしたいときだけあればいいので, 常にmin instances = 0的な設定で行けます.

*20:3月には野球ネタ出したい