STACKJAVA

Code ví dụ Java Kafka consumer (Apache Kafka)

Code ví dụ Java Kafka consumer (Apache Kafka)

(Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows)

(Xem lại: Code ví dụ Java Kafka Producer)

Thư viện sử dụng

Download file kafka_2.12-1.1.0.jar hoặc khai báo trong maven project (Trong ví dụ này mình sử dụng Eclipse + Maven)

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>1.1.0</version>
</dependency>

Code ví dụ

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    
    // đọc các message của topic từ thời điểm hiển tại (default)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // đọc tất cả các message  của topic từ offset ban đầu
    //		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }

  }
}

Demo:

Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows để start kafka server, tạo topic ‘test’ và producer trên cmd thực hiện public tới topic test

Sau khi start kafka server, tạo topic và tạo producer, ta run file SimpleConsumer.java, bất kỳ message nào được nhập bên phía producer đều sẽ hiện bên phía consumer:

Code ví dụ Java Kafka consumer (Apache Kafka) stackjava.com

Okay, Done!

Download code ví dụ trên tại đây.

 

References:

http://kafka.apache.org/documentation/