Merge branch 'main' into add-minimum-docker-build

Feng Wang 2024-10-23 19:32:17 +08:00
commit 34a14826f6
13 changed files with 788 additions and 459 deletions

View File

@@ -0,0 +1,120 @@
name: Build Docker Image
on:
workflow_call:
inputs:
build_args:
required: false
default: ""
type: string
cache_id:
required: true
type: string
extract_flavor:
required: false
default: ""
type: string
image_name:
required: true
type: string
image_tag:
required: false
default: ""
type: string
registry:
required: false
default: ghcr.io
type: string
env:
FULL_IMAGE_NAME: ${{ inputs.registry }}/${{ inputs.image_name }}
jobs:
build-image:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
strategy:
fail-fast: false
matrix:
platform:
- linux/amd64
- linux/arm64
steps:
- name: Prepare
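# Artifact names may not contain "/", so this step rewrites e.g. linux/amd64 to linux-amd64.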
run: |
platform=${{ matrix.platform }}
echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to the Container registry
uses: docker/login-action@v3
with:
registry: ${{ inputs.registry }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata for Docker images
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.FULL_IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=tag
type=sha,prefix=git-
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
${{ inputs.image_tag }}
flavor: |
latest=${{ github.ref == 'refs/heads/main' }}
${{ inputs.extract_flavor }}
- name: Extract metadata for Docker cache
id: cache-meta
uses: docker/metadata-action@v5
with:
images: ${{ env.FULL_IMAGE_NAME }}
tags: |
type=ref,event=branch
flavor: |
prefix=cache-${{ inputs.cache_id }}-${{ matrix.platform }}-
- name: Build Docker image
uses: docker/build-push-action@v5
id: build
with:
context: .
push: true
platforms: ${{ matrix.platform }}
labels: ${{ steps.meta.outputs.labels }}
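# Push by digest only (no tags yet); the merge workflow later assembles the per-platform digests into one tagged multi-arch manifest list.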
outputs: type=image,name=${{ env.FULL_IMAGE_NAME }},push-by-digest=true,name-canonical=true,push=true
cache-from: type=registry,ref=${{ steps.cache-meta.outputs.tags }}
cache-to: type=registry,ref=${{ steps.cache-meta.outputs.tags }},mode=max
build-args: |
BUILD_HASH=${{ github.sha }}
${{ inputs.build_args }}
- name: Export digest
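# Each digest is recorded as an empty file named after the digest, so the merge workflow can reassemble the manifest list.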
run: |
mkdir -p /tmp/digests
digest="${{ steps.build.outputs.digest }}"
touch "/tmp/digests/${digest#sha256:}"
- name: Upload digest
uses: actions/upload-artifact@v4
with:
name: digests-${{ inputs.cache_id }}-${{ env.PLATFORM_PAIR }}
path: /tmp/digests/*
if-no-files-found: error
retention-days: 1

View File

@@ -1,409 +1,60 @@
name: Create and publish Docker images with specific build args

on:
  workflow_dispatch:
  push:
    branches:
      - main
      - dev

jobs:
  build-main-image:
    uses: ./.github/workflows/build-docker-image.yaml
    with:
      image_name: ${{ github.repository }}
      cache_id: main

  build-cuda-image:
    uses: ./.github/workflows/build-docker-image.yaml
    with:
      image_name: ${{ github.repository }}
      cache_id: cuda
      image_tag: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=cuda
      extract_flavor: suffix=-cuda,onlatest=true
      build_args: |
        USE_CUDA=true

  build-minimum-image:
    uses: ./.github/workflows/build-docker-image.yaml
    with:
      image_name: ${{ github.repository }}
      cache_id: minimum
      image_tag: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=minimum
      extract_flavor: suffix=-minimum,onlatest=true
      build_args: |
        MINIMUM_BUILD=true

  merge-main-images:
    uses: ./.github/workflows/merge-docker-images.yaml
    needs: [build-main-image]
    with:
      image_name: ${{ github.repository }}
      cache_id: main

  merge-cuda-images:
    uses: ./.github/workflows/merge-docker-images.yaml
    needs: [build-cuda-image]
    with:
      image_name: ${{ github.repository }}
      cache_id: cuda
      extract_flavor: suffix=-cuda,onlatest=true
      extract_tags: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=cuda

  merge-minimum-images:
    uses: ./.github/workflows/merge-docker-images.yaml
    needs: [build-minimum-image]
    with:
      image_name: ${{ github.repository }}
      cache_id: minimum
      extract_flavor: suffix=-minimum,onlatest=true
      extract_tags: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=minimum

View File

@@ -0,0 +1,71 @@
name: Merge Docker Images
on:
workflow_call:
inputs:
cache_id:
required: true
type: string
extract_flavor:
required: false
default: ""
type: string
extract_tags:
required: false
default: ""
type: string
image_name:
required: true
type: string
registry:
required: false
default: ghcr.io
type: string
env:
FULL_IMAGE_NAME: ${{ inputs.registry }}/${{ inputs.image_name }}
jobs:
merge-images:
runs-on: ubuntu-latest
steps:
- name: Download digests
uses: actions/download-artifact@v4
with:
pattern: digests-${{ inputs.cache_id }}-*
path: /tmp/digests
merge-multiple: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to the Container registry
uses: docker/login-action@v3
with:
registry: ${{ inputs.registry }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata for Docker images
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.FULL_IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=tag
type=sha,prefix=git-
${{ inputs.extract_tags }}
flavor: |
latest=${{ github.ref == 'refs/heads/main' }}
${{ inputs.extract_flavor }}
- name: Create manifest list and push
working-directory: /tmp/digests
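# Expands the metadata JSON into repeated "-t <tag>" flags, then appends one image@sha256:<digest> reference per downloaded digest file.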
run: |
docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \
$(printf '${{ env.FULL_IMAGE_NAME }}@sha256:%s ' *)
- name: Inspect image
run: |
docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ steps.meta.outputs.version }}

View File

@@ -4,6 +4,10 @@
# Pipelines: UI-Agnostic OpenAI API Plugin Framework
> [!TIP]
> If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines. For those cases, Open WebUI Functions are a better fit: they're built in, much more convenient, and easier to configure. Pipelines come into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability.
Welcome to **Pipelines**, an [Open WebUI](https://github.com/open-webui) initiative. Pipelines bring modular, customizable workflows to any UI client supporting OpenAI API specs and much more! Easily extend functionalities, integrate unique logic, and create dynamic workflows with just a few lines of code.
## 🚀 Why Choose Pipelines?

View File

@@ -11,6 +11,16 @@ from utils.pipelines.main import (
get_tools_specs,
)
# System prompt for function calling
DEFAULT_SYSTEM_PROMPT = (
"""Tools: {}
If a function tool doesn't match the query, return an empty string. Else, pick a
function tool, fill in the parameters from the function tool's schema, and
return it in the format {{ "name": "functionName", "parameters": {{ "key":
"value" }} }}. Only pick a function if the user asks. Only return the object. Do not return any other text.
"""
)
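# Note: the template above is rendered with str.format() (see
# self.prompt.format(...) later in this file), so the doubled braces emit
# literal JSON braces while the bare {} receives the serialized tool specs.
# Illustrative call with a hypothetical spec list:
#
#     DEFAULT_SYSTEM_PROMPT.format('[{"name": "get_weather", "parameters": {...}}]')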
class Pipeline:
class Valves(BaseModel):
@@ -29,7 +39,7 @@ class Pipeline:
TASK_MODEL: str
TEMPLATE: str
def __init__(self):
def __init__(self, prompt: str | None = None) -> None:
# Pipeline filters are only compatible with Open WebUI
# You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API.
self.type = "filter"
@@ -40,6 +50,8 @@
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "function_calling_blueprint"
self.name = "Function Calling Blueprint"
self.prompt = prompt or DEFAULT_SYSTEM_PROMPT
self.tools: object = None
# Initialize valves
self.valves = self.Valves(
@@ -87,14 +99,45 @@ And answer according to the language of the user's question.""",
# Get the tools specs
tools_specs = get_tools_specs(self.tools)
# System prompt for function calling
fc_system_prompt = (
f"Tools: {json.dumps(tools_specs, indent=2)}"
+ """
If a function tool doesn't match the query, return an empty string. Else, pick a function tool, fill in the parameters from the function tool's schema, and return it in the format { "name": \"functionName\", "parameters": { "key": "value" } }. Only pick a function if the user asks. Only return the object. Do not return any other text."
"""
)
prompt = self.prompt.format(json.dumps(tools_specs, indent=2))
content = "History:\n" + "\n".join(
[
f"{message['role']}: {message['content']}"
for message in body["messages"][::-1][:4]
]
) + f"Query: {user_message}"
result = self.run_completion(prompt, content)
messages = self.call_function(result, body["messages"])
return {**body, "messages": messages}
# Call the function
def call_function(self, result, messages: list[dict]) -> list[dict]:
if "name" not in result:
return messages
function = getattr(self.tools, result["name"])
function_result = None
try:
function_result = function(**result["parameters"])
except Exception as e:
print(e)
# Add the function result to the system prompt
if function_result:
system_prompt = self.valves.TEMPLATE.replace(
"{{CONTEXT}}", function_result
)
messages = add_or_update_system_message(
system_prompt, messages
)
# Return the updated messages
return messages
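# Expected shape of `result` (hypothetical example):
#     {"name": "get_current_weather", "parameters": {"location": "Berlin"}}
# A result without a "name" key leaves the message list unchanged.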
def run_completion(self, system_prompt: str, content: str) -> dict:
r = None
try:
# Call the OpenAI API to get the function response
@@ -105,18 +148,11 @@ If a function tool doesn't match the query, return an empty string. Else, pick a
"messages": [
{
"role": "system",
"content": fc_system_prompt,
"content": system_prompt,
},
{
"role": "user",
"content": "History:\n"
+ "\n".join(
[
f"{message['role']}: {message['content']}"
for message in body["messages"][::-1][:4]
]
)
+ f"Query: {user_message}",
"content": content,
},
],
# TODO: dynamically add response_format?
@@ -137,29 +173,7 @@ If a function tool doesn't match the query, return an empty string. Else, pick a
if content != "":
result = json.loads(content)
print(result)
# Call the function
if "name" in result:
function = getattr(self.tools, result["name"])
function_result = None
try:
function_result = function(**result["parameters"])
except Exception as e:
print(e)
# Add the function result to the system prompt
if function_result:
system_prompt = self.valves.TEMPLATE.replace(
"{{CONTEXT}}", function_result
)
print(system_prompt)
messages = add_or_update_system_message(
system_prompt, body["messages"]
)
# Return the updated messages
return {**body, "messages": messages}
return result
except Exception as e:
print(f"Error: {e}")
@@ -170,4 +184,4 @@ If a function tool doesn't match the query, return an empty string. Else, pick a
except:
pass
return body
return {}

View File

@@ -0,0 +1,84 @@
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import requests, json, warnings
# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class Pipeline:
def __init__(self):
self.name = "Dify Agent Pipeline"
self.api_url = "http://dify.hostname/v1/workflows/run" # Set correct hostname
self.api_key = "app-dify-key" # Insert your actual API key here
self.api_request_stream = True # Dify supports streaming responses
self.verify_ssl = True
self.debug = False
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup: {__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is shutdown.
print(f"on_shutdown: {__name__}")
pass
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API.
print(f"inlet: {__name__}")
if self.debug:
print(f"inlet: {__name__} - body:")
pprint(body)
print(f"inlet: {__name__} - user:")
pprint(user)
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API.
print(f"outlet: {__name__}")
if self.debug:
print(f"outlet: {__name__} - body:")
pprint(body)
print(f"outlet: {__name__} - user:")
pprint(user)
return body
def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
print(f"pipe: {__name__}")
if self.debug:
print(f"pipe: {__name__} - received message from user: {user_message}")
# Set the response_mode Dify API parameter
if self.api_request_stream is True:
response_mode = "streaming"
else:
response_mode = "blocking"
# This function triggers the workflow using the specified API.
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
data = {
"inputs": {"prompt": user_message},
"response_mode": response_mode,
"user": body["user"]["email"]
}
response = requests.post(self.api_url, headers=headers, json=data, stream=self.api_request_stream, verify=self.verify_ssl)
if response.status_code == 200:
# Process and yield each chunk from the response
for line in response.iter_lines():
if line:
try:
# Remove 'data: ' prefix and parse JSON
json_data = json.loads(line.decode('utf-8').replace('data: ', ''))
# Extract and yield only the 'text' field from the nested 'data' object
if 'data' in json_data and 'text' in json_data['data']:
yield json_data['data']['text']
except json.JSONDecodeError:
print(f"Failed to parse JSON: {line}")
else:
yield f"Workflow request failed with status code: {response.status_code}"
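# Minimal smoke test; illustrative only. The API URL, key, and user email are
# placeholders and must match a real Dify deployment.
if __name__ == "__main__":
    pipeline = Pipeline()
    body = {"user": {"email": "user@example.com"}}
    for chunk in pipeline.pipe("Hello", "dify-agent", [], body):
        print(chunk, end="")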

View File

@@ -0,0 +1,79 @@
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import requests, json, warnings
# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class Pipeline:
def __init__(self):
self.name = "N8N Agent Pipeline"
self.api_url = "https://n8n.host/webhook/myflow" # Set correct hostname
self.api_key = "" # Insert your actual API key here
self.verify_ssl = True
self.debug = False
# Please note that N8N does not support streamed responses
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup: {__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is shutdown.
print(f"on_shutdown: {__name__}")
pass
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API.
print(f"inlet: {__name__}")
if self.debug:
print(f"inlet: {__name__} - body:")
pprint(body)
print(f"inlet: {__name__} - user:")
pprint(user)
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API.
print(f"outlet: {__name__}")
if self.debug:
print(f"outlet: {__name__} - body:")
pprint(body)
print(f"outlet: {__name__} - user:")
pprint(user)
return body
def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe: {__name__}")
if self.debug:
print(f"pipe: {__name__} - received message from user: {user_message}")
# This function triggers the workflow using the specified API.
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
data = {
"inputs": {"prompt": user_message},
"user": body["user"]["email"]
}
response = requests.post(self.api_url, headers=headers, json=data, verify=self.verify_ssl)
if response.status_code == 200:
# Process and yield each chunk from the response
try:
for line in response.iter_lines():
if line:
# Decode each line assuming UTF-8 encoding and directly parse it as JSON
json_data = json.loads(line.decode('utf-8'))
# Check if 'output' exists in json_data and yield it
if 'output' in json_data:
yield json_data['output']
except json.JSONDecodeError as e:
print(f"Failed to parse JSON from line. Error: {str(e)}")
yield "Error in JSON parsing."
else:
yield f"Workflow request failed with status code: {response.status_code}"
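# Illustrative response contract: each line returned by the webhook is expected
# to be a JSON object carrying an "output" field, e.g. (hypothetical payload):
#     {"output": "The answer is 42."}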

View File

@@ -46,7 +46,7 @@ class Pipeline:
{"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
{"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
{"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
{"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"},
{"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
]
async def on_startup(self):

View File

@@ -0,0 +1,215 @@
"""
title: Jais Azure Pipeline with Stream Handling Fix
author: Abdessalaam Al-Alestini
date: 2024-06-20
version: 1.3
license: MIT
description: A pipeline for generating text using the Jais model via Azure AI Inference API, with fixed stream handling.
About Jais: https://inceptionai.ai/jais/
requirements: azure-ai-inference
environment_variables: AZURE_INFERENCE_CREDENTIAL, AZURE_INFERENCE_ENDPOINT, MODEL_ID
"""
import os
import json
import logging
from typing import List, Union, Generator, Iterator, Tuple
from pydantic import BaseModel
from azure.ai.inference import ChatCompletionsClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.inference.models import SystemMessage, UserMessage, AssistantMessage
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def pop_system_message(messages: List[dict]) -> Tuple[str, List[dict]]:
"""
Extract the system message from the list of messages.
Args:
messages (List[dict]): List of message dictionaries.
Returns:
Tuple[str, List[dict]]: A tuple containing the system message (or empty string) and the updated list of messages.
"""
system_message = ""
updated_messages = []
for message in messages:
if message['role'] == 'system':
system_message = message['content']
else:
updated_messages.append(message)
return system_message, updated_messages
class Pipeline:
class Valves(BaseModel):
AZURE_INFERENCE_CREDENTIAL: str = ""
AZURE_INFERENCE_ENDPOINT: str = ""
MODEL_ID: str = "jais-30b-chat"
def __init__(self):
self.type = "manifold"
self.id = "jais-azure"
self.name = "jais-azure/"
self.valves = self.Valves(
**{
"AZURE_INFERENCE_CREDENTIAL":
os.getenv("AZURE_INFERENCE_CREDENTIAL",
"your-azure-inference-key-here"),
"AZURE_INFERENCE_ENDPOINT":
os.getenv("AZURE_INFERENCE_ENDPOINT",
"your-azure-inference-endpoint-here"),
"MODEL_ID":
os.getenv("MODEL_ID", "jais-30b-chat"),
})
self.update_client()
def update_client(self):
self.client = ChatCompletionsClient(
endpoint=self.valves.AZURE_INFERENCE_ENDPOINT,
credential=AzureKeyCredential(
self.valves.AZURE_INFERENCE_CREDENTIAL))
def get_jais_models(self):
return [
{
"id": "jais-30b-chat",
"name": "Jais 30B Chat"
},
]
async def on_startup(self):
logger.info(f"on_startup:{__name__}")
pass
async def on_shutdown(self):
logger.info(f"on_shutdown:{__name__}")
pass
async def on_valves_updated(self):
self.update_client()
def pipelines(self) -> List[dict]:
return self.get_jais_models()
def pipe(self, user_message: str, model_id: str, messages: List[dict],
body: dict) -> Union[str, Generator, Iterator]:
try:
logger.debug(
f"Received request - user_message: {user_message}, model_id: {model_id}"
)
logger.debug(f"Messages: {json.dumps(messages, indent=2)}")
logger.debug(f"Body: {json.dumps(body, indent=2)}")
# Remove unnecessary keys
for key in ['user', 'chat_id', 'title']:
body.pop(key, None)
system_message, messages = pop_system_message(messages)
# Prepare messages for Jais
jais_messages = [SystemMessage(
content=system_message)] if system_message else []
jais_messages += [
UserMessage(content=msg['content']) if msg['role'] == 'user'
else SystemMessage(content=msg['content']) if msg['role']
== 'system' else AssistantMessage(content=msg['content'])
for msg in messages
]
# Prepare the payload
allowed_params = {
'temperature', 'max_tokens', 'presence_penalty',
'frequency_penalty', 'top_p'
}
filtered_body = {
k: v
for k, v in body.items() if k in allowed_params
}
logger.debug(f"Prepared Jais messages: {jais_messages}")
logger.debug(f"Filtered body: {filtered_body}")
is_stream = body.get("stream", False)
if is_stream:
return self.stream_response(jais_messages, filtered_body)
else:
return self.get_completion(jais_messages, filtered_body)
except Exception as e:
logger.error(f"Error in pipe: {str(e)}", exc_info=True)
return json.dumps({"error": str(e)})
def stream_response(self, jais_messages: List[Union[SystemMessage, UserMessage, AssistantMessage]], params: dict) -> str:
try:
complete_response = ""
response = self.client.complete(messages=jais_messages,
model=self.valves.MODEL_ID,
stream=True,
**params)
for update in response:
if update.choices:
delta_content = update.choices[0].delta.content
if delta_content:
complete_response += delta_content
return complete_response
except Exception as e:
logger.error(f"Error in stream_response: {str(e)}", exc_info=True)
return json.dumps({"error": str(e)})
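# Note: although the request is made with stream=True, stream_response
# accumulates the deltas and returns one complete string; this is the
# "stream handling fix" referenced in the frontmatter.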
def get_completion(self, jais_messages: List[Union[SystemMessage, UserMessage, AssistantMessage]], params: dict) -> str:
try:
response = self.client.complete(messages=jais_messages,
model=self.valves.MODEL_ID,
**params)
if response.choices:
result = response.choices[0].message.content
logger.debug(f"Completion result: {result}")
return result
else:
logger.warning("No choices in completion response")
return ""
except Exception as e:
logger.error(f"Error in get_completion: {str(e)}", exc_info=True)
return json.dumps({"error": str(e)})
# TEST CASE TO RUN THE PIPELINE
if __name__ == "__main__":
pipeline = Pipeline()
messages = [{
"role": "user",
"content": "How many languages are in the world?"
}]
body = {
"temperature": 0.5,
"max_tokens": 150,
"presence_penalty": 0.1,
"frequency_penalty": 0.8,
"stream": True # Change to True to test streaming
}
result = pipeline.pipe(user_message="How many languages are in the world?",
model_id="jais-30b-chat",
messages=messages,
body=body)
# Handle streaming result
if isinstance(result, str):
content = json.dumps({"content": result}, ensure_ascii=False)
print(content)
else:
complete_response = ""
for part in result:
content_delta = json.loads(part).get("delta")
if content_delta:
complete_response += content_delta
print(json.dumps({"content": complete_response}, ensure_ascii=False))

View File

@@ -0,0 +1,99 @@
from typing import List, Union, Generator, Iterator
from pydantic import BaseModel
import requests
import os
class Pipeline:
class Valves(BaseModel):
# You can add your custom valves here.
AZURE_OPENAI_API_KEY: str
AZURE_OPENAI_ENDPOINT: str
AZURE_OPENAI_API_VERSION: str
AZURE_OPENAI_MODELS: str
AZURE_OPENAI_MODEL_NAMES: str
def __init__(self):
self.type = "manifold"
self.name = "Azure OpenAI: "
self.valves = self.Valves(
**{
"AZURE_OPENAI_API_KEY": os.getenv("AZURE_OPENAI_API_KEY", "your-azure-openai-api-key-here"),
"AZURE_OPENAI_ENDPOINT": os.getenv("AZURE_OPENAI_ENDPOINT", "your-azure-openai-endpoint-here"),
"AZURE_OPENAI_API_VERSION": os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01"),
"AZURE_OPENAI_MODELS": os.getenv("AZURE_OPENAI_MODELS", "gpt-35-turbo;gpt-4o"),
"AZURE_OPENAI_MODEL_NAMES": os.getenv("AZURE_OPENAI_MODEL_NAMES", "GPT-35 Turbo;GPT-4o"),
}
)
self.set_pipelines()
pass
def set_pipelines(self):
models = self.valves.AZURE_OPENAI_MODELS.split(";")
model_names = self.valves.AZURE_OPENAI_MODEL_NAMES.split(";")
self.pipelines = [
{"id": model, "name": name} for model, name in zip(models, model_names)
]
print(f"azure_openai_manifold_pipeline - models: {self.pipelines}")
pass
async def on_valves_updated(self):
self.set_pipelines()
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
pass
def pipe(
self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")
print(messages)
print(user_message)
headers = {
"api-key": self.valves.AZURE_OPENAI_API_KEY,
"Content-Type": "application/json",
}
url = f"{self.valves.AZURE_OPENAI_ENDPOINT}/openai/deployments/{model_id}/chat/completions?api-version={self.valves.AZURE_OPENAI_API_VERSION}"
allowed_params = {'messages', 'temperature', 'role', 'content', 'contentPart', 'contentPartImage',
'enhancements', 'dataSources', 'n', 'stream', 'stop', 'max_tokens', 'presence_penalty',
'frequency_penalty', 'logit_bias', 'user', 'function_call', 'functions', 'tools',
'tool_choice', 'top_p', 'log_probs', 'top_logprobs', 'response_format', 'seed'}
# remap user field
if "user" in body and not isinstance(body["user"], str):
body["user"] = body["user"]["id"] if "id" in body["user"] else str(body["user"])
filtered_body = {k: v for k, v in body.items() if k in allowed_params}
# log fields that were filtered out as a single line
if len(body) != len(filtered_body):
print(f"Dropped params: {', '.join(set(body.keys()) - set(filtered_body.keys()))}")
r = None
try:
r = requests.post(
url=url,
json=filtered_body,
headers=headers,
stream=True,
)
r.raise_for_status()
if body["stream"]:
return r.iter_lines()
else:
return r.json()
except Exception as e:
if r is not None:
text = r.text
return f"Error: {e} ({text})"
else:
return f"Error: {e}"
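# Minimal smoke test; illustrative only. Assumes the AZURE_OPENAI_* environment
# variables point at a real deployment exposing the given model id.
if __name__ == "__main__":
    pipeline = Pipeline()
    result = pipeline.pipe(
        user_message="Hello",
        model_id="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        body={"messages": [{"role": "user", "content": "Hello"}], "stream": False},
    )
    print(result)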

View File

@@ -1,23 +1,13 @@
from typing import List, Union, Optional
from pydantic import BaseModel, RootModel, ConfigDict
class ImageContent(BaseModel):
type: str
image_url: dict
class TextContent(BaseModel):
type: str
text: str
class MessageContent(RootModel):
root: Union[TextContent, ImageContent]
from typing import List, Optional
from pydantic import BaseModel, ConfigDict
class OpenAIChatMessage(BaseModel):
role: str
content: Union[str, List[MessageContent]]
content: str | List
model_config = ConfigDict(extra="allow")
class OpenAIChatCompletionForm(BaseModel):
stream: bool = True
model: str
@@ -25,6 +15,7 @@ class OpenAIChatCompletionForm(BaseModel):
model_config = ConfigDict(extra="allow")
class FilterForm(BaseModel):
body: dict
user: Optional[dict] = None
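# Both message shapes now validate under the relaxed content type (illustrative):
#     OpenAIChatMessage(role="user", content="plain text")
#     OpenAIChatMessage(role="user", content=[{"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}}])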

View File

@@ -8,13 +8,13 @@ PIPELINES_DIR=${PIPELINES_DIR:-./pipelines}
reset_pipelines_dir() {
if [ "$RESET_PIPELINES_DIR" = true ]; then
echo "Resetting pipelines directory: $PIPELINES_DIR"
# Check if the directory exists
if [ -d "$PIPELINES_DIR" ]; then
# Remove all contents of the directory
rm -rf "${PIPELINES_DIR:?}"/*
echo "All contents in $PIPELINES_DIR have been removed."
# Optionally recreate the directory if needed
mkdir -p "$PIPELINES_DIR"
echo "$PIPELINES_DIR has been recreated."
@@ -87,14 +87,14 @@ install_frontmatter_requirements() {
local file_content=$(cat "$1")
# Extract the first triple-quoted block
local first_block=$(echo "$file_content" | awk '/"""/{flag=!flag; if(flag) count++; if(count == 2) {exit}} flag' )
# Check if the block contains requirements
local requirements=$(echo "$first_block" | grep -i 'requirements:')
if [ -n "$requirements" ]; then
# Extract the requirements list
requirements=$(echo "$requirements" | awk -F': ' '{print $2}' | tr ',' ' ' | tr -d '\r')
# Construct and echo the pip install command
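# e.g. frontmatter "requirements: requests, pydantic" becomes "pip install requests pydantic"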
local pip_command="pip install $requirements"
echo "$pip_command"
@@ -108,13 +108,14 @@ install_frontmatter_requirements() {
# Check if PIPELINES_URLS environment variable is set and non-empty
if [[ -n "$PIPELINES_URLS" ]]; then
pipelines_dir="./pipelines"
mkdir -p "$pipelines_dir"
if [ ! -d "$PIPELINES_DIR" ]; then
mkdir -p "$PIPELINES_DIR"
fi
# Split PIPELINES_URLS by ';' and iterate over each path
IFS=';' read -ra ADDR <<< "$PIPELINES_URLS"
for path in "${ADDR[@]}"; do
download_pipelines "$path" "$pipelines_dir"
download_pipelines "$path" "$PIPELINES_DIR"
done
for file in "$pipelines_dir"/*; do

View File

@@ -62,7 +62,7 @@ def pop_system_message(messages: List[dict]) -> Tuple[dict, List[dict]]:
return get_system_message(messages), remove_system_message(messages)
def add_or_update_system_message(content: str, messages: List[dict]):
def add_or_update_system_message(content: str, messages: List[dict]) -> List[dict]:
"""
Adds a new system message at the beginning of the messages list
or updates the existing system message at the beginning.