Créer un site internet

Kafka java create producer

1. Create a maven project 

 

2. Add dependencies to pom.xml :

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.8.0</version>

</dependency>

 

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-simple</artifactId>

<version>1.7.30</version>

</dependency>

 

3. Create ProducerDemo.java class.

ProducerDemo.java

 


    package com.kafka.tutorial1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {

        String bootstrapServers = "127.0.0.1:9092";

        //Create Producer properties

        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //Create the producer

        KafkaProducer producer = new KafkaProducer(properties);

        //Create a producer record
        ProducerRecord producerRecord = new ProducerRecord("first_topic", "Hello world");

        //send data - asynchronous
        producer.send(producerRecord);

        //flash and close
        producer.flush();
        producer.close();
    }
}

 

4. Run the main application.

5. Run a consumer on KFKA CLI. We will get a hello world sent by the application kafka producer


    $kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-third-application
Hello world

Create java consumer:

 


    package com.kafka.tutorial1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {

        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-firth-application");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //Create consumer

        KafkaConsumer consumer = new KafkaConsumer(properties);

        //Subscribe consumer to our topic
        consumer.subscribe(Collections.singleton("first_topic"));

        //Poll for new data

        while (true){
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record: records){

                logger.info("Key: " + record.key() +", Value: "+ record.value());
                logger.info("Partition: " + record.partition() +", Offset: "+ record.offset());
            }
        }
    }
}

Notes: The producer won't try the request for ever. It is bounded by a timeout. You can set a producer timeout. delivery.timeout.ms = 120 000 ms = 2 mins Idempotent producer: Producer can introduce duplicate messages due to network errors (eg: ack never reaches). In this case the solution is to use a Producer request Id so the producer won't commit the data twice. In the second request it will only send an ack. producerProps.put("enable.idempotence",true)

Add a comment