Source code for mindroot.coreplugins.admin.plugin_routes

from fastapi import APIRouter, HTTPException, Header
from pydantic import BaseModel
import json
import os
from typing import Optional
from lib import plugins
from . import plugin_manager
from lib.route_decorators import requires_role

# Router for the admin plugin routes; the 'admin' role requirement is attached
# as a router-level dependency so it applies to every route registered on it.
router = APIRouter(dependencies=[requires_role('admin')])

class PluginUpdateRequest(BaseModel):
    """Request body for the ``/update-plugins`` route."""

    # Looked up by plugin name; the value found is stored as that plugin's
    # 'enabled' field.
    plugins: dict
class GithubPublishRequest(BaseModel):
    """Request body for the ``/plugins/publish_from_github`` route."""

    # GitHub repository in 'username/repo' form.
    repo: str

    # Registry to publish to; when omitted (or empty) the route falls back to
    # its built-in default registry URL.
    registry_url: Optional[str] = None
# --- Plugin Management Routes ---
@router.post("/update-plugins")
def update_plugins(request: PluginUpdateRequest):
    """Update plugin enabled/disabled status.

    Reads ``plugins.json``, sets ``enabled`` on every entry whose ``name``
    appears in ``request.plugins``, writes the file back and then reloads
    plugins through ``plugins.load``.

    Args:
        request: Body whose ``plugins`` dict is keyed by plugin name; the
            value becomes that plugin's ``enabled`` field.

    Returns:
        A dict with a ``message`` key confirming the update.

    Raises:
        HTTPException: Status 500, with the error text as detail, when
            reading, updating, writing or reloading fails.
    """
    plugins_file = 'plugins.json'
    try:
        # JSON text is UTF-8; be explicit so the result does not depend on
        # the platform's default locale encoding.
        with open(plugins_file, 'r', encoding='utf-8') as file:
            plugins_data = json.load(file)

        requested = request.plugins
        for plugin in plugins_data:
            name = plugin['name']
            if name in requested:
                plugin['enabled'] = requested[name]

        with open(plugins_file, 'w', encoding='utf-8') as file:
            json.dump(plugins_data, file, indent=2)

        # NOTE(review): the file edited above is 'plugins.json' but the reload
        # reads 'data/plugin_manifest.json' - confirm this mismatch is intended.
        plugins.load('data/plugin_manifest.json')

        return {"message": "Plugins updated successfully"}
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/get-plugins")
async def get_plugins():
    """Return whatever ``plugins.load_plugin_manifest()`` produces.

    Raises:
        HTTPException: Status 500, with the error text as detail, if loading
            the manifest raises.
    """
    try:
        manifest = plugins.load_plugin_manifest()
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    return manifest
def _resolve_registry_token(authorization: Optional[str]) -> Optional[str]:
    """Return the first registry token found, or ``None`` if there is none.

    Sources are tried in order: a ``Bearer`` Authorization header value, the
    ``REGISTRY_TOKEN`` environment variable, then the ``registry_token`` key
    of ``data/registry_settings.json``.
    """
    # 1. Authorization header. Slice off the scheme instead of indexing a
    #    split so that extra whitespace cannot yield an empty token.
    bearer_prefix = "Bearer "
    if authorization and authorization.startswith(bearer_prefix):
        token = authorization[len(bearer_prefix):].strip()
        if token:
            return token

    # 2. Environment variable.
    token = os.getenv('REGISTRY_TOKEN')
    if token:
        return token

    # 3. Settings file. Best effort: a missing or unreadable file just means
    #    "no token from this source", so the error is reported and swallowed.
    settings_file = 'data/registry_settings.json'
    try:
        if os.path.exists(settings_file):
            with open(settings_file, 'r', encoding='utf-8') as f:
                settings = json.load(f)
            return settings.get('registry_token') or None
    except Exception as e:
        print(f"Error reading registry settings: {e}")
    return None


@router.post("/plugins/publish_from_github")
async def publish_plugin_from_github(
    request: GithubPublishRequest,
    authorization: Optional[str] = Header(None)
):
    """Publish a plugin from GitHub repository to the registry.

    This endpoint allows publishing a plugin by simply providing the GitHub
    repository in the format 'username/repo'. It will fetch the
    plugin_info.json from the repository and publish it to the configured
    registry.

    The registry token can be provided via:
    1. Authorization header: "Bearer <token>"
    2. REGISTRY_TOKEN environment variable
    3. registry_token in data/registry_settings.json

    Args:
        request: Body carrying ``repo`` and an optional ``registry_url``;
            when ``registry_url`` is empty, "https://registry.mindroot.io"
            is used.
        authorization: Value of the ``Authorization`` request header, if any.

    Returns:
        A dict with ``success``, ``message`` and ``data`` (the value returned
        by ``plugin_manager.publish_plugin_from_github``).

    Raises:
        HTTPException: Status 401 when no token can be found. Status 500,
            with the error text as detail, for any other failure.
    """
    try:
        registry_token = _resolve_registry_token(authorization)

        if not registry_token:
            raise HTTPException(
                status_code=401,
                detail="Registry authentication token not provided. Please provide via Authorization header, REGISTRY_TOKEN environment variable, or registry_settings.json file."
            )

        # Use the existing plugin_manager functionality
        registry_url = request.registry_url or "https://registry.mindroot.io"
        result = await plugin_manager.publish_plugin_from_github(
            request.repo,
            registry_token,
            registry_url
        )

        return {
            "success": True,
            "message": f"Plugin '{result.get('title', request.repo)}' published successfully!",
            "data": result
        }
    except HTTPException:
        # Deliberate HTTP errors (such as the 401 above) must reach the client
        # unchanged; without this clause the generic handler below would
        # rewrap them as a 500.
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e