I have a Flink job that reads messages from one Kafka topic and sends them to another topic; very basic in terms of Flink.
import org.apache.commons.lang3.StringUtils; // assuming Commons Lang 3 here
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RoutingJob {

    private static final Logger LOG = LoggerFactory.getLogger(RoutingJob.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        LOG.info("Starting Flink Streaming Job");

        // Resolve configuration from environment variables or system properties
        String inputTopic = getEnvOrProperty("INPUT_TOPIC");
        String outputTopic = getEnvOrProperty("OUTPUT_TOPIC");
        String bootstrapServers = getEnvOrProperty("BOOTSTRAP_SERVERS");
        String groupId = getEnvOrProperty("GROUP_ID");

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        LOG.info("Kafka Source initialized");

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Log each incoming message
        stream.map(message -> {
            LOG.info("Message is {}", message);
            return message;
        });

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .build();
        LOG.info("Kafka Sink initialized");

        stream.sinkTo(sink);
        env.execute("MSK to MSK Flink Job");
    }

    private static String getEnvOrProperty(String key) {
        String value = System.getenv(key);
        if (StringUtils.isEmpty(value)) {
            value = System.getProperty(key);
        }
        return value;
    }
}
That's the code; all it does is read from topic A and send it to topic B.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class RoutingJobTest {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    private static final String TEST_MESSAGE = "test-message";
    private static final String GROUP_ID = "test-group";

    @Container
    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
    @Test
    void testFlinkKafkaJob() throws Exception {
        // 1. Send Test Message to Kafka
        sendTestMessage(kafkaContainer.getBootstrapServers(), INPUT_TOPIC, TEST_MESSAGE);

        // 2. Run Flink Job
        runFlinkJob(kafkaContainer.getBootstrapServers());

        // 3. Verify Output
        verifyOutput(kafkaContainer.getBootstrapServers(), OUTPUT_TOPIC, TEST_MESSAGE);
    }
    private void sendTestMessage(String bootstrapServers, String topic, String message) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            System.out.println("Sending message: " + message);
            producer.send(new ProducerRecord<>(topic, message)).get();
        }
    }
    private void runFlinkJob(String bootstrapServers) throws Exception {
        try {
            System.setProperty("INPUT_TOPIC", INPUT_TOPIC);
            System.setProperty("OUTPUT_TOPIC", OUTPUT_TOPIC);
            System.setProperty("BOOTSTRAP_SERVERS", bootstrapServers);
            System.setProperty("GROUP_ID", GROUP_ID);

            Thread flinkThread = new Thread(() -> {
                try {
                    RoutingJob.main(new String[]{});
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            flinkThread.start();

            // Give the Flink job some time to start processing
            Thread.sleep(5000); // Adjust as needed
        } finally {
            // Clear system properties after running the job
            System.clearProperty("INPUT_TOPIC");
            System.clearProperty("OUTPUT_TOPIC");
            System.clearProperty("BOOTSTRAP_SERVERS");
            System.clearProperty("GROUP_ID");
        }
    }
    private void verifyOutput(String bootstrapServers, String topic, String expectedMessage) throws Exception {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", "test-consumer-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Awaitility.await()
                .atMost(30, TimeUnit.SECONDS)
                .pollInterval(1, TimeUnit.SECONDS)
                .until(() -> {
                    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                        consumer.subscribe(Collections.singletonList(topic));
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // Poll with a shorter duration
                        if (records.count() == 1) {
                            ConsumerRecord<String, String> record = records.iterator().next();
                            Assertions.assertEquals(expectedMessage, record.value());
                            return true; // Condition met
                        }
                        System.out.println("No records found");
                        return false; // Condition not met, continue polling
                    } catch (Exception e) {
                        return false; // Handle exception, continue polling
                    }
                });
    }
}
This is my test and it always fails. Am I missing anything?
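One thing I'm not sure about is the verification consumer: it never sets auto.offset.reset, and as far as I understand a brand-new consumer group defaults to "latest", so it would only see records produced after it subscribed. Here is a minimal sketch of the consumer properties I think would read the topic from the beginning; the auto.offset.reset line is my own assumption, not something from the original setup.

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "test-consumer-group");
// Assumption: start from the earliest offset so records produced before the
// consumer subscribed are still visible to the verification step
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");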