diff --git a/server/bll/event/event_bll.py b/server/bll/event/event_bll.py index c83cc94..53cd170 100644 --- a/server/bll/event/event_bll.py +++ b/server/bll/event/event_bll.py @@ -178,7 +178,7 @@ class EventBLL(object): company_id=company_id, task_id=task_id, now=now, - iter=task_iteration.get(task_id), + iter_max=task_iteration.get(task_id), last_events=task_last_events.get(task_id), ) @@ -216,7 +216,7 @@ class EventBLL(object): if timestamp is None or timestamp < event["timestamp"]: last_events[metric_hash][variant_hash] = event - def _update_task(self, company_id, task_id, now, iter=None, last_events=None): + def _update_task(self, company_id, task_id, now, iter_max=None, last_events=None): """ Update task information in DB with aggregated results after handling event(s) related to this task. @@ -226,8 +226,8 @@ class EventBLL(object): """ fields = {} - if iter is not None: - fields["last_iteration"] = iter + if iter_max is not None: + fields["last_iteration_max"] = iter_max if last_events: fields["last_values"] = list( diff --git a/server/bll/workers/__init__.py b/server/bll/workers/__init__.py index 334b76e..28c7fa5 100644 --- a/server/bll/workers/__init__.py +++ b/server/bll/workers/__init__.py @@ -4,7 +4,6 @@ from typing import Sequence, Set, Optional import attr import elasticsearch.helpers - import es_factory from apierrors import APIError from apierrors.errors import bad_request, server_error @@ -19,12 +18,14 @@ from apimodels.workers import ( ) from config import config from database.errors import translate_errors_context -from database.model import Company, User +from database.model.auth import User +from database.model.company import Company from database.model.queue import Queue from database.model.task.task import Task from service_repo.redis_manager import redman from timing_context import TimingContext from tools import safe_get + from .stats import WorkerStats log = config.logger(__file__) diff --git a/server/service_repo/redis_manager.py b/server/service_repo/redis_manager.py index b9692a8..bae8e42 100644 --- a/server/service_repo/redis_manager.py +++ b/server/service_repo/redis_manager.py @@ -1,14 +1,25 @@ import threading +from os import getenv from time import sleep -from redis import StrictRedis -from redis.sentinel import Sentinel, SentinelConnectionPool - from apierrors.errors.server_error import ConfigError, GeneralError from config import config +from redis import StrictRedis +from redis.sentinel import Sentinel, SentinelConnectionPool log = config.logger(__file__) +OVERRIDE_HOST_ENV_KEY = "REDIS_SERVICE_HOST" +OVERRIDE_PORT_ENV_KEY = "REDIS_SERVICE_PORT" + +OVERRIDE_HOST = getenv(OVERRIDE_HOST_ENV_KEY) +if OVERRIDE_HOST: + log.info(f"Using override redis host {OVERRIDE_HOST}") + +OVERRIDE_PORT = getenv(OVERRIDE_PORT_ENV_KEY) +if OVERRIDE_PORT: + log.info(f"Using override redis port {OVERRIDE_PORT}") + class MyPubSubWorkerThread(threading.Thread): def __init__(self, sentinel, on_new_master, msg_sleep_time, daemon=True): @@ -108,10 +119,21 @@ class RedisManager(object): def __init__(self, redis_config_dict): self.aliases = {} for alias, alias_config in redis_config_dict.items(): + + alias_config = alias_config.as_plain_ordered_dict() + is_cluster = alias_config.get("cluster", False) - host = alias_config.get("host", None) - port = alias_config.get("port", None) + + host = OVERRIDE_HOST or alias_config.get("host", None) + if host: + alias_config["host"] = host + + port = OVERRIDE_PORT or alias_config.get("port", None) + if port: + alias_config["port"] = port + db = alias_config.get("db", 0) + sentinels = alias_config.get("sentinels", None) service_name = alias_config.get("service_name", None) @@ -132,12 +154,11 @@ class RedisManager(object): if is_cluster: # todo support all redis connection args via sentinel's connection_kwargs - connection_kwargs = alias_config.as_plain_ordered_dict() - del connection_kwargs["sentinels"] - del connection_kwargs["cluster"] - del connection_kwargs["service_name"] + del alias_config["sentinels"] + del alias_config["cluster"] + del alias_config["service_name"] self.aliases[alias] = RedisCluster( - sentinels, service_name, **connection_kwargs + sentinels, service_name, **alias_config ) else: self.aliases[alias] = StrictRedis(**alias_config)