久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

java中kafka怎么使用

175次閱讀
沒有評論

共計 2455 個字符,預計需要花費 7 分鐘才能閱讀完成。

在 Java 中使用 Kafka,首先需要添加 Kafka 的依賴項。你可以在 Maven 的 pom.xml 文件中添加以下依賴項:


org.apache.kafka
kafka-clients
2.8.0

接下來,你可以使用 Kafka 的 Java 客戶端來編寫代碼。以下是一個簡單的示例,演示了如何使用 Java 發送和接收消息:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaExample {
private static final String TOPIC = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
// 創建生產者
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(producerProps);
// 發送消息
ProducerRecord record = new ProducerRecord(TOPIC, "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();
} else {
System.out.printf("Sent record to topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
// 關閉生產者
producer.close();
// 創建消費者
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer consumer = new KafkaConsumer(consumerProps);
// 訂閱主題并消費消息
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {System.out.printf("Received record: key=%s, value=%s%n", record.key(), record.value());
}
}
}
}

在上面的示例中,我們首先創建了一個生產者,并使用 ProducerConfig 類的常量來配置生產者的屬性,例如 Kafka 集群的地址、鍵和值的序列化方式等。然后,我們創建了一個 ProducerRecord 對象,指定要發送的主題、鍵和值。我們調用生產者的 send() 方法來發送消息,并通過 Callback 來處理發送結果。最后,我們關閉了生產者。
然后,我們創建了一個消費者,并使用 ConsumerConfig 類的常量來配置消費者的屬性,例如 Kafka 集群的地址、鍵和值的反序列化方式、消費者組等。我們訂閱了一個主題,并在一個無限循環中調用 poll() 方法來獲取消息。我們遍歷消息并進行處理。
請注意,這只是一個簡單的示例,用于演示如何使用 Java 操作 Kafka。在實際應用中,你可能需要更復雜的邏輯來處理消息,并使用更多的配置選項來優化性能和確??煽啃浴?/p>

丸趣 TV 網 – 提供最優質的資源集合!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-12-20發表,共計2455字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 蒙阴县| 洪雅县| 禄劝| 隆化县| 丰原市| 孝昌县| 会理县| 东乡| 韶山市| 云阳县| 罗源县| 许昌县| 锡林浩特市| 绩溪县| 玉门市| 合阳县| 甘南县| 扶风县| 天等县| 金昌市| 儋州市| 合山市| 宣城市| 泽普县| 漳州市| 静乐县| 四川省| 松溪县| 陆河县| 中超| 海林市| 金川县| 富锦市| 齐河县| 尚义县| 肇庆市| 湖北省| 华坪县| 眉山市| 乌拉特前旗| 玉山县|