use AsyncLocalStorageContextManager to propagate trace context around (#32)

I noticed that the `llm.chat.*_tokens` attributes weren't in spans, and it turns out none of the attributes from the wrapped function make it in.  only those added synchronously from within `wrapWithSpan` do.

A few things in play here:

1. Switch from `ZoneContextManager` (which doesn't work with native async/await) to `AsyncLocalStorageContextManager` (which does.)
2. (1) requires we use a pnpm patch, since wrangler requires that we import `async_hooks` statically, and the otel code `requires` it, and also requires that it's spelled `node:async_hooks`.
3. encode the payload we try to send _before_ doing the retry loop, so we don't re-encode for each attempt.
4. switch from `startActiveSpan` to `context.with` + `startSpan`.  The latter allows us to wrap an async function.  the former is for synchronous functions.
This commit is contained in:
Chris Toshok 2025-02-19 06:49:32 -08:00 committed by GitHub
parent c52ae1f9da
commit 8d3a8eeb3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 92 additions and 54 deletions

View File

@ -1,13 +1,14 @@
import type { CoreMessage } from "ai";
import Anthropic from "@anthropic-ai/sdk";
import { ChatStreamController } from "~/utils/chatStreamController";
import type { ContentBlockParam, MessageParam } from "@anthropic-ai/sdk/resources/messages/messages.mjs";
import type { FileMap } from "./stream-text";
import { StreamingMessageParser } from "~/lib/runtime/message-parser";
import { extractRelativePath } from "~/utils/diff";
import { wrapWithSpan, getCurrentSpan } from "~/lib/.server/otel";
import type { CoreMessage } from 'ai';
import Anthropic from '@anthropic-ai/sdk';
import { ChatStreamController } from '~/utils/chatStreamController';
import type { ContentBlockParam, MessageParam } from '@anthropic-ai/sdk/resources/messages/messages.mjs';
import type { FileMap } from './stream-text';
import { StreamingMessageParser } from '~/lib/runtime/message-parser';
import { extractRelativePath } from '~/utils/diff';
import { wrapWithSpan, getCurrentSpan } from '~/lib/.server/otel';
import { context } from '@opentelemetry/api';
const Model = "claude-3-5-sonnet-20241022";
const Model = 'claude-3-5-sonnet-20241022';
const MaxMessageTokens = 8192;
function convertContentToAnthropic(content: any): ContentBlockParam[] {
@ -79,7 +80,7 @@ const callAnthropic = wrapWithSpan(
console.log("************************************************");
const response = await anthropic.messages.create({
model: "claude-3-5-sonnet-20241022",
model: Model,
messages,
max_tokens: MaxMessageTokens,
system: systemPrompt,

View File

@ -1,6 +1,6 @@
import type { Tracer } from '@opentelemetry/api';
import { SpanStatusCode, type Attributes, context, trace } from '@opentelemetry/api';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks';
import type { ExportResult } from '@opentelemetry/core';
import { ExportResultCode } from '@opentelemetry/core';
import type { ExportServiceError, OTLPExporterConfigBase } from '@opentelemetry/otlp-exporter-base';
@ -109,6 +109,13 @@ export class OTLPExporter implements SpanExporter {
return;
}
const exportMessage = createExportTraceServiceRequest(spans, {
useHex: true,
useLongBits: false,
});
const exportPayload = JSON.stringify(exportMessage);
let currentRetry = 0;
// types involving config objects with optional fields are such a pain, hence the defaults here.
@ -120,7 +127,7 @@ export class OTLPExporter implements SpanExporter {
await this._semaphore.acquire();
try {
await this._sendSpans(spans);
await this._send(exportPayload);
return;
} finally {
this._semaphore.release();
@ -140,16 +147,15 @@ export class OTLPExporter implements SpanExporter {
}
}
private async _sendSpans(spans: ReadableSpan[]): Promise<void> {
const {url = defaultOptions.url, timeoutMillis = defaultOptions.timeoutMillis, headers = defaultOptions.headers } = this._config;
private async _send(payload: string): Promise<void> {
const {
url = defaultOptions.url,
timeoutMillis = defaultOptions.timeoutMillis,
headers = defaultOptions.headers,
} = this._config;
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMillis);
const exportMessage = createExportTraceServiceRequest(spans, {
useHex: true,
useLongBits: false,
});
try {
const response = await fetch(url, {
method: 'POST',
@ -157,7 +163,7 @@ export class OTLPExporter implements SpanExporter {
'Content-Type': 'application/json',
...headers,
},
body: JSON.stringify(exportMessage),
body: payload,
signal: controller.signal,
});
@ -184,9 +190,9 @@ export class OTLPExporter implements SpanExporter {
}
}
export function createTracer(context: AppLoadContext) {
const honeycombApiKey = (context.cloudflare.env as any).HONEYCOMB_API_KEY;
const honeycombDataset = (context.cloudflare.env as any).HONEYCOMB_DATASET;
export function createTracer(appContext: AppLoadContext) {
const honeycombApiKey = (appContext.cloudflare.env as any).HONEYCOMB_API_KEY;
const honeycombDataset = (appContext.cloudflare.env as any).HONEYCOMB_DATASET;
if (!honeycombApiKey || !honeycombDataset) {
console.warn('OpenTelemetry initialization skipped: HONEYCOMB_API_KEY and/or HONEYCOMB_DATASET not set');
@ -214,9 +220,10 @@ export function createTracer(context: AppLoadContext) {
spanProcessors: [new SimpleSpanProcessor(exporter), new SimpleSpanProcessor(new ConsoleSpanExporter())],
});
provider.register({
contextManager: new ZoneContextManager(),
});
const contextManager = new AsyncLocalStorageContextManager();
context.setGlobalContextManager(contextManager);
provider.register({ contextManager });
return provider.getTracer('nut-server');
} catch (e) {
@ -265,8 +272,14 @@ export function wrapWithSpan<Args extends any[], T>(
opts: SpanOptions,
fn: (...args: Args) => Promise<T>,
): (...args: Args) => Promise<T> {
return async (...args: Args) => {
return ensureTracer().startActiveSpan(opts.name, async (span) => {
return (...args: Args) => {
const span = ensureTracer().startSpan(opts.name);
if (opts.attrs) {
span.setAttributes(opts.attrs);
}
return context.with(trace.setSpan(context.active(), span), async () => {
if (opts.attrs) {
span.setAttributes(opts.attrs);
}
@ -294,5 +307,5 @@ export function wrapWithSpan<Args extends any[], T>(
}
export function getCurrentSpan() {
return trace.getSpan(context.active());
return trace.getActiveSpan();
}

View File

@ -62,7 +62,7 @@
"@octokit/types": "^13.6.2",
"@openrouter/ai-sdk-provider": "^0.0.5",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-zone": "^1.30.1",
"@opentelemetry/context-async-hooks": "^1.30.1",
"@opentelemetry/core": "^1.26.0",
"@opentelemetry/otlp-exporter-base": "^0.53.0",
"@opentelemetry/otlp-transformer": "^0.53.0",
@ -147,5 +147,10 @@
"resolutions": {
"@typescript-eslint/utils": "^8.0.0-alpha.30"
},
"packageManager": "pnpm@9.4.0"
"packageManager": "pnpm@9.4.0",
"pnpm": {
"patchedDependencies": {
"@opentelemetry/context-async-hooks@1.30.1": "patches/@opentelemetry__context-async-hooks@1.30.1.patch"
}
}
}

View File

@ -0,0 +1,32 @@
diff --git a/build/src/AsyncHooksContextManager.js b/build/src/AsyncHooksContextManager.js
index 3b8866855dd900570cbd126b9ca57bd91e1d7cb5..b55f3fb34b54695e06fea6a3e39dcbb133d0c594 100644
--- a/build/src/AsyncHooksContextManager.js
+++ b/build/src/AsyncHooksContextManager.js
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import * as asyncHooks from "node:async_hooks";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AsyncHooksContextManager = void 0;
const api_1 = require("@opentelemetry/api");
-const asyncHooks = require("async_hooks");
const AbstractAsyncHooksContextManager_1 = require("./AbstractAsyncHooksContextManager");
class AsyncHooksContextManager extends AbstractAsyncHooksContextManager_1.AbstractAsyncHooksContextManager {
constructor() {
diff --git a/build/src/AsyncLocalStorageContextManager.js b/build/src/AsyncLocalStorageContextManager.js
index 86fe67194acd7af0b8e00fd0eb10dc2edf8945f0..65baff025a8d6a8fafbeae1f6f91906d08a657e6 100644
--- a/build/src/AsyncLocalStorageContextManager.js
+++ b/build/src/AsyncLocalStorageContextManager.js
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import * as async_hooks_1 from "node:async_hooks";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AsyncLocalStorageContextManager = void 0;
const api_1 = require("@opentelemetry/api");
-const async_hooks_1 = require("async_hooks");
const AbstractAsyncHooksContextManager_1 = require("./AbstractAsyncHooksContextManager");
class AsyncLocalStorageContextManager extends AbstractAsyncHooksContextManager_1.AbstractAsyncHooksContextManager {
constructor() {

View File

@ -7,6 +7,11 @@ settings:
overrides:
'@typescript-eslint/utils': ^8.0.0-alpha.30
patchedDependencies:
'@opentelemetry/context-async-hooks@1.30.1':
hash: k7tq4qcxof3n2buu2yqtrqhjhu
path: patches/@opentelemetry__context-async-hooks@1.30.1.patch
importers:
.:
@ -107,9 +112,9 @@ importers:
'@opentelemetry/api':
specifier: ^1.9.0
version: 1.9.0
'@opentelemetry/context-zone':
'@opentelemetry/context-async-hooks':
specifier: ^1.30.1
version: 1.30.1(@opentelemetry/api@1.9.0)
version: 1.30.1(patch_hash=k7tq4qcxof3n2buu2yqtrqhjhu)(@opentelemetry/api@1.9.0)
'@opentelemetry/core':
specifier: ^1.26.0
version: 1.30.1(@opentelemetry/api@1.9.0)
@ -1681,16 +1686,11 @@ packages:
resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==}
engines: {node: '>=8.0.0'}
'@opentelemetry/context-zone-peer-dep@1.30.1':
resolution: {integrity: sha512-8oJQR+MBblY5WGOm26AOBLogN+FoO26QEZssH9WJd1qSD7ABMefq9qyGyN9xilFFl2ABPHgcMzkfYb8vx9JBEA==}
'@opentelemetry/context-async-hooks@1.30.1':
resolution: {integrity: sha512-s5vvxXPVdjqS3kTLKMeBMvop9hbWkwzBpu+mUO2M7sZtlkyDJGwFe33wRKnbaYDo8ExRVBIIdwIGrqpxHuKttA==}
engines: {node: '>=14'}
peerDependencies:
'@opentelemetry/api': '>=1.0.0 <1.10.0'
zone.js: ^0.10.2 || ^0.11.0 || ^0.12.0 || ^0.13.0 || ^0.14.0 || ^0.15.0
'@opentelemetry/context-zone@1.30.1':
resolution: {integrity: sha512-N6CACt5sxXD6XzS2jJPqstNJZ/QFqeW56IiKfHb6hYOelCXVvYfxheF7byAeRXa7+N8rmXUP7aRdupALXP5hdQ==}
engines: {node: '>=14'}
'@opentelemetry/core@1.26.0':
resolution: {integrity: sha512-1iKxXXE8415Cdv0yjG3G6hQnB5eVEsJce3QaawX8SjDn0mAS0ZM8fAbZZJD4ajvhC15cePvosSCut404KrIIvQ==}
@ -6350,9 +6350,6 @@ packages:
zod@3.23.8:
resolution: {integrity: sha512-XBx9AXhXktjUqnepgTiE5flcKIYWi/rme0Eaj+5Y0lftuGBq+jyRu/md4WnuxqgP1ubdpNCsYEYPxrzVHD8d6g==}
zone.js@0.15.0:
resolution: {integrity: sha512-9oxn0IIjbCZkJ67L+LkhYWRyAy7axphb3VgE2MBDlOqnmHMPWGYMxJxBYFueFq/JGY2GMwS0rU+UCLunEmy5UA==}
zwitch@2.0.4:
resolution: {integrity: sha512-bXE4cR/kVZhKZX/RjPEflHaKVhUVl85noU3v6b8apfQEc1x4A+zBxjZ4lN8LqGd6WZ3dl98pY4o717VFmoPp+A==}
@ -7932,17 +7929,9 @@ snapshots:
'@opentelemetry/api@1.9.0': {}
'@opentelemetry/context-zone-peer-dep@1.30.1(@opentelemetry/api@1.9.0)(zone.js@0.15.0)':
'@opentelemetry/context-async-hooks@1.30.1(patch_hash=k7tq4qcxof3n2buu2yqtrqhjhu)(@opentelemetry/api@1.9.0)':
dependencies:
'@opentelemetry/api': 1.9.0
zone.js: 0.15.0
'@opentelemetry/context-zone@1.30.1(@opentelemetry/api@1.9.0)':
dependencies:
'@opentelemetry/context-zone-peer-dep': 1.30.1(@opentelemetry/api@1.9.0)(zone.js@0.15.0)
zone.js: 0.15.0
transitivePeerDependencies:
- '@opentelemetry/api'
'@opentelemetry/core@1.26.0(@opentelemetry/api@1.9.0)':
dependencies:
@ -13549,6 +13538,4 @@ snapshots:
zod@3.23.8: {}
zone.js@0.15.0: {}
zwitch@2.0.4: {}