Compare commits

10 Commits

Author SHA1 Message Date
allegroai
b93591ec32 Improve startup sequence 2020-08-24 14:05:48 +03:00
allegroai
0abfd8da0d Version bump to v0.16.1 2020-08-23 15:43:38 +03:00
allegroai
a9cc4e36c6 Update docs 2020-08-23 15:41:05 +03:00
allegroai
fe1c963eec Fix internal export utility 2020-08-23 15:40:57 +03:00
allegroai
111d80e88d Add migration to verify correct project ordering 2020-08-23 15:39:36 +03:00
allegroai
6718862dbe Update fixed user name if user already exists 2020-08-23 15:38:53 +03:00
allegroai
0fe1bf8a61 Add elasticsearch log filtering while trying to connect 2020-08-23 15:38:22 +03:00
allegroai
10f326eda9 Fix KeyError when accessing log results in events.get_task_logs 2020-08-23 15:36:43 +03:00
allegroai
cd0d6c1a3d Fix max buckets calculation for iters histogram 2020-08-23 15:34:59 +03:00
allegroai
3205f2df97 Add services.tasks.multi_task_histogram_limit configuration option 2020-08-23 15:30:32 +03:00
11 changed files with 148 additions and 39 deletions

View File

@@ -11,6 +11,12 @@
## :rocket: Trains-Agent Services is now included, for more information see [services](https://github.com/allegroai/trains-server#services)
## v0.16 Upgrade Notice
In v0.16, the Elasticsearch subsystem of Trains Server has been upgraded from version 5.6 to version 7.6. This change necessitates the migration of the database contents to accommodate the change in index structure across the different versions.
Follow [this procedure](https://allegro.ai/docs/deploying_trains/trains_server_es7_migration/) to migrate existing data.
## Introduction
The **trains-server** is the backend service infrastructure for [Trains](https://github.com/allegroai/trains).

View File

@@ -9,6 +9,7 @@ from jsonmodels.validators import Length, Min, Max
from apimodels import ListField, IntField, ActualEnumField
from bll.event.event_metrics import EventType
from bll.event.scalar_key import ScalarKeyEnum
from config import config
from utilities.stringenum import StringEnum
@@ -23,7 +24,15 @@ class ScalarMetricsIterHistogramRequest(HistogramRequestBase):
class MultiTaskScalarMetricsIterHistogramRequest(HistogramRequestBase):
tasks: Sequence[str] = ListField(
items_types=str, validators=[Length(minimum_value=1, maximum_value=10)]
items_types=str,
validators=[
Length(
minimum_value=1,
maximum_value=config.get(
"services.tasks.multi_task_histogram_limit", 10
),
)
],
)

View File

@@ -1,4 +1,5 @@
import itertools
import math
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from enum import Enum
@@ -252,11 +253,14 @@ class EventMetrics:
min_index = safe_get(data, "min_index/value", default=0)
max_index = safe_get(data, "max_index/value", default=min_index)
index_range = max_index - min_index + 1
interval = max(1, math.ceil(float(index_range) / samples))
max_samples = math.ceil(float(index_range) / interval)
return (
metric,
variant,
max(1, int(max_index - min_index + 1) // samples),
samples,
interval,
max_samples,
)
MetricData = Tuple[str, dict]

View File

@@ -93,21 +93,22 @@ class LogEventsIterator:
},
}
es_result = self.es.search(index=es_index, body=es_req)
hits = es_result["hits"]["hits"]
if not hits or len(hits) < 2:
last_second_hits = es_result["hits"]["hits"]
if not last_second_hits or len(last_second_hits) < 2:
# if only one element is returned for the last timestamp
# then it is already present in the events
return events, hits_total
last_events = [hit["_source"] for hit in es_result["hits"]["hits"]]
already_present_ids = set(ev["_id"] for ev in events)
already_present_ids = set(hit["_id"] for hit in hits)
last_second_events = [
hit["_source"]
for hit in last_second_hits
if hit["_id"] not in already_present_ids
]
# return the list merged from original query results +
# leftovers from the last timestamp
return (
[
*events,
*(ev for ev in last_events if ev["_id"] not in already_present_ids),
],
[*events, *last_second_events],
hits_total,
)

View File

@@ -11,4 +11,6 @@ non_responsive_tasks_watchdog {
artifacts {
update_attempts: 10
update_retry_msec: 500
}
}
multi_task_histogram_limit: 100

View File

@@ -1,5 +1,8 @@
import logging
from time import sleep
from typing import Type, Optional, Sequence, Any, Union
import urllib3.exceptions
from elasticsearch import Elasticsearch, exceptions
import es_factory
@@ -25,6 +28,40 @@ class ElasticConnectionError(Exception):
pass
class ConnectionErrorFilter(logging.Filter):
def __init__(
self,
level: Optional[Union[int, str]] = None,
err_type: Optional[Type] = None,
args_prefix: Optional[Sequence[Any]] = None,
):
super(ConnectionErrorFilter, self).__init__()
if level is None:
self.level = None
else:
try:
self.level = int(level)
except ValueError:
self.level = logging.getLevelName(level)
self.err_type = err_type
self.args = args_prefix and tuple(args_prefix)
self.last_blocked = None
def filter(self, record):
try:
allow = (
(self.err_type is None or record.exc_info[0] != self.err_type)
and (self.level is None or record.levelno != self.level)
and (self.args is None or record.args[: len(self.args)] != self.args)
)
if not allow:
self.last_blocked = record
return allow
except Exception:
return True
def check_elastic_empty() -> bool:
"""
Check for elasticsearch connection
@@ -35,20 +72,32 @@ def check_elastic_empty() -> bool:
cluster_conf = es_factory.get_cluster_config("events")
max_retries = config.get("apiserver.elastic.probing.max_retries", 4)
timeout = config.get("apiserver.elastic.probing.timeout", 30)
for retry in range(max_retries):
try:
es = Elasticsearch(hosts=cluster_conf.get("hosts"))
return not es.indices.get_template(name="events*")
except exceptions.NotFoundError as ex:
log.error(ex)
return True
except exceptions.ConnectionError:
if retry >= max_retries - 1:
raise ElasticConnectionError()
log.warn(
f"Could not connect to es server. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
)
sleep(timeout)
es_logger = logging.getLogger("elasticsearch")
log_filter = ConnectionErrorFilter(
err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",)
)
try:
es_logger.addFilter(log_filter)
for retry in range(max_retries):
try:
es = Elasticsearch(hosts=cluster_conf.get("hosts"))
return not es.indices.get_template(name="events*")
except exceptions.NotFoundError as ex:
log.error(ex)
return True
except exceptions.ConnectionError as ex:
if retry >= max_retries - 1:
raise ElasticConnectionError(
f"Error connecting to Elasticsearch: {str(ex)}"
)
log.warn(
f"Could not connect to ElasticSearch Service. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
)
sleep(timeout)
finally:
es_logger.removeFilter(log_filter)
def init_es_data():

View File

@@ -57,6 +57,10 @@ class PrePopulate:
metadata_filename = "metadata.json"
zip_args = dict(mode="w", compression=ZIP_BZIP2)
artifacts_ext = ".artifacts"
img_source_regex = re.compile(
r"['\"]source['\"]:\s?['\"](https?://(?:localhost:8081|files.*?)/.*?)['\"]",
flags=re.IGNORECASE,
)
class JsonLinesWriter:
def __init__(self, file: BinaryIO):
@@ -360,6 +364,8 @@ class PrePopulate:
@classmethod
def update_featured_projects_order(cls):
featured_order = config.get("services.projects.featured_order", [])
if not featured_order:
return
def get_index(p: Project):
for index, entry in enumerate(featured_order):
@@ -502,11 +508,19 @@ class PrePopulate:
break
scroll_id = res.next_scroll_id
for event in res.events:
if event.get("type") == "training_debug_image":
event_type = event.get("type")
if event_type == "training_debug_image":
url = cls._get_fixed_url(event.get("url"))
if url:
event["url"] = url
artifacts.append(url)
elif event_type == "plot":
plot_str: str = event.get("plot_str", "")
for match in cls.img_source_regex.findall(plot_str):
url = cls._get_fixed_url(match)
if match != url:
plot_str = plot_str.replace(match, url)
artifacts.append(url)
w.write(json.dumps(event))
data = f.getvalue()
hash_.update(data)

View File

@@ -59,7 +59,15 @@ def _ensure_backend_user(user_id: str, company_id: str, user_name: str):
def ensure_fixed_user(user: FixedUser, log: Logger):
if User.objects(company=user.company, id=user.user_id).first():
db_user = User.objects(company=user.company, id=user.user_id).first()
if db_user:
# noinspection PyBroadException
try:
log.info(f"Updating user name: {user.name}")
given_name, _, family_name = user.name.partition(" ")
db_user.update(name=user.name, given_name=given_name, family_name=family_name)
except Exception:
pass
return
data = attr.asdict(user)

View File

@@ -0,0 +1,11 @@
from pymongo.database import Database, Collection
def migrate_backend(db: Database):
collection: Collection = db["project"]
featured = "featured"
query = {featured: {"$exists": False}}
for doc in collection.find(filter=query, projection=()):
collection.update_one(
{"_id": doc["_id"]}, {"$set": {featured: 9999}},
)

View File

@@ -57,21 +57,26 @@ with distributed_lock(key, timeout=config.get("apiserver.db_init_timout", 120)):
info.es_connection_error = True
empty_db = check_mongo_empty()
if upgrade_monitoring:
if not empty_db and (info.es_connection_error or empty_es):
if get_last_server_version() < Version("0.16.0"):
log.info(f"ES database seems not migrated")
info.missed_es_upgrade = True
proceed_with_init = not (info.es_connection_error or info.missed_es_upgrade)
else:
proceed_with_init = True
if (
upgrade_monitoring
and not empty_db
and (info.es_connection_error or empty_es)
and get_last_server_version() < Version("0.16.0")
):
log.info(f"ES database seems not migrated")
info.missed_es_upgrade = True
if proceed_with_init:
if info.es_connection_error and not info.missed_es_upgrade:
raise Exception(
"Error starting server: failed connecting to ElasticSearch service"
)
if not info.missed_es_upgrade:
init_es_data()
init_mongo_data()
if (
proceed_with_init
not info.missed_es_upgrade
and empty_db
and config.get("apiserver.pre_populate.enabled", False)
):

View File

@@ -1 +1 @@
__version__ = "0.16.0"
__version__ = "0.16.1"