Skip Navigation
Show nav
Dev Center
  • Get Started
  • ドキュメント
  • Changelog
  • Search
  • Get Started
    • Node.js
    • Ruby on Rails
    • Ruby
    • Python
    • Java
    • PHP
    • Go
    • Scala
    • Clojure
    • .NET
  • ドキュメント
  • Changelog
  • More
    Additional Resources
    • Home
    • Elements
    • Products
    • Pricing
    • Careers
    • Help
    • Status
    • Events
    • Podcasts
    • Compliance Center
    Heroku Blog

    Heroku Blog

    Find out what's new with Heroku on our blog.

    Visit Blog
  • Log inorSign up
View categories

Categories

  • Heroku のアーキテクチャ
    • Dyno (アプリコンテナ)
      • Dyno Management
      • Dyno Concepts
      • Dyno Behavior
      • Dyno Reference
      • Dyno Troubleshooting
    • スタック (オペレーティングシステムイメージ)
    • ネットワーキングと DNS
    • プラットフォームポリシー
    • プラットフォームの原則
  • Developer Tools
    • コマンドライン
    • Heroku VS Code Extension
  • デプロイ
    • Git を使用したデプロイ
    • Docker によるデプロイ
    • デプロイ統合
  • 継続的デリバリーとインテグレーション
    • 継続的統合
  • 言語サポート
    • Node.js
      • Working with Node.js
      • Node.js Behavior in Heroku
      • Troubleshooting Node.js Apps
    • Ruby
      • Rails のサポート
      • Bundler の使用
      • Working with Ruby
      • Ruby Behavior in Heroku
      • Troubleshooting Ruby Apps
    • Python
      • Working with Python
      • Python でのバックグランドジョブ
      • Python Behavior in Heroku
      • Django の使用
    • Java
      • Java Behavior in Heroku
      • Working with Java
      • Maven の使用
      • Spring Boot の使用
      • Troubleshooting Java Apps
    • PHP
      • PHP Behavior in Heroku
      • Working with PHP
    • Go
      • Go の依存関係管理
    • Scala
    • Clojure
    • .NET
      • Working with .NET
  • データベースとデータ管理
    • Heroku Postgres
      • Postgres の基礎
      • Postgres スターターガイド
      • Postgres のパフォーマンス
      • Postgres のデータ転送と保持
      • Postgres の可用性
      • Postgres の特別なトピック
      • Migrating to Heroku Postgres
    • Heroku Data For Redis
    • Apache Kafka on Heroku
    • その他のデータストア
  • AI
    • Working with AI
    • Heroku Inference
      • Inference API
      • Quick Start Guides
      • AI Models
      • Inference Essentials
    • Vector Database
    • Model Context Protocol
  • モニタリングとメトリクス
    • ログ記録
  • アプリのパフォーマンス
  • アドオン
    • すべてのアドオン
  • 共同作業
  • セキュリティ
    • アプリのセキュリティ
    • ID と認証
      • シングルサインオン (SSO)
    • Private Space
      • インフラストラクチャネットワーキング
    • コンプライアンス
  • Heroku Enterprise
    • Enterprise Accounts
    • Enterprise Team
    • Heroku Connect (Salesforce 同期)
      • Heroku Connect の管理
      • Heroku Connect のリファレンス
      • Heroku Connect のトラブルシューティング
  • パターンとベストプラクティス
  • Heroku の拡張
    • Platform API
    • アプリの Webhook
    • Heroku Labs
    • アドオンのビルド
      • アドオン開発のタスク
      • アドオン API
      • アドオンのガイドラインと要件
    • CLI プラグインのビルド
    • 開発ビルドパック
    • Dev Center
  • アカウントと請求
  • トラブルシューティングとサポート
  • Salesforce とのインテグレーション
  • データベースとデータ管理
  • Apache Kafka on Heroku
  • Heroku での Kafka Streams

Heroku での Kafka Streams

日本語 — Switch to English

この記事の英語版に更新があります。ご覧の翻訳には含まれていない変更点があるかもしれません。

最終更新日 2022年11月28日(月)

Table of Contents

  • 基本的な例
  • アプリケーションの構成
  • アプリケーションの接続
  • 内部トピックとコンシューマーグループの管理
  • アプリケーションのスケーリング
  • 注意

Kafka Streams は、Apache Kafka の基礎となるコンポーネントを使用してストリーミングデータを処理する Java クライアントライブラリです。Kafka Streams を使用すると、軽量で、拡張性の高い、フォールトトレラントなストリーム処理アプリを簡単に開発できます。

Kafka Streams は、Heroku では、専用 Kafka プランと基本 Kafka プランの両方でサポートされています (基本プランにはいくつかの追加の設定​が必要)。

Kafka Streams を使用して構築されたアプリケーションは、無限で​、再生可能な​、順序付けられた​、フォールトトレラントな​イベントのシーケンスであるストリームのデータを生成および消費します。ストリームは、Kafka トピックとして表されるか (KStream​)、または圧縮されたトピックとして具体化されます (KTable​)。デフォルトでは、このライブラリでは、アプリケーションがストリームイベントを 1 つずつ処理しながら、到着が遅いイベントや順序が正しくないイベントを処理する機能も提供することが保証されます。

基本的な例

Kafka Streams API を使用すると、数行のコードだけでアプリケーションを開発できます。次のサンプルは、単語数を保持する従来のユースケースを示しています。

words
  .groupBy((key, word) -> word)
  .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
  .count(Materialized.as("windowed-counts"))
  .toStream()
  .process(PostgresSink::new);

このコードは、次の処理を行います。

  1. 単語の入力ストリームを取得する
  2. その入力を単語でグループ化する
  3. 10 秒のタンブリングウィンドウ内の各単語の頻度をカウントする
  4. 断続的な結果をローカルストアに保存する
  5. 各ウィンドウ境界で結果の単語数を出力する

この例は、標準的な Kafka Streams アプリケーションで作成するロジックの大部分を示しています。アプリケーションの残りの部分は、主に設定で構成されています。Kafka Streams では、アプリケーションのロジックを基礎となるインフラストラクチャから切り離すことによって開発が簡略化されます。ここで、このライブラリは、透過的にワークロードを分散し、エラーを処理し、その他の低レベルのタスクを実行します。

アプリケーションの構成

Kafka Streams アプリケーションは、Java のさまざまな実装を使用して Heroku 上で実行できる通常の Java サービスです。Maven​ と Gradle​ のための Heroku の buildpack はどちらもサポートされています。

Gradle で複数プロジェクトの設定​を使用すると、それぞれが異なる Kafka Streams サービスを表す複数の Gradle サブプロジェクトを作成できます。これらのサービスは独立して動作することも、相互接続することもできます。

各サブプロジェクトは、その上で ./gradlew stage​ タスクが実行されると、Gradle プラグインを使用して独自の実行可能ファイルを生成します。これらの実行可能ファイルは、アプリケーションの build/libs/​ ディレクトリに sub-project-name-all.jar​ として指定された名前で作成されます。その後、Procfile​ でワーカープロセスタイプを宣言することによって、Heroku ランタイムでこれらの実行可能ファイルを実行できます。

aggregator_worker: java -jar build/libs/streams-aggregator-all.jar

1 つのアプリケーション内での複数の Kafka Streams サービスの設定に関する詳細情報は、kafka-streams-on-heroku​ リポジトリで見つけることができます。

アプリケーションの接続

Heroku で Kafka ブローカーに接続するには SSL が必要です。これを行うには、次の手順が必要になります。

  1. アプリの KAFKA_URL​ 環境設定に保存されている URI を解析します。
  2. env-keystore​ を使用して Kafka の TRUSTED_CERT​、CLIENT_CERT_KEY​、CLIENT_CERT​ 環境設定を読み取り、トラストストアとキーストアの両方を作成します。
  3. トラストストアとキーストアの関連する SSL 設定を追加します。
private Properties buildHerokuKafkaConfigVars() throws URISyntaxException, CertificateException,
    NoSuchAlgorithmException, KeyStoreException, IOException {
  Properties properties = new Properties();
  List<String> bootstrapServerList = Lists.newArrayList();

  Iterable<String> kafkaUrl = Splitter.on(",")
      .split(Preconditions.checkNotNull(System.getenv(HEROKU_KAFKA_URL)));

  for (String url : kafkaUrl) {
    URI uri = new URI(url);
    bootstrapServerList.add(String.format("%s:%d", uri.getHost(), uri.getPort()));

    switch (uri.getScheme()) {
    case "kafka":
      properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
      break;
    case "kafka+ssl":
      properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
      EnvKeyStore envTrustStore = EnvKeyStore.createWithRandomPassword(
          HEROKU_KAFKA_TRUSTED_CERT);
      EnvKeyStore envKeyStore = EnvKeyStore.createWithRandomPassword(
          HEROKU_KAFKA_CLIENT_CERT_KEY, HEROKU_KAFKA_CLIENT_CERT);

      File trustStoreFile = envTrustStore.storeTemp();
      File keyStoreFile = envKeyStore.storeTemp();

      properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, envTrustStore.type());
      properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
          trustStoreFile.getAbsolutePath());
      properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, envTrustStore.password());
      properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, envKeyStore.type());
      properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getAbsolutePath());
      properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, envKeyStore.password());
      break;
    default:
      throw new URISyntaxException(uri.getScheme(), "Unknown URI scheme");
    }
  }

  bootstrapServers = Joiner.on(",").join(bootstrapServerList);

  return properties;
}

内部トピックとコンシューマーグループの管理

Kafka Streams は、フォールトトレランスと再パーティション分割のために内部トピックを使用します。これらのトピックは、Kafka Streams アプリケーションが正しく動作するために必要です。

Kafka Streams の内部トピックの作成は、Kafka の auto.create.topics.enable​ 設定とは関連がありません。代わりに、Kafka Streams は、管理者クライアント経由でクラスターと直接通信します。

専用 Kafka プラン

専用 Kafka プランは、ユーザー間で分離されています。このため、専用プランでの内部の Kafka Streams トピックに追加の設定は必要ありません。

専用プランに関する詳細情報は、専用プランと設定​のページで見つけることができます。

基本 Kafka プラン

基本 Kafka プランは、複数の Heroku ユーザーを同じ一連の基礎となるリソースで共同ホストします。ユーザーデータとアクセス権限は、Kafka アクセス制御リスト​ (ACL) によって分離されます。さらに、名前の衝突を防ぐために、トピックとコンシューマーグループの名前は自動生成されたプレフィックスで名前空間処理されます。

基本プランで Kafka Streams アプリケーションを実行するには、application.id​ の適切な設定と、内部トピックとコンシューマーグループの事前作成という 2 つの事前の手順が必要です。

application.id の設定

各 Kafka Streams アプリケーションには、そのアプリケーションとそれに関連付けられたトポロジーを識別する application.id​ という名前の重要な一意識別子があります。Kafka Basic プランを使用している場合は、各 application.id​ が割り当てられたプレフィックスで始まっていることを確認する必要があります。

properties.put(StreamsConfig.APPLICATION_ID_CONFIG, String.format("%saggregator-app", HEROKU_KAFKA_PREFIX));

内部トピックとコンシューマーグループの事前作成

Heroku での Kafka Basic プランは ACL を使用するため、Kafka Streams アプリケーションは、適切な ACL のないトピックとコンシューマーグループとは対話できません。Kafka Streams は、内部の管理者クライアントを使用して内部トピックとコンシューマーグループを実行時に透過的に作成するため、これは問題になります。これは主に、Kafka Streams でのプロセッサーに影響を与えます。

プロセッサー​は、process​ メソッドを実装するクラスです。これはストリームから入力イベントを受信し、それらのイベントを処理した後、オプションでダウンストリームプロセッサーへの出力イベントを生成します。ステートフルプロセッサー​は、以降のイベントを処理するときに、前のイベントによって生成された状態を使用するプロセッサーです。Kafka Streams には、この状態を保存するための組み込みの機能が用意されています。

アプリケーション内のステートフルプロセッサーごとに、changelog​ 用に 1 つと repartition​ 用に 1 つの 2 つの内部トピックを作成します。

たとえば、前に示した基本的な例には、ストリームからの単語をカウントする 1 つのステートフルプロセッサーが含まれています。

words
  .groupBy((key, word) -> word)
  .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
  .count(Materialized.as("windowed-counts"))
  .toStream()
  .process(PostgresSink::new);

このアプリケーションでは、count​ 演算子に対して 2 つの内部トピックが必要です。

$ heroku kafka:topics:create aggregator-app-windowed-counts-changelog —app sushi
$ heroku kafka:topics:create aggregator-app-windowed-counts-repartition —app sushi

さらに、application.id​ に一致するアプリケーションに対して 1 つのコンシューマーグループを作成する必要があります。

$ heroku kafka:consumer-groups:create mobile-1234.aggregator-app —app sushi

基本プランに関する詳細情報は、基本プランと設定​のページで見つけることができます。

アプリケーションのスケーリング

並列処理モデル

パーティション​は、Kafka トピックの並列処理の基本単位です。Kafka Streams アプリケーションには、多数のアプリケーションインスタンスが存在します。Kafka Streams アプリケーションは通常の Java アプリケーションであるため、Heroku ランタイムの dyno 内で実行されます。

Kafka Streams アプリケーションの各インスタンスには、いくつかのストリームスレッド​が含まれています。これらのスレッドは、1 つ以上のストリームタスクを実行する役割を担っています。Kafka Streams では、ストリームタスク​は並列処理の基本単位です。Kafka Streams では、すべてのイベントを消費および処理できるように、入力パーティションがストリームタスク間で均等に分散されることが透過的に保証されます。

垂直方向のスケーリング

デフォルトでは、Kafka Streams は、アプリケーションインスタンスあたり 1 つのストリームスレッドを作成します。各ストリームスレッドは 1 つ以上のストリームタスクを実行します。アプリケーションインスタンスは、そのストリームスレッドの数をスケーリングすることによってスケーリングできます。それを行うには、アプリケーションで num.stream.threads​ 設定値を変更します。アプリケーションは、各アプリケーションインスタンス内のスレッド間でワークロードを透過的に再分散します。

水平方向のスケーリング

Kafka Streams は、アプリケーションインスタンスの数が変更されると、それらのインスタンス間でワークロードとローカル状態を再分散します。これは、同じ application.id​ を持つインスタンス間でワークロードとローカル状態を分散することによって透過的に機能します。Kafka Streams アプリケーションは、dyno の数をスケーリングすることによって水平方向にスケーリングできます。

$ heroku ps:scale aggregator_worker=2 —app sushi

入力パーティションの数は、実質的に並列処理の上限です。ストリームタスクの数が入力パーティションの数を超えないという点に注意することが重要です。そうしないと、このオーバープロビジョニングによりアプリケーションインスタンスがアイドル状態になります。

注意

RocksDB の永続性

dyno は一時的なファイルシステム​によってバックアップされるため、耐久性のある保存を基礎となるディスクに依存することは現実的でありません。これは、Heroku で Kafka Streams と共に RocksDB を使用するための課題を提示しています。ただし、RocksDB はハード要件ではありません。Kafka Streams は RocksDB をライトスルーキャッシュとして扱います。ここで、信頼できるソースは実際に、基礎となる Changelog 内部トピックです。基礎となる RocksDB ストアが存在しない場合、状態は、起動時に Changelog トピックから直接再生されます。

デフォルトでは、状態を Changelog トピックから直接再生すると、アプリケーションインスタンスを再分散するか、または dyno が再起動される場合に追加のレイテンシーが発生します。レイテンシーを最小限に抑えるには、ストリームタスクをそれに関連付けられているスタンバイタスクにフェールオーバーするように Kafka Streams を設定できます。

スタンバイタスク​は、状態の完全に複製されたコピーを保持する、ストリームタスクのレプリカです。dyno は、状態が Changelog トピックから再構築されるのを待たなくても、スタンバイタスクを使用してただちに作業を再開できます。

スタンバイタスクの数を変更するには、アプリケーションで num.standby.replicas​ 設定を変更できます。

関連カテゴリー

  • Apache Kafka on Heroku
暗号化鍵を使用した Apache Kafka on Heroku の暗号化 Kafka イベントストリームのモデリング

Information & Support

  • Getting Started
  • Documentation
  • Changelog
  • Compliance Center
  • Training & Education
  • Blog
  • Support Channels
  • Status

Language Reference

  • Node.js
  • Ruby
  • Java
  • PHP
  • Python
  • Go
  • Scala
  • Clojure
  • .NET

Other Resources

  • Careers
  • Elements
  • Products
  • Pricing
  • RSS
    • Dev Center Articles
    • Dev Center Changelog
    • Heroku Blog
    • Heroku News Blog
    • Heroku Engineering Blog
  • Twitter
    • Dev Center Articles
    • Dev Center Changelog
    • Heroku
    • Heroku Status
  • Github
  • LinkedIn
  • © 2025 Salesforce, Inc. All rights reserved. Various trademarks held by their respective owners. Salesforce Tower, 415 Mission Street, 3rd Floor, San Francisco, CA 94105, United States
  • heroku.com
  • Legal
  • Terms of Service
  • Privacy Information
  • Responsible Disclosure
  • Trust
  • Contact
  • Cookie Preferences
  • Your Privacy Choices