mirror of
https://github.com/open-webui/pipelines
synced 2025-05-12 00:20:48 +00:00
Merge pull request #467 from bartonzzx/langgraph-integration
Add langgraph integration example, also support thinking.
This commit is contained in:
commit
99cfc6bdb9
28
examples/pipelines/integrations/langgraph_pipeline/README.md
Normal file
28
examples/pipelines/integrations/langgraph_pipeline/README.md
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
# Example of langgraph integration
|
||||||
|
## Python version: 3.11
|
||||||
|
## Feature
|
||||||
|
1. Using langgraph stream writer and custom mode of stream to integrate langgraph with open webui pipeline.
|
||||||
|
2. Support \<think\> block display.
|
||||||
|
## Prerequirement
|
||||||
|
Install the open webui pipeline.
|
||||||
|
You can follow the docs : https://docs.openwebui.com/pipelines/#-quick-start-with-docker
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
### 1. Upload pipeline file
|
||||||
|
Upload `langgraph_stream_pipeline.py` to the open webui pipeline.
|
||||||
|
|
||||||
|
### 2. Enable the uploaded pipeline
|
||||||
|
Properly set up your langgraph api url.
|
||||||
|
|
||||||
|
And choose **"LangGraph stream"** as your model.
|
||||||
|
|
||||||
|
### 2. Install dependencies
|
||||||
|
Under the folder `pipelines/examples/pipelines/integrations/langgraph_pipeline`, run command below :
|
||||||
|
```
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
### 3. Start langgraph api server
|
||||||
|
Run command below :
|
||||||
|
```
|
||||||
|
uvicorn langgraph_example:app --reload
|
||||||
|
```
|
@ -0,0 +1,166 @@
|
|||||||
|
"""
|
||||||
|
title: Langgraph stream integration
|
||||||
|
author: bartonzzx
|
||||||
|
author_url: https://github.com/bartonzzx
|
||||||
|
git_url:
|
||||||
|
description: Integrate langgraph with open webui pipeline
|
||||||
|
required_open_webui_version: 0.4.3
|
||||||
|
requirements: none
|
||||||
|
version: 0.4.3
|
||||||
|
licence: MIT
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import getpass
|
||||||
|
from typing import Annotated, Literal
|
||||||
|
from typing_extensions import TypedDict
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
|
||||||
|
from langgraph.graph import StateGraph, START, END
|
||||||
|
from langgraph.graph.message import add_messages
|
||||||
|
from langchain_openai import ChatOpenAI
|
||||||
|
from langgraph.config import get_stream_writer
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
Define LLM API key
|
||||||
|
'''
|
||||||
|
def _set_env(var: str):
|
||||||
|
if not os.environ.get(var):
|
||||||
|
os.environ[var] = getpass.getpass(f"{var}: ")
|
||||||
|
|
||||||
|
|
||||||
|
_set_env("OPENAI_API_KEY")
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
Define Langgraph
|
||||||
|
'''
|
||||||
|
def generate_custom_stream(type: Literal["think","normal"], content: str):
|
||||||
|
content = "\n"+content+"\n"
|
||||||
|
custom_stream_writer = get_stream_writer()
|
||||||
|
return custom_stream_writer({type:content})
|
||||||
|
|
||||||
|
class State(TypedDict):
|
||||||
|
messages: Annotated[list, add_messages]
|
||||||
|
|
||||||
|
llm = ChatOpenAI(model="gpt-3.5-turbo")
|
||||||
|
|
||||||
|
def chatbot(state: State):
|
||||||
|
think_response = llm.invoke(["Please reasoning:"] + state["messages"])
|
||||||
|
normal_response = llm.invoke(state["messages"])
|
||||||
|
generate_custom_stream("think", think_response.content)
|
||||||
|
generate_custom_stream("normal", normal_response.content)
|
||||||
|
return {"messages": [normal_response]}
|
||||||
|
|
||||||
|
# Define graph
|
||||||
|
graph_builder = StateGraph(State)
|
||||||
|
|
||||||
|
# Define nodes
|
||||||
|
graph_builder.add_node("chatbot", chatbot)
|
||||||
|
graph_builder.add_edge("chatbot", END)
|
||||||
|
|
||||||
|
# Define edges
|
||||||
|
graph_builder.add_edge(START, "chatbot")
|
||||||
|
|
||||||
|
# Compile graph
|
||||||
|
graph = graph_builder.compile()
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
Define api processing
|
||||||
|
'''
|
||||||
|
app = FastAPI(
|
||||||
|
title="Langgraph API",
|
||||||
|
description="Langgraph API",
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.get("/test")
|
||||||
|
async def test():
|
||||||
|
return {"message": "Hello World"}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/stream")
|
||||||
|
async def stream(inputs: State):
|
||||||
|
async def event_stream():
|
||||||
|
try:
|
||||||
|
stream_start_msg = {
|
||||||
|
'choices':
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'delta': {},
|
||||||
|
'finish_reason': None
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
# Stream start
|
||||||
|
yield f"data: {json.dumps(stream_start_msg)}\n\n"
|
||||||
|
|
||||||
|
# Processing langgraph stream response with <think> block support
|
||||||
|
async for event in graph.astream(input=inputs, stream_mode="custom"):
|
||||||
|
print(event)
|
||||||
|
think_content = event.get("think", None)
|
||||||
|
normal_content = event.get("normal", None)
|
||||||
|
|
||||||
|
think_msg = {
|
||||||
|
'choices':
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'delta':
|
||||||
|
{
|
||||||
|
'reasoning_content': think_content,
|
||||||
|
},
|
||||||
|
'finish_reason': None
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
normal_msg = {
|
||||||
|
'choices':
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'delta':
|
||||||
|
{
|
||||||
|
'content': normal_content,
|
||||||
|
},
|
||||||
|
'finish_reason': None
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
yield f"data: {json.dumps(think_msg)}\n\n"
|
||||||
|
yield f"data: {json.dumps(normal_msg)}\n\n"
|
||||||
|
|
||||||
|
# End of the stream
|
||||||
|
stream_end_msg = {
|
||||||
|
'choices': [
|
||||||
|
{
|
||||||
|
'delta': {},
|
||||||
|
'finish_reason': 'stop'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
yield f"data: {json.dumps(stream_end_msg)}\n\n"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# Simply print the error information
|
||||||
|
print(f"An error occurred: {e}")
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_stream(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
|
uvicorn.run(app, host="0.0.0.0", port=9000)
|
@ -0,0 +1,63 @@
|
|||||||
|
"""
|
||||||
|
title: Langgraph stream integration
|
||||||
|
author: bartonzzx
|
||||||
|
author_url: https://github.com/bartonzzx
|
||||||
|
git_url:
|
||||||
|
description: Integrate langgraph with open webui pipeline
|
||||||
|
required_open_webui_version: 0.4.3
|
||||||
|
requirements: none
|
||||||
|
version: 0.4.3
|
||||||
|
licence: MIT
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
from typing import List, Union, Generator, Iterator
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline:
|
||||||
|
class Valves(BaseModel):
|
||||||
|
API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL")
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.id = "LangGraph stream"
|
||||||
|
self.name = "LangGraph stream"
|
||||||
|
# Initialize valve paramaters
|
||||||
|
self.valves = self.Valves(
|
||||||
|
**{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
|
||||||
|
)
|
||||||
|
|
||||||
|
async def on_startup(self):
|
||||||
|
# This function is called when the server is started.
|
||||||
|
print(f"on_startup: {__name__}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_shutdown(self):
|
||||||
|
# This function is called when the server is shutdown.
|
||||||
|
print(f"on_shutdown: {__name__}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def pipe(
|
||||||
|
self,
|
||||||
|
user_message: str,
|
||||||
|
model_id: str,
|
||||||
|
messages: List[dict],
|
||||||
|
body: dict
|
||||||
|
) -> Union[str, Generator, Iterator]:
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"messages": [[msg['role'], msg['content']] for msg in messages],
|
||||||
|
}
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
'accept': 'text/event-stream',
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.post(self.valves.API_URL, json=data, headers=headers, stream=True)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
return response.iter_lines()
|
@ -0,0 +1,40 @@
|
|||||||
|
annotated-types==0.7.0
|
||||||
|
anyio==4.8.0
|
||||||
|
certifi==2025.1.31
|
||||||
|
charset-normalizer==3.4.1
|
||||||
|
click==8.1.8
|
||||||
|
distro==1.9.0
|
||||||
|
fastapi==0.115.11
|
||||||
|
h11==0.14.0
|
||||||
|
httpcore==1.0.7
|
||||||
|
httpx==0.28.1
|
||||||
|
idna==3.10
|
||||||
|
jiter==0.9.0
|
||||||
|
jsonpatch==1.33
|
||||||
|
jsonpointer==3.0.0
|
||||||
|
langchain-core==0.3.45
|
||||||
|
langchain-openai==0.3.8
|
||||||
|
langgraph==0.3.11
|
||||||
|
langgraph-checkpoint==2.0.20
|
||||||
|
langgraph-prebuilt==0.1.3
|
||||||
|
langgraph-sdk==0.1.57
|
||||||
|
langsmith==0.3.15
|
||||||
|
msgpack==1.1.0
|
||||||
|
openai==1.66.3
|
||||||
|
orjson==3.10.15
|
||||||
|
packaging==24.2
|
||||||
|
pydantic==2.10.6
|
||||||
|
pydantic_core==2.27.2
|
||||||
|
PyYAML==6.0.2
|
||||||
|
regex==2024.11.6
|
||||||
|
requests==2.32.3
|
||||||
|
requests-toolbelt==1.0.0
|
||||||
|
sniffio==1.3.1
|
||||||
|
starlette==0.46.1
|
||||||
|
tenacity==9.0.0
|
||||||
|
tiktoken==0.9.0
|
||||||
|
tqdm==4.67.1
|
||||||
|
typing_extensions==4.12.2
|
||||||
|
urllib3==2.3.0
|
||||||
|
uvicorn==0.34.0
|
||||||
|
zstandard==0.23.0
|
Loading…
Reference in New Issue
Block a user