Source code for mindroot.coreplugins.admin.agent_router

from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pathlib import Path
import json
from .agent_importer import scan_and_import_agents, import_github_agent
from lib.providers.commands import command_manager
from .persona_handler import handle_persona_import, import_persona_from_index
import traceback
import hashlib
import zipfile
import io
import tempfile
import time
import shutil
from typing import Dict, Any, Optional
from datetime import datetime

router = APIRouter()

BASE_DIR = Path('data/agents')
local_dir = BASE_DIR / "local"
shared_dir = BASE_DIR / "shared"
local_dir.mkdir(parents=True, exist_ok=True)
shared_dir.mkdir(parents=True, exist_ok=True)

# Cache for agent ownership info
_agent_ownership_cache = None
_cache_timestamp = 0

class GitHubImportRequest(BaseModel):
    repo_path: str  # Format: "owner/repo"
    scope: str
    tag: Optional[str] = None

def scan_agent_ownership() -> Dict[str, Any]:
    """Scan all agents and build ownership information"""
    ownership_info = {
        'agents': {},
        'scanned_at': datetime.now().isoformat(),
        'total_agents': 0,
        'owned_agents': 0,
        'external_agents': 0
    }

    # Scan both local and shared directories
    for scope in ['local', 'shared']:
        scope_dir = BASE_DIR / scope
        if not scope_dir.exists():
            continue

        for agent_dir in scope_dir.iterdir():
            if not agent_dir.is_dir():
                continue

            agent_json_path = agent_dir / 'agent.json'
            if not agent_json_path.exists():
                continue

            try:
                with open(agent_json_path, 'r') as f:
                    agent_data = json.load(f)

                agent_name = agent_data.get('name', agent_dir.name)

                # Extract ownership information
                creator = agent_data.get('creator')
                owner = agent_data.get('owner')
                registry_owner = agent_data.get('registry_owner')
                created_by = agent_data.get('created_by')

                # Determine if this agent has external ownership
                has_external_owner = bool(creator or owner or registry_owner or created_by)

                ownership_info['agents'][f"{scope}/{agent_name}"] = {
                    'name': agent_name,
                    'scope': scope,
                    'creator': creator,
                    'owner': owner,
                    'registry_owner': registry_owner,
                    'created_by': created_by,
                    'has_external_owner': has_external_owner,
                    'description': agent_data.get('description', ''),
                    'version': agent_data.get('version', '1.0.0')
                }

                ownership_info['total_agents'] += 1
                if has_external_owner:
                    ownership_info['external_agents'] += 1
                else:
                    ownership_info['owned_agents'] += 1

            except Exception as e:
                print(f"Error reading agent {agent_dir.name}: {e}")
                continue

    return ownership_info
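
# Illustrative sketch (not part of this module): consuming the structure that
# scan_agent_ownership() returns. The key names mirror the dictionary built
# above; the helper name is hypothetical.
def _example_list_external_agents(ownership_info: Dict[str, Any]) -> list:
    """Return the 'scope/name' keys of agents that declare an external owner."""
    return [key for key, info in ownership_info['agents'].items()
            if info['has_external_owner']]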

async def load_persona_data(persona_name: str) -> dict:
    """Load persona data from local or shared directory"""
    # Handle registry personas: registry/owner/name
    if persona_name.startswith('registry/'):
        persona_path = Path(f'personas/{persona_name}/persona.json')
        if persona_path.exists():
            with open(persona_path, 'r') as f:
                return json.load(f)

    # Fall back to the existing local/shared layout
    persona_path = Path('personas/local') / persona_name / 'persona.json'
    if not persona_path.exists():
        persona_path = Path('personas/shared') / persona_name / 'persona.json'
        if not persona_path.exists():
            return {}

    with open(persona_path, 'r') as f:
        return json.load(f)

async def load_agent_data(agent_name: str) -> dict:
    """Load agent data from local or shared directory"""
    agent_path = BASE_DIR / 'local' / agent_name / 'agent.json'
    if not agent_path.exists():
        agent_path = BASE_DIR / 'shared' / agent_name / 'agent.json'
        if not agent_path.exists():
            raise FileNotFoundError(f'Agent {agent_name} not found')

    with open(agent_path, 'r') as f:
        agent_data = json.load(f)

    # Resolve the persona data if one is specified
    if 'persona' in agent_data:
        persona_data = await load_persona_data(agent_data['persona'])
        agent_data['persona'] = persona_data

    return agent_data

@router.get('/agents/full/{scope}/{name}')
async def get_full_agent_data(scope: str, name: str):
    """Get complete agent data including persona"""
    if scope not in ['local', 'shared']:
        raise HTTPException(status_code=400, detail='Invalid scope')

    try:
        agent_data = await load_agent_data(name)
        return agent_data
    except FileNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
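
# Illustrative client sketch (not part of this module): fetching the fully
# resolved agent (persona inlined) through the route above. The base URL, any
# mount prefix for this router, the agent name, and the "requests" dependency
# are all assumptions; adjust them for your deployment.
def _example_fetch_full_agent(base_url: str = 'http://localhost:8000'):
    import requests
    resp = requests.get(f'{base_url}/agents/full/local/assistant')  # hypothetical agent name
    resp.raise_for_status()
    return resp.json()  # agent.json contents with 'persona' replaced by persona data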

@router.get('/command-providers')
def get_command_providers():
    """Get all available providers for each command"""
    result = {}
    for command_name, providers in command_manager.functions.items():
        result[command_name] = [p['provider'] for p in providers]
    return result

@router.get('/agents/{scope}/{name}')
def read_agent(scope: str, name: str):
    if scope not in ['local', 'shared']:
        raise HTTPException(status_code=400, detail='Invalid scope')
    agent_path = BASE_DIR / scope / name / 'agent.json'
    if not agent_path.exists():
        raise HTTPException(status_code=404, detail='Agent not found')
    with open(agent_path, 'r') as f:
        agent = json.load(f)
    return agent

# Note: this static route is registered before the dynamic '/agents/{scope}'
# route below so that GET /agents/ownership-info is not captured by the
# {scope} path parameter.
@router.get('/agents/ownership-info')
def get_agent_ownership_info():
    """Get cached ownership information for all agents"""
    global _agent_ownership_cache, _cache_timestamp

    # Cache results for 5 minutes
    cache_duration = 300  # seconds
    current_time = time.time()

    if (_agent_ownership_cache is None or
            current_time - _cache_timestamp > cache_duration):
        _agent_ownership_cache = scan_agent_ownership()
        _cache_timestamp = current_time

    return _agent_ownership_cache


@router.get('/agents/{scope}')
def list_agents(scope: str):
    if scope not in ['local', 'shared']:
        raise HTTPException(status_code=400, detail='Invalid scope')
    scope_dir = BASE_DIR / scope
    agents = []
    for p in scope_dir.iterdir():
        if p.is_dir():
            agent_json_path = p / 'agent.json'
            if agent_json_path.exists():
                try:
                    with open(agent_json_path, 'r') as f:
                        agent_data = json.load(f)
                    agents.append(agent_data)
                except Exception:
                    # If we can't read the agent.json, just include the name
                    agents.append({'name': p.name})
    return agents

@router.post('/agents/refresh-ownership-cache')
def refresh_agent_ownership_cache():
    """Force refresh of the agent ownership cache"""
    global _agent_ownership_cache, _cache_timestamp

    _agent_ownership_cache = scan_agent_ownership()
    _cache_timestamp = time.time()

    return {
        'success': True,
        'message': 'Agent ownership cache refreshed',
        'data': _agent_ownership_cache
    }

@router.post('/agents/{scope}')
def create_agent(scope: str, agent: str = Form(...)):
    try:
        print(f"Creating agent in scope: {scope}")
        agent_data = json.loads(agent)

        if scope not in ['local', 'shared']:
            raise HTTPException(status_code=400, detail='Invalid scope')

        agent_name = agent_data.get('name')
        if not agent_name:
            raise HTTPException(status_code=400, detail='Agent name is required')

        print(f"Agent name: {agent_name}")

        if "indexName" in agent_data and agent_data["indexName"] is not None:
            print("Import agent from index: " + agent_data["indexName"])
            import_persona_from_index(agent_data["indexName"], agent_data['persona'])

        if 'persona' in agent_data:
            # handle_persona_import either returns the persona name directly or
            # performs the import and then returns the name.
            # Extract owner information for registry personas.
            owner = agent_data.get('registry_owner') or agent_data.get('owner') or agent_data.get('creator')
            persona_scope = 'registry' if owner else scope
            persona_name = handle_persona_import(agent_data['persona'], persona_scope, owner)
            agent_data['persona'] = persona_name

        # Ensure the legacy required_plugins field is present and is a list
        if 'required_plugins' not in agent_data:
            agent_data['required_plugins'] = []
        elif not isinstance(agent_data['required_plugins'], list):
            agent_data['required_plugins'] = list(agent_data['required_plugins'])

        # Ensure recommended_plugins is present and is a list
        if 'recommended_plugins' not in agent_data:
            agent_data['recommended_plugins'] = []
        elif not isinstance(agent_data['recommended_plugins'], list):
            agent_data['recommended_plugins'] = list(agent_data['recommended_plugins'])

        # Ensure preferred_providers is present and is a list
        if 'preferred_providers' not in agent_data:
            agent_data['preferred_providers'] = []
        elif not isinstance(agent_data['preferred_providers'], list):
            agent_data['preferred_providers'] = list(agent_data['preferred_providers'])

        agent_path = BASE_DIR / scope / agent_name / 'agent.json'
        if agent_path.exists():
            # Only overwrite an existing agent when explicitly requested
            overwrite = agent_data.get('overwrite', False)
            if not overwrite:
                raise HTTPException(status_code=400, detail='Agent already exists')
            print(f"Overwriting existing agent: {agent_name}")

        print(f"Creating agent directory: {agent_path.parent}")
        agent_path.parent.mkdir(parents=True, exist_ok=True)

        print(f"Writing agent file: {agent_path}")
        with open(agent_path, 'w') as f:
            json.dump(agent_data, f, indent=2)

        print(f"Successfully created agent: {agent_name}")
        return {'status': 'success'}

    except HTTPException:
        # Preserve intentional 4xx responses instead of converting them to 500s
        raise
    except Exception as e:
        trace = traceback.format_exc()
        print(f"Error creating agent: {str(e)}\n{trace}")
        raise HTTPException(status_code=500, detail=f'Internal server error: {str(e)}\nTrace: {trace}')
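
# Illustrative client sketch (not part of this module): creating an agent via
# the endpoint above. The agent definition is sent as a *form field* named
# "agent" that contains a JSON string, not as a JSON body. Base URL, mount
# prefix, field values, and the "requests" dependency are assumptions.
def _example_create_agent(base_url: str = 'http://localhost:8000'):
    import requests
    agent = {
        'name': 'example_agent',        # hypothetical agent name
        'persona': 'Assistant',         # resolved by handle_persona_import
        'recommended_plugins': [],
        'preferred_providers': [],
        'overwrite': False,             # set True to replace an existing agent
    }
    resp = requests.post(f'{base_url}/agents/local', data={'agent': json.dumps(agent)})
    resp.raise_for_status()
    return resp.json()  # {'status': 'success'} on success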

@router.put('/agents/{scope}/{name}')
def update_agent(scope: str, name: str, agent: str = Form(...)):
    try:
        agent = json.loads(agent)
        if scope not in ['local', 'shared']:
            raise HTTPException(status_code=400, detail='Invalid scope')

        # Ensure the legacy required_plugins field is present and is a list
        if 'required_plugins' not in agent:
            agent['required_plugins'] = []
        elif not isinstance(agent['required_plugins'], list):
            agent['required_plugins'] = list(agent['required_plugins'])

        # Ensure recommended_plugins is present and is a list
        if 'recommended_plugins' not in agent:
            agent['recommended_plugins'] = []
        elif not isinstance(agent['recommended_plugins'], list):
            agent['recommended_plugins'] = list(agent['recommended_plugins'])

        # Ensure preferred_providers is present and is a list
        if 'preferred_providers' not in agent:
            agent['preferred_providers'] = []
        elif not isinstance(agent['preferred_providers'], list):
            agent['preferred_providers'] = list(agent['preferred_providers'])

        agent_path = BASE_DIR / scope / name / 'agent.json'

        # Compute a short content hash over the agent data (excluding any
        # previous hash) so clients can detect changes.
        without_hash = agent.copy()
        without_hash.pop('hashver', None)
        consistent_json = json.dumps(without_hash, sort_keys=True, separators=(',', ':'))
        hashver = hashlib.sha256(consistent_json.encode('utf-8')).hexdigest()[:4]
        agent['hashver'] = hashver

        if not agent_path.exists():
            raise HTTPException(status_code=404, detail='Agent not found')

        with open(agent_path, 'w') as f:
            json.dump(agent, f, indent=2)
        return {'status': 'success'}
    except HTTPException:
        # Preserve intentional 4xx responses instead of converting them to 500s
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail='Internal server error ' + str(e))
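
# Minimal standalone sketch of the 'hashver' scheme used above: the first four
# hex characters of the SHA-256 digest of the agent data, serialized with
# sorted keys and compact separators so the result is stable across key order.
# The helper name is hypothetical.
def _example_hashver(agent: dict) -> str:
    without_hash = {k: v for k, v in agent.items() if k != 'hashver'}
    consistent_json = json.dumps(without_hash, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(consistent_json.encode('utf-8')).hexdigest()[:4]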

class ScanDirectoryRequest(BaseModel):
    directory: str
    scope: str

@router.post('/scan-and-import-agents')
def scan_and_import_agents_endpoint(request: ScanDirectoryRequest):
    try:
        result = scan_and_import_agents(Path(request.directory), request.scope)
        return {
            'success': True,
            'message': f"Imported {result['total_imported']} out of {result['total_discovered']} discovered agents",
            'imported_agents': result['imported_agents']
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during import: {str(e)}")

@router.post('/import-github-agent')
def import_github_agent_endpoint(request: GitHubImportRequest):
    try:
        if request.scope not in ['local', 'shared']:
            raise HTTPException(status_code=400, detail='Invalid scope')

        result = import_github_agent(request.repo_path, request.scope, request.tag)
        if not result['success']:
            raise HTTPException(status_code=400, detail=result['message'])
        return result
    except HTTPException:
        # Preserve intentional 4xx responses instead of converting them to 500s
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during GitHub import: {str(e)}")
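
# Illustrative client sketch (not part of this module): importing an agent
# from GitHub through the endpoint above. The repository path and tag are
# hypothetical; base URL and the "requests" dependency are assumptions.
def _example_import_from_github(base_url: str = 'http://localhost:8000'):
    import requests
    payload = {
        'repo_path': 'example-owner/example-agent-repo',  # "owner/repo" format
        'scope': 'local',
        'tag': None,  # or a specific release tag
    }
    resp = requests.post(f'{base_url}/import-github-agent', json=payload)
    resp.raise_for_status()
    return resp.json()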

@router.get('/agents/{scope}/{name}/export')
def export_agent_zip(scope: str, name: str):
    """Export an agent as a zip file"""
    if scope not in ['local', 'shared']:
        raise HTTPException(status_code=400, detail='Invalid scope')

    agent_dir = BASE_DIR / scope / name
    if not agent_dir.exists():
        raise HTTPException(status_code=404, detail='Agent not found')

    try:
        # Create zip file in memory
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # Add all files from the agent directory
            for file_path in agent_dir.rglob('*'):
                if file_path.is_file():
                    # Store paths relative to the agent directory inside the zip
                    arcname = file_path.relative_to(agent_dir)
                    zip_file.write(file_path, arcname)

        zip_buffer.seek(0)

        # Return as streaming response
        return StreamingResponse(
            io.BytesIO(zip_buffer.read()),
            media_type='application/zip',
            headers={'Content-Disposition': f'attachment; filename="{name}_agent.zip"'}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error creating zip: {str(e)}")
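
# Illustrative client sketch (not part of this module): downloading an agent
# export produced by the route above and saving it to disk. Base URL, agent
# name, output filename, and the "requests" dependency are assumptions.
def _example_download_agent_zip(base_url: str = 'http://localhost:8000'):
    import requests
    resp = requests.get(f'{base_url}/agents/local/example_agent/export')
    resp.raise_for_status()
    with open('example_agent_agent.zip', 'wb') as f:
        f.write(resp.content)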

@router.post('/agents/{scope}/import-zip')
async def import_agent_zip(scope: str, file: UploadFile = File(...)):
    """Import an agent from a zip file"""
    if scope not in ['local', 'shared']:
        raise HTTPException(status_code=400, detail='Invalid scope')

    if not file.filename.endswith('.zip'):
        raise HTTPException(status_code=400, detail='File must be a zip file')

    try:
        # Read the uploaded file
        content = await file.read()

        # Create a temporary directory for extraction
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Extract zip file
            with zipfile.ZipFile(io.BytesIO(content), 'r') as zip_file:
                zip_file.extractall(temp_path)

            # Find agent.json file
            agent_json_files = list(temp_path.rglob('agent.json'))
            if not agent_json_files:
                raise HTTPException(status_code=400, detail='No agent.json found in zip file')

            agent_json_path = agent_json_files[0]

            # Load agent data
            with open(agent_json_path, 'r') as f:
                agent_data = json.load(f)

            agent_name = agent_data.get('name')
            if not agent_name:
                raise HTTPException(status_code=400, detail='Agent name not found in agent.json')

            # Check if agent already exists; zip imports do not support
            # overwriting yet, so keep the error message consistent with the
            # registry manager workflow
            target_dir = BASE_DIR / scope / agent_name
            if target_dir.exists():
                raise HTTPException(
                    status_code=400,
                    detail=f'Agent {agent_name} already exists. Use the registry manager for updates.'
                )

            # Copy extracted files to target directory
            target_dir.mkdir(parents=True, exist_ok=True)

            # Get the directory containing agent.json
            source_dir = agent_json_path.parent

            # Copy all files from source to target
            for item in source_dir.rglob('*'):
                if item.is_file():
                    relative_path = item.relative_to(source_dir)
                    target_file = target_dir / relative_path
                    target_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(item, target_file)

        return {
            'success': True,
            'message': f'Agent {agent_name} imported successfully',
            'agent_name': agent_name
        }
    except HTTPException:
        # Preserve intentional 4xx responses instead of converting them to 500s
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error importing zip: {str(e)}")
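
# Illustrative client sketch (not part of this module): uploading an agent zip
# to the endpoint above as a multipart file named "file". Base URL, file path,
# and the "requests" dependency are assumptions.
def _example_upload_agent_zip(base_url: str = 'http://localhost:8000'):
    import requests
    with open('example_agent_agent.zip', 'rb') as f:
        resp = requests.post(
            f'{base_url}/agents/local/import-zip',
            files={'file': ('example_agent_agent.zip', f, 'application/zip')}
        )
    resp.raise_for_status()
    return resp.json()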