Heroku のストリーミングデータコネクター
この記事の英語版に更新があります。ご覧の翻訳には含まれていない変更点があるかもしれません。
最終更新日 2025年03月10日(月)
Table of Contents
この記事では、Heroku Postgres イベント用に Change Data Capture (CDC) を設定し、Private Space または Shield Private Space にプロビジョニングされた Apache Kafka on Heroku アドオンにそれらのイベントをストリーミングする方法について説明します。このプロセスは、大きく分けて 3 つの手順で構成されます。
- Private Space または Shield Private Space にアプリを作成する
- Private または Shield の Heroku Postgres アドオンと、Private または Shield の Apache Kafka on Heroku アドオンを新しいアプリにプロビジョニングする
- ストリーミングデータコネクターを作成して Postgres から Kafka への CDC イベントを有効にする
ストリーミングデータコネクターが機能するのは、Private または Shield の Apache Kafka on Heroku アドオンと Private または Shield の Heroku Postgres アドオンが同じ Private Space または Shield Private Space にある場合のみです。
ストリーミングデータコネクターを最適に設定する方法についての詳細は、「Heroku のストリーミングデータコネクターのベストプラクティス」を参照してください。
Heroku アプリの設定
まず、Private Space または Shield Private Space を作成します。Space が利用可能になったら、その Space にアプリを作成できます。
$ heroku spaces:create --region virginia --team my-team-name --space myspace
$ heroku spaces:wait --space myspace
$ heroku apps:create --space myspace my-cdc-app
Heroku アドオンの設定
次に、2 つの Private または Shield のデータアドオンをアプリにアタッチする必要があります。
$ heroku addons:create heroku-postgresql:private-7 --as DATABASE --app my-cdc-app
$ heroku addons:create heroku-kafka:private-extended-2 --as KAFKA --app my-cdc-app
アドオンのプロビジョニングの進行状況は、次のようにして監視できます。
$ heroku addons:wait --app my-cdc-app
アドオンが利用可能になったら、スキーマとデータを Postgres データベースにインポートします。
Heroku のストリーミングデータコネクターの設定
Private Space または Shield Private Space アプリで Heroku Postgres アドオンと Apache Kafka on Heroku アドオンを設定したら、コネクターをプロビジョニングできます。
最初に、CLI プラグインをインストールします。
$ heroku plugins:install data
コネクターを作成するには、いくつかの情報を収集する必要があります。
- Kafka アドオンの名前
- Postgres アドオンの名前
- そこで発生したイベントをキャプチャする Postgres テーブルの名前
- (オプション) キャプチャイベントから除外する列の名前
Postgres データベースでイベントをキャプチャするためには、いくつかの要件が満たされている必要があります。
- データベースのエンコードが UTF-8 であること
- その時点でテーブルが存在していること
- テーブルにプライマリキーがあること
- テーブルがパーティション分割されていないこと
- テーブル名に
[a-z,A-Z,0–9,\_]
以外の文字が含まれていないこと - Kafka Formation で直接の Zookeeper アクセスが無効になっていること
キャプチャするテーブルの選択には注意が必要です。1 つのコネクターでは、多くのテーブルからの大量のイベントを処理しきれない場合があります。
次に、コネクターを作成できます。Postgres アドオンと Kafka アドオンの名前に加えて、データベースキャプチャイベントに含める完全修飾テーブルのリストが必要です。
$ heroku data:connectors:create \
--source postgresql-neato-98765 \
--store kafka-lovely-12345 \
--table public.posts --table public.users
プロビジョニングの完了まで約 15 ~ 20 分かかることがあります。コネクターのプロビジョニングの進行状況は、次のようにして監視できます。
$ heroku data:connectors:wait gentle-connector-1234
ストリーミングデータコネクターでは、データの変更をキャプチャするために選択したテーブルごとに、Kafka クラスターにトピックが作成されます。コネクターが利用可能になったら、新しく作成された Kafka トピックなどの詳細を表示できます。
$ heroku data:connectors:info gentle-connector-1234
=== Data Connector status for gentle_connector_1234
Name: gentle_connector_1234
Status: available
=== Configuration
Table Name Topic Name
public.posts gentle_connector_1234.public.posts
public.users gentle_connector_1234.public.users
これらのトピックは次のパラメータで設定されます。
partition_count
: 32replication_factor
: 3cleanup_policy
: deleteretention_time_ms
: 24 時間
プリンシパル (Kafka ユーザー) は、ストリーミングデータコネクターによって作成されたテーブルトピックに対して、Read
および Describe
アクセス権を持ちます。Write
、Delete
、および Alter
操作は拒否されます。
この機能によって、各コネクターのハートビートトピックも作成されます。プリンシパルは、ハートビートトピックに対しても Read
および Describe
アクセス権を持ちます。
コネクターの管理
作成したコネクターを管理するために、いくつかの操作を実行できます。
一時停止または再開
新しいイベントの処理を一時停止できます。コネクターを一時停止すると、再開するまでの間、追加のレコードのポーリングが停止されます。2 つの状態は、次のようにして簡単に切り替えることができます。
# to pause
$ heroku data:connectors:pause gentle-connector-1234
# to resume
$ heroku data:connectors:resume gentle-connector-1234
通常の動作では、コネクターはコネクターが一時停止している間に発生した変更イベントを失うことはありません。コネクターは Postgres データベースのレプリケーションスロットを使用して進捗を追跡し、再開時にデータを失うことなく中止したところから取得します。
コネクターを「一時停止」状態にしたまま数時間放置しないでください。一時停止したコネクターによって WAL が削除されず、それによってプライマリデータベースにリスクが生じる可能性があります。コネクターを長期間一時停止したままにするよりは、破棄することをお勧めします。
コネクターが一時停止中に発生した変更イベントは、Kafka に送達されることが保証されません。フェイルオーバーが発生した場合 (システム障害または定期的なメンテナンスによる)、コネクターの一時停止後の変更イベントは失われます。
稼働中のデータベースでコネクターが非常に長期間一時停止する場合、レプリケーションスロットにより Postgres では未読のログ先行書き込み (WAL) が削除されません。その結果、WAL ドライブがいっぱいになり、データベースがシャットダウンする原因となります。自動化のおかげでこれらの状況は一般的に事前に検出されますが、最悪の場合、データベースを保護するためにレプリケーションスロットを削除する必要があります。そのようなまれな場合、変更イベントは Kafka に送達されません。
設定の更新
コネクターに関連付けられた特定のプロパティを CLI から変更できます。これらのプロパティには以下の値が含まれます。
プロパティ | 設定可能な値 | デフォルト値 | 詳細 |
---|---|---|---|
decimal.handling.mode |
precise 、double 、string |
precise |
ドキュメント |
hstore.handling.mode |
map 、json |
map |
ドキュメント |
time.precision.mode |
adaptive 、adaptive_time_microseconds 、connect |
adaptive |
ドキュメント |
interval.handling.mode |
numeric 、string |
numeric |
ドキュメント |
tombstones.on.delete |
true 、false |
true |
ドキュメント |
binary.handling.mode |
bytes 、base64 、hex |
bytes |
ドキュメント |
たとえば、tombstones.on.delete
を false
に更新できます。
$ heroku data:connectors:update gentle-connector-1234 \
--setting tombstones.on.delete=false
コネクターを操作するときは、推奨されるベストプラクティスについて理解しておくことをお勧めします。
Heroku によって管理される設定
ほとんどの設定プロパティは Heroku によって完全に管理され、必要に応じて変更されます。
プロパティ | 管理される値 | 詳細 |
---|---|---|
heartbeat.interval.ms |
60 秒 | ドキュメント |
テーブルおよび除外される列を更新する
除外される列のほかに、コネクターの Postgres テーブルも変更できます。
たとえば、public.parcels
テーブルを追加し、public.posts
テーブルを削除できます。
$ heroku data:connectors:update gentle-connector-1234 \
--add-table public.parcels \
--remove-table public.posts
新しいテーブルは、設定に概略を示すものと同じ要件に従う必要があります。
同様に、除外される列を追加および削除できます。
$ heroku data:connectors:update gentle-connector-1234 \
--exclude-column public.parcels.address \
--remove-excluded-column public.posts.keys
Heroku Postgres データベースのバージョンのアップグレード
ストリーミングデータコネクターを使用して Heroku Postgres データベースをアップグレードするには、新しくアップグレードしたデータベースでコネクターを再作成する必要があります。コネクターがイベントを新しいデータベースにストリーミングするように、システムを再構成するための追加手順があります。アップグレード自体はフォロワーデータベースと pg:upgrade
を使用して行われます。これらの手順は、ストリーミングデータコネクターがアップグレードプロセスにどのように適合するかを示します。
ストリーミングデータコネクターを使用して、データベースをアップグレードする手順は次のとおりです。
- 現在のコネクターとその設定に関する情報を取得する
- アップグレードしてプロモートする新しいフォロワーデータベースを準備する
- 古いデータベースへの書き込みを無効にし、コネクターを一時停止する
- アップグレードを実行する
- 新しくアップグレードしたデータベースをプロモートまたはアタッチする
- 新しくアップグレードしたデータベースでコネクターを再作成する
- メンテナンスモードを終了する
- 古いプライマリデータベースをプロビジョニング解除する
1. コネクターに関する情報を取得する
コネクターのリストと各コネクターの設定の詳細を取得する必要があります。この情報は、アップグレードしたデータベースへのコネクターを再作成する手順 6 で必要になります。
コネクターのリストを取得するには、次のコマンドを実行します。
$ heroku data:connectors --app example-app
=== Data Connector info for example-app
Connector Name: inventive-connector-83577
Kafka Add-On: example-app-kafka
Postgres Add-On: example-app-postgres
Tables: public.posts
public.comments
特定のコネクターの情報と構成の詳細を取得するには、コネクター名を指定してコマンドを実行します。
$ heroku data:connectors:info inventive-connector-83577
=== Data Connector status for inventive-connector-83577
Lag: 15 MB
Service Name: 10d5e5cb-0343-4166-80d8-f03fcc4d1e21
Status: available
=== Configuration
Table Name Topic Name
public.posts inventive_connector_83577.public.posts
public.comments inventive_connector_83577.public.comments
Your Data Connector is now available.
2. フォロワーデータベースをプロビジョニングする
次に、フォロワーデータベースを作成し、リーダーデータベースにほぼ追いつくまで待機します。フォロワーを作成すると、アップグレードに必要なダウンタイムが最小化されます。アップグレードの 24 時間前までにフォロワーを作成することをお勧めします。
3. メンテナンスモードにしてコネクターを一時停止する
次に、アプリをメンテナンスモードにしてアプリの dyno をスケールダウンし、アップグレード中にデータベースへの書き込みが行われないようにします。
$ heroku maintenance:on --app example-app
$ heroku ps:scale consumer=0 generator=0 web=0 --app example-app
その後、データコネクターを一時停止します。
$ heroku data:connectors:pause inventive-connector-83577
Pausing Data Connector inventive-connector-83577... done
4. フォロワーデータベースをアップグレードする
アップグレードする前に、フォロワーが pg:info
に追いついていることを確認します。フォロワーデータベースが同期されていると、Behind By
フィールドに 0 commits
が表示されます。
次に、pg:upgrade
を使用してフォロワーデータベースをアップグレードします。このコマンドによってフォロワーでリーダーのフォローが解除され、Postgres バージョンのアップグレードが実行されます。pg:wait
を使用して、アップグレードの進捗状況を監視できます。
5. 新しいデータベースをプロモートまたはアタッチする
DATABASE_URL
が以前のプライマリデータベースの環境設定だった場合は、pg:promote
を使用して新しくアップグレードしたデータベースを新しい DATABASE_URL
としてプロモートします。アップグレードするデータベースにデフォルトの DATABASE_URL
とは異なる環境設定がある場合は、heroku addons:attach
を使用して、必要なエイリアスまたはアタッチメント名で新しくアップグレードしたデータベースをプロモートします。
6. コネクターを交換する
次に、古いデータベースでコネクターを破棄し、新しくアップグレードしたデータベース用にコネクターを再作成する必要があります。破棄する前に、ステップ 1 の情報があることを確認してください。通常、コネクターの破壊を完了するには約 10 分かかります。
コネクターを破棄しても、関連する Kafka トピックは破棄されません。
$ heroku data:connectors:destroy inventive-connector-83577
To proceed, type inventive-connector-83577 or re-run this command with --confirm inventive-connector-83577: inventive-connector-83577
Data Connector inventive-connector-83577 deleted successfully.
Note: We do not delete your Kafka topics automatically, because they could still contain messages which you haven't consumed. Please delete the topics manually. See heroku kafka:topics:destroy --help
Destroying Data Connector... done
コネクターが破棄されたら、新しいデータベースでコネクターを再作成します。
コネクターは古いコネクターと同じ名前と設定で作成する必要があります。
7. メンテナンスモードを終了する
通常のアプリケーション操作を再開するには、Web 以外のすべての dyno をその元のレベルにスケーリングし、メンテナンスモードをオフにします。
$ heroku ps:scale consumer=1:Private-M generator=1:Private-M web=1:Private-M --app example-app
Scaling dynos... done, now running consumer at 1:Private-M, web at 1:Private-M, generator at 1:Private-M, console at 0:Private-M, rake at 0:Private-M
$ heroku maintenance:off --app example-app
アップグレード後にアプリケーションが正常に動作し、イベントが新しい接続からストリーミングされていることを確認することをお勧めします。kafka:tail
を実行して、コネクタートピックに最新メッセージを表示します。
8. 古いプライマリデータベースをプロビジョニング解除する
データベースをアップグレードした後、古いプライマリデータベースを必ずプロビジョニング解除してください。
$ heroku addons:destroy HEROKU_POSTGRESQL_LAVENDER --app example-app
コネクターの破棄
コネクターの破棄を CLI から実行できます。
このコマンドによって、イベントの生成に使用される Kafka トピックは破棄されません。トピックのライフサイクルは別途管理する必要があります。
$ heroku data:connectors:destroy gentle-connector-1234