fix(flowise): add streaming capability to flowise integration

This commit is contained in:
Eric Z 2025-04-12 14:26:55 -05:00
parent 5e1f90dc49
commit cb5a16a5a4

View File

@ -5,7 +5,7 @@ author_url: https://anthropic.com
git_url: https://github.com/open-webui/pipelines/ git_url: https://github.com/open-webui/pipelines/
description: Access FlowiseAI endpoints with customizable flows description: Access FlowiseAI endpoints with customizable flows
required_open_webui_version: 0.4.3 required_open_webui_version: 0.4.3
requirements: requests requirements: requests,flowise>=1.0.4
version: 0.4.3 version: 0.4.3
licence: MIT licence: MIT
""" """
@ -18,6 +18,7 @@ import re
import json import json
from datetime import datetime from datetime import datetime
import time import time
from flowise import Flowise, PredictionData
from logging import getLogger from logging import getLogger
logger = getLogger(__name__) logger = getLogger(__name__)
@ -26,29 +27,51 @@ logger.setLevel("DEBUG")
class Pipeline: class Pipeline:
class Valves(BaseModel): class Valves(BaseModel):
API_KEY: str = Field(default="", description="FlowiseAI API key") FLOWISE_API_KEY: str = Field(default="", description="FlowiseAI API key (from Bearer key, e.g. QMknVTFTB40Pk23n6KIVRgdB7va2o-Xlx73zEfpeOu0)")
API_URL: str = Field(default="", description="FlowiseAI base URL") FLOWISE_BASE_URL: str = Field(default="", description="FlowiseAI base URL (e.g. http://localhost:3000 (URL before '/api/v1/prediction'))")
RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline") RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline (ops/minute)")
FLOW_0_ENABLED: Optional[bool] = Field(default=False, description="Flow 0 Enabled (make this flow available for use)") FLOW_0_ENABLED: Optional[bool] = Field(default=False, description="Flow 0 Enabled (make this flow available for use)")
FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the FlowiseAI flow identifier)") FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable name for the flow)") FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_1_ENABLED: Optional[bool] = Field(default=False, description="Flow 1 Enabled (make this flow available for use)") FLOW_1_ENABLED: Optional[bool] = Field(default=False, description="Flow 1 Enabled (make this flow available for use)")
FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the FlowiseAI flow identifier)") FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable name for the flow)") FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable flwo name, no special characters, e.g. news or stock-reader)")
FLOW_2_ENABLED: Optional[bool] = Field(default=False, description="Flow 2 Enabled (make this flow available for use)") FLOW_2_ENABLED: Optional[bool] = Field(default=False, description="Flow 2 Enabled (make this flow available for use)")
FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the FlowiseAI flow identifier)") FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable name for the flow)") FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_3_ENABLED: Optional[bool] = Field(default=False, description="Flow 3 Enabled (make this flow available for use)") FLOW_3_ENABLED: Optional[bool] = Field(default=False, description="Flow 3 Enabled (make this flow available for use)")
FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the FlowiseAI flow identifier)") FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable name for the flow)") FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_4_ENABLED: Optional[bool] = Field(default=False, description="Flow 4 Enabled (make this flow available for use)") FLOW_4_ENABLED: Optional[bool] = Field(default=False, description="Flow 4 Enabled (make this flow available for use)")
FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the FlowiseAI flow identifier)") FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable name for the flow)") FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_5_ENABLED: Optional[bool] = Field(default=False, description="Flow 5 Enabled (make this flow available for use)")
FLOW_5_ID: Optional[str] = Field(default=None, description="Flow 5 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_5_NAME: Optional[str] = Field(default=None, description="Flow 5 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_6_ENABLED: Optional[bool] = Field(default=False, description="Flow 6 Enabled (make this flow available for use)")
FLOW_6_ID: Optional[str] = Field(default=None, description="Flow 6 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_6_NAME: Optional[str] = Field(default=None, description="Flow 6 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_7_ENABLED: Optional[bool] = Field(default=False, description="Flow 7 Enabled (make this flow available for use)")
FLOW_7_ID: Optional[str] = Field(default=None, description="Flow 7 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_7_NAME: Optional[str] = Field(default=None, description="Flow 7 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_8_ENABLED: Optional[bool] = Field(default=False, description="Flow 8 Enabled (make this flow available for use)")
FLOW_8_ID: Optional[str] = Field(default=None, description="Flow 8 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_8_NAME: Optional[str] = Field(default=None, description="Flow 8 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
FLOW_9_ENABLED: Optional[bool] = Field(default=False, description="Flow 9 Enabled (make this flow available for use)")
FLOW_9_ID: Optional[str] = Field(default=None, description="Flow 9 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
FLOW_9_NAME: Optional[str] = Field(default=None, description="Flow 9 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
def __init__(self): def __init__(self):
self.name = "FlowiseAI Pipeline" self.name = "FlowiseAI Pipeline"
@ -122,8 +145,8 @@ class Pipeline:
Returns: Returns:
tuple[str, str]: Flow name and query tuple[str, str]: Flow name and query
""" """
# Match pattern @flow_name: query # Match pattern flow_name: query
pattern = r"^@([^:]+):\s*(.+)$" pattern = r"^([^:]+):\s*(.+)$"
match = re.match(pattern, user_message.strip()) match = re.match(pattern, user_message.strip())
if not match: if not match:
@ -132,6 +155,10 @@ class Pipeline:
flow_name = match.group(1).strip().lower() flow_name = match.group(1).strip().lower()
query = match.group(2).strip() query = match.group(2).strip()
date_now = datetime.now().strftime("%Y-%m-%d")
time_now = datetime.now().strftime("%H:%M:%S")
query = f"{query}; today's date is {date_now} and the current time is {time_now}"
return flow_name, query return flow_name, query
def pipe( def pipe(
@ -155,8 +182,8 @@ class Pipeline:
context = "" context = ""
# Check if we have valid API configuration # Check if we have valid API configuration
if not self.valves.API_KEY or not self.valves.API_URL: if not self.valves.FLOWISE_API_KEY or not self.valves.FLOWISE_BASE_URL:
error_msg = "FlowiseAI configuration missing. Please set API_KEY and API_URL valves." error_msg = "FlowiseAI configuration missing. Please set FLOWISE_API_KEY and FLOWISE_BASE_URL valves."
if streaming: if streaming:
yield error_msg yield error_msg
else: else:
@ -166,7 +193,7 @@ class Pipeline:
flow_name, query = self.parse_user_input(user_message) flow_name, query = self.parse_user_input(user_message)
# If no flow specified or invalid flow, list available flows # If no flow specified or invalid flow, list available flows
if not flow_name or flow_name not in self.flows: if flow_name is None or flow_name not in self.flows:
available_flows = list(self.flows.keys()) available_flows = list(self.flows.keys())
if not available_flows: if not available_flows:
no_flows_msg = "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME." no_flows_msg = "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME."
@ -175,16 +202,17 @@ class Pipeline:
else: else:
return no_flows_msg return no_flows_msg
flows_list = "\n".join([f"- @{flow}" for flow in available_flows]) flows_list = "\n".join([f"- {flow}" for flow in available_flows])
help_msg = f"Please specify a flow using the format: @flow_name: your query\n\nAvailable flows:\n{flows_list}" help_msg = f"Please specify a flow using the format: flow_name: your query\n\nAvailable flows:\n{flows_list}"
if not flow_name: if flow_name is None:
help_msg = "No flow specified. " + help_msg help_msg = "No flow specified. " + help_msg
else: else:
help_msg = f"Invalid flow '{flow_name}'. " + help_msg help_msg = f"Invalid flow '{flow_name}'. " + help_msg
if streaming: if streaming:
yield help_msg yield help_msg
return
else: else:
return help_msg return help_msg
@ -194,7 +222,7 @@ class Pipeline:
if streaming: if streaming:
yield from self.stream_retrieve(flow_id, flow_name, query, dt_start) yield from self.stream_retrieve(flow_id, flow_name, query, dt_start)
else: else:
for chunk in self.stream_retrieve(flow_id, flow_name, query, dt_start): for chunk in self.static_retrieve(flow_id, flow_name, query, dt_start):
context += chunk context += chunk
return context if context else "No response from FlowiseAI" return context if context else "No response from FlowiseAI"
@ -202,7 +230,7 @@ class Pipeline:
self, flow_id: str, flow_name: str, query: str, dt_start: datetime self, flow_id: str, flow_name: str, query: str, dt_start: datetime
) -> Generator: ) -> Generator:
""" """
Call the FlowiseAI endpoint with the specified flow ID and query. Stream responses from FlowiseAI using the official client library.
Args: Args:
flow_id (str): The ID of the flow to call flow_id (str): The ID of the flow to call
@ -217,8 +245,100 @@ class Pipeline:
yield "Query is empty. Please provide a question or prompt for the flow." yield "Query is empty. Please provide a question or prompt for the flow."
return return
api_url = f"{self.valves.API_URL.rstrip('/')}/api/v1/prediction/{flow_id}" try:
headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} logger.info(f"Streaming from FlowiseAI flow '{flow_name}' with query: {query}")
# Rate limiting check
self.rate_check(dt_start)
# Initialize Flowise client with API configuration
client = Flowise(
base_url=self.valves.FLOWISE_BASE_URL.rstrip('/'),
api_key=self.valves.FLOWISE_API_KEY
)
# Create streaming prediction request
completion = client.create_prediction(
PredictionData(
chatflowId=flow_id,
question=query,
streaming=True
)
)
except Exception as e:
error_msg = f"Error streaming from FlowiseAI: {str(e)}"
logger.error(error_msg)
yield error_msg
idx_last_update = 0
yield f"Analysis started... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
# Process each streamed chunk
for chunk in completion:
try:
if isinstance(chunk, str):
chunk = json.loads(chunk)
except Exception as e:
# If chunk is not a string, it's already a dictionary
pass
try:
if isinstance(chunk, dict):
# Expected format: {event: "token", data: "content"}
if "event" in chunk:
if ((chunk["event"] in ["start", "update", "agentReasoning"]) and
("data" in chunk) and (isinstance(chunk["data"], list))):
for data_update in chunk["data"][idx_last_update:]:
# e.g. {"event":"start","data":[{"agentName":"Perspective Explorer","messages":["...
idx_last_update += 1
yield "\n---\n"
yield f"\n__Reasoning: {data_update['agentName']} ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})__\n\n"
for message in data_update["messages"]:
yield message # yield message for each agent update
elif chunk["event"] == "end":
# {"event":"end","data":"[DONE]"}
yield "\n---\n"
yield f"\nAnalysis complete. ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n\n"
elif chunk["event"] == "token":
# do nothing, this is the flat output of the flow (final)
pass
elif "error" in chunk:
error_msg = f"Error from FlowiseAI: {chunk['error']}"
logger.error(error_msg)
yield error_msg
else:
# If chunk format is unexpected, yield as is
yield str(chunk)
except Exception as e:
logger.error(f"Error processing chunk: {str(e)}")
yield f"\nUnusual Response Chunk: ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n{str(e)}\n"
yield f"\n---\n"
yield str(chunk)
return
def static_retrieve(
self, flow_id: str, flow_name: str, query: str, dt_start: datetime
) -> Generator:
"""
Call the FlowiseAI endpoint with the specified flow ID and query using REST API.
Args:
flow_id (str): The ID of the flow to call
flow_name (str): The name of the flow (for logging)
query (str): The user's query
dt_start (datetime): Start time for rate limiting
Returns:
Generator: Response chunks for non-streaming requests
"""
if not query:
yield "Query is empty. Please provide a question or prompt for the flow."
return
api_url = f"{self.valves.FLOWISE_BASE_URL.rstrip('/')}/api/v1/prediction/{flow_id}"
headers = {"Authorization": f"Bearer {self.valves.FLOWISE_API_KEY}"}
payload = { payload = {
"question": query, "question": query,