diff --git a/apiserver/apimodels/workers.py b/apiserver/apimodels/workers.py index 85b593d..b7123a2 100644 --- a/apiserver/apimodels/workers.py +++ b/apiserver/apimodels/workers.py @@ -96,6 +96,7 @@ class WorkerResponseEntry(WorkerEntry): class GetAllRequest(Base): last_seen = IntField(default=3600) + tags = ListField(str) class GetAllResponse(Base): diff --git a/apiserver/bll/workers/__init__.py b/apiserver/bll/workers/__init__.py index 3714388..9b37a44 100644 --- a/apiserver/bll/workers/__init__.py +++ b/apiserver/bll/workers/__init__.py @@ -76,7 +76,7 @@ class WorkerBLL: raise bad_request.InvalidUserId(**query) company = Company.objects(id=company_id).only("id", "name").first() if not company: - raise server_error.InternalError("invalid company", company=company_id) + raise bad_request.InvalidId("invalid company", company=company_id) queue_objs = Queue.objects(company=company_id, id__in=queues).only("id") if len(queue_objs) < len(queues): @@ -189,7 +189,10 @@ class WorkerBLL: self._save_worker(entry) def get_all( - self, company_id: str, last_seen: Optional[int] = None + self, + company_id: str, + last_seen: Optional[int] = None, + tags: Sequence[str] = None, ) -> Sequence[WorkerEntry]: """ Get all the company workers that were active during the last_seen period @@ -210,16 +213,26 @@ class WorkerBLL: if w.last_activity_time.replace(tzinfo=None) >= ref_time ] + if tags: + include = {t for t in tags if not t.startswith("-")} + exclude = {t[1:] for t in tags if t.startswith("-")} + workers = [ + w + for w in workers + if (not include or any(t in include for t in w.tags)) + and (not exclude or all(t not in exclude for t in w.tags)) + ] + return workers def get_all_with_projection( - self, company_id: str, last_seen: int + self, company_id: str, last_seen: int, tags: Sequence[str] = None ) -> Sequence[WorkerResponseEntry]: helpers = list( map( WorkerConversionHelper.from_worker_entry, - self.get_all(company_id=company_id, last_seen=last_seen), + self.get_all(company_id=company_id, last_seen=last_seen, tags=tags), ) ) diff --git a/apiserver/schema/services/workers.conf b/apiserver/schema/services/workers.conf index fc03f14..a234a31 100644 --- a/apiserver/schema/services/workers.conf +++ b/apiserver/schema/services/workers.conf @@ -1,152 +1,320 @@ -{ - _description: "Provides an API for worker machines, allowing workers to report status and get tasks for execution" - _definitions { - metrics_category { +_description: "Provides an API for worker machines, allowing workers to report status and get tasks for execution" +_definitions { + metrics_category { + type: object + properties { + name { + type: string + description: "Name of the metrics category." + } + metric_keys { + type: array + items { type: string } + description: "The names of the metrics in the category." + } + } + } + aggregation_type { + type: string + enum: [ avg, min, max ] + description: "Metric aggregation type" + } + stat_item { + type: object + properties { + key { + type: string + description: "Name of a metric" + } + category { + "$ref": "#/definitions/aggregation_type" + } + } + } + aggregation_stats { + type: object + properties { + aggregation { + "$ref": "#/definitions/aggregation_type" + } + values { + type: array + description: "List of values corresponding to the dates in metric statistics" + items { type: number } + } + } + } + metric_stats { + type: object + properties { + metric { + type: string + description: "Name of the metric ("cpu_usage", "memory_used" etc.)" + } + variant { + type: string + description: "Name of the metric component. Set only if 'split_by_variant' was set in the request" + } + dates { + type: array + description: "List of timestamps (in seconds from epoch) in the acceding order. The timestamps are separated by the requested interval. Timestamps where no workers activity was recorded are omitted." + items { type: integer } + } + stats { + type: array + description: "Statistics data by type" + items { "$ref": "#/definitions/aggregation_stats" } + } + } + } + worker_stats { + type: object + properties { + worker { + type: string + description: "ID of the worker" + } + metrics { + type: array + description: "List of the metrics statistics for the worker" + items { "$ref": "#/definitions/metric_stats" } + } + } + } + activity_series { + type: object + properties { + dates { + type: array + description: "List of timestamps (in seconds from epoch) in the acceding order. The timestamps are separated by the requested interval." + items {type: integer} + } + counts { + type: array + description: "List of worker counts corresponding to the timestamps in the dates list. None values are returned for the dates with no workers." + items {type: integer} + } + } + } + worker { + type: object + properties { + id { + description: "Worker ID" + type: string + } + user { + description: "Associated user (under whose credentials are used by the worker daemon)" + "$ref": "#/definitions/id_name_entry" + } + company { + description: "Associated company" + "$ref": "#/definitions/id_name_entry" + } + ip { + description: "IP of the worker" + type: string + } + register_time { + description: "Registration time" + type: string + format: "date-time" + } + last_activity_time { + description: "Last activity time (even if an error occurred)" + type: string + format: "date-time" + } + last_report_time { + description: "Last successful report time" + type: string + format: "date-time" + } + task { + description: "Task currently being run by the worker" + "$ref": "#/definitions/current_task_entry" + } + project { + description: "Project in which currently executing task resides" + "$ref": "#/definitions/id_name_entry" + } + queue { + description: "Queue from which running task was taken" + "$ref": "#/definitions/queue_entry" + } + queues { + description: "List of queues on which the worker is listening" + type: array + items { "$ref": "#/definitions/queue_entry" } + } + tags { + description: "User tags for the worker" + type: array + items: { type: string } + } + } + } + + id_name_entry { + type: object + properties { + id { + description: "ID" + type: string + } + name { + description: "Name" + type: string + } + } + } + + current_task_entry = ${_definitions.id_name_entry} { + properties { + running_time { + description: "Task running time" + type: integer + } + last_iteration { + description: "Last task iteration" + type: integer + } + } + } + + queue_entry = ${_definitions.id_name_entry} { + properties { + next_task { + description: "Next task in the queue" + "$ref": "#/definitions/id_name_entry" + } + num_tasks { + description: "Number of task entries in the queue" + type: integer + } + } + } + + machine_stats { + type: object + properties { + cpu_usage { + description: "Average CPU usage per core" + type: array + items { type: number } + } + gpu_usage { + description: "Average GPU usage per GPU card" + type: array + items { type: number } + } + memory_used { + description: "Used memory MBs" + type: integer + } + memory_free { + description: "Free memory MBs" + type: integer + } + gpu_memory_free { + description: "GPU free memory MBs" + type: array + items { type: integer } + } + gpu_memory_used { + description: "GPU used memory MBs" + type: array + items { type: integer } + } + network_tx { + description: "Mbytes per second" + type: integer + } + network_rx { + description: "Mbytes per second" + type: integer + } + disk_free_home { + description: "Mbytes free space of /home drive" + type: integer + } + disk_free_temp { + description: "Mbytes free space of /tmp drive" + type: integer + } + disk_read { + description: "Mbytes read per second" + type: integer + } + disk_write { + description: "Mbytes write per second" + type: integer + } + cpu_temperature { + description: "CPU temperature" + type: array + items { type: number } + } + gpu_temperature { + description: "GPU temperature" + type: array + items { type: number } + } + } + } +} +get_all { + "2.4" { + description: "Returns information on all registered workers." + request { type: object properties { - name { - type: string - description: "Name of the metrics category." - } - metric_keys { - type: array - items { type: string } - description: "The names of the metrics in the category." + last_seen { + description: """Filter out workers not active for more than last_seen seconds. + A value or 0 or 'none' will disable the filter.""" + type: integer + default: 3600 } } } - aggregation_type { - type: string - enum: [ avg, min, max ] - description: "Metric aggregation type" - } - stat_item { + response { type: object properties { - key { - type: string - description: "Name of a metric" - } - category { - "$ref": "#/definitions/aggregation_type" + workers { + type: array + items { "$ref": "#/definitions/worker" } } } } - aggregation_stats { - type: object - properties { - aggregation { - "$ref": "#/definitions/aggregation_type" - } - values { - type: array - description: "List of values corresponding to the dates in metric statistics" - items { type: number } - } - } + } + "999.0": ${get_all."2.4"} { + request.properties.tags { + description: The list of allowed worker tags. Prepend tag value with '-' in order to exclude + type: array + items { type: string } } - metric_stats { - type: object - properties { - metric { - type: string - description: "Name of the metric ("cpu_usage", "memory_used" etc.)" - } - variant { - type: string - description: "Name of the metric component. Set only if 'split_by_variant' was set in the request" - } - dates { - type: array - description: "List of timestamps (in seconds from epoch) in the acceding order. The timestamps are separated by the requested interval. Timestamps where no workers activity was recorded are omitted." - items { type: integer } - } - stats { - type: array - description: "Statistics data by type" - items { "$ref": "#/definitions/aggregation_stats" } - } - } - } - worker_stats { + } +} +register { + "2.4" { + description: "Register a worker in the system. Called by the Worker Daemon." + request { + required: [ worker ] type: object properties { worker { - type: string - description: "ID of the worker" - } - metrics { - type: array - description: "List of the metrics statistics for the worker" - items { "$ref": "#/definitions/metric_stats" } - } - } - } - activity_series { - type: object - properties { - dates { - type: array - description: "List of timestamps (in seconds from epoch) in the acceding order. The timestamps are separated by the requested interval." - items {type: integer} - } - counts { - type: array - description: "List of worker counts corresponding to the timestamps in the dates list. None values are returned for the dates with no workers." - items {type: integer} - } - } - } - worker { - type: object - properties { - id { - description: "Worker ID" + description: "Worker id. Must be unique in company." type: string } - user { - description: "Associated user (under whose credentials are used by the worker daemon)" - "$ref": "#/definitions/id_name_entry" - } - company { - description: "Associated company" - "$ref": "#/definitions/id_name_entry" - } - ip { - description: "IP of the worker" - type: string - } - register_time { - description: "Registration time" - type: string - format: "date-time" - } - last_activity_time { - description: "Last activity time (even if an error occurred)" - type: string - format: "date-time" - } - last_report_time { - description: "Last successful report time" - type: string - format: "date-time" - } - task { - description: "Task currently being run by the worker" - "$ref": "#/definitions/current_task_entry" - } - project { - description: "Project in which currently executing task resides" - "$ref": "#/definitions/id_name_entry" - } - queue { - description: "Queue from which running task was taken" - "$ref": "#/definitions/queue_entry" + timeout { + description: "Registration timeout in seconds. If timeout seconds have passed since the worker's last call to register or status_report, the worker is automatically removed from the list of registered workers." + type: integer + default: 600 } queues { - description: "List of queues on which the worker is listening" + description: "List of queue IDs on which the worker is listening." type: array - items { "$ref": "#/definitions/queue_entry" } + items { type: string } } tags { description: "User tags for the worker" @@ -155,348 +323,185 @@ } } } - - id_name_entry { + response { + type: object + properties {} + } + } +} +unregister { + "2.4" { + description: "Unregister a worker in the system. Called by the Worker Daemon." + request { + required: [ worker ] type: object properties { - id { - description: "ID" - type: string - } - name { - description: "Name" + worker { + description: "Worker id. Must be unique in company." type: string } } } - - current_task_entry = ${_definitions.id_name_entry} { - properties { - running_time { - description: "Task running time" - type: integer - } - last_iteration { - description: "Last task iteration" - type: integer - } - } + response { + type: object + properties {} } - - queue_entry = ${_definitions.id_name_entry} { - properties { - next_task { - description: "Next task in the queue" - "$ref": "#/definitions/id_name_entry" - } - num_tasks { - description: "Number of task entries in the queue" - type: integer - } - } - } - - machine_stats { + } +} +status_report { + "2.4" { + description: "Called periodically by the worker daemon to report machine status" + request { + required: [ + worker + timestamp + ] type: object properties { - cpu_usage { - description: "Average CPU usage per core" - type: array - items { type: number } + worker { + description: "Worker id." + type: string } - gpu_usage { - description: "Average GPU usage per GPU card" - type: array - items { type: number } + task { + description: "ID of a task currently being run by the worker. If no task is sent, the worker's task field will be cleared." + type: string } - memory_used { - description: "Used memory MBs" + queue { + description: "ID of the queue from which task was received. If no queue is sent, the worker's queue field will be cleared." + type: string + } + queues { + description: "List of queue IDs on which the worker is listening. If null, the worker's queues list will not be updated." + type: array + items { type: string } + } + timestamp { + description: "UNIX time in seconds since epoch." type: integer } - memory_free { - description: "Free memory MBs" - type: integer + machine_stats { + description: "The machine statistics." + "$ref": "#/definitions/machine_stats" } - gpu_memory_free { - description: "GPU free memory MBs" + tags { + description: "New user tags for the worker" type: array - items { type: integer } + items: { type: string } } - gpu_memory_used { - description: "GPU used memory MBs" + } + } + response { + type: object + properties {} + } + } +} +get_metric_keys { + "2.4" { + description: "Returns worker statistics metric keys grouped by categories." + request { + type: object + properties { + worker_ids { + description: "List of worker ids to collect metrics for. If not provided or empty then all the company workers metrics are analyzed." type: array - items { type: integer } + items { type: string } } - network_tx { - description: "Mbytes per second" - type: integer - } - network_rx { - description: "Mbytes per second" - type: integer - } - disk_free_home { - description: "Mbytes free space of /home drive" - type: integer - } - disk_free_temp { - description: "Mbytes free space of /tmp drive" - type: integer - } - disk_read { - description: "Mbytes read per second" - type: integer - } - disk_write { - description: "Mbytes write per second" - type: integer - } - cpu_temperature { - description: "CPU temperature" + } + } + response { + type: object + properties { + categories { type: array - items { type: number } + description: "List of unique metric categories found in the statistics of the requested workers." + items { "$ref": "#/definitions/metrics_category" } } - gpu_temperature { - description: "GPU temperature" + } + } + } +} +get_stats { + "2.4" { + description: "Returns statistics for the selected workers and time range aggregated by date intervals." + request { + type: object + required: [ from_date, to_date, interval, items ] + properties { + worker_ids { + description: "List of worker ids to collect metrics for. If not provided or empty then all the company workers metrics are analyzed." type: array - items { type: number } + items { type: string } + } + from_date { + description: "Starting time (in seconds from epoch) for collecting statistics" + type: number + } + to_date { + description: "Ending time (in seconds from epoch) for collecting statistics" + type: number + } + interval { + description: "Time interval in seconds for a single statistics point. The minimal value is 1" + type: integer + } + items { + description: "List of metric keys and requested statistics" + type: array + items { "$ref": "#/definitions/stat_item" } + } + split_by_variant { + description: "If true then break statistics by hardware sub types" + type: boolean + default: false + } + } + } + response { + type: object + properties { + workers { + type: array + description: "List of the requested workers with their statistics" + items { "$ref": "#/definitions/worker_stats" } } } } } - get_all { - "2.4" { - description: "Returns information on all registered workers." - request { - type: object - properties { - last_seen { - description: """Filter out workers not active for more than last_seen seconds. - A value or 0 or 'none' will disable the filter.""" - type: integer - default: 3600 - } +} +get_activity_report { + "2.4" { + description: "Returns count of active company workers in the selected time range." + request { + type: object + required: [ from_date, to_date, interval ] + properties { + from_date { + description: "Starting time (in seconds from epoch) for collecting statistics" + type: number + } + to_date { + description: "Ending time (in seconds from epoch) for collecting statistics" + type: number + } + interval { + description: "Time interval in seconds for a single statistics point. The minimal value is 1" + type: integer } } - response { - type: object - properties { - workers { - type: array - items { "$ref": "#/definitions/worker" } - } + } + response { + type: object + properties { + total { + description: "Activity series that include all the workers that sent reports in the given time interval." + "$ref": "#/definitions/activity_series" + } + active { + description: "Activity series that include only workers that worked on a task in the given time interval." + "$ref": "#/definitions/activity_series" } } } } - register { - "2.4" { - description: "Register a worker in the system. Called by the Worker Daemon." - request { - required: [ worker ] - type: object - properties { - worker { - description: "Worker id. Must be unique in company." - type: string - } - timeout { - description: "Registration timeout in seconds. If timeout seconds have passed since the worker's last call to register or status_report, the worker is automatically removed from the list of registered workers." - type: integer - default: 600 - } - queues { - description: "List of queue IDs on which the worker is listening." - type: array - items { type: string } - } - tags { - description: "User tags for the worker" - type: array - items: { type: string } - } - } - } - response { - type: object - properties {} - } - } - } - unregister { - "2.4" { - description: "Unregister a worker in the system. Called by the Worker Daemon." - request { - required: [ worker ] - type: object - properties { - worker { - description: "Worker id. Must be unique in company." - type: string - } - } - } - response { - type: object - properties {} - } - } - } - status_report { - "2.4" { - description: "Called periodically by the worker daemon to report machine status" - request { - required: [ - worker - timestamp - ] - type: object - properties { - worker { - description: "Worker id." - type: string - } - task { - description: "ID of a task currently being run by the worker. If no task is sent, the worker's task field will be cleared." - type: string - } - queue { - description: "ID of the queue from which task was received. If no queue is sent, the worker's queue field will be cleared." - type: string - } - queues { - description: "List of queue IDs on which the worker is listening. If null, the worker's queues list will not be updated." - type: array - items { type: string } - } - timestamp { - description: "UNIX time in seconds since epoch." - type: integer - } - machine_stats { - description: "The machine statistics." - "$ref": "#/definitions/machine_stats" - } - tags { - description: "New user tags for the worker" - type: array - items: { type: string } - } - } - } - response { - type: object - properties {} - } - } - } - get_metric_keys { - "2.4" { - description: "Returns worker statistics metric keys grouped by categories." - request { - type: object - properties { - worker_ids { - description: "List of worker ids to collect metrics for. If not provided or empty then all the company workers metrics are analyzed." - type: array - items { type: string } - } - } - } - response { - type: object - properties { - categories { - type: array - description: "List of unique metric categories found in the statistics of the requested workers." - items { "$ref": "#/definitions/metrics_category" } - } - } - } - } - } - get_stats { - "2.4" { - description: "Returns statistics for the selected workers and time range aggregated by date intervals." - request { - type: object - required: [ from_date, to_date, interval, items ] - properties { - worker_ids { - description: "List of worker ids to collect metrics for. If not provided or empty then all the company workers metrics are analyzed." - type: array - items { type: string } - } - from_date { - description: "Starting time (in seconds from epoch) for collecting statistics" - type: number - } - to_date { - description: "Ending time (in seconds from epoch) for collecting statistics" - type: number - } - interval { - description: "Time interval in seconds for a single statistics point. The minimal value is 1" - type: integer - } - items { - description: "List of metric keys and requested statistics" - type: array - items { "$ref": "#/definitions/stat_item" } - } - split_by_variant { - description: "If true then break statistics by hardware sub types" - type: boolean - default: false - } - } - } - response { - type: object - properties { - workers { - type: array - description: "List of the requested workers with their statistics" - items { "$ref": "#/definitions/worker_stats" } - } - } - } - } - } - get_activity_report { - "2.4" { - description: "Returns count of active company workers in the selected time range." - request { - type: object - required: [ from_date, to_date, interval ] - properties { - from_date { - description: "Starting time (in seconds from epoch) for collecting statistics" - type: number - } - to_date { - description: "Ending time (in seconds from epoch) for collecting statistics" - type: number - } - interval { - description: "Time interval in seconds for a single statistics point. The minimal value is 1" - type: integer - } - } - } - response { - type: object - properties { - total { - description: "Activity series that include all the workers that sent reports in the given time interval." - "$ref": "#/definitions/activity_series" - } - active { - description: "Activity series that include only workers that worked on a task in the given time interval." - "$ref": "#/definitions/activity_series" - } - } - } - } - } -} \ No newline at end of file +} diff --git a/apiserver/services/workers.py b/apiserver/services/workers.py index 0e0a87b..a2e4906 100644 --- a/apiserver/services/workers.py +++ b/apiserver/services/workers.py @@ -41,7 +41,9 @@ worker_bll = WorkerBLL() ) def get_all(call: APICall, company_id: str, request: GetAllRequest): call.result.data_model = GetAllResponse( - workers=worker_bll.get_all_with_projection(company_id, request.last_seen) + workers=worker_bll.get_all_with_projection( + company_id, request.last_seen, tags=request.tags + ) ) @@ -72,7 +74,9 @@ def unregister(call: APICall, company_id, req_model: WorkerRequest): worker_bll.unregister_worker(company_id, call.identity.user, req_model.worker) -@endpoint("workers.status_report", min_version="2.4", request_data_model=StatusReportRequest) +@endpoint( + "workers.status_report", min_version="2.4", request_data_model=StatusReportRequest +) def status_report(call: APICall, company_id, request: StatusReportRequest): worker_bll.status_report( company_id=company_id,