// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation 'org.apache.kafka:kafka-clients:3.9.0'
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version> <!-- check for the latest release / compatible version -->
</dependency>
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
implementation 'org.apache.kafka:kafka-streams:3.9.0'
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.9.0</version>
</dependency>
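kafka-streams is listed here for completeness; the examples below use only kafka-clients. As a minimal hedged sketch of the Streams API, assuming the same local broker and that the input and output topics already exist (topic names and application id are illustrative):
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
public class KafkaStreamsPoc {
public static void main(String[] args) {
final Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-poc"); // also used as consumer group id
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
// Read from demo_java, upper-case each value, write to demo_java_upper.
KStream<String, String> stream = builder.stream("demo_java");
stream.mapValues(value -> value.toUpperCase()).to("demo_java_upper");
final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
// Close the topology cleanly on Ctrl+C.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}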
// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:3.3.1'
<!-- version omitted: managed by the Spring Boot BOM -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream
implementation 'org.springframework.cloud:spring-cloud-stream:4.2.0'
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:4.2.0'
// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:3.3.1'
<!-- versions omitted: managed by the Spring Boot / Spring Cloud BOMs -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
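With Spring Boot managing the versions above, consuming is reduced to an annotated listener method. A minimal sketch, assuming a Boot application with spring.kafka.bootstrap-servers configured (class name, topic, and group id are illustrative):
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class DemoJavaListener {
private static final Logger logger = LoggerFactory.getLogger(DemoJavaListener.class);
// Spring creates the consumer, runs the poll loop, and invokes this per record.
@KafkaListener(topics = "demo_java", groupId = "my-spring-app-consumers")
public void listen(String message) {
logger.info("Received: {}", message);
}
}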
Refer to the src folder in the project home directory for the complete source.
dependencies {
implementation("org.apache.kafka:kafka-clients:3.9.0")
implementation("org.slf4j:slf4j-api:2.0.16")
implementation("org.slf4j:slf4j-simple:2.0.16")
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerPoc {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerPoc.class);
public static void main(String[] args) {
// Create properties with Kafka configuration
final Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "[::1]:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
// Create kafka record to encapsulate the data
ProducerRecord<String, String> record1 = new ProducerRecord<>("demo_java", "Hello");
ProducerRecord<String, String> record2 = new ProducerRecord<>("demo_java", "World");
// Create Kafka producer
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.send(record1);
producer.send(record2);
//flush and close
producer.flush();
producer.close();
}
}
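The examples assume the demo_java topic already exists (and the consumer config later in this document disables auto topic creation). A sketch using the AdminClient from kafka-clients to create it up front; the partition and replication counts are illustrative for a single local broker:
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
public class CreateDemoTopic {
public static void main(String[] args) throws Exception {
final Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
try (AdminClient admin = AdminClient.create(properties)) {
// 3 partitions, replication factor 1 (single local broker).
NewTopic topic = new NewTopic("demo_java", 3, (short) 1);
// createTopics is asynchronous; all() returns a future we block on.
admin.createTopics(List.of(topic)).all().get();
}
}
}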
TBU
dependencies {
implementation("org.apache.kafka:kafka-clients:3.9.0")
implementation("org.slf4j:slf4j-api:2.0.16")
implementation("org.slf4j:slf4j-simple:2.0.16")
}
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerWithCallbacks {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerWithCallbacks.class);
public static void main(String[] args) {
logger.info("KafkaProducerWithCallbacks execution started.");
final Properties properties = new Properties();
// Same configuration using plain string keys:
/*properties.setProperty("bootstrap.servers", "[::1]:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());*/
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//Kafka producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// send data and register callback for status
producer.send(new ProducerRecord<>("demo_java", "callback demo app"), new Callback() {
// Executes every time a message is successfully sent or exception occurs.
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// check if no exception occurred
if (e == null) {
logger.info("Received new metadata. \nTopic: {}, \nPartition: {}, \nOffset: {}, \nTimestamp: {}",
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset(), recordMetadata.timestamp());
} else {
logger.error("Error while producing: {}", e);
}
}
});
// flush and close
producer.flush();
producer.close();
logger.info("KafkaProducerWithCallbacks execution completed.");
}
}
[kafka-producer-network-thread | producer-1] INFO com.srvivek.kafka.KafkaProducerWithCallbacks - Received new metadata.
Topic: demo_java,
Partition: 2,
Offset: 51,
Timestamp: 1736577123225
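send() is asynchronous and the callback runs on the producer's I/O thread, as the log above shows. When blocking on the acknowledgement is acceptable (tests, simple tools), the Future returned by send() can be awaited instead. A minimal sketch under the same broker assumptions:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerSyncSend {
public static void main(String[] args) throws Exception {
final Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
// Future.get() blocks until the broker acknowledges (or the send fails).
RecordMetadata metadata = producer.send(new ProducerRecord<>("demo_java", "sync demo")).get();
System.out.printf("Partition: %d, Offset: %d%n", metadata.partition(), metadata.offset());
}
}
}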
dependencies {
implementation("org.apache.kafka:kafka-clients:3.9.0")
implementation("org.slf4j:slf4j-api:2.0.16")
implementation("org.slf4j:slf4j-simple:2.0.16")
}
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerPartitionerPoc {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerPartitionerPoc.class);
public static void main(String[] args) {
logger.info("execution started for main(...)");
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// RoundRobinPartitioner (not recommended for production) - sends each record to the next partition in turn
// Note: prefer Kafka's default partitioner in real applications
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
// Set a tiny batch size (in bytes) so batching is easy to observe in this demo
// Note: keep the default in real applications
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500");
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// send data in multiple batches
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 30; j++) {
producer.send(new ProducerRecord<>("demo_java", String.format("Message: Hello World testing partitioner in kafka. Test [i: %s, j: %s]", i, j)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
logger.info("Record Metadata. \nTopic: {} \nPartition: {} \nOffset: {} \nTimestamp: {} \n",
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset(), recordMetadata.timestamp());
} else {
logger.error("Error: {}", e.getStackTrace());
}
}
});
}
// Flush once per outer iteration so records can accumulate into batches
producer.flush();
// sleep thread for 500ms
try {
logger.info("Thread will sleep for 500ms.");
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
producer.close();
logger.info("execution completed for main(...)");
}
}
# FYI - it will send data to each partition in batches as the batch threshold is reached.
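Batching is governed mainly by batch.size (maximum bytes per per-partition batch) and linger.ms (how long the producer waits to fill a batch before sending). A sketch of the relevant settings, added to a producer Properties block like the ones above; the values are illustrative, not recommendations:
// Wait up to 20 ms so records can accumulate into larger batches
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
// Allow per-partition batches up to 32 KB (the default is 16 KB)
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
// Compress whole batches on the producer side
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");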
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerRecordWithKeys {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerRecordWithKeys.class);
public static void main(String[] args) {
logger.info("Execution started for main(...)");
final Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
final String TOPIC = "demo_java";
// Send data
for (int j = 0; j < 10; j++) {
for (int i = 0; i < 10; i++) {
String key = "id_" + i;
String message = "hello world - " + i;
producer.send(new ProducerRecord<>(TOPIC, key, message), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
logger.info("Record: Key: {}, Partition: {}", key, recordMetadata.partition());
} else {
logger.error("Stacktrace:\n{}", e.getStackTrace());
}
}
});
}
producer.flush();
// Sleep so each outer iteration forms a separate batch
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
producer.close();
logger.info("Execution completed for main(...)");
}
}
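The reason the same key always lands on the same partition (visible in the logs above) is that the default partitioner hashes a non-null key with murmur2 and takes it modulo the partition count. A sketch of that computation using kafka-clients' own Utils class - an internal utility shown purely for illustration; the mapping only holds while the topic's partition count is unchanged:
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;
public class DefaultPartitionDemo {
public static void main(String[] args) {
int numPartitions = 3; // must match the topic's partition count
for (int i = 0; i < 10; i++) {
byte[] keyBytes = ("id_" + i).getBytes(StandardCharsets.UTF_8);
// Same formula the default partitioner applies to keyed records.
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
System.out.printf("key=id_%d -> partition %d%n", i, partition);
}
}
}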
Refer to the src folder in the project home directory for the complete source.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumerApp {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerApp.class);
private static final String GROUP_ID = "my-java-app-consumers";
private static final String TOPIC = "demo_java";
/**
* Kafka consumer configuration
*/
private static Properties getKafkaConfig() {
final Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// set application group id
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// Read config
//Read only new messages
// properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// --> fail if consumer group doesn't exist
// properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
// --> Read from beginning
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Don't create topics if not found.
properties.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, Boolean.toString(false));
return properties;
}
public static void main(String[] args) {
logger.info("Execution started of main(...)");
// Create consumer
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getKafkaConfig());
//Add shutdown hook to gracefully close the consumer
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
logger.info("Detected shutdown. calling to initiate shutdown.");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
logger.error("Shutdown error while waiting for consumer to close resources. Message: {}", e.getMessage());
e.printStackTrace();
throw new RuntimeException(e);
}
}
});
try {
//subscribe
kafkaConsumer.subscribe(List.of(TOPIC));
// Poll for events
while (true) {
logger.info("Polling.................");
// The maximum time to block.
// Must not be greater than Long.MAX_VALUE milliseconds.
final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: {}, Value: {}", record.key(), record.value());
logger.info("Partition: {}, Offset: {}", record.partition(), record.offset());
}
}
} catch (WakeupException we) {
logger.info("Started shutdown for consumer.");
} catch (Exception e) {
logger.error("Unexpected error in consumer. Message: {}", e.getMessage());
e.printStackTrace();
} finally {
kafkaConsumer.close();
logger.info("Consumer is now gracefully shutdown.");
}
logger.info("Execution completed of main(...)");
}
}
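Besides subscribe() with group management, the consumer API also supports reading explicit partitions at explicit offsets via assign() and seek(), e.g. to replay a known range. A minimal sketch under the same broker assumptions (partition number and starting offset are illustrative; no group.id is needed when partitions are assigned manually):
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerAssignSeek {
public static void main(String[] args) {
final Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
TopicPartition partition = new TopicPartition("demo_java", 0);
consumer.assign(List.of(partition));
consumer.seek(partition, 5L); // start reading at offset 5
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1000))) {
System.out.printf("Offset %d: %s%n", record.offset(), record.value());
}
}
}
}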
Incremental cooperative rebalancing: the consumer below configures CooperativeStickyAssignor so a rebalance moves only the partitions that actually change hands, instead of revoking every consumer's full assignment first.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumerIncrementalRebalancingApp {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerIncrementalRebalancingApp.class);
private static final String GROUP_ID = "my-java-app-consumers";
private static final String TOPIC = "demo_java";
/**
* Kafka consumer configuration
*/
private static Properties getKafkaConfig() {
final Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// set application group id
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// Read config --> earliest: Read from beginning
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Don't create topics if not found.
properties.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, Boolean.toString(false));
// CooperativeStickyAssignor : Incremental repartitioning
properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
return properties;
}
public static void main(String[] args) {
logger.info("Execution started of main(...)");
// Create consumer
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getKafkaConfig());
//Add shutdown hook to gracefully close the consumer
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
logger.info("Detected shutdown. calling to initiate shutdown.");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
logger.error("Shutdown error while waiting for consumer to close resources. Message: {}", e.getMessage());
e.printStackTrace();
throw new RuntimeException(e);
}
}
});
try {
//subscribe
kafkaConsumer.subscribe(List.of(TOPIC));
// Poll for events
while (true) {
logger.info("Polling...");
// The maximum time to block.
// Must not be greater than Long.MAX_VALUE milliseconds.
final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: {}, Value: {}", record.key(), record.value());
logger.info("Partition: {}, Offset: {}", record.partition(), record.offset());
}
}
} catch (WakeupException we) {
logger.info("Started shutdown for consumer.");
} catch (Exception e) {
logger.error("Unexpected error in consumer. Message: {}", e.getMessage());
e.printStackTrace();
} finally {
kafkaConsumer.close();
logger.info("Consumer is now gracefully shutdown.");
}
logger.info("Execution completed of main(...)");
}
}
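To observe the incremental behavior, a ConsumerRebalanceListener can be passed to subscribe(). With CooperativeStickyAssignor, onPartitionsRevoked reports only the partitions actually migrating away, rather than the consumer's whole assignment as under the eager strategies. A sketch reusing kafkaConsumer, TOPIC, and logger from the class above (requires java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener, and org.apache.kafka.common.TopicPartition imports):
// Log incremental assignment changes during rebalances.
kafkaConsumer.subscribe(List.of(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Cooperative: only the partitions being migrated away from this consumer.
logger.info("Revoked: {}", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Only the partitions newly added to this consumer.
logger.info("Assigned: {}", partitions);
}
});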
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumerManualOffsetCommitApp {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerManualOffsetCommitApp.class);
private static final String GROUP_ID = "my-java-app-consumers";
private static final String TOPIC = "demo_java";
/**
* Kafka consumer configuration
*/
private static Properties getKafkaConfig() {
final Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "[::1]:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// set application group id
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// Read config --> earliest: Read from beginning
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Don't create topics if not found.
properties.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, Boolean.toString(false));
// CooperativeStickyAssignor : Incremental repartitioning
properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
// Auto commit interval (only applies when auto commit is enabled)
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// Disable auto commit so the application controls when offsets are committed
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
return properties;
}
public static void main(String[] args) {
logger.info("Execution started of main(...)");
// Create consumer
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getKafkaConfig());
//Add shutdown hook to gracefully close the consumer
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
logger.info("Detected shutdown. calling to initiate shutdown.");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
logger.error("Shutdown error while waiting for consumer to close resources. Message: {}", e.getMessage());
e.printStackTrace();
throw new RuntimeException(e);
}
}
});
try {
//subscribe
kafkaConsumer.subscribe(List.of(TOPIC));
// Poll for events
while (true) {
logger.info("Polling.................");
// The maximum time to block.
// Must not be greater than Long.MAX_VALUE milliseconds.
final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: {}, Value: {}", record.key(), record.value());
logger.info("Partition: {}, Offset: {}", record.partition(), record.offset());
}
// Manually commit the offsets returned by the last poll (asynchronous, no retry on failure)
kafkaConsumer.commitAsync();
}
} catch (WakeupException we) {
logger.info("Started shutdown for consumer.");
} catch (Exception e) {
logger.error("Unexpected error in consumer. Message: {}", e.getMessage());
e.printStackTrace();
} finally {
kafkaConsumer.close();
logger.info("Consumer is now gracefully shutdown.");
}
logger.info("Execution completed of main(...)");
}
}
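commitAsync() does not retry on failure; a common pattern is commitAsync() inside the loop plus a final commitSync() on shutdown, for example in the finally block before close(). Offsets can also be committed explicitly per partition. A sketch reusing records and kafkaConsumer from the loop above (requires java.util.HashMap, java.util.Map, org.apache.kafka.clients.consumer.OffsetAndMetadata, and org.apache.kafka.common.TopicPartition imports); note the +1, since the committed offset is the next offset to be read:
// Commit the offset after the last record of each partition in this batch.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
kafkaConsumer.commitSync(offsets);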