Fail-safe Kafka pulling

This commit is contained in:
allegroai 2023-09-23 17:36:01 +03:00
parent e4c07c756a
commit 58d826e427

View File

@ -240,13 +240,22 @@ class StatisticsController(object):
sleep(30) sleep(30)
# we will never leave this loop # we will never leave this loop
for message in consumer: while True:
# noinspection PyBroadException
try:
message = next(consumer)
except Exception:
print("Warning: failed to pull kafka consumer pipe")
sleep(5)
continue
# noinspection PyBroadException # noinspection PyBroadException
try: try:
list_data = json.loads(message.value.decode("utf-8")) list_data = json.loads(message.value.decode("utf-8"))
except Exception: except Exception:
print("Warning: failed to decode kafka stats message") print("Warning: failed to decode kafka stats message")
continue continue
for data in list_data: for data in list_data:
try: try:
url = data.pop("_url", None) url = data.pop("_url", None)