Lean Baseball

No Engineering, No Baseball.

Google Cloudのサーバレスなイベント駆動処理 - Cloud Runアプリをいい感じにTerraformで管理する

元・野球エンジニア*1, 現・Google Cloud Partner Top Engineer 2024*2の人です.

相変わらず仕事も趣味もGoogle Cloudで何かをやっているのですが, この年末年始に以下の絵のようなシステムを作りました(正確には「元々あった別システムを作り直しました*3」).

この記事の全体像

Baseball Savantから取得*4した投打のデータ(トラッキングデータ)のCSVをBigQueryのテーブル(事前に定義済み)に突っ込むシステムなのですが, こちらを作る過程で,

  • Cloud RunをPub/Subのメッセージをトリガーとしたイベント駆動で動かす
  • アプリケーション(Goで実装)を純然たるWeb APIとして実装(Pub/Sub専用のアプリではない)
  • 上記の構成をサービスアカウントなどの権限設定含めてTerraformでIaC(Infrastructure as Code)化

という感じで実装した結果, 程よい技術ブログが書けそうと気がついたので今回はそんな話を書いてみようかと思います.

なお都合によりコードは非公開ですが, スニペットなどで雰囲気を伝える努力はしているつもりです🙇🏻*5

TL;DR

Pub/Subを利用したイベント駆動処理ぐらいだったらQuick Startのマネでいい感じにできる&最初面倒でもTerraformでIaCやっとけ(個人開発でも)

対象読者

こちらのエントリーは「Webアプリケーション開発およびGoogle Cloudをちょっと使ってる中級者」ぐらいの方を対象に書いています.

以下の事を知っていれば読めると思います.

  • Cloud Runでのアプリケーション開発経験. このブログおよび私の個人開発の常連となっています&過去エントリーはこちら.
  • GoでWebアプリを作ったことある. 今回はApplicationをGoで実装しています.
  • Cloud Pub/Subで何ができるか何となく把握している. 「Pub/Subって何?」という方は公式ドキュメントを読んでみてください.

「駆け出しエンジニア」および「Google Cloud触ったこと無い」って方はとりあえずGoogle Cloudのアカウント作ってこの辺のチュートリアルを動かしてみてください.

cloud.google.com

このエントリーを読むとわかるのですが紹介するのは上記チュートリアルの実践版です.

Cloud Runでイベント駆動処理

Cloud RunはHTTPSでアプリケーションを公開してWebサイトなりAPIなりを公開するやり方以外に,

  • gRPCWebSocketなど好きな手段を使ってアプリ実装.
  • Cloud Run Jobを使ったバッチ処理. いわゆるcrontab的な方法*6.

などいくつかやり方があります.

その中で私が今回取った手段ですが,

  1. GCS上にあるCSVのパスおよび登録先のBigQueryテーブル(データセット込みのパス)をPub/SubにPublishするアプリ(いわゆるPublisher)を実装(予定). なおまだ実装していないので今回はCloud SDK(gcloud command)で代用します.
  2. Pub/Subのメッセージを受け取ってCSVを取得, データをBigQueryに放り込むアプリ(いわゆるSubscriber)を実装.

という「Pub/Subを元にしたイベント駆動処理」を前提としたアーキテクチャを採用しました(ちなみにPushベースです*7).

今回紹介するモノはこんな構成で作りました.

野球データをドカドカ放り込むアプリ

絵はシンプルですがここに至るまでにいくつかのステップがあるのでご紹介します.

Quick Startから動かす

Google Cloudと野球データ活用に魂とプライベートのプログラミング時間を捧げている私でも, この構成のアプリをいきなり作ることは出来ない*8ので, 「とりあえずQuick Startを動かして構想と設計の妥当性を探る」事から始めました.

cloud.google.com

絵で表すと以下の部分を解決するところから開始しました.

Publishしたメッセで動けばとりあえず何とかなるという構想

Google CloudのQuick Startは本当によく出来ていて, 「とりあえずProject作って真似したらどうにかなり」ます.

  • 手順通りにインストールしたり設定したりする.
  • コードスニペットやgcloudコマンドをterminalにペタペタ貼って実行.

やることはたったこれだけで, よほど変な事*9をしていない限り, ページに記載の通りのモノが動きます.

アプリの実装

Pub/Subイベント駆動でアプリが動いたら今度は実装します.

私は幸いにも以前の趣味開発でBigQuery, Cloud Storage(GCS)そしてCloud Loggingを使ったAPIを実装した経験があります.

shinyorke.hatenablog.com

上記のアプリではGinというフレームワークを使って実装しましたが今回はシンプルなApplicationなので標準ライブラリ(net/http )で程よくやりました.

以下それっぽく乗せますが解説は割愛します&各Google CloudのサービスでGo版Quick Startしてたらおおよそ書いて動かるかと.

main.go

Webアプリのエントリーポイント的な役割と実装.

処理実装は分けても良かったのですがこれぐらいのボリュームなら...って事であえてそのまま書きました.

package main

import (
    "context"
    "encoding/json"
    "github.com/Shinichi-Nakagawa/example-cloudrun-pubsub/importer/gcp"
    "github.com/Shinichi-Nakagawa/example-cloudrun-pubsub/importer/schema"
    "github.com/Shinichi-Nakagawa/example-cloudrun-pubsub/importer/tracking"
    "github.com/go-playground/validator/v10"
    "io"
    "log"
    "net/http"
    "os"
)

var GoogleCloudProjectID string = os.Getenv("GOOGLE_CLOUD_PROJECT")
var GoogleCloudLoggingName string = os.Getenv("GOOGLE_CLOUD_LOGGING_NAME")
var logger *log.Logger
var ctx context.Context

func main() {
    // Context, logger
    ctx = context.Background()
    loggerClient := gcp.NewLoggingClient(ctx, GoogleCloudProjectID)
    logger = gcp.NewLogger(loggerClient, GoogleCloudLoggingName)
    // Determine port for HTTP service.
    http.HandleFunc("/", TrackingPubSub)
    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
        log.Printf("Defaulting to port %s", port)
    }
    // Start HTTP server.
    log.Printf("Listening on port %s", port)
    if err := http.ListenAndServe(":"+port, nil); err != nil {
        log.Fatal(err)
    }

}

func TrackingPubSub(w http.ResponseWriter, r *http.Request) {
    logger.Printf("TrackingPubSub: Start")

    // Parse the Pub/Sub message.
    var m schema.PubSubMessage
    body, err := io.ReadAll(r.Body)
    if err != nil {
        log.Printf("ioutil.ReadAll: %v", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }

    // byte slice unmarshalling handles base64 decoding.
    if err := json.Unmarshal(body, &m); err != nil {
        log.Printf("json.Unmarshal: %v", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }

    // Validate the message is from the topic we expect.
    form := string(m.Message.Data)
    var message *schema.Message
    err = json.Unmarshal([]byte(form), &message)
    if err != nil {
        logger.Printf("json.Unmarshal: %v", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }
    validate := validator.New(validator.WithRequiredStructEnabled())
    err = validate.Struct(message)
    if err != nil {
        logger.Printf("validator.Struct: %v", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }

    // BigQuery
    logger.Printf("TrackingPubSub: GCS(%v) to BigQuery(%v.%v)", message.Uri, message.DatasetId, message.TableId)
    bq, err := gcp.NewBigQueryClient(ctx, GoogleCloudProjectID)
    if err != nil {
        logger.Printf("gcp.NewBigQueryClient: %v", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }
    gcsRef := gcp.NewGcsRef(message.Uri)
    err = tracking.ImportCSVData(message, bq, gcsRef, ctx)
    if err != nil {
        logger.Printf("tracking.ImportCSVData: %v", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }
    logger.Printf("TrackingPubSub: Success")

}

いくつかの処理

package tracking

package tracking

import (
    "cloud.google.com/go/bigquery"
    "context"
    "fmt"
    "github.com/Shinichi-Nakagawa/example-cloudrun-pubsub/importer/schema"
)

func ImportCSVData(form *schema.Message, bqClient *bigquery.Client, gcsRef *bigquery.GCSReference, ctx context.Context) error {

    // GCS File
    gcsRef.SourceFormat = bigquery.CSV
    gcsRef.SkipLeadingRows = 1
    // BigQuery table schema
    gcsRef.Schema = SchemaTracking

    // Loader & Options
    loader := bqClient.Dataset(form.DatasetId).Table(form.TableId).LoaderFrom(gcsRef)
    loader.WriteDisposition = bigquery.WriteAppend

    job, err := loader.Run(ctx)
    if err != nil {
        return err
    }
    status, err := job.Wait(ctx)
    if err != nil {
        return err
    }

    if status.Err() != nil {
        return fmt.Errorf("job completed with error: %v", status.Err())
    }
    return nil
}


var SchemaTracking = bigquery.Schema{
    {Name: "pitch_type", Required: false, Type: bigquery.StringFieldType},
    {Name: "game_date", Required: false, Type: bigquery.DateFieldType},
    {Name: "release_speed", Required: false, Type: bigquery.FloatFieldType},
    {Name: "release_pos_x", Required: false, Type: bigquery.FloatFieldType},
    {Name: "release_pos_z", Required: false, Type: bigquery.FloatFieldType},
    // 約90カラム, 死ぬほど長いので割愛
    {Name: "spin_axis", Required: false, Type: bigquery.FloatFieldType},
    {Name: "delta_home_win_exp", Required: false, Type: bigquery.FloatFieldType},
    {Name: "delta_run_exp", Required: false, Type: bigquery.FloatFieldType},
}

package schema

package schema

// Pub/Subのスキーマ
type PubSubMessage struct {
    Message struct {
        Data []byte `json:"data,omitempty"`
        ID   string `json:"id"`
    } `json:"message"`
    Subscription string `json:"subscription"`
}

// 中のメッセージのスキーマ
type Message struct {
    Uri       string `json:"uri" validate:"required"`
    DatasetId string `json:"dataset_id" validate:"required"`
    TableId   string `json:"table_id" validate:"required"`
}

package gcp

package gcp

import (
    "cloud.google.com/go/logging"
    "cloud.google.com/go/bigquery"
    "context"
    "log"
)

func NewBigQueryClient(ctx context.Context, projectId string) (*bigquery.Client, error) {
    client, err := bigquery.NewClient(ctx, projectId)
    if err != nil {
        return nil, err
    }
    defer client.Close()
    return client, nil
}

func NewGcsRef(uri string) *bigquery.GCSReference {
    return bigquery.NewGCSReference(uri)
}

func NewLogger(client *logging.Client, name string) *log.Logger {
    logger := client.Logger(name).StandardLogger(logging.Info)
    return logger
}

func NewLoggingClient(ctx context.Context, projectId string) *logging.Client {
    client, err := logging.NewClient(ctx, projectId)
    if err != nil {
        log.Fatalf("logging.NewClient: %v", err)
    }
    return client
}

本格的にGoを書き始めて1年ちょいの実装です, ツッコミどころは色々ある覚悟はしています*10.

なお実装はこの辺を参考にしました.

cloud.google.com

cloud.google.com

TerraformでGoogle Cloudを使う

とりあえずここまでやっておけばアプリケーションは動き, 最初の構想も実現できてめでたしめでたし.

...と言いたい所ですが,

  • Pub/Sub駆動のQuick Start読むとわかるのですが, 途中の手順でサービスアカウントの作成や紐づけを沢山しており, 人力(gcloudコマンドまたはCloud Console)で手動管理すると破綻する香りがする*11.
  • これからPublisherなどもCloud Runで作るので少なくともクラウド(インフラ層)のコードは再現性を持たせたい(同じことを2回以上手でやりたくない).
  • Loggingなどプロジェクトもしくは複数アプリケーションの共同所有物の定義は?

などなど, いくつか課題が思いつきました. 私は個人開発したアプリのマイクロサービスが常時10個ちかくいる関係上, IaCやCI/CDで管理されていない奴が増えると地獄なのは身を持って知っています, その割にはIaC化サボってますが*12.

というわけで,

今回Terrafromで真面目にやった部分.

必要そうなものからTerraform*13でIaC*14化を実施しました.

Quick Startをtfファイルに

ここもやはりQuick Startのサンプルを元に構成を考えることにしました.

cloud.google.com

やったこととして,

  1. Quick StartのTerraformスニペットを順番に実行
  2. とりあえずmain.tfにベタ書きして動かす
  3. 自分のアプリに必要なものを少しずつ足す

以上のアプローチで試しました&うまくいきました.

Quick Startはスニペットで書かれているのでイメージ沸かないかもですが, 一枚のmain.tfにまとめるとこんな感じで勿論ちゃんと動きました.

provider "google" {
  project     = "your-google-cloud-project-id"
  region      = "asia-northeast1"
}
data "google_project" "project" {
}
# Enable Cloud Run API
resource "google_project_service" "cloudrun_api" {
  service            = "run.googleapis.com"
  disable_on_destroy = false
}
resource "google_cloud_run_v2_service" "default" {
  name     = "pubsub-tutorial"
  location = "asia-northeast1"
  template {
    containers {
      image = "gcr.io/your-google-cloud-project-id/pubsub"
    }
  }
  depends_on = [google_project_service.cloudrun_api]
}
resource "google_pubsub_topic" "default" {
  name = "pubsub_topic"
}
resource "google_service_account" "sa" {
  account_id   = "cloud-run-pubsub-invoker"
  display_name = "Cloud Run Pub/Sub Invoker"
}
resource "google_cloud_run_service_iam_binding" "binding" {
  location = google_cloud_run_v2_service.default.location
  service  = google_cloud_run_v2_service.default.name
  role     = "roles/run.invoker"
  members  = ["serviceAccount:${google_service_account.sa.email}"]
}
resource "google_project_service_identity" "pubsub_agent" {
  provider = google-beta
  project  = data.google_project.project.project_id
  service  = "pubsub.googleapis.com"
}

resource "google_project_iam_binding" "project_token_creator" {
  project = data.google_project.project.project_id
  role    = "roles/iam.serviceAccountTokenCreator"
  members = ["serviceAccount:${google_project_service_identity.pubsub_agent.email}"]
}
resource "google_pubsub_subscription" "subscription" {
  name  = "pubsub_subscription"
  topic = google_pubsub_topic.default.name
  push_config {
    push_endpoint = google_cloud_run_v2_service.default.uri
    oidc_token {
      service_account_email = google_service_account.sa.email
    }
    attributes = {
      x-goog-version = "v1"
    }
  }
  depends_on = [google_cloud_run_v2_service.default]
}

your-google-cloud-project-idの所を自身の環境に変えるとイケます.

いざやってみて思ったのが,

最初からgcloudコマンドじゃなくてTerraformで書いたほうがいいじゃん?

って事ですね.

個人開発はさておき, 昨今の仕事だとdocker composeじゃなくてtfで用意したほうが良いシチュエーションの方が多そう*15なのでなおのこと思いました.

必要そうなモノを加える

そして最終系の話を.

「Quick Startに無くて自分のアプリで必要なクラウドリソース」を整理した結果,

  • Logging周りの設定. 独自のLog Router & sinkの用意*16.
  • Pub/Subのメッセージをちゃんとスキーマ管理したいのでスキーマを追加. 何故スキーマが必要なのかはこのQiita*17を参照.

って事で上記の要素をごちゃごちゃ付け加えた結果がこちらとなります.

アプリ本体

main.tfだけ紹介, valiableとかoutputはご想像にお任せします.

なおzobは自分プロジェクトの接頭詞*18.

provider "google" {
  project     = var.google_cloud_project_id
  region      = var.region
}
data "google_project" "project" {
}
# Enable Cloud Run API
resource "google_project_service" "cloudrun_zob_importer" {
  service            = "run.googleapis.com"
  disable_on_destroy = false
}
resource "google_cloud_run_v2_service" "zob_topic_importer" {
  name     = var.prefix
  location = var.region
  template {
    scaling {
      min_instance_count = 0
      max_instance_count = 2
    }
    containers {
      image = "asia-northeast1-docker.pkg.dev/${var.google_cloud_project_id}/zobrist-platform-importer/api:test-20240114-05"
      resources {
        limits = {
          cpu    = "2"
          memory = "1024Mi"
        }
      }
      env {
        name = "GOOGLE_CLOUD_PROJECT"
        value = var.google_cloud_project_id
      }
      env {
        name = "GOOGLE_CLOUD_LOGGING_NAME"
        value = var.logging
      }
    }
  }
  depends_on = [google_project_service.cloudrun_zob_importer]
}

resource "google_pubsub_schema" "zob_importer_schema" {
  name = "schema-${var.prefix}"
  type = "AVRO"
  definition = "{\n  \"type\" : \"record\",\n  \"name\" : \"Avro\",\n  \"fields\" : [\n    {\n      \"name\" : \"uri\",\n      \"type\" : \"string\"\n    },\n    {\n      \"name\" : \"dataset_id\",\n      \"type\" : \"string\"\n    },\n    {\n      \"name\" : \"table_id\",\n      \"type\" : \"string\"\n    }\n  ]\n}\n"
}
resource "google_pubsub_topic" "zob_topic_importer" {
  name = "topic-${var.prefix}-2023"
  depends_on = [google_pubsub_schema.zob_importer_schema]
  schema_settings {
    schema = "projects/${var.google_cloud_project_id}/schemas/${google_pubsub_schema.zob_importer_schema.name}"
    encoding = "JSON"
  }
}
resource "google_service_account" "zob_importer_sa" {
  account_id   = "${var.prefix}-pubsub-invoker"
  display_name = "Zobrist importer Pub/Sub Invoker"
}
resource "google_cloud_run_service_iam_binding" "zob_importer_iam_binding" {
  location = google_cloud_run_v2_service.zob_topic_importer.location
  service  = google_cloud_run_v2_service.zob_topic_importer.name
  role     = "roles/run.invoker"
  members  = ["serviceAccount:${google_service_account.zob_importer_sa.email}"]
}
resource "google_project_service_identity" "zob_importer_pubsub_agent" {
  provider = google-beta
  project  = data.google_project.project.project_id
  service  = "pubsub.googleapis.com"
}

resource "google_project_iam_binding" "zob_importer_project_token_creator" {
  project = data.google_project.project.project_id
  role    = "roles/iam.serviceAccountTokenCreator"
  members = ["serviceAccount:${google_project_service_identity.zob_importer_pubsub_agent.email}"]
}
resource "google_pubsub_subscription" "subscription" {
  name  = "${var.prefix}-subscription"
  topic = google_pubsub_topic.zob_topic_importer.name
  push_config {
    push_endpoint = google_cloud_run_v2_service.zob_topic_importer.uri
    oidc_token {
      service_account_email = google_service_account.zob_importer_sa.email
    }
    attributes = {
      x-goog-version = "v1"
    }
  }
  depends_on = [google_cloud_run_v2_service.zob_topic_importer]
}

Logging

こちらもmain.tfだけの紹介で勘弁してほしい.

# var.project_id is set Google Cloud Project ID
provider "google" {
  project     = var.google_cloud_project_id
  region      = var.region
}

resource "google_logging_project_sink" "zob_loggging_sink_api" {
  name        = "${var.prefix}-sink-api"
  destination = "logging.googleapis.com/projects/${var.google_cloud_project_id}/locations/${var.region}/buckets/${google_logging_project_bucket_config.zob_loggging_bucket_api.bucket_id}"
  filter      = "logName = projects/${var.google_cloud_project_id}/logs/${google_logging_project_bucket_config.zob_loggging_bucket_api.bucket_id}"

  unique_writer_identity = true
}

resource "google_logging_project_bucket_config" "zob_loggging_bucket_api" {
    project    = var.google_cloud_project_id
    location  = var.region
    retention_days = 10
    bucket_id = "${var.prefix}-bucket-api"
}

説明は省略しますがとりあえずこれでいい感じになりました.

結び - 今後の課題

というわけで,

  • Cloud RunをPub/Subのメッセージをトリガーとしたイベント駆動で動かす
  • アプリケーション(Goで実装)を純然たるWeb APIとして実装(Pub/Sub専用のアプリではない)
  • 上記の構成をサービスアカウントなどの権限設定含めてTerraformでIaC(Infrastructure as Code)化

というモノを実現しました, TerrformでCloud Runを扱う際の参考になればと思います.

なお, この段階ではいくつか課題があります.

  • CI/CDの実行環境への取り込み. Terrafromごとやるとなると権限含めて考えることがたくさんあります.*19
  • tfstateをどこで扱う問題. 個人開発かつ開発者は私一人なので私のPCで管理しているが破綻することは目に見えている.*20
  • アプリケーションのCI/CDだけやりたいときはどうしたら? 毎回tf apply やるのは賢いと思っていなくて普通にアプリをビルドして出したいがそれをやるとtfstateとの差分(ry

趣味でやってるはずのプロジェクトが限りなく仕事のDevOps/SREっぽい悩みになってるのが笑えますがw

この辺ご意見やいい方法あるよ!っていう提案を心からお待ちしております.

最後までお読み頂きありがとうございました.

Appendix

関連エントリー

クローラーの実装は実は昨年シレッとやっています&これもイベント駆動です.

shinyorke.hatenablog.com

参考書籍

この記事に興味持ったらぜひこの辺読んでみてください, サービスの選び方やアーキの考え方の参考に.

参考文献

Pub/Subスキーマの重要性はこちらをご参考に.

qiita.com

*1:商売にしていない, する予定が無いという意味で.

*2:期限が切れるまでこれずっと言い続けます笑.

*3:いわゆる「リアーキテクチャ」「式年遷宮」的なムーブでもっと雑に言えば「試したいモノがいくつか合ったのでゼロから作り直し」ています.

*4:余談ですがこのクローラーの実装と開設は昨年ブログで書いています.

*5:少し言い訳すると, コードを丸っと公開したところで真似するの難しいでしょってのがあります. 特にTerraform周りが.

*6:これは試してブログで書くお気持ちがあります, 今回紹介している野球データ基盤の定時タスクをCloud Scheduler + Cloud FunctionsからCloud Run Jobにモダナイゼーションするつもり.

*7:データの同期などを考えるとPullの方が良いのですが今回はどっちでも良かったのでデフォルトのPushにしています(がいずれ変更するかも)

*8:いや, できるかもしれないけどイケてない作り方に走る可能性もあるので「急がば回れ」的にやりました.

*9:ネットワークの制約, PCが調子悪い...など. あと稀にQuick Startが間違っているとかそういうこともありそうです.

*10:のですが今回は別にGoが主役のブログじゃないのでいいです, と言い訳しておきます.

*11:Google Cloudをたくさん使うとわかるのですが無造作に作ったサービスアカウントがセキュリティホールとなり得たり, いらないと思って無効化したアカウントが実は大事な奴でアプリが止まったりなど結構曲者になりがちです.

*12:CI/CDのDevOpsパイプラインはGitHub Actionsという便利な道具のお陰でサボらず整備が出来ています.

*13:Terraformにした理由はみんな使ってるからです(雑)

*14:一応言っておくとInfrastructure as Codeのこと.

*15:Google Cloudに限った話でもなく, システムの構造を言語化する意味でもIaCはあった方がハッピーです.

*16:真似する方に向けてのアドバイスですが, 別にこれはやらなくても問題ないと思っています. デフォルトのsinkに吐き出されたやつをクエリするだけならわざわざrouterやsinkの用意はしなくてよいです. ただ, ログをBigQueryやGCSに保管したいみたいなユースケースを想定している方は対応必要です(今回のコードはそこまでしていませんが).

*17:この話書くと文章が倍になるなと思った矢先に若い知り合いが書いてたので良かった, 感謝🙏

*18:野球プロダクトの名称が「zobrist」って名前で由来は大好きな元メジャーリーガーのベン・ゾブリスト(シカゴ・カブスの18番で今永の前任者)

*19:Cloud Buildで動かすとかになると思いますが個人開発でそこまで用意するかは考えたい.

*20:GCSで管理ですかね?これはCI/CDの絡みを含めて早めに実現したい.