From 48ddbec455de76fc43224daf3438537cd8fcde87 Mon Sep 17 00:00:00 2001 From: bartonzzx Date: Sat, 15 Mar 2025 23:02:38 +0800 Subject: [PATCH] Add langgraph integration example, also support thinking. --- .../integrations/langgraph_pipeline/README.md | 28 +++ .../langgraph_pipeline/langgraph_example.py | 166 ++++++++++++++++++ .../langgraph_stream_pipeline.py | 63 +++++++ .../langgraph_pipeline/requirements.txt | 40 +++++ 4 files changed, 297 insertions(+) create mode 100644 examples/pipelines/integrations/langgraph_pipeline/README.md create mode 100644 examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py create mode 100644 examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py create mode 100644 examples/pipelines/integrations/langgraph_pipeline/requirements.txt diff --git a/examples/pipelines/integrations/langgraph_pipeline/README.md b/examples/pipelines/integrations/langgraph_pipeline/README.md new file mode 100644 index 0000000..8d2cca6 --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/README.md @@ -0,0 +1,28 @@ +# Example of langgraph integration +## Python version: 3.11 +## Feature +1. Using langgraph stream writer and custom mode of stream to integrate langgraph with open webui pipeline. +2. Support \ block display. +## Prerequirement +Install the open webui pipeline. +You can follow the docs : https://docs.openwebui.com/pipelines/#-quick-start-with-docker + +## Usage +### 1. Upload pipeline file +Upload `langgraph_stream_pipeline.py` to the open webui pipeline. + +### 2. Enable the uploaded pipeline +Properly set up your langgraph api url. + +And choose **"LangGraph stream"** as your model. + +### 2. Install dependencies +Under the folder `pipelines/examples/pipelines/integrations/langgraph_pipeline`, run command below : +``` +pip install -r requirements.txt +``` +### 3. Start langgraph api server +Run command below : +``` +uvicorn langgraph_example:app --reload +``` \ No newline at end of file diff --git a/examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py b/examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py new file mode 100644 index 0000000..6ae57a2 --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py @@ -0,0 +1,166 @@ +""" +title: Langgraph stream integration +author: bartonzzx +author_url: https://github.com/bartonzzx +git_url: +description: Integrate langgraph with open webui pipeline +required_open_webui_version: 0.4.3 +requirements: none +version: 0.4.3 +licence: MIT +""" + + +import os +import json +import getpass +from typing import Annotated, Literal +from typing_extensions import TypedDict + +from fastapi import FastAPI +from fastapi.responses import StreamingResponse + +from langgraph.graph import StateGraph, START, END +from langgraph.graph.message import add_messages +from langchain_openai import ChatOpenAI +from langgraph.config import get_stream_writer + + +''' +Define LLM API key +''' +def _set_env(var: str): + if not os.environ.get(var): + os.environ[var] = getpass.getpass(f"{var}: ") + + +_set_env("OPENAI_API_KEY") + + +''' +Define Langgraph +''' +def generate_custom_stream(type: Literal["think","normal"], content: str): + content = "\n"+content+"\n" + custom_stream_writer = get_stream_writer() + return custom_stream_writer({type:content}) + +class State(TypedDict): + messages: Annotated[list, add_messages] + +llm = ChatOpenAI(model="gpt-3.5-turbo") + +def chatbot(state: State): + think_response = llm.invoke(["Please reasoning:"] + state["messages"]) + normal_response = llm.invoke(state["messages"]) + generate_custom_stream("think", think_response.content) + generate_custom_stream("normal", normal_response.content) + return {"messages": [normal_response]} + +# Define graph +graph_builder = StateGraph(State) + +# Define nodes +graph_builder.add_node("chatbot", chatbot) +graph_builder.add_edge("chatbot", END) + +# Define edges +graph_builder.add_edge(START, "chatbot") + +# Compile graph +graph = graph_builder.compile() + + +''' +Define api processing +''' +app = FastAPI( + title="Langgraph API", + description="Langgraph API", + ) + +@app.get("/test") +async def test(): + return {"message": "Hello World"} + + +@app.post("/stream") +async def stream(inputs: State): + async def event_stream(): + try: + stream_start_msg = { + 'choices': + [ + { + 'delta': {}, + 'finish_reason': None + } + ] + } + + # Stream start + yield f"data: {json.dumps(stream_start_msg)}\n\n" + + # Processing langgraph stream response with block support + async for event in graph.astream(input=inputs, stream_mode="custom"): + print(event) + think_content = event.get("think", None) + normal_content = event.get("normal", None) + + think_msg = { + 'choices': + [ + { + 'delta': + { + 'reasoning_content': think_content, + }, + 'finish_reason': None + } + ] + } + + normal_msg = { + 'choices': + [ + { + 'delta': + { + 'content': normal_content, + }, + 'finish_reason': None + } + ] + } + + yield f"data: {json.dumps(think_msg)}\n\n" + yield f"data: {json.dumps(normal_msg)}\n\n" + + # End of the stream + stream_end_msg = { + 'choices': [ + { + 'delta': {}, + 'finish_reason': 'stop' + } + ] + } + yield f"data: {json.dumps(stream_end_msg)}\n\n" + + except Exception as e: + # Simply print the error information + print(f"An error occurred: {e}") + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + } + ) + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=9000) \ No newline at end of file diff --git a/examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py b/examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py new file mode 100644 index 0000000..65da0df --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py @@ -0,0 +1,63 @@ +""" +title: Langgraph stream integration +author: bartonzzx +author_url: https://github.com/bartonzzx +git_url: +description: Integrate langgraph with open webui pipeline +required_open_webui_version: 0.4.3 +requirements: none +version: 0.4.3 +licence: MIT +""" + + +import os +import requests +from pydantic import BaseModel, Field +from typing import List, Union, Generator, Iterator + + +class Pipeline: + class Valves(BaseModel): + API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL") + + def __init__(self): + self.id = "LangGraph stream" + self.name = "LangGraph stream" + # Initialize valve paramaters + self.valves = self.Valves( + **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()} + ) + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. + print(f"on_shutdown: {__name__}") + pass + + def pipe( + self, + user_message: str, + model_id: str, + messages: List[dict], + body: dict + ) -> Union[str, Generator, Iterator]: + + data = { + "messages": [[msg['role'], msg['content']] for msg in messages], + } + + headers = { + 'accept': 'text/event-stream', + 'Content-Type': 'application/json', + } + + response = requests.post(self.valves.API_URL, json=data, headers=headers, stream=True) + + response.raise_for_status() + + return response.iter_lines() \ No newline at end of file diff --git a/examples/pipelines/integrations/langgraph_pipeline/requirements.txt b/examples/pipelines/integrations/langgraph_pipeline/requirements.txt new file mode 100644 index 0000000..fc122d6 --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/requirements.txt @@ -0,0 +1,40 @@ +annotated-types==0.7.0 +anyio==4.8.0 +certifi==2025.1.31 +charset-normalizer==3.4.1 +click==8.1.8 +distro==1.9.0 +fastapi==0.115.11 +h11==0.14.0 +httpcore==1.0.7 +httpx==0.28.1 +idna==3.10 +jiter==0.9.0 +jsonpatch==1.33 +jsonpointer==3.0.0 +langchain-core==0.3.45 +langchain-openai==0.3.8 +langgraph==0.3.11 +langgraph-checkpoint==2.0.20 +langgraph-prebuilt==0.1.3 +langgraph-sdk==0.1.57 +langsmith==0.3.15 +msgpack==1.1.0 +openai==1.66.3 +orjson==3.10.15 +packaging==24.2 +pydantic==2.10.6 +pydantic_core==2.27.2 +PyYAML==6.0.2 +regex==2024.11.6 +requests==2.32.3 +requests-toolbelt==1.0.0 +sniffio==1.3.1 +starlette==0.46.1 +tenacity==9.0.0 +tiktoken==0.9.0 +tqdm==4.67.1 +typing_extensions==4.12.2 +urllib3==2.3.0 +uvicorn==0.34.0 +zstandard==0.23.0