This is my code :
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def kafka_consumer(self):
    """Consume messages from Kafka with retry logic.

    Polls ``test_topic`` in an endless loop. Each message payload is handed
    to ``self.send_post_request`` on ``self.loop`` and the offset is
    committed manually only when that call reports success.

    The function does not return normally; it leaves the loop only through
    an exception. The ``retry`` decorator then re-invokes it (presumably
    tenacity: up to 3 attempts, 2 seconds apart -- confirm against imports).

    Raises:
        KafkaException: when a polled message carries an error other than
            end-of-partition; logged and re-raised to the caller/decorator.
    """
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "test_group",
        "auto.offset.reset": "earliest",
        # Offsets are committed by hand below, only after a successful POST.
        "enable.auto.commit": False,
    })
    consumer.subscribe(["test_topic"])
    try:
        logging.info("Kafka Consumer started.")
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the poll timeout.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is not a failure.
                    continue
                logging.error(f"Kafka error: {msg.error()}")
                raise KafkaException(msg.error())

            data = msg.value().decode()
            logging.info(f"Received from Kafka: {data}")

            # Check if event loop is running
            if not self.loop.is_running():
                logging.error("Event loop is not running!")
                continue

            # Make http call in the background; self.loop is presumably
            # running in another thread, hence the thread-safe submission.
            future = asyncio.run_coroutine_threadsafe(
                self.send_post_request(data), self.loop
            )
            try:
                # Block this consumer thread for at most 5 seconds.
                success = future.result(timeout=5)
            except Exception as e:
                logging.error(f"Error in send_post_request: {e}")
                success = False

            if success:
                # NOTE: committing this offset also acknowledges every
                # earlier message in the same partition, so a previously
                # failed (uncommitted) message in that partition will not
                # be redelivered after a restart once a later one commits.
                consumer.commit(message=msg)
            else:
                logging.warning(f"Message retained for retry: {data}")
    except KafkaException as e:
        logging.error(f"Kafka connection error: {e}")
        raise
    finally:
        # Always release the consumer, whatever ended the loop.
        consumer.close()
When I get a successful result, I commit the offset in Kafka for that message. Will this implementation work for the scenario below?
Assume I have 3 messages in Kafka: m1, m2 and m3. For m1, the HTTP call with the data did not succeed, so I did not commit it; m2 and m3 were processed successfully, and I committed those messages.
Then, if my app crashes or restarts, will I get the m1 message again to process? If yes, will I also get the m2 and m3 messages again (which I don't want), since they were in the queue after m1?
Or will the messages be consumed from m4, since I last committed the m3 message?
This is my code :
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def kafka_consumer(self):
    """Consume messages from Kafka with retry logic.

    Polls ``test_topic`` in an endless loop. Each message payload is handed
    to ``self.send_post_request`` on ``self.loop`` and the offset is
    committed manually only when that call reports success.

    The function does not return normally; it leaves the loop only through
    an exception. The ``retry`` decorator then re-invokes it (presumably
    tenacity: up to 3 attempts, 2 seconds apart -- confirm against imports).

    Raises:
        KafkaException: when a polled message carries an error other than
            end-of-partition; logged and re-raised to the caller/decorator.
    """
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "test_group",
        "auto.offset.reset": "earliest",
        # Offsets are committed by hand below, only after a successful POST.
        "enable.auto.commit": False,
    })
    consumer.subscribe(["test_topic"])
    try:
        logging.info("Kafka Consumer started.")
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the poll timeout.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is not a failure.
                    continue
                logging.error(f"Kafka error: {msg.error()}")
                raise KafkaException(msg.error())

            data = msg.value().decode()
            logging.info(f"Received from Kafka: {data}")

            # Check if event loop is running
            if not self.loop.is_running():
                logging.error("Event loop is not running!")
                continue

            # Make http call in the background; self.loop is presumably
            # running in another thread, hence the thread-safe submission.
            future = asyncio.run_coroutine_threadsafe(
                self.send_post_request(data), self.loop
            )
            try:
                # Block this consumer thread for at most 5 seconds.
                success = future.result(timeout=5)
            except Exception as e:
                logging.error(f"Error in send_post_request: {e}")
                success = False

            if success:
                # NOTE: committing this offset also acknowledges every
                # earlier message in the same partition, so a previously
                # failed (uncommitted) message in that partition will not
                # be redelivered after a restart once a later one commits.
                consumer.commit(message=msg)
            else:
                logging.warning(f"Message retained for retry: {data}")
    except KafkaException as e:
        logging.error(f"Kafka connection error: {e}")
        raise
    finally:
        # Always release the consumer, whatever ended the loop.
        consumer.close()
When I get a successful result, I commit the offset in Kafka for that message. Will this implementation work for the scenario below?
Assume I have 3 messages in Kafka: m1, m2 and m3. For m1, the HTTP call with the data did not succeed, so I did not commit it; m2 and m3 were processed successfully, and I committed those messages.
Then, if my app crashes or restarts, will I get the m1 message again to process? If yes, will I also get the m2 and m3 messages again (which I don't want), since they were in the queue after m1?
Or will the messages be consumed from m4, since I last committed the m3 message?
Share asked Mar 11 at 7:54 Faizan AnsariFaizan Ansari 51 silver badge3 bronze badges1 Answer
Reset to default 0
Kafka's offset commit semantics are that committing the offset of a message effectively acknowledges that message and all messages before it in that message's TopicPartition.
So if m2 and m3 are committed and m1 is in the same TopicPartition as (at least) one of m2 and m3, then your consumer will not see m1 again. But if m1 is in some other TopicPartition, then your consumer will see m1 and then m4 (for sure if m1 and m4 are in the same TopicPartition and maybe otherwise) or will see m4 and then m1 (possible if m1 and m4 are in different TopicPartitions).
Accordingly, if your application has a requirement to process messages at least once, your application must not commit an offset unless it is sure that it has processed the preceding messages in that TopicPartition.
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744808685a4594933.html
评论列表(0条)