clearml-server/server/server.py

213 lines
6.6 KiB
Python
Raw Normal View History

import atexit
2019-06-10 21:24:35 +00:00
from argparse import ArgumentParser
from flask import Flask, request, Response
from flask_compress import Compress
from flask_cors import CORS
from werkzeug.exceptions import BadRequest
import database
from apierrors.base import BaseError
2019-12-14 21:33:04 +00:00
from bll.statistics.stats_reporter import StatisticsReporter
2019-06-10 21:24:35 +00:00
from config import config
2019-12-14 21:33:04 +00:00
from init_data import init_es_data, init_mongo_data
2019-06-10 21:24:35 +00:00
from service_repo import ServiceRepo, APICall
from service_repo.auth import AuthType
from service_repo.errors import PathParsingError
from timing_context import TimingContext
2019-10-28 19:49:16 +00:00
from updates import check_updates_thread
2019-12-14 21:33:04 +00:00
from utilities import json
from utilities.threads_manager import ThreadsManager
2019-06-10 21:24:35 +00:00
app = Flask(__name__, static_url_path="/static")
2019-07-17 15:17:27 +00:00
CORS(app, **config.get("apiserver.cors"))
2019-06-10 21:24:35 +00:00
Compress(app)
log = config.logger(__file__)
log.info("################ API Server initializing #####################")
app.config["SECRET_KEY"] = config.get("secure.http.session_secret.apiserver")
app.config["JSONIFY_PRETTYPRINT_REGULAR"] = config.get("apiserver.pretty_json")
database.initialize()
init_es_data()
init_mongo_data()
ServiceRepo.load("services")
log.info(f"Exposed Services: {' '.join(ServiceRepo.endpoint_names())}")
2019-10-28 19:49:16 +00:00
check_updates_thread.start()
2019-12-14 21:33:04 +00:00
StatisticsReporter.start()
2019-10-28 19:49:16 +00:00
def graceful_shutdown():
ThreadsManager.terminating = True
atexit.register(graceful_shutdown)
2019-06-10 21:24:35 +00:00
@app.before_first_request
def before_app_first_request():
pass
@app.before_request
def before_request():
if request.method == "OPTIONS":
return "", 200
if "/static/" in request.path:
return
try:
call = create_api_call(request)
content, content_type = ServiceRepo.handle_call(call)
headers = {}
2019-06-10 21:24:35 +00:00
if call.result.filename:
2019-12-14 21:33:04 +00:00
headers[
"Content-Disposition"
] = f"attachment; filename={call.result.filename}"
2019-06-10 21:24:35 +00:00
if call.result.headers:
headers.update(call.result.headers)
response = Response(
2019-06-10 21:24:35 +00:00
content, mimetype=content_type, status=call.result.code, headers=headers
)
if call.result.cookies:
for key, value in call.result.cookies.items():
2019-07-17 15:17:27 +00:00
if value is None:
response.set_cookie(key, "", expires=0)
else:
2019-12-14 21:33:04 +00:00
response.set_cookie(
key, value, **config.get("apiserver.auth.cookies")
)
return response
2019-06-10 21:24:35 +00:00
except Exception as ex:
log.exception(f"Failed processing request {request.url}: {ex}")
return f"Failed processing request {request.url}", 500
def update_call_data(call, req):
""" Use request payload/form to fill call data or batched data """
if req.content_type == "application/json-lines":
items = []
for i, line in enumerate(req.data.splitlines()):
try:
event = json.loads(line)
if not isinstance(event, dict):
raise BadRequest(
f"json lines must contain objects, found: {type(event).__name__}"
)
items.append(event)
except ValueError as e:
msg = f"{e} in batch item #{i}"
req.on_json_loading_failed(msg)
call.batched_data = items
else:
json_body = req.get_json(force=True, silent=False) if req.data else None
# merge form and args
form = req.form.copy()
form.update(req.args)
form = form.to_dict()
# convert string numbers to floats
for key in form:
if form[key].replace(".", "", 1).isdigit():
if "." in form[key]:
form[key] = float(form[key])
else:
form[key] = int(form[key])
elif form[key].lower() == "true":
form[key] = True
elif form[key].lower() == "false":
form[key] = False
call.data = json_body or form or {}
def _call_or_empty_with_error(call, req, msg, code=500, subcode=0):
call = call or APICall(
"", remote_addr=req.remote_addr, headers=dict(req.headers), files=req.files
)
call.set_error_result(msg=msg, code=code, subcode=subcode)
return call
def create_api_call(req):
call = None
try:
# Parse the request path
endpoint_version, endpoint_name = ServiceRepo.parse_endpoint_path(req.path)
# Resolve authorization: if cookies contain an authorization token, use it as a starting point.
# in any case, request headers always take precedence.
auth_cookie = req.cookies.get(
config.get("apiserver.auth.session_auth_cookie_name")
)
headers = (
{}
if not auth_cookie
else {"Authorization": f"{AuthType.bearer_token} {auth_cookie}"}
)
headers.update(
list(req.headers.items())
) # add (possibly override with) the headers
# Construct call instance
call = APICall(
endpoint_name=endpoint_name,
remote_addr=req.remote_addr,
endpoint_version=endpoint_version,
headers=headers,
files=req.files,
)
# Update call data from request
with TimingContext("preprocess", "update_call_data"):
update_call_data(call, req)
except PathParsingError as ex:
call = _call_or_empty_with_error(call, req, ex.args[0], 400)
call.log_api = False
except BadRequest as ex:
call = _call_or_empty_with_error(call, req, ex.description, 400)
except BaseError as ex:
call = _call_or_empty_with_error(call, req, ex.msg, ex.code, ex.subcode)
except Exception as ex:
log.exception("Error creating call")
call = _call_or_empty_with_error(
call, req, ex.args[0] if ex.args else type(ex).__name__, 500
)
return call
# =================== MAIN =======================
if __name__ == "__main__":
p = ArgumentParser(description=__doc__)
p.add_argument(
"--port", "-p", type=int, default=config.get("apiserver.listen.port")
)
p.add_argument("--ip", "-i", type=str, default=config.get("apiserver.listen.ip"))
p.add_argument(
"--debug", action="store_true", default=config.get("apiserver.debug")
)
p.add_argument(
"--watch", action="store_true", default=config.get("apiserver.watch")
)
args = p.parse_args()
# logging.info("Starting API Server at %s:%s and env '%s'" % (args.ip, args.port, config.env))
app.run(
debug=args.debug,
host=args.ip,
port=args.port,
threaded=True,
use_reloader=args.watch,
)