Fix queue metrics calculation

This commit is contained in:
allegroai 2022-03-15 16:28:49 +02:00
parent c17cedd93a
commit e1992e2054

View File

@ -187,13 +187,15 @@ class QueueBLL(object):
if any(e.task == task_id for e in queue.entries): if any(e.task == task_id for e in queue.entries):
raise errors.bad_request.TaskAlreadyQueued(task=task_id) raise errors.bad_request.TaskAlreadyQueued(task=task_id)
self.metrics.log_queue_metrics_to_es(company_id=company_id, queues=[queue])
entry = Entry(added=datetime.utcnow(), task=task_id) entry = Entry(added=datetime.utcnow(), task=task_id)
query = dict(id=queue_id, company=company_id) query = dict(id=queue_id, company=company_id)
res = Queue.objects(entries__task__ne=task_id, **query).update_one( res = Queue.objects(entries__task__ne=task_id, **query).update_one(
push__entries=entry, last_update=datetime.utcnow(), upsert=False push__entries=entry, last_update=datetime.utcnow(), upsert=False
) )
queue.reload()
self.metrics.log_queue_metrics_to_es(company_id=company_id, queues=[queue])
if not res: if not res:
raise errors.bad_request.InvalidQueueOrTaskNotQueued( raise errors.bad_request.InvalidQueueOrTaskNotQueued(
task=task_id, **query task=task_id, **query
@ -233,7 +235,6 @@ class QueueBLL(object):
queue = self.get_queue_with_task( queue = self.get_queue_with_task(
company_id=company_id, queue_id=queue_id, task_id=task_id company_id=company_id, queue_id=queue_id, task_id=task_id
) )
self.metrics.log_queue_metrics_to_es(company_id, queues=[queue])
entries_to_remove = [e for e in queue.entries if e.task == task_id] entries_to_remove = [e for e in queue.entries if e.task == task_id]
query = dict(id=queue_id, company=company_id) query = dict(id=queue_id, company=company_id)
@ -241,6 +242,9 @@ class QueueBLL(object):
pull_all__entries=entries_to_remove, last_update=datetime.utcnow() pull_all__entries=entries_to_remove, last_update=datetime.utcnow()
) )
queue.reload()
self.metrics.log_queue_metrics_to_es(company_id=company_id, queues=[queue])
return len(entries_to_remove) if res else 0 return len(entries_to_remove) if res else 0
def reposition_task( def reposition_task(