Mark As Completed Discussion

Build your intuition. Fill in the missing part by typing it in.

To implement a Kafka consumer in Java, you can use the KafkaConsumer class from the Kafka client library. Here's an example of how to implement a basic Kafka consumer:

TEXT/X-JAVA
1%s

In the example above, we import the necessary Kafka classes and set the configuration properties for the consumer. We specify the topic to subscribe to, the bootstrap servers, and the group ID. We then create an instance of the KafkaConsumer class with the configuration properties and subscribe to the specified topic.

Inside the while loop, we continuously poll for new records from the Kafka topic using the poll method. We iterate over the received records and process each record as needed.

To implement a Kafka producer in Java, you can use the KafkaProducer class from the Kafka client library. Here's an example of how to implement a basic Kafka producer:

TEXT/X-JAVA
1import org.apache.kafka.clients.producer.Producer;
2import org.apache.kafka.clients.producer.ProducerConfig;
3import org.apache.kafka.clients.producer.ProducerRecord;
4import org.apache.kafka.clients.producer.KafkaProducer;
5import org.apache.kafka.common.serialization.StringSerializer;
6
7import java.util.Properties;
8
9public class KafkaProducerExample {
10
11    private static final String TOPIC = "my-topic";
12    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
13
14    public static void main(String[] args) {
15        Properties props = new Properties();
16        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
17        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
18        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
19
20        Producer<String, String> kafkaProducer = new KafkaProducer<>(props);
21
22        try {
23            for (int i = 0; i < 10; i++) {
24                String message = "Message " + i;
25                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
26                kafkaProducer.send(record);
27            }
28        } finally {
29            kafkaProducer.close();
30        }
31    }
32}

Write the missing line below.