Flink unit test is failing to send message to kafka - Stack Overflow


I have a Flink job that reads messages from one Kafka topic and sends them to another topic, which is very basic in Flink terms.

public class RoutingJob {

    private static final Logger LOG = LoggerFactory.getLogger(RoutingJob.class);


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        LOG.info("Starting Flink Streaming Job");
        String inputTopic = getEnvOrProperty("INPUT_TOPIC");
        String outputTopic = getEnvOrProperty("OUTPUT_TOPIC");
        String bootstrapServers = getEnvOrProperty("BOOTSTRAP_SERVERS");
        String groupId = getEnvOrProperty("GROUP_ID");

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        LOG.info("Kafka Source initialized");

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        stream.map(message -> {
            LOG.info("Message is {}", message);
            return message;
        });

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();
        LOG.info("Kafka Sink initialized");

        stream.sinkTo(sink);

        env.execute("MSK to MSK Flink Job");
    }

    private static String getEnvOrProperty(String key) {
        String value = System.getenv(key);
        if (StringUtils.isEmpty(value)) {
            value = System.getProperty(key);
        }
        return value;
    }
}

That's the code. All it does is read from topic A and send each message to topic B.
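For reference, the job resolves its topics and bootstrap servers from environment variables first and falls back to JVM system properties, which is the path the test below relies on. The following is a minimal, dependency-free sketch of that lookup. The class name `ConfigLookup` is hypothetical, and the plain null/empty check stands in for the `StringUtils.isEmpty` call used in the job.

```java
public class ConfigLookup {

    // Same lookup order as the job: environment variable first,
    // then JVM system property. Returns null if neither is set.
    static String getEnvOrProperty(String key) {
        String value = System.getenv(key);
        if (value == null || value.isEmpty()) {
            value = System.getProperty(key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Assumes no environment variable named INPUT_TOPIC is set,
        // so the system property is what gets returned.
        System.setProperty("INPUT_TOPIC", "input-topic");
        System.out.println(getEnvOrProperty("INPUT_TOPIC"));

        // Once the property is cleared, the lookup has nothing to fall back to.
        System.clearProperty("INPUT_TOPIC");
        System.out.println(getEnvOrProperty("INPUT_TOPIC"));
    }
}
```

Because the values are read once when `main` starts, clearing the properties afterwards only matters if the job has not reached those lines yet.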

@Testcontainers
class RoutingJobTest {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    private static final String TEST_MESSAGE = "test-message";
    private static final String GROUP_ID = "test-group";

    @Container
    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @Test
    void testFlinkKafkaJob() throws Exception {
        // 1. Send Test Message to Kafka
        sendTestMessage(kafkaContainer.getBootstrapServers(), INPUT_TOPIC, TEST_MESSAGE);

        // 2. Run Flink Job
        runFlinkJob(kafkaContainer.getBootstrapServers());

        // 3. Verify Output
        verifyOutput(kafkaContainer.getBootstrapServers(), OUTPUT_TOPIC, TEST_MESSAGE);
    }

    private void sendTestMessage(String bootstrapServers, String topic, String message) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            System.out.println("Sending message: " + message);
            producer.send(new ProducerRecord<>(topic, message)).get();
        }
    }

    private void runFlinkJob(String bootstrapServers) throws Exception {
        try {
            System.setProperty("INPUT_TOPIC", INPUT_TOPIC);
            System.setProperty("OUTPUT_TOPIC", OUTPUT_TOPIC);
            System.setProperty("BOOTSTRAP_SERVERS", bootstrapServers);
            System.setProperty("GROUP_ID", GROUP_ID);

            Thread flinkThread = new Thread(() -> {
                try {
                    RoutingJob.main(new String[]{});
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            flinkThread.start();

            // Give the Flink job some time to start processing
            Thread.sleep(5000); // Adjust as needed
        } finally {
            // Clear system properties after running the job
            System.clearProperty("INPUT_TOPIC");
            System.clearProperty("OUTPUT_TOPIC");
            System.clearProperty("BOOTSTRAP_SERVERS");
            System.clearProperty("GROUP_ID");
        }
    }

    private void verifyOutput(String bootstrapServers, String topic, String expectedMessage) throws Exception {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", "test-consumer-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Awaitility.await()
                .atMost(30, TimeUnit.SECONDS)
                .pollInterval(1, TimeUnit.SECONDS)
                .until(() -> {
                    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                        consumer.subscribe(Collections.singletonList(topic));
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // Poll with a shorter duration
                        if (records.count() == 1) {
                            ConsumerRecord<String, String> record = records.iterator().next();
                            Assertions.assertEquals(expectedMessage, record.value());
                            return true; // Condition met
                        }
                        System.out.println("No records found");
                        return false; // Condition not met, continue polling
                    } catch (Exception e) {
                        return false; // Handle exception, continue polling
                    }
                });
    }
}

This is my test, and it always fails. Am I missing anything?
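One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the test produces its message before the job starts, while the source is configured with `OffsetsInitializer.latest()`. A reader that starts at the latest offset skips records that were already in the topic. The sketch below simulates that ordering with a hypothetical in-memory log (`InMemoryLog` is not a Kafka or Flink class), so it runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

public class StartingOffsetDemo {

    /** Hypothetical stand-in for a single-partition topic: an append-only list. */
    static final class InMemoryLog {
        private final List<String> records = new ArrayList<>();

        void append(String value) {
            records.add(value);
        }

        // Offset one past the last record, i.e. where a "latest" reader starts.
        int endOffset() {
            return records.size();
        }

        // Everything from the given offset to the current end of the log.
        List<String> readFrom(int offset) {
            return new ArrayList<>(records.subList(offset, records.size()));
        }
    }

    public static void main(String[] args) {
        InMemoryLog inputTopic = new InMemoryLog();

        // Step 1 of the test: the message is produced before the job starts.
        inputTopic.append("test-message");

        // Step 2: a reader starting at "latest" positions itself at the current end,
        // while a reader starting at "earliest" positions itself at offset 0.
        int latestStart = inputTopic.endOffset();
        int earliestStart = 0;

        System.out.println("latest sees:   " + inputTopic.readFrom(latestStart));   // []
        System.out.println("earliest sees: " + inputTopic.readFrom(earliestStart)); // [test-message]
    }
}
```

If this is what is happening, two changes to try are `OffsetsInitializer.earliest()` on the source (or producing the test message only after the job is running), and `auto.offset.reset=earliest` on the verification consumer, whose default of `latest` can skip the output record in the same way.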
