使用 Kafka 和 Docker 開發事件驅動應用程式

隨著微服務的興起,事件驅動架構變得越來越普及。作為分散式事件串流平台,Apache Kafka 通常是這些架構的核心。不幸的是,設定和部署自己的 Kafka 執行個體以進行開發通常很麻煩。幸運的是,Docker 和容器技術讓這一切變得容易許多。

在本指南中,您將學習如何:

  1. 使用 Docker 啟動 Kafka 叢集
  2. 將非容器化應用程式連接至叢集
  3. 將容器化應用程式連接至叢集
  4. 部署 Kafka-UI 以協助疑難排解與除錯

先決條件

若要跟隨本操作指南,需要具備以下先決條件:

啟動 Kafka

Kafka 3.3 開始,由於引入了 KRaft (Kafka Raft),Kafka 的部署大幅簡化,不再需要 Zookeeper。有了 KRaft,為本地開發設定 Kafka 執行個體變得容易多了。從 Kafka 3.8 發布開始,提供了全新的 kafka-native Docker 映像檔,其啟動速度顯著加快且記憶體佔用更少。

提示

本指南將使用 apache/kafka 映像檔,因为它包含許多有用的腳本來管理和使用 Kafka。不過,您可能更傾向使用 apache/kafka-native 映像檔,因為它啟動速度更快且所需資源更少。

啟動 Kafka

請依照下列步驟啟動基本 Kafka 叢集。此範例將啟動一個叢集,並將 9092 連接埠公開至主機,以便原生執行的應用程式能夠連接到它。

  1. 執行以下指令啟動 Kafka 容器

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. 映像檔拉取完成後,您將在一兩秒內擁有一個運作中的 Kafka 執行個體。

  3. apache/kafka 映像檔在 /opt/kafka/bin 目錄中提供了幾個實用的腳本。執行下列指令以驗證叢集是否正常運作並取得其叢集 ID

    $ docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
    

    執行後將產生類似以下的輸出

    Cluster ID: 5L6g3nShT-eMCtK--X86sw
  4. 執行下列指令以建立範例主題並生產(或發布)幾則訊息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    執行後,您可以輸入訊息,每一行一則。例如,輸入幾則訊息,每行一則。以下是幾個範例

    First message

    以及

    Second message

    按下 Enter 鍵發送最後一則訊息,完成後按下 ctrl+c。這些訊息將被發布到 Kafka。

  5. 透過消費這些訊息來確認它們已成功發布到叢集中

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic demo --from-beginning
    

    接著您應該會在輸出中看到您的訊息

    First message
    Second message

    如果您想嘗試,可以開啟另一個終端機並發布更多訊息,然後觀察它們出現在消費者中。

    完成後,按下 ctrl+c 停止消費訊息。

您現在擁有一個本地執行的 Kafka 叢集,並已驗證可以連接到它。

從非容器化應用程式連接至 Kafka

既然您已經示範可以從命令列連接到 Kafka 執行個體,現在是時候從應用程式連接到叢集了。在此範例中,您將使用一個簡單的 Node 專案,該專案使用 KafkaJS 函式庫。

由於叢集在本地執行並公開在 9092 連接埠,應用程式可以透過 localhost:9092 連接至叢集(因為此時它是以原生方式而非容器方式執行)。連接後,此範例應用程式將記錄它從 demo 主題消費的訊息。此外,當它在開發模式下執行時,如果找不到該主題,它也會主動建立。

  1. 如果您沒有執行前一個步驟的 Kafka 叢集,請執行下列指令來啟動 Kafka 執行個體

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. GitHub 儲存庫複製到本地。

    $ git clone https://github.com/dockersamples/kafka-development-node.git
    
  3. 進入該專案目錄。

    cd kafka-development-node/app
    
  4. 使用 yarn 安裝相依套件。

    $ yarn install
    
  5. 使用 yarn dev 啟動應用程式。這會將 NODE_ENV 環境變數設為 development 並使用 nodemon 來監控檔案變更。

    $ yarn dev
    
  6. 應用程式現在執行中,它會將接收到的訊息記錄到控制台。在新的終端機中,使用下列指令發布幾則訊息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    然後發送一則訊息到叢集

    Test message

    記得在完成後按 ctrl+c 停止生產訊息。

從容器和原生應用程式連接至 Kafka

現在您已擁有一個透過公開連接埠連接到 Kafka 的應用程式,是時候探討從另一個容器連接到 Kafka 需要什麼變更。為此,您現在將以容器方式執行該應用程式,而不是以原生方式執行。

但在開始之前,了解 Kafka 監聽器如何運作以及它們如何協助客戶端進行連接非常重要。

了解 Kafka 監聽器

當客戶端連接到 Kafka 叢集時,它實際上是連接到一個「代理 (broker)」。雖然代理有很多角色,但其中之一是支援客戶端的負載平衡。當客戶端連接時,代理會返回一組客戶端應使用的連接 URL,以進行訊息的生產或消費。這些連接 URL 是如何配置的呢?

每個 Kafka 執行個體都有一組監聽器 (listeners) 和廣播監聽器 (advertised listeners)。「監聽器」是 Kafka 綁定的位置,而「廣播監聽器」則是設定客戶端應如何連接到叢集。客戶端接收到的連接 URL 取決於它們連接的是哪個監聽器。

定義監聽器

為了讓這更容易理解,讓我們看看 Kafka 如何設定以支援兩種連接場景

  1. 主機連接(那些透過主機對應連接埠進來的連接)——這些需要使用 localhost 連接
  2. Docker 連接(那些從 Docker 網路內部進來的連接)——這些不能使用 localhost,而是使用 Kafka 服務的網路別名(或 DNS 位址)

由於客戶端需要兩種不同的連接方式,因此需要兩個不同的監聽器 —— HOSTDOCKERHOST 監聽器會告知客戶端使用 localhost:9092 連接,而 DOCKER 監聽器會告知客戶端使用 kafka:9093 連接。請注意,這意味著 Kafka 同時監聽 9092 和 9093 連接埠。不過,只有主機監聽器需要暴露給主機。

Diagram showing the DOCKER and HOST listeners and how they are exposed to the host and Docker networks

為了進行此設定,Kafka 的 compose.yaml 需要一些額外的配置。一旦您開始覆寫某些預設值,為了讓 KRaft 模式正常工作,您還需要指定其他幾個選項。

services:
  kafka:
    image: apache/kafka-native
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

請依照下列步驟試試看。

  1. 如果您已從上一個步驟執行 Node 應用程式,請按下終端機中的 ctrl+c 將其停止。

  2. 如果您已從上一節執行 Kafka 叢集,請使用下列指令停止該容器

    $ docker rm -f kafka
    
  3. 在複製專案的根目錄中執行以下指令以啟動 Compose 堆疊

    $ docker compose up
    

    稍等片刻,應用程式就會啟動並運作。

  4. 在堆疊中有另一個可用於發布訊息的服務。請前往 https://:3000 開啟它。當您輸入訊息並提交表單時,您應該會看到應用程式接收到該訊息的日誌。

    這有助於示範容器化方法如何輕鬆地加入額外服務,以協助測試和疑難排解您的應用程式。

加入叢集視覺化

一旦您開始在開發環境中使用容器,就會體會到加入額外服務的便利性,這些服務僅專注於輔助開發,例如視覺化工具和其他支援服務。既然您已經在執行 Kafka,觀察 Kafka 叢集中發生的情況可能會很有幫助。為此,您可以執行 Kafbat UI 網路應用程式

若要將其加入您自己的專案(示範應用程式中已包含),您只需將以下配置加入您的 Compose 檔案即可

services:
  kafka-ui:
    image: kafbat/kafka-ui:main
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka

然後,一旦 Compose 堆疊啟動,您就可以在瀏覽器中開啟 https://:8080,並瀏覽以查看有關叢集的更多詳細資訊、檢查消費者、發布測試訊息等等。

使用 Kafka 進行測試

如果您有興趣了解如何輕鬆地將 Kafka 整合到您的整合測試中,請查看 使用 Testcontainers 測試 Spring Boot Kafka 監聽器指南。本指南將教您如何使用 Testcontainers 管理測試中 Kafka 容器的生命週期。

結論

透過使用 Docker,您可以簡化開發和測試 Kafka 事件驅動應用程式的過程。容器簡化了設定和部署開發所需各種服務的流程。一旦在 Compose 中定義完成,團隊中的每個人都可以受益於這種易用性。

如果您之前錯過了,所有的範例應用程式程式碼都可以在 dockersamples/kafka-development-node 中找到。

© . This site is unofficial and not affiliated with Kubernetes or Docker Inc.