Saturday, 5 December 2015

Apache Kafka - Example of Producer/Consumer in Java

If you are searching for how you can write simple Kafka producer and consumer in Java, I think you reached to the right blog. In this post you will see how you can write standalone program that can produce messages and publish them to Kafka broker. Also, you can learn how to write another standalone program that can work as consumer to consume the messages published by Kafka producer.

In this post, I have focused on simple producer/consumer implementation and its examples. To keep it simple, I have not included example of message partitioning, multiple topics and multiple Kafka nodes. Let’s use single topic to produce and consume the message without message partitioning.


Prerequisite
Before proceeding further, please ensure that:
  • Kafka cluster is already setup. If it is not, don’t worry, follow my another post ‘ApacheKafka - Quick Start on Windows’ to setup Kafka in local environment.
  • Topic ‘mytopic’ is created. 
  • Zookeeper should be running on  localhost:2181.
  • Kafka should be running on localhost:9092.

Setting Up Project in Eclipse to Create Producer/Consumer
  1. Create simple java project in Eclipse.
  2. Add Kafka specific libraries in your project build path. You can locate Kafka libraries at ‘<kafka_dir>\libs’. This is the directory where you have extracted Kafka distribution during Kafka setup. 
   
       About Kafka Producer APIs
       To implement Kafka producer we need to use following Kafka classes. Let’s understand the responsibility of these different classes:
  1.  kafka.producer.ProducerConfig: This class is used to wrap different properties those are required to establish connection with Kafka broker. To get more detail on producer’s properties, you can follow section ‘Important configuration properties for the producer’ from this link.
  2. kafka.producer.KeyedMessage: This class is used by Kafka producer to send message/data to Kafka broker. With this class we can define the topic name, message partition key and message. Producer sends data to the same topic which is defined in KeyedMessage. Defining message partition key is an optional feature.
  3. kafka.javaapi.producer.Producer: This class is used to send data to the broker in form of KeyedMessage object. Message can be sent in both way synchronously or asynchronously.
     Kafka Producer - Java Example 
 In below diagram you can get the detail of different APIs implemented in example producer program. 


      
   package com.nvexample.kafka;

   import java.io.BufferedReader;
   import java.io.InputStreamReader;
   import java.util.Properties;

   import kafka.javaapi.producer.Producer;
   import kafka.producer.KeyedMessage;
   import kafka.producer.ProducerConfig;

    public class KafkaProducer {
       private static Producer<Integer, String> producer;
       private static final String topic= "mytopic";

       public void initialize() {
             Properties producerProps = new Properties();
             producerProps.put("metadata.broker.list", "localhost:9092");
             producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
             producerProps.put("request.required.acks", "1");
             ProducerConfig producerConfig = new ProducerConfig(producerProps);
             producer = new Producer<Integer, String>(producerConfig);
       }
       public void publishMesssage() throws Exception{            
             BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));               
         while (true){
             System.out.print("Enter message to send to kafka broker
                                       (Press 'Y' to close producer): ");
           String msg = null;
           msg = reader.readLine(); // Read message from console
           //Define topic name and message
           KeyedMessage<Integer, String> keyedMsg =
                        new KeyedMessage<Integer, String>(topic, msg);
           producer.send(keyedMsg); // This publishes message on given topic
           if("Y".equals(msg)){ break; }
           System.out.println("--> Message [" + msg + "] sent.
                         Check message on Consumer's program console");
         }
         return;
       }

       public static void main(String[] args) throws Exception {
             KafkaProducer kafkaProducer = new KafkaProducer();
             // Initialize producer
             kafkaProducer.initialize();            
             // Publish message
             kafkaProducer.publishMesssage();
             //Close the producer
             producer.close();
       }
   }

       
   About Kafka Consumer APIs
    To implement Kafka consumer we need to use following Kafka classes. Let’s understand the responsibility of these different classes:
  • kafka.consumer.ConsumerConfig: This class is used to wrap different properties those are required to establish connection between consumer and Zookeeper. To get more detail on producer’s properties, you can follow section ‘Important configuration properties for the consumer’ from this link.
  • kafka.javaapi.consumer.ConsumerConnector: This is the Kafka interface. ZookeeperConsumerConnector is the implementer class for this interface. This implementer class is used to establish connection with ZooKeeper. All the interactions with ZooKeeper are taken care by this implementer class class.
  • kafka.consumer.KafkaStream: ConsumerConnector returns the list of KafkaStream object for each topic wrapped in a map as mentioned below. Map key is the topic name and value is the list of KafkaStream objects. KafkaStream K is partition key type and V is the actual message Map<String, List<KafkaStream<K, V>>
  • kafka.consumer.ConsumerIterator: ConsumerIterator is used to iterate KafkaStream.
   Kafka Consumer- Java Example 
    In below diagram you can get the detail of different APIs implemented in example consumer program. 


   package com.nvexample.kafka;

   import java.util.*;
   import kafka.consumer.Consumer;
   import kafka.consumer.ConsumerConfig;
   import kafka.consumer.ConsumerIterator;
   import kafka.consumer.KafkaStream;
   import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {
       private ConsumerConnector consumerConnector = null;
       private final String topic = "mytopic";

       public void initialize() {
             Properties props = new Properties();
             props.put("zookeeper.connect", "localhost:2181");
             props.put("group.id", "testgroup");
             props.put("zookeeper.session.timeout.ms", "400");
             props.put("zookeeper.sync.time.ms", "300");
             props.put("auto.commit.interval.ms", "1000");
             ConsumerConfig conConfig = new ConsumerConfig(props);
             consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
       }

       public void consume() {
             //Key = topic name, Value = No. of threads for topic
             Map<String, Integer> topicCount = new HashMap<String, Integer>();       
             topicCount.put(topic, new Integer(1));
            
             //ConsumerConnector creates the message stream for each topic
             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                   consumerConnector.createMessageStreams(topicCount);         
            
             // Get Kafka stream for topic 'mytopic'
             List<KafkaStream<byte[], byte[]>> kStreamList =
                                                  consumerStreams.get(topic);
             // Iterate stream using ConsumerIterator
             for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
                   
                    while (consumerIte.hasNext())
                           System.out.println("Message consumed from topic
                                         [" + topic + "] : "       +
                                           new String(consumerIte.next().message()));              
             }
             //Shutdown the consumer connector
             if (consumerConnector != null)   consumerConnector.shutdown();          
       }

       public static void main(String[] args) throws InterruptedException {
             KafkaConsumer kafkaConsumer = new KafkaConsumer();
             // Configure Kafka consumer
             kafkaConsumer.initialize();
             // Start consumption
             kafkaConsumer.consume();
       }
   }



Running KafkaProducer & KafkaConsumer Standalone Program  
      Now you can run the Kafka consumer and Producer program. After running producer it will prompt to enter message. As you type the message and press enter in KafkaProducer program console, see the KafkaConsumer program console, it will print the consumed message.

KafkaProducer Program Console:


KafkaConsumer Program Console:



I hope this post helped you learning Kafka Producer and Consumer Programming.

!!! Happy Kafka Learning !!