From cae961c2d4773587a89615e92c367029ad1d4dc2 Mon Sep 17 00:00:00 2001
From: Justin Hayes
Date: Wed, 22 May 2024 15:10:51 -0400
Subject: [PATCH 1/4] Refac

---
 pipelines/examples/mlx_pipeline.py | 58 +++++++++++++++++++++---------
 1 file changed, 41 insertions(+), 17 deletions(-)

diff --git a/pipelines/examples/mlx_pipeline.py b/pipelines/examples/mlx_pipeline.py
index 71faa4f..c8dcd4a 100644
--- a/pipelines/examples/mlx_pipeline.py
+++ b/pipelines/examples/mlx_pipeline.py
@@ -5,15 +5,17 @@ date: 2024-05-22
 version: 1.0
 license: MIT
 description: A pipeline for running the mlx-lm server with a specified model.
-dependencies: requests, mlx-lm
-environment_variables: MLX_MODEL
+dependencies: requests, mlx-lm, huggingface_hub
+environment_variables: MLX_MODEL, MLX_STOP, HUGGINGFACE_TOKEN
 """
 
 from typing import List, Union, Generator, Iterator
-import requests
 import subprocess
 import os
 import socket
+import time
+import requests
+from huggingface_hub import login
 
 from schemas import OpenAIChatMessage
 
@@ -23,21 +25,30 @@ class Pipeline:
         self.id = "mlx_pipeline"
         self.name = "MLX Pipeline"
         self.process = None
-        self.model = os.getenv(
-            "MLX_MODEL", "mistralai/Mistral-7B-Instruct-v0.2"
-        )  # Default model if not set in environment variable
+        self.model = os.getenv('MLX_MODEL', 'mistralai/Mistral-7B-Instruct-v0.2')  # Default model if not set in environment variable
         self.port = self.find_free_port()
-        self.stop_sequences = os.getenv(
-            "MLX_STOP", "[INST]"
-        )  # Stop sequences from environment variable
+        self.stop_sequences = os.getenv('MLX_STOP', '[INST]')  # Stop sequences from environment variable
+        self.hf_token = os.getenv('HUGGINGFACE_TOKEN', None)  # Hugging Face token from environment variable
+
+        # Authenticate with Hugging Face if a token is provided
+        if self.hf_token:
+            self.authenticate_huggingface(self.hf_token)
 
     @staticmethod
     def find_free_port():
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            s.bind(("", 0))
+            s.bind(('', 0))
             s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
             return s.getsockname()[1]
 
+    @staticmethod
+    def authenticate_huggingface(token: str):
+        try:
+            login(token)
+            print("Successfully authenticated with Hugging Face.")
+        except Exception as e:
+            print(f"Failed to authenticate with Hugging Face: {e}")
+
     async def on_startup(self):
         # This function is called when the server is started.
         print(f"on_startup:{__name__}")
@@ -54,13 +65,18 @@ class Pipeline:
             self.process = subprocess.Popen(
                 ["mlx_lm.server", "--model", self.model, "--port", str(self.port)],
                 stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-            )
-            print(
-                f"Subprocess started with PID: {self.process.pid} on port {self.port}"
+                stderr=subprocess.PIPE
             )
+            print(f"Subprocess started with PID: {self.process.pid} on port {self.port}")
+
+            # Check if the process has started correctly
+            time.sleep(2)  # Give it a moment to start
+            if self.process.poll() is not None:
+                raise RuntimeError(f"Subprocess failed to start. Return code: {self.process.returncode}")
+
         except Exception as e:
             print(f"Failed to start subprocess: {e}")
+            self.process = None
 
     def stop_subprocess(self):
         # Stop the subprocess if it is running
@@ -71,6 +87,8 @@ class Pipeline:
                 print(f"Subprocess with PID {self.process.pid} terminated")
             except Exception as e:
                 print(f"Failed to terminate subprocess: {e}")
+            finally:
+                self.process = None
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
 
+        if not self.process or self.process.poll() is not None:
+            return "Error: Subprocess is not running."
+
         MLX_BASE_URL = f"http://localhost:{self.port}"
         MODEL = self.model
 
+        # Convert OpenAIChatMessage objects to dictionaries
+        messages_dict = [{"role": message.role, "content": message.content} for message in messages]
+
         # Extract additional parameters from the body
         temperature = body.get("temperature", 0.8)
         max_tokens = body.get("max_tokens", 1000)
@@ -96,12 +120,12 @@ class Pipeline:
 
         payload = {
             "model": MODEL,
-            "messages": [message.model_dump() for message in messages],
+            "messages": messages_dict,
             "temperature": temperature,
             "max_tokens": max_tokens,
             "top_p": top_p,
             "repetition_penalty": repetition_penalty,
-            "stop": stop,
+            "stop": stop
         }
 
         try:
@@ -115,4 +139,4 @@ class Pipeline:
 
             return r.iter_lines()
         except Exception as e:
-            return f"Error: {e}"
+            return f"Error: {e}"
\ No newline at end of file

From de765a1391391f2780bd718d65b356a55cc5b6aa Mon Sep 17 00:00:00 2001
From: Justin Hayes
Date: Wed, 22 May 2024 16:03:10 -0400
Subject: [PATCH 2/4] Revert all recent changes

---
 pipelines/examples/mlx_pipeline.py | 46 ++++++------------------------
 1 file changed, 8 insertions(+), 38 deletions(-)

diff --git a/pipelines/examples/mlx_pipeline.py b/pipelines/examples/mlx_pipeline.py
index c8dcd4a..03403a9 100644
--- a/pipelines/examples/mlx_pipeline.py
+++ b/pipelines/examples/mlx_pipeline.py
@@ -1,21 +1,16 @@
 """
-title: MLX Pipeline
-author: justinh-rahb
-date: 2024-05-22
-version: 1.0
-license: MIT
-description: A pipeline for running the mlx-lm server with a specified model.
-dependencies: requests, mlx-lm, huggingface_hub
-environment_variables: MLX_MODEL, MLX_STOP, HUGGINGFACE_TOKEN
+Plugin Name: MLX Pipeline
+Description: A pipeline for running the mlx-lm server with a specified model and dynamically allocated port.
+Author: justinh-rahb
+License: MIT
+Python Dependencies: requests, subprocess, os, socket, schemas
 """
 
 from typing import List, Union, Generator, Iterator
+import requests
 import subprocess
 import os
 import socket
-import time
-import requests
-from huggingface_hub import login
 
 from schemas import OpenAIChatMessage
 
@@ -28,11 +23,6 @@ class Pipeline:
         self.model = os.getenv('MLX_MODEL', 'mistralai/Mistral-7B-Instruct-v0.2')  # Default model if not set in environment variable
         self.port = self.find_free_port()
         self.stop_sequences = os.getenv('MLX_STOP', '[INST]')  # Stop sequences from environment variable
-        self.hf_token = os.getenv('HUGGINGFACE_TOKEN', None)  # Hugging Face token from environment variable
-
-        # Authenticate with Hugging Face if a token is provided
-        if self.hf_token:
-            self.authenticate_huggingface(self.hf_token)
 
     @staticmethod
     def find_free_port():
@@ -41,14 +31,6 @@ class Pipeline:
             s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
             return s.getsockname()[1]
 
-    @staticmethod
-    def authenticate_huggingface(token: str):
-        try:
-            login(token)
-            print("Successfully authenticated with Hugging Face.")
-        except Exception as e:
-            print(f"Failed to authenticate with Hugging Face: {e}")
-
     async def on_startup(self):
         # This function is called when the server is started.
         print(f"on_startup:{__name__}")
@@ -68,15 +50,8 @@ class Pipeline:
                 stderr=subprocess.PIPE
             )
             print(f"Subprocess started with PID: {self.process.pid} on port {self.port}")
-
-            # Check if the process has started correctly
-            time.sleep(2)  # Give it a moment to start
-            if self.process.poll() is not None:
-                raise RuntimeError(f"Subprocess failed to start. Return code: {self.process.returncode}")
-
         except Exception as e:
             print(f"Failed to start subprocess: {e}")
-            self.process = None
 
     def stop_subprocess(self):
         # Stop the subprocess if it is running
@@ -87,8 +62,6 @@ class Pipeline:
                 print(f"Subprocess with PID {self.process.pid} terminated")
             except Exception as e:
                 print(f"Failed to terminate subprocess: {e}")
-            finally:
-                self.process = None
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
@@ -96,9 +69,6 @@ class Pipeline:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
 
-        if not self.process or self.process.poll() is not None:
-            return "Error: Subprocess is not running."
-
         MLX_BASE_URL = f"http://localhost:{self.port}"
         MODEL = self.model
 
@@ -106,8 +76,8 @@ class Pipeline:
         messages_dict = [{"role": message.role, "content": message.content} for message in messages]
 
         # Extract additional parameters from the body
-        temperature = body.get("temperature", 0.8)
-        max_tokens = body.get("max_tokens", 1000)
+        temperature = body.get("temperature", 1.0)
+        max_tokens = body.get("max_tokens", 100)
         top_p = body.get("top_p", 1.0)
         repetition_penalty = body.get("repetition_penalty", 1.0)
         stop = self.stop_sequences

From d23c2c48e73d59260a112cb0ee1d9dfce858c433 Mon Sep 17 00:00:00 2001
From: Justin Hayes
Date: Mon, 27 May 2024 11:43:39 -0400
Subject: [PATCH 3/4] Update mlx_pipeline.py

---
 pipelines/examples/mlx_pipeline.py | 143 ++++++++++++-----------------
 1 file changed, 61 insertions(+), 82 deletions(-)

diff --git a/pipelines/examples/mlx_pipeline.py b/pipelines/examples/mlx_pipeline.py
index 38a8f79..09b2ace 100644
--- a/pipelines/examples/mlx_pipeline.py
+++ b/pipelines/examples/mlx_pipeline.py
@@ -1,112 +1,91 @@
 """
-Plugin Name: MLX Pipeline
-Description: A pipeline for running the mlx-lm server with a specified model and dynamically allocated port.
-Author: justinh-rahb
-License: MIT
-Python Dependencies: requests, subprocess, os, socket, schemas
+title: MLX Pipeline
+author: justinh-rahb
+date: 2024-05-27
+version: 1.1
+license: MIT
+description: A pipeline for generating text using Apple MLX Framework.
+dependencies: requests, mlx-lm, huggingface-hub
+environment_variables: MLX_HOST, MLX_PORT, MLX_MODEL, MLX_STOP, MLX_SUBPROCESS, HUGGINGFACE_TOKEN
 """
 
 from typing import List, Union, Generator, Iterator
-import requests
-import subprocess
-import os
-import socket
 from schemas import OpenAIChatMessage
-
+import requests
+import os
+import subprocess
+import logging
+from huggingface_hub import login
 
 class Pipeline:
     def __init__(self):
-        # Optionally, you can set the id and name of the pipeline.
         self.id = "mlx_pipeline"
         self.name = "MLX Pipeline"
-        self.process = None
-        self.model = os.getenv('MLX_MODEL', 'mistralai/Mistral-7B-Instruct-v0.2')  # Default model if not set in environment variable
-        self.port = self.find_free_port()
-        self.stop_sequences = os.getenv('MLX_STOP', '[INST]')  # Stop sequences from environment variable
+        self.host = os.getenv("MLX_HOST", "localhost")
+        self.port = os.getenv("MLX_PORT", "8080")
+        self.model = os.getenv("MLX_MODEL", "mistralai/Mistral-7B-Instruct-v0.2")
+        self.stop_sequence = os.getenv("MLX_STOP", "[INST]").split(",")  # Default stop sequence is [INST]
+        self.subprocess = os.getenv("MLX_SUBPROCESS", "true").lower() == "true"
+        self.huggingface_token = os.getenv("HUGGINGFACE_TOKEN", None)
 
-    @staticmethod
-    def find_free_port():
-        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            s.bind(('', 0))
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-            return s.getsockname()[1]
+        if self.huggingface_token:
+            login(self.huggingface_token)
+
+        if self.subprocess:
+            self.start_mlx_server()
+
+    def start_mlx_server(self):
+        if not os.getenv("MLX_PORT"):
+            self.port = self.find_free_port()
+        command = f"mlx_lm.server --model {self.model} --port {self.port}"
+        self.server_process = subprocess.Popen(command, shell=True)
+        logging.info(f"Started MLX server on port {self.port}")
+
+    def find_free_port(self):
+        import socket
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.bind(('', 0))
+        port = s.getsockname()[1]
+        s.close()
+        return port
 
     async def on_startup(self):
-        # This function is called when the server is started.
-        print(f"on_startup:{__name__}")
-        self.start_subprocess()
+        logging.info(f"on_startup:{__name__}")
 
     async def on_shutdown(self):
-        # This function is called when the server is stopped.
-        print(f"on_shutdown:{__name__}")
-        self.stop_subprocess()
-
-    def start_subprocess(self):
-        # Start the subprocess for "mlx_lm.server --model ${MLX_MODEL} --port ${PORT}"
-        try:
-            self.process = subprocess.Popen(
-                ["mlx_lm.server", "--model", self.model, "--port", str(self.port)],
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE
-            )
-            print(f"Subprocess started with PID: {self.process.pid} on port {self.port}")
-        except Exception as e:
-            print(f"Failed to start subprocess: {e}")
-
-    def stop_subprocess(self):
-        # Stop the subprocess if it is running
-        if self.process:
-            try:
-                self.process.terminate()
-                self.process.wait()
-                print(f"Subprocess with PID {self.process.pid} terminated")
-            except Exception as e:
-                print(f"Failed to terminate subprocess: {e}")
+        if self.subprocess and hasattr(self, 'server_process'):
+            self.server_process.terminate()
+            logging.info(f"Terminated MLX server on port {self.port}")
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
-        # This is where you can add your custom pipelines like RAG.'
-        print(f"get_response:{__name__}")
+        logging.info(f"get_response:{__name__}")
 
-        MLX_BASE_URL = f"http://localhost:{self.port}"
-        MODEL = self.model
+        url = f"http://{self.host}:{self.port}/v1/chat/completions"
+        headers = {"Content-Type": "application/json"}
 
-        # Convert OpenAIChatMessage objects to dictionaries
-        messages_dict = [{"role": message.role, "content": message.content} for message in messages]
-
-        # Extract additional parameters from the body
-        temperature = body.get("temperature", 1.0)
-        max_tokens = body.get("max_tokens", 100)
-        top_p = body.get("top_p", 1.0)
-        repetition_penalty = body.get("repetition_penalty", 1.0)
-        stop = self.stop_sequences
-
-        if "user" in body:
-            print("######################################")
-            print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})')
-            print(f"# Message: {user_message}")
-            print("######################################")
+        # Extract parameters from the request body
+        max_tokens = body.get("max_tokens", 1024)
+        temperature = body.get("temperature", 0.8)
+        repeat_penalty = body.get("repeat_penalty", 1.0)
 
         payload = {
-            "model": MODEL,
-            "messages": messages_dict,
-            "temperature": temperature,
+            "messages": messages,
             "max_tokens": max_tokens,
-            "top_p": top_p,
-            "repetition_penalty": repetition_penalty,
-            "stop": stop
+            "temperature": temperature,
+            "repetition_penalty": repeat_penalty,
+            "stop": self.stop_sequence,
+            "stream": body.get("stream", False)
         }
 
         try:
-            r = requests.post(
-                url=f"{MLX_BASE_URL}/v1/chat/completions",
-                json=payload,
-                stream=True,
-            )
-
+            r = requests.post(url, headers=headers, json=payload, stream=body.get("stream", False))
             r.raise_for_status()
 
-            return r.iter_lines()
+            if body.get("stream", False):
+                return r.iter_lines()
+            else:
+                return r.json()
         except Exception as e:
-            return f"Error: {e}"
\ No newline at end of file
+            return f"Error: {e}"

From 79bab8b3db09013a8d946f6d81079e22987fb544 Mon Sep 17 00:00:00 2001
From: Justin Hayes
Date: Mon, 27 May 2024 11:52:33 -0400
Subject: [PATCH 4/4] bounds checks

---
 pipelines/examples/mlx_pipeline.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pipelines/examples/mlx_pipeline.py b/pipelines/examples/mlx_pipeline.py
index 09b2ace..85b7b30 100644
--- a/pipelines/examples/mlx_pipeline.py
+++ b/pipelines/examples/mlx_pipeline.py
@@ -65,10 +65,18 @@ class Pipeline:
         url = f"http://{self.host}:{self.port}/v1/chat/completions"
         headers = {"Content-Type": "application/json"}
 
-        # Extract parameters from the request body
+        # Extract and validate parameters from the request body
         max_tokens = body.get("max_tokens", 1024)
+        if not isinstance(max_tokens, int) or max_tokens < 0:
+            max_tokens = 1024  # Default to 1024 if invalid
+
         temperature = body.get("temperature", 0.8)
+        if not isinstance(temperature, (int, float)) or temperature < 0:
+            temperature = 0.8  # Default to 0.8 if invalid
+
         repeat_penalty = body.get("repeat_penalty", 1.0)
+        if not isinstance(repeat_penalty, (int, float)) or repeat_penalty < 0:
+            repeat_penalty = 1.0  # Default to 1.0 if invalid
 
         payload = {