From 827b47d2d5f3a2f5a12b1138ae57a813b10ff66e Mon Sep 17 00:00:00 2001
From: Takahiro Kikumoto
Date: Tue, 18 Mar 2025 12:01:14 +0900
Subject: [PATCH 1/6] Fix system message handling and payload cleanup in AWS Bedrock Claude Pipeline

- Corrected the system message extraction to use the "content" field.
- Removed unnecessary deletion of the "system" field from the payload in stream_response method.
---
 examples/pipelines/providers/aws_bedrock_claude_pipeline.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index 4990927..9d03426 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -139,7 +139,7 @@ class Pipeline:
 
         payload = {"modelId": model_id,
                    "messages": processed_messages,
-                   "system": [{'text': system_message if system_message else 'you are an intelligent ai assistant'}],
+                   "system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}],
                    "inferenceConfig": {"temperature": body.get("temperature", 0.5)},
                    "additionalModelRequestFields": {"top_k": body.get("top_k", 200), "top_p": body.get("top_p", 0.9)}
                    }
@@ -166,8 +166,6 @@ class Pipeline:
         }
 
     def stream_response(self, model_id: str, payload: dict) -> Generator:
-        if "system" in payload:
-            del payload["system"]
         if "additionalModelRequestFields" in payload:
             del payload["additionalModelRequestFields"]
         streaming_response = self.bedrock_runtime.converse_stream(**payload)

From 51e267c10f68bee3847dab2ddce61781892c394f Mon Sep 17 00:00:00 2001
From: Takahiro Kikumoto
Date: Tue, 18 Mar 2025 13:29:15 +0900
Subject: [PATCH 2/6] Refactor payload structure to comply with Bedrock Converse API

- Updated `inferenceConfig` to include `temperature`, `topP`, `maxTokens`, and `stopSequences`.
- Added `additionalModelRequestFields` with `top_k` parameter.
- Removed unnecessary deletion of `additionalModelRequestFields` in `stream_response` method.
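Note: with this change applied, the request sent to the Converse API takes roughly the following shape. This is a minimal sketch, not part of the patch; the model ID, region, and prompt are illustrative, while the payload keys and default values mirror the diff below:

    import boto3

    # Illustrative client and model; the payload structure mirrors the patch.
    bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")

    payload = {
        "modelId": "anthropic.claude-3-5-sonnet-20240620-v1:0",
        "messages": [{"role": "user", "content": [{"text": "Hello"}]}],
        "system": [{"text": "you are an intelligent ai assistant"}],
        "inferenceConfig": {
            "temperature": 0.5,
            "topP": 0.9,
            "maxTokens": 4096,
            "stopSequences": [],
        },
        "additionalModelRequestFields": {"top_k": 200},
    }

    response = bedrock_runtime.converse(**payload)
    print(response["output"]["message"]["content"][0]["text"])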
---
 .../providers/aws_bedrock_claude_pipeline.py  | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index 9d03426..80edbd7 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -140,8 +140,13 @@ class Pipeline:
         payload = {"modelId": model_id,
                    "messages": processed_messages,
                    "system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}],
-                   "inferenceConfig": {"temperature": body.get("temperature", 0.5)},
-                   "additionalModelRequestFields": {"top_k": body.get("top_k", 200), "top_p": body.get("top_p", 0.9)}
+                   "inferenceConfig": {
+                       "temperature": body.get("temperature", 0.5),
+                       "topP": body.get("top_p", 0.9),
+                       "maxTokens": body.get("max_tokens", 4096),
+                       "stopSequences": body.get("stop", []),
+                   },
+                   "additionalModelRequestFields": {"top_k": body.get("top_k", 200)}
                    }
         if body.get("stream", False):
             return self.stream_response(model_id, payload)
@@ -166,8 +171,6 @@ class Pipeline:
         }
 
     def stream_response(self, model_id: str, payload: dict) -> Generator:
-        if "additionalModelRequestFields" in payload:
-            del payload["additionalModelRequestFields"]
         streaming_response = self.bedrock_runtime.converse_stream(**payload)
         for chunk in streaming_response["stream"]:
             if "contentBlockDelta" in chunk:

From c1bbbe11650b5cf10c7f5d4ceb6a893d93fee943 Mon Sep 17 00:00:00 2001
From: Takahiro Kikumoto
Date: Tue, 18 Mar 2025 16:33:44 +0900
Subject: [PATCH 3/6] Refactor AWS Bedrock Claude Pipeline to support Instance Profile and Task Role

- Updated `Valves` class to use `Optional[str]` for AWS credentials.
- Modified `__init__` method to initialize `valves` with environment variables.
- Added `update_pipelines` method to handle Bedrock client initialization and model fetching.
- Refactored `on_startup` and `on_valves_updated` methods to call `update_pipelines`.
- Improved error handling in `update_pipelines` and `get_models` methods.
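Note: the reason making the credentials optional enables Instance Profile and Task Role authentication is boto3's default credential chain: when the key arguments are None, boto3 searches environment variables, the shared config file, the EC2 instance profile, and the ECS task role in turn. A minimal sketch (region and usage are illustrative):

    import boto3

    # With the valves left unset, both key arguments are None and boto3
    # falls back to its default credential chain (env vars, shared config,
    # instance profile, task role).
    bedrock = boto3.client(
        service_name="bedrock",
        aws_access_key_id=None,
        aws_secret_access_key=None,
        region_name="us-east-1",
    )

    models = bedrock.list_foundation_models(byProvider="Anthropic")
    print([m["modelId"] for m in models["modelSummaries"]])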
---
 .../providers/aws_bedrock_claude_pipeline.py  | 98 ++++++++++---------
 1 file changed, 54 insertions(+), 44 deletions(-)

diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index 80edbd7..f347d77 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -12,7 +12,7 @@ import base64
 import json
 import logging
 from io import BytesIO
-from typing import List, Union, Generator, Iterator
+from typing import List, Union, Generator, Iterator, Optional, Any
 
 import boto3
@@ -26,9 +26,9 @@ from utils.pipelines.main import pop_system_message
 
 class Pipeline:
     class Valves(BaseModel):
-        AWS_ACCESS_KEY: str = ""
-        AWS_SECRET_KEY: str = ""
-        AWS_REGION_NAME: str = ""
+        AWS_ACCESS_KEY: Optional[str] = None
+        AWS_SECRET_KEY: Optional[str] = None
+        AWS_REGION_NAME: Optional[str] = None
 
     def __init__(self):
         self.type = "manifold"
@@ -47,21 +47,25 @@ class Pipeline:
             }
         )
 
-        self.bedrock = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY,
-                                    aws_secret_access_key=self.valves.AWS_SECRET_KEY,
-                                    service_name="bedrock",
-                                    region_name=self.valves.AWS_REGION_NAME)
-        self.bedrock_runtime = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY,
-                                            aws_secret_access_key=self.valves.AWS_SECRET_KEY,
-                                            service_name="bedrock-runtime",
-                                            region_name=self.valves.AWS_REGION_NAME)
+        self.valves = self.Valves(
+            **{
+                "AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", ""),
+                "AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", ""),
+                "AWS_REGION_NAME": os.getenv(
+                    "AWS_REGION_NAME", os.getenv(
+                        "AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "")
+                    )
+                ),
+            }
+        )
 
-        self.pipelines = self.get_models()
+        self.update_pipelines()
 
     async def on_startup(self):
         # This function is called when the server is started.
         print(f"on_startup:{__name__}")
+        self.update_pipelines()
         pass
 
     async def on_shutdown(self):
@@ -72,40 +76,46 @@ class Pipeline:
 
     async def on_valves_updated(self):
         # This function is called when the valves are updated.
print(f"on_valves_updated:{__name__}") - self.bedrock = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY, - aws_secret_access_key=self.valves.AWS_SECRET_KEY, - service_name="bedrock", - region_name=self.valves.AWS_REGION_NAME) - self.bedrock_runtime = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY, - aws_secret_access_key=self.valves.AWS_SECRET_KEY, - service_name="bedrock-runtime", - region_name=self.valves.AWS_REGION_NAME) - self.pipelines = self.get_models() + self.update_pipelines() - def pipelines(self) -> List[dict]: - return self.get_models() + def update_pipelines(self) -> None: + try: + self.bedrock = boto3.client(service_name="bedrock", + aws_access_key_id=self.valves.AWS_ACCESS_KEY, + aws_secret_access_key=self.valves.AWS_SECRET_KEY, + region_name=self.valves.AWS_REGION_NAME) + self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", + aws_access_key_id=self.valves.AWS_ACCESS_KEY, + aws_secret_access_key=self.valves.AWS_SECRET_KEY, + region_name=self.valves.AWS_REGION_NAME) + self.pipelines = self.get_models() + except Exception as e: + print(f"Error: {e}") + self.pipelines = [ + { + "id": "error", + "name": "Could not fetch models from Bedrock, please set up AWS Key/Secret or Instance/Task Role.", + }, + ] def get_models(self): - if self.valves.AWS_ACCESS_KEY and self.valves.AWS_SECRET_KEY: - try: - response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND') - return [ - { - "id": model["modelId"], - "name": model["modelName"], - } - for model in response["modelSummaries"] - ] - except Exception as e: - print(f"Error: {e}") - return [ - { - "id": "error", - "name": "Could not fetch models from Bedrock, please update the Access/Secret Key in the valves.", - }, - ] - else: - return [] + try: + response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND') + return [ + { + "id": model["modelId"], + "name": model["modelName"], + } + for model in response["modelSummaries"] + ] + except Exception as e: + print(f"Error: {e}") + return [ + { + "id": "error", + "name": "Could not fetch models from Bedrock, please check permissoin.", + }, + ] def pipe( self, user_message: str, model_id: str, messages: List[dict], body: dict From ecc44ebd1e0707db56af4e186cc71e4ead9bbced Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 16:48:12 +0900 Subject: [PATCH 4/6] Enhance get_models method to include models with INFERENCE_PROFILE type - Updated the get_models method to fetch models that support both ON_DEMAND and INFERENCE_PROFILE inference types. - Added a helper method getInferenceProfileId to retrieve the inference profile ID for models with INFERENCE_PROFILE type. - This change ensures that models with different inference types are correctly listed and available for use. 
---
 .../providers/aws_bedrock_claude_pipeline.py  | 28 +++++++++++++------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index f347d77..245a046 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -100,14 +100,18 @@ class Pipeline:
 
     def get_models(self):
         try:
-            response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND')
-            return [
-                {
-                    "id": model["modelId"],
-                    "name": model["modelName"],
-                }
-                for model in response["modelSummaries"]
-            ]
+            res = []
+            response = self.bedrock.list_foundation_models(byProvider='Anthropic')
+            for model in response['modelSummaries']:
+                inference_types = model.get('inferenceTypesSupported', [])
+                if "ON_DEMAND" in inference_types:
+                    res.append({'id': model['modelId'], 'name': model['modelName']})
+                elif "INFERENCE_PROFILE" in inference_types:
+                    inferenceProfileId = self.getInferenceProfileId(model['modelArn'])
+                    if inferenceProfileId:
+                        res.append({'id': inferenceProfileId, 'name': model['modelName']})
+
+            return res
         except Exception as e:
             print(f"Error: {e}")
             return [
@@ -117,6 +121,14 @@ class Pipeline:
                 },
             ]
 
+    def getInferenceProfileId(self, modelArn: str) -> Optional[str]:
+        response = self.bedrock.list_inference_profiles()
+        for profile in response.get('inferenceProfileSummaries', []):
+            for model in profile.get('models', []):
+                if model.get('modelArn') == modelArn:
+                    return profile['inferenceProfileId']
+        return None
+
     def pipe(
         self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:

From 327062733a168ffbaa4dca9cbbd061fed29c6a30 Mon Sep 17 00:00:00 2001
From: Takahiro Kikumoto
Date: Tue, 18 Mar 2025 17:38:46 +0900
Subject: [PATCH 5/6] Add support for Claude 3.7 thinking mode

- Implemented support for Claude 3.7 thinking mode by adding reasoning effort and budget tokens.
- Added checks to ensure combined tokens do not exceed the maximum limit.
- Adjusted inference configuration to accommodate thinking mode requirements.
- Referenced implementation from https://github.com/open-webui/pipelines/blob/main/examples/pipelines/providers/anthropic_manifold_pipeline.py.
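Note: the budget-token resolution added in this patch can be sketched standalone as follows. The constants mirror REASONING_EFFORT_BUDGET_TOKEN_MAP and MAX_COMBINED_TOKENS from the diff below; resolve_budget_tokens is a hypothetical name used only for illustration:

    REASONING_EFFORT_BUDGET_TOKEN_MAP = {
        "none": None,
        "low": 1024,
        "medium": 4096,
        "high": 16384,
        "max": 32768,
    }
    MAX_COMBINED_TOKENS = 64000  # budget_tokens + maxTokens must stay within this

    def resolve_budget_tokens(reasoning_effort: str):
        # Named efforts map directly; other values may be raw integer budgets.
        budget = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
        if not budget and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP:
            try:
                budget = int(reasoning_effort)
            except ValueError:
                budget = None
        return budget

    assert resolve_budget_tokens("medium") == 4096
    assert resolve_budget_tokens("2048") == 2048
    assert resolve_budget_tokens("none") is None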
---
 .../providers/aws_bedrock_claude_pipeline.py  | 66 ++++++++++++++++++-
 1 file changed, 63 insertions(+), 3 deletions(-)

diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index 245a046..ab5d937 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -23,6 +23,17 @@ import requests
 
 from utils.pipelines.main import pop_system_message
 
+REASONING_EFFORT_BUDGET_TOKEN_MAP = {
+    "none": None,
+    "low": 1024,
+    "medium": 4096,
+    "high": 16384,
+    "max": 32768,
+}
+
+# Maximum combined token limit for Claude 3.7
+MAX_COMBINED_TOKENS = 64000
+
 
 class Pipeline:
     class Valves(BaseModel):
@@ -170,7 +181,44 @@ class Pipeline:
                    },
                    "additionalModelRequestFields": {"top_k": body.get("top_k", 200)}
                    }
+
         if body.get("stream", False):
+            supports_thinking = "claude-3-7" in model_id
+            reasoning_effort = body.get("reasoning_effort", "none")
+            budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
+
+            # Allow users to input an integer value representing budget tokens
+            if (
+                not budget_tokens
+                and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys()
+            ):
+                try:
+                    budget_tokens = int(reasoning_effort)
+                except ValueError as e:
+                    print("Failed to convert reasoning effort to int", e)
+                    budget_tokens = None
+
+            if supports_thinking and budget_tokens:
+                # Check if the combined tokens (budget_tokens + max_tokens) exceed the limit
+                max_tokens = payload["inferenceConfig"].get("maxTokens", 4096)
+                combined_tokens = budget_tokens + max_tokens
+
+                if combined_tokens > MAX_COMBINED_TOKENS:
+                    error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}"
+                    print(error_message)
+                    return error_message
+
+                payload["inferenceConfig"]["maxTokens"] = combined_tokens
+                payload["additionalModelRequestFields"]["thinking"] = {
+                    "type": "enabled",
+                    "budget_tokens": budget_tokens,
+                }
+                # Thinking requires temperature 1.0 and does not support top_p, top_k
+                payload["inferenceConfig"]["temperature"] = 1.0
+                if "top_k" in payload["additionalModelRequestFields"]:
+                    del payload["additionalModelRequestFields"]["top_k"]
+                if "topP" in payload["inferenceConfig"]:
+                    del payload["inferenceConfig"]["topP"]
             return self.stream_response(model_id, payload)
         else:
             return self.get_completion(model_id, payload)
@@ -194,11 +242,23 @@ class Pipeline:
 
     def stream_response(self, model_id: str, payload: dict) -> Generator:
         streaming_response = self.bedrock_runtime.converse_stream(**payload)
+
+        in_reasoning_context = False
         for chunk in streaming_response["stream"]:
-            if "contentBlockDelta" in chunk:
-                yield chunk["contentBlockDelta"]["delta"]["text"]
+            if in_reasoning_context and "contentBlockStop" in chunk:
+                in_reasoning_context = False
+                yield "\n</think>\n\n"
+            elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]:
+                if "reasoningContent" in chunk["contentBlockDelta"]["delta"]:
+                    if not in_reasoning_context:
+                        yield "<think>"
+
+                    in_reasoning_context = True
+                    if "text" in chunk["contentBlockDelta"]["delta"]["reasoningContent"]:
+                        yield chunk["contentBlockDelta"]["delta"]["reasoningContent"]["text"]
+                elif "text" in chunk["contentBlockDelta"]["delta"]:
+                    yield chunk["contentBlockDelta"]["delta"]["text"]
 
     def get_completion(self, model_id: str, payload: dict) -> str:
         response = self.bedrock_runtime.converse(**payload)
         return response['output']['message']['content'][0]['text']
-

From 488c43edd91176b08e3a1218692c3d82da6ffd85 Mon Sep 17 00:00:00 2001
From: Takahiro Kikumoto
Date: Tue, 18 Mar 2025 21:28:05 +0900
Subject: [PATCH 6/6] improve: enhance image format detection in process_image method

- Add proper MIME type detection for both data URLs and HTTP requests
- Extract media type from Content-Type header or MIME type
- Make format detection more robust and generic
- Remove hardcoded PNG/JPEG format assumptions
---
 .../providers/aws_bedrock_claude_pipeline.py  | 21 ++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
index ab5d937..e6ad83f 100644
--- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
+++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py
@@ -227,17 +227,24 @@ class Pipeline:
 
     def process_image(self, image: str):
         img_stream = None
-        if image["url"].startswith("data:image"):
-            if ',' in image["url"]:
-                base64_string = image["url"].split(',')[1]
-            image_data = base64.b64decode(base64_string)
+        content_type = None
+        if image["url"].startswith("data:image"):
+            mime_type, base64_string = image["url"].split(",", 1)
+            content_type = mime_type.split(":")[1].split(";")[0]
+            image_data = base64.b64decode(base64_string)
             img_stream = BytesIO(image_data)
         else:
-            img_stream = requests.get(image["url"]).content
+            response = requests.get(image["url"])
+            img_stream = BytesIO(response.content)
+            content_type = response.headers.get('Content-Type', 'image/jpeg')
+
+        media_type = content_type.split('/')[-1] if '/' in content_type else content_type
 
         return {
-            "image": {"format": "png" if image["url"].endswith(".png") else "jpeg",
-                      "source": {"bytes": img_stream.read()}}
+            "image": {
+                "format": media_type,
+                "source": {"bytes": img_stream.read()}
+            }
         }
 
     def stream_response(self, model_id: str, payload: dict) -> Generator:
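Note: the data-URL branch added above can be exercised standalone with a sketch like this; the URL is a hand-made example, not taken from the patch:

    import base64
    from io import BytesIO

    # A tiny fake payload standing in for a real base64-encoded image.
    url = "data:image/png;base64," + base64.b64encode(b"fake-png-bytes").decode()

    mime_type, base64_string = url.split(",", 1)
    content_type = mime_type.split(":")[1].split(";")[0]  # -> "image/png"
    media_type = content_type.split("/")[-1]              # -> "png", used as Bedrock's "format"
    img_stream = BytesIO(base64.b64decode(base64_string))

    print(content_type, media_type, img_stream.read())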