Skip to content

Kafka

Sample Commands

on-kafka --topics
on-kafka --sample-read
on-kafka --sample-send
on-kafka --lag

Source code in linecaspy/kafka/kafka.py
class KafkaClass:
    def __init__(self, sys_exit=True):
        """Sets up the Kafka Class

        Args:
            sys_exit (bool): If calls should force app close on failure
        """
        self.bootstrap_servers = "192.168.86.6:9094"
        self.consumer = None
        self.producer = None
        self.sys_exit = sys_exit

    def consumer_init(self, topic="test"):
        self.consumer = KafkaConsumer(
            topic,
            group_id="linecas",
            bootstrap_servers=[self.bootstrap_servers],
            enable_auto_commit=True,
            max_poll_records=1,
            consumer_timeout_ms=20000,
        )

    def consumer_close(self):
        if self.consumer:
            self.consumer.close()

    def producer_init(self):
        self.producer = KafkaProducer(bootstrap_servers=[self.bootstrap_servers])

    def producer_close(self):
        if self.producer:
            self.producer.close()

    def get_topics(self):
        self.consumer_init()
        logger.info("STARTING LIST OF TOPICS")
        for topic in self.consumer.topics():
            logger.info(f"TOPIC: {topic}")
        logger.info("ENDING LIST OF TOPICS")
        return self.consumer.topics()

    def partition_lag(self, topic="my-topic"):
        self.consumer_init(topic=topic)
        partitions = 3
        LAG_THRESHOLD = 5

        try:
            topic_partition_list = []
            for i in range(0, partitions):
                topic_partition_list.append(TopicPartition(topic=topic, partition=i))
            logger.info(f"topic_partition_list: {topic_partition_list}")
            end_offsets_raw = self.consumer.end_offsets(topic_partition_list)
            end_offset_list = {}
            logger.info(f"end_offset_list: {end_offset_list}")
            for key, value in end_offsets_raw.items():
                end_offset_list[key.partition] = value
            raise_warn = False
            output_message = ""
            for item in topic_partition_list:
                logger.info(f"item: {item}")
                current_partition = item.partition
                current_offset = self.consumer.committed(item) or 0
                end_offset = end_offset_list[item.partition]
                offset_lag = end_offset - current_offset
                if offset_lag > LAG_THRESHOLD:
                    raise_warn = True
                output_message = (
                    output_message + f"\nPartition: {current_partition}, Current Offset: {current_offset}, "
                    f"End Offset: {end_offset}, Lag: {offset_lag}"
                )
            logger.info(output_message)
            if raise_warn:
                hf.return_none_or_sys_exit(self.sys_exit)
        except Exception as e:
            logger.error(f"Exception: {repr(e)}")
            hf.return_none_or_sys_exit(self.sys_exit, 2)

    def sample_read(self, topic="test"):
        self.consumer_init()
        for m in self.consumer:
            logger.info(
                f"Message topic: {m.topic}, parition: {m.partition}, offset: {m.offset}, key: {m.key}, value: {m.value}"
            )
            self.consumer.commit()

    def sample_send(self, topic="my-topic"):
        self.producer_init()
        self.producer.send(topic, b"Hello, World!")
        self.producer.send(topic, key=b"message-two", value=b"This is Kafka-Python")

__init__(self, sys_exit=True) special

Sets up the Kafka Class

Parameters:

Name Type Description Default
sys_exit bool

If calls should force app close on failure

True
Source code in linecaspy/kafka/kafka.py
def __init__(self, sys_exit=True):
    """Sets up the Kafka Class

    Args:
        sys_exit (bool): If calls should force app close on failure
    """
    self.bootstrap_servers = "192.168.86.6:9094"
    self.consumer = None
    self.producer = None
    self.sys_exit = sys_exit