Hanatare's PaPa

Make life a little richer.

Virtual Space of Hanatare's PaPa

人生をほんの少しだけ充実させる

【Architecture】dbtとSnowflakeを使ってデータパイプライン構築する(実践編)

前回の記事ではSnowflakeとdbt Cloudの無料枠を使って環境の構築を行いました。今回の記事では、dbtの機能を使い実際の開発を行っていきたいと思います。

www.hanatare-papa.jp

記事のポイント
  • dbtの基本的な開発サイクル(モデル作成 → テスト → ドキュメント生成)を体験する。
  • dbtのコア機能であるref()を使い、SQLの依存関係を管理する。
  • 作成したデータ変換処理をdbt Cloudのジョブ機能で自動化する。

dbt Cloud IDEを操作する

dbt Cloudにログインし、左上の「Studio」メニューをクリックして、WebベースのIDE(統合開発環境)を開きます。

開発準備

まず、開発を行うための準備をします。具体的にはinitializeをして必要な開発資源を準備します。

プロジェクトの初期化

ファイルエクスプローラーに、「dbt_project.yml」や「models」ディレクトリなどが表示されていることを確認します。もし表示されていなければ、「Initialize your project」をクリックしてください。

以下の画面キャプチャはInitialize your projectクリック後の画面になります。

初期コミット

Version controlのリストから「Commit and Sync」を選択し、リストの文字をクリックします。

初期化されたファイル群をコミットするための画面が表示されたら、コミットメッセージには「Initial commit」などと入力し、「Commit Change」ボタンを押しましょう。

Version controlに何も表示されなくなれば、開発準備完了です。

補足

今回、mainブランチをそのまま操作して作業をしますが、本来の開発案件であればNGの方法になります。開発用のブランチを作りそこで開発ソースを作ることが望ましいので、業務適用される場合はご注意ください。

dbtモデルを構築する

dbtにおける「モデル」とは、単なるSELECT文が書かれたSQLファイルのことです。しかし、dbtはこれらのSQLファイルを賢く解釈し、それらの間の依存関係を自動で管理してくれます。ここでは、「Source → Staging → Mart」という階層的なアプローチでモデルを構築していきます。

Sourceの定義

dbtに「私たちの生データはSnowflakeのこのテーブルですよ」と教えてあげる必要があります。これを「Source」の定義と呼びます。

modelsディレクトリの中に、stagingという名前の新しいフォルダを作成し、そのstagingフォルダの中に、schema.ymlという名前で新しいファイルを作成します。

schema.ymlには以下の内容を記述します。

version: 2

sources:
  - name: raw_pos # このソースグループの好きな名前
    database: tasty_bytes_sample_data # Snowflakeのデータベース名
    schema: raw_pos # Snowflakeのスキーマ名
    tables:
      - name: menu # Snowflakeのテーブル名

このYAMLファイルによって、dbtはtasty_bytes_sample_data.raw_pos.menuテーブルをraw_pos.menuという名前で参照できるようになります。

Stagingモデルの作成

Stagingモデルは、生データを直接参照し、カラム名の変更や簡単なデータクレンジングなど、基本的な下準備を行う層です。 models/stagingフォルダの中に、stg_menu.sqlという名前で新しいファイルを作成します。

stg_menu.sqlには以下のSQLを記述します。

select
    menu_item_id,
    menu_item_name,
    truck_brand_name,
    cost_of_goods_usd,
    sale_price_usd
from {{ source('raw_pos', 'menu') }}

注目すべきは{{ source('raw_pos', 'menu') }}の部分です。これはdbtのJinjaテンプレート機能で、「raw_posというソースグループのmenuテーブルを参照してください」という意味になります。これにより、SQL内にデータベース名やスキーマ名をハードコーディングする必要がなくなります。

Martモデルの作成

Martモデルは、Stagingモデルを組み合わせて、ビジネス上の意味を持つ分析用のテーブルを作成する層です。ここでは、メニューアイテムごとの利益を計算するモデルを作ってみます。

modelsディレクトリの中に、martsという新しいフォルダを作成し、その中にmart_menu_profit.sqlという名前で新しいファイルを作成します。

mart_menu_profit.sqlに以下のSQLを記述します。

select
    menu_item_id,
    menu_item_name,
    truck_brand_name,
    (sale_price_usd - cost_of_goods_usd) as profit_usd
from {{ ref('stg_menu') }}

{{ ref('stg_menu') }}という関数が登場しました。これは「stg_menuという名前のdbtモデルを参照してください」という意味です。dbtはこのref関数を解釈し、「mart_menu_profitはstg_menuに依存している」という関係性を自動で把握します。これがdbtの最も強力な機能の一つです。

モデルの実行

作成したモデルを実行してみます。IDE下部のコマンドラインにdbt runと入力し、Enterキーを押します。

dbt run

処理が成功するとdbt上では以下のような表示になると思います。

Snowflakeのデータベース(PC_DBT_DB)の開発用スキーマ(DBT_)に、STG_MENUとMART_MENU_PROFITという2つのビューが作成されているはずです。

モデルの実行が失敗した場合

dbt runコマンドを実行し、stg_menu.sqlファイルで以下のようなエラーが出ている場合は、データベースやテーブルがない可能性もありますが、多くの場合は権限不足の可能性が高いです。

Snowflake adapter: Snowflake error: 002003 (02000): SQL compilation error:
Database 'TASTY_BYTES_SAMPLE_DATA' does not exist or not authorized.
01:57:35 Database Error in model stg_menu (models/staging/stg_menu.sql)

Snowflake側で以下のSQLを実行し、dbtが使用しているSnowflakeのロールに、TASTY_BYTES_SAMPLE_DATA データベースへのアクセス権限を付与してください。その際、your_dbt_roleはご自身のdbtのロールを設定してください

-- dbtが使用しているロールに対してUSAGE権限を付与
GRANT USAGE ON DATABASE TASTY_BYTES_SAMPLE_DATA TO ROLE <your_dbt_role>;

-- 参照するスキーマとテーブルにも権限が必要な場合があります
GRANT USAGE ON SCHEMA TASTY_BYTES_SAMPLE_DATA.raw_pos TO ROLE <your_dbt_role>;
GRANT SELECT ON TABLE TASTY_BYTES_SAMPLE_DATA.raw_pos.menu TO ROLE <your_dbt_role>;

your_dbt_roleはdbtの以下の手順で確認が可能です。

左側メニューの「Dashboard」>「Setting」をクリックする。

Development ConnectionでSnowflakeをクリックする。

Snowflakeの接続設定の画面からRoleを確認する。

データ品質を保証する (テストとドキュメント)

データパイプラインは、データの品質が命です。dbtには、データの品質を簡単にテストできる仕組みが組み込まれています。

Generic Testの実装

menu_item_idは各メニューアイテムに固有のIDの想定です。これがユニーク(一意)で、かつNULLでないことをテストしてみます。

先ほど作成したmodels/staging/schema.ymlに、以下のようにテストの定義を追記します。

version: 2

sources:
  - name: raw_pos # このソースグループの好きな名前
    database: TASTY_BYTES_SAMPLE_DATA # Snowflakeのデータベース名
    schema: raw_pos # Snowflakeのスキーマ名
    tables:
      - name: menu # Snowflakeのテーブル名
        columns:
          - name: menu_item_id
            tests:
              - unique
              - not_null

下2行に記載されたuniqueとnot_nullはdbtに組み込まれている汎用テスト(Generic Test)で、このように記述するだけでdbtがテスト用のSQLを自動生成してくれます 。  

テストの実行

コマンドラインでdbt testを実行します。

dbt test

すべてのテストがパスすれば、基本的な品質要件を満たしていることが保証されます。

exampleディレクトリ内のモデルがエラーを起こす

デフォルトの状態で dbt testを実行すると以下のように、作成した記憶のないファイルがエラーを起こします。

Failure in test not_null_my_first_dbt_model_id (models/example/schema.yml)

これは、models/example/my_first_dbt_modelの汎用テスト内にnot_nullが含まれており、作成されるテーブルにnullのデータが含まれるからです。exampleディレクトリ以下を削除すれば解消しますので、もしエラーが出た方は削除してください。

version: 2

models:
    - name: my_first_dbt_model
      description: "A starter dbt model"
      columns:
          - name: id
            description: "The primary key for this table"
            tests:
                - unique
                - not_null

    - name: my_second_dbt_model
      description: "A starter dbt model"
      columns:
          - name: id
            description: "The primary key for this table"
            tests:
                - unique
                - not_null

models/example/my_first_dbt_model.sql

/*
    Welcome to your first dbt model!
    Did you know that you can also configure models directly within SQL files?
    This will override configurations stated in dbt_project.yml

    Try changing "table" to "view" below
*/

{{ config(materialized='table') }}

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data

/*
    Uncomment the line below to remove records with null `id` values
*/

-- where id is not null

作成したソースをコミットする

後続でジョブ設定を行い、起動を自動化するため修正したソースをmainリポジトリに反映しておきます。

Version controlのリストから「Commit and Sync」を選択し、リストの文字をクリックし、新しいブランチを作成します。 Version controlのリストから「Merge this Branch」を選択し、リストの文字をクリックし、mainにマージします。

パイプラインを自動化する(dbt Cloud Jobs)

最後にこれまで手動で実行してきた変換とテストのプロセスを、dbt Cloudのジョブ機能を使って自動化します。dbtにはdbt Core(オープンソース版)と呼ばれる、サーバにホスティングするものもあるのですが、この「自動実行(オーケストレーション)」のためにAirflowのような別のツールをセットアップする必要があります。その点、dbt Cloudには、強力なジョブスケジューラ機能が含まれています。大規模なシステムになってくるとJP1などでパイプライン設定をしていくと思うので、あまりCloudのメリット享受にはならない気がしますが、ある程度規模が小さく、簡単にパイプラインを作ってしまいたい場合においては、Cloud版を選ぶメリットになります。

本番環境のセットアップ

左メニューのOrchestration > Enviroments > Create Enviroments をクリックします。

環境の情報を入力してSaveします。 dbt Versionは「latest version」を選択し、Connectionは「Snowflake」を選んでおきます。 Snowflakeの接続情報はユーザー (PC_DBT_USER)、ロール (PC_DBT_ROLE)、ウェアハウス (PC_DBT_WH) など初期構築時の情報を参考にしてください。Schemaに関しては、開発時と分けておく方が好ましいので、prodなどに変更しておくことを推奨します。

ジョブの作成とスケジューリング

左メニューのOrchestration > Jobs > Create Jobs をクリックします。

以下の画面のように設定を行うことで、作成したデータパイプラインは設定したタイミングで起動し、そのタイミングで最新で品質の保たれたデータマートを提供するという形になります。

Job name

ジョブの名前になるため、任意の名前を設定します。

Environment

実行環境を選択します。画面上は作成した環境のProductionです。

Commands

入力欄に「dbt build」と入力します。dbt buildは、dbt run、dbt test、dbt snapshot、dbt seedを賢く一度に実行してくれる便利なコマンドです。

Triggers

Run on scheduleをONにして実行したいスケジュールを選びます。 画面の設定は毎日日本時間の10時に起動するという設定にしています。

まとめ

Snowflakeとdbt Cloudを使い、データパイプラインの構築を行ってきました。データのロード、変換、テスト、そして自動化までの一連のパイプラインを、完全に無料で構築することで、データパイプライン仕組みを少しでも理解できていればと思います。今回の記事では、Snowflakeとdbt Cloudの基本的な部分だけを使った操作なので、今後はもう少し深堀をしていければと思います。具体的には以下のようなことが考えられます。

  • dbt-utilsを探求する: dbt Hubで公開されているdbt-utilsのようなコミュニティパッケージを導入し、便利なマクロを使ってみましょう。
  • より複雑なモデルに挑戦する: TASTY_BYTESの他のテーブルもSourceとして追加し、複数のテーブルをJOINする、より実践的なデータマートを構築してみましょう。
  • 特異テストを書く: uniqueやnot_nullだけでなく、特定のビジネスロジックを検証する独自のSQLテスト(Singular Test)を作成してみましょう。

今回の記事がデータ変換パイプラインの構築をしたい方の参考になっていれば幸いです。