[Solved] Concurrent processing with partition based ordering in Reactor Kafka

perplexedDev Asks: Concurrent processing with partition based ordering in Reactor Kafka
I am working on a sample application that reads from the partitions of a Kafka topic, processes the records concurrently while preserving their order within each partition, and writes the results to partitions of another topic. This is the sample code I wrote:

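import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

public class MetricsTransposer {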
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static abstract class SetKafkaProperties {
        final String SOURCE_TOPIC;
        final String DESTINATION_TOPIC;
        final Map<String, Object> consumerProps;
        final Map<String, Object> producerProps;

        SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            SOURCE_TOPIC = sourceTopic;
            DESTINATION_TOPIC = destTopic;

            consumerProps = new HashMap<String, Object>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0");
            if (consumerPropsOverride != null) {
                consumerProps.putAll(consumerPropsOverride); // apply caller-supplied overrides
            }

            producerProps = new HashMap<String, Object>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
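            // A producer needs serializers here, not deserializers.
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            if (producerPropsOverride != null) {
                producerProps.putAll(producerPropsOverride); // apply caller-supplied overrides
            }
        }
    }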

    static class ReactiveTranspose extends SetKafkaProperties {

        SenderOptions<Integer, String> senderOptions =
            SenderOptions.<Integer, String>create(producerProps);

        KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

        // A subscription is needed before receive() can consume anything;
        // subscribing to SOURCE_TOPIC is assumed here.
        ReceiverOptions<Integer, String> receiverOptions =
            ReceiverOptions.<Integer, String>create(consumerProps)
                .subscription(Collections.singleton(SOURCE_TOPIC));

        ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
        }

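        public void ReadProcessWriteRecords() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
            KafkaReceiver.create(receiverOptions) // assumed source of the chain
                .receive()
                .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                // One inner Flux per topic partition, so ordering is kept within a partition.
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                    partitionFlux.publishOn(scheduler) // assumed use of the scheduler declared above
                        .map(r -> processRecord(partitionFlux.key(), r))
                        .concatMap(offset -> offset.commit()));
        }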

        private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
            // printf uses %s placeholders, not {} or {0}.
            System.out.printf("Processing record %s from partition %s in thread %s%n",
                message.value(), topicPartition, Thread.currentThread().getName());
            return message.receiverOffset();
        }
    }

    public static void RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        transpose.ReadProcessWriteRecords(); // assumed call; without it the pipeline is never assembled
    }

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";

        RunReactiveTranformProcess(sourceTopic, destinationTopic);
    }
}


When I run the application, none of the print statements appear in the console, even though the topic has data to consume, so I wonder whether the code is connecting to the topic at all. How can I check whether it is connecting to the topic and reading the messages, or what else could be wrong here?
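
Two things in the code as shown may be worth checking. First, the chain in ReadProcessWriteRecords ends at concatMap with no subscribe() call, and a Reactor Flux is lazy: assembling the chain does nothing until something subscribes to it. Second, a new group id on every run combined with auto.offset.reset=latest means the consumer starts at the end of each partition, so records already in the topic are skipped. The laziness can be seen in isolation with a JDK stream, used here purely as a stand-in so the snippet runs without Reactor or Kafka. LazyPipelineDemo and its sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    // Collects every element that actually flows through the pipeline.
    static final List<String> seen = new ArrayList<>();

    // Only assembles the pipeline. As with a Flux that nobody subscribes to,
    // the peek() callback does not run at this point.
    static Stream<String> buildPipeline() {
        return Stream.of("m1", "m2", "m3")
                .peek(seen::add);
    }

    public static void main(String[] args) {
        Stream<String> pipeline = buildPipeline();
        System.out.println("after build: " + seen.size());        // prints "after build: 0"

        // The terminal operation is what drives elements through peek().
        pipeline.forEach(value -> { });
        System.out.println("after terminal op: " + seen.size());  // prints "after terminal op: 3"
    }
}
```

In Reactor the counterpart of the terminal operation is subscribe(); until it is called, doOnNext never fires and no consumer is started.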

I am new to Java, reactive programming, and Kafka. This is a self-learning project, so it is quite possible I am missing something simple and obvious.

More information: here is a snapshot of my logs. I have a topic named metrics with 3 partitions.
[Image: log snapshot]
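
With three partitions, the behaviour the pipeline is aiming for (in-order handling within a partition, concurrency across partitions) can be sketched without Kafka or Reactor. The version below swaps groupBy/publishOn for one single-threaded executor per partition. PartitionOrderedDemo, Rec, and the sample data are hypothetical, and this is a plain-JDK illustration of the idea rather than Reactor Kafka code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PartitionOrderedDemo {
    // Hypothetical stand-in for a Kafka record: a partition number and a value.
    static final class Rec {
        final int partition;
        final String value;
        Rec(int partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    // Dispatches each record to the worker that owns its partition.
    // A single-threaded worker runs its tasks in submission order, so order
    // is kept per partition while different partitions run in parallel.
    static Map<Integer, List<String>> process(List<Rec> records) {
        Map<Integer, ExecutorService> workers = new ConcurrentHashMap<>();
        Map<Integer, List<String>> processed = new ConcurrentHashMap<>();
        for (Rec r : records) {
            List<String> sink = processed.computeIfAbsent(
                    r.partition, p -> Collections.synchronizedList(new ArrayList<String>()));
            workers.computeIfAbsent(r.partition, p -> Executors.newSingleThreadExecutor())
                   .execute(() -> sink.add(r.value));
        }
        // Stop accepting work, then wait for the queued tasks to finish.
        for (ExecutorService w : workers.values()) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers.values()) {
                if (!w.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("worker did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec(0, "a1"), new Rec(1, "b1"), new Rec(0, "a2"),
                new Rec(2, "c1"), new Rec(1, "b2"));
        Map<Integer, List<String>> out = process(input);
        System.out.println(out.get(0)); // prints [a1, a2]
        System.out.println(out.get(1)); // prints [b1, b2]
        System.out.println(out.get(2)); // prints [c1]
    }
}
```

The order across partitions is not deterministic, which is why the output is grouped by partition rather than printed as records are handled.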

