Kafka
Sample Commands
on-kafka --topics
on-kafka --sample-read
on-kafka --sample-send
on-kafka --lag
Source code in linecaspy/kafka/kafka.py
class KafkaClass:
    """Convenience wrapper holding one Kafka consumer and one Kafka producer.

    The methods cover listing topics, reading and sending sample messages and
    reporting consumer-group lag per partition. ``KafkaConsumer``,
    ``KafkaProducer``, ``TopicPartition``, ``logger`` and ``hf`` are expected
    to be provided by the enclosing module (presumably kafka-python and
    project helpers -- confirm against the module's imports).
    """

    def __init__(self, sys_exit=True):
        """Sets up the Kafka Class

        Args:
            sys_exit (bool): If calls should force app close on failure
        """
        self.bootstrap_servers = "192.168.86.6:9094"
        self.consumer = None
        self.producer = None
        self.sys_exit = sys_exit

    def consumer_init(self, topic="test"):
        """Create the consumer for ``topic`` and store it on ``self.consumer``.

        A consumer left over from a previous call is closed first; otherwise
        overwriting the attribute would leak its broker connections.

        Args:
            topic (str): Topic the new consumer subscribes to
        """
        self.consumer_close()
        self.consumer = KafkaConsumer(
            topic,
            group_id="linecas",
            bootstrap_servers=[self.bootstrap_servers],
            enable_auto_commit=True,
            max_poll_records=1,
            # Iterating the consumer stops after 20 s without a new message.
            consumer_timeout_ms=20000,
        )

    def consumer_close(self):
        """Close the consumer if one has been created; otherwise do nothing."""
        if self.consumer:
            self.consumer.close()

    def producer_init(self):
        """Create the producer and store it on ``self.producer``.

        A producer left over from a previous call is closed first so that it
        is not leaked when the attribute is overwritten.
        """
        self.producer_close()
        self.producer = KafkaProducer(bootstrap_servers=[self.bootstrap_servers])

    def producer_close(self):
        """Close the producer if one has been created; otherwise do nothing."""
        if self.producer:
            self.producer.close()

    def get_topics(self):
        """Log every topic known to the consumer and return them.

        Returns:
            The value of ``self.consumer.topics()``; it is fetched once and the
            same object is both logged and returned.
        """
        self.consumer_init()
        logger.info("STARTING LIST OF TOPICS")
        topics = self.consumer.topics()
        for topic in topics:
            logger.info(f"TOPIC: {topic}")
        logger.info("ENDING LIST OF TOPICS")
        return topics

    def partition_lag(self, topic="my-topic", partitions=3, lag_threshold=5):
        """Log committed offset, end offset and lag for each partition of a topic.

        Partitions ``0 .. partitions - 1`` are inspected. When any partition's
        lag exceeds ``lag_threshold`` the failure path
        ``hf.return_none_or_sys_exit(self.sys_exit)`` is taken; any exception
        raised while collecting offsets is logged and routed through
        ``hf.return_none_or_sys_exit(self.sys_exit, 2)``.

        Args:
            topic (str): Topic to inspect
            partitions (int): Number of partitions to check, starting at 0
            lag_threshold (int): Largest lag that is still considered healthy
        """
        self.consumer_init(topic=topic)
        try:
            topic_partition_list = [
                TopicPartition(topic=topic, partition=i) for i in range(partitions)
            ]
            logger.info(f"topic_partition_list: {topic_partition_list}")

            end_offsets_raw = self.consumer.end_offsets(topic_partition_list)
            # Re-key by partition number so the loop below can look offsets up
            # without rebuilding TopicPartition keys.
            end_offset_list = {
                key.partition: value for key, value in end_offsets_raw.items()
            }
            # Logged after it is populated so the entry shows the real offsets.
            logger.info(f"end_offset_list: {end_offset_list}")

            raise_warn = False
            report_lines = []
            for item in topic_partition_list:
                logger.info(f"item: {item}")
                current_partition = item.partition
                # A falsy committed() result (e.g. nothing committed yet) is
                # treated as offset 0.
                current_offset = self.consumer.committed(item) or 0
                end_offset = end_offset_list[item.partition]
                offset_lag = end_offset - current_offset
                if offset_lag > lag_threshold:
                    raise_warn = True
                report_lines.append(
                    f"\nPartition: {current_partition}, Current Offset: {current_offset}, "
                    f"End Offset: {end_offset}, Lag: {offset_lag}"
                )
            logger.info("".join(report_lines))

            if raise_warn:
                hf.return_none_or_sys_exit(self.sys_exit)
        except Exception as e:
            logger.error(f"Exception: {repr(e)}")
            hf.return_none_or_sys_exit(self.sys_exit, 2)

    def sample_read(self, topic="test"):
        """Log every message read from ``topic`` and then commit.

        Iteration ends once the consumer's ``consumer_timeout_ms`` elapses
        without a new message.

        Args:
            topic (str): Topic to read from
        """
        # Pass the requested topic through; calling consumer_init() without it
        # would always subscribe to the default topic.
        self.consumer_init(topic=topic)
        for m in self.consumer:
            logger.info(
                f"Message topic: {m.topic}, parition: {m.partition}, offset: {m.offset}, key: {m.key}, value: {m.value}"
            )
        self.consumer.commit()

    def sample_send(self, topic="my-topic"):
        """Send two sample messages to ``topic`` and wait for delivery.

        Args:
            topic (str): Topic to publish to
        """
        self.producer_init()
        self.producer.send(topic, b"Hello, World!")
        self.producer.send(topic, key=b"message-two", value=b"This is Kafka-Python")
        # send() only queues the record; flush so a short-lived process does
        # not exit before the messages are actually delivered.
        self.producer.flush()
__init__(self, sys_exit=True)
special
Sets up the Kafka Class
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sys_exit | bool | If calls should force app close on failure | True |
Source code in linecaspy/kafka/kafka.py
def __init__(self, sys_exit=True):
"""Sets up the Kafka Class
Args:
sys_exit (bool): If calls should force app close on failure
"""
self.bootstrap_servers = "192.168.86.6:9094"
self.consumer = None
self.producer = None
self.sys_exit = sys_exit