Source code for mindroot.coreplugins.chat.router

from fastapi import APIRouter, HTTPException, Request, Response, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi import File, UploadFile, Form
from sse_starlette.sse import EventSourceResponse
from .models import MessageParts
from lib.providers.services import service, service_manager
from .services import init_chat_session, send_message_to_agent, subscribe_to_agent_messages, get_chat_history, run_task
from lib.templates import render
from lib.auth.auth import require_user
from lib.plugins import list_enabled
import nanoid
from lib.providers.commands import *
import asyncio
from lib.chatcontext import get_context, ChatContext
from typing import List
from lib.providers.services import service, service_manager
from lib.providers.commands import command_manager
from lib.utils.debug import debug_box
from lib.session_files import load_session_data, save_session_data
import os
import json
from lib.chatcontext import ChatContext
import shutil
from pydantic import BaseModel
from lib.auth.api_key import verify_api_key

router = APIRouter()

# Global dictionary to store tasks
tasks = {}

[docs] @router.post("/chat/{log_id}/{task_id}/cancel") async def cancel_chat(request: Request, log_id: str, task_id: str): debug_box("cancel_chat") print("Trying to cancel task", task_id) user = request.state.user.username context = await get_context(log_id, user) debug_box(str(context)) context.data['finished_conversation'] = True #if task_id in tasks: # task = tasks[task_id] # await asyncio.sleep(0.75) # task.cancel() # del tasks[task_id] return {"status": "ok", "message": "Task cancelled successfully"}
#else: # raise HTTPException(status_code=404, detail="Task not found")
[docs] @router.get("/context1/{log_id}") async def context1(request: Request, log_id: str): user = request.state.user.username context = await get_context(log_id, user) print(context) return "ok"
# need to serve persona images from ./personas/local/[persona_path]/avatar.png
[docs] @router.get("/chat/personas/{persona_path:path}/avatar.png") async def get_persona_avatar(persona_path: str): # Check if this is a registry persona with deduplicated assets if persona_path.startswith("registry/"): persona_json_path = f"personas/{persona_path}/persona.json" if os.path.exists(persona_json_path): try: with open(persona_json_path, "r") as f: persona_data = json.load(f) # Check if persona has asset hashes (deduplicated storage) asset_hashes = persona_data.get("asset_hashes", {}) if "avatar" in asset_hashes: # Redirect to deduplicated asset endpoint return RedirectResponse(f"/assets/{asset_hashes['avatar']}") except Exception as e: print(f"Error checking for deduplicated assets: {e}") # Handle registry personas: registry/owner/name if persona_path.startswith('registry/'): file_path = f"personas/{persona_path}/avatar.png" else: # Legacy support: check local first, then shared file_path = f"personas/local/{persona_path}/avatar.png" if not os.path.exists(file_path): file_path = f"personas/registry/{persona_path}/avatar.png" if not os.path.exists(file_path): resolved = os.path.realpath(file_path) return {"error": "File not found: " + resolved} with open(file_path, "rb") as f: image_bytes = f.read() return Response( content=image_bytes, media_type="image/png", headers={ "Cache-Control": "max-age=3600", "Content-Disposition": "inline; filename=avatar.png" } )
[docs] @router.get("/chat/personas/{persona_path:path}/faceref.png") async def get_persona_faceref(persona_path: str): # Check if this is a registry persona with deduplicated assets if persona_path.startswith("registry/"): persona_json_path = f"personas/{persona_path}/persona.json" if os.path.exists(persona_json_path): try: with open(persona_json_path, "r") as f: persona_data = json.load(f) # Check if persona has asset hashes (deduplicated storage) asset_hashes = persona_data.get("asset_hashes", {}) if "faceref" in asset_hashes: # Redirect to deduplicated asset endpoint return RedirectResponse(f"/assets/{asset_hashes['faceref']}") except Exception as e: print(f"Error checking for deduplicated assets: {e}") # Handle registry personas: registry/owner/name if persona_path.startswith('registry/'): file_path = f"personas/{persona_path}/faceref.png" else: # Legacy support: check local first, then shared file_path = f"personas/local/{persona_path}/faceref.png" if not os.path.exists(file_path): file_path = f"personas/registry/{persona_path}/faceref.png" if not os.path.exists(file_path): # Fallback to avatar if faceref doesn't exist return RedirectResponse(f"/chat/personas/{persona_path}/avatar.png") with open(file_path, "rb") as f: image_bytes = f.read() return Response( content=image_bytes, media_type="image/png", headers={ "Cache-Control": "max-age=3600", "Content-Disposition": "inline; filename=faceref.png" } )
[docs] @router.get("/chat/{log_id}/events") async def chat_events(log_id: str): return EventSourceResponse(await subscribe_to_agent_messages(log_id))
[docs] @router.post("/chat/{log_id}/send") async def send_message(request: Request, log_id: str, message_parts: List[MessageParts] ): user = request.state.user debug_box("send_message") context = await get_context(log_id, user.username) debug_box(str(context)) #context = ChatContext(command_manager, service_manager, user=user.user) task = asyncio.create_task(send_message_to_agent(log_id, message_parts, context=context, user=user)) #task = asyncio.create_task(send_message_to_agent(log_id, message_parts, user=user)) task_id = nanoid.generate() tasks[task_id] = task return {"status": "ok", "task_id": task_id}
[docs] @router.get("/agent/{agent_name}", response_class=HTMLResponse) async def get_chat_html(request: Request, agent_name: str, api_key: str = Query(None), embed: bool = Query(False)): # Handle API key authentication if provided if api_key: try: user_data = await verify_api_key(api_key) if not user_data: raise HTTPException(status_code=401, detail="Invalid API key") # Create a mock user object for API key users class MockUser: def __init__(self, username): self.username = username user = MockUser(user_data['username']) except Exception as e: raise HTTPException(status_code=401, detail="Invalid API key") else: # Use regular authentication if not hasattr(request.state, "user"): return RedirectResponse("/login") user = request.state.user log_id = nanoid.generate() plugins = list_enabled() print("Init chat with user", user) print(f"Init chat with {agent_name}") await init_chat_session(user, agent_name, log_id) if hasattr(request.state, "access_token"): debug_box("Access token found in request state, saving to session file") access_token = request.state.access_token await save_session_data(log_id, "access_token", access_token) print("..") debug_box("Access token saved to session file") else: debug_box("No access token found in request state") # If embed mode is requested, redirect to embed session if embed: return RedirectResponse(f"/session/{agent_name}/{log_id}?embed=true") # Regular redirect return RedirectResponse(f"/session/{agent_name}/{log_id}")
[docs] @router.get("/makesession/{agent_name}") async def make_session(request: Request, agent_name: str): """ Create a new chat session for the specified agent. Returns a redirect to the chat session page. """ if not hasattr(request.state, "user"): return RedirectResponse("/login") user = request.state.user log_id = nanoid.generate() await init_chat_session(user, agent_name, log_id) return JSONResponse({ "log_id": log_id })
[docs] @router.get("/history/{agent_name}/{log_id}") async def chat_history(request: Request, agent_name: str, log_id: str): user = request.state.user.username history = await get_chat_history(agent_name, log_id, user) if history is None or len(history) == 0: try: print("trying to load from system session") history = await get_chat_history(agent_name, log_id, "system") except Exception as e: print("Error loading from system session:", e) history = [] pass return history
[docs] @router.get("/session/{agent_name}/{log_id}") async def chat_session(request: Request, agent_name: str, log_id: str, embed: bool = Query(False)): # Check authentication (API key or regular user) plugins = list_enabled() if not hasattr(request.state, "user"): return RedirectResponse("/login") user = request.state.user agent = await service_manager.get_agent_data(agent_name) persona = agent['persona']['name'] print("persona is:", persona) auth_token = None try: auth_token = await load_session_data(log_id, "access_token") except: pass chat_data = {"log_id": log_id, "agent_name": agent_name, "user": user, "persona": persona } if auth_token is not None: chat_data["access_token"] = auth_token # Add embed mode flag if embed: chat_data["embed_mode"] = True html = await render('chat', chat_data) return HTMLResponse(html) # use starlette staticfiles to mount ./imgs app.mount("/published", StaticFiles(directory=str(published_dir)), name="published_indices")
[docs] class TaskRequest(BaseModel): instructions: str
[docs] @router.post("/task/{agent_name}") async def run_task_route(request: Request, agent_name: str, task_request: TaskRequest = None): """ Run a task for an agent with the given instructions. This endpoint allows programmatic interaction with agents without a full chat session. Parameters: - agent_name: The name of the agent to run the task - instructions: The instructions/prompt to send to the agent Returns: - JSON with results and log_id for tracking """ user = request.state.user.username instructions = None if task_request is not None: instructions = task_request.instructions if not instructions: return {"status": "error", "message": "No instructions provided"} task_result, full_results, log_id = await run_task(instructions=instructions, agent_name=agent_name, user=user) print(task_result) print(full_results) print(log_id) return {"status": "ok", "results": task_result, "full_results": full_results, "log_id": log_id}
[docs] @router.post("/chat/{log_id}/upload") async def upload_file(request: Request, log_id: str, file: UploadFile = File(...)): """ Upload a file and store it in a user-specific directory. Returns the file path that can be used in messages. """ user = request.state.user.username # Create user uploads directory if it doesn't exist user_upload_dir = f"data/users/{user}/uploads/{log_id}" os.makedirs(user_upload_dir, exist_ok=True) # Generate a safe filename to prevent path traversal filename = os.path.basename(file.filename) file_path = os.path.join(user_upload_dir, filename) # Save the file with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # Return the file information return { "status": "ok", "filename": filename, "path": file_path, "mime_type": file.content_type }
from lib.chatlog import count_tokens_for_log_id
[docs] @router.get("/chat/{log_id}/tokens") async def get_token_count(request: Request, log_id: str): """ Get token counts for a chat log identified by log_id, including any delegated tasks. Parameters: - log_id: The log ID to count tokens for Returns: - JSON with token counts or error message if log not found """ token_counts = await count_tokens_for_log_id(log_id) if token_counts is None: return {"status": "error", "message": f"Chat log with ID {log_id} not found"} return {"status": "ok", "token_counts": token_counts}
[docs] @router.get("/chat/del_session/{log_id}") async def delete_chat_session(request: Request, log_id: str, user=Depends(require_user)): """ Delete a chat session by log_id, including chat logs, context files, and all child sessions. Parameters: - log_id: The log ID of the session to delete Returns: - JSON with success status and message """ try: # Try to determine the agent name from the context file first agent_name = "unknown" context_dir = os.environ.get('CHATCONTEXT_DIR', 'data/context') context_file_path = f"{context_dir}/{user.username}/context_{log_id}.json" if os.path.exists(context_file_path): try: with open(context_file_path, 'r') as f: context_data = json.load(f) agent_name = context_data.get('agent_name', 'unknown') print(f"Found agent name '{agent_name}' from context file for log_id {log_id}") except Exception as e: print(f"Error reading context file {context_file_path}: {e}") # If we still don't have the agent name, try to find the chatlog file if agent_name == "unknown": from lib.chatlog import find_chatlog_file chatlog_path = find_chatlog_file(log_id) if chatlog_path: # Extract agent from path: data/chat/{user}/{agent}/chatlog_{log_id}.json path_parts = chatlog_path.split(os.sep) if len(path_parts) >= 3: agent_name = path_parts[-2] # Agent is the second-to-last part print(f"Found agent name '{agent_name}' from chatlog file path for log_id {log_id}") await ChatContext.delete_session_by_id(log_id=log_id, user=user.username, agent=agent_name, cascade=True) return {"status": "ok", "message": f"Chat session {log_id} deleted successfully"} except Exception as e: print(f"Error deleting chat session {log_id}: {e}") raise HTTPException(status_code=500, detail=f"Error deleting chat session: {str(e)}")
[docs] @router.get("/chat/{log_id}/tokens") async def get_token_count_alt(request: Request, log_id: str): """ Alternative token count endpoint using token_counter module. Parameters: - log_id: The log ID to count tokens for Returns: - JSON with token counts or error message if log not found """ from lib.token_counter import count_tokens_for_log_id token_counts = await count_tokens_for_log_id(log_id) if token_counts is None: return {"status": "error", "message": f"Chat log with ID {log_id} not found"} return {"status": "ok", "token_counts": token_counts}
# Include widget routes try: from .widget_routes import router as widget_router router.include_router(widget_router) except ImportError as e: print(f"Warning: Could not load widget routes: {e}")