Apache Kafka 基本操作

2023-03-15 15:44 更新

首先讓我們開始實現(xiàn)單節(jié)點單代理配置,然后我們將我們的設置遷移到單節(jié)點多代理配置。

希望你現(xiàn)在可以在你的機器上安裝 Java,ZooKeeper 和 Kafka 。 在遷移到 Kafka Cluster Setup 之前,首先需要啟動 ZooKeeper,因為 Kafka Cluster 使用 ZooKeeper。

啟動ZooKeeper

打開一個新終端并鍵入以下命令 -

bin/zookeeper-server-start.sh config/zookeeper.properties

要啟動 Kafka Broker,請鍵入以下命令 -

bin/kafka-server-start.sh config/server.properties

啟動 Kafka Broker后,在 ZooKeeper 終端上鍵入命令 jps ,您將看到以下響應 -

821 QuorumPeerMain
928 Kafka
931 Jps

現(xiàn)在你可以看到兩個守護進程運行在終端上,QuorumPeerMain 是 ZooKeeper 守護進程,另一個是 Kafka 守護進程。

單節(jié)點 - 單代理配置

在此配置中,您有一個 ZooKeeper 和代理 id 實例。 以下是配置它的步驟 -

創(chuàng)建 Kafka 主題 - Kafka 提供了一個名為 kafka-topics.sh 的命令行實用程序,用于在服務器上創(chuàng)建主題。 打開新終端并鍵入以下示例。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

我們剛剛創(chuàng)建了一個名為 Hello-Kafka 的主題,其中包含一個分區(qū)和一個副本因子。 上面創(chuàng)建的輸出將類似于以下輸出 -

輸出 - 創(chuàng)建主題 Hello-Kafka

創(chuàng)建主題后,您可以在 Kafka 代理終端窗口中獲取通知,并在 config / server.properties 文件中的“/ tmp / kafka-logs /"中指定的創(chuàng)建主題的日志。

主題列表

要獲取 Kafka 服務器中的主題列表,可以使用以下命令 -

語法

bin/kafka-topics.sh --list --zookeeper localhost:2181

輸出

Hello-Kafka

由于我們已經(jīng)創(chuàng)建了一個主題,它將僅列出 Hello-Kafka 。 假設,如果創(chuàng)建多個主題,您將在輸出中獲取主題名稱。

啟動生產(chǎn)者以發(fā)送消息

語法

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

從上面的語法,生產(chǎn)者命令行客戶端需要兩個主要參數(shù) -

代理列表 - 我們要發(fā)送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties 文件包含代理端口 ID,因為我們知道我們的代理正在偵聽端口 9092,因此您可以直接指定它。

主題名稱 - 以下是主題名稱的示例。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生產(chǎn)者將等待來自 stdin 的輸入并發(fā)布到 Kafka 集群。 默認情況下,每個新行都作為新消息發(fā)布,然后在 config / producer.properties 文件中指定默認生產(chǎn)者屬性。 現(xiàn)在,您可以在終端中鍵入幾行消息,如下所示。

輸出

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

啟動消費者以接收消息

與生產(chǎn)者類似,在config / consumer.proper-ties 文件中指定了缺省使用者屬性。 打開一個新終端并鍵入以下消息消息語法。

語法

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —-topic topic-name 
--from-beginning

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —-topic Hello-Kafka 
--from-beginning

輸出

Hello
My first message
My second message

最后,您可以從制作商的終端輸入消息,并看到他們出現(xiàn)在消費者的終端。 到目前為止,您對具有單個代理的單節(jié)點群集有非常好的了解。 現(xiàn)在讓我們繼續(xù)討論多個代理配置。

單節(jié)點多代理配置

在進入多個代理集群設置之前,首先啟動 ZooKeeper 服務器。

創(chuàng)建多個Kafka Brokers - 我們在配置/ server.properties 中已有一個 Kafka 代理實例。 現(xiàn)在我們需要多個代理實例,因此將現(xiàn)有的 server.properties 文件復制到兩個新的配置文件中,并將其重命名為 server-one.propertiesserver-two.properties。 然后編輯這兩個新文件并分配以下更改 -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

啟動多個代理 - 在三臺服務器上進行所有更改后,打開三個新終端,逐個啟動每個代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

現(xiàn)在我們有三個不同的經(jīng)紀人在機器上運行。 自己嘗試,通過在 ZooKeeper 終端上鍵入 jps 檢查所有守護程序,然后您將看到響應。

創(chuàng)建主題

讓我們?yōu)榇酥黝}將復制因子值指定為三個,因為我們有三個不同的代理運行。 如果您有兩個代理,那么分配的副本值將是兩個。

語法

bin/kafka-topics.sh 
--create 
--zookeeper localhost:2181 
--replication-factor 3 
-partitions 1 
--topic topic-name

示例

bin/kafka-topics.sh 
--create 
--zookeeper localhost:2181 
--replication-factor 3 
-partitions 1 
--topic Multibrokerapplication

輸出

created topic “Multibrokerapplication"

Describe 命令用于檢查哪個代理正在偵聽當前創(chuàng)建的主題,如下所示 -

bin/kafka-topics.sh 
--describe 
--zookeeper localhost:2181 
--topic Multibrokerappli-cation

輸出

bin/kafka-topics.sh 
--describe 
--zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

從上面的輸出,我們可以得出結論,第一行給出所有分區(qū)的摘要,顯示主題名稱,分區(qū)數(shù)量和我們已經(jīng)選擇的復制因子。 在第二行中,每個節(jié)點將是分區(qū)的隨機選擇部分的領導者。

在我們的例子中,我們看到我們的第一個 broker(with broker.id 0)是領導者。 然后 Replicas:0,2,1 意味著所有代理復制主題最后 Isr in-sync 副本的集合。 那么,這是副本的子集,當前活著并被領導者趕上。

啟動生產(chǎn)者以發(fā)送消息

此過程保持與單代理設置中相同。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

輸出

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

啟動消費者以接收消息

此過程保持與單代理設置中所示的相同。

示例

bin/kafka-console-consumer.sh 
--zookeeper localhost:2181 
—topic Multibrokerapplica-tion 
--from-beginning

輸出

bin/kafka-console-consumer.sh 
--zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本主題操作

在本章中,我們將討論各種基本主題操作。

修改主題

您已經(jīng)了解如何在 Kafka Cluster 中創(chuàng)建主題。 現(xiàn)在讓我們使用以下命令修改已創(chuàng)建的主題

語法

bin/kafka-topics.sh 
—zookeeper localhost:2181 
--alter 
--topic topic_name 
--parti-tions count

示例

We have already created a topic “Hello-Kafka" with single partition count and one replica factor. 
Now using “alter" command we have changed the partition count.
bin/kafka-topics.sh 
--zookeeper localhost:2181 
--alter 
--topic Hello-kafka 
--parti-tions 2

輸出

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

刪除主題

要刪除主題,可以使用以下語法。

語法

bin/kafka-topics.sh 
--zookeeper localhost:2181 
--delete 
--topic topic_name

示例

bin/kafka-topics.sh 
--zookeeper localhost:2181 
--delete 
--topic Hello-kafka

輸出

> Topic Hello-kafka marked for deletion

注意 - 如果 delete.topic.enable 未設置為 true,則此操作不會產(chǎn)生任何影響



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號