Source code for mindroot.coreplugins.admin.plugin_manager

from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import traceback
import os
import json
from typing import List, Optional
from lib.plugins import (
    load_plugin_manifest, update_plugin_manifest, plugin_install,
    save_plugin_manifest, plugin_update, toggle_plugin_state, get_plugin_path
)
from lib.plugins.installation import download_github_files
from lib.streamcmd import stream_command_as_events
import asyncio
import httpx


# Router on which the plugin-management endpoints in this module are
# registered through the @router.get / @router.post decorators.
router = APIRouter()

class DirectoryRequest(BaseModel):
    """Request body for POST /scan-directory."""

    # Filesystem path of the directory that should be scanned for plugins.
    directory: str
class PluginRequest(BaseModel):
    """Request body for POST /install-local-plugin."""

    # Plugin name; the handler resolves its location with get_plugin_path().
    plugin: str
class GitHubPluginRequest(BaseModel):
    """Request body for POST /install-x-github-plugin."""

    # Plugin name supplied by the client.
    plugin: str
    # Source location; the handler uses `url` when it is set and falls back
    # to `github_url` otherwise.
    url: Optional[str] = None
    github_url: Optional[str] = None
class TogglePluginRequest(BaseModel):
    """Request body for POST /toggle-plugin."""

    # Name of the plugin whose state is passed to toggle_plugin_state().
    plugin: str
    # Desired state: True to enable the plugin, False to disable it.
    enabled: bool
class InstallFromIndexRequest(BaseModel):
    """Request body for POST /install-from-index."""

    # Name of the plugin to look up in the index's "plugins" list.
    plugin: str
    # Index name; the handler reads the file indices/<index_name>.json.
    index_name: str
class StreamInstallRequest(BaseModel):
    """Request body for POST /stream-install-plugin."""

    # Plugin name; used as the pip requirement when source is 'pypi'.
    plugin: str
    # Installation source selector compared against 'github',
    # 'github_direct', 'local' and 'pypi' by the handler.
    source: str
    # Path, repository reference or URL to install from. The default is None,
    # so the annotation is Optional: an explicit JSON null is accepted instead
    # of being rejected as "not a string".
    source_path: Optional[str] = None
class PluginMetadata(BaseModel):
    """Optional descriptive metadata for a plugin; every field defaults to None."""

    # Free-text description of the plugin.
    description: Optional[str] = None
    # Names of commands, services and dependencies associated with the plugin
    # (presumably as declared by the plugin itself -- confirm against callers).
    commands: Optional[List[str]] = None
    services: Optional[List[str]] = None
    dependencies: Optional[List[str]] = None
import sys, os, shlex
@router.post("/stream-install-plugin", response_class=EventSourceResponse)
async def stream_install_plugin(request: StreamInstallRequest):
    """Stream the installation process of a plugin using SSE (POST method).

    Behaviour per ``request.source``:

    * ``'github'`` -- ``source_path`` is ``owner/repo``, ``owner/repo:tag`` or
      a full ``https://`` URL. The repository is fetched with
      ``download_github_files``, installed with ``pip install -e`` and then
      recorded through ``update_plugin_manifest``.
    * ``'github_direct'`` -- an ``https://`` value is handed to pip unchanged;
      any other value is installed with ``pip install -e``.
    * ``'local'`` -- ``pip install -e <source_path>``.
    * ``'pypi'`` -- ``pip install <plugin>``.

    Args:
        request: Parsed request body.

    Returns:
        An ``EventSourceResponse`` streaming the installer output, or a
        ``{"success": False, "message": ...}`` dict when the request cannot
        be processed.
    """
    print("Stream Install Request:", request)
    source = request.source
    source_path = request.source_path
    pip_install = [sys.executable, '-m', 'pip', 'install']
    pip_flags = ['-v', '--no-cache-dir']

    if source not in ('github', 'github_direct', 'local', 'pypi'):
        print("Invalid source")
        return {"success": False, "message": "Invalid source"}

    # Every source except PyPI installs from source_path. Without this guard a
    # missing value fails later with an AttributeError on None.
    if source != 'pypi' and not source_path:
        return {"success": False, "message": "source_path is required for this source"}

    if source == 'github':
        try:
            if source_path.startswith('https://'):
                # https://<host>/<owner>/<repo>[/...] -> "<owner>/<repo>".
                # Splitting a URL on ':' would yield "https" as the repository,
                # so URLs are reduced to owner/repo, as the GET endpoint does.
                url_parts = source_path.split('/')
                if len(url_parts) >= 5:
                    repo_path = f"{url_parts[3]}/{url_parts[4]}"
                else:
                    repo_path = source_path
                tag = None
            else:
                # "owner/repo" or "owner/repo:tag"
                parts = source_path.split(':')
                repo_path = parts[0]
                tag = parts[1] if len(parts) > 1 else None

            async def stream_github_install():
                yield {"event": "message", "data": f"Downloading GitHub repository {repo_path}..."}
                try:
                    plugin_dir, _, plugin_info = download_github_files(repo_path, tag)
                    install_cmd = pip_install + ['-e', plugin_dir] + pip_flags
                    async for event in stream_command_as_events(install_cmd):
                        yield event
                    # Record the plugin only after the install stream has finished.
                    update_plugin_manifest(
                        plugin_info['name'],
                        'github',
                        os.path.abspath(plugin_dir),
                        remote_source=repo_path,
                        version=plugin_info.get('version', '0.0.1'),
                        metadata=plugin_info
                    )
                except Exception as e:
                    # Report the failure on the stream; the response has already started.
                    print(e)
                    yield {"event": "error", "data": f"Error installing from GitHub: {str(e)}"}

            return EventSourceResponse(stream_github_install())
        except Exception as e:
            print(e)
            return {"success": False, "message": f"Error setting up GitHub installation: {str(e)}"}

    # Remaining sources are a single pip invocation whose output is streamed.
    if source == 'pypi':
        cmd = pip_install + [request.plugin] + pip_flags
    elif source == 'github_direct' and source_path.startswith('https://'):
        cmd = pip_install + [source_path] + pip_flags
    else:
        cmd = pip_install + ['-e', source_path] + pip_flags
    print("Command to execute:", cmd)
    return EventSourceResponse(stream_command_as_events(cmd))
@router.get("/stream-install-plugin", response_class=EventSourceResponse)
async def stream_install_plugin_get(request: Request):
    """Stream the installation process of a plugin using SSE (GET method).

    Reads ``plugin``, ``source`` and ``source_path`` from the query string
    (each defaults to an empty string).

    * ``source == 'github'`` -- ``source_path`` is ``owner/repo``,
      ``owner/repo:tag`` or a full ``https://`` URL. The repository is fetched
      with ``download_github_files``, installed with ``pip install -e`` and
      then recorded through ``update_plugin_manifest``.
    * ``source == 'local'`` -- ``pip install -e <source_path>``.
    * ``source == 'pypi'`` -- ``pip install <plugin>``.

    Args:
        request: Incoming request whose query parameters select the install.

    Returns:
        An ``EventSourceResponse`` streaming the installer output, or a
        ``{"success": False, "message": ...}`` dict for an unknown source or
        a failure while setting up the GitHub stream.
    """
    print("Stream Install GET Request:", request.query_params)
    plugin = request.query_params.get("plugin", "")
    source = request.query_params.get("source", "")
    source_path = request.query_params.get("source_path", "")

    if source not in ('github', 'local', 'pypi'):
        return {"success": False, "message": "Invalid source"}

    pip_install = [sys.executable, '-m', 'pip', 'install']
    pip_flags = ['-v', '--no-cache-dir']

    if source == 'github':
        try:
            print("source_path:", source_path)
            # Resolve the repository and tag before streaming starts, so the
            # first progress message names the real repository rather than
            # the "https" fragment left by splitting a URL on ':'.
            if source_path.startswith('https://'):
                # https://<host>/<owner>/<repo>[/...] -> "<owner>/<repo>"
                url_parts = source_path.split('/')
                if len(url_parts) >= 5:
                    repo_path = f"{url_parts[3]}/{url_parts[4]}"
                else:
                    repo_path = source_path
                tag = None
            else:
                # "owner/repo" or "owner/repo:tag"
                parts = source_path.split(':')
                repo_path = parts[0]
                tag = parts[1] if len(parts) > 1 else None
            print("repo_path:", repo_path, "tag:", tag)

            async def stream_github_install():
                yield {"event": "message", "data": f"Downloading GitHub repository {repo_path}..."}
                try:
                    plugin_dir, _, plugin_info = download_github_files(repo_path, tag)
                    # Stream the installation from the downloaded directory.
                    install_cmd = pip_install + ['-e', plugin_dir] + pip_flags
                    async for event in stream_command_as_events(install_cmd):
                        yield event
                    # Record the plugin only after the install stream has finished.
                    update_plugin_manifest(
                        plugin_info['name'],
                        'github',
                        os.path.abspath(plugin_dir),
                        remote_source=repo_path,
                        version=plugin_info.get('version', '0.0.1'),
                        metadata=plugin_info
                    )
                except Exception as e:
                    # Report the failure on the stream; the response has already started.
                    trace = traceback.format_exc()
                    yield {"event": "error", "data": f"Error installing from GitHub: {str(e)} \n\n{trace}"}

            return EventSourceResponse(stream_github_install())
        except Exception as e:
            trace = traceback.format_exc()
            return {"success": False, "message": f"Error installing from GitHub: {str(e)}\n\n{trace}"}

    # Local and PyPI installs are a single pip invocation whose output is streamed.
    if source == 'local':
        cmd = pip_install + ['-e', source_path] + pip_flags
    else:
        cmd = pip_install + [plugin] + pip_flags
    print("Command to execute:", cmd)
    return EventSourceResponse(stream_command_as_events(cmd))
@router.get("/get-all-plugins")
async def get_all_plugins():
    """Return every plugin listed in the manifest, core entries first.

    Returns:
        ``{"success": True, "data": [...]}`` with one dict per plugin, or
        ``{"success": False, "message": ...}`` (including the traceback) when
        the manifest cannot be read or an entry lacks a required key.
    """
    try:
        manifest = load_plugin_manifest()

        # Core plugins carry a fixed source and version.
        core_entries = [
            {
                "name": name,
                "category": "core",
                "enabled": info['enabled'],
                "source": "core",
                "remote_source": name,
                "version": "1.0.0",
                "description": info.get('metadata', {}).get('description', '')
            }
            for name, info in manifest['plugins']['core'].items()
        ]

        # Installed plugins report what the manifest recorded for them.
        installed_entries = [
            {
                "name": name,
                "category": "installed",
                "enabled": info['enabled'],
                "source": info['source'],
                "remote_source": info.get('remote_source', info.get('github_url')),
                "source_path": info.get('source_path'),
                "version": info.get('version', '0.0.1'),
                "description": info.get('metadata', {}).get('description', ''),
                "index_source": info.get('metadata', {}).get('index_source')
            }
            for name, info in manifest['plugins']['installed'].items()
        ]

        return {"success": True, "data": core_entries + installed_entries}
    except Exception as e:
        trace = traceback.format_exc()
        return {"success": False, "message": f"Error fetching plugins: {str(e)}\n\n{trace}"}
@router.post("/scan-directory")
async def scan_directory(request: DirectoryRequest):
    """Scan a directory for plugins and register them in the manifest as local installs.

    Args:
        request: Body carrying the directory to scan.

    Returns:
        A dict with ``success``, ``message``, the discovered ``plugins`` and
        ``addable_to_index`` (how many carry GitHub information), plus a
        ``warning`` when some plugins lack it; or a failure dict when the
        path is not a directory or an error occurs.
    """
    try:
        target_dir = request.directory
        if not os.path.isdir(target_dir):
            return {"success": False, "message": "Invalid directory path"}

        found = discover_plugins(target_dir)
        manifest = load_plugin_manifest()
        print("discoverd_plugins", found)

        # Count plugins that expose GitHub information in any of the known places.
        addable_count = sum(
            1
            for info in found.values()
            if (
                info.get('github_url')
                or info.get('remote_source')
                or info.get('metadata', {}).get('github_url')
            )
        )

        # Mark each discovered plugin as a local install and store it in the manifest.
        for name, info in found.items():
            info['source'] = 'local'
            metadata = info.get('metadata', {})
            if not metadata:
                # No usable metadata: build it from the top-level fields.
                metadata = {
                    "description": info.get('description', ''),
                    "install_date": info.get('install_date', ''),
                    "commands": info.get('commands', []),
                    "services": info.get('services', [])
                }
            info['metadata'] = metadata
            print(info)
            manifest['plugins']['installed'][name] = info

        # Summarise the discovered plugins for the response.
        plugins_list = []
        for name, info in found.items():
            description = info.get('metadata', {}).get('description', info.get('description', ''))
            plugins_list.append({"name": name, "description": description})

        save_plugin_manifest(manifest)

        total = len(found)
        response = {
            "success": True,
            "message": f"Scanned {total} plugins in {target_dir}",
            "plugins": plugins_list,
            "addable_to_index": addable_count
        }
        missing = total - addable_count
        if missing > 0:
            response["warning"] = f"{missing} plugins missing GitHub info and cannot be added to indices"
        return response
    except Exception as e:
        trace = traceback.format_exc()
        return {"success": False, "message": f"Error during scan: {str(e)}\n\n{trace}"}
@router.post("/install-local-plugin")
async def install_local_plugin(request: PluginRequest):
    """Install a plugin from the local path that get_plugin_path() reports for it.

    Args:
        request: Body carrying the plugin name.

    Returns:
        A ``{"success": bool, "message": str}`` dict describing the outcome.
    """
    try:
        name = request.plugin
        location = get_plugin_path(name)
        if not location:
            return {"success": False, "message": "Plugin path not found"}

        installed = await plugin_install(name, source='local', source_path=location)
        if not installed:
            return {"success": False, "message": "Installation failed"}
        return {"success": True, "message": f"Plugin {name} installed successfully"}
    except Exception as e:
        trace = traceback.format_exc()
        return {"success": False, "message": f"Error installing plugin: {str(e)}\n\n{trace}"}
@router.post("/install-x-github-plugin")
async def install_github_plugin(request: GitHubPluginRequest):
    """Install a plugin from GitHub through plugin_install().

    Args:
        request: Body carrying the plugin name and the source location
            (``url``, falling back to ``github_url``).

    Returns:
        A ``{"success": bool, "message": str}`` dict describing the outcome.
    """
    try:
        print("Request:", request)
        url = request.url or request.github_url
        if not url:
            # Neither field was supplied, so there is nothing to install from.
            return {"success": False, "message": "No GitHub URL provided"}

        # Pass the plugin name from the request rather than a fixed placeholder.
        success = await plugin_install(request.plugin, source='github', source_path=url)
        if success:
            return {"success": True, "message": "Plugin installed successfully from GitHub"}
        return {"success": False, "message": "Installation failed"}
    except Exception as e:
        trace = traceback.format_exc()
        return {"success": False, "message": f"Error installing from GitHub: {str(e)}\n\n{trace}"}
@router.post("/install-from-index")
async def install_from_index(request: InstallFromIndexRequest):
    """Install a plugin using the source recorded for it in a named index file.

    The index is read from ``indices/<index_name>.json``. A plugin entry with a
    ``github_url`` is installed from GitHub; otherwise an entry with a
    ``source_path`` is installed as a local plugin. After a successful install
    the manifest entry's metadata is tagged with ``index_source``.

    Args:
        request: Body carrying the plugin name and the index name.

    Returns:
        A ``{"success": bool, "message": str}`` dict describing the outcome.
    """
    try:
        index_name = request.index_name
        # index_name comes from the client and becomes part of a file path:
        # refuse anything that could step outside the indices directory.
        if not index_name or '/' in index_name or '\\' in index_name:
            return {"success": False, "message": "Invalid index name"}

        index_path = os.path.join('indices', f"{index_name}.json")
        if not os.path.exists(index_path):
            return {"success": False, "message": "Index not found"}

        with open(index_path, 'r', encoding='utf-8') as f:
            index_data = json.load(f)

        # First entry whose name matches; entries without a name are skipped.
        plugin_data = next(
            (entry for entry in index_data.get('plugins', [])
             if entry.get('name') == request.plugin),
            None
        )
        if not plugin_data:
            return {"success": False, "message": "Plugin not found in index"}

        if plugin_data.get('github_url'):
            success = await plugin_install(
                request.plugin,
                source='github',
                source_path=plugin_data['github_url']
            )
        elif plugin_data.get('source_path'):
            success = await plugin_install(
                request.plugin,
                source='local',
                source_path=plugin_data['source_path']
            )
        else:
            return {"success": False, "message": "No valid installation source in index"}

        if not success:
            return {"success": False, "message": "Installation failed"}

        # Remember which index the plugin came from.
        manifest = load_plugin_manifest()
        installed = manifest['plugins']['installed']
        if request.plugin in installed:
            entry = installed[request.plugin]
            # The entry may have no metadata (or a null one); start a fresh dict then.
            metadata = entry.get('metadata') or {}
            metadata['index_source'] = index_name
            entry['metadata'] = metadata
            save_plugin_manifest(manifest)
        return {"success": True, "message": f"Plugin {request.plugin} installed successfully from index"}
    except Exception as e:
        trace = traceback.format_exc()
        return {"success": False, "message": f"Error installing from index: {str(e)}\n\n{trace}"}
@router.post("/toggle-plugin")
async def toggle_plugin(request: TogglePluginRequest):
    """Enable or disable a plugin through toggle_plugin_state().

    Args:
        request: Body carrying the plugin name and the desired state.

    Returns:
        A ``{"success": bool, "message": str}`` dict describing the outcome.
    """
    try:
        if not toggle_plugin_state(request.plugin, request.enabled):
            return {"success": False, "message": "Failed to toggle plugin state"}

        if request.enabled:
            state_word = 'enabled'
        else:
            state_word = 'disabled'
        return {"success": True, "message": f"Plugin {request.plugin} {state_word} successfully"}
    except Exception as e:
        trace = traceback.format_exc()
        return {"success": False, "message": f"Error toggling plugin: {str(e)}\n\n{trace}"}
# Helper function
def discover_plugins(directory):
    """Collect plugin descriptions from the immediate sub-directories of *directory*.

    A sub-directory counts as a plugin when it contains a ``plugin_info.json``
    file holding a JSON object with a non-empty string ``name``. Each accepted
    description is returned with ``enabled`` set to ``False`` and
    ``source_path`` set to the sub-directory path.

    Entries that cannot be read or parsed, or that lack a usable name, are
    reported on stdout and skipped, so one bad plugin does not abort the scan.

    Args:
        directory: Path of the directory to scan.

    Returns:
        Dict mapping plugin name to its description dict.

    Raises:
        OSError: If *directory* itself cannot be listed.
    """
    discovered = {}
    for item in os.listdir(directory):
        item_path = os.path.join(directory, item)
        plugin_info_path = os.path.join(item_path, 'plugin_info.json')
        if not os.path.isfile(plugin_info_path):
            continue

        try:
            with open(plugin_info_path, 'r', encoding='utf-8') as f:
                plugin_info = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error reading plugin info for {item}")
            continue

        name = plugin_info.get('name') if isinstance(plugin_info, dict) else None
        if not isinstance(name, str) or not name:
            print(f"Skipping {item}: plugin_info.json has no valid plugin name")
            continue

        plugin_info['enabled'] = False
        plugin_info['source_path'] = item_path
        discovered[name] = plugin_info
    return discovered
async def publish_plugin_from_github(repo: str, registry_token: str, registry_url: str):
    """Fetch plugin_info.json from a GitHub repo and publish it to the registry.

    The file is requested from raw.githubusercontent.com on the ``main``
    branch and then on ``master``; the first branch that returns HTTP 200 with
    parseable JSON is used. The description is then POSTed to
    ``<registry_url>/publish`` with a bearer token.

    Args:
        repo: Repository in ``owner/name`` form.
        registry_token: Bearer token sent in the Authorization header.
        registry_url: Base URL of the registry; a trailing slash is tolerated.

    Returns:
        The decoded JSON body of the registry's response.

    Raises:
        Exception: If plugin_info.json cannot be obtained from either branch,
            or if the registry answers with a status code of 400 or above.
    """
    plugin_info = None

    # One client serves both branch attempts.
    async with httpx.AsyncClient() as client:
        for branch in ('main', 'master'):
            url = f"https://raw.githubusercontent.com/{repo}/{branch}/plugin_info.json"
            try:
                response = await client.get(url)
            except httpx.RequestError as e:
                # Network-level failure: report it and try the next branch.
                print(f"Error fetching from {url}: {e}")
                continue
            if response.status_code != 200:
                continue
            try:
                plugin_info = response.json()
            except ValueError as e:
                # The file exists but is not valid JSON; try the next branch.
                print(f"Error fetching from {url}: {e}")
                continue
            break

    if not plugin_info:
        raise Exception(f"Could not find or access plugin_info.json in repo {repo} on 'main' or 'master' branch.")

    # Payload for the registry's /publish endpoint.
    publish_data = {
        "title": plugin_info.get("name"),
        "description": plugin_info.get("description", ""),
        "category": "plugin",
        "content_type": "mindroot_plugin",
        "version": plugin_info.get("version", "0.1.0"),
        "github_url": f"https://github.com/{repo}",
        "pypi_module": plugin_info.get("pypi_module"),
        "commands": plugin_info.get("commands", []),
        "services": plugin_info.get("services", []),
        "tags": plugin_info.get("tags", ["plugin"]),
        "dependencies": plugin_info.get("dependencies", []),
        "data": {
            "plugin_info": plugin_info,
            "installation": {
                "type": "github",
                "source_path": repo
            }
        }
    }

    # Strip a trailing slash so the URL never contains "//publish".
    publish_url = f"{registry_url.rstrip('/')}/publish"
    headers = {
        "Authorization": f"Bearer {registry_token}",
        "Content-Type": "application/json"
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(publish_url, json=publish_data, headers=headers)

    if response.status_code >= 400:
        try:
            error_detail = response.json().get("detail", response.text)
        except (ValueError, AttributeError):
            # Body is not JSON, or is JSON but not an object: fall back to the raw text.
            error_detail = response.text
        raise Exception(f"Failed to publish to registry: {response.status_code} - {error_detail}")

    return response.json()