Update chats.py

clean up
This commit is contained in:
PVBLIC Foundation 2025-05-30 20:14:59 -07:00 committed by GitHub
parent 484133de4c
commit bcc2d7233d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,7 +15,7 @@ from sqlalchemy import or_, func, select, and_, text
from sqlalchemy.sql import exists
from sqlalchemy.sql.elements import TextClause
# Import JSONB for PostgreSQL support - gracefully handle absence in other databases
try:
from sqlalchemy.dialects.postgresql import JSONB
except ImportError:
@ -25,18 +25,33 @@ except ImportError:
# Database Adapter
####################
class DatabaseType(Enum):
    """
    Enumeration of supported database types and their JSON capabilities.

    Used by DatabaseAdapter to determine optimal query strategies and
    features available for each database backend.
    """

    # Each member is defined exactly once: Enum raises TypeError when a
    # member name is reused inside the class body.
    SQLITE = "sqlite"  # SQLite with JSON1 extension
    POSTGRESQL_JSON = "postgresql_json"  # PostgreSQL with standard JSON
    POSTGRESQL_JSONB = "postgresql_jsonb"  # PostgreSQL with binary JSONB
    UNSUPPORTED = "unsupported"  # Unsupported database type
class DatabaseAdapter:
"""Centralized database-specific query generation with caching"""
def __init__(self, db):
    """
    Bind the adapter to a database session and prepare the type cache.

    Args:
        db: SQLAlchemy database session. It is stored for later queries,
            and the dialect name of its bound engine is recorded.
    """
    self.db = db
    engine = db.bind
    self.dialect = engine.dialect.name

    # Memoised get_database_type() results, so column introspection is
    # not repeated for a key that was already resolved
    self._cache: Dict[str, DatabaseType] = dict()
def get_database_type(self, column_name: str = "meta") -> DatabaseType:
@ -45,13 +60,20 @@ class DatabaseAdapter:
if cache_key in self._cache:
return self._cache[cache_key]
# Determine database type based on dialect and column inspection
if self.dialect == "sqlite":
result = DatabaseType.SQLITE
elif self.dialect == "postgresql":
result = DatabaseType.POSTGRESQL_JSONB if self._is_jsonb_column(column_name) else DatabaseType.POSTGRESQL_JSON
# Check if column uses JSONB or standard JSON
result = (
DatabaseType.POSTGRESQL_JSONB
if self._is_jsonb_column(column_name)
else DatabaseType.POSTGRESQL_JSON
)
else:
result = DatabaseType.UNSUPPORTED
# Cache result for future queries
self._cache[cache_key] = result
return result
@ -61,72 +83,99 @@ class DatabaseAdapter:
return False
try:
result = self.db.execute(text("""
# Query PostgreSQL system catalog for column data type
result = self.db.execute(
text(
"""
SELECT data_type FROM information_schema.columns
WHERE table_name = 'chat' AND column_name = :column_name
"""), {"column_name": column_name})
"""
),
{"column_name": column_name},
)
row = result.fetchone()
return row[0].lower() == 'jsonb' if row else False
return row[0].lower() == "jsonb" if row else False
except Exception:
# Gracefully handle permission or connection issues
return False
def _get_function_template(
    self, db_type: DatabaseType, function_type: str
) -> Optional[str]:
    """
    Return the SQL snippet template for a database type and operation.

    Templates contain a ``{column}`` placeholder and, for some operations,
    a ``{path}`` placeholder; callers substitute them with ``str.replace``
    before wrapping the result in ``text()``. ``:tag_id`` and
    ``:search_text`` are bind parameters supplied by the caller.

    Args:
        db_type: Database/column flavour the SQL must target.
        function_type: One of "tag_exists", "has_key", "array_length",
            "array_elements" or "content_search".

    Returns:
        The template string, or None when either the database type or
        the operation has no template (e.g. DatabaseType.UNSUPPORTED).
    """
    templates = {
        DatabaseType.SQLITE: {
            # SQLite with JSON1 extension functions
            "tag_exists": (
                "EXISTS (SELECT 1 FROM json_each({column}, '$.tags') "
                "WHERE json_each.value = :tag_id)"
            ),
            "has_key": "json_extract({column}, '$.{path}') IS NOT NULL",
            "array_length": "json_array_length({column}, '$.{path}')",
            "array_elements": "json_each({column}, '$.{path}')",
            "content_search": """EXISTS (
                SELECT 1 FROM json_each({column}, '$.messages') AS message
                WHERE LOWER(message.value->>'content') LIKE '%' || :search_text || '%'
            )""",
        },
        DatabaseType.POSTGRESQL_JSON: {
            # PostgreSQL with standard JSON operators
            "tag_exists": (
                "EXISTS (SELECT 1 FROM json_array_elements_text({column}->'tags') "
                "elem WHERE elem = :tag_id)"
            ),
            # The `?` key-existence operator is defined for jsonb only, so
            # for plain json test whether `->` yields a value (it returns
            # SQL NULL when the key is absent).
            "has_key": "({column}->'{path}') IS NOT NULL",
            "array_length": "json_array_length({column}->'{path}')",
            "array_elements": "json_array_elements({column}->'{path}')",
            "content_search": """EXISTS (
                SELECT 1 FROM json_array_elements({column}->'messages') AS message
                WHERE LOWER(message->>'content') LIKE '%' || :search_text || '%'
            )""",
        },
        DatabaseType.POSTGRESQL_JSONB: {
            # PostgreSQL with optimized JSONB binary format
            "tag_exists": (
                "EXISTS (SELECT 1 FROM jsonb_array_elements_text({column}->'tags') "
                "elem WHERE elem = :tag_id)"
            ),
            "has_key": "{column} ? '{path}'",
            "array_length": "jsonb_array_length({column}->'{path}')",
            "array_elements": "jsonb_array_elements({column}->'{path}')",
            "content_search": """EXISTS (
                SELECT 1 FROM jsonb_array_elements({column}->'messages') AS message
                WHERE LOWER(message->>'content') LIKE '%' || :search_text || '%'
            )""",
        },
    }

    # Unknown database types fall back to an empty mapping, so the final
    # lookup yields None instead of raising KeyError.
    return templates.get(db_type, {}).get(function_type)
def build_tag_filter(self, column_name: str, tag_ids: List[str], match_all: bool = True) -> Optional[Union[TextClause, and_, or_]]:
def build_tag_filter(
self, column_name: str, tag_ids: List[str], match_all: bool = True
) -> Optional[Union[TextClause, and_, or_]]:
"""Build optimized tag filtering query"""
if not tag_ids:
return None
# Get database-specific template for tag existence checks
db_type = self.get_database_type(column_name)
template = self._get_function_template(db_type, "tag_exists")
if not template:
return None
# Replace column placeholder with actual column reference
query_template = template.replace("{column}", f"Chat.{column_name}")
if match_all:
return and_(*[
text(query_template).params(tag_id=tag_id)
for tag_id in tag_ids
])
# AND logic: all tags must be present
return and_(
*[text(query_template).params(tag_id=tag_id) for tag_id in tag_ids]
)
else:
# OR logic: any tag can be present
# Use separate parameters to avoid conflicts
conditions = []
params = {}
for idx, tag_id in enumerate(tag_ids):
@ -145,6 +194,7 @@ class DatabaseAdapter:
if not template:
return None
# Replace column placeholder and bind search parameter
query = template.replace("{column}", "Chat.chat")
return text(query).params(search_text=search_text)
@ -152,78 +202,102 @@ class DatabaseAdapter:
"""Build filter for chats without tags"""
db_type = self.get_database_type(column_name)
# Get templates for key existence and array length checks
has_key_template = self._get_function_template(db_type, "has_key")
array_length_template = self._get_function_template(db_type, "array_length")
if not has_key_template or not array_length_template:
return None
has_key = has_key_template.replace("{column}", f"Chat.{column_name}").replace("{path}", "tags")
array_length = array_length_template.replace("{column}", f"Chat.{column_name}").replace("{path}", "tags")
return or_(
text(f"NOT ({has_key})"),
text(f"{array_length} = 0")
# Build conditions for missing key and empty array
has_key = has_key_template.replace("{column}", f"Chat.{column_name}").replace(
"{path}", "tags"
)
array_length = array_length_template.replace(
"{column}", f"Chat.{column_name}"
).replace("{path}", "tags")
# Return OR condition: no tags key OR empty tags array
return or_(text(f"NOT ({has_key})"), text(f"{array_length} = 0"))
####################
# Utility Functions
####################
def normalize_tag_name(tag_name: str) -> str:
    """Return the canonical tag form: spaces become underscores, then lowercase."""
    underscored = tag_name.replace(" ", "_")
    return underscored.lower()
def normalize_tag_names(tag_names: List[str]) -> List[str]:
    """Apply normalize_tag_name to every entry, keeping the input order."""
    return list(map(normalize_tag_name, tag_names))
####################
# Chat DB Schema
####################
# Module-level logger; its level is taken from the "MODELS" entry of SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
class Chat(Base):
    """
    ORM mapping for the ``chat`` table.

    Each column is declared exactly once, in the table's column order.
    """

    __tablename__ = "chat"

    # Primary identifier
    id = Column(String, primary_key=True)

    # Owning user
    user_id = Column(String)

    # Human-readable chat title
    title = Column(Text)

    # Full chat data (JSON document).
    # For JSONB support, change to: Column(JSONB) if JSONB else Column(JSON)
    chat = Column(JSON)

    # Timestamps (epoch integers)
    created_at = Column(BigInteger)  # Creation timestamp
    updated_at = Column(BigInteger)  # Last modification timestamp

    # Identifier used when the chat is shared; unique, NULL when not shared
    share_id = Column(Text, unique=True, nullable=True)

    # Organization and state flags
    archived = Column(Boolean, default=False)
    pinned = Column(Boolean, default=False, nullable=True)

    # Extensible metadata storage (tags, custom fields, etc.)
    # For JSONB support, change to:
    # Column(JSONB, server_default="{}") if JSONB else Column(JSON, server_default="{}")
    meta = Column(JSON, server_default="{}")

    # Optional folder organization
    folder_id = Column(Text, nullable=True)
class ChatModel(BaseModel):
    """
    Pydantic model with the same fields as the ``Chat`` ORM class.

    ``from_attributes=True`` lets it be validated directly from an object
    exposing these fields as attributes.
    """

    model_config = ConfigDict(from_attributes=True)

    # Core identification and ownership
    id: str
    user_id: str
    title: str
    chat: dict

    # Timestamps
    created_at: int  # timestamp in epoch
    updated_at: int  # timestamp in epoch

    # Optional fields with defaults
    share_id: Optional[str] = None  # Sharing identifier
    archived: bool = False  # Archive status
    pinned: Optional[bool] = False  # Pin status (may be None)

    # Extensible fields
    meta: dict = {}  # Metadata including tags
    folder_id: Optional[str] = None  # Folder organization
####################
@ -232,39 +306,90 @@ class ChatModel(BaseModel):
class ChatForm(BaseModel):
    """
    Request body carrying a chat payload.

    The single required field ``chat`` must be a dict; nothing about its
    inner structure is validated by this model.
    """

    chat: dict
class ChatImportForm(ChatForm):
    """
    Extended form model for importing chats with additional metadata.

    Adds optional metadata, pin status and folder assignment on top of
    the ``chat`` payload required by ChatForm.
    """

    meta: Optional[dict] = {}  # Tags and other metadata
    pinned: Optional[bool] = False  # Pin status
    folder_id: Optional[str] = None  # Folder assignment
class ChatTitleMessagesForm(BaseModel):
    """
    Request body that carries a title together with a message list.

    ``title`` is a string and ``messages`` is a list of dicts; the dicts'
    contents are not validated by this model.
    """

    title: str
    messages: list[dict]
class ChatTitleForm(BaseModel):
    """Request body holding only a chat title (a required string)."""

    title: str
class ChatResponse(BaseModel):
    """
    Complete response model for chat API endpoints.

    Carries every chat field. Unlike ChatModel, ``archived`` has no
    default here and must always be supplied.
    """

    # Core chat identification and data
    id: str
    user_id: str
    title: str
    chat: dict

    # Timestamps
    updated_at: int  # timestamp in epoch
    created_at: int  # timestamp in epoch

    # Sharing and organization
    share_id: Optional[str] = None  # id of the chat to be shared
    archived: bool  # archive status
    pinned: Optional[bool] = False  # pin status

    # Extensible metadata and organization
    meta: dict = {}  # tags and other metadata
    folder_id: Optional[str] = None  # folder assignment
class ChatTitleIdResponse(BaseModel):
"""
Lightweight response model for chat list operations.
Optimized for chat list views that only need basic identification
and sorting information. Reduces payload size for better performance
when displaying large numbers of chats.
Used by endpoints that return chat lists, search results, or
navigation menus where full chat content is not needed.
"""
id: str
title: str
updated_at: int
@ -273,6 +398,13 @@ class ChatTitleIdResponse(BaseModel):
class ChatTable:
def __init__(self):
    """
    Create a ChatTable; there is nothing to configure.

    No state is stored on the instance. The methods visible in this file
    obtain their database session from the get_db() context manager at
    call time.
    """
def _get_adapter(self, db) -> DatabaseAdapter:
@ -301,10 +433,14 @@ class ChatTable:
meta_type = adapter.get_database_type("meta")
chat_type = adapter.get_database_type("chat")
# Build comprehensive compatibility report
compatibility = {
"database_type": dialect_name,
"json_support": meta_type != DatabaseType.UNSUPPORTED,
"jsonb_support": meta_type == DatabaseType.POSTGRESQL_JSONB or chat_type == DatabaseType.POSTGRESQL_JSONB,
"jsonb_support": (
meta_type == DatabaseType.POSTGRESQL_JSONB
or chat_type == DatabaseType.POSTGRESQL_JSONB
),
"gin_indexes_support": dialect_name == "postgresql",
"tag_filtering_support": meta_type != DatabaseType.UNSUPPORTED,
"advanced_search_support": chat_type != DatabaseType.UNSUPPORTED,
@ -312,17 +448,30 @@ class ChatTable:
"chat_column_type": chat_type.value,
"features": [],
"limitations": [],
"recommendations": []
"recommendations": [],
}
# Add features based on database type
# Add database-specific features and limitations
if dialect_name == "sqlite":
compatibility["features"] = ["JSON1 extension", "Basic tag filtering", "Message search"]
compatibility["limitations"] = ["No GIN indexes", "Limited JSON optimization"]
compatibility["features"] = [
"JSON1 extension",
"Basic tag filtering",
"Message search",
]
compatibility["limitations"] = [
"No GIN indexes",
"Limited JSON optimization",
]
elif dialect_name == "postgresql":
compatibility["features"] = ["Full JSON/JSONB support", "GIN indexes", "Advanced filtering"]
compatibility["features"] = [
"Full JSON/JSONB support",
"GIN indexes",
"Advanced filtering",
]
if compatibility["jsonb_support"]:
compatibility["features"].append("JSONB binary format optimization")
compatibility["features"].append(
"JSONB binary format optimization"
)
return compatibility
@ -330,14 +479,13 @@ class ChatTable:
log.error(f"Error checking database compatibility: {e}")
return {"error": str(e), "database_type": "unknown"}
def create_gin_indexes(self) -> bool:
"""Create GIN indexes on JSONB columns for better query performance"""
try:
with get_db() as db:
adapter = self._get_adapter(db)
# Only PostgreSQL supports GIN indexes
if db.bind.dialect.name != "postgresql":
return False
@ -347,41 +495,72 @@ class ChatTable:
has_jsonb_meta = meta_type == DatabaseType.POSTGRESQL_JSONB
has_jsonb_chat = chat_type == DatabaseType.POSTGRESQL_JSONB
# Need at least one JSONB column for GIN indexes
if not (has_jsonb_meta or has_jsonb_chat):
return False
# Create GIN indexes
# Create GIN indexes for meta column if it's JSONB
if has_jsonb_meta:
try:
db.execute(text("""
# General meta column index for all JSON operations
db.execute(
text(
"""
CREATE INDEX IF NOT EXISTS idx_chat_meta_gin
ON chat USING GIN (meta)
"""))
db.execute(text("""
"""
)
)
# Specific index for tags array operations
db.execute(
text(
"""
CREATE INDEX IF NOT EXISTS idx_chat_meta_tags_gin
ON chat USING GIN ((meta->'tags'))
"""))
db.execute(text("""
"""
)
)
# BTREE index for tag existence and count operations
db.execute(
text(
"""
CREATE INDEX IF NOT EXISTS idx_chat_has_tags
ON chat USING BTREE ((meta ? 'tags' AND jsonb_array_length(meta->'tags') > 0))
ON chat USING BTREE ((meta ? 'tags' AND
jsonb_array_length(meta->'tags') > 0))
WHERE meta ? 'tags'
"""))
"""
)
)
except Exception:
# Continue if some indexes fail - partial optimization is better than none
pass
# Create GIN indexes for chat column if it's JSONB
if has_jsonb_chat:
try:
db.execute(text("""
# General chat content index for all operations
db.execute(
text(
"""
CREATE INDEX IF NOT EXISTS idx_chat_chat_gin
ON chat USING GIN (chat)
"""))
db.execute(text("""
"""
)
)
# Specific index for message search operations
db.execute(
text(
"""
CREATE INDEX IF NOT EXISTS idx_chat_messages_gin
ON chat USING GIN ((chat->'messages'))
"""))
"""
)
)
except Exception:
# Continue if some indexes fail
pass
# Commit all index creations
db.commit()
return True
@ -398,21 +577,24 @@ class ChatTable:
if db.bind.dialect.name != "postgresql":
return {"error": "GIN indexes are only supported on PostgreSQL"}
result = db.execute(text("""
# Query PostgreSQL system catalog for GIN indexes on chat table
result = db.execute(
text(
"""
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'chat'
AND indexdef LIKE '%USING gin%'
"""))
"""
)
)
# Process existing indexes
indexes = {}
for row in result:
indexes[row[0]] = {
"exists": True,
"definition": row[1]
}
indexes[row[0]] = {"exists": True, "definition": row[1]}
# Check for expected indexes
# Check for all expected indexes
expected_indexes = [
"idx_chat_meta_gin",
"idx_chat_chat_gin",
@ -420,9 +602,10 @@ class ChatTable:
"idx_chat_has_tags",
"idx_chat_tag_count",
"idx_chat_json_tags",
"idx_chat_messages_gin"
"idx_chat_messages_gin",
]
# Mark missing indexes
for idx_name in expected_indexes:
if idx_name not in indexes:
indexes[idx_name] = {"exists": False}
@ -440,6 +623,7 @@ class ChatTable:
if db.bind.dialect.name != "postgresql":
return False
# List of all indexes that might have been created
indexes_to_drop = [
"idx_chat_meta_gin",
"idx_chat_chat_gin",
@ -447,13 +631,17 @@ class ChatTable:
"idx_chat_has_tags",
"idx_chat_tag_count",
"idx_chat_json_tags",
"idx_chat_messages_gin"
"idx_chat_messages_gin",
]
# Drop each index with CONCURRENTLY to avoid blocking
for idx_name in indexes_to_drop:
try:
db.execute(text(f"DROP INDEX CONCURRENTLY IF EXISTS {idx_name}"))
db.execute(
text(f"DROP INDEX CONCURRENTLY IF EXISTS {idx_name}")
)
except Exception:
# Continue dropping other indexes even if one fails
pass
db.commit()
@ -477,51 +665,76 @@ class ChatTable:
indexes_created = []
if has_jsonb_meta:
# JSONB-specific tag indexes
# JSONB-specific tag indexes for optimal performance
tag_indexes = [
{
"name": "idx_chat_meta_tags_gin",
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_meta_tags_gin ON chat USING GIN ((meta->'tags'))",
"purpose": "Fast tag containment queries (@>, ?, etc.)"
"sql": (
"CREATE INDEX CONCURRENTLY IF NOT EXISTS "
"idx_chat_meta_tags_gin ON chat USING GIN ((meta->'tags'))"
),
"purpose": "Fast tag containment queries (@>, ?, etc.)",
},
{
"name": "idx_chat_has_tags",
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_has_tags ON chat USING BTREE ((meta ? 'tags' AND jsonb_array_length(meta->'tags') > 0)) WHERE meta ? 'tags'",
"purpose": "Fast filtering for chats with/without tags"
"sql": (
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_has_tags "
"ON chat USING BTREE ((meta ? 'tags' AND "
"jsonb_array_length(meta->'tags') > 0)) WHERE meta ? 'tags'"
),
"purpose": "Fast filtering for chats with/without tags",
},
{
"name": "idx_chat_tag_count",
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_tag_count ON chat USING BTREE ((jsonb_array_length(meta->'tags'))) WHERE meta ? 'tags'",
"purpose": "Fast filtering by number of tags"
"sql": (
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_tag_count "
"ON chat USING BTREE ((jsonb_array_length(meta->'tags'))) "
"WHERE meta ? 'tags'"
),
"purpose": "Fast filtering by number of tags",
},
{
"name": "idx_chat_specific_tags",
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_specific_tags ON chat USING GIN ((meta->'tags')) WHERE jsonb_array_length(meta->'tags') > 0",
"purpose": "Optimized for chats that actually have tags"
}
"sql": (
"CREATE INDEX CONCURRENTLY IF NOT EXISTS "
"idx_chat_specific_tags ON chat USING GIN ((meta->'tags')) "
"WHERE jsonb_array_length(meta->'tags') > 0"
),
"purpose": "Optimized for chats that actually have tags",
},
]
else:
# JSON-specific tag indexes (less optimal but still helpful)
tag_indexes = [
{
"name": "idx_chat_json_tags",
"sql": "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_json_tags ON chat USING GIN ((meta->'tags')) WHERE meta ? 'tags'",
"purpose": "Tag queries for JSON columns"
"sql": (
"CREATE INDEX CONCURRENTLY IF NOT EXISTS "
"idx_chat_json_tags ON chat USING GIN ((meta->'tags')) "
"WHERE meta ? 'tags'"
),
"purpose": "Tag queries for JSON columns",
}
]
# Create each index with error handling
for index_info in tag_indexes:
try:
db.execute(text(index_info["sql"]))
indexes_created.append(f"{index_info['name']} ({index_info['purpose']})")
indexes_created.append(
f"{index_info['name']} ({index_info['purpose']})"
)
log.info(f"Created tag index: {index_info['name']}")
except Exception as e:
log.warning(f"Failed to create {index_info['name']}: {e}")
db.commit()
# Log results
if indexes_created:
log.info(f"Successfully created tag indexes: {len(indexes_created)} indexes")
log.info(
f"Successfully created tag indexes: {len(indexes_created)} indexes"
)
for idx in indexes_created:
log.info(f"{idx}")
else:
@ -545,54 +758,77 @@ class ChatTable:
stats = {}
# Get basic tag statistics
result = db.execute(text("""
# Get comprehensive tag usage statistics
result = db.execute(
text(
"""
SELECT
COUNT(*) as total_chats,
COUNT(*) FILTER (WHERE meta ? 'tags') as chats_with_tags,
COUNT(*) FILTER (WHERE meta ? 'tags' AND jsonb_array_length(meta->'tags') > 0) as chats_with_actual_tags,
AVG(CASE WHEN meta ? 'tags' THEN jsonb_array_length(meta->'tags') ELSE 0 END) as avg_tags_per_chat
COUNT(*) FILTER (WHERE meta ? 'tags' AND
jsonb_array_length(meta->'tags') > 0) as chats_with_actual_tags,
AVG(CASE WHEN meta ? 'tags' THEN
jsonb_array_length(meta->'tags') ELSE 0 END) as avg_tags_per_chat
FROM chat
"""))
"""
)
)
row = result.fetchone()
if row:
stats.update({
"total_chats": row[0],
"chats_with_tags": row[1],
"chats_with_actual_tags": row[2],
"avg_tags_per_chat": float(row[3]) if row[3] else 0
})
stats.update(
{
"total_chats": row[0],
"chats_with_tags": row[1],
"chats_with_actual_tags": row[2],
"avg_tags_per_chat": float(row[3]) if row[3] else 0,
}
)
# Get most common tags
result = db.execute(text("""
# Get most frequently used tags for optimization insights
result = db.execute(
text(
"""
SELECT tag_value, COUNT(*) as usage_count
FROM chat, jsonb_array_elements_text(meta->'tags') as tag_value
WHERE meta ? 'tags'
GROUP BY tag_value
ORDER BY usage_count DESC
LIMIT 10
"""))
"""
)
)
stats["top_tags"] = [{"tag": row[0], "count": row[1]} for row in result]
# Check index usage
# Check current index status for recommendations
indexes = self.check_gin_indexes()
tag_indexes = {k: v for k, v in indexes.items() if "tag" in k.lower()}
stats["tag_indexes"] = tag_indexes
# Provide recommendations
# Generate intelligent recommendations based on usage patterns
recommendations = []
if stats["chats_with_actual_tags"] > 1000:
recommendations.append("Consider creating tag-specific indexes for better performance")
recommendations.append(
"Consider creating tag-specific indexes for better performance"
)
if stats["avg_tags_per_chat"] > 5:
recommendations.append("High tag usage detected - GIN indexes will provide significant benefits")
recommendations.append(
"High tag usage detected - GIN indexes will provide "
"significant benefits"
)
tag_coverage = stats["chats_with_actual_tags"] / stats["total_chats"] if stats["total_chats"] > 0 else 0
tag_coverage = (
stats["chats_with_actual_tags"] / stats["total_chats"]
if stats["total_chats"] > 0
else 0
)
if tag_coverage < 0.1:
recommendations.append("Low tag usage - consider partial indexes with WHERE clauses")
recommendations.append(
"Low tag usage - consider partial indexes with WHERE clauses"
)
stats["recommendations"] = recommendations
stats["tag_coverage_percentage"] = round(tag_coverage * 100, 2)
@ -687,12 +923,15 @@ class ChatTable:
if chat is None:
return None
# Remove all existing tags from this chat
self.delete_all_tags_by_id_and_user_id(id, user.id)
# Clean up orphaned tags (tags no longer used by any chat)
for tag in chat.meta.get("tags", []):
if self.count_chats_by_tag_name_and_user_id(tag, user.id) == 0:
Tags.delete_tag_by_name_and_user_id(tag, user.id)
# Add new tags to the chat
for tag_name in tags:
if tag_name.lower() == "none":
continue
@ -733,6 +972,7 @@ class ChatTable:
chat = chat.chat
history = chat.get("history", {})
# Upsert the message (merge with existing or create new)
if message_id in history.get("messages", {}):
history["messages"][message_id] = {
**history["messages"][message_id],
@ -741,6 +981,7 @@ class ChatTable:
else:
history["messages"][message_id] = message
# Update current message pointer for conversation flow
history["currentId"] = message_id
chat["history"] = history
@ -756,6 +997,7 @@ class ChatTable:
chat = chat.chat
history = chat.get("history", {})
# Add status to message's status history if message exists
if message_id in history.get("messages", {}):
status_history = history["messages"][message_id].get("statusHistory", [])
status_history.append(status)
@ -975,7 +1217,8 @@ class ChatTable:
all_chats = query.all()
# result has to be destructured from sqlalchemy `row` and mapped to a dict
# since the `ChatModel` is not the returned dataclass.
return [
ChatTitleIdResponse.model_validate(
{
@ -1013,7 +1256,8 @@ class ChatTable:
try:
with get_db() as db:
# it is possible that the shared link was deleted. hence,
# we check if the chat is still shared by checking if a chat with
# the share_id exists
chat = db.query(Chat).filter_by(share_id=id).first()
if chat:
@ -1078,7 +1322,8 @@ class ChatTable:
limit: int = 60,
) -> list[ChatModel]:
"""
Filters chats based on a search query using Python, allowing pagination using skip and limit.
Filters chats based on a search query using Python, allowing pagination
using skip and limit.
"""
search_text = search_text.lower().strip()
@ -1089,7 +1334,8 @@ class ChatTable:
search_text_words = search_text.split(" ")
# search_text might contain 'tag:tag_name' format so we need to extract
# the tag_name, split the search_text and remove the tags
tag_ids = [
normalize_tag_name(word.replace("tag:", ""))
for word in search_text_words
@ -1207,7 +1453,12 @@ class ChatTable:
return []
def get_chats_by_multiple_tags(
self, user_id: str, tag_names: List[str], match_all: bool = True, skip: int = 0, limit: int = 50
self,
user_id: str,
tag_names: List[str],
match_all: bool = True,
skip: int = 0,
limit: int = 50,
) -> list[ChatModel]:
"""Get chats that match multiple tags"""
with get_db() as db:
@ -1231,7 +1482,9 @@ class ChatTable:
else:
return []
def get_chats_without_tags(self, user_id: str, skip: int = 0, limit: int = 50) -> list[ChatModel]:
def get_chats_without_tags(
self, user_id: str, skip: int = 0, limit: int = 50
) -> list[ChatModel]:
"""Get chats that have no tags"""
with get_db() as db:
adapter = self._get_adapter(db)