Lean Baseball

No Engineering, No Baseball.

GoとCloud Pub/SubをつかってBaseball Savantのクローラーを作ってみた

最初に言っておきます, 「野球」っていうワード以上に「エンジニアリング」なエントリーです*1.

野球方面の人は最初の雰囲気を読んで最後まで読むか判断してください, 「オオタニサン」っていうワードは出てこず「Go」「Pub/Sub」「コンテナ」とかそんなのばっかり出てきます🙏

という前置きはさておき, 最近はGoでモノを作るのが好きになってきており, 今はというと,

  • PyCon JP 2022およびデブサミでお披露目した「野球データ基盤」の刷新をGoで少しずつやる.
  • 野球データ基盤を構成するマイクロサービスのうち, Goでやったほうが効率良さそうなサービスをGoに書き換え.
  • クラウドサービスは引き続きGoogle Cloud(これはマスト)だが, マイクロサービスのワークロードはCloud Runに統一*2, 可能なら「マルチコンテナ*3」を採用してサービスを一つに統一したい.

といった事をしております, なお完成するとこんな感じになりそうな気がしています.

野球データ基盤第二弾の全体像(構想段階)

思ったよりGoで置き換えが可能なマイクロサービス多めで有ることに気が付きました.

その中でも特に「Baseball Savant(Statcast Search)からデータを収集してBigQueryに保存する」というデータ基盤らしい主要機能は現状すべて「Python + Cloud Functions」で処理しているのですが, これを「Go + Cloud Run」で丸っと書き換えを行っています.

Goで書き換え中のデータ基盤機能

作ってみた結果, GoとCloud Pub/Subの実践例として面白そうな気がしてきたのでコードを公開すると同時にブログとしてここに残そうと思います.

github.com

そんな当エントリーのお品書きはこちらとなります.

対象読者と前提知識

GoからGoogle Cloudのサービスを使ったことがある程度のリテラシーが必要となります.

cloud.google.com

pkg.go.dev

この辺の内容が理解できればこの先の話も伝わるかと思います.

なお, 基本的なGoとGoogle Cloudの説明は文脈と文章量の都合上, 必要最低限に抑えます.

やりたい事は何か

まずやりたいことですが,

  1. 毎朝9:00(JST)にクローラーを起動
  2. 前日のStatcastデータを「投手」「打者」ごとに取得
  3. 取得したデータを/YYYY-MM-DD/batter.csv/YYYY-MM-DD/pitcher.csv として保存
  4. 取得したCSVを読み込みBigQueryに保存 ※このエントリーでは扱いません

以上となります.

なおデータはこんな感じで使われます.

データの利用例

シンプルな構成で考えると「1〜4の処理を一つのプログラムとして作ってcrontabで順番に動かす」なのですが(いわゆる「モノリス」なパターン, 別にこの方法でも全然構わない), コードのメンテナンスがキツイのと「CSVだけほしい」みたいな時に自由に使えないので,

  1. 毎朝9:00(JST)にクローラーを起動 → 「trigger」マイクロサービスとして, Cloud Run Jobs*4等の定期実行*5を使って起動, 2以降の処理を起動(下図「処理イメージ」の①).
  2. 前日のStatcastデータを「投手」「打者」ごとに取得 → マイクロサービス「exporter」内でStatcast SearchのURLを組み立て, CSVとして取得(下図「処理イメージ」の②).
  3. 取得したデータを/YYYY-MM-DD/batter.csv/YYYY-MM-DD/pitcher.csv として保存 → マイクロサービス「exporter」内で取得結果をCloud Storageに保存(下図「処理イメージ」の②).
  4. 取得したCSVを読み込みBigQueryに保存 → マイクロサービス「importer」内でCSVを読み込みBigQueryにloading ※このエントリーでは扱いません&Go版は未実装.

という感じにマイクロサービスとして分けて考え, 構築することにしました*6.

処理イメージ

①のtriggerが以下のようなメッセージをPub/Sub TopicにPublishを行い,

// Pub/Subに投げるメッセージのスキーマ(打者成績)
{
  "season": 2023,
  "player_type": "batter",
  "game_date": "2023-08-10T00:00:00Z"
}


// 投手はこれ
{
  "season": 2023,
  "player_type": "pitcher",
  "game_date": "2023-08-10T00:00:00Z"
}

②のexporterが上記のメッセージを受け取り(Pub/Sub的にはSubscribeされたメッセージをPullして)処理する.

といった流れになります.

cloud.google.com

基本的には上記Pub/Subクイックスタート「クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する」のちょっとした応用でコード自体は開発できちゃいます.

ちなみにPub/Subを使う理由ですが,

  • 使い慣れているから(最大の理由).
  • サービスの性質上, 外部公開(publicに外に出す)ものではなく, すべてinternalなネットワークに置きたかった.
  • 「内部限定のWeb APIにしてアクセスする」方法もあるが, Pub/Subでのピタゴラスイッチの方が利点が多い*7.

といった所が採用した理由となります.

GoとPub/Subを使ったクローラーの実装

説明で結構な文字数を使った気がします(小声)が, 実装自体は実は大したことはしていません.

  • Trigger(Pub/Sub的にはPublisher)がPub/Subにメッセージを投げる
  • Exporter(Pub/Sub的にはSubscriber)がメッセージを受け取り,
    • パラメーターからURLを組み立てる
    • http getしてCSVを取得
    • 取得したCSVをCloud Storageに保存

という流れとなります.

幸いにも, Baseball Savant(Statcast Search)は「日付(試合日)」「ポジション」をクエリにしてCSVを一発で引けるURLを用意してくれています.

# 2023-08-12時点の投手成績をCSVで取得
curl 'https://baseballsavant.mlb.com/statcast_search/csv?all=true&hfPT=&hfAB=&hfGT=R%7C&hfPR=&hfZ=&stadium=&hfBBL=&hfNewZones=&hfPull=&hfC=&hfSit=&hfOuts=&opponent=&pitcher_throws=&batter_stands=&hfSA=&hfInfield=&team=&position=&hfOutfield=&hfRO=&home_road=&hfFlag=&hfBBT=&metric_1=&hfInn=&min_pitches=0&min_results=0&group_by=name&sort_col=pitches&player_event_sort=api_p_release_speed&sort_order=desc&min_pas=0&type=details&&player_type=pitcher&game_date_gt=2023-08-12&game_date_lt=2023-08-12&hfSea=2023%7C' > pitcher.csv

上記をうまくExporter内で実装してあげることでいい感じにできそうです&実際にできました.

Trigger

Pub/Sub的にはPublisherの役割です.

おさらい的な話を書くと, 以下のSchemaで定義されたメッセージをpublishできたらOKです.

// Pub/Subに投げるメッセージのスキーマ(打者成績)
{
  "season": 2023,
  "player_type": "batter",
  "game_date": "2023-08-10T00:00:00Z"
}


// 投手はこれ
{
  "season": 2023,
  "player_type": "pitcher",
  "game_date": "2023-08-10T00:00:00Z"
}

このコードは「クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する」のちょっとした応用で実装できちゃいます.

README.mdのサンプルから抜粋.

package main

import (
    "cloud.google.com/go/pubsub"
    "context"
    "encoding/json"
    "log"
    "time"
)


// 選手タイプ(投手 or 打者)の定義
type PlayerType string

const (
    PITCHER PlayerType = "pitcher"
    BATTER  PlayerType = "batter"
)


// メッセージのSchema
type Form struct {
    Season     int        `validate:"required, min=2015,max=2999" json:"season"`
    PlayerType PlayerType `validate:"required" json:"player_type"`
    GameDate   time.Time  `validate:"required" json:"game_date"`
}


// Pub/Sub TopicにPubrishするための関数
func Publish(ctx context.Context, topic *pubsub.Topic, form Form) {
    value, err := json.Marshal(form)
    if err != nil {
        log.Printf("Request Error: %s", err)
    }
    result := topic.Publish(ctx, &pubsub.Message{
        Data: value,
    })
    id, err := result.Get(ctx)
    if err != nil {
        log.Printf("Publish Error: %s", err)
    }
    log.Printf("Published a message; msg ID: %v\n", id)
}

// Pub/Sub Client
func NewPubSubClient(ctx context.Context, projectId string) *pubsub.Client {
    client, err := pubsub.NewClient(ctx, projectId)
    if err != nil {
        log.Fatalf("pubsub.NewClient: %v", err)
    }
    return client
}

// Pub/Sub Topicの作成
func Topic(client *pubsub.Client, topicID string) *pubsub.Topic {
    topic := client.Topic(topicID)
    return topic
}

func main() {
    // 試合日の取得, UTCで取得して2日引く(当日のデータは取れないため)
    t := time.Now().UTC()
    gameDate := t.Add(-2 * time.Hour * 24)
    // 必要なモノを作る
    ctx := context.Background()
    client := NewPubSubClient(ctx, GoogleCloudProjectID)
    topicExporter := Topic(client, PubTopicIDExporter)

    // 打者データ取得のためのメッセージ送信
    formBatter := Form{Season: 2023, GameDate: gameDate, PlayerType: BATTER}
    log.Printf("export batter game_date: %s", formBatter.GameDate)
    Publish(ctx, topicExporter, formBatter)
    log.Print("export batter end")

    // 投手データ取得のためのメッセージ送信
    formPitcher := Form{Season: 2023, GameDate: gameDate, PlayerType: PITCHER}
    log.Printf("export pitcher game_date: %s", formPitcher.GameDate)
    Publish(ctx, topicExporter, formPitcher)
    log.Print("export pitcher end")
}

全体で100行もいかない程度で書けちゃいました.

Exporter

Triggerは単なるメッセージを投げる役割なのでシュッとしたコードになりましたが, 受け手のExporterはやることが多いので分けて紹介します.

main

main.go自体はシンプルです.

「Pub/SubをSubscribeしてからの処理」をサラッと書いています.

package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/pubsub"
    "github.com/Shinichi-Nakagawa/baseball-savant-crawler-go/gcp"
    "github.com/Shinichi-Nakagawa/baseball-savant-crawler-go/savant"
)

func main() {
    // 必要なオブジェクトの作成, いくつかの変数は環境変数.
    ctx := context.Background()
    gcs := gcp.NewStorageClient(ctx)
    bucket := gcp.GetBucket(gcs, GcsBucketName)
    client := gcp.NewPubSubClient(ctx, GoogleCloudProjectID)

    // Subscriptionの取得&設定.
    sub := gcp.Subscription(client, PubSubSubscriptionID)
    sub.ReceiveSettings.MaxOutstandingMessages = 10

    // メッセージ受信後の処理
    err := sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
        // パラメーターの取得
        form, err2 := savant.CreateForm(string(msg.Data))
        if err2 != nil {
            log.Print(err2)
            return
        }

        // query(URL)と保存先のパス生成
        query := savant.Query(form)
        filename := savant.Filename(form)

        // ファイル取得&保存
        body, _ := savant.FetchAndAsString(query)
        gcp.WriteObject(bucket, fmt.Sprintf("%s/%s", GcsPathName, filename), body, ctx)
        log.Print(fmt.Sprintf("saved: %s", filename))
        msg.Ack()
    })
    if err != nil {
        log.Fatalf("sub.Receive: %s", err)
    }
}

baseball savant

Baseball Savant(Statcast Search)周りの処理.

といってもやることは,

  • URLを作る
  • 保存先のファイル名・パスを決める

以上となります.

/savant/main.goに書いた主要な処理はこちらです.

package savant

import (
    "fmt"
    "io"
    "net/http"
)

// 必要な定数定義
const (
    BASE_URL       = "https://baseballsavant.mlb.com/statcast_search/csv"
    PARAMETERS     = "all=true&hfPT=&hfAB=&hfGT=R%7C&hfPR=&hfZ=&stadium=&hfBBL=&hfNewZones=&hfPull=&hfC=&hfSit=&hfOuts=&opponent=&pitcher_throws=&batter_stands=&hfSA=&hfInfield=&team=&position=&hfOutfield=&hfRO=&home_road=&hfFlag=&hfBBT=&metric_1=&hfInn=&min_pitches=0&min_results=0&group_by=name&sort_col=pitches&player_event_sort=api_p_release_speed&sort_order=desc&min_pas=0&type=details&"
    QUERY_FORMAT   = "player_type=%s&game_date_gt=%s&game_date_lt=%s&hfSea=%d"
    FILE_FORMAT    = "%s/%s.csv"
    GAME_DT_FORMAT = "2006-01-02"
)

// URLクエリの作成
func Query(form Form) string {
    params := fmt.Sprintf(QUERY_FORMAT,
        form.PlayerType,
        form.GameDate.Format(GAME_DT_FORMAT),
        form.GameDate.Format(GAME_DT_FORMAT),
        form.Season,
    )
    return fmt.Sprintf("%s?%s&%s%s", BASE_URL, PARAMETERS, params, "%7C")
}

// 保存先ファイル名
func Filename(form Form) string {
    filename := fmt.Sprintf(FILE_FORMAT,
        form.GameDate.Format(GAME_DT_FORMAT),
        form.PlayerType,
    )
    return filename
}

// データ取得
func FetchAndAsString(query string) (string, error) {
    // HTTP GETリクエストを発行
    resp, err := http.Get(query)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    // レスポンスボディを読み込む
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    return string(body), nil
}

/savant/form.goにFormなどのI/Fを定義.

package savant

import (
    "encoding/json"
    "time"
)

// 選手タイプ(投手 or 打者)の定義
type PlayerType string

const (
    PITCHER PlayerType = "pitcher"
    BATTER  PlayerType = "batter"
)

// メッセージからSchemaを生成
func CreateForm(value string) (Form, error) {
    var form Form
    if err := json.Unmarshal([]byte(value), &form); err != nil {
        return form, err

    }
    return form, nil
}

// メッセージのSchema
type Form struct {
    Season     int        `validate:"required, min=2015,max=2999" json:"season"`
    PlayerType PlayerType `validate:"required" json:"player_type"`
    GameDate   time.Time  `validate:"required" json:"game_date"`
}

Google Cloud周りの処理

以下を参考に写経して試した後, 整理しました.

cloud.google.com

pkg.go.dev

整理して作った結果がこちら.

/gcp/pubsub.goにPub/Sub処理を固めて,

package gcp

import (
    "cloud.google.com/go/pubsub"
    "context"
    "log"
)

// Pub/Sub Client
func NewPubSubClient(ctx context.Context, projectId string) *pubsub.Client {
    client, err := pubsub.NewClient(ctx, projectId)
    if err != nil {
        log.Fatalf("pubsub.NewClient: %v", err)
    }
    return client
}

// Pub/Sub Subscriptionの取得
func Subscription(client *pubsub.Client, subID string) *pubsub.Subscription {
    sub := client.Subscription(subID)
    return sub
}

/gcp/gcs.goにCloud Storageに保存する処理を実装.

package gcp

import (
    "cloud.google.com/go/storage"
    "context"
    "fmt"
    "log"
)

// Cloud Storage Client
func NewStorageClient(ctx context.Context) *storage.Client {
    client, err := storage.NewClient(ctx)
    if err != nil {
        log.Fatalf("storage.NewClient: %v", err)
    }
    return client
}

// Cloud Storage Bucket取得
func GetBucket(client *storage.Client, name string) *storage.BucketHandle {
    bucket := client.Bucket(name)
    return bucket

}

// Cloud Storageにテキストとして保存
func WriteObject(bkt *storage.BucketHandle, name string, value string, ctx context.Context) {
    obj := bkt.Object(name)
    w := obj.NewWriter(ctx)
    if _, err := fmt.Fprintf(w, value); err != nil {
        log.Fatalf("fmt.Fprintf: %v", err)
    }
    if err := w.Close(); err != nil {
        log.Fatalf("w.Close: %v", err)
    }
}

毎回思うのですが, Google Cloudのクイックスタートは写経で動くので楽です, その後のアレンジに集中できるのは強みかもしれません.

結び

結果として手元で意図通り動いたので,

  • Cloud Runにホスト
  • Trigger部分をCloud Run Jobsで動かしてバッチ処理化

を引き続き進めつつ, BigQueryに保存する処理もどうにかしたいと思っています.

今回は元々Pythonで作っていたものを元にGoで作り直すというアプローチで開発しましたが, Goは本当に使いやすいなと思いました.

しばらくこのまま開発をして, 何かしらの続編を年内に出せたらと思っています.

そしてそろそろ野球の新ネタも...乞うご期待ください, 最後までお読みいただきありがとうございました.

*1:そろそろ分析の新ネタも出したいのですがそれはまた後日...

*2:過去バージョンはCloud FunctionsとCloud Runが混在していますが, Cloud Functionsも裏はCloud Runであること, コンテナ化前提の方が取り回しが良いので全部Cloud Runにしたいと思っている次第です.

*3:一つのCloud Runサービスで複数のコンテナを動かす仕組みのこと. 2023/8/13時点ではpre-GAなので正式提供ではない(いずれGAになると思いますが). これを使うと何が嬉しいかと言うと, K8sを使う方にはお馴染みである「サイドカー」パターンをCloud Runで使えるようになります.

*4:Cloud Runをバックグラウンドのジョブ, いわゆるバッチとして動かす事ができる機能のこと. こちらは既にGA済みのものです&近いうちにこのブログでも紹介するお気持ちはあります(いつ書けるかはわからない).

*5:Cloud Runにアプリがある前提だと, Cloud SchedulerやCloud Tasks等で外から動かすか, 前述のCloud Run Jobsを使うことになります.

*6:お気付きの方も多いと思いますがいわゆるETLってやつです. 余談ですが今回はデータ変換をBigQuery側でやる想定なので厳密にはELTパターンとなります(データ基盤内でデータ変換はほぼしていません).

*7:Pub/SubはいわゆるQueとしての使い方になるので, 「イベント駆動」でいい感じにサービス同士が非同期になるという所が主たる利点となります(Webだとどこかで同期になるので).