[Solved] Concurrent processing with partition-based ordering in Reactor Kafka

perplexedDev Asks: Concurrent processing with partition-based ordering in Reactor Kafka
I am working on a sample application that reads from the different partitions of a Kafka topic, processes the records concurrently while preserving per-partition order, and writes them to different partitions of another topic. This is the sample code I wrote.

Code:
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

public class MetricsTransposer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static abstract class SetKafkaProperties {
        final String SOURCE_TOPIC;
        final String DESTINATION_TOPIC;
        final Map<String, Object> consumerProps;
        final Map<String, Object> producerProps;

        SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            SOURCE_TOPIC = sourceTopic;
            DESTINATION_TOPIC = destTopic;

            consumerProps = new HashMap<String, Object>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); 
            if(consumerPropsOverride != null) {
                consumerProps.putAll(consumerPropsOverride);
            }

            producerProps = new HashMap<String, Object>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);   // serializer, not deserializer
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            if(producerPropsOverride != null) {
                producerProps.putAll(producerPropsOverride);
            }
        }
    }

    static class ReactiveTranspose extends SetKafkaProperties {

        SenderOptions<Integer, String> senderOptions =
            SenderOptions.<Integer, String>create(producerProps)
                .maxInFlight(1024);

        KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

        ReceiverOptions<Integer, String> receiverOptions =
            ReceiverOptions.<Integer, String>create(consumerProps)
                .subscription(Collections.singleton(SOURCE_TOPIC));

        ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
        }

        public void readProcessWriteRecords() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
            KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
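                // groupBy yields one Flux per source partition: records within a
                // partition stay ordered, while different partitions are processed
                // concurrently via publishOn(scheduler); sample + concatMap commit
                // the latest offset per partition at most every 5 seconds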
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                    partitionFlux.publishOn(scheduler)
                        .map(r -> processRecord(partitionFlux.key(), r))
                        .sample(Duration.ofMillis(5000))
                        .concatMap(offset -> offset.commit()));
        }

        private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
            System.out.printf("Processing record {} from partition {} in thread{}",
                message.value(), topicPartition, Thread.currentThread().getName());
            return message.receiverOffset();
        }
    }

    public static void runReactiveTransformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        transpose.readProcessWriteRecords();
    }

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";

        runReactiveTransformProcess(sourceTopic, destinationTopic);

    }
}

When I run the application, I am not seeing the print statements in the console, even though there is data to be consumed in the topic. So I wonder whether the code is connecting to the topic at all. I am looking for help in figuring out how to check whether it is connecting and reading messages, or what else the issue could be.
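
For reference, one way to verify that the consumer connects and receives at all is to run a stripped-down receive pipeline on its own and block on it. One Reactor detail matters here: a Flux does nothing until something subscribes to it, so any such check has to subscribe (or block) explicitly. A minimal sketch, reusing the receiverOptions defined in the code above:

Code:
    // Connectivity check (sketch): print the first few records, then stop.
    // blockLast(...) subscribes to the Flux; without a subscription nothing flows.
    KafkaReceiver.create(receiverOptions)
        .receive()
        .doOnNext(r -> System.out.printf("Got %s from %s%n",
            r.value(), r.receiverOffset().topicPartition()))
        .take(5)
        .blockLast(Duration.ofSeconds(30));

Note also that with auto.offset.reset=latest and a group.id that changes on every run, the consumer starts from the end of the log, so records already sitting in the topic are not replayed.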

I am a newbie to Java, reactive programming and Kafka. This is a self-learning project, so it is quite possible I am missing something simple and obvious.

More information: here is a snapshot of my logs. I have a topic named metrics with 3 partitions.
[screenshot of consumer logs]
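
As an aside, the pipeline in the posted code declares a KafkaSender but never calls send(...), so even once records are consumed, nothing would be written to the destination topic. For completeness, here is a rough sketch of what the write leg could look like, carrying each record's ReceiverOffset as correlation metadata so the offset is committed only after the send is acknowledged. It reuses the sender, receiverOptions and DESTINATION_TOPIC fields from the code above and is an assumption about the intended wiring, not a drop-in fix:

Code:
    // Sketch only. Additionally requires:
    //   import org.apache.kafka.clients.producer.ProducerRecord;
    //   import reactor.kafka.sender.SenderRecord;
    KafkaReceiver.create(receiverOptions)
        .receive()
        .map(r -> SenderRecord.create(
            new ProducerRecord<>(DESTINATION_TOPIC, r.key(), r.value()),
            r.receiverOffset()))                 // offset rides along as correlation metadata
        .as(sender::send)                        // Flux<SenderResult<ReceiverOffset>>
        .concatMap(result -> result.correlationMetadata().commit())
        .subscribe();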
