Code ví dụ Java Kafka consumer (Apache Kafka)

Code ví dụ Java Kafka consumer (Apache Kafka)

(Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows)

(Xem lại: Code ví dụ Java Kafka Producer)

Thư viện sử dụng

Download file kafka_2.12-1.1.0.jar hoặc khai báo trong maven project (Trong ví dụ này mình sử dụng Eclipse + Maven)

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>1.1.0</version>
</dependency>

Code ví dụ

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    
    // đọc các message của topic từ thời điểm hiển tại (default)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // đọc tất cả các message  của topic từ offset ban đầu
    //		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }

  }
}
  • Consumer kết nối với server kafka localhost:9092
  • Group Id của consumer được sinh ta tự động UUID.randomUUID().toString()
  • Để xác định vị trí đọc message (offset) ta sử dụng config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG với 3 tùy chọn
    • earliest(đọc cả các message có trên topic trước đó)
    • latest đọc các message mới nhất từ khi consumer subcribe
    • none
  • Đối tượng consumer lắng nghe (subscribe) topic có name là “test”: consumer.subscribe(Arrays.asList("test")); (Một consumer có thể lắng nghe nhiều topic bằng cách truyền một mảng topic name vào method subscribe()

Demo:

Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows để start kafka server, tạo topic ‘test’ và producer trên cmd thực hiện public tới topic test

Sau khi start kafka server, tạo topic và tạo producer, ta run file SimpleConsumer.java, bất kỳ message nào được nhập bên phía producer đều sẽ hiện bên phía consumer:

Code ví dụ Java Kafka consumer (Apache Kafka)

Code ví dụ Java Kafka consumer (Apache Kafka) stackjava.com

Okay, Done!

Download code ví dụ trên tại đây.

 

References:

http://kafka.apache.org/documentation/

stackjava.com