Source code for mindroot.coreplugins.admin.agent_importer

from pathlib import Path
import json
import shutil
import logging
from typing import Dict, List
import os
import tempfile
import requests
import zipfile

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# These should be moved to a config file or environment variables
AGENT_BASE_DIR = Path('data/agents')
PERSONA_BASE_DIR = Path('personas')

CONFIRM_OVERWRITE_AGENT = os.environ.get('CONFIRM_OVERWRITE_AGENT', 'false')
if CONFIRM_OVERWRITE_AGENT.lower() in ['true', '1']:
    CONFIRM_OVERWRITE_AGENT = True
else:
    CONFIRM_OVERWRITE_AGENT = False

[docs] def download_github_files(repo_path: str, tag: str = None) -> tuple[str, str]: """Download GitHub repo files to temp directory""" # Format the download URL if tag: download_url = f"https://github.com/{repo_path}/archive/refs/tags/{tag}.zip" else: download_url = f"https://github.com/{repo_path}/archive/refs/heads/main.zip" # Create temp directory temp_dir = tempfile.mkdtemp() zip_path = os.path.join(temp_dir, 'repo.zip') try: # Download the zip file response = requests.get(download_url) response.raise_for_status() with open(zip_path, 'wb') as f: f.write(response.content) # Extract zip with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(temp_dir) return temp_dir, os.path.dirname(zip_path) except Exception as e: shutil.rmtree(temp_dir) raise e
[docs] def scan_for_agents(directory: Path) -> Dict[str, Dict]: discovered_agents = {} for agent_file in directory.rglob('agent.json'): try: with agent_file.open('r') as f: agent_data = json.load(f) agent_name = agent_data.get('name') if agent_name: discovered_agents[agent_name] = { 'path': agent_file.parent, 'data': agent_data } except json.JSONDecodeError: logger.error(f"Failed to parse agent file: {agent_file}") except PermissionError: logger.error(f"Permission denied when reading: {agent_file}") return discovered_agents
[docs] def validate_agent_structure(agent_data: Dict) -> bool: required_fields = ['name', 'persona', 'commands'] return all(field in agent_data for field in required_fields)
[docs] def import_agent(agent_name: str, agent_info: Dict, scope: str) -> None: target_dir = Path(AGENT_BASE_DIR) / scope / agent_name if CONFIRM_OVERWRITE_AGENT and target_dir.exists(): logger.warning(f"Agent {agent_name} already exists in {scope} scope, skipping") return if not validate_agent_structure(agent_info['data']): logger.error(f"Agent {agent_name} has invalid structure, skipping") return try: shutil.copytree(agent_info['path'], target_dir, dirs_exist_ok=True) logger.info(f"Imported agent {agent_name} to {target_dir}") persona_name = agent_info['data'].get('persona') if persona_name: persona_dir = os.path.join(agent_info['path'].parent.parent, "personas") import_persona(persona_name, persona_dir, scope) except PermissionError: logger.error(f"Permission denied when copying agent {agent_name}") except shutil.Error as e: logger.error(f"Error copying agent {agent_name}: {e}")
[docs] def import_persona(persona_name: str, source_dir: Path, scope: str) -> None: persona_source = Path(source_dir) / persona_name if not persona_source.exists(): logger.warning(f"Referenced persona {persona_name} not found in {source_dir}") return persona_target = PERSONA_BASE_DIR / scope / persona_name if CONFIRM_OVERWRITE_AGENT and persona_target.exists(): logger.warning(f"Persona {persona_name} already exists, skipping import") return try: shutil.copytree(persona_source, persona_target, dirs_exist_ok=True) logger.info(f"Imported persona {persona_name} to {persona_target}") except PermissionError: logger.error(f"Permission denied when copying persona {persona_name}") except shutil.Error as e: logger.error(f"Error copying persona {persona_name}: {e}")
[docs] def scan_and_import_agents(directory: Path, scope: str) -> Dict[str, List[str]]: if not directory.is_dir(): raise ValueError(f"Invalid directory path: {directory}") if scope not in ['local', 'shared']: raise ValueError(f"Invalid scope: {scope}") discovered_agents = scan_for_agents(directory) imported_agents = [] for agent_name, agent_info in discovered_agents.items(): try: import_agent(agent_name, agent_info, scope) imported_agents.append(agent_name) except Exception as e: logger.error(f"Failed to import agent {agent_name}: {e}") return { 'imported_agents': imported_agents, 'total_imported': len(imported_agents), 'total_discovered': len(discovered_agents) }
[docs] def import_github_agent(repo_path: str, scope: str, tag: str = None) -> Dict[str, any]: """Import an agent from a GitHub repository""" try: # Download and extract GitHub repository temp_dir, extract_path = download_github_files(repo_path, tag) try: # Scan the downloaded directory for agents discovered = scan_for_agents(Path(temp_dir)) if not discovered: raise ValueError("No valid agents found in repository") # Import each discovered agent result = scan_and_import_agents(Path(temp_dir), scope) return { 'success': True, 'message': f"Successfully imported {result['total_imported']} agents from GitHub", 'imported_agents': result['imported_agents'] } finally: # Clean up temp directory shutil.rmtree(temp_dir) except Exception as e: logger.error(f"Failed to import agent from GitHub: {str(e)}") return { 'success': False, 'message': f"Failed to import agent: {str(e)}", 'imported_agents': [] }