Airbyte + dbt クイックハンズオン on GCP

記事の概要

Airbytedbt を組み合わせるとどんな挙動をするのか、超高速で感触を掴んでみましょう。

両プロダクトの好きなところ(主観)

Airbyte

  • コネクタ爆増中
  • パースとか基本考えなくていいし、コードも一切書かなくても良きに計らってくれる
  • タダ

dbt

  • 普通のSQL書くだけでDWH上を綺麗にできる
  • ワークフローエンジン使わないでも何とかなる(うちは規模小さいので何ら問題ない)
  • タダ

最低限必要なリソース

  • Google アカウント
  • Docker Engine(Desktop) / Docker Compose がインストールされているマシン

GCPプロジェクト作成

  1. GCP管理コンソール にログイン

  2. 新規プロジェクト作成(プロジェクト名はお好きなもので結構です)

サービスアカウント作成

BigQuery 管理者を追加します。 こちらの記事を参考に、鍵(json)のダウンロードまで済ませてください。 zenn.dev

Airbyteのインストール

公式ドキュメントを参考にしてインストールをしてください。もし Docker が手元になくどうしても他に方法がないということであれば、イチかバチか Google Cloud Shell 上でやってみてください(体感 8 割くらいの確率でどこかでエラーが起きますが、画面更新で切り抜けられることがあります)

3 コマンドでサクっと起動します。

ポート番号8000で開いてみましょう。

Airbyte の転送設定

さて今回は誰でも使える PokeAPI (その名の通りポケモンのデータが盛り沢山)からミュウツーのデータを BigQuery に転送してみます。

  1. 起動直後に出てくるページですが、メールアドレスは必須、その他は任意です。

  2. チュートリアルは無視して、画面左側のメニューより "Connections" -> 右上の "New Connections" を押してください。

  3. まずデータソースの設定ですが、下記の通り入力・選択してください(ここでミュウツーを指定します) f:id:kwknkk:20211206140417p:plain

    • Name: any
    • Source type: PokeAPI
    • pokemon_name: mewtwo
  4. 次に転送先ですが、下記の通りにしてください。尚、予め BigQuery 側でデータセットやらテーブルを作成しておく必要はありません。こちらを入力すれば Airbyte が自動作成してくれます。優しいですね。 f:id:kwknkk:20211206140905p:plain

    • Name: any
    • Destination type: BigQuery
    • Default Dataset ID : hogemon_dataset(違う名前にするとサンプルのクエリがコケてしまうのでご注意ください)
    • Project ID: your project id
    • Credential JSON: your credential
    • Dataset Location: any
  5. 最後の仕上げです。 f:id:kwknkk:20211206143505p:plain

    • Sync frequency: manual
    • Sync mode: any

ここまで来たら Save しましょう。

実行(標準搭載のdbtモデルのみ)

はい、それでは転送の準備ができましたので、とりあえず dbt なしでやるとどうなるか見てみましょう。画面の "Sync now" を押せば BigQuery にミュウツーの各種パラメータ情報がロードされ、その後に Airbyte 標準仕様のdbtによる正規化が行われます。

f:id:kwknkk:20211206144254p:plain

成功しましたか?では BigQuery コンソールに飛んで結果をご覧ください。

f:id:kwknkk:20211206151632p:plain

f:id:kwknkk:20211206151743p:plain

_airbyte_raw_pokemon というテーブルがローデータで、それ以外のテーブルは Airbyte 標準搭載の dbt モデルが正規化してくれたものになります。

Airbyte 単品利用の場合はここまででお話は終わりです。では自前のdbt モデルを追加するとどうなるか見てみましょう。

実行(カスタム dbt モデルを追加)

先ほどの Airbyte の "Connections" の設定画面に戻ります。一番下の "Custom transformation" を開いて、画像の通り入力してください。

f:id:kwknkk:20211206143734p:plain

ここで自前の dbt プロジェクトの Git リポジトリを指定します。今回は私が用意したサンプルがありますので、そのままお使いください。 f:id:kwknkk:20211206163339p:plain

尚、上記リポジトリは本記事のために public にしてありますが、当然本番環境では private を使うことになるはずです。その際の URL 指定方法は下記となりますのでお気をつけください。

https://{username}:{token}@github.com/{user}/{repo}

詳細は公式ドキュメントをご覧ください。 docs.airbyte.io

では、ここまで設定できましたら改めて "Sync now" を押してみてください。転送〜Airbyte標準変換〜カスタム変換がシームレスに実行され、repo_moves_redblueというテーブルを開くとポケモン red/blue におけるミュウツーの技に関する詳細情報が出ているはずです。

おまけ

Airbyte をワークフローエンジンを使わず、直接 API を叩くこともなく、GCE インスタンス上で定期実行する方法を最後に簡単に紹介して締めくくりたいと思います。

Airbyte の定期実行ジョブには癖があり、それを利用することできめ細かい時間設定&manual実行では不可能なエラー時のSlack通知を実現することができます。例えば Airbyte で定期実行頻度を 1 時間と設定し、1 回だけジョブを実行した後に GCE インスタンスを停止するとします。その後、仮に 4 時間後にインスタンスを再起動した時の挙動は、ただ 1 回だけジョブが実行されるだけです。つまりインスタンスが停止している間に溜まった 4 回分のキューが、再起動後に連続して実行されるわけではないという事です。

この特性を利用し、私は例えば Airbyte 定期実行頻度を30分おき、GCE インスタンスの起動は 1 時間おきとして、インスタンスの起動と同時にジョブが 1 回だけ走るという設定をしていたりします。停止時間については処理時間を計測して余裕を持って停止するようにしています(起動停止の制御は GCE のインスタンススケジュールを利用)。

Airbyte は Airflow と相性が良いので、もしワークフローエンジンを入れるのであれば Airflow が良いかなと思います。

〜完〜