python - How can I manage offsets in Kafka at message level? - Stack Overflow

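This is my code:

# Imports the snippet relies on (tenacity is assumed from the decorator names)
import asyncio
import logging

from confluent_kafka import Consumer, KafkaError, KafkaException
from tenacity import retry, stop_after_attempt, wait_fixed

    # Method of the application class (class body omitted)
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))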
    def kafka_consumer(self):
        """Consume messages from Kafka with retry logic."""
        consumer = Consumer({
            "bootstrap.servers": "localhost:9092",
            "group.id": "test_group",
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False
        })
        consumer.subscribe(["test_topic"])

        try:
            logging.info("Kafka Consumer started.")
            while True:
                msg = consumer.poll(timeout=1.0)

                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        logging.error(f"Kafka error: {msg.error()}")
                        raise KafkaException(msg.error())

                data = msg.value().decode()
                logging.info(f"Received from Kafka: {data}")

                # Check if event loop is running
                if not self.loop.is_running():
                    logging.error("Event loop is not running!")
                    continue

                # Make the HTTP call in the background
                future = asyncio.run_coroutine_threadsafe(
                    self.send_post_request(data), self.loop
                )

                try:
                    success = future.result(timeout=5)  # wait up to 5 s for the HTTP call
                except Exception as e:
                    logging.error(f"Error in send_post_request: {e}")
                    success = False

                if success:
                    consumer.commit(message=msg)
                else:
                    logging.warning(f"Message retained for retry: {data}")

        except KafkaException as e:
            logging.error(f"Kafka connection error: {e}")
            raise 
        finally:
            consumer.close()

When I get a successful result, I commit the offset in Kafka for that message. Will this implementation work for the scenario below?

Assume I have three messages in Kafka: m1, m2 and m3. The HTTP call with the data from m1 failed, so I did not commit it. m2 and m3 were processed successfully, and I committed them.

If my app then crashes or restarts, will I get m1 again to process? If so, will I also get m2 and m3 again (which I don't want), since they were queued after m1?

Or will consumption resume from m4, since the last message I committed was m3?

Asked Mar 11 at 7:54 by Faizan Ansari

1 Answer

Kafka's offset commit semantics are cumulative: committing the offset of a message acknowledges that message and every earlier message in the same TopicPartition.

So if m2 and m3 are committed and m1 is in the same TopicPartition as at least one of them, your consumer will not see m1 again. If m1 is in a different TopicPartition, your consumer will see it again. It will see m1 before m4 if both are in the same TopicPartition; if they are in different TopicPartitions, either order is possible.
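To make the two cases concrete, here is a minimal sketch in plain Python. It is a toy model of committed offsets, not a Kafka client: the `OffsetModel` class, the partition layout and the offsets are all assumptions chosen for illustration. It only mirrors the rule that a commit stores the position after the committed message and that a restart resumes from that position in each partition.

```python
from typing import Dict, List, Tuple

Partition = Tuple[str, int]  # (topic, partition number)


class OffsetModel:
    """Toy model of one consumer group's committed offsets (not a Kafka client)."""

    def __init__(self, log: Dict[Partition, List[str]]) -> None:
        self.log = log
        self.committed: Dict[Partition, int] = {}

    def commit(self, tp: Partition, offset: int) -> None:
        # Committing a message stores offset + 1: the next position to read.
        self.committed[tp] = offset + 1

    def redelivered_after_restart(self) -> Dict[Partition, List[str]]:
        # Each partition resumes from its committed position, or from the
        # start if nothing was committed (like auto.offset.reset=earliest).
        return {
            tp: msgs[self.committed.get(tp, 0):]
            for tp, msgs in self.log.items()
        }


# Case 1: m1, m2, m3 share a partition; m1 failed, m2 and m3 were committed.
same = OffsetModel({("test_topic", 0): ["m1", "m2", "m3", "m4"]})
same.commit(("test_topic", 0), 1)  # m2
same.commit(("test_topic", 0), 2)  # m3
same_result = same.redelivered_after_restart()

# Case 2: m1 sits alone in partition 1 and was never committed.
split = OffsetModel({
    ("test_topic", 0): ["m2", "m3", "m4"],
    ("test_topic", 1): ["m1"],
})
split.commit(("test_topic", 0), 0)  # m2
split.commit(("test_topic", 0), 1)  # m3
split_result = split.redelivered_after_restart()

print(same_result)   # only m4 comes back; m1 is skipped
print(split_result)  # m4 from partition 0 and m1 from partition 1
```

In the first case the commit for m3 silently covers m1, which is the data-loss scenario the question is worried about.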

Accordingly, if your application has a requirement to process messages at least once, your application must not commit an offset unless it is sure that it has processed the preceding messages in that TopicPartition.
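One way to follow that rule is to advance the commit position of a partition only across an unbroken run of successes. The sketch below is plain Python under assumptions of my own: the helper name `safe_commit_positions` and the `(topic, partition, offset, success)` tuples are hypothetical and not part of any Kafka library.

```python
from typing import Dict, Iterable, Tuple

Partition = Tuple[str, int]          # (topic, partition number)
Result = Tuple[str, int, int, bool]  # (topic, partition, offset, success)


def safe_commit_positions(results: Iterable[Result]) -> Dict[Partition, int]:
    """Return, per partition, the position that is safe to commit.

    Results must be given in consumption order. A partition stops advancing
    at its first failure, so no later success can acknowledge that failure.
    """
    positions: Dict[Partition, int] = {}
    blocked = set()
    for topic, partition, offset, success in results:
        tp = (topic, partition)
        if tp in blocked:
            continue  # an earlier message in this partition failed
        if success:
            positions[tp] = offset + 1  # next position to read
        else:
            blocked.add(tp)
    return positions


outcomes = [
    ("test_topic", 0, 0, False),  # m1 failed
    ("test_topic", 0, 1, True),   # m2 succeeded
    ("test_topic", 0, 2, True),   # m3 succeeded
    ("test_topic", 1, 0, True),   # a message in another partition
]
positions = safe_commit_positions(outcomes)
print(positions)
```

Here partition 0 gets no commit position at all, so m1, m2 and m3 would all be redelivered after a restart. That is the cost of at-least-once processing: m2 and m3 may be handled twice, so the HTTP endpoint should tolerate duplicates.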
