仕事もプライベートもサーバレスなアーキテクチャでなるべく便利にCloudを使いたいと思ってる人です*1.
最近は趣味の開発(個人開発)の方で,
- MLBのトラッキングデータ「Baseball Savant」からStatcastのCSVデータを取得
- 取得したCSVデータを集計・クレンジングしてBigQueryに集約
- BigQuery上のデータを元にメジャーリーガーのパフォーマンス可視化・分析をしたり, 成績予測に活かしたりetc...これはまだ実現していない.
という計画を立てており, 今の所「取得したCSVデータを集計・クレンジングしてBigQueryに集約」というところまでいい感じに実現できました.
GCF(Google Cloud Functions)のクローラーで集めたデータをDataprocのSpark(タスクの実装はPySpark)で集計してBigQueryに保存という流れなのですが,
Dataprocのためにクラスタ(もしくはVM)建てるの微妙に面倒くさい(かつお金がかかる*2)のでサーバレスかつフルマネージドな奴で済ませたい!と思い, 手直しした結果いい感じにうまくいきました.
最近登場してずっと気になっていた, Dataproc Serverlessを使ってPySparkを動かしてみたのでその時のメモを残したいと思います.
TL;DR
気軽にSpark/Hadoopでバッチ的なデータ処理タスクができちゃうDataproc Serverless, コマンド(とちょっとしたCloudの基礎知識)があればいい感じに使えちゃいます.
なお, Jupyter Lab等のアドホックなnotebook実行はできないので注意(あくまでバッチだけです)
おしながき
想定読者
以前のエントリー同様, DataprocおよびSpark(PySpark)が何であるかをご存知の方向けの記事となります.
知らない方はApache Spark, Dataprocが何者であるかをご理解の上もう一度お読みください(基本的な要素は記事内で解説しません).
アーキ全体像・手順
実現するまでの全体像とざっくり手順および, できること(できないこと)を解説します.
今回のアーキテクチャ
今回やりたいことは(以前のエントリーと同じく),
- Datalake(Cloud StorageのBucket)にあるBaseball savantのCSVデータをSparkのデータセットとして保存
- 保存したデータセットを集計・重複排除し(これはSpark SQLを使って処理), BigQueryのテーブルと同じ単位にまとめる
- 既存のBigQueryテーブルに追加書き込みで保存
このようなタスクを以下のPySparkなスクリプトを用いて行います.
gist8033a6558098a50821b9a62a7264ca90
Google Cloudで使うサービスは
- Dataproc Serverless. このエントリーの主役.
- Google Cloud Storage(GCS). 元データのCSVを保存するBuket(Datalake)と, Dataproc Serverlessで実行するPySparkスクリプトを保存するBuket(Resources)が存在.
- BigQuery. 最終的なデータの保存先.
以上3つとなります.
動かすまでの手順
実際動かすまでの手順ですが,
- DataprocのAPIを有効化(初回のみ)
- VPCとSubnetを用意する(初回の環境構築).
- PySparkのスクリプトをGCSにアップロードする
- gcloudコマンドを叩いて実行
一番基本的な使い方はこの3ステップです.
公式ドキュメントの解説も合わせてご覧いただけると良いかと思います.
なお, Service Account(所謂権限周り)については, デフォルトのGCE Service Account(比較的強めの権限)を使ってやってます.
環境構築(初回のみ)
サーバレスなのでインスタンスの準備・管理は不要ですが,
限定公開Googleアクセス有効化かつ, 上向きfirewall(ingress)が全port開いてるsubnetが必要です.
gcloud compute firewall-rules create allow-internal-ingress \ --network="network-name" \ --source-ranges="subnetwork internal-IP ranges" \ --direction="ingress" \ --action="allow" \ --rules="all"
私はDefaultの穴を開けるのはどうかと思ったので, カスタムでVPCを作り, そのsubnetのfirewallを変更して対応しました.
ちなみにGUIでやってもOKです.
PySparkのスクリプトをGCSにアップロードする
Dataproc Serverlessの実行ファイル(このエントリーだと, 上記のソースコード)ですが, 以下のいずれかの場所に格納する必要があります.
- Dataprocからアクセス可能なGCS Buketにアップロードする(このエントリーで取った手段)
- クラスタ上のfile. (今回は使いませんでしたが)カスタムイメージからの実行もできるらしいのでおそらくそのパスに配置して実施
今回は「PySparkでspark sqlを処理するスクリプトを書いて実施」程度の軽めのタスクだったのでGCSに保存しました.
GCSへの保存はgsutilを使うなり, GUI上でアップロードするなりお好きな方法で大丈夫です.
gcloudコマンドで実行する
ここまで出来たらあとはコマンド一発です.
- PySparkスクリプトをserverlessモード実行
- regionはasia-northeast1
- BigQueryを使うのでBigQueryのJARを使いますね
という内容でスクリプトにした結果がこちらです.
gcloud beta dataproc batches submit \ --project baseball-sample-dataproc \ --region asia-northeast1 pyspark \ --batch batch-baseball-dataproc gs://hogefuga/script/csv2bq_daily.py \ --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar \ --subnet network-name
コマンドがbetaなんだ...というのにちょっと驚きましたが(これ確かGAなはずなんだけど?), なんの問題もなく無事タスクが動きました.
Servelessでできること(できないこと)
という感じで,
- PySparkのスクリプトを実行
- PySpark以外でも通常のSpark(Java)やRを元にしたコード実行
といった事をserverlessで行うことができます.
Sparkを使うユースケースとして, 「永続化するほどのデータじゃないけど, 一時的にデータ集計するためのリソースが欲しい(終わったらすぐ消す)」というのは割とありがちだと思うのですが, これをserverlessにちょこっとできるのは便利だと思いました, 手順の手数も少ないですし.
ただ,
アドホックにJupyter Lab(Jupyter notebook)に処理を書いて試行錯誤して実行, というユースケースには対応していません, あくまでバッチ的な処理が対象となります.
なので,
- Jupyter Labなどでアドホック実行したい時はDataprocクラスタを立てて行うか, 自前でSparkクラスタを用意
- アドホックにやった内容を定期実行する手段としてDataproc serverlessを使う
といった使い方が基本的な落とし所になりそうです.
結び
というわけで, 「Sparkをサーバー管理せずにサーバレスにやっちゃうぜ!」という方法を紹介しました.
結構使えるサーバレスなデータ集計基盤だと思うので今後もちょいちょい使おうと思います.
なお, 今日使いはじめたばかりなので料金的なところはまだ未検証です...が, 使った分課金なのでさほど影響はないはず...
また, Cloud SchedulerやCloud Taskあたりのスケジューラーをトリガーとして動かすこともまだしていないので引き続き試したいと思います.*3
引き続き使って感想を書きたいと思います.
*1:気がついてる方もいらっしゃるかと思いますが, クラウドエンジニアとしての私の芸風は「自分でサーバー管理したくない., サブスク的に使いたいからフルマネージド&サーバレスなもので終わらせる」だったりします.
*2:お金がかかる, の部分はterraformでIaC(Infra as a Code)化して運用を楽にし, ある程度緩和されましたがサーバレスではないので根本解決にはなっていません.
*3:一番キレイなのはCloud Composerをトリガーとすることかなと思いつつも, 個人開発のバッチのためにComposerのGKEを運用したくないのでこれは無しかな...