Fix ParseError import with new luqum version

Fix incorrect strip to task diff and requirements
Add missing property to server.report_stats_option response
Add active_duration parameter for tasks
Move artifacts info dictionary structure
This commit is contained in:
allegroai 2021-01-05 17:07:14 +02:00
parent 5b1f468957
commit 37e5d8a7e0
10 changed files with 37 additions and 223 deletions

View File

@ -9,7 +9,8 @@ from jsonmodels import fields
from jsonmodels.fields import _LazyType, NotSet
from jsonmodels.models import Base as ModelBase
from jsonmodels.validators import Enum as EnumValidator
from luqum.parser import parser, ParseError
from luqum.exceptions import ParseError
from luqum.parser import parser
from validators import email as email_validator, domain as domain_validator
from apiserver.apierrors import errors

View File

@ -176,28 +176,3 @@ class ActivityReportSeries(Base):
class GetActivityReportResponse(Base):
total = EmbeddedField(ActivityReportSeries)
active = EmbeddedField(ActivityReportSeries)
class RuntimeProperty(Base):
key = StringField()
value = StringField()
expiry = IntField(default=None)
class GetRuntimePropertiesRequest(Base):
worker = StringField(required=True)
class GetRuntimePropertiesResponse(Base):
runtime_properties = ListField(RuntimeProperty)
class SetRuntimePropertiesRequest(Base):
worker = StringField(required=True)
runtime_properties = ListField(RuntimeProperty)
class SetRuntimePropertiesResponse(Base):
added = ListField(str)
removed = ListField(str)
errors = ListField(str)

View File

@ -1,6 +1,6 @@
import itertools
from datetime import datetime, timedelta
from typing import Sequence, Set, Optional, List
from typing import Sequence, Set, Optional
import attr
import elasticsearch.helpers
@ -16,7 +16,6 @@ from apiserver.apimodels.workers import (
WorkerResponseEntry,
QueueEntry,
MachineStats,
RuntimeProperty,
)
from apiserver.config_repo import config
from apiserver.database.errors import translate_errors_context
@ -417,66 +416,6 @@ class WorkerBLL:
added, errors = es_res[:2]
return (added == len(actions)) and not errors
def set_runtime_properties(
self,
company: str,
user: str,
worker_id: str,
runtime_properties: List[RuntimeProperty],
) -> dict:
"""Save worker entry in Redis"""
res = {
"added": [],
"removed": [],
"errors": [],
}
for prop in runtime_properties:
try:
key = self._get_runtime_property_key(company, user, worker_id, prop.key)
if prop.expiry == 0:
self.redis.delete(key)
res["removed"].append(key)
else:
self.redis.set(
key,
prop.value,
ex=prop.expiry
)
res["added"].append(key)
except Exception as ex:
msg = f"Exception: {ex}\nFailed saving property '{prop.key}: {prop.value}', skipping"
log.exception(msg)
res["errors"].append(ex)
return res
def get_runtime_properties(
self,
company: str,
user: str,
worker_id: str,
) -> List[RuntimeProperty]:
match = self._get_runtime_property_key(company, user, worker_id, "*")
with TimingContext("redis", "get_runtime_properties"):
res = self.redis.scan_iter(match=match)
runtime_properties = []
for r in res:
ttl = self.redis.ttl(r)
runtime_properties.append(
RuntimeProperty(
key=r.decode()[len(match) - 1:],
value=self.redis.get(r).decode(),
expiry=ttl if ttl >= 0 else None
)
)
return runtime_properties
def _get_runtime_property_key(
self, company: str, user: str, worker_id: str, prop_id: str
) -> str:
"""Build redis key from company, user, worker_id and prop_id"""
prefix = self._get_worker_key(company, user, worker_id)
return f"{prefix}_prop_{prop_id}"
@attr.s(auto_attribs=True)
class WorkerConversionHelper:

View File

@ -51,13 +51,13 @@ class TaskSystemTags(object):
class Script(EmbeddedDocument, ProperDictMixin):
binary = StringField(default="python")
repository = StringField(default="")
tag = StringField()
branch = StringField()
version_num = StringField()
entry_point = StringField(default="")
working_dir = StringField()
binary = StringField(default="python", strip=True)
repository = StringField(default="", strip=True)
tag = StringField(strip=True)
branch = StringField(strip=True)
version_num = StringField(strip=True)
entry_point = StringField(default="", strip=True)
working_dir = StringField(strip=True)
requirements = SafeDictField()
diff = StringField()

View File

@ -0,0 +1,19 @@
from pymongo.database import Database, Collection
from apiserver.bll.task.artifacts import get_artifact_id
from apiserver.utilities.dicts import nested_get
def migrate_backend(db: Database):
collection: Collection = db["task"]
artifacts_field = "execution.artifacts"
query = {artifacts_field: {"$type": 4}}
for doc in collection.find(filter=query, projection=(artifacts_field,)):
artifacts = nested_get(doc, artifacts_field.split("."))
if not isinstance(artifacts, list):
continue
new_artifacts = {get_artifact_id(a): a for a in artifacts}
collection.update_one(
{"_id": doc["_id"]}, {"$set": {artifacts_field: new_artifacts}}
)

View File

@ -14,7 +14,7 @@ humanfriendly==4.18
Jinja2==2.10
jsonmodels>=2.3
jsonschema>=2.6.0
luqum>=0.7.2
luqum>=0.10.0
mongoengine==0.19.1
nested_dict>=1.61
psutil>=5.6.5

View File

@ -112,6 +112,10 @@ report_stats_option {
response {
type: object
properties {
supported {
description: "Is this feature supported by the server"
type: boolean
}
enabled {
description: "Returns the current report stats option value"
type: boolean

View File

@ -499,92 +499,4 @@
}
}
}
get_runtime_properties {
"2.10" {
description: "Get runtime properties for a worker"
request {
required: [
worker
]
type: object
properties {
worker {
description: "Worker ID"
type: string
}
}
}
response {
type: object
properties {
runtime_properties {
type: array
items {
type: object
properties {
key { type: string }
value { type: string }
expiry {
description: "Expiry (in seconds) for a runtime property"
type: integer
}
}
}
}
}
}
}
}
set_runtime_properties {
"2.10" {
description: "Set runtime properties for a worker"
request {
required: [
worker
runtime_properties
]
type: object
properties {
worker {
description: "Worker ID"
type: string
}
runtime_properties {
type: array
items {
type: object
properties {
key { type: string }
value { type: string }
expiry {
description: "Expiry (in seconds) for a runtime property. When set to null no expiry is set, when set to 0 the specified key is removed"
type: integer
}
}
}
}
}
}
response {
type: object
properties {
added {
type: array
description: "keys of runtime properties added to redis"
items: { type: string }
}
removed {
type: array
description: "keys of runtime properties removed from redis"
items: { type: string }
}
errors {
type: array
description: "errors for keys failed to be added to redis"
items: { type: string }
}
}
}
}
}
}

View File

@ -72,14 +72,14 @@ from apiserver.database.model.task.task import (
DEFAULT_LAST_ITERATION,
Execution,
)
from apiserver.database.utils import get_fields, parse_from_call
from apiserver.database.utils import get_fields_attr, parse_from_call
from apiserver.service_repo import APICall, endpoint
from apiserver.services.utils import conform_tag_fields, conform_output_tags, validate_tags
from apiserver.timing_context import TimingContext
from apiserver.utilities.partial_version import PartialVersion
task_fields = set(Task.get_fields())
task_script_fields = set(get_fields(Script))
task_script_stripped_fields = set([f for f, v in get_fields_attr(Script, 'strip').items() if v])
task_bll = TaskBLL()
event_bll = EventBLL()
@ -287,7 +287,7 @@ def prepare_for_save(call: APICall, fields: dict, previous_task: Task = None):
artifacts_prepare_for_save(fields)
# Strip all script fields (remove leading and trailing whitespace chars) to avoid unusable names and paths
for field in task_script_fields:
for field in task_script_stripped_fields:
try:
path = f"script/{field}"
value = dpath.get(fields, path)

View File

@ -22,10 +22,6 @@ from apiserver.apimodels.workers import (
GetActivityReportRequest,
GetActivityReportResponse,
ActivityReportSeries,
SetRuntimePropertiesRequest,
GetRuntimePropertiesRequest,
GetRuntimePropertiesResponse,
SetRuntimePropertiesResponse
)
from apiserver.bll.util import extract_properties_to_lists
from apiserver.bll.workers import WorkerBLL
@ -206,35 +202,3 @@ def get_stats(call: APICall, company_id, request: GetStatsRequest):
for worker, stats in ret.items()
]
)
@endpoint(
"workers.set_runtime_properties",
min_version="2.10",
request_data_model=SetRuntimePropertiesRequest,
response_data_model=SetRuntimePropertiesResponse,
)
def set_runtime_properties(call: APICall, company_id, request: SetRuntimePropertiesRequest):
res = worker_bll.set_runtime_properties(
company=company_id,
user=call.identity.user,
worker_id=request.worker,
runtime_properties=request.runtime_properties,
)
return SetRuntimePropertiesResponse(added=res["added"], removed=res["removed"], errors=res["errors"])
@endpoint(
"workers.get_runtime_properties",
min_version="2.10",
request_data_model=GetRuntimePropertiesRequest,
response_data_model=GetRuntimePropertiesResponse,
)
def get_runtime_properties(call: APICall, company_id, request: GetRuntimePropertiesRequest):
return GetRuntimePropertiesResponse(
runtime_properties=worker_bll.get_runtime_properties(
company=company_id,
user=call.identity.user,
worker_id=request.worker,
)
)