Source code for mindroot.coreplugins.index.handlers.publish

import os
import json
import shutil
import zipfile
from datetime import datetime
from pathlib import Path
from fastapi import HTTPException, UploadFile
from fastapi.responses import JSONResponse
from ..utils import install_persona

async def publish_index(INDEX_DIR: Path, PUBLISHED_DIR: Path, index_name: str):
    """Publish an index as a timestamped zip archive.

    The archive contains ``index.json`` (when present) and every file found
    under ``personas/<persona>/``, stored with its path relative to the index
    directory so nested sub-directories are preserved.

    Args:
        INDEX_DIR: Directory that holds one sub-directory per index.
        PUBLISHED_DIR: Directory the zip file is written to; created if it
            does not exist yet.
        index_name: Name of the index sub-directory to publish. Must be a
            single path component.

    Returns:
        A ``JSONResponse`` with ``success``, ``message`` and ``zip_file``
        (the ``/published/<zip name>`` URL path).

    Raises:
        HTTPException: 400 if ``index_name`` is empty or contains path
            separators / dot segments, 404 if the index directory does not
            exist, 500 for any other failure while building the archive.
    """
    # index_name is joined onto INDEX_DIR, so refuse anything that could
    # escape it ("../x", absolute paths, empty string).
    if (
        not index_name
        or index_name in ('.', '..')
        or '/' in index_name
        or '\\' in index_name
    ):
        raise HTTPException(status_code=400, detail="Invalid index name")

    # Timestamped name keeps successive publishes from overwriting each other.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    zip_filename = f"{index_name}-{timestamp}.zip"
    zip_path = PUBLISHED_DIR / zip_filename
    zip_started = False

    try:
        index_dir = INDEX_DIR / index_name
        if not index_dir.is_dir():
            raise HTTPException(status_code=404, detail="Index directory not found")

        PUBLISHED_DIR.mkdir(parents=True, exist_ok=True)

        zip_started = True
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            index_json_path = index_dir / 'index.json'
            if index_json_path.exists():
                zipf.write(index_json_path, 'index.json')

            personas_dir = index_dir / 'personas'
            if personas_dir.is_dir():
                for persona_dir in personas_dir.iterdir():
                    if not persona_dir.is_dir():
                        continue
                    for root, _, files in os.walk(persona_dir):
                        for file in files:
                            file_path = Path(root) / file
                            # Keep the full relative path; using only the file
                            # name would flatten nested directories and let
                            # same-named files overwrite each other.
                            arc_name = file_path.relative_to(index_dir).as_posix()
                            zipf.write(file_path, arc_name)

        # URL path that can be used with the static file handler.
        zip_url = f"/published/{zip_filename}"
        return JSONResponse({
            'success': True,
            'message': 'Index published successfully',
            'zip_file': zip_url
        })
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must keep their status.
        raise
    except Exception as e:
        if zip_started:
            # Best-effort removal of a partially written archive.
            try:
                zip_path.unlink()
            except OSError:
                pass
        raise HTTPException(
            status_code=500, detail=f"Failed to publish index: {str(e)}"
        ) from e
async def install_index_from_zip(INDEX_DIR: Path, file: UploadFile):
    """Install an index from an uploaded zip archive.

    The upload is saved and extracted inside a private temporary directory.
    The shallowest ``index.json`` in the archive is read for its ``name``;
    each directory under the sibling ``personas/`` folder is handed to
    ``install_persona`` (imported from ``..utils``; its behavior is not
    visible in this module), and then ``index.json`` plus ``personas/`` are
    copied to ``INDEX_DIR / <name>``, replacing any existing directory there.

    Args:
        INDEX_DIR: Directory that holds one sub-directory per index.
        file: The uploaded file; its name must end in ``.zip``.

    Returns:
        A ``JSONResponse`` with ``success`` and ``message``.

    Raises:
        HTTPException: 400 for a non-zip upload, a corrupt archive, a missing
            or malformed ``index.json``, or an unusable ``name``; 500 for any
            other failure during installation.
    """
    import tempfile  # function-scope import keeps this block self-contained

    upload_name = file.filename or ""
    if not upload_name.lower().endswith('.zip'):
        raise HTTPException(status_code=400, detail="File must be a zip archive")

    # A unique directory per request: a fixed shared path would let concurrent
    # installs overwrite (and delete) each other's files.
    temp_dir = Path(tempfile.mkdtemp(prefix="temp_index_install_"))

    try:
        # Never use the client-supplied filename as a path; it may contain
        # separators or ".." segments.
        zip_path = temp_dir / "upload.zip"
        with open(zip_path, 'wb') as f:
            shutil.copyfileobj(file.file, f)

        # Extract into a separate sub-directory so archive members cannot
        # clash with the saved upload itself.
        extract_dir = temp_dir / "extracted"
        extract_dir.mkdir()
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)
        except zipfile.BadZipFile as e:
            raise HTTPException(
                status_code=400, detail="File is not a valid zip archive"
            ) from e

        # Prefer the shallowest index.json so the choice is deterministic when
        # the archive happens to contain more than one.
        index_files = sorted(
            (p for p in extract_dir.rglob('index.json') if p.is_file()),
            key=lambda p: (len(p.parts), str(p)),
        )
        if not index_files:
            raise HTTPException(status_code=400, detail="No index.json found in zip file")
        index_json_path = index_files[0]

        try:
            with open(index_json_path, 'r', encoding='utf-8') as f:
                index_data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            raise HTTPException(
                status_code=400, detail=f"Invalid index.json: {str(e)}"
            ) from e

        index_name = index_data.get('name') if isinstance(index_data, dict) else None
        if not index_name:
            raise HTTPException(status_code=400, detail="Invalid index.json: missing name field")

        # The name comes from untrusted archive content and is used both as a
        # path under INDEX_DIR and as an rmtree target, so it must be a single
        # plain path component.
        if (
            not isinstance(index_name, str)
            or index_name in ('.', '..')
            or '/' in index_name
            or '\\' in index_name
        ):
            raise HTTPException(
                status_code=400,
                detail="Invalid index.json: name must be a plain directory name",
            )

        # Install each bundled persona before placing the index itself.
        index_root = index_json_path.parent
        personas_dir = index_root / 'personas'
        has_personas = personas_dir.is_dir()
        if has_personas:
            for persona_dir in personas_dir.iterdir():
                if persona_dir.is_dir():
                    install_persona(persona_dir, persona_dir.name)

        # Replace any previously installed index of the same name.
        target_dir = INDEX_DIR / index_name
        if target_dir.exists():
            shutil.rmtree(target_dir)

        target_dir.mkdir(parents=True)
        shutil.copy2(index_json_path, target_dir / 'index.json')

        if has_personas:
            shutil.copytree(personas_dir, target_dir / 'personas', dirs_exist_ok=True)

        return JSONResponse({
            'success': True,
            'message': f"Index '{index_name}' installed successfully"
        })
    except HTTPException:
        # Deliberate 4xx errors raised above must keep their status code.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to install index: {str(e)}"
        ) from e
    finally:
        # Cleanup must not mask the real result or error.
        shutil.rmtree(temp_dir, ignore_errors=True)