From a1683951a0b74a3c2cde8370a1f66dd16d8a01e1 Mon Sep 17 00:00:00 2001 From: Sean Robinson Date: Thu, 12 Jun 2025 12:36:08 -0700 Subject: [PATCH] adding cv to repo --- clockwork_viking/api_latest.py | 1640 +++++++++++++++++++++++++++ clockwork_viking/readme.txt | 9 + clockwork_viking/run_api_latest.sh | 2 + clockwork_viking/setup_mongodb.sh | 41 + clockwork_viking/source_me_first.sh | 4 + clockwork_viking/url_scraper.py | 240 ++++ 6 files changed, 1936 insertions(+) create mode 100644 clockwork_viking/api_latest.py create mode 100644 clockwork_viking/readme.txt create mode 100755 clockwork_viking/run_api_latest.sh create mode 100755 clockwork_viking/setup_mongodb.sh create mode 100755 clockwork_viking/source_me_first.sh create mode 100644 clockwork_viking/url_scraper.py diff --git a/clockwork_viking/api_latest.py b/clockwork_viking/api_latest.py new file mode 100644 index 0000000..bf7e346 --- /dev/null +++ b/clockwork_viking/api_latest.py @@ -0,0 +1,1640 @@ +from fastapi import FastAPI, HTTPException, UploadFile, File, Body +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from typing import List, Dict, Any, Union, Optional +from motor.motor_asyncio import AsyncIOMotorClient +import uuid +import json +import asyncio +import openai +from itertools import product +import os +import io +import pandas as pd +import PyPDF2 +from contextlib import asynccontextmanager +import concurrent.futures + +from datetime import datetime, timedelta +import logging + + +from docx import Document +from url_scraper import URLScraper +from fastapi.middleware.cors import CORSMiddleware +from skyvern import Skyvern +# For chatbots +import requests +from typing import Optional +from langchain_openai import ChatOpenAI +from langchain.agents import ( + AgentExecutor, + create_openai_functions_agent, +) +from langchain.tools import StructuredTool +from langchain.tools import Tool +from langchain.memory import ConversationBufferMemory +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from dotenv import load_dotenv + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize FastAPI app +app = FastAPI(title="Data Pipeline System", version="1.0.0", redirect_slashes=True) + +origins = ["*"] +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=False, + allow_methods=["*"], + allow_headers=["*"], +) + + +# Load variables from .env into environment +load_dotenv() + +# MongoDB client - you'll need to set your connection string +MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017") +DATABASE_NAME = "pipeline_db" +COLLECTION_NAME = "data_objects" + +client = AsyncIOMotorClient(MONGODB_URL) +db = client[DATABASE_NAME] +collection = db[COLLECTION_NAME] + +# OpenAI client - set your API key +openai.api_key = os.getenv("OPENAI_API_KEY") + +# Initialize Skyvern with API key +# You should set your API key as an environment variable for security +skyvern_api_key = os.getenv("SKYVERN_API_KEY") + +# Initialize Skyvern client +skyvern_client = Skyvern(api_key=skyvern_api_key) + +# set up webhook stuff for the internet-research functions +webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "https://staging.impromptu-labs.com") +# In-memory storage for research tasks/Skyvern (we could use a database in production) +pending_tasks: Dict[str, Dict] = {} +completed_tasks: Dict[str, Dict] = {} + +# Pydantic models +class DataEntry(BaseModel): + key_list: List[str] + value: str + +class 
DataObject(BaseModel):
+    object_name: str
+    data: List[DataEntry]
+
+class InputDataRequest(BaseModel):
+    created_object_name: str
+    data_type: str  # 'files', 'strings', or 'urls'
+    input_data: List[Any]  # Can be strings, file data, or URLs
+
+class InputSpec(BaseModel):
+    input_object_name: str
+    mode: str  # 'combine_events', 'use_individually', or 'match_keys'
+
+class ApplyPromptRequest(BaseModel):
+    created_object_names: List[str]
+    prompt_string: str
+    inputs: List[InputSpec]
+
+
+# Chatbots and Agents
+class CreateAgentRequest(BaseModel):
+    instructions: str
+    agent_name: Optional[str] = None
+
+class ChatRequest(BaseModel):
+    agent_id: str
+    message: str
+
+class ChatResponse(BaseModel):
+    agent_id: str
+    response: str
+    conversation_id: Optional[str] = None
+
+class AgentInfo(BaseModel):
+    agent_id: str
+    agent_name: str
+    instructions: str
+    created_at: str
+
+class ResearchTopicRequest(BaseModel):
+    goal: str
+    return_data: List[str]
+
+class ResearchTopicResponse(BaseModel):
+    task_id: str
+    status: str
+    output_data: Optional[Dict] = None
+    message: str
+    created_at: Optional[str] = None
+    completed_at: Optional[str] = None
+
+class WebhookPayload(BaseModel):
+    task_id: str
+    status: str
+    output: Optional[Dict] = None
+    error: Optional[str] = None
+
+
+## CHATBOTS AND AGENT STUFF
+# Global storage for agents (TODO: store these as doc-store elements too)
+agents_storage: Dict[str, Any] = {}
+
+
+class SimplifiedAgentBuilder:
+    def __init__(self, llm_model="gpt-4.1-mini", base_url="http://localhost:5000"):
+        self.llm = ChatOpenAI(model=llm_model, temperature=0.3)
+        self.base_url = base_url
+        self.default_tools = self._create_default_tools()
+        self.api_tools = []
+
+    def register_api_endpoint_as_tool(
+        self,
+        endpoint_path: str,
+        method: str,
+        description: str,
+        request_model: Optional[BaseModel] = None,
+        tool_name: Optional[str] = None
+    ) -> Tool:
+        """Register an existing API endpoint as a LangChain Tool"""
+
+        if tool_name is None:
+            tool_name = f"{method.lower()}_{endpoint_path.strip('/').replace('/', '_').replace('-', '_')}"
+
+        def api_tool_function(*args, **kwargs) -> str:
+            def make_request():
+                try:
+                    # Handle both positional and keyword arguments.
+                    # If we get positional args, try to parse as JSON or use as a single param.
+                    request_kwargs = kwargs.copy()  # Make a copy to avoid modifying the original
+
+                    if args:
+                        if len(args) == 1 and isinstance(args[0], str):
+                            try:
+                                # Try to parse as JSON first
+                                parsed = json.loads(args[0])
+                                if isinstance(parsed, dict):
+                                    request_kwargs.update(parsed)
+                            except (json.JSONDecodeError, ValueError):
+                                # Not JSON: treat it as a single parameter.
+                                # For research_topic, assume it's the 'goal'.
+                                if 'research_topic' in tool_name:
+                                    request_kwargs['goal'] = args[0]
+                                    if 'return_data' not in request_kwargs:
+                                        request_kwargs['return_data'] = ['general_information']
+                                else:
+                                    # For other endpoints, use it as generic input
+                                    request_kwargs['input'] = args[0]
+
+                    url = f"{self.base_url}{endpoint_path}"
+                    response = None
+                    method_upper = method.upper()
+                    print(f"\nCalling endpoint: {url}\nWith kwargs: {request_kwargs}\nWith method: {method_upper}")
+
+                    if method_upper == "GET":
+                        response = requests.get(url, params=request_kwargs, timeout=30)
+                    elif method_upper == "POST":
+                        response = requests.post(url, json=request_kwargs, timeout=30)
+                    elif method_upper == "PUT":
+                        response = requests.put(url, json=request_kwargs, timeout=30)
+                    elif method_upper == "DELETE":
+                        response = requests.delete(url, params=request_kwargs, timeout=30)
+                    else:
+                        return f"Unsupported HTTP method: {method}"
+
+                    if response.status_code == 200:
+                        try:
+                            return json.dumps(response.json(), indent=2)
+                        except Exception:
+                            return response.text
+                    else:
+                        return f"API call failed: {response.status_code} - {response.text}"
+
+                except Exception as e:
+                    return f"API error: {str(e)}"
+
+            # Run the request in a separate thread to avoid blocking the main event loop
+            try:
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(make_request)
+                    return future.result(timeout=30)  # 30 second timeout
+            except concurrent.futures.TimeoutError:
+                return "Request timed out after 30 seconds"
+            except Exception as e:
+                return f"Thread execution error: {str(e)}"
+
+        tool = Tool.from_function(
+            func=api_tool_function,
+            name=tool_name,
+            description=description,
+            args_schema=request_model if request_model else None,
+        )
+        self.api_tools.append(tool)
+        return tool
+
+    def _create_default_tools(self) -> list[Tool]:
+        """Create default toolset (e.g., search, math)"""
+
+        def search_internet(query: str) -> str:
+            return f"Search results for '{query}' (stubbed). Replace with real API."
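+
+        # NOTE: `calculate` below gates eval() behind a character whitelist;
+        # for untrusted input, a real expression parser (e.g. ast-based)
+        # would be the safer assumption.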
+ + def calculate(expression: str) -> str: + try: + allowed_chars = set('0123456789+-*/.() ') + if not all(c in allowed_chars for c in expression): + return "Invalid characters in expression" + result = eval(expression) + return str(result) + except Exception as e: + return f"Calculation error: {str(e)}" + return [] + # return [ + # Tool.from_function( + # func=search_internet, + # name="search_internet", + # description="Search the internet for information given a query string." + # ), + # Tool.from_function( + # func=calculate, + # name="calculate", + # description="Perform basic math given a string expression like '2 + 2'" + # ) + # ] + + def create_agent(self, instructions: str, agent_name: Optional[str] = None) -> tuple[str, AgentExecutor]: + """Build a new LLM agent with tools and memory""" + + agent_id = agent_name or str(uuid.uuid4()) + + prompt = ChatPromptTemplate.from_messages([ + ("system", f"{instructions}\nAlways be helpful, accurate, and use tools when needed."), + MessagesPlaceholder(variable_name="chat_history"), + ("human", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad") + ]) + + memory = ConversationBufferMemory( + memory_key="chat_history", + return_messages=True, + max_token_limit=2000 + ) + + tools = self.default_tools + self.api_tools + + agent = create_openai_functions_agent( + llm=self.llm, + tools=tools, + prompt=prompt, + ) + + executor = AgentExecutor( + agent=agent, + tools=tools, + memory=memory, + verbose=True, + max_iterations=10, + max_execution_time=60, + handle_parsing_errors=True, + ) + + return agent_id, executor + +# Initialize the builder +builder = SimplifiedAgentBuilder() + +# Register your existing API endpoints as tools +# Example for your input_data endpoint + +builder.register_api_endpoint_as_tool( + endpoint_path="/input_data", + method="POST", + description="""Process and store input data in MongoDB. + Required parameters: + - created_object_name: Name for the object to store data in + - input_data: List of data items to process + - data_type: Type of data ('strings', 'files', or 'urls') + + Example usage: {"created_object_name": "my_dataset", "input_data": ["hello world", "test data"], "data_type": "strings"}""", + request_model=InputDataRequest, + tool_name="input_data" +) + +# Register your existing API endpoints as tools +# Example for your input_data endpoint +builder.register_api_endpoint_as_tool( + endpoint_path="/objects/{object_name}", + method="GET", + description="""Gets an object from MongoDB by name. """, + tool_name="object_by_name" +) + + +builder.register_api_endpoint_as_tool( + endpoint_path="/research_topic", + method="POST", + description="""Begin Researching a topic using an online browser to find information through web search. Starts a new research task and returns immediately with a task ID for tracking. The actual answer is gotten later by using the /research_status/{task_id} endpoint. + Required parameters: + - goal (string): A desired goal to achieve, describing what information you want to find. Example: "the linkedin URL and phone number for John Doe, the CEO of ABC" + - return_data (list of strings): List of specific data elements that should be returned from the research. + Returned Values: + - task_id (string): The API will immediately return a task_id. Keep this - you'll need it to get your results. + - status (string): Status of the research operation. 
+ + The function will perform web searches and extract the requested information without requiring logins.""", + request_model=ResearchTopicRequest, + tool_name="research_topic" +) + + +builder.register_api_endpoint_as_tool( + endpoint_path="/research_status/{task_id}", + method="GET", + description="""Check if your research task is complete and get results. + Use the task ID to check periodically until the status changes from "pending" to "completed", "failed", or "timeout". + Recommended polling pattern: + Check every 15-30 seconds + When status is "completed", the output_data field will contain your research results, with the keys as in return_data from research_topic. Expect to wait for these results.""", + tool_name="research_status" +) + + +# Add more of your existing endpoints here +# Example patterns for other common endpoints: + +# builder.register_api_endpoint_as_tool( +# endpoint_path="/get_data", +# method="GET", +# description="Retrieve stored data from MongoDB by object name", +# tool_name="get_data" +# ) + +# builder.register_api_endpoint_as_tool( +# endpoint_path="/delete_data", +# method="DELETE", +# description="Delete data from MongoDB by object name", +# tool_name="delete_data" +# ) + +# builder.register_api_endpoint_as_tool( +# endpoint_path="/list_objects", +# method="GET", +# description="List all available data objects in MongoDB", +# tool_name="list_objects" +# ) + +@app.post("/create-agent", response_model=dict) +async def create_agent(request: CreateAgentRequest): + """Create a new agent with custom instructions""" + try: + agent_name = request.agent_name or f"Agent-{len(agents_storage) + 1}" + agent_id, agent_executor = builder.create_agent( + instructions=request.instructions, + agent_name=agent_name + ) + + # Store agent + agents_storage[agent_id] = { + "agent_executor": agent_executor, + "agent_name": agent_name, + "instructions": request.instructions, + "created_at": str(uuid.uuid4()) # In production, use actual timestamp + } + + return { + "agent_id": agent_id, + "agent_name": agent_name, + "message": "Agent created successfully", + "instructions": request.instructions + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(e)}") + +@app.post("/chat", response_model=ChatResponse) +async def chat_with_agent(request: ChatRequest): + """Send a message to a specific agent""" + if request.agent_id not in agents_storage: + raise HTTPException(status_code=404, detail="Agent not found") + + try: + agent_info = agents_storage[request.agent_id] + agent_executor = agent_info["agent_executor"] + + # Get response from agent + response = agent_executor.invoke({ + "input": request.message + }) + + return ChatResponse( + agent_id=request.agent_id, + response=response["output"], + conversation_id=None # Could implement conversation tracking + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to process message: {str(e)}") + +@app.get("/agents", response_model=List[AgentInfo]) +async def list_agents(): + """List all created agents""" + agents = [] + for agent_id, info in agents_storage.items(): + agents.append(AgentInfo( + agent_id=agent_id, + agent_name=info["agent_name"], + instructions=info["instructions"], + created_at=info["created_at"] + )) + return agents + +@app.get("/agents/{agent_id}", response_model=AgentInfo) +async def get_agent(agent_id: str): + """Get information about a specific agent""" + if agent_id not in agents_storage: + raise HTTPException(status_code=404, 
detail="Agent not found") + + info = agents_storage[agent_id] + return AgentInfo( + agent_id=agent_id, + agent_name=info["agent_name"], + instructions=info["instructions"], + created_at=info["created_at"] + ) + +@app.delete("/agents/{agent_id}") +async def delete_agent(agent_id: str): + """Delete a specific agent""" + if agent_id not in agents_storage: + raise HTTPException(status_code=404, detail="Agent not found") + + del agents_storage[agent_id] + return {"message": "Agent deleted successfully"} + +@app.get("/tools") +async def list_available_tools(): + """List all available tools that agents can use""" + tools_info = [] + + # Add default tools + for tool in builder.default_tools: + tools_info.append({ + "name": tool.name, + "description": tool.description, + "type": "default" + }) + + # Add API tools + for tool in builder.api_tools: + tools_info.append({ + "name": tool.name, + "description": tool.description, + "type": "api_endpoint" + }) + + return { + "total_tools": len(tools_info), + "tools": tools_info + } + +@app.post("/register-tool") +async def register_new_tool(request: dict): + """Register a new API endpoint as a tool""" + try: + required_fields = ["endpoint_path", "method", "description"] + if not all(field in request for field in required_fields): + raise HTTPException( + status_code=400, + detail=f"Missing required fields: {required_fields}" + ) + + builder.register_api_endpoint_as_tool( + endpoint_path=request["endpoint_path"], + method=request["method"], + description=request["description"], + tool_name=request.get("tool_name") + ) + + return {"message": "Tool registered successfully", "tool_name": request.get("tool_name")} + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to register tool: {str(e)}") + +@app.get("/health") +async def health_check(): + """Health check endpoint""" + return {"status": "healthy", "agents_count": len(agents_storage)} + +# Example usage and testing endpoints +@app.get("/chatbot_instructions") +async def root(): + """API documentation and examples""" + return { + "message": "Simplified Agent Service", + "usage": { + "create_agent": "POST /create-agent with {'instructions': 'Your instructions here'}", + "chat": "POST /chat with {'agent_id': 'agent_id', 'message': 'Your message'}", + "list_agents": "GET /agents", + "delete_agent": "DELETE /agents/{agent_id}" + }, + "example": { + "create_agent": { + "instructions": "You are a helpful customer service agent for a tech company. Be friendly and professional.", + "agent_name": "Customer Service Bot" + }, + "chat": { + "agent_id": "your-agent-id-here", + "message": "Hello, how can you help me today?" + } + } + } +### END CHATBOT STUFF + +### BEGIN Browser use/Skyvern Stuff + +# @app.post("/research_topic", response_model=ResearchTopicResponse) +# async def research_topic(request: ResearchTopicRequest): +# try: +# if not skyvern_api_key: +# raise HTTPException( +# status_code=500, +# detail="Skyvern API key not configured. Please set SKYVERN_API_KEY environment variable." 
+# ) + +# print('AAAAAAAA') # This should now print immediately + +# # Generate unique task ID +# task_id = str(uuid.uuid4()) +# created_at = datetime.now() + +# # Store task info in pending tasks +# pending_tasks[task_id] = { +# "request": request.dict(), +# "created_at": created_at, +# "status": "pending" +# } + +# # Construct webhook URL +# webhook_url = f"{webhook_base_url}/webhook/skyvern/{task_id}" + +# # Construct the prompt text +# plural_return_keys = len(request.return_data) > 1 +# if plural_return_keys: +# prompt_text = ( +# f"Starting from a google search, find {request.goal}. " +# f"Attempt to find this information without needing to log into any sites. " +# f"Return an object with the following keys: {', '.join(request.return_data)}. " +# f"Do not nest additional keys or categories under these keys, each of these keys should contain a single string." +# ) +# else: +# prompt_text = ( +# f"Starting from a google search, find {request.goal}. " +# f"Attempt to find this information without needing to log into any sites. " +# f"Return an object with the following key: {', '.join(request.return_data)}. " +# f"Do not nest additional keys or categories under this key, it should just contain a single string." +# ) + +# logger.info(f"Starting Skyvern task {task_id} with webhook URL: {webhook_url}") + +# # Fire and forget - don't await this! +# asyncio.create_task(run_skyvern_task(task_id, prompt_text, webhook_url)) + +# logger.info(f"Skyvern task {task_id} initiated successfully") + +# # Return immediately +# return ResearchTopicResponse( +# task_id=task_id, +# status="pending", +# message="Research task started successfully. Use /research_status/{task_id} to check progress.", +# created_at=created_at.isoformat() +# ) + +# except Exception as e: +# logger.error(f"Error starting research task: {str(e)}") +# raise HTTPException( +# status_code=500, +# detail=f"Error starting research task: {str(e)}" +# ) + +# # Separate function to handle the long-running task +# async def run_skyvern_task(task_id: str, prompt_text: str, webhook_url: str): +# try: +# # This will run in the background +# skyvern_result = await skyvern_client.run_task( +# prompt=prompt_text, +# webhook_url=webhook_url +# ) +# logger.info(f"Skyvern task {task_id} completed successfully") + +# # Update task status if needed (or let webhook handle it) +# if task_id in pending_tasks: +# pending_tasks[task_id]["status"] = "completed" + +# except Exception as e: +# logger.error(f"Skyvern task {task_id} failed: {str(e)}") +# # Update task status to failed +# if task_id in pending_tasks: +# pending_tasks[task_id]["status"] = "failed" +# pending_tasks[task_id]["error"] = str(e) + + +@app.post("/research_topic", response_model=ResearchTopicResponse) +async def research_topic(request: ResearchTopicRequest): + """ + Start a research task using Skyvern API + + Args: + request: Contains goal (string) and return_data (list of strings) + + Returns: + Task information with task_id for polling status + """ + try: + if not skyvern_api_key: + raise HTTPException( + status_code=500, + detail="Skyvern API key not configured. Please set SKYVERN_API_KEY environment variable." 
+ ) + print('AAAAAAAA') + # Generate unique task ID + task_id = str(uuid.uuid4()) + created_at = datetime.now() + + # Store task info in pending tasks + pending_tasks[task_id] = { + "request": request.model_dump(), #dict(), + "created_at": created_at, + "status": "pending" + } + + # Construct webhook URL + webhook_url = f"{webhook_base_url}/webhook/skyvern/{task_id}" + + # Initialize Skyvern client + #skyvern = Skyvern(api_key=skyvern_api_key) + + # Construct the prompt text + plural_return_keys = len(request.return_data) > 1 + prompt_text = '' + if plural_return_keys: + prompt_text = ( + f"Starting from a google search, find {request.goal}. " + f"Attempt to find this information without needing to log into any sites. " + f"Return an object with the following keys: {', '.join(request.return_data)}. Do not nest additional keys or categories under these keys, each of these keys should contain a single string." + ) + else: + prompt_text = ( + f"Starting from a google search, find {request.goal}. " + f"Attempt to find this information without needing to log into any sites. " + f"Return an object with the following key: {', '.join(request.return_data)}. Do not nest additional keys or categories under this key, it should just contain a single string." + ) + logger.info(f"Starting Skyvern task {task_id} with webhook URL: {webhook_url}") + + # Start the Skyvern task with webhook + skyvern_result = await skyvern_client.run_task( + # skyvern_client.run_task( + prompt=prompt_text, + webhook_url=webhook_url + ) + + print(f"Skyvern result: {skyvern_result}") + + logger.info(f"Skyvern task {task_id} initiated successfully") + + return ResearchTopicResponse( + task_id=task_id, + status="pending", + message="Research task started successfully. Use /research_status/{task_id} to check progress.", + created_at=created_at.isoformat() + ) + + except Exception as e: + logger.error(f"Error starting research task: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error starting research task: {str(e)}" + ) + +@app.get("/research_status/{task_id}", response_model=ResearchTopicResponse) +async def get_research_status(task_id: str): + """ + Check the status of a research task + + Args: + task_id: The unique identifier for the research task + + Returns: + Current status and results (if completed) of the research task + """ + # Check completed tasks first + if task_id in completed_tasks: + task_data = completed_tasks[task_id] + return ResearchTopicResponse( + task_id=task_id, + status=task_data["status"], + output_data=task_data.get("output"), + message="Task completed successfully" if task_data["status"] == "completed" else "Task failed", + created_at=task_data["created_at"].isoformat(), + completed_at=task_data.get("completed_at", datetime.now()).isoformat() + ) + + # Check pending tasks + if task_id in pending_tasks: + task_data = pending_tasks[task_id] + + # Check if task has timed out (30 minutes default) + timeout_minutes = 30 + if datetime.now() - task_data["created_at"] > timedelta(minutes=timeout_minutes): + # Move to completed with timeout status + completed_tasks[task_id] = { + **task_data, + "status": "timeout", + "completed_at": datetime.now() + } + pending_tasks.pop(task_id) + + return ResearchTopicResponse( + task_id=task_id, + status="timeout", + message=f"Task timed out after {timeout_minutes} minutes", + created_at=task_data["created_at"].isoformat(), + completed_at=datetime.now().isoformat() + ) + + return ResearchTopicResponse( + task_id=task_id, + status="pending", + message="Task is 
still in progress. Please check again in a few moments.", + created_at=task_data["created_at"].isoformat() + ) + + raise HTTPException( + status_code=404, + detail=f"Task with ID {task_id} not found. It may have been cleaned up or never existed." + ) + +@app.post("/webhook/skyvern/{task_id}") +async def skyvern_webhook(task_id: str, payload: WebhookPayload): + """ + Webhook endpoint to receive Skyvern task completion results + + This endpoint is called by Skyvern when a task completes. + It should not be called directly by clients. + """ + logger.info(f"Received webhook for task {task_id}: status={payload.status}") + + if task_id not in pending_tasks: + logger.warning(f"Received webhook for unknown task: {task_id}") + return {"status": "unknown_task", "message": "Task ID not found in pending tasks"} + + # Move task from pending to completed + task_data = pending_tasks.pop(task_id) + completed_tasks[task_id] = { + **task_data, + "status": payload.status, + "output": payload.output, + "error": payload.error, + "completed_at": datetime.now() + } + + logger.info(f"Task {task_id} moved to completed with status: {payload.status}") + + return { + "status": "received", + "message": f"Webhook received for task {task_id}", + "task_status": payload.status + } + +@app.get("/tasks/pending") +async def list_pending_tasks(): + """ + List all currently pending tasks (useful for debugging) + """ + return { + "pending_tasks": len(pending_tasks), + "task_ids": list(pending_tasks.keys()), + "tasks": { + task_id: { + "created_at": task_data["created_at"].isoformat(), + "goal": task_data["request"]["goal"], + "status": task_data["status"] + } + for task_id, task_data in pending_tasks.items() + } + } + +@app.get("/tasks/completed") +async def list_completed_tasks(): + """ + List recently completed tasks (useful for debugging) + """ + return { + "completed_tasks": len(completed_tasks), + "task_ids": list(completed_tasks.keys()), + "tasks": { + task_id: { + "created_at": task_data["created_at"].isoformat(), + "completed_at": task_data.get("completed_at", datetime.now()).isoformat(), + "goal": task_data["request"]["goal"], + "status": task_data["status"] + } + for task_id, task_data in completed_tasks.items() + } + } + +@app.delete("/tasks/{task_id}") +async def delete_task(task_id: str): + """ + Delete a task from memory (cleanup) + """ + removed_from = [] + + if task_id in pending_tasks: + pending_tasks.pop(task_id) + removed_from.append("pending") + + if task_id in completed_tasks: + completed_tasks.pop(task_id) + removed_from.append("completed") + + if not removed_from: + raise HTTPException(status_code=404, detail="Task not found") + + return { + "message": f"Task {task_id} removed from {', '.join(removed_from)} tasks", + "task_id": task_id + } + +# Background cleanup task +async def cleanup_old_tasks(): + """ + Background task to clean up old completed tasks to prevent memory bloat + """ + while True: + try: + current_time = datetime.now() + cutoff_time = current_time - timedelta(hours=2) # Keep tasks for 2 hours + + # Clean up old completed tasks + to_remove = [ + task_id for task_id, task_data in completed_tasks.items() + if task_data.get("completed_at", current_time) < cutoff_time + ] + + for task_id in to_remove: + completed_tasks.pop(task_id, None) + logger.info(f"Cleaned up old completed task: {task_id}") + + # Clean up very old pending tasks (longer timeout) + old_pending_cutoff = current_time - timedelta(hours=1) + old_pending_to_remove = [ + task_id for task_id, task_data in pending_tasks.items() + 
if task_data["created_at"] < old_pending_cutoff + ] + + for task_id in old_pending_to_remove: + pending_tasks.pop(task_id, None) + logger.info(f"Cleaned up old pending task: {task_id}") + + if to_remove or old_pending_to_remove: + logger.info(f"Cleanup completed: {len(to_remove)} completed tasks, {len(old_pending_to_remove)} old pending tasks removed") + + except Exception as e: + logger.error(f"Error during cleanup: {str(e)}") + + await asyncio.sleep(1800) # Run cleanup every 30 minutes +### END Browser use/Skyvern stuff + +### BEGIN OTHER API FUNCTIONS + +# Text extraction functions +def get_text_from_file(file_data: Any) -> str: + """ + Extract text from various file types. + file_data can be: + - UploadFile object from FastAPI + - File-like object with .read() method + - Bytes data + - String (assumed to be file content) + """ + try: + # Handle different input types + if hasattr(file_data, 'filename') and hasattr(file_data, 'file'): + # FastAPI UploadFile object + filename = file_data.filename + file_content = file_data.file.read() + file_data.file.seek(0) # Reset for potential reuse + elif hasattr(file_data, 'read'): + # File-like object + filename = getattr(file_data, 'name', 'unknown_file') + file_content = file_data.read() + if hasattr(file_data, 'seek'): + file_data.seek(0) # Reset for potential reuse + elif isinstance(file_data, bytes): + # Raw bytes - assume text file + filename = 'unknown_file.txt' + file_content = file_data + elif isinstance(file_data, str): + # String content - return as is + return file_data + else: + return str(file_data) + + # Determine file extension + if '.' in filename: + ext = '.' + filename.split('.')[-1].lower() + else: + ext = '.txt' # Default assumption + + # Extract text based on file type + if ext == '.txt': + if isinstance(file_content, bytes): + return file_content.decode('utf-8') + else: + return str(file_content) + + elif ext == '.csv': + # Use pandas to read CSV + if isinstance(file_content, bytes): + df = pd.read_csv(io.BytesIO(file_content)) + else: + df = pd.read_csv(io.StringIO(str(file_content))) + return df.to_string(index=False) + + elif ext == '.pdf': + # Use PyPDF2 to extract text from PDF + if isinstance(file_content, bytes): + pdf_file = io.BytesIO(file_content) + else: + pdf_file = io.BytesIO(file_content.encode('utf-8')) + + reader = PyPDF2.PdfReader(pdf_file) + file_text = "" + for page in reader.pages: + text = page.extract_text() + if text: + file_text += text + "\n" + return file_text.strip() + + elif ext == '.docx': + # Use python-docx to extract text from Word documents + if isinstance(file_content, bytes): + docx_file = io.BytesIO(file_content) + else: + docx_file = io.BytesIO(file_content.encode('utf-8')) + + doc = Document(docx_file) + file_text = "" + for para in doc.paragraphs: + file_text += para.text + "\n" + return file_text.strip() + + elif ext in ['.xls', '.xlsx']: + # Use pandas to read Excel files + if isinstance(file_content, bytes): + df = pd.read_excel(io.BytesIO(file_content)) + else: + # This case is less likely for Excel files + df = pd.read_excel(io.StringIO(str(file_content))) + return df.to_string(index=False) + + else: + # Unknown file type - try to decode as text + if isinstance(file_content, bytes): + try: + return file_content.decode('utf-8') + except UnicodeDecodeError: + return file_content.decode('utf-8', errors='ignore') + else: + return str(file_content) + + except Exception as e: + # If all else fails, return error message + return f"Error extracting text from file: {str(e)}" + +async def 
get_text_from_url(url: str) -> str: + """ + Extract text from a URL using the robust URLScraper. + This uses your existing URLScraper which is good at getting text + from URLs even when they're somewhat hard to scrape. + """ + try: + # Normalize URL + normalized_url = normalize_url(url.strip()) + + # Use your existing URLScraper which handles difficult URLs well + text = URLScraper.get_visible_text_and_links(normalized_url) + + # Add source URL information + content = f"--- Content from {normalized_url} ---\n{text}" + + return content + + except Exception as e: + return f"Error fetching content from {url}: {str(e)}" + +def normalize_url(url: str) -> str: + """Normalize URL by adding protocol if missing""" + url = url.strip() + if not url.startswith(('http://', 'https://')): + url = 'https://' + url + return url + +# Helper functions +async def get_or_create_object(object_name: str) -> Dict[str, Any]: + """Get existing object or create new one""" + existing = await collection.find_one({"object_name": object_name}) + if existing: + return existing + else: + new_obj = {"object_name": object_name, "data": []} + await collection.insert_one(new_obj) + return new_obj + +async def add_data_entry(object_name: str, key_list: List[str], value: str): + """Add a data entry to an object""" + await collection.update_one( + {"object_name": object_name}, + {"$push": {"data": {"key_list": key_list, "value": value}}} + ) + +def combine_events(data_entries: List[DataEntry]) -> DataEntry: + """Combine multiple data entries into one""" + combined_value = " ".join([entry.value for entry in data_entries]) + combined_keys = [] + for entry in data_entries: + combined_keys.extend(entry.key_list) + unique_keys = list(set(combined_keys)) + + return DataEntry(key_list=unique_keys, value=combined_value) + +def get_matching_entries(entries1: List[DataEntry], entries2: List[DataEntry]) -> List[tuple]: + """Get pairs of entries that have matching keys""" + matches = [] + for entry1 in entries1: + for entry2 in entries2: + if set(entry1.key_list).intersection(set(entry2.key_list)): + matches.append((entry1, entry2)) + return matches + +async def call_openai_api(prompt: str, output_keys: List[str]) -> Dict[str, Any]: + """Call OpenAI API with the given prompt""" + try: + # Add instruction for JSON output format + json_instruction = f"\n\nPlease return your response as a JSON object with the following keys: {', '.join(output_keys)}. Return a value or a list of values for each as appropriate." 
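+        # JSON mode (response_format={"type": "json_object"}) requires the word
+        # "JSON" to appear in the request messages; json_instruction above and
+        # the system message both satisfy that requirement.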
+ full_prompt = prompt + json_instruction + client_openai = openai.OpenAI() + response = client_openai.chat.completions.create( + model="gpt-4.1-mini", + messages=[ + {"role": "system", "content": "You are a helpful assistant that returns responses in JSON format."}, + {"role": "user", "content": full_prompt} + ], + response_format={"type":"json_object"}, # this is the strict mode + temperature=0.2 + ) + # print(f"Got response: {response.choices[0].message.content}") + return json.loads(response.choices[0].message.content) # Already parsed JSON + + except Exception as e: + raise HTTPException(status_code=500, detail=f"OpenAI API error: {str(e)}") + +# API Endpoints +@app.post("/input_data") +async def input_data(request: InputDataRequest): + """ + Process input data and store in MongoDB + """ + try: + # Get or create the target object + await get_or_create_object(request.created_object_name) + + processed_count = 0 + + for item in request.input_data: + text_content = "" + + # Process based on data type + if request.data_type == "strings": + text_content = str(item) + elif request.data_type == "files": + text_content = get_text_from_file(item) + elif request.data_type == "urls": + text_content = await get_text_from_url(str(item)) + else: + raise HTTPException(status_code=400, detail="Invalid data_type. Must be 'files', 'strings', or 'urls'") + + # Create UUID for this entry + entry_uuid = str(uuid.uuid4()) + + # Add to database + await add_data_entry( + object_name=request.created_object_name, + key_list=[entry_uuid], + value=text_content + ) + + processed_count += 1 + + return JSONResponse( + content={ + "message": f"Successfully processed {processed_count} items", + "object_name": request.created_object_name + } + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=f"OpenAI API error: {str(e)}") + +@app.post("/apply_prompt") +async def apply_prompt(request: ApplyPromptRequest): + """ + Apply prompts to data combinations and generate new objects + """ + try: + # Step 1: Get all input objects from database + input_objects = {} + for input_spec in request.inputs: + obj = await collection.find_one({"object_name": input_spec.input_object_name}) + if not obj: + raise HTTPException(status_code=404, detail=f"Object {input_spec.input_object_name} not found") + input_objects[input_spec.input_object_name] = obj + + # Step 2: Process each input according to its mode + processed_inputs = {} + for input_spec in request.inputs: + obj_data = input_objects[input_spec.input_object_name] + data_entries = [DataEntry(**entry) for entry in obj_data["data"]] + + if input_spec.mode == "combine_events": + # Combine all entries into one + combined = combine_events(data_entries) + processed_inputs[input_spec.input_object_name] = [combined] + else: + # Keep individual entries + processed_inputs[input_spec.input_object_name] = data_entries + + # Step 3: Generate combinations based on modes + combinations = [] + input_names = list(processed_inputs.keys()) + input_specs_by_name = {spec.input_object_name: spec for spec in request.inputs} + + if len(input_names) == 1: + # Single input case + name = input_names[0] + for entry in processed_inputs[name]: + combinations.append({name: entry}) + else: + # Multiple inputs - check if any use match_keys mode + has_match_keys = any(input_specs_by_name[name].mode == "match_keys" for name in input_names) + + if has_match_keys: + # Handle match_keys mode + for i, name1 in enumerate(input_names): + for j, name2 in enumerate(input_names[i+1:], i+1): + if 
(input_specs_by_name[name1].mode == "match_keys" or + input_specs_by_name[name2].mode == "match_keys"): + matches = get_matching_entries( + processed_inputs[name1], + processed_inputs[name2] + ) + for entry1, entry2 in matches: + combo = {name1: entry1, name2: entry2} + # Add other inputs if they exist + for other_name in input_names: + if other_name not in [name1, name2]: + # For now, just take first entry of other inputs + if processed_inputs[other_name]: + combo[other_name] = processed_inputs[other_name][0] + combinations.append(combo) + else: + # All permutations for combine_events and use_individually + input_lists = [processed_inputs[name] for name in input_names] + for combo_tuple in product(*input_lists): + combo_dict = dict(zip(input_names, combo_tuple)) + combinations.append(combo_dict) + + # Step 4: Apply prompts and call OpenAI + results_processed = 0 + + for combo in combinations: + # Replace placeholders in prompt + filled_prompt = request.prompt_string + all_keys = [] + + for input_name, data_entry in combo.items(): + placeholder = "{" + input_name + "}" + filled_prompt = filled_prompt.replace(placeholder, data_entry.value) + all_keys.extend(data_entry.key_list) + + # Call OpenAI API + openai_result = await call_openai_api(filled_prompt, request.created_object_names) + + # Step 5: Store results + unique_keys = list(set(all_keys)) + new_uuid = str(uuid.uuid4()) + final_keys = unique_keys + [new_uuid] + + for obj_name in request.created_object_names: + if obj_name in openai_result: + # Ensure target object exists + await get_or_create_object(obj_name) + + # Add the result + await add_data_entry( + object_name=obj_name, + key_list=final_keys, + # value=str(openai_result[obj_name]) + value=openai_result[obj_name] + ) + + results_processed += 1 + + return JSONResponse( + content={ + "message": f"Successfully processed {results_processed} combinations", + "created_objects": request.created_object_names, + "combinations_processed": len(combinations) + } + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/objects/{object_name}") +async def get_object(object_name: str): + """Get an object by name""" + obj = await collection.find_one({"object_name": object_name}) + if not obj: + raise HTTPException(status_code=404, detail="Object not found") + + # Remove MongoDB's _id field for cleaner response + obj.pop("_id", None) + return obj + + +@app.get("/return_data/{object_name}") +async def return_data(object_name: str): + """ + Return an object plus all related objects that share at least one key. + This finds objects with data entries that have overlapping key_list values. 
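+
+    Illustrative example (hypothetical data): if "contact_info" holds an entry
+    keyed ["abc-123"] and another object "summaries" also holds an entry with
+    "abc-123" in its key_list, GET /return_data/contact_info returns both
+    objects and lists "abc-123" under "shared_keys_summary".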
+ """ + try: + # Get the primary object + primary_obj = await collection.find_one({"object_name": object_name}) + if not primary_obj: + raise HTTPException(status_code=404, detail=f"Object '{object_name}' not found") + + # Extract all keys from the primary object + primary_keys = set() + for data_entry in primary_obj.get("data", []): + primary_keys.update(data_entry.get("key_list", [])) + + if not primary_keys: + # If primary object has no keys, just return it alone + primary_obj.pop("_id", None) + return { + "object_name": primary_obj.get("object_name",object_name), + "data": primary_obj.get("data",[]), + "related_objects": [], + "total_objects": 1, + "shared_keys_found": [] + } + + # Find all other objects that have at least one matching key + related_objects = [] + shared_keys_summary = [] + + # Get all objects except the primary one + cursor = collection.find({"object_name": {"$ne": object_name}}) + all_other_objects = await cursor.to_list(length=None) + + for obj in all_other_objects: + obj_keys = set() + matching_keys = set() + + # Collect all keys from this object and find matches + for data_entry in obj.get("data", []): + entry_keys = set(data_entry.get("key_list", [])) + obj_keys.update(entry_keys) + + # Check if any keys match with primary object + matches = entry_keys.intersection(primary_keys) + matching_keys.update(matches) + + # If there are any matching keys, include this object + if matching_keys: + obj.pop("_id", None) # Remove MongoDB ID + related_objects.append(obj) + shared_keys_summary.append({ + "object_name": obj["object_name"], + "shared_keys": list(matching_keys), + "shared_key_count": len(matching_keys) + }) + + # Remove MongoDB ID from primary object + primary_obj.pop("_id", None) + + returned_object = { + # "data": primary_obj, + "object_name": primary_obj.get("object_name",object_name), + "data": primary_obj.get("data",[]), + "related_objects": related_objects, + "total_objects": 1 + len(related_objects), + "shared_keys_summary": shared_keys_summary, + "primary_object_keys": list(primary_keys) + } + + print(f"\nRETURNING THIS OBJECT:\n{returned_object}") + + return returned_object #{ + # # "data": primary_obj, + # "object_name": primary_obj.get("object_name",object_name), + # "data": primary_obj.get("data",[]), + # "related_objects": related_objects, + # "total_objects": 1 + len(related_objects), + # "shared_keys_summary": shared_keys_summary, + # "primary_object_keys": list(primary_keys) + # } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@app.get("/objects") +async def list_objects(): + """List all objects""" + cursor = collection.find({}, {"_id": 0}) + objects = await cursor.to_list(length=None) + return {"objects": [obj["object_name"] for obj in objects]} + +@app.delete("/objects/{object_name}") +async def delete_object(object_name: str): + """Delete an object""" + result = await collection.delete_one({"object_name": object_name}) + if result.deleted_count == 0: + raise HTTPException(status_code=404, detail="Object not found") + return {"message": f"Object {object_name} deleted successfully"} + +@app.post("/scrape_urls") +async def scrape_urls( + object_name: str = Body(...), + urls: Union[str, List[str]] = Body(...) +): + """ + Legacy endpoint: Scrape URLs and create data object with the new system structure. + This endpoint maintains backward compatibility while using the new data model. 
+ """ + try: + # Convert input to list + if isinstance(urls, str): + url_list = [normalize_url(url.strip()) for url in urls.split(',') if url.strip()] + else: + url_list = [normalize_url(str(url).strip()) for url in urls if str(url).strip()] + + # Get or create the target object + await get_or_create_object(object_name) + + # Process URLs + processed_count = 0 + for url in url_list: + try: + # Use the existing get_text_from_url function + content = await get_text_from_url(url) + + # Create UUID for this entry (matches new system structure) + entry_uuid = str(uuid.uuid4()) + + # Add to database using new system structure + await add_data_entry( + object_name=object_name, + key_list=[entry_uuid], # Use UUID as key like the new system + value=content + ) + + processed_count += 1 + + except Exception as e: + # Still process failed URLs but with error message + error_content = f"--- Failed to fetch from {url}: {str(e)} ---" + entry_uuid = str(uuid.uuid4()) + + await add_data_entry( + object_name=object_name, + key_list=[entry_uuid], + value=error_content + ) + + return JSONResponse( + content={ + "message": f"URLs scraped and saved under object_name: {object_name}", + "urls_processed": len(url_list), + "rows_created": processed_count, + "object_name": object_name + } + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +# Health check endpoint +@app.get("/health") +async def health_check(): + return {"status": "healthy"} + + + +# # Set lifetime for Skyvern/web-research events +# @asynccontextmanager +# async def lifespan(app: FastAPI): +# """ +# Lifespan event handler for startup and shutdown events +# """ +# # Startup +# logger.info("Starting Skyvern Research API") +# logger.info(f"Webhook base URL: {webhook_base_url}") +# logger.info(f"Skyvern API configured: {bool(skyvern_api_key)}") + +# # Start the cleanup task +# cleanup_task = asyncio.create_task(cleanup_old_tasks()) + +# yield # Application runs here + +# # Shutdown +# logger.info("Shutting down Skyvern Research API") +# cleanup_task.cancel() +# try: +# await cleanup_task +# except asyncio.CancelledError: +# logger.info("Cleanup task cancelled successfully") + diff --git a/clockwork_viking/readme.txt b/clockwork_viking/readme.txt new file mode 100644 index 0000000..96c268a --- /dev/null +++ b/clockwork_viking/readme.txt @@ -0,0 +1,9 @@ +Example Prompt for V0: + +Can you help me build a simple UI to interface with an API I have? The app should do the following: + +give the user a way to send in files, and send those files to an api endpoint as a POST request, to "http://localhost:8000/receive_files/". The input data should be called "filenames" and should be a list of List[UploadFile] objects as used by FastAPI. The other input should be called "object_names", and be the string "raw_files". + +Run the following API functions in the same location once the files are input: extract_text_from_files(object_name='raw_files', new_object_name='extracted_text'), extract_details_from_text(object_name='extracted_text', new_object_name='contact_info', extract_elements=['name', 'organization', 'phone number', 'email']) + +Every 15 seconds, try to POST a variable called object_name='contact_info' to the endpoint http://localhost:8000/return_data/. Once a 200 result is gotten from the endpoint, show the value of the returned object to the user. 
diff --git a/clockwork_viking/run_api_latest.sh b/clockwork_viking/run_api_latest.sh new file mode 100755 index 0000000..59ef905 --- /dev/null +++ b/clockwork_viking/run_api_latest.sh @@ -0,0 +1,2 @@ +# uvicorn api:app --reload +python -m uvicorn api_latest:app --reload --host 0.0.0.0 --port 5000 diff --git a/clockwork_viking/setup_mongodb.sh b/clockwork_viking/setup_mongodb.sh new file mode 100755 index 0000000..4180bb3 --- /dev/null +++ b/clockwork_viking/setup_mongodb.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# Set your database and collection names +DATABASE_NAME="pipeline_db" +COLLECTION_NAME="data_objects" + +echo "Updating package list..." +sudo apt-get update + +echo "Installing gnupg and curl if not present..." +sudo apt-get install -y gnupg curl + +echo "Importing MongoDB public GPG Key..." +curl -fsSL https://pgp.mongodb.com/server-7.0.asc | \ + sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg \ + --dearmor + +echo "Creating MongoDB source list..." + +echo "deb [ signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | \ + sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list +echo "Updating packages again..." +sudo apt-get update + +echo "Installing MongoDB..." +sudo apt-get install -y mongodb-org + +echo "Starting MongoDB..." +sudo systemctl start mongod +sudo systemctl enable mongod + +echo "Waiting for MongoDB to start..." +sleep 5 + +echo "Creating database '$DATABASE_NAME' and collection '$COLLECTION_NAME'..." +mongosh <"}, + skipTags=['script', 'style'] ): + # verify types + if hasattr(types,'__iter__') and not isinstance(types,type): + types = tuple([t for t in types if isinstance(t, type)]) + if not (types and isinstance(types,(type,tuple))): types = NavigableString + + # skip text in tags included in aRef + # skipTags += list(aRef.keys()) + + for descendant in self.descendants: + # yield extra strings according to aRef + if isinstance(descendant, Tag) and descendant.name in aRef: + extraStr = aRef[descendant.name](descendant) + if isinstance(extraStr, str): yield extraStr + + # skip text nodes DIRECTLY inside a Tag in aRef + # if descendant.parent.name in aRef: continue + + # skip ALL text nodes inside skipTags + if skipTags and descendant.find_parent(skipTags): continue + + # default behavior + if not isinstance(descendant, types): continue + + if strip: + descendant = descendant.strip() + if len(descendant) == 0: continue + yield descendant + + def get_text_plus(self, separator=" ", srcUrl=None, **aspArgs): + if srcUrl and isinstance(srcUrl, str): + def hrefStr(aTag): + href = aTag.get('href') + if not (href is None or href.startswith('javascript:')): + return f"<{urljoin(srcUrl, href)}>" + aspArgs.setdefault('aRef', {}) + aspArgs['aRef']['a'] = hrefStr + + return separator.join(self._all_strings_plus(**aspArgs)) + + + +class URLScraper: + + @staticmethod + def open_page(url): + chrome_options = Options() + chrome_options.add_argument("--headless") # Run in headless mode to capture full page + #chrome_options.add_argument("window-size=6709,5337") + #chrome_options.add_argument("window-size=1920,5337") + # Initialize the Chrome WebDriver with the specified options + driver = webdriver.Chrome(options=chrome_options) + # Open the URL in the browser + driver.get(url) + #time.sleep(3) # Wait for the page to load (you can adjust the waiting time if needed) + #driver.get(url) + soup = MyBeautifulSoup(driver.page_source, "html.parser") + print('TYPE OF SOUP:') + print(type(soup)) + return soup + + 
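    # NOTE: open_page drives a headless Chrome via Selenium and therefore
+    # requires a local Chrome installation; the requests-based path in
+    # get_visible_text_and_links below does not.
+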
    @staticmethod
+    def get_visible_text_and_links(url, use_selenium=False):
+        if use_selenium:
+            print('OPENING PAGE:')
+            soup = URLScraper.open_page(url)
+            print(type(soup))
+
+        # Regardless, make a GET request to the URL to resolve the base URL:
+        headers = {
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36'
+        }
+        response = requests.get(url, headers=headers)
+        # Expand relative links to absolute URLs
+        base_url = response.url
+
+        # Check if the request was successful (status code 200)
+        if use_selenium or response.status_code == 200:
+            # Parse the HTML content of the page
+            if not use_selenium:
+                soup = MyBeautifulSoup(response.text.encode('ascii', 'ignore').decode('ascii'), 'html.parser')
+
+                # Extract visible text
+                visible_text = soup.body.get_text(' ', strip=True)
+
+            else:
+                # The Selenium path embeds links via get_text_plus, so return directly
+                visible_text = soup.get_text_plus(srcUrl=base_url)
+                return visible_text
+
+            links = {link.get('href'): {'url': urljoin(base_url, link.get('href')), 'index': visible_text.find(link.text.strip()) + len(link.text.lstrip()), 'link_text': link.text.strip()} for link in soup.find_all('a') if visible_text.find(link.text.strip()) > 0}
+
+            link_replacements = []
+            for link, info in links.items():
+                if link is not None and info['index'] >= 0:
+                    link_replacements.append([info['index'], link])
+
+            # Insert links from the end backwards so earlier indices stay valid
+            link_replacements = sorted(link_replacements, key=lambda x: x[0], reverse=True)
+            for linkrep in link_replacements:
+                visible_text = visible_text[:linkrep[0]] + ' (' + linkrep[1] + ') ' + visible_text[linkrep[0]:]
+            # Output the text with expanded links
+            output = f"\n{visible_text}"
+            return output
+
+        else:
+            return f'Failed to retrieve the page. 
Status code: {response.status_code}' + + + + @staticmethod + def extract_with_spacing(in_str: str, find_str: str, spacing: int) -> list: + results = [] + find_len = len(find_str) + + idx = 0 + while idx < len(in_str): + idx = in_str.find(find_str, idx) + if idx == -1: + break + + # Calculate start and end indices with spacing + start = max(0, idx - spacing) + end = min(len(in_str), idx + find_len + spacing) + + results.append(in_str[start:end]) + idx += 1 # move forward to find overlapping matches if any + + return results + + + @staticmethod + def create_keyword_results_csv(filename='keyword_results.csv'): + # Open the file in write mode initially + with open(filename, 'w', newline='', encoding='utf-8') as csvfile: + # Create a CSV writer object + csv_writer = csv.writer(csvfile) + + # Write the header row + csv_writer.writerow(['Scraped_URL', 'Company_Name', 'Website', 'Scraped_URLs', 'Page_Text']) + + return filename + + @staticmethod + def create_error_results_csv(filename='error_results.csv'): + # Open the file in write mode initially + with open(filename, 'w', newline='', encoding='utf-8') as csvfile: + # Create a CSV writer object + csv_writer = csv.writer(csvfile) + + # Write the header row + csv_writer.writerow(['Scraped_URL', 'Company_Name', 'Website', 'Scraped_URLs', 'Page_Text']) + + return filename + + @staticmethod + def append_keyword_result(filename, url_to_scrape, keyword, example): + # Open the file in append mode + with open(filename, 'a', newline='', encoding='utf-8') as csvfile: + # Create a CSV writer object + csv_writer = csv.writer(csvfile) + + # Write the data row + csv_writer.writerow([url_to_scrape, keyword, example]) + + @staticmethod + def append_error_result(filename, url_to_scrape, page_text,company_name, base_url, pip_urls): + # Open the file in append mode + with open(filename, 'a', newline='', encoding='utf-8') as csvfile: + # Create a CSV writer object + csv_writer = csv.writer(csvfile) + + # Write the data row + csv_writer.writerow([url_to_scrape, company_name, base_url, pip_urls, page_text]) + + @staticmethod + def append_good_result(filename, url_to_scrape, page_text, company_name, base_url, pip_urls): + # Open the file in append mode + with open(filename, 'a', newline='', encoding='utf-8') as csvfile: + # Create a CSV writer object + csv_writer = csv.writer(csvfile) + + # Write the data row + csv_writer.writerow([url_to_scrape, company_name, base_url, pip_urls, page_text]) + +# input_data = pd.read_json(args.input_file).to_dict(orient='records') +# # print(f"Input Data: {input_data}") +# # print(f"{type(input_data)}") + +# results_file = create_keyword_results_csv('./scraped_data/' + 'scraped_results.csv') +# errors_file = create_error_results_csv('./scraped_data/' + 'scraped_errors.csv') + +# for company in input_data: +# urls = list(company.get("People-info page URL")) +# company_name = str(company.get("Company Name")) +# base_url = str(company.get("Base URL")) +# pip_urls = str(company.get("People-info page URL")) +# print(f"Working on company: {company_name}") +# # urls = list(pd.read_json(args.input_file)['Address']) + +# for url_to_scrape in urls: +# try: +# web_page_text = '\n\n' + get_visible_text_and_links(url_to_scrape,use_selenium = False) +# if (web_page_text is None) or (len(web_page_text) < 50): +# print(f'\n\nSomething messed up! Possible too-short page. 
The web_page_text was: {web_page_text}\n\n') +# append_error_result(errors_file, url_to_scrape, web_page_text, company_name, base_url, pip_urls) +# else: +# print(f'Appending good result: {(results_file, url_to_scrape, web_page_text, company_name, base_url, pip_urls)}') +# append_good_result(results_file, url_to_scrape, web_page_text, company_name, base_url, pip_urls) + +# except Exception as e: +# print(f'Error: {e}') +# append_error_result(errors_file, url_to_scrape, web_page_text, company_name, base_url, pip_urls)
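+
+# Minimal usage sketch (illustrative URL; network access assumed):
+#
+#   if __name__ == "__main__":
+#       text = URLScraper.get_visible_text_and_links("https://example.com")
+#       print(text[:500])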