Merge branch 'main' into prompt_injection_filter

This commit is contained in:
Jannik Streidl 2024-06-05 20:54:22 +02:00
commit 12c67205d2
40 changed files with 991 additions and 569 deletions

View File

@@ -8,29 +8,72 @@ Welcome to **Pipelines**, an [Open WebUI](https://github.com/open-webui) initiat
## 🚀 Why Choose Pipelines?
- **Seamless Integration:** Compatible with any UI/client supporting OpenAI API specs.
- **Limitless Possibilities:** Easily add custom logic and integrate Python libraries, from AI agents to home automation APIs.
- **Seamless Integration:** Compatible with any UI/client supporting OpenAI API specs. (Only pipe-type pipelines are supported; filter types require clients with Pipelines support.)
- **Custom Hooks:** Build and integrate custom pipelines.
### Examples of What You Can Achieve:
- [**Function Calling Pipeline**](/examples/filters/function_calling_filter_pipeline.py): Easily handle function calls and enhance your applications with custom logic.
- [**Custom RAG Pipeline**](/examples/pipelines/rag/llamaindex_pipeline.py): Implement sophisticated Retrieval-Augmented Generation pipelines tailored to your needs.
- [**Message Monitoring Using Langfuse**](/examples/filters/langfuse_filter_pipeline.py): Monitor and analyze message interactions in real-time using Langfuse.
- [**Rate Limit Filter**](/examples/filters/rate_limit_filter_pipeline.py): Control the flow of requests to prevent exceeding rate limits.
- [**Real-Time Translation Filter with LibreTranslate**](/examples/filters/libretranslate_filter_pipeline.py): Seamlessly integrate real-time translations into your LLM interactions.
- [**Toxic Message Filter**](/examples/filters/detoxify_filter_pipeline.py): Implement filters to detect and handle toxic messages effectively.
- **And Much More!**: The sky is the limit for what you can accomplish with Pipelines and Python. [Check out our scaffolds](/examples/scaffolds) to get a head start on your projects and see how you can streamline your development process!
## 🔧 How It Works
<p align="center">
<a href="#"><img src="./docs/images/workflow.png" alt="Pipelines Workflow"></a>
<a href="./docs/images/workflow.png"><img src="./docs/images/workflow.png" alt="Pipelines Workflow"></a>
</p>
Integrating Pipelines with any OpenAI API-compatible UI client is simple. Launch your Pipelines instance and set the OpenAI URL on your client to the Pipelines URL. That's it! You're ready to leverage any Python library for your needs.
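For example, with the official `openai` Python package the switch is a one-line change of base URL and key. A minimal sketch; the port and API key below are the quick-start defaults documented further down, and the pipeline id is a placeholder:

```python
from openai import OpenAI

# Point the client at the Pipelines server instead of api.openai.com.
client = OpenAI(base_url="http://localhost:9099", api_key="0p3n-w3bu!")

# The model id must match an installed pipeline's id ("my_pipeline" is hypothetical).
response = client.chat.completions.create(
    model="my_pipeline",
    messages=[{"role": "user", "content": "Hello from a Pipelines client!"}],
)
print(response.choices[0].message.content)
```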
## 📂 Directory Structure and Examples
## ⚡ Quick Start with Docker
The `/pipelines` directory is the core of your setup. Add new modules, customize existing ones, and manage your workflows here. All the pipelines in the `/pipelines` directory will be **automatically loaded** when the server launches.
> [!WARNING]
> Pipelines are a plugin system with arbitrary code execution — **don't fetch random pipelines from sources you don't trust**.
### Integration Examples
For a streamlined setup using Docker:
Find various integration examples in the `/pipelines/examples` directory. These examples show how to integrate different functionalities, providing a foundation for building your own custom pipelines.
1. **Run the Pipelines container:**
```sh
docker run -d -p 9099:9099 --add-host=host.docker.internal:host-gateway -v pipelines:/app/pipelines --name pipelines --restart always ghcr.io/open-webui/pipelines:main
```
2. **Connect to Open WebUI:**
- Navigate to the **Settings > Connections > OpenAI API** section in Open WebUI.
- Set the API URL to `http://localhost:9099` and the API key to `0p3n-w3bu!`. Your pipelines should now be active.
> [!NOTE]
> If your Open WebUI is running in a Docker container, replace `localhost` with `host.docker.internal` in the API URL.
3. **Manage Configurations:**
- In the admin panel, go to **Admin Settings > Pipelines tab**.
- Select your desired pipeline and modify the valve values directly from the WebUI.
> [!TIP]
> If you are unable to connect, it is most likely a Docker networking issue. We encourage you to troubleshoot on your own and share your methods and solutions in the discussions forum.
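One quick check before digging into networking: list the models the server exposes via the OpenAI-spec endpoint (a sketch assuming the default port and API key):

```sh
curl -H "Authorization: Bearer 0p3n-w3bu!" http://localhost:9099/v1/models
```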
If you need to install a custom pipeline with additional dependencies:
- **Run the following command:**
```sh
docker run -d -p 9099:9099 --add-host=host.docker.internal:host-gateway -e PIPELINES_URLS="https://github.com/open-webui/pipelines/blob/main/examples/filters/detoxify_filter_pipeline.py" -v pipelines:/app/pipelines --name pipelines --restart always ghcr.io/open-webui/pipelines:main
```
Alternatively, you can directly install pipelines from the admin settings by copying and pasting the pipeline URL, provided it doesn't have additional dependencies.
That's it! You're now ready to build customizable AI integrations effortlessly with Pipelines. Enjoy!
## 📦 Installation and Setup
Get started with Pipelines in a few steps:
Get started with Pipelines in a few easy steps:
1. **Ensure Python 3.11 is installed.**
2. **Clone the Pipelines repository:**
@@ -54,20 +97,15 @@ Get started with Pipelines in a few steps:
Once the server is running, set the OpenAI URL on your client to the Pipelines URL. This unlocks the full capabilities of Pipelines, integrating any Python library and creating custom workflows tailored to your needs.
## ⚡ Quick Start Example
## 📂 Directory Structure and Examples
1. **Copy and paste `rate_limit_filter_pipeline.py` from `/pipelines/examples` to the `/pipelines` folder.**
2. **Run the Pipelines server:** It will be hosted at `http://localhost:9099` by default.
3. **Configure Open WebUI:**
The `/pipelines` directory is the core of your setup. Add new modules, customize existing ones, and manage your workflows here. All the pipelines in the `/pipelines` directory will be **automatically loaded** when the server launches.
- In Open WebUI, go to **Settings > Connections > OpenAI API**.
- Set the API URL to `http://localhost:9099` and set the API key (default: `0p3n-w3bu!`). Your filter should now be active.
You can change this directory from `/pipelines` to another location using the `PIPELINES_DIR` env variable.
4. **Adjust Settings:**
- In the admin panel, go to **Admin Settings > Pipelines tab**.
- Select the filter and change the valve values directly from the WebUI.
### Integration Examples
That's it! You're now set up with Pipelines. Enjoy building customizable AI integrations effortlessly!
Find various integration examples in the `/examples` directory. These examples show how to integrate different functionalities, providing a foundation for building your own custom pipelines.
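The `PIPELINES_DIR` variable mentioned above can be passed like any other environment variable; a sketch using the quick-start Docker image (the in-container path here is only an example):

```sh
docker run -d -p 9099:9099 \
  -e PIPELINES_DIR=/app/custom_pipelines \
  -v pipelines:/app/custom_pipelines \
  --name pipelines --restart always ghcr.io/open-webui/pipelines:main
```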
## 🎉 Work in Progress

View File

@@ -5,51 +5,51 @@ import os
import requests
import json
from utils.main import (
from utils.pipelines.main import (
get_last_user_message,
add_or_update_system_message,
get_function_specs,
get_tools_specs,
)
from typing import Literal
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves for function calling
OPENAI_API_BASE_URL: str
OPENAI_API_KEY: str
TASK_MODEL: str
TEMPLATE: str
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Assign a unique identifier to the pipeline.
# Optionally, you can set the id and name of the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "function_calling_filter_pipeline"
self.name = "Function Calling Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves for function calling
OPENAI_API_BASE_URL: str
OPENAI_API_KEY: str
TASK_MODEL: str
TEMPLATE: str
OPENWEATHERMAP_API_KEY: str = ""
# self.id = "function_calling_blueprint"
self.name = "Function Calling Blueprint"
# Initialize valves
self.valves = Valves(
self.valves = self.Valves(
**{
"pipelines": ["*"], # Connect to all pipelines
"OPENAI_API_BASE_URL": "https://api.openai.com/v1",
"OPENAI_API_BASE_URL": os.getenv(
"OPENAI_API_BASE_URL", "https://api.openai.com/v1"
),
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY"),
"TASK_MODEL": "gpt-3.5-turbo",
"TASK_MODEL": os.getenv("TASK_MODEL", "gpt-3.5-turbo"),
"TEMPLATE": """Use the following context as your learned knowledge, inside <context></context> XML tags.
<context>
{{CONTEXT}}
@@ -63,64 +63,6 @@ And answer according to the language of the user's question.""",
}
)
class Functions:
def __init__(self, pipeline) -> None:
self.pipeline = pipeline
def get_current_weather(
self,
location: str,
unit: Literal["metric", "fahrenheit"] = "fahrenheit",
) -> str:
"""
Get the current weather for a location. If the location is not found, return an empty string.
:param location: The location to get the weather for.
:param unit: The unit to get the weather in. Default is fahrenheit.
:return: The current weather for the location.
"""
# https://openweathermap.org/api
if self.pipeline.valves.OPENWEATHERMAP_API_KEY == "":
return "OpenWeatherMap API Key not set, ask the user to set it up."
else:
units = "imperial" if unit == "fahrenheit" else "metric"
params = {
"q": location,
"appid": self.pipeline.valves.OPENWEATHERMAP_API_KEY,
"units": units,
}
response = requests.get(
"http://api.openweathermap.org/data/2.5/weather", params=params
)
response.raise_for_status() # Raises an HTTPError for bad responses
data = response.json()
weather_description = data["weather"][0]["description"]
temperature = data["main"]["temp"]
return f"{location}: {weather_description.capitalize()}, {temperature}°{unit.capitalize()[0]}"
def calculator(self, equation: str) -> str:
"""
Calculate the result of an equation.
:param equation: The equation to calculate.
"""
# Avoid using eval in production code
# https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html
try:
result = eval(equation)
return f"{equation} = {result}"
except Exception as e:
print(e)
return "Invalid equation"
self.functions = Functions(self)
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
@@ -142,14 +84,14 @@ And answer according to the language of the user's question.""",
# Get the last user message
user_message = get_last_user_message(body["messages"])
# Get the function specs
function_specs = get_function_specs(self.functions)
# Get the tools specs
tools_specs = get_tools_specs(self.tools)
# System prompt for function calling
fc_system_prompt = (
f"Functions: {json.dumps(function_specs, indent=2)}"
f"Tools: {json.dumps(tools_specs, indent=2)}"
+ """
If a function doesn't match the query, return an empty string. Else, pick a function, fill in the parameters from the function's schema, and return it in the format { "name": \"functionName\", "parameters": { "key": "value" } }. Only pick a function if the user asks. Only return the object. Do not return any other text."
If a function tool doesn't match the query, return an empty string. Else, pick a function tool, fill in the parameters from the function tool's schema, and return it in the format { "name": \"functionName\", "parameters": { "key": "value" } }. Only pick a function if the user asks. Only return the object. Do not return any other text."
"""
)
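For instance, paired with the weather tool shown elsewhere in this diff, a matching query would be expected to yield an object of exactly that shape (the values here are hypothetical):

```python
# Expected reply from the task model for e.g. "What's the weather in Berlin, in Celsius?"
result = {"name": "get_current_weather", "parameters": {"location": "Berlin", "unit": "metric"}}
```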
@@ -198,7 +140,7 @@ If a function doesn't match the query, return an empty string. Else, pick a func
# Call the function
if "name" in result:
function = getattr(self.functions, result["name"])
function = getattr(self.tools, result["name"])
function_result = None
try:
function_result = function(**result["parameters"])
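Taken together, the comments above imply the smallest useful shape of a filter pipeline. A sketch following the conventions in this diff, not a file from the repository:

```python
from typing import List, Optional
from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        # Target pipeline ids (models) this filter attaches to; ["*"] means all.
        pipelines: List[str] = ["*"]
        # Lower numbers run earlier when several filters are chained.
        priority: int = 0

    def __init__(self):
        self.type = "filter"  # filters act as middleware around requests
        self.name = "Example Filter"
        self.valves = self.Valves()

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # Called before the request reaches the model; edit body here.
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # Called after the model responds; inspect or rewrite the reply here.
        return body
```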

View File

@@ -1,171 +0,0 @@
October 2023
One of the most important things I didn't understand about the world when I was a child is the degree to which the returns for performance are superlinear.
Teachers and coaches implicitly told us the returns were linear. "You get out," I heard a thousand times, "what you put in." They meant well, but this is rarely true. If your product is only half as good as your competitor's, you don't get half as many customers. You get no customers, and you go out of business.
It's obviously true that the returns for performance are superlinear in business. Some think this is a flaw of capitalism, and that if we changed the rules it would stop being true. But superlinear returns for performance are a feature of the world, not an artifact of rules we've invented. We see the same pattern in fame, power, military victories, knowledge, and even benefit to humanity. In all of these, the rich get richer. [1]
You can't understand the world without understanding the concept of superlinear returns. And if you're ambitious you definitely should, because this will be the wave you surf on.
It may seem as if there are a lot of different situations with superlinear returns, but as far as I can tell they reduce to two fundamental causes: exponential growth and thresholds.
The most obvious case of superlinear returns is when you're working on something that grows exponentially. For example, growing bacterial cultures. When they grow at all, they grow exponentially. But they're tricky to grow. Which means the difference in outcome between someone who's adept at it and someone who's not is very great.
Startups can also grow exponentially, and we see the same pattern there. Some manage to achieve high growth rates. Most don't. And as a result you get qualitatively different outcomes: the companies with high growth rates tend to become immensely valuable, while the ones with lower growth rates may not even survive.
Y Combinator encourages founders to focus on growth rate rather than absolute numbers. It prevents them from being discouraged early on, when the absolute numbers are still low. It also helps them decide what to focus on: you can use growth rate as a compass to tell you how to evolve the company. But the main advantage is that by focusing on growth rate you tend to get something that grows exponentially.
YC doesn't explicitly tell founders that with growth rate "you get out what you put in," but it's not far from the truth. And if growth rate were proportional to performance, then the reward for performance p over time t would be proportional to p^t.
Even after decades of thinking about this, I find that sentence startling.
Whenever how well you do depends on how well you've done, you'll get exponential growth. But neither our DNA nor our customs prepare us for it. No one finds exponential growth natural; every child is surprised, the first time they hear it, by the story of the man who asks the king for a single grain of rice the first day and double the amount each successive day.
What we don't understand naturally we develop customs to deal with, but we don't have many customs about exponential growth either, because there have been so few instances of it in human history. In principle herding should have been one: the more animals you had, the more offspring they'd have. But in practice grazing land was the limiting factor, and there was no plan for growing that exponentially.
Or more precisely, no generally applicable plan. There was a way to grow one's territory exponentially: by conquest. The more territory you control, the more powerful your army becomes, and the easier it is to conquer new territory. This is why history is full of empires. But so few people created or ran empires that their experiences didn't affect customs very much. The emperor was a remote and terrifying figure, not a source of lessons one could use in one's own life.
The most common case of exponential growth in preindustrial times was probably scholarship. The more you know, the easier it is to learn new things. The result, then as now, was that some people were startlingly more knowledgeable than the rest about certain topics. But this didn't affect customs much either. Although empires of ideas can overlap and there can thus be far more emperors, in preindustrial times this type of empire had little practical effect. [2]
That has changed in the last few centuries. Now the emperors of ideas can design bombs that defeat the emperors of territory. But this phenomenon is still so new that we haven't fully assimilated it. Few even of the participants realize they're benefitting from exponential growth or ask what they can learn from other instances of it.
The other source of superlinear returns is embodied in the expression "winner take all." In a sports match the relationship between performance and return is a step function: the winning team gets one win whether they do much better or just slightly better. [3]
The source of the step function is not competition per se, however. It's that there are thresholds in the outcome. You don't need competition to get those. There can be thresholds in situations where you're the only participant, like proving a theorem or hitting a target.
It's remarkable how often a situation with one source of superlinear returns also has the other. Crossing thresholds leads to exponential growth: the winning side in a battle usually suffers less damage, which makes them more likely to win in the future. And exponential growth helps you cross thresholds: in a market with network effects, a company that grows fast enough can shut out potential competitors.
Fame is an interesting example of a phenomenon that combines both sources of superlinear returns. Fame grows exponentially because existing fans bring you new ones. But the fundamental reason it's so concentrated is thresholds: there's only so much room on the A-list in the average person's head.
The most important case combining both sources of superlinear returns may be learning. Knowledge grows exponentially, but there are also thresholds in it. Learning to ride a bicycle, for example. Some of these thresholds are akin to machine tools: once you learn to read, you're able to learn anything else much faster. But the most important thresholds of all are those representing new discoveries. Knowledge seems to be fractal in the sense that if you push hard at the boundary of one area of knowledge, you sometimes discover a whole new field. And if you do, you get first crack at all the new discoveries to be made in it. Newton did this, and so did Durer and Darwin.
Are there general rules for finding situations with superlinear returns? The most obvious one is to seek work that compounds.
There are two ways work can compound. It can compound directly, in the sense that doing well in one cycle causes you to do better in the next. That happens for example when you're building infrastructure, or growing an audience or brand. Or work can compound by teaching you, since learning compounds. This second case is an interesting one because you may feel you're doing badly as it's happening. You may be failing to achieve your immediate goal. But if you're learning a lot, then you're getting exponential growth nonetheless.
This is one reason Silicon Valley is so tolerant of failure. People in Silicon Valley aren't blindly tolerant of failure. They'll only continue to bet on you if you're learning from your failures. But if you are, you are in fact a good bet: maybe your company didn't grow the way you wanted, but you yourself have, and that should yield results eventually.
Indeed, the forms of exponential growth that don't consist of learning are so often intermixed with it that we should probably treat this as the rule rather than the exception. Which yields another heuristic: always be learning. If you're not learning, you're probably not on a path that leads to superlinear returns.
But don't overoptimize what you're learning. Don't limit yourself to learning things that are already known to be valuable. You're learning; you don't know for sure yet what's going to be valuable, and if you're too strict you'll lop off the outliers.
What about step functions? Are there also useful heuristics of the form "seek thresholds" or "seek competition?" Here the situation is trickier. The existence of a threshold doesn't guarantee the game will be worth playing. If you play a round of Russian roulette, you'll be in a situation with a threshold, certainly, but in the best case you're no better off. "Seek competition" is similarly useless; what if the prize isn't worth competing for? Sufficiently fast exponential growth guarantees both the shape and magnitude of the return curve — because something that grows fast enough will grow big even if it's trivially small at first — but thresholds only guarantee the shape. [4]
A principle for taking advantage of thresholds has to include a test to ensure the game is worth playing. Here's one that does: if you come across something that's mediocre yet still popular, it could be a good idea to replace it. For example, if a company makes a product that people dislike yet still buy, then presumably they'd buy a better alternative if you made one. [5]
It would be great if there were a way to find promising intellectual thresholds. Is there a way to tell which questions have whole new fields beyond them? I doubt we could ever predict this with certainty, but the prize is so valuable that it would be useful to have predictors that were even a little better than random, and there's hope of finding those. We can to some degree predict when a research problem isn't likely to lead to new discoveries: when it seems legit but boring. Whereas the kind that do lead to new discoveries tend to seem very mystifying, but perhaps unimportant. (If they were mystifying and obviously important, they'd be famous open questions with lots of people already working on them.) So one heuristic here is to be driven by curiosity rather than careerism — to give free rein to your curiosity instead of working on what you're supposed to.
The prospect of superlinear returns for performance is an exciting one for the ambitious. And there's good news in this department: this territory is expanding in both directions. There are more types of work in which you can get superlinear returns, and the returns themselves are growing.
There are two reasons for this, though they're so closely intertwined that they're more like one and a half: progress in technology, and the decreasing importance of organizations.
Fifty years ago it used to be much more necessary to be part of an organization to work on ambitious projects. It was the only way to get the resources you needed, the only way to have colleagues, and the only way to get distribution. So in 1970 your prestige was in most cases the prestige of the organization you belonged to. And prestige was an accurate predictor, because if you weren't part of an organization, you weren't likely to achieve much. There were a handful of exceptions, most notably artists and writers, who worked alone using inexpensive tools and had their own brands. But even they were at the mercy of organizations for reaching audiences. [6]
A world dominated by organizations damped variation in the returns for performance. But this world has eroded significantly just in my lifetime. Now a lot more people can have the freedom that artists and writers had in the 20th century. There are lots of ambitious projects that don't require much initial funding, and lots of new ways to learn, make money, find colleagues, and reach audiences.
There's still plenty of the old world left, but the rate of change has been dramatic by historical standards. Especially considering what's at stake. It's hard to imagine a more fundamental change than one in the returns for performance.
Without the damping effect of institutions, there will be more variation in outcomes. Which doesn't imply everyone will be better off: people who do well will do even better, but those who do badly will do worse. That's an important point to bear in mind. Exposing oneself to superlinear returns is not for everyone. Most people will be better off as part of the pool. So who should shoot for superlinear returns? Ambitious people of two types: those who know they're so good that they'll be net ahead in a world with higher variation, and those, particularly the young, who can afford to risk trying it to find out. [7]
The switch away from institutions won't simply be an exodus of their current inhabitants. Many of the new winners will be people they'd never have let in. So the resulting democratization of opportunity will be both greater and more authentic than any tame intramural version the institutions themselves might have cooked up.
Not everyone is happy about this great unlocking of ambition. It threatens some vested interests and contradicts some ideologies. [8] But if you're an ambitious individual it's good news for you. How should you take advantage of it?
The most obvious way to take advantage of superlinear returns for performance is by doing exceptionally good work. At the far end of the curve, incremental effort is a bargain. All the more so because there's less competition at the far end — and not just for the obvious reason that it's hard to do something exceptionally well, but also because people find the prospect so intimidating that few even try. Which means it's not just a bargain to do exceptional work, but a bargain even to try to.
There are many variables that affect how good your work is, and if you want to be an outlier you need to get nearly all of them right. For example, to do something exceptionally well, you have to be interested in it. Mere diligence is not enough. So in a world with superlinear returns, it's even more valuable to know what you're interested in, and to find ways to work on it. [9] It will also be important to choose work that suits your circumstances. For example, if there's a kind of work that inherently requires a huge expenditure of time and energy, it will be increasingly valuable to do it when you're young and don't yet have children.
There's a surprising amount of technique to doing great work. It's not just a matter of trying hard. I'm going to take a shot giving a recipe in one paragraph.
Choose work you have a natural aptitude for and a deep interest in. Develop a habit of working on your own projects; it doesn't matter what they are so long as you find them excitingly ambitious. Work as hard as you can without burning out, and this will eventually bring you to one of the frontiers of knowledge. These look smooth from a distance, but up close they're full of gaps. Notice and explore such gaps, and if you're lucky one will expand into a whole new field. Take as much risk as you can afford; if you're not failing occasionally you're probably being too conservative. Seek out the best colleagues. Develop good taste and learn from the best examples. Be honest, especially with yourself. Exercise and eat and sleep well and avoid the more dangerous drugs. When in doubt, follow your curiosity. It never lies, and it knows more than you do about what's worth paying attention to. [10]
And there is of course one other thing you need: to be lucky. Luck is always a factor, but it's even more of a factor when you're working on your own rather than as part of an organization. And though there are some valid aphorisms about luck being where preparedness meets opportunity and so on, there's also a component of true chance that you can't do anything about. The solution is to take multiple shots. Which is another reason to start taking risks early.
The best example of a field with superlinear returns is probably science. It has exponential growth, in the form of learning, combined with thresholds at the extreme edge of performance — literally at the limits of knowledge.
The result has been a level of inequality in scientific discovery that makes the wealth inequality of even the most stratified societies seem mild by comparison. Newton's discoveries were arguably greater than all his contemporaries' combined. [11]
This point may seem obvious, but it might be just as well to spell it out. Superlinear returns imply inequality. The steeper the return curve, the greater the variation in outcomes.
In fact, the correlation between superlinear returns and inequality is so strong that it yields another heuristic for finding work of this type: look for fields where a few big winners outperform everyone else. A kind of work where everyone does about the same is unlikely to be one with superlinear returns.
What are fields where a few big winners outperform everyone else? Here are some obvious ones: sports, politics, art, music, acting, directing, writing, math, science, starting companies, and investing. In sports the phenomenon is due to externally imposed thresholds; you only need to be a few percent faster to win every race. In politics, power grows much as it did in the days of emperors. And in some of the other fields (including politics) success is driven largely by fame, which has its own source of superlinear growth. But when we exclude sports and politics and the effects of fame, a remarkable pattern emerges: the remaining list is exactly the same as the list of fields where you have to be independent-minded to succeed — where your ideas have to be not just correct, but novel as well. [12]
This is obviously the case in science. You can't publish papers saying things that other people have already said. But it's just as true in investing, for example. It's only useful to believe that a company will do well if most other investors don't; if everyone else thinks the company will do well, then its stock price will already reflect that, and there's no room to make money.
What else can we learn from these fields? In all of them you have to put in the initial effort. Superlinear returns seem small at first. At this rate, you find yourself thinking, I'll never get anywhere. But because the reward curve rises so steeply at the far end, it's worth taking extraordinary measures to get there.
In the startup world, the name for this principle is "do things that don't scale." If you pay a ridiculous amount of attention to your tiny initial set of customers, ideally you'll kick off exponential growth by word of mouth. But this same principle applies to anything that grows exponentially. Learning, for example. When you first start learning something, you feel lost. But it's worth making the initial effort to get a toehold, because the more you learn, the easier it will get.
There's another more subtle lesson in the list of fields with superlinear returns: not to equate work with a job. For most of the 20th century the two were identical for nearly everyone, and as a result we've inherited a custom that equates productivity with having a job. Even now to most people the phrase "your work" means their job. But to a writer or artist or scientist it means whatever they're currently studying or creating. For someone like that, their work is something they carry with them from job to job, if they have jobs at all. It may be done for an employer, but it's part of their portfolio.
It's an intimidating prospect to enter a field where a few big winners outperform everyone else. Some people do this deliberately, but you don't need to. If you have sufficient natural ability and you follow your curiosity sufficiently far, you'll end up in one. Your curiosity won't let you be interested in boring questions, and interesting questions tend to create fields with superlinear returns if they're not already part of one.
The territory of superlinear returns is by no means static. Indeed, the most extreme returns come from expanding it. So while both ambition and curiosity can get you into this territory, curiosity may be the more powerful of the two. Ambition tends to make you climb existing peaks, but if you stick close enough to an interesting enough question, it may grow into a mountain beneath you.
Notes
There's a limit to how sharply you can distinguish between effort, performance, and return, because they're not sharply distinguished in fact. What counts as return to one person might be performance to another. But though the borders of these concepts are blurry, they're not meaningless. I've tried to write about them as precisely as I could without crossing into error.
[1] Evolution itself is probably the most pervasive example of superlinear returns for performance. But this is hard for us to empathize with because we're not the recipients; we're the returns.
[2] Knowledge did of course have a practical effect before the Industrial Revolution. The development of agriculture changed human life completely. But this kind of change was the result of broad, gradual improvements in technique, not the discoveries of a few exceptionally learned people.
[3] It's not mathematically correct to describe a step function as superlinear, but a step function starting from zero works like a superlinear function when it describes the reward curve for effort by a rational actor. If it starts at zero then the part before the step is below any linearly increasing return, and the part after the step must be above the necessary return at that point or no one would bother.
[4] Seeking competition could be a good heuristic in the sense that some people find it motivating. It's also somewhat of a guide to promising problems, because it's a sign that other people find them promising. But it's a very imperfect sign: often there's a clamoring crowd chasing some problem, and they all end up being trumped by someone quietly working on another one.
[5] Not always, though. You have to be careful with this rule. When something is popular despite being mediocre, there's often a hidden reason why. Perhaps monopoly or regulation make it hard to compete. Perhaps customers have bad taste or have broken procedures for deciding what to buy. There are huge swathes of mediocre things that exist for such reasons.
[6] In my twenties I wanted to be an artist and even went to art school to study painting. Mostly because I liked art, but a nontrivial part of my motivation came from the fact that artists seemed least at the mercy of organizations.
[7] In principle everyone is getting superlinear returns. Learning compounds, and everyone learns in the course of their life. But in practice few push this kind of everyday learning to the point where the return curve gets really steep.
[8] It's unclear exactly what advocates of "equity" mean by it. They seem to disagree among themselves. But whatever they mean is probably at odds with a world in which institutions have less power to control outcomes, and a handful of outliers do much better than everyone else.
It may seem like bad luck for this concept that it arose at just the moment when the world was shifting in the opposite direction, but I don't think this was a coincidence. I think one reason it arose now is because its adherents feel threatened by rapidly increasing variation in performance.
[9] Corollary: Parents who pressure their kids to work on something prestigious, like medicine, even though they have no interest in it, will be hosing them even more than they have in the past.
[10] The original version of this paragraph was the first draft of "How to Do Great Work." As soon as I wrote it I realized it was a more important topic than superlinear returns, so I paused the present essay to expand this paragraph into its own. Practically nothing remains of the original version, because after I finished "How to Do Great Work" I rewrote it based on that.
[11] Before the Industrial Revolution, people who got rich usually did it like emperors: capturing some resource made them more powerful and enabled them to capture more. Now it can be done like a scientist, by discovering or building something uniquely valuable. Most people who get rich use a mix of the old and the new ways, but in the most advanced economies the ratio has shifted dramatically toward discovery just in the last half century.
[12] It's not surprising that conventional-minded people would dislike inequality if independent-mindedness is one of the biggest drivers of it. But it's not simply that they don't want anyone to have what they can't. The conventional-minded literally can't imagine what it's like to have novel ideas. So the whole phenomenon of great variation in performance seems unnatural to them, and when they encounter it they assume it must be due to cheating or to some malign external influence.
Thanks to Trevor Blackwell, Patrick Collison, Tyler Cowen, Jessica Livingston, Harj Taggar, and Garry Tan for reading drafts of this.

Binary file not shown (image diff). Before: 59 KiB → After: 56 KiB

View File

@@ -6,32 +6,33 @@ import time
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves for conversation turn limiting
target_user_roles: List[str] = ["user"]
max_turns: Optional[int] = None
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Assign a unique identifier to the pipeline.
# Optionally, you can set the id and name of the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "conversation_turn_limit_filter_pipeline"
# self.id = "conversation_turn_limit_filter_pipeline"
self.name = "Conversation Turn Limit Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves for conversation turn limiting
target_user_roles: List[str] = ["user"]
max_turns: Optional[int] = None
self.valves = Valves(
self.valves = self.Valves(
**{
"pipelines": os.getenv("CONVERSATION_TURN_PIPELINES", "*").split(","),
"max_turns": 10,

View File

@@ -16,31 +16,31 @@ import os
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
# e.g. ["llama3:latest", "gpt-3.5-turbo"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "detoxify_filter_pipeline"
# self.id = "detoxify_filter_pipeline"
self.name = "Detoxify Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
# e.g. ["llama3:latest", "gpt-3.5-turbo"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Initialize
self.valves = Valves(
self.valves = self.Valves(
**{
"pipelines": ["*"], # Connect to all pipelines
}

View File

@@ -0,0 +1,100 @@
import os
import requests
from typing import Literal, List, Optional
from datetime import datetime
from blueprints.function_calling_blueprint import Pipeline as FunctionCallingBlueprint
class Pipeline(FunctionCallingBlueprint):
class Valves(FunctionCallingBlueprint.Valves):
# Add your custom parameters here
OPENWEATHERMAP_API_KEY: str = ""
pass
class Tools:
def __init__(self, pipeline) -> None:
self.pipeline = pipeline
def get_current_time(
self,
) -> str:
"""
Get the current time.
:return: The current time.
"""
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
return f"Current Time = {current_time}"
def get_current_weather(
self,
location: str,
unit: Literal["metric", "fahrenheit"] = "fahrenheit",
) -> str:
"""
Get the current weather for a location. If the location is not found, return an empty string.
:param location: The location to get the weather for.
:param unit: The unit to get the weather in. Default is fahrenheit.
:return: The current weather for the location.
"""
# https://openweathermap.org/api
if self.pipeline.valves.OPENWEATHERMAP_API_KEY == "":
return "OpenWeatherMap API Key not set, ask the user to set it up."
else:
units = "imperial" if unit == "fahrenheit" else "metric"
params = {
"q": location,
"appid": self.pipeline.valves.OPENWEATHERMAP_API_KEY,
"units": units,
}
response = requests.get(
"http://api.openweathermap.org/data/2.5/weather", params=params
)
response.raise_for_status() # Raises an HTTPError for bad responses
data = response.json()
weather_description = data["weather"][0]["description"]
temperature = data["main"]["temp"]
return f"{location}: {weather_description.capitalize()}, {temperature}°{unit.capitalize()[0]}"
def calculator(self, equation: str) -> str:
"""
Calculate the result of an equation.
:param equation: The equation to calculate.
"""
# Avoid using eval in production code
# https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html
try:
result = eval(equation)
return f"{equation} = {result}"
except Exception as e:
print(e)
return "Invalid equation"
def __init__(self):
super().__init__()
# Optionally, you can set the id and name of the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "my_tools_pipeline"
self.name = "My Tools Pipeline"
self.valves = self.Valves(
**{
**self.valves.model_dump(),
"pipelines": ["*"], # Connect to all pipelines
"OPENWEATHERMAP_API_KEY": os.getenv("OPENWEATHERMAP_API_KEY", ""),
},
)
self.tools = self.Tools(self)
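Adding your own tool follows the same pattern: subclass the blueprint, define a typed, docstring-annotated method on `Tools` (the specs sent to the task model are derived from these), and rebuild the valves. A hypothetical sketch:

```python
from blueprints.function_calling_blueprint import Pipeline as FunctionCallingBlueprint


class Pipeline(FunctionCallingBlueprint):
    class Valves(FunctionCallingBlueprint.Valves):
        pass  # add custom parameters here if your tools need any

    class Tools:
        def __init__(self, pipeline) -> None:
            self.pipeline = pipeline

        def word_count(self, text: str) -> str:
            """
            Count the words in a piece of text. (Hypothetical example tool.)
            :param text: The text to count words in.
            :return: The word count as a sentence.
            """
            return f"The text contains {len(text.split())} words."

    def __init__(self):
        super().__init__()
        self.name = "Word Count Tool Pipeline"
        self.valves = self.Valves(
            **{**self.valves.model_dump(), "pipelines": ["*"]}
        )
        self.tools = self.Tools(self)
```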

View File

@@ -0,0 +1,135 @@
"""
title: Langfuse Filter Pipeline
author: open-webui
date: 2024-05-30
version: 1.1
license: MIT
description: A filter pipeline that uses Langfuse.
requirements: langfuse
"""
from typing import List, Optional
from schemas import OpenAIChatMessage
import os
from utils.pipelines.main import get_last_user_message, get_last_assistant_message
from pydantic import BaseModel
from langfuse import Langfuse
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
# e.g. ["llama3:latest", "gpt-3.5-turbo"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves
secret_key: str
public_key: str
host: str
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Optionally, you can set the id and name of the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "langfuse_filter_pipeline"
self.name = "Langfuse Filter"
# Initialize
self.valves = self.Valves(
**{
"pipelines": ["*"], # Connect to all pipelines
"secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
"public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
"host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
}
)
self.langfuse = None
self.chat_generations = {}
pass
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
self.set_langfuse()
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
self.langfuse.flush()
pass
async def on_valves_updated(self):
# This function is called when the valves are updated.
self.set_langfuse()
pass
def set_langfuse(self):
self.langfuse = Langfuse(
secret_key=self.valves.secret_key,
public_key=self.valves.public_key,
host=self.valves.host,
debug=False,
)
self.langfuse.auth_check()
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
print(f"inlet:{__name__}")
trace = self.langfuse.trace(
name=f"filter:{__name__}",
input=body,
user_id=user["id"],
metadata={"name": user["name"]},
session_id=body["chat_id"],
)
generation = trace.generation(
name=body["chat_id"],
model=body["model"],
input=body["messages"],
metadata={"interface": "open-webui"},
)
self.chat_generations[body["chat_id"]] = generation
print(trace.get_trace_url())
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
print(f"outlet:{__name__}")
if body["chat_id"] not in self.chat_generations:
return body
generation = self.chat_generations[body["chat_id"]]
user_message = get_last_user_message(body["messages"])
generated_message = get_last_assistant_message(body["messages"])
# Update usage cost based on the length of the input and output messages
# Below does not reflect the actual cost of the API
# You can adjust the cost based on your requirements
generation.end(
output=generated_message,
usage={
"totalCost": (len(user_message) + len(generated_message)) / 1000,
"unit": "CHARACTERS",
},
metadata={"interface": "open-webui"},
)
return body
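Since the valves above fall back to environment variables, the filter can also be configured at container start. A sketch using the quick-start image; the key values are placeholders:

```sh
docker run -d -p 9099:9099 \
  -e LANGFUSE_SECRET_KEY="sk-lf-..." \
  -e LANGFUSE_PUBLIC_KEY="pk-lf-..." \
  -e LANGFUSE_HOST="https://cloud.langfuse.com" \
  -v pipelines:/app/pipelines --name pipelines --restart always \
  ghcr.io/open-webui/pipelines:main
```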

View File

@@ -4,48 +4,49 @@ from pydantic import BaseModel
import requests
import os
from utils.main import get_last_user_message, get_last_assistant_message
from utils.pipelines.main import get_last_user_message, get_last_assistant_message
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
# e.g. ["llama3:latest", "gpt-3.5-turbo"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves
libretranslate_url: str
# Source and target languages
# User message will be translated from source_user to target_user
source_user: Optional[str] = "auto"
target_user: Optional[str] = "en"
# Assistant languages
# Assistant message will be translated from source_assistant to target_assistant
source_assistant: Optional[str] = "en"
target_assistant: Optional[str] = "es"
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "libretranslate_filter_pipeline"
# self.id = "libretranslate_filter_pipeline"
self.name = "LibreTranslate Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
# e.g. ["llama3:latest", "gpt-3.5-turbo"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves
libretranslate_url: str
# Source and target languages
# User message will be translated from source_user to target_user
source_user: Optional[str] = "auto"
target_user: Optional[str] = "en"
# Assistant languages
# Assistant message will be translated from source_assistant to target_assistant
source_assistant: Optional[str] = "en"
target_assistant: Optional[str] = "es"
# Initialize
self.valves = Valves(
self.valves = self.Valves(
**{
"pipelines": ["*"], # Connect to all pipelines
"libretranslate_url": os.getenv(

View File

@@ -4,48 +4,52 @@ from pydantic import BaseModel
from schemas import OpenAIChatMessage
import time
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves for rate limiting
requests_per_minute: Optional[int] = None
requests_per_hour: Optional[int] = None
sliding_window_limit: Optional[int] = None
sliding_window_minutes: Optional[int] = None
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Assign a unique identifier to the pipeline.
# Optionally, you can set the id and name of the pipeline.
# Best practice is not to specify the id, so that it can be inferred automatically from the filename and users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "rate_limit_filter_pipeline"
# self.id = "rate_limit_filter_pipeline"
self.name = "Rate Limit Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves for rate limiting
requests_per_minute: Optional[int] = None
requests_per_hour: Optional[int] = None
sliding_window_limit: Optional[int] = None
sliding_window_minutes: Optional[int] = None
# Initialize rate limits
pipelines = os.getenv("RATE_LIMIT_PIPELINES", "*").split(",")
requests_per_minute = int(os.getenv("RATE_LIMIT_REQUESTS_PER_MINUTE", 10))
requests_per_hour = int(os.getenv("RATE_LIMIT_REQUESTS_PER_HOUR", 1000))
sliding_window_limit = int(os.getenv("RATE_LIMIT_SLIDING_WINDOW_LIMIT", 100))
sliding_window_minutes = int(os.getenv("RATE_LIMIT_SLIDING_WINDOW_MINUTES", 15))
self.valves = Valves(
self.valves = self.Valves(
**{
"pipelines": pipelines,
"requests_per_minute": requests_per_minute,
"requests_per_hour": requests_per_hour,
"sliding_window_limit": sliding_window_limit,
"sliding_window_minutes": sliding_window_minutes,
"pipelines": os.getenv("RATE_LIMIT_PIPELINES", "*").split(","),
"requests_per_minute": int(
os.getenv("RATE_LIMIT_REQUESTS_PER_MINUTE", 10)
),
"requests_per_hour": int(
os.getenv("RATE_LIMIT_REQUESTS_PER_HOUR", 1000)
),
"sliding_window_limit": int(
os.getenv("RATE_LIMIT_SLIDING_WINDOW_LIMIT", 100)
),
"sliding_window_minutes": int(
os.getenv("RATE_LIMIT_SLIDING_WINDOW_MINUTES", 15)
),
}
)
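The same environment-first pattern applies here: every limit above can be overridden at container start without touching the file. A sketch using the quick-start image:

```sh
docker run -d -p 9099:9099 \
  -e RATE_LIMIT_PIPELINES="*" \
  -e RATE_LIMIT_REQUESTS_PER_MINUTE=5 \
  -e RATE_LIMIT_REQUESTS_PER_HOUR=500 \
  -e RATE_LIMIT_SLIDING_WINDOW_LIMIT=50 \
  -e RATE_LIMIT_SLIDING_WINDOW_MINUTES=15 \
  -v pipelines:/app/pipelines --name pipelines --restart always \
  ghcr.io/open-webui/pipelines:main
```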

View File

@@ -1,102 +0,0 @@
"""
title: Langfuse Filter Pipeline
author: open-webui
date: 2024-05-30
version: 1.0
license: MIT
description: A filter pipeline that uses Langfuse.
requirements: langfuse
"""
from typing import List, Optional
from schemas import OpenAIChatMessage
import os
from pydantic import BaseModel
from langfuse import Langfuse
class Pipeline:
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
self.type = "filter"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "langfuse_filter_pipeline"
self.name = "Langfuse Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
# e.g. ["llama3:latest", "gpt-3.5-turbo"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Valves
secret_key: str
public_key: str
host: str
# Initialize
self.valves = Valves(
**{
"pipelines": ["*"], # Connect to all pipelines
"secret_key": os.getenv("LANGFUSE_SECRET_KEY"),
"public_key": os.getenv("LANGFUSE_PUBLIC_KEY"),
"host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
}
)
self.langfuse = None
pass
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
self.set_langfuse()
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
self.langfuse.flush()
pass
async def on_valves_updated(self):
# This function is called when the valves are updated.
self.set_langfuse()
pass
def set_langfuse(self):
self.langfuse = Langfuse(
secret_key=self.valves.secret_key,
public_key=self.valves.public_key,
host=self.valves.host,
debug=False,
)
self.langfuse.auth_check()
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
print(f"inlet:{__name__}")
trace = self.langfuse.trace(
name=f"filter:{__name__}",
input=body,
user_id=user["id"],
metadata={"name": user["name"]},
session_id=body["chat_id"]
)
print(trace.get_trace_url())
return body
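To illustrate what the `inlet` above consumes: the field names (`id`, `name`, `chat_id`) come from the code itself; the values below are placeholders only:

```python
# Placeholder request shapes, inferred from the inlet signature above.
body = {
    "model": "llama3:latest",
    "messages": [{"role": "user", "content": "Hello!"}],
    "chat_id": "chat-123",
}
user = {"id": "user-1", "name": "Alice"}
# `await pipeline.inlet(body, user)` would then open one Langfuse trace,
# keyed by chat_id as the session, and print its URL.
```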

View File

@@ -9,11 +9,10 @@ from subprocess import call
class Pipeline:
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "applescript_pipeline"
# self.id = "applescript_pipeline"
self.name = "AppleScript Pipeline"
pass

View File

@@ -6,7 +6,10 @@ import subprocess
class Pipeline:
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
self.id = "python_code_pipeline"
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "python_code_pipeline"
self.name = "Python Code Pipeline"
pass

View File

@@ -6,18 +6,19 @@ import os
class Pipeline:
class Valves(BaseModel):
pass
def __init__(self):
# Assign a unique identifier to the pipeline.
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "wiki_pipeline"
# self.id = "wiki_pipeline"
self.name = "Wikipedia Pipeline"
class Valves(BaseModel):
pass
# Initialize valves
self.valves = Valves(**{"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "")})
self.valves = self.Valves(**{"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "")})
async def on_startup(self):
# This function is called when the server is started.

View File

@@ -19,15 +19,17 @@ import requests
class Pipeline:
class Valves(BaseModel):
ANTHROPIC_API_KEY: str = ""
def __init__(self):
self.type = "manifold"
self.id = "anthropic"
self.name = "anthropic/"
class Valves(BaseModel):
ANTHROPIC_API_KEY: str
self.valves = Valves(**{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY")})
self.valves = self.Valves(
**{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")}
)
self.client = Anthropic(api_key=self.valves.ANTHROPIC_API_KEY)
def get_anthropic_models(self):
@@ -61,6 +63,13 @@ class Pipeline:
self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
try:
if "user" in body:
del body["user"]
if "chat_id" in body:
del body["chat_id"]
if "title" in body:
del body["title"]
if body.get("stream", False):
return self.stream_response(model_id, messages, body)
else:
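The same three-key cleanup recurs in several pipelines below; a hypothetical helper (not part of this commit) shows the intent in one place:

```python
def strip_openwebui_fields(body: dict) -> dict:
    """Return a copy of body without Open WebUI bookkeeping keys.

    Mirrors the deletions above: upstream APIs reject these fields.
    """
    return {k: v for k, v in body.items() if k not in ("user", "chat_id", "title")}
```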

View File

@@ -1,16 +1,27 @@
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import requests
class Pipeline:
class Valves(BaseModel):
# You can add your custom valves here.
AZURE_OPENAI_API_KEY: str = "your-azure-openai-api-key-here"
AZURE_OPENAI_ENDPOINT: str = "your-azure-openai-endpoint-here"
DEPLOYMENT_NAME: str = "your-deployment-name-here"
API_VERSION: str = "2023-10-01-preview"
MODEL: str = "gpt-3.5-turbo"
pass
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "azure_openai_pipeline"
# self.id = "azure_openai_pipeline"
self.name = "Azure OpenAI Pipeline"
self.valves = self.Valves()
pass
async def on_startup(self):
@@ -32,25 +43,22 @@ class Pipeline:
print(messages)
print(user_message)
AZURE_OPENAI_API_KEY = "your-azure-openai-api-key-here"
AZURE_OPENAI_ENDPOINT = "your-azure-openai-endpoint-here"
DEPLOYMENT_NAME = "your-deployment-name-here"
MODEL = "gpt-3.5-turbo"
headers = {
"api-key": self.valves.AZURE_OPENAI_API_KEY,
"Content-Type": "application/json",
}
headers = {"api-key": AZURE_OPENAI_API_KEY, "Content-Type": "application/json"}
url = f"{AZURE_OPENAI_ENDPOINT}/openai/deployments/{DEPLOYMENT_NAME}/chat/completions?api-version=2023-10-01-preview"
url = f"{self.valves.AZURE_OPENAI_ENDPOINT}/openai/deployments/{self.valves.DEPLOYMENT_NAME}/chat/completions?api-version={self.valves.API_VERSION}"
try:
r = requests.post(
url=url,
json={**body, "model": MODEL},
json={**body, "model": self.valves.MODEL},
headers=headers,
stream=True,
)
r.raise_for_status()
if body["stream"]:
return r.iter_lines()
else:
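For clarity, the valve-driven URL above expands like this; the endpoint and deployment values are placeholders, not real resources:

```python
# Placeholder valve values (assumptions for illustration).
endpoint = "https://my-resource.openai.azure.com"
deployment = "gpt-35-turbo"
api_version = "2023-10-01-preview"

url = f"{endpoint}/openai/deployments/{deployment}/chat/completions?api-version={api_version}"
# -> https://my-resource.openai.azure.com/openai/deployments/gpt-35-turbo
#    /chat/completions?api-version=2023-10-01-preview
```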

View File

@@ -18,16 +18,25 @@ import requests
class Pipeline:
class Valves(BaseModel):
COHERE_API_BASE_URL: str = "https://api.cohere.com/v1"
COHERE_API_KEY: str = ""
def __init__(self):
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "cohere"
self.name = "cohere/"
class Valves(BaseModel):
COHERE_API_BASE_URL: str = "https://api.cohere.com/v1"
COHERE_API_KEY: str
self.valves = Valves(**{"COHERE_API_KEY": os.getenv("COHERE_API_KEY")})
self.valves = self.Valves(
**{"COHERE_API_KEY": os.getenv("COHERE_API_KEY", "your-api-key-here")}
)
self.pipelines = self.get_cohere_models()

View File

@@ -0,0 +1,122 @@
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import os
import requests
class Pipeline:
class Valves(BaseModel):
GROQ_API_BASE_URL: str = "https://api.groq.com/openai/v1"
GROQ_API_KEY: str = ""
pass
def __init__(self):
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "groq"
self.name = "Groq: "
self.valves = self.Valves(
**{
"GROQ_API_KEY": os.getenv(
"GROQ_API_KEY", "your-groq-api-key-here"
)
}
)
self.pipelines = self.get_models()
pass
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
pass
async def on_valves_updated(self):
# This function is called when the valves are updated.
print(f"on_valves_updated:{__name__}")
self.pipelines = self.get_models()
pass
def get_models(self):
if self.valves.GROQ_API_KEY:
try:
headers = {}
headers["Authorization"] = f"Bearer {self.valves.GROQ_API_KEY}"
headers["Content-Type"] = "application/json"
r = requests.get(
f"{self.valves.GROQ_API_BASE_URL}/models", headers=headers
)
models = r.json()
return [
{
"id": model["id"],
"name": model["name"] if "name" in model else model["id"],
}
for model in models["data"]
]
except Exception as e:
print(f"Error: {e}")
return [
{
"id": "error",
"name": "Could not fetch models from Groq, please update the API Key in the valves.",
},
]
else:
return []
def pipe(
self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")
print(messages)
print(user_message)
headers = {}
headers["Authorization"] = f"Bearer {self.valves.GROQ_API_KEY}"
headers["Content-Type"] = "application/json"
payload = {**body, "model": model_id}
if "user" in payload:
del payload["user"]
if "chat_id" in payload:
del payload["chat_id"]
if "title" in payload:
del payload["title"]
print(payload)
try:
r = requests.post(
url=f"{self.valves.GROQ_API_BASE_URL}/chat/completions",
json=payload,
headers=headers,
stream=True,
)
r.raise_for_status()
if body["stream"]:
return r.iter_lines()
else:
return r.json()
except Exception as e:
return f"Error: {e}"
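Once a manifold like this loads, its models appear in the Pipelines server's `/models` listing (the `{"data": [...]}` shape used elsewhere in this commit); host, port, and key below are placeholders:

```python
import requests

# Placeholders: point these at your own Pipelines deployment.
resp = requests.get(
    "http://localhost:9099/models",
    headers={"Authorization": "Bearer YOUR_PIPELINES_API_KEY"},
)
for model in resp.json().get("data", []):
    print(model["id"], "-", model["name"])
```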

View File

@@ -11,9 +11,16 @@ from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import requests
import os
class Pipeline:
class Valves(BaseModel):
LITELLM_BASE_URL: str = ""
LITELLM_API_KEY: str = ""
LITELLM_PIPELINE_DEBUG: bool = False
def __init__(self):
# You can also set the pipelines that are available in this pipeline.
# Set manifold to True if you want to use this pipeline as a manifold.
@@ -21,19 +28,24 @@ class Pipeline:
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "litellm_manifold"
# self.id = "litellm_manifold"
# Optionally, you can set the name of the manifold pipeline.
self.name = "LiteLLM: "
class Valves(BaseModel):
LITELLM_BASE_URL: str
# Initialize valves
self.valves = Valves(**{"LITELLM_BASE_URL": "http://localhost:4001"})
self.valves = self.Valves(
**{
"LITELLM_BASE_URL": os.getenv(
"LITELLM_BASE_URL", "http://localhost:4001"
),
"LITELLM_API_KEY": os.getenv("LITELLM_API_KEY", "your-api-key-here"),
"LITELLM_PIPELINE_DEBUG": os.getenv("LITELLM_PIPELINE_DEBUG", False),
}
)
self.pipelines = []
pass
@@ -54,9 +66,16 @@ class Pipeline:
pass
def get_litellm_models(self):
headers = {}
if self.valves.LITELLM_API_KEY:
headers["Authorization"] = f"Bearer {self.valves.LITELLM_API_KEY}"
if self.valves.LITELLM_BASE_URL:
try:
r = requests.get(f"{self.valves.LITELLM_BASE_URL}/v1/models")
r = requests.get(
f"{self.valves.LITELLM_BASE_URL}/v1/models", headers=headers
)
models = r.json()
return [
{
@@ -69,7 +88,7 @@ class Pipeline:
print(f"Error: {e}")
return [
{
"id": self.id,
"id": "error",
"name": "Could not fetch models from LiteLLM, please update the URL in the valves.",
},
]
@@ -85,10 +104,20 @@ class Pipeline:
print(f"# Message: {user_message}")
print("######################################")
headers = {}
if self.valves.LITELLM_API_KEY:
headers["Authorization"] = f"Bearer {self.valves.LITELLM_API_KEY}"
try:
payload = {**body, "model": model_id, "user": body["user"]["id"]}
payload.pop("chat_id", None)
payload.pop("user", None)
payload.pop("title", None)
r = requests.post(
url=f"{self.valves.LITELLM_BASE_URL}/v1/chat/completions",
json={**body, "model": model_id, "user_id": body["user"]["id"]},
json=payload,
headers=headers,
stream=True,
)
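To make the payload rewrite above concrete, a small worked example with placeholder values — note that, as written, the `user` id is set and then immediately popped again:

```python
body = {
    "messages": [{"role": "user", "content": "Hi"}],
    "user": {"id": "user-1"},
    "chat_id": "chat-123",
    "title": None,
}
payload = {**body, "model": "gpt-3.5-turbo", "user": body["user"]["id"]}
payload.pop("chat_id", None)
payload.pop("user", None)
payload.pop("title", None)
# Result: {"messages": [...], "model": "gpt-3.5-turbo"} — only the
# bookkeeping-free fields reach LiteLLM.
```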

View File

@@ -21,6 +21,12 @@ import yaml
class Pipeline:
class Valves(BaseModel):
LITELLM_CONFIG_DIR: str = "./litellm/config.yaml"
LITELLM_PROXY_PORT: int = 4001
LITELLM_PROXY_HOST: str = "127.0.0.1"
litellm_config: dict = {}
def __init__(self):
# You can also set the pipelines that are available in this pipeline.
# Set manifold to True if you want to use this pipeline as a manifold.
@@ -28,22 +34,16 @@ class Pipeline:
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "litellm_subprocess_manifold"
# self.id = "litellm_subprocess_manifold"
# Optionally, you can set the name of the manifold pipeline.
self.name = "LiteLLM: "
class Valves(BaseModel):
LITELLM_CONFIG_DIR: str = "./litellm/config.yaml"
LITELLM_PROXY_PORT: int = 4001
LITELLM_PROXY_HOST: str = "127.0.0.1"
litellm_config: dict = {}
# Initialize Valves
self.valves = Valves(**{"LITELLM_CONFIG_DIR": f"./litellm/config.yaml"})
self.valves = self.Valves(**{"LITELLM_CONFIG_DIR": f"./litellm/config.yaml"})
self.background_process = None
pass
@@ -173,7 +173,7 @@ class Pipeline:
print(f"Error: {e}")
return [
{
"id": self.id,
"id": "error",
"name": "Could not fetch models from LiteLLM, please update the URL in the valves.",
},
]
@@ -197,7 +197,7 @@ class Pipeline:
try:
r = requests.post(
url=f"http://{self.valves.LITELLM_PROXY_HOST}:{self.valves.LITELLM_PROXY_PORT}/v1/chat/completions",
json={**body, "model": model_id, "user_id": body["user"]["id"]},
json={**body, "model": model_id, "user": body["user"]["id"]},
stream=True,
)

View File

@@ -15,10 +15,10 @@ from schemas import OpenAIChatMessage
class Pipeline:
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "llama_cpp_pipeline"
# self.id = "llama_cpp_pipeline"
self.name = "Llama C++ Pipeline"
self.llm = None

View File

@@ -21,10 +21,10 @@ from huggingface_hub import login
class Pipeline:
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "mlx_pipeline"
# self.id = "mlx_pipeline"
self.name = "MLX Pipeline"
self.host = os.getenv("MLX_HOST", "localhost")
self.port = os.getenv("MLX_PORT", "8080")

View File

@@ -5,6 +5,10 @@ import requests
class Pipeline:
class Valves(BaseModel):
OLLAMA_BASE_URL: str
def __init__(self):
# You can also set the pipelines that are available in this pipeline.
# Set manifold to True if you want to use this pipeline as a manifold.
@@ -12,18 +16,15 @@ class Pipeline:
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "ollama_manifold"
# self.id = "ollama_manifold"
# Optionally, you can set the name of the manifold pipeline.
self.name = "Ollama: "
class Valves(BaseModel):
OLLAMA_BASE_URL: str
self.valves = Valves(**{"OLLAMA_BASE_URL": "http://localhost:11435"})
self.valves = self.Valves(**{"OLLAMA_BASE_URL": "http://localhost:11435"})
self.pipelines = []
pass
@@ -57,7 +58,7 @@ class Pipeline:
print(f"Error: {e}")
return [
{
"id": self.id,
"id": "error",
"name": "Could not fetch models from Ollama, please update the URL in the valves.",
},
]

View File

@@ -6,10 +6,10 @@ import requests
class Pipeline:
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "ollama_pipeline"
# self.id = "ollama_pipeline"
self.name = "Ollama Pipeline"
pass

View File

@@ -0,0 +1,123 @@
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import os
import requests
class Pipeline:
class Valves(BaseModel):
OPENAI_API_BASE_URL: str = "https://api.openai.com/v1"
OPENAI_API_KEY: str = ""
pass
def __init__(self):
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "openai_pipeline"
self.name = "OpenAI: "
self.valves = self.Valves(
**{
"OPENAI_API_KEY": os.getenv(
"OPENAI_API_KEY", "your-openai-api-key-here"
)
}
)
self.pipelines = self.get_openai_models()
pass
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
pass
async def on_valves_updated(self):
# This function is called when the valves are updated.
print(f"on_valves_updated:{__name__}")
self.pipelines = self.get_openai_models()
pass
def get_openai_models(self):
if self.valves.OPENAI_API_KEY:
try:
headers = {}
headers["Authorization"] = f"Bearer {self.valves.OPENAI_API_KEY}"
headers["Content-Type"] = "application/json"
r = requests.get(
f"{self.valves.OPENAI_API_BASE_URL}/models", headers=headers
)
models = r.json()
return [
{
"id": model["id"],
"name": model["name"] if "name" in model else model["id"],
}
for model in models["data"]
if "gpt" in model["id"]
]
except Exception as e:
print(f"Error: {e}")
return [
{
"id": "error",
"name": "Could not fetch models from OpenAI, please update the API Key in the valves.",
},
]
else:
return []
def pipe(
self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")
print(messages)
print(user_message)
headers = {}
headers["Authorization"] = f"Bearer {self.valves.OPENAI_API_KEY}"
headers["Content-Type"] = "application/json"
payload = {**body, "model": model_id}
if "user" in payload:
del payload["user"]
if "chat_id" in payload:
del payload["chat_id"]
if "title" in payload:
del payload["title"]
print(payload)
try:
r = requests.post(
url=f"{self.valves.OPENAI_API_BASE_URL}/chat/completions",
json=payload,
headers=headers,
stream=True,
)
r.raise_for_status()
if body["stream"]:
return r.iter_lines()
else:
return r.json()
except Exception as e:
return f"Error: {e}"

View File

@@ -1,16 +1,29 @@
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import os
import requests
class Pipeline:
class Valves(BaseModel):
OPENAI_API_KEY: str = ""
pass
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "openai_pipeline"
# self.id = "openai_pipeline"
self.name = "OpenAI Pipeline"
self.valves = self.Valves(
**{
"OPENAI_API_KEY": os.getenv(
"OPENAI_API_KEY", "your-openai-api-key-here"
)
}
)
pass
async def on_startup(self):
@@ -39,10 +52,21 @@ class Pipeline:
headers["Authorization"] = f"Bearer {OPENAI_API_KEY}"
headers["Content-Type"] = "application/json"
payload = {**body, "model": MODEL}
if "user" in payload:
del payload["user"]
if "chat_id" in payload:
del payload["chat_id"]
if "title" in payload:
del payload["title"]
print(payload)
try:
r = requests.post(
url="https://api.openai.com/v1/chat/completions",
json={**body, "model": MODEL},
json=payload,
headers=headers,
stream=True,
)

View File

@@ -1,16 +1,21 @@
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel
class Pipeline:
class Valves(BaseModel):
pass
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "pipeline_example"
self.name = "Pipeline Example"
# self.id = "pipeline_example"
# The name of the pipeline.
self.name = "Pipeline Example"
pass
async def on_startup(self):
@@ -51,6 +56,10 @@ class Pipeline:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")
# If you'd like to check for title generation, you can add the following check
if body.get("title", False):
print("Title Generation Request")
print(messages)
print(user_message)
print(body)
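A hedged sketch of the request this check targets: Open WebUI marks its internal title-generation calls with a truthy `title` field (shape assumed from the check above, values are placeholders):

```python
# Assumed shape of a title-generation request body.
body = {
    "model": "pipeline_example",
    "messages": [{"role": "user", "content": "Summarize this chat in 3-5 words"}],
    "title": True,  # the flag body.get("title", False) looks for
    "stream": False,
}
```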

View File

@@ -14,32 +14,33 @@ from schemas import OpenAIChatMessage
class Pipeline:
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Add your custom parameters here
pass
def __init__(self):
# Pipeline filters are only compatible with Open WebUI
# You can think of a filter pipeline as middleware that can be used to edit the form data before it is sent to the OpenAI API.
self.type = "filter"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "filter_pipeline"
# self.id = "filter_pipeline"
self.name = "Filter"
class Valves(BaseModel):
# List target pipeline ids (models) that this filter will be connected to.
# If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
pipelines: List[str] = []
# Assign a priority level to the filter pipeline.
# The priority level determines the order in which the filter pipelines are executed.
# The lower the number, the higher the priority.
priority: int = 0
# Add your custom parameters here
pass
self.valves = Valves(**{"pipelines": ["llama3:latest"]})
self.valves = self.Valves(**{"pipelines": ["llama3:latest"]})
pass
@@ -57,6 +58,10 @@ class Pipeline:
# This filter is applied to the form data before it is sent to the OpenAI API.
print(f"inlet:{__name__}")
# If you'd like to check for title generation, you can add the following check
if body.get("title", False):
print("Title Generation Request")
print(body)
print(user)

View File

@@ -0,0 +1,33 @@
from blueprints.function_calling_blueprint import Pipeline as FunctionCallingBlueprint
class Pipeline(FunctionCallingBlueprint):
class Valves(FunctionCallingBlueprint.Valves):
# Add your custom valves here
pass
class Tools:
def __init__(self, pipeline) -> None:
self.pipeline = pipeline
# Add your custom tools using pure Python code here, make sure to add type hints
# Use Sphinx-style docstrings to document your tools, they will be used for generating tools specifications
# Please refer to function_calling_filter_pipeline.py for an example
pass
def __init__(self):
super().__init__()
# Optionally, you can set the id and name of the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
# self.id = "my_tools_pipeline"
self.name = "My Tools Pipeline"
self.valves = self.Valves(
**{
**self.valves.model_dump(),
"pipelines": ["*"], # Connect to all pipelines
},
)
self.tools = self.Tools(self)
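As the `Tools` comments suggest, tools are plain Python methods with type hints and Sphinx-style docstrings; a hypothetical example tool (not part of this commit) could look like:

```python
import datetime

class Tools:
    def __init__(self, pipeline) -> None:
        self.pipeline = pipeline

    def get_current_time(self, offset_hours: int) -> str:
        """
        Get the current UTC time shifted by an offset.

        :param offset_hours: Hours to add to UTC.
        :return: The resulting time as an ISO 8601 string.
        """
        now = datetime.datetime.utcnow() + datetime.timedelta(hours=offset_hours)
        return now.isoformat()
```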

View File

@@ -10,13 +10,16 @@ class Pipeline:
self.type = "manifold"
# Optionally, you can set the id and name of the pipeline.
# Assign a unique identifier to the pipeline.
# Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline.
# The identifier must be unique across all pipelines.
# The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
self.id = "manifold_pipeline"
# self.id = "manifold_pipeline"
# Optionally, you can set the name of the manifold pipeline.
self.name = "Manifold: "
# Define pipelines that are available in this manifold pipeline.
# This is a list of dictionaries where each dictionary has an id and name.
self.pipelines = [
{
"id": "pipeline-1", # This will turn into `manifold_pipeline.pipeline-1`
@@ -45,6 +48,10 @@ class Pipeline:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")
# If you'd like to check for title generation, you can add the following check
if body.get("title", False):
print("Title Generation Request")
print(messages)
print(user_message)
print(body)

main.py
View File

@@ -8,9 +8,9 @@ from pydantic import BaseModel, ConfigDict
from typing import List, Union, Generator, Iterator
from utils.auth import bearer_security, get_current_user
from utils.main import get_last_user_message, stream_message_template
from utils.misc import convert_to_raw_url
from utils.pipelines.auth import bearer_security, get_current_user
from utils.pipelines.main import get_last_user_message, stream_message_template
from utils.pipelines.misc import convert_to_raw_url
from contextlib import asynccontextmanager
from concurrent.futures import ThreadPoolExecutor
@@ -108,11 +108,25 @@ def get_all_pipelines():
async def load_module_from_path(module_name, module_path):
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print(f"Loaded module: {module.__name__}")
if hasattr(module, "Pipeline"):
return module.Pipeline()
try:
spec.loader.exec_module(module)
print(f"Loaded module: {module.__name__}")
if hasattr(module, "Pipeline"):
return module.Pipeline()
else:
raise Exception("No Pipeline class found")
except Exception as e:
print(f"Error loading module: {module_name}")
# Move the file to the error folder
failed_pipelines_folder = os.path.join(PIPELINES_DIR, "failed")
if not os.path.exists(failed_pipelines_folder):
os.makedirs(failed_pipelines_folder)
failed_file_path = os.path.join(failed_pipelines_folder, f"{module_name}.py")
os.rename(module_path, failed_file_path)
print(e)
return None
@@ -124,8 +138,38 @@ async def load_modules_from_directory(directory):
if filename.endswith(".py"):
module_name = filename[:-3] # Remove the .py extension
module_path = os.path.join(directory, filename)
# Create subfolder matching the filename without the .py extension
subfolder_path = os.path.join(directory, module_name)
if not os.path.exists(subfolder_path):
os.makedirs(subfolder_path)
logging.info(f"Created subfolder: {subfolder_path}")
# Create a valves.json file if it doesn't exist
valves_json_path = os.path.join(subfolder_path, "valves.json")
if not os.path.exists(valves_json_path):
with open(valves_json_path, "w") as f:
json.dump({}, f)
logging.info(f"Created valves.json in: {subfolder_path}")
pipeline = await load_module_from_path(module_name, module_path)
if pipeline:
# Overwrite pipeline.valves with values from valves.json
if os.path.exists(valves_json_path):
with open(valves_json_path, "r") as f:
valves_json = json.load(f)
if hasattr(pipeline, "valves"):
ValvesModel = pipeline.valves.__class__
# Create a ValvesModel instance using default values and overwrite with valves_json
combined_valves = {
**pipeline.valves.model_dump(),
**valves_json,
}
valves = ValvesModel(**combined_valves)
pipeline.valves = valves
logging.info(f"Updated valves for module: {module_name}")
pipeline_id = pipeline.id if hasattr(pipeline, "id") else module_name
PIPELINE_MODULES[pipeline_id] = pipeline
PIPELINE_NAMES[pipeline_id] = module_name
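To illustrate the merge above: the defaults come from the pipeline's `Valves` model and `valves.json` overrides only the keys it names; values below are placeholders:

```python
import json

# Defaults as a Valves model would dump them (placeholder example).
defaults = {"OPENAI_API_BASE_URL": "https://api.openai.com/v1", "OPENAI_API_KEY": ""}

# Contents of pipelines/<module_name>/valves.json (placeholder).
valves_json = {"OPENAI_API_KEY": "sk-placeholder"}

combined = {**defaults, **valves_json}
print(json.dumps(combined, indent=2))
# Only OPENAI_API_KEY is overridden; the base URL keeps its default.
```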
@@ -203,7 +247,6 @@ async def get_models():
Returns the available pipelines
"""
app.state.PIPELINES = get_all_pipelines()
return {
"data": [
{
@@ -442,6 +485,14 @@ async def update_valves(pipeline_id: str, form_data: dict):
valves = ValvesModel(**form_data)
pipeline.valves = valves
# Determine the directory path for the valves.json file
subfolder_path = os.path.join(PIPELINES_DIR, PIPELINE_NAMES[pipeline_id])
valves_json_path = os.path.join(subfolder_path, "valves.json")
# Save the updated valves data back to the valves.json file
with open(valves_json_path, "w") as f:
json.dump(valves.model_dump(), f)
if hasattr(pipeline, "on_valves_updated"):
await pipeline.on_valves_updated()
except Exception as e:
@@ -463,6 +514,13 @@ async def filter_inlet(pipeline_id: str, form_data: FilterForm):
detail=f"Filter {pipeline_id} not found",
)
try:
pipeline = app.state.PIPELINES[form_data.body["model"]]
if pipeline["type"] == "manifold":
pipeline_id = pipeline_id.split(".")[0]
except:
pass
pipeline = PIPELINE_MODULES[pipeline_id]
try:
@@ -488,6 +546,13 @@ async def filter_outlet(pipeline_id: str, form_data: FilterForm):
detail=f"Filter {pipeline_id} not found",
)
try:
pipeline = app.state.PIPELINES[form_data.body["model"]]
if pipeline["type"] == "manifold":
pipeline_id = pipeline_id.split(".")[0]
except:
pass
pipeline = PIPELINE_MODULES[pipeline_id]
try:
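The `split(".")[0]` above maps a manifold-qualified model id back to its owning pipeline before the filter lookup; for example (ids are placeholders):

```python
pipeline_id = "ollama_manifold.llama3:latest"  # placeholder manifold model id
pipeline_id = pipeline_id.split(".")[0]
# -> "ollama_manifold": the filter module is resolved on the manifold itself.
```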

View File

@@ -43,6 +43,7 @@ sentence-transformers
transformers
tokenizers
nltk
tiktoken
# Image processing
Pillow

View File

@@ -1,6 +1,33 @@
#!/usr/bin/env bash
PORT="${PORT:-9099}"
HOST="${HOST:-0.0.0.0}"
# Default value for PIPELINES_DIR
PIPELINES_DIR=${PIPELINES_DIR:-./pipelines}
# Function to reset pipelines
reset_pipelines_dir() {
if [ "$RESET_PIPELINES_DIR" = true ]; then
echo "Resetting pipelines directory: $PIPELINES_DIR"
# Check if the directory exists
if [ -d "$PIPELINES_DIR" ]; then
# Remove all contents of the directory
rm -rf "${PIPELINES_DIR:?}"/*
echo "All contents in $PIPELINES_DIR have been removed."
# Optionally recreate the directory if needed
mkdir -p "$PIPELINES_DIR"
echo "$PIPELINES_DIR has been recreated."
else
echo "Directory $PIPELINES_DIR does not exist. No action taken."
fi
else
echo "RESET_PIPELINES_DIR is not set to true. No action taken."
fi
}
# Example usage of the function
reset_pipelines_dir
# Function to install requirements if requirements.txt is provided
install_requirements() {

View File

@@ -80,12 +80,11 @@ def doc_to_dict(docstring):
return ret_dict
def get_function_specs(functions) -> List[dict]:
def get_tools_specs(tools) -> List[dict]:
function_list = [
{"name": func, "function": getattr(functions, func)}
for func in dir(functions)
if callable(getattr(functions, func)) and not func.startswith("__")
{"name": func, "function": getattr(tools, func)}
for func in dir(tools)
if callable(getattr(tools, func)) and not func.startswith("__")
]
specs = []