This post is about how to create a Kafka topic dynamically through Java. In one of my projects, we (my friend Jaya Ananthram and I) were required to create Kafka topics dynamically through Java. Since the official Kafka documentation does not cover this, we struggled to get it working. After a long search, we found this solution for creating a Kafka topic through Java.
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.1.1:2181,192.168.1.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            // The string serializer is required; see the explanation below the code.
            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            // The last argument is the ZooKeeper security flag (disabled here).
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3; // must not exceed the number of running brokers
            Properties topicConfiguration = new Properties(); // no per-topic overrides

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
I am assuming that both ZooKeeper and Kafka are configured with the default configuration on localhost and that both services are running. While creating the ZkClient object, we must set the ZkSerializer to ZKStringSerializer$.MODULE$ so that ZkClient creates the topic properly in Kafka. If we don't specify the ZkSerializer, the topic will not be created properly in Kafka: the call will just create the topic directories in ZooKeeper and report that the topic was created successfully. But as soon as we try to push tuples, it will fail with an error saying that the topic does not exist.
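To make the serializer issue a little more concrete, here is a small stdlib-only sketch. It does not use ZkClient or Kafka at all; it only compares the two byte formats that are probably involved. My understanding is that ZkClient falls back to plain Java object serialization when no serializer is given, while Kafka reads znode data as UTF-8 text, so treat that part as an assumption. The class name, method names and the payload are made up for the illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: compares two ways of turning a String into bytes.
class SerializerComparison {

    // Roughly what a string serializer does: plain UTF-8 bytes of the text.
    static byte[] utf8Bytes(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Roughly what a Java-serialization-based serializer does: the value is
    // written through an object stream, which prepends a stream header and
    // a type tag before the actual characters.
    static byte[] javaSerializedBytes(String data) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        String payload = "{\"version\":1}"; // hypothetical znode content
        System.out.println("UTF-8 length:           " + utf8Bytes(payload).length);
        System.out.println("Java-serialized length: " + javaSerializedBytes(payload).length);
    }
}
```

The Java-serialized form starts with the stream header bytes 0xAC 0xED instead of the text itself, so a reader expecting plain UTF-8 JSON would not be able to parse it. That would explain why the topic directories exist in ZooKeeper but the topic is not usable.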
Note: ZKStringSerializer$.MODULE$ refers to the singleton ZKStringSerializer object.
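If the `$.MODULE$` syntax looks odd: a Scala `object` is compiled to a class whose name ends in `$` and which holds its single instance in a static field called `MODULE$`. The following is a hand-written Java analogue of that pattern, just to show the shape. `DemoSerializer$` is a hypothetical name and this is not Kafka's actual class.

```java
import java.nio.charset.StandardCharsets;

// Hand-written analogue of what the Scala compiler emits for
// `object DemoSerializer`; the name is hypothetical.
final class DemoSerializer$ {

    // The one and only instance, exposed the way Scala exposes it to Java.
    public static final DemoSerializer$ MODULE$ = new DemoSerializer$();

    // Private constructor: nobody else can create a second instance.
    private DemoSerializer$() {
    }

    // Example instance method, called as DemoSerializer$.MODULE$.serialize(...).
    byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = DemoSerializer$.MODULE$.serialize("testTopic");
        System.out.println(bytes.length + " bytes");
    }
}
```

So from Java, `SomeObject$.MODULE$` is simply how you get hold of a Scala singleton in order to pass it around or call its methods.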
Great Article...
Hi Harish,
I am working on a similar project, but I want to know: what if security is enabled in Kafka? Where and how do you provide the authentication details in this code?
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
Here you have security set to false.
Thanks
Vijay Shekhawat