Create a Kafka producer in Java
1. Create a maven project
2. Add dependencies to pom.xml :
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
3. Create ProducerDemo.java class.
ProducerDemo.java
package com.kafka.tutorial1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer demo: sends a single String message
 * ("Hello world") to the topic "first_topic" on a local broker.
 */
public class ProducerDemo {
    public static void main(String[] args) {
        String bootstrapServers = "127.0.0.1:9092";

        // Producer properties: broker address plus key/value serializers.
        // StringSerializer converts both key and value to bytes on the wire.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create the producer with explicit <key, value> type parameters
        // (raw types defeat compile-time type checking).
        // try-with-resources guarantees close() even if send/flush throws;
        // close() also flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Record with no explicit key: the partitioner assigns a partition.
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("first_topic", "Hello world");

            // send() is asynchronous; the record is buffered and sent in the background.
            producer.send(producerRecord);

            // Force the buffered record out before the program exits.
            producer.flush();
        }
    }
}
4. Run the main application.
5. Run a consumer on the Kafka CLI. We will see the "Hello world" message sent by the Java producer application:
$kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-third-application
Hello world
Create java consumer:
package com.kafka.tutorial1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo: subscribes to "first_topic" on a local
 * broker and logs the key, value, partition and offset of every record,
 * polling in an infinite loop.
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

        // Consumer properties: broker address, deserializers matching the
        // producer's serializers, a consumer group id, and offset reset
        // policy "earliest" (read the topic from the beginning when no
        // committed offset exists for this group).
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-firth-application");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the consumer with explicit <key, value> type parameters
        // (raw types defeat compile-time type checking).
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe the consumer to our topic.
        consumer.subscribe(Collections.singleton("first_topic"));

        // Poll for new data forever. Each poll waits up to 100 ms for records.
        // NOTE(review): the loop never exits, so consumer.close() is never
        // reached — acceptable for a demo, but production code should close
        // the consumer on shutdown (e.g. via a shutdown hook + wakeup()).
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // SLF4J parameterized logging avoids string concatenation
                // when the log level is disabled.
                logger.info("Key: {}, Value: {}", record.key(), record.value());
                logger.info("Partition: {}, Offset: {}", record.partition(), record.offset());
            }
        }
    }
}
Notes:
The producer won't retry a request forever; retries are bounded by a timeout you can configure with delivery.timeout.ms (default 120 000 ms = 2 minutes).
Idempotent producer:
A producer can introduce duplicate messages due to network errors (e.g. the broker's ack never reaches the producer, so the producer retries a request the broker already committed).
The solution is the idempotent producer: each request carries a producer ID and sequence number, so the broker detects the retry, does not commit the data twice, and simply sends back the ack for the duplicate request.
producerProps.put("enable.idempotence",true)
Add a comment