Code ví dụ Java Kafka Producer (Apache Kafka)

Code ví dụ Java Kafka Producer (Apache Kafka)

(Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows)

(Xem lại: Code ví dụ Java Kafka consumer)

Thư viện sử dụng

Download file kafka_2.12-1.1.0.jar hoặc khai báo trong maven project (Trong ví dụ này mình sử dụng Eclipse + Maven)

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>1.1.0</version>
</dependency>

Code ví dụ

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;


import java.util.Properties;

public class SimpleProducer {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //If the request fails, the producer can automatically retry,
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //Reduce the no of requests less than 0
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer
                <String, String>(props);

        String topic = "test";
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, "key " + i, "value" + i));
        }

        System.out.println("Message sent successfully");
        producer.close();
    }
}
  • Consumer kết nối với server kafka localhost:9092
  • Producer có thể gửi message tới nhiều topic, trong code ví dụ này producer sẽ gửi message tới topic có tên là “test” (gửi 10 message)

Demo:

Xem lại: Cài đặt, chạy Apache Kafka, Apache Zookeeper trên windows để start kafka server, tạo topic ‘test’ và một consumer trên cmd thực hiện subcribe topic ‘test’

Sau khi strat kafka server, tạo topic và consumer subcribe topic ‘test’, ta run file SimpleProducer.java. Các message gửi từ SimpleProducer sẽ hiện lên ở màn hình cmd của consumer.

Code ví dụ Java Kafka Producer (Apache Kafka) stackjava.com

Code ví dụ Java Kafka Producer (Apache Kafka) stackjava.com

Okay, Done!

Download code ví dụ trên tại đây.

 

References:

http://kafka.apache.org/documentation/

stackjava.com