In this article we will learn how to connect to Apache Kafka cluster from a Camel route using the Camel Kafka Consumer and Producer.
Perform the following preliminary actions:
Start the Zookeeper Server
Start the ZooKeeper server. Kafka provides a simple ZooKeeper configuration file to launch a single ZooKeeper instance. To install the ZooKeeper instance, use the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
[2020-04-14 19:42:12,456] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2020-04-14 19:42:12,470] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2020-04-14 19:42:12,475] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-04-14 19:42:12,480] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-04-14 19:42:12,496] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
Start Apache Kafka
Now, start the Kafka broker with the following command:
bin/kafka-server-start.sh config/server.properties
[2020-04-14 19:43:13,350] INFO Kafka version: 2.4.1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-14 19:43:13,350] INFO Kafka commitId: c57222ae8cd7866b (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-14 19:43:13,351] INFO Kafka startTimeMs: 1586886193347 (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-14 19:43:13,352] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Now that Apache Kafka is started, create a Topic that will be using in our application:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
Create the Camel Project
You can create a Camel 3 project using its archetype:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.camel.archetypes \
-DarchetypeArtifactId=camel-archetype-java \
-DarchetypeVersion=3.0.0 \
-DgroupId=com.sample.camel \
-DartifactId=camel-kafka \
-Dversion=1.0-SNAPSHOT
A basic project will be created. Now replace the RouteBuilder class with the following one, which sends the messages contained in the data folder to the Kafka topic named 'myTopic'. At the same time, we will be consuming messages with a simple Kafka Consumer:
package com.sample.camel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
/**
* A Camel Java DSL Router
*/
public class MyRouteBuilder extends RouteBuilder {
/**
* Let's configure the Camel routing rules using Java code...
*/
@Override
public void configure() throws Exception {
// Kafka Producer
from("file:src/data?noop=true")
.setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
.to("kafka:myTopic?brokers=localhost:9092");
// Kafka Consumer
from("kafka:myTopic?brokers=localhost:9092")
.log("Message received from Kafka : ${body}")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
.log(" with the key ${headers[kafka.KEY]}");
}
}
That's all. run the project. Run the main class and verify that the messages are logged on the Console:
<person user="james"> <firstName>James</firstName> <lastName>Strachan</lastName> <city>London</city> </person> [ad #3 - KafkaConsumer[myTopic]] route2 INFO on the topic myTopic [ad #3 - KafkaConsumer[myTopic]] route2 INFO on the partition 0 [ad #3 - KafkaConsumer[myTopic]] route2 INFO with the offset 6 [ad #3 - KafkaConsumer[myTopic]] route2 INFO with the key Camel [ad #3 - KafkaConsumer[myTopic]] route2 INFO Message received from Kafka : <?xml version="1.0" encoding="UTF-8"?> <person user="hiram"> <firstName>Hiram</firstName> <lastName>Chirino</lastName> <city>Tampa</city> </person> [ad #3 - KafkaConsumer[myTopic]] route2 INFO on the topic myTopic [ad #3 - KafkaConsumer[myTopic]] route2 INFO on the partition 0 [ad #3 - KafkaConsumer[myTopic]] route2 INFO with the offset 7 [ad #3 - KafkaConsumer[myTopic]] route2 INFO with the key Camel
Source code for this tutorial: http://www.masterspringboot.com/various/camel/consuming-and-producing-kafka-messages-from-camel
FREE WildFly Application Server - JBoss - Quarkus - Drools Tutorials