Improve docker compose support

This commit is contained in:
allegroai 2019-10-27 00:10:08 +03:00
parent b93e843143
commit 5d17059cbe
3 changed files with 38 additions and 16 deletions

View File

@ -178,7 +178,7 @@ class EventBLL(object):
company_id=company_id, company_id=company_id,
task_id=task_id, task_id=task_id,
now=now, now=now,
iter=task_iteration.get(task_id), iter_max=task_iteration.get(task_id),
last_events=task_last_events.get(task_id), last_events=task_last_events.get(task_id),
) )
@ -216,7 +216,7 @@ class EventBLL(object):
if timestamp is None or timestamp < event["timestamp"]: if timestamp is None or timestamp < event["timestamp"]:
last_events[metric_hash][variant_hash] = event last_events[metric_hash][variant_hash] = event
def _update_task(self, company_id, task_id, now, iter=None, last_events=None): def _update_task(self, company_id, task_id, now, iter_max=None, last_events=None):
""" """
Update task information in DB with aggregated results after handling event(s) related to this task. Update task information in DB with aggregated results after handling event(s) related to this task.
@ -226,8 +226,8 @@ class EventBLL(object):
""" """
fields = {} fields = {}
if iter is not None: if iter_max is not None:
fields["last_iteration"] = iter fields["last_iteration_max"] = iter_max
if last_events: if last_events:
fields["last_values"] = list( fields["last_values"] = list(

View File

@ -4,7 +4,6 @@ from typing import Sequence, Set, Optional
import attr import attr
import elasticsearch.helpers import elasticsearch.helpers
import es_factory import es_factory
from apierrors import APIError from apierrors import APIError
from apierrors.errors import bad_request, server_error from apierrors.errors import bad_request, server_error
@ -19,12 +18,14 @@ from apimodels.workers import (
) )
from config import config from config import config
from database.errors import translate_errors_context from database.errors import translate_errors_context
from database.model import Company, User from database.model.auth import User
from database.model.company import Company
from database.model.queue import Queue from database.model.queue import Queue
from database.model.task.task import Task from database.model.task.task import Task
from service_repo.redis_manager import redman from service_repo.redis_manager import redman
from timing_context import TimingContext from timing_context import TimingContext
from tools import safe_get from tools import safe_get
from .stats import WorkerStats from .stats import WorkerStats
log = config.logger(__file__) log = config.logger(__file__)

View File

@ -1,14 +1,25 @@
import threading import threading
from os import getenv
from time import sleep from time import sleep
from redis import StrictRedis
from redis.sentinel import Sentinel, SentinelConnectionPool
from apierrors.errors.server_error import ConfigError, GeneralError from apierrors.errors.server_error import ConfigError, GeneralError
from config import config from config import config
from redis import StrictRedis
from redis.sentinel import Sentinel, SentinelConnectionPool
log = config.logger(__file__) log = config.logger(__file__)
OVERRIDE_HOST_ENV_KEY = "REDIS_SERVICE_HOST"
OVERRIDE_PORT_ENV_KEY = "REDIS_SERVICE_PORT"
OVERRIDE_HOST = getenv(OVERRIDE_HOST_ENV_KEY)
if OVERRIDE_HOST:
log.info(f"Using override redis host {OVERRIDE_HOST}")
OVERRIDE_PORT = getenv(OVERRIDE_PORT_ENV_KEY)
if OVERRIDE_PORT:
log.info(f"Using override redis port {OVERRIDE_PORT}")
class MyPubSubWorkerThread(threading.Thread): class MyPubSubWorkerThread(threading.Thread):
def __init__(self, sentinel, on_new_master, msg_sleep_time, daemon=True): def __init__(self, sentinel, on_new_master, msg_sleep_time, daemon=True):
@ -108,10 +119,21 @@ class RedisManager(object):
def __init__(self, redis_config_dict): def __init__(self, redis_config_dict):
self.aliases = {} self.aliases = {}
for alias, alias_config in redis_config_dict.items(): for alias, alias_config in redis_config_dict.items():
alias_config = alias_config.as_plain_ordered_dict()
is_cluster = alias_config.get("cluster", False) is_cluster = alias_config.get("cluster", False)
host = alias_config.get("host", None)
port = alias_config.get("port", None) host = OVERRIDE_HOST or alias_config.get("host", None)
if host:
alias_config["host"] = host
port = OVERRIDE_PORT or alias_config.get("port", None)
if port:
alias_config["port"] = port
db = alias_config.get("db", 0) db = alias_config.get("db", 0)
sentinels = alias_config.get("sentinels", None) sentinels = alias_config.get("sentinels", None)
service_name = alias_config.get("service_name", None) service_name = alias_config.get("service_name", None)
@ -132,12 +154,11 @@ class RedisManager(object):
if is_cluster: if is_cluster:
# todo support all redis connection args via sentinel's connection_kwargs # todo support all redis connection args via sentinel's connection_kwargs
connection_kwargs = alias_config.as_plain_ordered_dict() del alias_config["sentinels"]
del connection_kwargs["sentinels"] del alias_config["cluster"]
del connection_kwargs["cluster"] del alias_config["service_name"]
del connection_kwargs["service_name"]
self.aliases[alias] = RedisCluster( self.aliases[alias] = RedisCluster(
sentinels, service_name, **connection_kwargs sentinels, service_name, **alias_config
) )
else: else:
self.aliases[alias] = StrictRedis(**alias_config) self.aliases[alias] = StrictRedis(**alias_config)