mirror of
https://github.com/open-webui/open-webui
synced 2025-06-26 18:26:48 +00:00
feat(trace): optimize for trace env and instrument hooks
This commit is contained in:
@@ -1,8 +1,14 @@
|
||||
import logging
|
||||
import traceback
|
||||
from typing import Collection
|
||||
from typing import Collection, Union
|
||||
|
||||
from aiohttp import (
|
||||
TraceRequestStartParams,
|
||||
TraceRequestEndParams,
|
||||
TraceRequestExceptionParams,
|
||||
)
|
||||
from chromadb.telemetry.opentelemetry.fastapi import instrument_fastapi
|
||||
from fastapi import FastAPI
|
||||
from opentelemetry.instrumentation.httpx import (
|
||||
HTTPXClientInstrumentor,
|
||||
RequestInfo,
|
||||
@@ -17,6 +23,8 @@ from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrument
|
||||
from opentelemetry.trace import Span, StatusCode
|
||||
from redis import Redis
|
||||
from requests import PreparedRequest, Response
|
||||
from sqlalchemy import Engine
|
||||
from fastapi import status
|
||||
|
||||
from open_webui.utils.trace.constants import SPAN_REDIS_TYPE, SpanAttributes
|
||||
|
||||
@@ -105,7 +113,7 @@ def httpx_response_hook(span: Span, request: RequestInfo, response: ResponseInfo
|
||||
)
|
||||
|
||||
|
||||
async def httpx_async_request_hook(span, request):
|
||||
async def httpx_async_request_hook(span: Span, request: RequestInfo):
|
||||
"""
|
||||
Async Request Hook
|
||||
"""
|
||||
@@ -113,7 +121,9 @@ async def httpx_async_request_hook(span, request):
|
||||
httpx_request_hook(span, request)
|
||||
|
||||
|
||||
async def httpx_async_response_hook(span, request, response):
|
||||
async def httpx_async_response_hook(
|
||||
span: Span, request: RequestInfo, response: ResponseInfo
|
||||
):
|
||||
"""
|
||||
Async Response Hook
|
||||
"""
|
||||
@@ -121,20 +131,54 @@ async def httpx_async_response_hook(span, request, response):
|
||||
httpx_response_hook(span, request, response)
|
||||
|
||||
|
||||
def aiohttp_request_hook(span: Span, request: TraceRequestStartParams):
|
||||
"""
|
||||
Aiohttp Request Hook
|
||||
"""
|
||||
|
||||
span.update_name(f"{request.method} {str(request.url)}")
|
||||
span.set_attributes(
|
||||
attributes={
|
||||
SpanAttributes.HTTP_URL: str(request.url),
|
||||
SpanAttributes.HTTP_METHOD: request.method,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def aiohttp_response_hook(
|
||||
span: Span, response: Union[TraceRequestExceptionParams, TraceRequestEndParams]
|
||||
):
|
||||
"""
|
||||
Aiohttp Response Hook
|
||||
"""
|
||||
|
||||
if isinstance(response, TraceRequestEndParams):
|
||||
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.response.status)
|
||||
span.set_status(
|
||||
StatusCode.ERROR
|
||||
if response.response.status >= status.HTTP_400_BAD_REQUEST
|
||||
else StatusCode.OK
|
||||
)
|
||||
elif isinstance(response, TraceRequestExceptionParams):
|
||||
span.set_status(StatusCode.ERROR)
|
||||
span.set_attribute(SpanAttributes.ERROR_MESSAGE, str(response.exception))
|
||||
|
||||
|
||||
class Instrumentor(BaseInstrumentor):
|
||||
"""
|
||||
Instrument OT
|
||||
"""
|
||||
|
||||
def __init__(self, app):
|
||||
def __init__(self, app: FastAPI, db_engine: Engine):
|
||||
self.app = app
|
||||
self.db_engine = db_engine
|
||||
|
||||
def instrumentation_dependencies(self) -> Collection[str]:
|
||||
return []
|
||||
|
||||
def _instrument(self, **kwargs):
|
||||
instrument_fastapi(app=self.app)
|
||||
SQLAlchemyInstrumentor().instrument()
|
||||
SQLAlchemyInstrumentor().instrument(engine=self.db_engine)
|
||||
RedisInstrumentor().instrument(request_hook=redis_request_hook)
|
||||
RequestsInstrumentor().instrument(
|
||||
request_hook=requests_hook, response_hook=response_hook
|
||||
@@ -146,7 +190,10 @@ class Instrumentor(BaseInstrumentor):
|
||||
async_request_hook=httpx_async_request_hook,
|
||||
async_response_hook=httpx_async_response_hook,
|
||||
)
|
||||
AioHttpClientInstrumentor().instrument()
|
||||
AioHttpClientInstrumentor().instrument(
|
||||
request_hook=aiohttp_request_hook,
|
||||
response_hook=aiohttp_response_hook,
|
||||
)
|
||||
|
||||
def _uninstrument(self, **kwargs):
|
||||
if getattr(self, "instrumentors", None) is None:
|
||||
|
||||
@@ -1,24 +1,23 @@
|
||||
from fastapi import FastAPI
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
|
||||
from sqlalchemy import Engine
|
||||
|
||||
from open_webui.utils.trace.exporters import LazyBatchSpanProcessor
|
||||
from open_webui.utils.trace.instrumentors import Instrumentor
|
||||
from open_webui.env import OT_SERVICE_NAME, OT_HOST, OT_TOKEN
|
||||
from open_webui.env import OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
|
||||
|
||||
def setup(app):
|
||||
def setup(app: FastAPI, db_engine: Engine):
|
||||
# set up trace
|
||||
trace.set_tracer_provider(
|
||||
TracerProvider(
|
||||
resource=Resource.create(
|
||||
{SERVICE_NAME: OT_SERVICE_NAME, "token": OT_TOKEN}
|
||||
),
|
||||
sampler=ALWAYS_ON,
|
||||
resource=Resource.create(attributes={SERVICE_NAME: OTEL_SERVICE_NAME})
|
||||
)
|
||||
)
|
||||
# otlp
|
||||
exporter = OTLPSpanExporter(endpoint=OT_HOST)
|
||||
# otlp export
|
||||
exporter = OTLPSpanExporter(endpoint=OTEL_EXPORTER_OTLP_ENDPOINT)
|
||||
trace.get_tracer_provider().add_span_processor(LazyBatchSpanProcessor(exporter))
|
||||
Instrumentor(app=app).instrument()
|
||||
Instrumentor(app=app, db_engine=db_engine).instrument()
|
||||
|
||||
Reference in New Issue
Block a user