Add langgraph integration example, also support thinking.

This commit is contained in:
bartonzzx 2025-03-15 23:02:38 +08:00
parent f89ab37f53
commit 48ddbec455
4 changed files with 297 additions and 0 deletions

View File

@ -0,0 +1,28 @@
# Example of langgraph integration
## Python version: 3.11
## Feature
1. Using langgraph stream writer and custom mode of stream to integrate langgraph with open webui pipeline.
2. Support \<think\> block display.
## Prerequirement
Install the open webui pipeline.
You can follow the docs : https://docs.openwebui.com/pipelines/#-quick-start-with-docker
## Usage
### 1. Upload pipeline file
Upload `langgraph_stream_pipeline.py` to the open webui pipeline.
### 2. Enable the uploaded pipeline
Properly set up your langgraph api url.
And choose **"LangGraph stream"** as your model.
### 2. Install dependencies
Under the folder `pipelines/examples/pipelines/integrations/langgraph_pipeline`, run command below :
```
pip install -r requirements.txt
```
### 3. Start langgraph api server
Run command below :
```
uvicorn langgraph_example:app --reload
```

View File

@ -0,0 +1,166 @@
"""
title: Langgraph stream integration
author: bartonzzx
author_url: https://github.com/bartonzzx
git_url:
description: Integrate langgraph with open webui pipeline
required_open_webui_version: 0.4.3
requirements: none
version: 0.4.3
licence: MIT
"""
import os
import json
import getpass
from typing import Annotated, Literal
from typing_extensions import TypedDict
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langgraph.config import get_stream_writer
'''
Define LLM API key
'''
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
'''
Define Langgraph
'''
def generate_custom_stream(type: Literal["think","normal"], content: str):
content = "\n"+content+"\n"
custom_stream_writer = get_stream_writer()
return custom_stream_writer({type:content})
class State(TypedDict):
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-3.5-turbo")
def chatbot(state: State):
think_response = llm.invoke(["Please reasoning:"] + state["messages"])
normal_response = llm.invoke(state["messages"])
generate_custom_stream("think", think_response.content)
generate_custom_stream("normal", normal_response.content)
return {"messages": [normal_response]}
# Define graph
graph_builder = StateGraph(State)
# Define nodes
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge("chatbot", END)
# Define edges
graph_builder.add_edge(START, "chatbot")
# Compile graph
graph = graph_builder.compile()
'''
Define api processing
'''
app = FastAPI(
title="Langgraph API",
description="Langgraph API",
)
@app.get("/test")
async def test():
return {"message": "Hello World"}
@app.post("/stream")
async def stream(inputs: State):
async def event_stream():
try:
stream_start_msg = {
'choices':
[
{
'delta': {},
'finish_reason': None
}
]
}
# Stream start
yield f"data: {json.dumps(stream_start_msg)}\n\n"
# Processing langgraph stream response with <think> block support
async for event in graph.astream(input=inputs, stream_mode="custom"):
print(event)
think_content = event.get("think", None)
normal_content = event.get("normal", None)
think_msg = {
'choices':
[
{
'delta':
{
'reasoning_content': think_content,
},
'finish_reason': None
}
]
}
normal_msg = {
'choices':
[
{
'delta':
{
'content': normal_content,
},
'finish_reason': None
}
]
}
yield f"data: {json.dumps(think_msg)}\n\n"
yield f"data: {json.dumps(normal_msg)}\n\n"
# End of the stream
stream_end_msg = {
'choices': [
{
'delta': {},
'finish_reason': 'stop'
}
]
}
yield f"data: {json.dumps(stream_end_msg)}\n\n"
except Exception as e:
# Simply print the error information
print(f"An error occurred: {e}")
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=9000)

View File

@ -0,0 +1,63 @@
"""
title: Langgraph stream integration
author: bartonzzx
author_url: https://github.com/bartonzzx
git_url:
description: Integrate langgraph with open webui pipeline
required_open_webui_version: 0.4.3
requirements: none
version: 0.4.3
licence: MIT
"""
import os
import requests
from pydantic import BaseModel, Field
from typing import List, Union, Generator, Iterator
class Pipeline:
class Valves(BaseModel):
API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL")
def __init__(self):
self.id = "LangGraph stream"
self.name = "LangGraph stream"
# Initialize valve paramaters
self.valves = self.Valves(
**{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
)
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup: {__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is shutdown.
print(f"on_shutdown: {__name__}")
pass
def pipe(
self,
user_message: str,
model_id: str,
messages: List[dict],
body: dict
) -> Union[str, Generator, Iterator]:
data = {
"messages": [[msg['role'], msg['content']] for msg in messages],
}
headers = {
'accept': 'text/event-stream',
'Content-Type': 'application/json',
}
response = requests.post(self.valves.API_URL, json=data, headers=headers, stream=True)
response.raise_for_status()
return response.iter_lines()

View File

@ -0,0 +1,40 @@
annotated-types==0.7.0
anyio==4.8.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
distro==1.9.0
fastapi==0.115.11
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
idna==3.10
jiter==0.9.0
jsonpatch==1.33
jsonpointer==3.0.0
langchain-core==0.3.45
langchain-openai==0.3.8
langgraph==0.3.11
langgraph-checkpoint==2.0.20
langgraph-prebuilt==0.1.3
langgraph-sdk==0.1.57
langsmith==0.3.15
msgpack==1.1.0
openai==1.66.3
orjson==3.10.15
packaging==24.2
pydantic==2.10.6
pydantic_core==2.27.2
PyYAML==6.0.2
regex==2024.11.6
requests==2.32.3
requests-toolbelt==1.0.0
sniffio==1.3.1
starlette==0.46.1
tenacity==9.0.0
tiktoken==0.9.0
tqdm==4.67.1
typing_extensions==4.12.2
urllib3==2.3.0
uvicorn==0.34.0
zstandard==0.23.0