Fix update project time on task changes

Fix project time in non responsive tasks watchdog
This commit is contained in:
allegroai 2021-01-05 18:19:45 +02:00
parent 59994ccf9c
commit e2deff4eef
5 changed files with 53 additions and 31 deletions

View File

@ -1,8 +1,7 @@
from datetime import timedelta, datetime from datetime import timedelta, datetime
from time import sleep from time import sleep
from apiserver.apierrors import errors from apiserver.bll.task import update_project_time
from apiserver.bll.task import ChangeStatusRequest
from apiserver.config_repo import config from apiserver.config_repo import config
from apiserver.database.model.task.task import TaskStatus, Task from apiserver.database.model.task.task import TaskStatus, Task
from apiserver.utilities.threads_manager import ThreadsManager from apiserver.utilities.threads_manager import ThreadsManager
@ -71,19 +70,29 @@ class NonResponsiveTasksWatchdog:
return 0 return 0
err_count = 0 err_count = 0
project_ids = set()
now = datetime.utcnow()
for task in tasks: for task in tasks:
log.info( log.info(
f"Stopping {task.id} ({task.name}), last updated at {task.last_update}" f"Stopping {task.id} ({task.name}), last updated at {task.last_update}"
) )
# noinspection PyBroadException
try: try:
ChangeStatusRequest( updated = Task.objects(id=task.id, status=task.status).update(
task=task, status=TaskStatus.stopped,
new_status=TaskStatus.stopped,
status_reason="Forced stop (non-responsive)", status_reason="Forced stop (non-responsive)",
status_message="Forced stop (non-responsive)", status_message="Forced stop (non-responsive)",
force=True, status_changed=now,
).execute() last_update=now,
except errors.bad_request.FailedChangingTaskStatus: last_change=now,
)
if updated:
project_ids.add(task.project)
else:
err_count += 1 err_count += 1
except Exception as ex:
log.error("Failed setting status: %s", str(ex))
update_project_time(list(project_ids))
return len(tasks) - err_count return len(tasks) - err_count

View File

@ -34,7 +34,7 @@ from apiserver.timing_context import TimingContext
from apiserver.utilities.parameter_key_escaper import ParameterKeyEscaper from apiserver.utilities.parameter_key_escaper import ParameterKeyEscaper
from .artifacts import artifacts_prepare_for_save from .artifacts import artifacts_prepare_for_save
from .param_utils import params_prepare_for_save from .param_utils import params_prepare_for_save
from .utils import ChangeStatusRequest, validate_status_change from .utils import ChangeStatusRequest, validate_status_change, update_project_time
log = config.logger(__file__) log = config.logger(__file__)
org_bll = OrgBLL() org_bll = OrgBLL()
@ -275,6 +275,7 @@ class TaskBLL:
tags=updated_tags, tags=updated_tags,
system_tags=updated_system_tags, system_tags=updated_system_tags,
) )
update_project_time(new_task.project)
return new_task, new_project_data return new_task, new_project_data

View File

@ -1,5 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import TypeVar, Callable, Tuple, Sequence from typing import TypeVar, Callable, Tuple, Sequence, Union
import attr import attr
import six import six
@ -153,9 +153,14 @@ def get_possible_status_changes(current_status):
return possible return possible
def update_project_time(project_id): def update_project_time(project_ids: Union[str, Sequence[str]]):
if project_id: if not project_ids:
Project.objects(id=project_id).update(last_update=datetime.utcnow()) return
if isinstance(project_ids, str):
project_ids = [project_ids]
return Project.objects(id__in=project_ids).update(last_update=datetime.utcnow())
T = TypeVar("T") T = TypeVar("T")

View File

@ -52,7 +52,7 @@ class Credentials(EmbeddedDocument):
class User(DbModelMixin, AuthDocument): class User(DbModelMixin, AuthDocument):
meta = {"db_alias": Database.auth, "strict": strict, "indexes": ["email"]} meta = {"db_alias": Database.auth, "strict": strict}
id = StringField(primary_key=True) id = StringField(primary_key=True)
name = StringField() name = StringField()
@ -72,5 +72,5 @@ class User(DbModelMixin, AuthDocument):
credentials = EmbeddedDocumentListField(Credentials, default=list) credentials = EmbeddedDocumentListField(Credentials, default=list)
""" Credentials generated for this user """ """ Credentials generated for this user """
email = EmailField(unique=True, required=True) email = EmailField()
""" Email uniquely identifying the user """ """ Email uniquely identifying the user """

View File

@ -504,9 +504,7 @@ def set_requirements(call: APICall, company_id, req_model: SetRequirementsReques
raise errors.bad_request.MissingTaskFields( raise errors.bad_request.MissingTaskFields(
"Task has no script field", task=task.id "Task has no script field", task=task.id
) )
res = update_task( res = update_task(task, update_cmds=dict(script__requirements=requirements))
task, update_cmds=dict(script__requirements=requirements)
)
call.result.data_model = UpdateResponse(updated=res) call.result.data_model = UpdateResponse(updated=res)
if res: if res:
call.result.data_model.fields = {"script.requirements": requirements} call.result.data_model.fields = {"script.requirements": requirements}
@ -559,7 +557,9 @@ def update_batch(call: APICall, company_id, _):
updated = res.modified_count updated = res.modified_count
if updated and updated_projects: if updated and updated_projects:
_reset_cached_tags(company_id, projects=list(updated_projects)) projects = list(updated_projects)
_reset_cached_tags(company_id, projects=projects)
update_project_time(project_ids=projects)
call.result.data = {"updated": updated} call.result.data = {"updated": updated}
@ -1110,6 +1110,7 @@ def delete(call: APICall, company_id, req_model: DeleteRequest):
task.delete() task.delete()
_reset_cached_tags(company_id, projects=[task.project]) _reset_cached_tags(company_id, projects=[task.project])
update_project_time(task.project)
call.result.data = dict(deleted=True, **attr.asdict(result)) call.result.data = dict(deleted=True, **attr.asdict(result))
@ -1215,9 +1216,10 @@ def move(call: APICall, company_id: str, request: MoveRequest):
"project or project_name is required" "project or project_name is required"
) )
with translate_errors_context(): updated_projects = set(
return { t.project for t in Task.objects(id__in=request.ids).only("project") if t.project
"project_id": project_bll.move_under_project( )
project_id = project_bll.move_under_project(
entity_cls=Task, entity_cls=Task,
user=call.identity.user, user=call.identity.user,
company=company_id, company=company_id,
@ -1225,4 +1227,9 @@ def move(call: APICall, company_id: str, request: MoveRequest):
project=request.project, project=request.project,
project_name=request.project_name, project_name=request.project_name,
) )
}
projects = list(updated_projects | {project_id})
_reset_cached_tags(company_id, projects=projects)
update_project_time(projects)
return {"project_id": project_id}