Implementing Kafka in Java
Building Kafka consumers and producers using Java
To implement a Kafka consumer in Java, you can use the KafkaConsumer
class from the Kafka client library. Here's an example of how to implement a basic Kafka consumer:
1%s
In the example above, we import the necessary Kafka classes and set the configuration properties for the consumer. We specify the topic to subscribe to, the bootstrap servers, and the group ID. We then create an instance of the KafkaConsumer
class with the configuration properties and subscribe to the specified topic.
Inside the while
loop, we continuously poll for new records from the Kafka topic using the poll
method. We iterate over the received records and process each record as needed.
To implement a Kafka producer in Java, you can use the KafkaProducer
class from the Kafka client library. Here's an example of how to implement a basic Kafka producer:
1import org.apache.kafka.clients.producer.Producer;
2import org.apache.kafka.clients.producer.ProducerConfig;
3import org.apache.kafka.clients.producer.ProducerRecord;
4import org.apache.kafka.clients.producer.KafkaProducer;
5import org.apache.kafka.common.serialization.StringSerializer;
6
7import java.util.Properties;
8
9public class KafkaProducerExample {
10
11 private static final String TOPIC = "my-topic";
12 private static final String BOOTSTRAP_SERVERS = "localhost:9092";
13
14 public static void main(String[] args) {
15 Properties props = new Properties();
16 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
17 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
18 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
19
20 Producer<String, String> kafkaProducer = new KafkaProducer<>(props);
21
22 try {
23 for (int i = 0; i < 10; i++) {
24 String message = "Message " + i;
25 ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
26 kafkaProducer.send(record);
27 }
28 } finally {
29 kafkaProducer.close();
30 }
31 }
32}
xxxxxxxxxx
}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));