diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index 245a046..ab5d937 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -23,6 +23,17 @@ import requests
 
 from utils.pipelines.main import pop_system_message
 
+REASONING_EFFORT_BUDGET_TOKEN_MAP = {
+    "none": None,
+    "low": 1024,
+    "medium": 4096,
+    "high": 16384,
+    "max": 32768,
+}
+
+# Maximum combined token limit for Claude 3.7
+MAX_COMBINED_TOKENS = 64000
+
 
 class Pipeline:
     class Valves(BaseModel):
@@ -170,7 +181,44 @@ class Pipeline:
                 },
                 "additionalModelRequestFields": {"top_k": body.get("top_k", 200)}
             }
+
             if body.get("stream", False):
+                supports_thinking = "claude-3-7" in model_id
+                reasoning_effort = body.get("reasoning_effort", "none")
+                budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
+
+                # Allow users to input an integer value representing budget tokens
+                if (
+                    not budget_tokens
+                    and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys()
+                ):
+                    try:
+                        budget_tokens = int(reasoning_effort)
+                    except ValueError as e:
+                        print("Failed to convert reasoning effort to int", e)
+                        budget_tokens = None
+
+                if supports_thinking and budget_tokens:
+                    # Check whether the combined tokens (budget_tokens + max_tokens) exceed the limit
+                    max_tokens = payload["inferenceConfig"].get("maxTokens", 4096)
+                    combined_tokens = budget_tokens + max_tokens
+
+                    if combined_tokens > MAX_COMBINED_TOKENS:
+                        error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}"
+                        print(error_message)
+                        return error_message
+
+                    payload["inferenceConfig"]["maxTokens"] = combined_tokens
+                    payload["additionalModelRequestFields"]["thinking"] = {
+                        "type": "enabled",
+                        "budget_tokens": budget_tokens,
+                    }
+                    # Thinking requires temperature 1.0 and does not support top_p, top_k
+                    payload["inferenceConfig"]["temperature"] = 1.0
+                    if "top_k" in payload["additionalModelRequestFields"]:
+                        del payload["additionalModelRequestFields"]["top_k"]
+                    if "topP" in payload["inferenceConfig"]:
+                        del payload["inferenceConfig"]["topP"]
+
                 return self.stream_response(model_id, payload)
             else:
                 return self.get_completion(model_id, payload)
@@ -194,11 +242,23 @@ class Pipeline:
 
     def stream_response(self, model_id: str, payload: dict) -> Generator:
         streaming_response = self.bedrock_runtime.converse_stream(**payload)
+
+        in_reasoning_context = False
         for chunk in streaming_response["stream"]:
-            if "contentBlockDelta" in chunk:
-                yield chunk["contentBlockDelta"]["delta"]["text"]
+            if in_reasoning_context and "contentBlockStop" in chunk:
+                in_reasoning_context = False
+                yield "\n</think>\n\n"
+            elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]:
+                if "reasoningContent" in chunk["contentBlockDelta"]["delta"]:
+                    if not in_reasoning_context:
+                        yield "<think>"
+
+                    in_reasoning_context = True
+                    if "text" in chunk["contentBlockDelta"]["delta"]["reasoningContent"]:
+                        yield chunk["contentBlockDelta"]["delta"]["reasoningContent"]["text"]
+                elif "text" in chunk["contentBlockDelta"]["delta"]:
+                    yield chunk["contentBlockDelta"]["delta"]["text"]
 
     def get_completion(self, model_id: str, payload: dict) -> str:
         response = self.bedrock_runtime.converse(**payload)
         return response['output']['message']['content'][0]['text']
-
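
Note for reviewers: a minimal sketch of how the new budget resolution behaves, assuming the map and integer fallback introduced in this patch. The resolve_budget helper is hypothetical, written only to illustrate the logic in isolation; it is not part of the change.

    REASONING_EFFORT_BUDGET_TOKEN_MAP = {
        "none": None,
        "low": 1024,
        "medium": 4096,
        "high": 16384,
        "max": 32768,
    }

    def resolve_budget(reasoning_effort: str):
        # Named levels map directly; "none" disables thinking.
        budget = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
        # Any other value is treated as a raw token count, e.g. "2048".
        if budget is None and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP:
            try:
                budget = int(reasoning_effort)
            except ValueError:
                budget = None
        return budget

    assert resolve_budget("medium") == 4096   # named level
    assert resolve_budget("2048") == 2048     # raw token count
    assert resolve_budget("none") is None     # thinking disabled
    assert resolve_budget("bogus") is None    # unparseable, so disabled

With reasoning_effort "medium" and the default maxTokens of 4096, the request sent to converse_stream ends up with inferenceConfig.maxTokens = 4096 + 4096 = 8192, inferenceConfig.temperature = 1.0, and additionalModelRequestFields.thinking = {"type": "enabled", "budget_tokens": 4096}; top_k and topP are dropped because extended thinking does not support them.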