DigdagとEmbulkを利用してBigQueryにRDS(Aurora、MySQL)のマスタデータを同期する

こんにちは、バックエンドエンジニアの塩崎です。

先日、会社の広報のためのインターン生紹介記事にメンターとして掲載していただきました。 大学四年生のインターン生と一緒に写真撮影を行ったのですが、見た目だけではどちらが年上かわからなかったので、「メンターの塩崎(右)」という表記をされてしまいました(笑)

インターンでも実際のサービスに触れ、課題を解決!〜VASILY DEVELOPERS BLOGが公開されました〜

さて、VASILYではData WarehouseとしてGoogle BigQuery(BigQuery)を利用しています。

BigQuery内にはプロダクトのマスタデータとユーザーの行動ログが格納されています。 そして、それらに対する横断的なクエリを発行することでプロダクトの成長のためのKPIをモニタリングしています。

そのためAmazon Relational Database Service(RDS)に保存されているマスタデータをBigQueryに同期する処理を定期的に実行する必要があります。 先日、ワークフローエンジンであるDigdagとバルクデータローダーであるEmbulkを利用して、この処理を行うシステムを構築しました。 本記事ではどのようにしてDigdagとEmbulkを連携させたのかなどの具体的な方法を、設定ファイルを交えながら説明します。 なお、本システムが対象としているRDSのDBエンジンはAuroraですが、MySQLでも問題なく動作します。

EmbulkとDigdagについて

まずは本システムを構築するために使用したEmbulkとDigdagについて説明します。 どちらともTreasure DataさんによるOSSであり、ロゴの動物が可愛いです。

Embulk

EmbulkはいわゆるETLツールと呼ばれるソフトウェアです。 大量のデータを取り出し(Extract)、それを加工し(Transfer)、書き出す(Load)ことを主目的としたツールです。 EmbulkではMySQLやBigQueryなどの具体的なデータソースとの入出力に関する部分のソフトウェアはプラグインとして提供されております。 そのため、それらを組み合わせることによって様々な入出力に対応できることが特徴です。

Digdag

もう一方のDigdagはワークフローエンジンと呼ばれるソフトウェアです。 複数個のタスク間の依存関係からなるワークフローを定義し、そのワークフローの実行及び管理を行うソフトウェアです。 これだけですと、いまいち導入のメリットが分からないかもしれないので、具体例をお見せします。

例えば、処理1を実行した後に、処理2、処理3、処理4を順番に実行するというケースを考えます。 そして、これらの一連の処理(ワークフロー)を1日1回00:00に実行するとします。 この時、cronでこの問題を解決しようとすると、典型的には以下のようになります。

0 0 * * * 処理1
0 10 * * * 処理2 # 処理1は10分以内には終わるはず
0 20 * * * 処理3 # 処理2は10分以内には終わるはず
0 50 * * * 処理4 # 処理3は30分以内には終わるはず

ところが、このcronの設定には様々な問題があります。

  • 何かしらの原因によって処理1の実行時間が10分以上かかってしまった時に、処理2以降は実行を待つべき
  • 処理1が失敗した時には処理2、3、4の実行を取りやめるべき

この程度の複雑さであれば、以下のように書けば解決できます。

0 0 * * * 処理1 && 処理2 && 処理3 && 処理4

ですが、例えば以下のような複雑な要望があった場合にはcronではお手上げです。

  • 各処理に対して3回までのリトライを行いたい
  • 処理2・処理3・処理4は互いに独立な処理なので並列実行したい
  • 同時に並列実行する数は2に抑えたい

Digdagであればこのような場合でも以下に示すようなシンプルな設定ファイルでこれらの要望を叶えることができます。

+task1:
  sh>: 処理1

+parallel_tasks:
  _parallel: true
  +task2:
    _retry: 3
    sh>: 処理2
  +task3:
    _retry: 3
    sh>: 処理3
  +task4:
    _retry: 3
    sh>: 処理4

なお、ワークフローエンジンというジャンルのソフトウェアにはJenkins、Airflow、Luigiなどがありますが、Digdagはそれらと比べて以下のメリットを持ちます。

  • YAMLベースのシンプルな設定ファイルなので学習コストが低い
  • High Availability(HA)構成が容易
  • 分散環境での動作が容易

DigdagとEmbulkを組み合わせる

DigdagとEmbulkを組み合わせることによって、Embulkの良さをさらに引き出すことができます。 Embulkはあくまでも1つのETL処理を効率良く行なうためのツールなので、複数個のETL処理の管理はできません。

例えば、Auroraに保存されている複数個のテーブルをすべてBigQueryに転送する場合を考えます。 この時にはテーブル数と同じ回数だけEmbulkを起動する必要があります。 転送途中でエラーが発生してしまっても、リトライ処理や、失敗したことの通知はEmbulkではできません。 さらに、テーブルの同期の前後に任意の処理を挟み込むこともEmbulkではサポートされていません。

Digdagを利用することによって、Embulk複数回起動を効率的に管理したり、ETLの前後に処理を挟み込むことが容易になります。

システムの概要

以下に今回構成したシステムの概要図を示します。

f:id:vasilyjp:20170712205647p:plain

まず、上図で1〜4の番号が書かれた部分を説明します。 設定された時刻(現状では1日1回)になると、Digdagは以下の処理を行います。

1. 同期対象のテーブル情報の取得

同期対象のテーブル情報をAuroraから取得します。 デフォルトではAuroraに存在するすべてのテーブルの同期を行います。

2. Embulkの起動

DigdagがEmbulkを起動します。 複数テーブルの同期処理は互いに独立であるため、処理の高速化のために複数プロセスのEmbulkを起動し、並列処理を行います。

3. AuroraからBigQueryへの同期

EmbulkがAuroraからBigQueryへの同期処理を行います。 AuroraやBigQueryへの接続に必要な情報はDigdagから渡されます。 Digdagから渡された情報はEmbulkに組み込まれているテンプテートエンジンのLiquidによって展開されます。

4. 実行ログの保存

1〜3の各処理の実行ログはS3にアップロードされます。 実行ログにはタスクの開始時刻・終了時刻・標準出力へ出力した内容などが含まれます。

また、タスクが失敗した場合はSlackにそのことが通知されます(上図右上)。

f:id:vasilyjp:20170712205716p:plain

さらに、タスクの実行ログの確認や失敗したタスクのリトライ処理をDigdagに組み込まれているweb UI(Digdag UI)から行うことができます(上図左上)。

f:id:vasilyjp:20170712205744p:plain

Digdag導入の理由

TECH BLOGの以前の記事ではgoラッパーとEmbulkを利用した同期システムを紹介しましたが、今回のシステムを構築するにあたってはgoラッパーは使用しませんでした。

Embulkを利用したデータ転送基盤の構築

このままgoラッパーを拡張しながら利用し続けることと比べて、Digdagを採用することで以下の問題を簡単に解決することができたからです。

タスクのリトライ処理

タスクの1回あたりの成功確率が十分に高かったとしても、そのタスクを連続で行うと、すべてのタスクの成功する確率は低くなります。 例えば、成功確率が99.9%のタスクを1,000回連続で行う場合、それらすべてが成功する確率は36.8%しかありません。 10,000回連続で行う場合では、わずか0.0045%になってしまいます。 しかし、失敗した時にリトライ処理を1回行うだけで、これらの確率はそれぞれ99.9%、99.0%になります。 実際に、今回のようなネットワークアクセスを行うタスク(Aurora、BigQueryなど)はネットワークの不調によってタイムアウトすることがあります。 その影響でタスクの実行に失敗することもあるため、リトライ処理は必須です。

以下のグラフに連続でタスクを実行した時のすべてが成功する確率(縦軸)とタスクの連続回数(横軸)の関係を示します(注:縦横軸は対数軸)。 3回までのリトライを許容すると、109(10億)回連続で実行しても99.9%の確率で全てのタスクが成功します。

f:id:vasilyjp:20170712205803p:plain

このように、リトライ処理は非常に効果的な一方で、実装やテストが面倒なためしばしば忘れがちな処理でもあります。 Digdagではタスク定義の中に1行の設定を書くだけでリトライ処理を行うことができるので、リトライ処理を簡単に導入することができます。

_retry: 3

+task1:
  sh>: sometimes_fail.sh
+task2:
  sh>: sometimes_fail.sh
+task3:
  sh>: sometimes_fail.sh

また、ワークフローの後半部分のタスクが失敗した場合、ワークフローの最初からやり直すのは無駄です。 特にワークフロー前半部分のタスクの処理時間が長い場合には無駄が顕著です。 そのため、失敗したタスクのみのリトライ処理をしたいです。

f:id:vasilyjp:20170712205844p:plain

Digdagではタスク毎にリトライを設定することでこの問題を解決することができます。

+task1:
  _retry: 3
  sh>: sometimes_fail.sh
+task2:
  _retry: 3
  sh>: sometimes_fail.sh
+task3:
  _retry: 3
  sh>: sometimes_fail.sh

並列実行

互いに独立なタスクを並列実行することによって、ワークフロー全体の実行時間を短くすることができます。 また、それぞれのタスクが使うマシンリソースの種類(CPU、メモリ、ネットワークなど)が違う場合にはマシンリソースの有効活用をすることができます。

しかし、ミスなく並列処理を行うプログラムを書くのは難しく、再現性のないバグと戦うことも少なくありません。 Digdagを使用することによって、この並列処理の面倒な部分をDigdagが肩代わりしてくれるため、Digdag利用者は個々のタスクに集中することができます。 さらに、並列実行数の調整もDigdag起動時のオプションで簡単に行うことができます。

HA構成

ワークフロー管理をcronで行う場合の問題点の1つとしてあげられるのは、そのサーバーが落ちた場合にどうするかということです。 cronサーバーはステートレスなサーバーではないため、簡単にスケールアウトできません。

Digdagはワークフローそのものやワークフローの状態をPostgreSQLに保存することによってこの問題を解決しています。 タスクの実行を行うworkerやDigdag UIは状態をステートレスなため簡単にスケールアウトすることができ、HA構成にすることができます。

もちろんDigdagをHA構成にするときには、RDSをMulti-AZ構成にするなどの方法でPostgreSQL自体もHA構成にする必要があります。

Digdag UIでタスクの実行ログを確認

Digdagサーバーにはweb UI(Digdag UI)が付属しており、これを利用して以下の操作などを行うことができます。

  • ワークフロー実行ログの確認
  • 失敗したワークフローのリトライ
  • ワークフローを今すぐに実行

これらの操作をDigdag UIから行うことができるため、Digdagのコマンドに詳しくないメンバーでも運用作業を行うことができます。

Digdag UIの起動方法はドキュメントのどこにも書いてありませんが、実はDigdagサーバーを起動した時に立ち上がっています。 デフォルトでは、65432ポートで立ち上がっているので、http://localhost:65432/ にアクセスすることでDigdag UIを見ることができます。 なお、GitHubのREADMEのRequirementsにはNode.js 7.xと書いてありますが、Digdag UIの開発時以外にはNode.jsは不要です。

設定ファイル

それではここからは具体的な設定ファイルを交えながら説明をします。

Digdag

aurora_to_bigquery.dig

timezone: Asia/Tokyo

# Slack通知のためのプラグインの読み込み
_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.1
  webhook_url: https://hooks.slack.com/services/HOGE
  workflow_name: aurora_to_bigquery

# 認証情報の取得
+get_db_info:
  _retry: 3
  _export:
    docker:
      image: ruby:2.4.1
      build:
        - gem install aws-sdk
  rb>: Prepare.get_db_info
  require: 'tasks/prepare'

# 同期対象テーブルの取得
+get_tables_to_sync:
  _retry: 3
  _export:
    docker:
      image: ruby:2.4.1
      build:
        - gem install mysql2
  rb>: Prepare.get_tables_to_sync
  require: 'tasks/prepare'

# 同期処理
+sync:
  for_each>:
    table: ${TABLES}
  _parallel: true
  _do:
    call>: sync_one_table.dig

このワークフローは大きく3つの部分に分かれています。 それぞれはget_db_info、get_tables_to_sync、syncという3つのタスクです。

最初のget_db_infoではAuroraに接続するためのパスワードやGoogle Cloud Platform(GCP)に接続するためのJSONキーを取得します。 これらはAmazon Key Management System(KMS)に格納されているため、Ruby版aws-sdkで取得され、Digdagの変数にセットします。

2つ目のget_tables_to_syncでは同期対象のテーブルの一覧を取得します。 その内部では、SHOW TABLESを実行し、テーブルの一覧を取得しています。

そして最後のsyncが同期処理の本体で、内部ではEmbulkを呼び出しています。 上流の2つのタスクによって設定された変数を使い、複数個のテーブルの同期処理を行います。 テーブル毎にパラメーターを変更しながらEmbulkを実行するためにcallオペレーターを利用しています。 また、_parallel: trueが設定されているため、並列処理が行われます。 同時に並列実行される数はDigdagサーバーを起動する時の--max-task-threadsオプションによって制御されています。 そのため、すべてのテーブルの同期処理が同時に実行されることはなく、適切にスレッドプールが使われます。

これらのタスクの実行する環境を隔離するために、Dockerを利用しています。 これによって、Digdagが動いているサーバーそのものにRubyやgemなどを入れる必要がなくなり、サーバー構築の手間を減らすことができます。 自分たちでDockerfileやDockerイメージの管理をするのは煩雑ため、DockerイメージはDockerHubに保存されている公式イメージを利用しています。 そして、それらのイメージの上に必要なライブラリなどをインストールしています。 そのため、初回実行時はgemのインストールのために少々待たされます。 ですが、2回目以降には構築済みのDockerイメージが利用されるため、コンテナが高速に起動します。

sync_one_table.dig

+get_timestamp_columns:
  _retry: 3
  _export:
   docker:
      image: ruby:2.4.1
      build:
        - gem install mysql2
  rb>: ColumnOption.get_timestamp_columns
  require: 'tasks/column_option'

+get_columns_to_drop:
  _export:
    docker:
      image: ruby:2.4.1
  rb>: ColumnOption.get_columns_to_drop
  require: 'tasks/column_option'

+run_embulk:
  _retry: 1
  _export:
    EMBULK_INPUT_TABLE: ${table}
    EMBULK_OUTPUT_TABLE: ${table}
    docker:
      image: java:7-jre
      build:
        - curl --create-dirs -o /bin/embulk -L https://dl.bintray.com/embulk/maven/embulk-0.8.25.jar
        - chmod +x /bin/embulk
  sh>: embulk gem install embulk-input-mysql embulk-filter-column embulk-output-bigquery && embulk run aurora_to_bigquery.yml.liquid
  _error:
    slack>: failed-to-sync-table-template.yml

このサブワークフローでテーブル毎の設定を取得し、それに基づきEmbulkを起動します。 このワークフローは前述したaurora_to_bigqueryからcallオペレーターによって呼び出されます。

このワークフローはget_timestamp_columns、get_columns_to_drop、run_embulkからなります。 前半の2つのタスクでテーブル毎の固有の情報を環境変数にセットし、最後のタスクrun_embulkでEmbulkを起動します。

get_timestamp_columnsではDATETIME型のカラム名を環境変数に設定しています。 デフォルトのまま同期するとAuroraでDATETIME型のカラムはBigQueryではTIMESTAMP型になります。 Auroraでの日付型がBigQueryでも日付型になるので一見すると問題ないようにも思えます。 しかし、BigQueryのweb UIでクエリの実行結果を確認する時に表示がUTCになってしない、脳内で9時間ずらすのがやや煩雑です。 そのため、%Y-%m-%d %H:%M:%S%:zというフォーマットのSTRING型で保存することによって見やすさと日付計算のしやすさのバランスを取っています。 このフォーマットであれば通常はJSTで表示されるため人間に確認がしやすく、また、TIMESTAMP型への変換も容易であるため日付計算を行いやすいです。 ちなみに、内部的には以下のSQLを発行し、テーブルのスキーマ情報を読みだしています。

SELECT COLUMN_NAME, COLUMN_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = (SELECT database()) AND TABLE_NAME = '#{table_name}'

get_columns_to_dropでは同期対象外のカラムを環境変数に設定します。 本システムで扱うデータにはセンシティブなデータも含まれるため、それらのカラムを同期対象から外します。 どのカラムを除外するのかはYAMLによって制御されています。 この処理のためにはembulk-filter-columnプラグインを利用しました。

最後のrun_embulkタスクによってEmbulkが呼び出されます。 同期処理のために必要なEmbulk gemのインストールを行った後に、Embulkを起動してテーブルの同期処理を行います。 この処理の最中にエラーが発生した場合はSlackに通知が行われます。

Embulkに引数として渡している aurora_to_bigquery.yml.liquid ファイルについての詳細は次節で説明します。

Embulk

aurora_to_bigquery.yml.liquid

in:
  type: mysql
  host: {{ env.EMBULK_INPUT_DB_HOST }}
  user: {{ env.EMBULK_INPUT_DB_USER }}
  password: {{ env.EMBULK_INPUT_DB_PASS }}
  database: {{ env.EMBULK_INPUT_DB_DATABASE }}
  table: {{ env.EMBULK_INPUT_TABLE }}

{% if env.EMBULK_FILTER_DROP_COLUMN != '' %}
filters:
  - type: column
    drop_columns:
    {% assign drop_columns = env.EMBULK_FILTER_DROP_COLUMN | split: ',' %}
    {% for column in drop_columns %}
      - { name: {{column}} }
    {% endfor %}
{% endif %}

out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile:
    content: |
        {{ env.EMBULK_OUTPUT_JSON_KEY_CONTENT }}
  project: {{ env.EMBULK_OUTPUT_PROJECT }}
  dataset: {{ env.EMBULK_OUTPUT_DATASET }}
  table: {{ env.EMBULK_OUTPUT_TABLE }}
  open_timeout_sec: 300
  send_timeout_sec: 300
  read_timeout_sec: 300
  retries: 5
  gcs_bucket: {{ env.EMBULK_OUTPUT_GCS_BUCKET }}
  auto_create_gcs_bucket: false
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
  default_timezone: "Asia/Tokyo"
  column_options:
  {% assign columns = env.EMBULK_OUTPUT_TIMESTAMP_COLUMNS | split: ',' %}
  {% for column in columns %}
    - { name: {{ column }}, type: STRING, timestamp_format: "%Y-%m-%d %H:%M:%S%:z" }
  {% endfor %}

このファイルがEmbulkに渡す設定ファイルです。 Embulkに組み込まれたテンプレートエンジンであるLiquidを使用しています。 shオペレータを使ってコマンドを実行すると、Digdagの変数が環境変数にセットされます。 そのため、Embulk内では、env.<環境変数名>という記法でそれを読み出すことができます。

環境変数にはスカラー型の情報しか格納できないため、配列として格納したい時にはカンマ区切りで格納して、それをsplitしています。

また、以下の環境変数はKMSに保存されており、上流のget_db_infoタスクによってセットされた状態でEmbulkが起動されます。

  • EMBULK_INPUT_DB_PASS
  • EMBULK_OUTPUT_JSON_KEY_CONTENT

認証情報をYAMLやソースコードに直に埋め込む必要がなくなったため、良いアイデアかと思いましたが、web UIでこの認証情報が丸見えになってしまいます。 なので、何かしらの対策が今後の課題として残っています。 secret機能を利用することで、ログに認証情報が平文で残ることがなくなるので、この機能の利用を検討しています。

f:id:vasilyjp:20170712205918p:plain

Digdagサーバーの構成

デーモン化

Digdagサーバーはsystemdを利用してデーモン化を行っています。 デーモン化に必要なファイルについては、以下のgistを参考にしました。 https://gist.github.com/uu59/23968e3983adcf5b4cce400710b51cb2

ワークフローやタスクの状態を管理するDBにはH2DBを利用し、DigdagサーバーのローカルディレクトリにDBファイルを配置します。 今回の構成ではDigdagサーバーは1台だけなので、local-agentとexecutor-loopの両方を有効にします。

ワークフローのソースコードやタスクの実行ログはS3に保存します。 S3にアクセスするための認証情報はIAMインスタンスプロファイルを利用するために、設定ファイルには書き込んでいません。

/etc/systemd/system/digdag.service

[Unit]
Description=digdag

[Service]
User=root
Restart=always
TimeoutStartSec=30s
Type=simple
ExecStart=/etc/init.d/digdag.sh

[Install]
WantedBy=multi-user.target

/etc/init.d/digdag.sh

#!/usr/bin/env bash

set -ue

exec >> /var/log/digdag.log
exec 2>> /var/log/digdag-error.log

exec /usr/local/bin/digdag server --database /var/digdag/database --access-log /var/digdag/log/access_log --config /etc/digdag/config.properties

/etc/digdag/config.properties

server.bind=127.0.0.1
server.port=65432
database.type=h2

archive.type=s3
archive.s3.bucket=<s3 bucket name>
archive.s3.path=digdag/archive

log-server.type=s3
log-server.s3.bucket=<s3 bucket name>
log-server.s3.path=digdag/log

Digdag UI

Digdag UIに対してVPCの外側からアクセスをするためにElastic Load Balancer(ELB)を利用しました。 今回のケースですとDigdagサーバーが1台しかないため、ELBを負荷分散のためには使用していません。 しかし、DigdagがHA構成を簡単に構築することを考慮し、HA構成に変更するときのためにELBの下にDigdagを紐付けました。

また、Digdag UIにアクセスするとワークフローに関するほぼすべての操作を行うことができます。 そのために、Digdag UIにアクセスできるユーザーを制限する必要があります。 現状では、ELBで接続元のIPアドレスを見てアクセス制限を行っています。

f:id:vasilyjp:20170712205934p:plain

デプロイ

ワークフローのデプロイはCircleCIから行います。 GitHubでのmasterブランチに対するmergeをhookし、CircleCI上のコンテナからdigdag pushコマンドを発行します。

f:id:vasilyjp:20170712205952p:plain

まとめ

DigdagとEmbulkを併用することでAuroraに保存されているマスタデータをBigQueryに同期するシステムを構築することに成功しました。 Embulk単体では使いづらい部分がいくつかありましたが、Digdagがその部分を補うことによってシステムとしての完成度を高めることができました。 特に、リトライ処理機能、並列実行の機能、web UIによる実行ログの確認機能は自力で書くのが大変なため、Digdagがこれらの面倒を見てくれることに大変助かっています。

Digdagに関する資料は少ないので、問題があった時にStack OverflowやQiitaを見ても情報が全く見つからない時もあります。 そのため、Digdagのソースコードを読んで問題の解決を行うこともしばしばありました。 VASILYではこのような困難に挑戦しながらも新技術を積極的に導入したい仲間を大募集中しています。 興味のある方は以下のバナーからご応募ください。

参考リンク

Digdag公式HP

Digdagの公式HPです。 Digdagの機能に関するビデオを見ると、一通りの機能について知ることができます。

Digdag公式ドキュメント

Digdag公式ドキュメントです。 まだまだドキュメントに書かれていない部分も多いので、詳しい挙動が気になったときにはソースコードを読むのが良いでしょう。

Embulk公式HP

Embulkの公式HPです。 Embulkは公式HPとドキュメントページが共通です。

digdag-slack

Slackに通知を行うためのDigdagプラグインです。 httpオペレーターを使ってSlackに通知するのと比べると、より柔軟な通知が可能です。

embulk-input-mysql

MySQLからデータを読みだすためのEmbulkプラグインです。 Auroraからデータを読みだすことにも使えます。

embulk-output-bigquery

BigQueryにデータを書き出すためのEmbulkプラグインです。 一旦Google Cloud Strage(GCS)にデータを転送し、そこからBigQueryにロードすることで、処理の安定化・高速化を行うことができます。

embulk-filter-column

特定のカラムに対する処理を行うためのEmbulkプラグインです。 今回は特定のカラムを削除するために使用しましたが、それ以外の用途にも使うことができます。