Optimize thread processing

This commit is contained in:
allegroai 2019-12-14 23:35:18 +02:00
parent 4a1d97c02f
commit 49515e06e1
2 changed files with 34 additions and 9 deletions

View File

@ -4,6 +4,7 @@ from typing import Sequence, Set, Optional
import attr
import elasticsearch.helpers
import es_factory
from apierrors import APIError
from apierrors.errors import bad_request, server_error
@ -22,10 +23,9 @@ from database.model.auth import User
from database.model.company import Company
from database.model.queue import Queue
from database.model.task.task import Task
from service_repo.redis_manager import redman
from redis_manager import redman
from timing_context import TimingContext
from tools import safe_get
from .stats import WorkerStats
log = config.logger(__file__)
@ -33,9 +33,9 @@ log = config.logger(__file__)
class WorkerBLL:
def __init__(self, es=None, redis=None):
self.es = es if es is not None else es_factory.connect("workers")
self.es_client = es if es is not None else es_factory.connect("workers")
self.redis = redis if redis is not None else redman.connection("workers")
self._stats = WorkerStats(self.es)
self._stats = WorkerStats(self.es_client)
@property
def stats(self) -> WorkerStats:
@ -396,7 +396,7 @@ class WorkerBLL:
for i, val in enumerate(value)
)
es_res = elasticsearch.helpers.bulk(self.es, actions)
es_res = elasticsearch.helpers.bulk(self.es_client, actions)
added, errors = es_res[:2]
return (added == len(actions)) and not errors

View File

@ -1,14 +1,29 @@
from functools import wraps
from threading import Lock, Thread
import attr
@attr.s(auto_attribs=True)
class ThreadsManager:
objects = {}
lock = Lock()
def __init__(self, name=None, **threads):
super(ThreadsManager, self).__init__()
self.name = name or self.__class__.name
self.objects = {}
self.lock = Lock()
for name, thread in threads.items():
if issubclass(thread, Thread):
thread = thread()
thread.start()
elif isinstance(thread, Thread):
if not thread.is_alive():
thread.start()
else:
raise Exception(f"Expected thread or thread class ({name}): {thread}")
self.objects[name] = thread
def register(self, thread_name, daemon=True):
def decorator(f):
@wraps(f)
@ -17,7 +32,7 @@ class ThreadsManager:
thread = self.objects.get(thread_name)
if not thread:
thread = Thread(
target=f, name=thread_name, args=args, kwargs=kwargs
target=f, name=f"{self.name}_{thread_name}", args=args, kwargs=kwargs
)
thread.daemon = daemon
thread.start()
@ -27,3 +42,13 @@ class ThreadsManager:
return wrapper
return decorator
def __getattr__(self, item):
if item in self.objects:
return self.objects[item]
return self.__getattribute__(item)
def __getitem__(self, item):
if item in self.objects:
return self.objects[item]
raise KeyError(item)