We learnt about Kafka in the previous post. Here we will set up and run a three-node Kafka cluster (fully distributed).
For installation we will use the three CentOS VMs which we configured in an earlier post. We are using the following details for the complete Kafka setup:
- Installation base directory: /home/anishsneh/installs
- Installation user: anishsneh
- Hosts:
- server01 (broker 1)
- server02 (broker 2)
- server03 (broker 3)
Install Kafka
We will use Kafka (kafka_2.10-), built using Scala 2.10.
- Download kafka_2.10- from the Apache Kafka webpage; note that we are using the Kafka build compiled with Scala 2.10.
- Extract the downloaded package to /home/anishsneh/installs, such that we have:
    [anishsneh@server01 installs]$ ls -ltr kafka_2.10-
    total 28
    -rw-rw-r--. 1 anishsneh anishsneh   162 Apr 22 11:37 NOTICE
    -rw-rw-r--. 1 anishsneh anishsneh 11358 Apr 22 11:37 LICENSE
    drwxr-xr-x. 2 anishsneh anishsneh  4096 Apr 22 12:26 libs
    drwxr-xr-x. 2 anishsneh anishsneh  4096 Apr 22 12:26 config
    drwxr-xr-x. 3 anishsneh anishsneh  4096 Apr 22 12:26 bin
- Repeat the above steps on all three hosts.
Configure Cluster
- Set KAFKA_HOME=/home/anishsneh/installs/kafka_2.10- in ~/.bashrc (or wherever you maintain environment variables), then reload the profile/shell.
- Create the /home/anishsneh/installs/data/kafka/zookeeper directory for Kafka's ZooKeeper data and the /home/anishsneh/installs/tmp/kafka-logs directory for the Kafka logs (the message data that log.dirs points to).
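Both directories can be created in one step on each host. A minimal sketch, assuming a POSIX shell; create_kafka_dirs is a hypothetical helper name of our own, not something shipped with Kafka:

```shell
# Create the ZooKeeper data directory and the Kafka log directory
# under the given installation base directory.
# create_kafka_dirs is a hypothetical helper, not part of Kafka.
create_kafka_dirs() {
    base="$1"
    mkdir -p "$base/data/kafka/zookeeper" "$base/tmp/kafka-logs"
}

# On each host, with the base directory used in this post:
# create_kafka_dirs /home/anishsneh/installs
```

mkdir -p also creates the missing parent directories and does not complain if they already exist, so the call is safe to repeat.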
Edit $KAFKA_HOME/config/server.properties with the following:
    # broker.id must be unique per broker:
    # first broker: broker.id=1, second broker: broker.id=2, third broker: broker.id=3
    broker.id=1
    # host.name is the broker's own host name (server01, server02 or server03)
    host.name=server01
    log.dirs=/home/anishsneh/installs/tmp/kafka-logs
    zookeeper.connect=server01:2181,server02:2181,server03:2181
Repeat on all three hosts, changing broker.id and host.name for each.
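Only broker.id and host.name differ between the hosts, so those two lines can be derived from the host name instead of being edited by hand. A sketch, assuming the hosts follow the serverNN naming used in this post; broker_settings is a hypothetical helper of our own:

```shell
# Print the host-specific server.properties lines for a host named serverNN.
# broker_settings is a hypothetical helper; it assumes the serverNN naming
# convention used in this post (server01 -> broker.id=1).
broker_settings() {
    host="$1"
    # Strip the "server" prefix and any leading zeros: server02 -> 2
    id=$(printf '%s\n' "$host" | sed -e 's/^server//' -e 's/^0*//')
    printf 'broker.id=%s\n' "$id"
    printf 'host.name=%s\n' "$host"
}

# On each host, for example:
# broker_settings "$(hostname -s)"
```

The printed lines can then be pasted into (or compared against) server.properties on that host.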
Edit $KAFKA_HOME/config/zookeeper.properties with the following:
    dataDir=/home/anishsneh/installs/data/kafka/zookeeper
    server.1=server01:2888:3888
    server.2=server02:2888:3888
    server.3=server03:2888:3888
Repeat on all three hosts.
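ZooKeeper's documentation for replicated mode lists initLimit and syncLimit (both measured in ticks) alongside the server.N entries, and an ensemble may refuse to start when they are not set. The settings listed in this post do not include them, so if ZooKeeper complains about either one, they can be appended as sketched below. ensure_quorum_limits is a hypothetical helper, and the values 5 and 2 are example values taken from ZooKeeper's getting-started configuration, not tuned for this cluster:

```shell
# Append initLimit and syncLimit to a ZooKeeper properties file
# unless the file already sets them.
# ensure_quorum_limits is a hypothetical helper; 5 and 2 are example values.
ensure_quorum_limits() {
    file="$1"
    grep -q '^initLimit=' "$file" || echo 'initLimit=5' >> "$file"
    grep -q '^syncLimit=' "$file" || echo 'syncLimit=2' >> "$file"
}

# On each host:
# ensure_quorum_limits "$KAFKA_HOME/config/zookeeper.properties"
```

Running it twice leaves a single copy of each line, so it can be included in a setup script without extra checks.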
Create the file /home/anishsneh/installs/data/kafka/zookeeper/myid (i.e. in the ZooKeeper data directory on each node of the Kafka cluster). It holds the node's ID, which must match the N of that node's server.N entry in zookeeper.properties:
    # Use BROKER_ID=1 on the first broker, BROKER_ID=2 on the second, BROKER_ID=3 on the third
    BROKER_ID=1
    echo "$BROKER_ID" > /home/anishsneh/installs/data/kafka/zookeeper/myid
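An empty or non-numeric myid file stops ZooKeeper from starting on that node, which is easy to hit if BROKER_ID was never set in the current shell. A slightly more defensive sketch; write_myid is a hypothetical helper of our own, and the 1 to 255 range follows the recommendation in the ZooKeeper administrator's guide:

```shell
# Write a ZooKeeper myid file, refusing anything that is not a number
# between 1 and 255.
# write_myid is a hypothetical helper, not part of Kafka or ZooKeeper.
write_myid() {
    id="$1"
    data_dir="$2"
    case "$id" in
        ''|*[!0-9]*)
            echo "myid must be a number, got '$id'" >&2
            return 1
            ;;
    esac
    if [ "$id" -lt 1 ] || [ "$id" -gt 255 ]; then
        echo "myid should be between 1 and 255, got $id" >&2
        return 1
    fi
    echo "$id" > "$data_dir/myid"
}

# On the second broker, for example:
# write_myid 2 /home/anishsneh/installs/data/kafka/zookeeper
```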
Start Cluster
Start ZooKeeper using the startup script:
    $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Repeat on all three servers, i.e. start ZooKeeper on server01, server02 and server03 in our case.
Start the Kafka server using the startup script:
    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
Repeat on all three servers, i.e. start Kafka on server01, server02 and server03 in our case.
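Invoked as above, both start scripts stay in the foreground, so each one occupies a terminal on every server. One way to run them in the background is sketched below with nohup. start_in_background is a hypothetical helper of our own, and the NAME.out and NAME.pid files it writes to the current directory are our own naming choice:

```shell
# Run a command in the background, detached from the terminal.
# Output goes to <name>.out and the process ID to <name>.pid.
# start_in_background is a hypothetical helper, not part of Kafka.
start_in_background() {
    name="$1"
    shift
    nohup "$@" > "$name.out" 2>&1 &
    echo "$!" > "$name.pid"
}

# On each server, ZooKeeper first and then Kafka:
# start_in_background zookeeper "$KAFKA_HOME/bin/zookeeper-server-start.sh" "$KAFKA_HOME/config/zookeeper.properties"
# start_in_background kafka "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
```

The recorded process ID can later be used to stop the process, for example with kill "$(cat kafka.pid)".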
Verify Installation
Execute the jps command ($JAVA_HOME/bin/jps) on all three Kafka nodes in the cluster; it should show the following running processes:
    [anishsneh@server01 installs]$ jps
    57690 Jps
    57558 QuorumPeerMain
    57621 Kafka
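With three nodes to check, it can help to let a small script look for the two expected process names (QuorumPeerMain for ZooKeeper, Kafka for the broker). A sketch that reads jps output on standard input; check_kafka_node is a hypothetical helper of our own:

```shell
# Read `jps` output on stdin and report whether both expected JVMs are listed.
# check_kafka_node is a hypothetical helper, not part of Kafka.
check_kafka_node() {
    out=$(cat)
    for proc in QuorumPeerMain Kafka; do
        # Each jps line has the form "<pid> <main class name>"
        if ! printf '%s\n' "$out" | grep -q "^[0-9][0-9]* $proc\$"; then
            echo "missing: $proc"
            return 1
        fi
    done
    echo "ok"
}

# On each node:
# jps | check_kafka_node
```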
Create First Topic
Create topic
    [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic kafka-demo-topic
    Created topic "kafka-demo-topic".
Verify topic
    [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka-demo-topic
    Topic:kafka-demo-topic PartitionCount:3 ReplicationFactor:3 Configs:
        Topic: kafka-demo-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
        Topic: kafka-demo-topic Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
        Topic: kafka-demo-topic Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
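In the describe output, Replicas lists the brokers assigned to a partition and Isr lists the replicas currently in sync with the leader. In a healthy cluster the two lists have the same members, as they do here. The sketch below flags any partition whose Isr list is shorter than its Replicas list; under_replicated is a hypothetical helper of our own, and it assumes the output layout shown in this post:

```shell
# Read `kafka-topics.sh --describe` output on stdin and print every partition
# whose in-sync replica list is shorter than its assigned replica list.
# under_replicated is a hypothetical helper; it assumes the
# "Partition: N ... Replicas: a,b,c Isr: a,b,c" layout shown in this post.
under_replicated() {
    awk '
    {
        part = ""; replicas = ""; isr = ""
        for (i = 1; i < NF; i++) {
            if ($i == "Partition:") part = $(i + 1)
            if ($i == "Replicas:")  replicas = $(i + 1)
            if ($i == "Isr:")       isr = $(i + 1)
        }
        # Skip the summary line, which has no Replicas field
        if (replicas == "") next
        if (split(isr, a, ",") < split(replicas, b, ","))
            print "partition " part ": isr " isr " of replicas " replicas
    }'
}

# Usage:
# $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka-demo-topic | under_replicated
```

No output means every partition has all of its replicas in sync.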
Send/Receive Message
To start the command-line consumer, open a terminal and run the default consumer shipped with the Kafka bundle using the following command:
    [anishsneh@server01 installs]$ $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic kafka-demo-topic
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
To start the command-line producer, open another terminal and run the default producer shipped with the Kafka bundle using the following command:
    [anishsneh@server02 installs]$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list server01:9092 --topic kafka-demo-topic
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- Enter any text in the producer console; it should appear in the consumer terminal window.
Producer/Consumer - Java API
- Create two Maven projects in Eclipse, kafka-consumer and kafka-producer.
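Edit the kafka-consumer pom.xml with the following contents:
    <project>
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.anishsneh.demo.kafka</groupId>
        <artifactId>kafka-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>kafka-consumer</name>
        <url>http://maven.apache.org</url>
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>com.sun.jmx</groupId>
                        <artifactId>jmxri</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.sun.jdmk</groupId>
                        <artifactId>jmxtools</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>javax.jms</groupId>
                        <artifactId>jms</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>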
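Edit the kafka-producer pom.xml with the following contents:
    <project>
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.anishsneh.demo.kafka</groupId>
        <artifactId>kafka-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>kafka-producer</name>
        <url>http://maven.apache.org</url>
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>com.sun.jmx</groupId>
                        <artifactId>jmxri</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.sun.jdmk</groupId>
                        <artifactId>jmxtools</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>javax.jms</groupId>
                        <artifactId>jms</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>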
Add the KafkaConsumer.java class with the following contents:
    package com.anishsneh.demo.kafka;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer {
        public static void main(String[] args) {
            // ZooKeeper connection and consumer group settings
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "first_consumer_group");
            props.put("zookeeper.session.timeout.ms", "30000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            ConsumerConfig cf = new ConsumerConfig(props);
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);

            // Ask for a single stream (one consuming thread) for the topic
            String topic = "kafka-demo-topic";
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            KafkaStream<byte[], byte[]> stream = streams.get(0);

            // hasNext() blocks until a message arrives (no consumer.timeout.ms is set),
            // so this loop runs until the process is stopped
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int i = 1;
            while (it.hasNext()) {
                System.out.println("INDEX: " + i + ": MESSAGE: " + new String(it.next().message()));
                ++i;
            }
            consumer.shutdown();
        }
    }
Add the KafkaProducer.java class with the following contents:
    package com.anishsneh.demo.kafka;

    import java.util.Properties;
    import java.util.UUID;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaProducer {
        public static void main(String[] args) throws InterruptedException {
            // Brokers used to fetch cluster metadata, and String serialization for messages
            Properties props = new Properties();
            props.put("metadata.broker.list", "server01:9092,server02:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // Wait for the partition leader to acknowledge each message
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);

            String topic = "kafka-demo-topic";
            // Send 1000 messages, one every two seconds, keyed by the loop index
            for (int i = 1; i <= 1000; i++) {
                String msg = "UUID: " + UUID.randomUUID().toString() + "; AT: " + System.currentTimeMillis();
                System.out.println(msg);
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, String.valueOf(i), msg);
                producer.send(data);
                Thread.sleep(2000L);
            }
            producer.close();
        }
    }
- Create a single executable jar for each of the producer and consumer projects (we may use the Eclipse export with extracted dependencies, the Maven Assembly plugin, the Maven Shade plugin, etc.). Say we have the producer jar kafka-producer.jar and the consumer jar kafka-consumer.jar.
Start the consumer using:
    [anishsneh@server01 tmp]$ java -jar <PATH_TO_JAR>/kafka-consumer.jar
Start the producer using:
    [anishsneh@server01 tmp]$ java -jar <PATH_TO_JAR>/kafka-producer.jar
After a few seconds we will see in the producer console that messages are being sent:
    [anishsneh@server01 tmp]$ java -jar kafka-producer.jar
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    UUID: 87b94fe3-523b-4e02-89a8-9bcf1ad39332; AT: 1410570722367
    UUID: cbd637c4-e925-4d67-b694-dc4d6bc128bf; AT: 1410570724581
    UUID: db3b2330-9074-432b-884f-4a6b874f6717; AT: 1410570726606
    UUID: 13cbdb3e-2589-4673-8b3e-6d534c43874f; AT: 1410570728631
    UUID: 2028b3a5-f399-45e0-b681-e35900ef5c2e; AT: 1410570730644
and in the consumer console that messages are being received:
    [anishsneh@server01 tmp]$ java -jar kafka-consumer.jar
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    INDEX: 1: MESSAGE: UUID: 87b94fe3-523b-4e02-89a8-9bcf1ad39332; AT: 1410570722367
    INDEX: 2: MESSAGE: UUID: cbd637c4-e925-4d67-b694-dc4d6bc128bf; AT: 1410570724581
    INDEX: 3: MESSAGE: UUID: db3b2330-9074-432b-884f-4a6b874f6717; AT: 1410570726606
    INDEX: 4: MESSAGE: UUID: 13cbdb3e-2589-4673-8b3e-6d534c43874f; AT: 1410570728631
    INDEX: 5: MESSAGE: UUID: 2028b3a5-f399-45e0-b681-e35900ef5c2e; AT: 1410570730644
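Comparing the two consoles by eye gets tedious over a longer run. Since every message carries a UUID, the two outputs can be compared mechanically. The sketch below assumes both consoles were redirected to files (producer.log and consumer.log are names of our own choosing) and prints the UUIDs that were sent but not received. The lists are sorted before comparison because the topic has three partitions, and Kafka only guarantees ordering within a partition. extract_uuids and missing_in_consumer are hypothetical helpers:

```shell
# Print the UUIDs found in a log file, one per line, sorted.
# Matches the "UUID: <uuid>; AT: <millis>" message format used by KafkaProducer.
extract_uuids() {
    sed -n 's/.*UUID: \([0-9a-f-]*\); AT:.*/\1/p' "$1" | sort
}

# Print UUIDs present in the producer log but absent from the consumer log.
# Both helpers are hypothetical, not part of Kafka.
missing_in_consumer() {
    sent=$(mktemp)
    received=$(mktemp)
    extract_uuids "$1" > "$sent"
    extract_uuids "$2" > "$received"
    # comm -23 keeps lines that appear only in the first (sorted) file
    comm -23 "$sent" "$received"
    rm -f "$sent" "$received"
}

# Usage, after running each jar with its output redirected to a file:
# missing_in_consumer producer.log consumer.log
```

No output means every message the producer printed also showed up in the consumer log.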