Code ví dụ Java Kafka consumer (Apache Kafka)
(Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows)
(Xem lại: Code ví dụ Java Kafka Producer)
Thư viện sử dụng
Download file kafka_2.12-1.1.0.jar hoặc khai báo trong maven project (Trong ví dụ này mình sử dụng Eclipse + Maven)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.1.0</version> </dependency>
Code ví dụ
import java.util.Arrays; import java.util.Properties; import java.util.UUID; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // đọc các message của topic từ thời điểm hiển tại (default) props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // đọc tất cả các message của topic từ offset ban đầu // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
- Consumer kết nối với server kafka localhost:9092
- Group Id của consumer được sinh ta tự động
UUID.randomUUID().toString()
- Để xác định vị trí đọc message (offset) ta sử dụng config
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
với 3 tùy chọnearliest
(đọc cả các message có trên topic trước đó)latest
đọc các message mới nhất từ khi consumer subcribenone
- Đối tượng consumer lắng nghe (subscribe) topic có name là “test”:
consumer.subscribe(Arrays.asList("test"));
(Một consumer có thể lắng nghe nhiều topic bằng cách truyền một mảng topic name vào methodsubscribe()
Demo:
Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows để start kafka server, tạo topic ‘test’ và producer trên cmd thực hiện public tới topic test
Sau khi start kafka server, tạo topic và tạo producer, ta run file SimpleConsumer.java
, bất kỳ message nào được nhập bên phía producer đều sẽ hiện bên phía consumer:
Code ví dụ Java Kafka consumer (Apache Kafka) stackjava.com
Okay, Done!
Download code ví dụ trên tại đây.
References: