open-webui/backend/utils/misc.py
Timothy J. Baek f2b9a5f5bf refac: rag
2024-06-09 03:01:25 -07:00

256 lines
7.5 KiB
Python

from pathlib import Path
import hashlib
import json
import re
from datetime import timedelta
from typing import Optional, List
def get_last_user_message(messages: List[dict]) -> str:
for message in reversed(messages):
if message["role"] == "user":
if isinstance(message["content"], list):
for item in message["content"]:
if item["type"] == "text":
return item["text"]
return message["content"]
return None
def get_last_assistant_message(messages: List[dict]) -> str:
for message in reversed(messages):
if message["role"] == "assistant":
if isinstance(message["content"], list):
for item in message["content"]:
if item["type"] == "text":
return item["text"]
return message["content"]
return None
def add_or_update_system_message(content: str, messages: List[dict]):
"""
Adds a new system message at the beginning of the messages list
or updates the existing system message at the beginning.
:param msg: The message to be added or appended.
:param messages: The list of message dictionaries.
:return: The updated list of message dictionaries.
"""
if messages and messages[0].get("role") == "system":
messages[0]["content"] += f"{content}\n{messages[0]['content']}"
else:
# Insert at the beginning
messages.insert(0, {"role": "system", "content": content})
return messages
def get_gravatar_url(email):
# Trim leading and trailing whitespace from
# an email address and force all characters
# to lower case
address = str(email).strip().lower()
# Create a SHA256 hash of the final string
hash_object = hashlib.sha256(address.encode())
hash_hex = hash_object.hexdigest()
# Grab the actual image URL
return f"https://www.gravatar.com/avatar/{hash_hex}?d=mp"
def calculate_sha256(file):
sha256 = hashlib.sha256()
# Read the file in chunks to efficiently handle large files
for chunk in iter(lambda: file.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
def calculate_sha256_string(string):
# Create a new SHA-256 hash object
sha256_hash = hashlib.sha256()
# Update the hash object with the bytes of the input string
sha256_hash.update(string.encode("utf-8"))
# Get the hexadecimal representation of the hash
hashed_string = sha256_hash.hexdigest()
return hashed_string
def validate_email_format(email: str) -> bool:
if email.endswith("@localhost"):
return True
return bool(re.match(r"[^@]+@[^@]+\.[^@]+", email))
def sanitize_filename(file_name):
# Convert to lowercase
lower_case_file_name = file_name.lower()
# Remove special characters using regular expression
sanitized_file_name = re.sub(r"[^\w\s]", "", lower_case_file_name)
# Replace spaces with dashes
final_file_name = re.sub(r"\s+", "-", sanitized_file_name)
return final_file_name
def extract_folders_after_data_docs(path):
# Convert the path to a Path object if it's not already
path = Path(path)
# Extract parts of the path
parts = path.parts
# Find the index of '/data/docs' in the path
try:
index_data_docs = parts.index("data") + 1
index_docs = parts.index("docs", index_data_docs) + 1
except ValueError:
return []
# Exclude the filename and accumulate folder names
tags = []
folders = parts[index_docs:-1]
for idx, part in enumerate(folders):
tags.append("/".join(folders[: idx + 1]))
return tags
def parse_duration(duration: str) -> Optional[timedelta]:
if duration == "-1" or duration == "0":
return None
# Regular expression to find number and unit pairs
pattern = r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)"
matches = re.findall(pattern, duration)
if not matches:
raise ValueError("Invalid duration string")
total_duration = timedelta()
for number, _, unit in matches:
number = float(number)
if unit == "ms":
total_duration += timedelta(milliseconds=number)
elif unit == "s":
total_duration += timedelta(seconds=number)
elif unit == "m":
total_duration += timedelta(minutes=number)
elif unit == "h":
total_duration += timedelta(hours=number)
elif unit == "d":
total_duration += timedelta(days=number)
elif unit == "w":
total_duration += timedelta(weeks=number)
return total_duration
def parse_ollama_modelfile(model_text):
parameters_meta = {
"mirostat": int,
"mirostat_eta": float,
"mirostat_tau": float,
"num_ctx": int,
"repeat_last_n": int,
"repeat_penalty": float,
"temperature": float,
"seed": int,
"tfs_z": float,
"num_predict": int,
"top_k": int,
"top_p": float,
"num_keep": int,
"typical_p": float,
"presence_penalty": float,
"frequency_penalty": float,
"penalize_newline": bool,
"numa": bool,
"num_batch": int,
"num_gpu": int,
"main_gpu": int,
"low_vram": bool,
"f16_kv": bool,
"vocab_only": bool,
"use_mmap": bool,
"use_mlock": bool,
"num_thread": int,
}
data = {"base_model_id": None, "params": {}}
# Parse base model
base_model_match = re.search(
r"^FROM\s+(\w+)", model_text, re.MULTILINE | re.IGNORECASE
)
if base_model_match:
data["base_model_id"] = base_model_match.group(1)
# Parse template
template_match = re.search(
r'TEMPLATE\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
)
if template_match:
data["params"] = {"template": template_match.group(1).strip()}
# Parse stops
stops = re.findall(r'PARAMETER stop "(.*?)"', model_text, re.IGNORECASE)
if stops:
data["params"]["stop"] = stops
# Parse other parameters from the provided list
for param, param_type in parameters_meta.items():
param_match = re.search(rf"PARAMETER {param} (.+)", model_text, re.IGNORECASE)
if param_match:
value = param_match.group(1)
try:
if param_type == int:
value = int(value)
elif param_type == float:
value = float(value)
elif param_type == bool:
value = value.lower() == "true"
except Exception as e:
print(e)
continue
data["params"][param] = value
# Parse adapter
adapter_match = re.search(r"ADAPTER (.+)", model_text, re.IGNORECASE)
if adapter_match:
data["params"]["adapter"] = adapter_match.group(1)
# Parse system description
system_desc_match = re.search(
r'SYSTEM\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE
)
system_desc_match_single = re.search(
r"SYSTEM\s+([^\n]+)", model_text, re.IGNORECASE
)
if system_desc_match:
data["params"]["system"] = system_desc_match.group(1).strip()
elif system_desc_match_single:
data["params"]["system"] = system_desc_match_single.group(1).strip()
# Parse messages
messages = []
message_matches = re.findall(r"MESSAGE (\w+) (.+)", model_text, re.IGNORECASE)
for role, content in message_matches:
messages.append({"role": role, "content": content})
if messages:
data["params"]["messages"] = messages
return data