from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import traceback
import os
import json
from typing import List, Optional
from lib.plugins import (
load_plugin_manifest, update_plugin_manifest, plugin_install,
save_plugin_manifest, plugin_update, toggle_plugin_state, get_plugin_path
)
from lib.plugins.installation import download_github_files
from lib.streamcmd import stream_command_as_events
import asyncio
import httpx
router = APIRouter()
[docs]
class DirectoryRequest(BaseModel):
directory: str
[docs]
class PluginRequest(BaseModel):
plugin: str
[docs]
class GitHubPluginRequest(BaseModel):
plugin: str
url: Optional[str] = None
github_url: Optional[str] = None
[docs]
class TogglePluginRequest(BaseModel):
plugin: str
enabled: bool
[docs]
class InstallFromIndexRequest(BaseModel):
plugin: str
index_name: str
[docs]
class StreamInstallRequest(BaseModel):
plugin: str
source: str
source_path: str = None
import sys, os, shlex
[docs]
@router.post("/stream-install-plugin", response_class=EventSourceResponse)
async def stream_install_plugin(request: StreamInstallRequest):
"""Stream the installation process of a plugin using SSE (POST method)."""
# Prepare the command based on the source
print("Stream Install Request:", request)
if request.source == 'github_direct' or request.source == 'github':
if request.source_path.startswith('https://'):
# For direct GitHub URLs, we can use the pip install directly
cmd = [sys.executable, '-m', 'pip', 'install', request.source_path, '-v', '--no-cache-dir']
else:
cmd = [sys.executable, '-m', 'pip', 'install', '-e', request.source_path, '-v', '--no-cache-dir']
elif request.source == 'local':
cmd = [sys.executable, '-m', 'pip', 'install', '-e', request.source_path, '-v', '--no-cache-dir']
elif request.source == 'pypi':
cmd = [sys.executable, '-m', 'pip', 'install', request.plugin, '-v', '--no-cache-dir']
else:
print("Invalid source")
return {"success": False, "message": "Invalid source"}
print("Command to execute:", cmd)
# For GitHub installations, use the plugin_install function which handles the download and extraction
if request.source == 'github':
try:
# Use the streaming approach for GitHub installations
parts = request.source_path.split(':')
repo_path = parts[0]
tag = parts[1] if len(parts) > 1 else None
print(1)
async def stream_github_install():
print("dling")
yield {"event": "message", "data": f"Downloading GitHub repository {repo_path}..."}
try:
print("dling 2")
plugin_dir, _, plugin_info = download_github_files(repo_path, tag)
cmd = [sys.executable, '-m', 'pip', 'install', '-e', plugin_dir, '-v', '--no-cache-dir']
async for event in stream_command_as_events(cmd):
yield event
update_plugin_manifest(
plugin_info['name'], 'github', os.path.abspath(plugin_dir),
remote_source=repo_path, version=plugin_info.get('version', '0.0.1'),
metadata=plugin_info
)
except Exception as e:
print(e)
yield {"event": "error", "data": f"Error installing from GitHub: {str(e)}"}
return EventSourceResponse(stream_github_install())
except Exception as e:
print(3)
print(e)
return {"success": False, "message": f"Error setting up GitHub installation: {str(e)}"}
# For other sources, use our streamcmd module to stream the command output
print("stream cmd as events")
return EventSourceResponse(stream_command_as_events(cmd))
[docs]
@router.get("/stream-install-plugin", response_class=EventSourceResponse)
async def stream_install_plugin_get(request: Request):
"""Stream the installation process of a plugin using SSE (GET method)."""
# Extract parameters from query string
print("Stream Install GET Request:", request.query_params)
plugin = request.query_params.get("plugin", "")
source = request.query_params.get("source", "")
source_path = request.query_params.get("source_path", "")
# Use the new simpler approach
if source == 'github':
cmd = [sys.executable, '-m', 'pip', 'install', '-e', source_path, '-v', '--no-cache-dir']
message = f"Installing {plugin} from GitHub repository {source_path}..."
elif source == 'local':
cmd = [sys.executable, '-m', 'pip', 'install', '-e', source_path, '-v', '--no-cache-dir']
message = f"Installing from local path: {source_path}"
elif source == 'pypi':
cmd = [sys.executable, '-m', 'pip', 'install', plugin, '-v', '--no-cache-dir']
message = f"Installing from PyPI: {plugin}"
else:
return {"success": False, "message": "Invalid source"}
print("Command to execute:", cmd)
tag = None
# For GitHub installations, use the plugin_install function which handles the download and extraction
if source == 'github':
try:
# Use the streaming approach for GitHub installations
print("source_path:", source_path)
parts = source_path.split(':')
repo_path = parts[0]
tag = parts[1] if len(parts) > 1 else None
print("repo_path:", repo_path, "tag:", tag)
# First yield a message about downloading
#
async def stream_github_install():
yield {"event": "message", "data": f"Downloading GitHub repository {repo_path}..."}
repo_path_ = repo_path
tag_ = tag
# Download and extract the GitHub repository
try:
if source_path.startswith('https://'):
print("Processing direct GitHub URL")
repo_path_ = source_path
tag_ = None
parts = repo_path_.split('/')
if len(parts) >= 5:
repo_path_ = f"{parts[3]}/{parts[4]}"
print("repo_path_:", repo_path_)
plugin_dir, _, plugin_info = download_github_files(repo_path_, tag_)
print('ok')
# Now stream the installation from the local directory
cmd = [sys.executable, '-m', 'pip', 'install', '-e', plugin_dir, '-v', '--no-cache-dir']
async for event in stream_command_as_events(cmd):
yield event
# Update the plugin manifest
update_plugin_manifest(
plugin_info['name'], 'github', os.path.abspath(plugin_dir),
remote_source=repo_path_, version=plugin_info.get('version', '0.0.1'),
metadata=plugin_info
)
except Exception as e:
trace = traceback.format_exc()
yield {"event": "error", "data": f"Error installing from GitHub: {str(e)} \n\n{trace}"}
return EventSourceResponse(stream_github_install())
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error installing from GitHub: {str(e)}\n\n{trace}"}
# Use our new streamcmd module
return EventSourceResponse(stream_command_as_events(cmd))
[docs]
@router.get("/get-all-plugins")
async def get_all_plugins():
try:
manifest = load_plugin_manifest()
plugins = []
# Process core plugins
for plugin_name, plugin_info in manifest['plugins']['core'].items():
plugins.append({
"name": plugin_name,
"category": "core",
"enabled": plugin_info['enabled'],
"source": "core",
"remote_source": plugin_name,
"version": "1.0.0",
"description": plugin_info.get('metadata', {}).get('description', '')
})
# Process installed plugins
for plugin_name, plugin_info in manifest['plugins']['installed'].items():
plugins.append({
"name": plugin_name,
"category": "installed",
"enabled": plugin_info['enabled'],
"source": plugin_info['source'],
"remote_source": plugin_info.get('remote_source', plugin_info.get('github_url')),
"source_path": plugin_info.get('source_path'),
"version": plugin_info.get('version', '0.0.1'),
"description": plugin_info.get('metadata', {}).get('description', ''),
"index_source": plugin_info.get('metadata', {}).get('index_source')
})
return {"success": True, "data": plugins}
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error fetching plugins: {str(e)}\n\n{trace}"}
[docs]
@router.post("/scan-directory")
async def scan_directory(request: DirectoryRequest):
try:
directory = request.directory
if not os.path.isdir(directory):
return {"success": False, "message": "Invalid directory path"}
discovered_plugins = discover_plugins(directory)
manifest = load_plugin_manifest()
print("discoverd_plugins", discovered_plugins)
# Analyze plugins for index compatibility
addable_count = 0
for plugin_name, plugin_info in discovered_plugins.items():
# Check if plugin has GitHub info
has_github = (
plugin_info.get('github_url') or
plugin_info.get('remote_source') or
(plugin_info.get('metadata', {}).get('github_url'))
)
if has_github:
addable_count += 1
# Update installed plugins from discovered ones
for plugin_name, plugin_info in discovered_plugins.items():
plugin_info['source'] = 'local'
plugin_info['metadata'] = plugin_info.get('metadata', {}) or {
"description": plugin_info.get('description', ''),
"install_date": plugin_info.get('install_date', ''),
"commands": plugin_info.get('commands', []),
"services": plugin_info.get('services', [])
}
print(plugin_info)
manifest['plugins']['installed'][plugin_name] = plugin_info
# Prepare plugin list for response
plugins_list = [{
"name": name,
"description": info.get('metadata', {}).get('description', info.get('description', ''))
} for name, info in discovered_plugins.items()]
save_plugin_manifest(manifest)
response = {"success": True,
"message": f"Scanned {len(discovered_plugins)} plugins in {directory}",
"plugins": plugins_list,
"addable_to_index": addable_count}
if addable_count < len(discovered_plugins):
response["warning"] = f"{len(discovered_plugins) - addable_count} plugins missing GitHub info and cannot be added to indices"
return response
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error during scan: {str(e)}\n\n{trace}"}
[docs]
@router.post("/install-local-plugin")
async def install_local_plugin(request: PluginRequest):
try:
plugin_name = request.plugin
plugin_path = get_plugin_path(plugin_name)
if not plugin_path:
return {"success": False, "message": "Plugin path not found"}
success = await plugin_install(plugin_name, source='local', source_path=plugin_path)
if success:
return {"success": True, "message": f"Plugin {plugin_name} installed successfully"}
else:
return {"success": False, "message": "Installation failed"}
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error installing plugin: {str(e)}\n\n{trace}"}
[docs]
@router.post("/install-x-github-plugin")
async def install_github_plugin(request: GitHubPluginRequest):
try:
print("Request:", request)
url = request.url or request.github_url
success = await plugin_install('test', source='github', source_path=url)
if success:
return {"success": True, "message": "Plugin installed successfully from GitHub"}
else:
return {"success": False, "message": "Installation failed"}
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error installing from GitHub: {str(e)}\n\n{trace}"}
[docs]
@router.post("/install-from-index")
async def install_from_index(request: InstallFromIndexRequest):
try:
# Load the index to get plugin information
index_path = os.path.join('indices', f"{request.index_name}.json")
if not os.path.exists(index_path):
return {"success": False, "message": "Index not found"}
with open(index_path, 'r') as f:
index_data = json.load(f)
# Find plugin in index
plugin_data = None
for plugin in index_data.get('plugins', []):
if plugin['name'] == request.plugin:
plugin_data = plugin
break
if not plugin_data:
return {"success": False, "message": "Plugin not found in index"}
# Install the plugin
if plugin_data.get('github_url'):
success = await plugin_install(
request.plugin,
source='github',
source_path=plugin_data['github_url']
)
elif plugin_data.get('source_path'):
success = await plugin_install(
request.plugin,
source='local',
source_path=plugin_data['source_path']
)
else:
return {"success": False, "message": "No valid installation source in index"}
if success:
# Update plugin metadata with index information
manifest = load_plugin_manifest()
if request.plugin in manifest['plugins']['installed']:
manifest['plugins']['installed'][request.plugin]['metadata']['index_source'] = request.index_name
save_plugin_manifest(manifest)
return {"success": True, "message": f"Plugin {request.plugin} installed successfully from index"}
else:
return {"success": False, "message": "Installation failed"}
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error installing from index: {str(e)}\n\n{trace}"}
[docs]
@router.post("/toggle-plugin")
async def toggle_plugin(request: TogglePluginRequest):
try:
success = toggle_plugin_state(request.plugin, request.enabled)
if success:
return {"success": True, "message": f"Plugin {request.plugin} {'enabled' if request.enabled else 'disabled'} successfully"}
else:
return {"success": False, "message": "Failed to toggle plugin state"}
except Exception as e:
trace = traceback.format_exc()
return {"success": False, "message": f"Error toggling plugin: {str(e)}\n\n{trace}"}
# Helper function
[docs]
def discover_plugins(directory):
discovered = {}
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
plugin_info_path = os.path.join(item_path, 'plugin_info.json')
if os.path.isfile(plugin_info_path):
try:
with open(plugin_info_path, 'r') as f:
plugin_info = json.load(f)
plugin_info['enabled'] = False
plugin_info['source_path'] = item_path
discovered[plugin_info['name']] = plugin_info
except json.JSONDecodeError:
print(f"Error reading plugin info for {item}")
continue
return discovered
[docs]
async def publish_plugin_from_github(repo: str, registry_token: str, registry_url: str):
"""
Fetches plugin_info.json from a GitHub repo and publishes it to the registry.
"""
plugin_info = None
# Try to fetch from 'main' and then 'master' branch
for branch in ['main', 'master']:
url = f"https://raw.githubusercontent.com/{repo}/{branch}/plugin_info.json"
async with httpx.AsyncClient() as client:
try:
response = await client.get(url)
if response.status_code == 200:
plugin_info = response.json()
break
except httpx.RequestError as e:
# This might happen if the repo is private or other network issues
print(f"Error fetching from {url}: {e}")
continue
if not plugin_info:
raise Exception(f"Could not find or access plugin_info.json in repo {repo} on 'main' or 'master' branch.")
# Construct the payload for the registry's /publish endpoint
publish_data = {
"title": plugin_info.get("name"),
"description": plugin_info.get("description", ""),
"category": "plugin",
"content_type": "mindroot_plugin",
"version": plugin_info.get("version", "0.1.0"),
"github_url": f"https://github.com/{repo}",
"pypi_module": plugin_info.get("pypi_module"),
"commands": plugin_info.get("commands", []),
"services": plugin_info.get("services", []),
"tags": plugin_info.get("tags", ["plugin"]),
"dependencies": plugin_info.get("dependencies", []),
"data": {
"plugin_info": plugin_info,
"installation": {
"type": "github",
"source_path": repo
}
}
}
# Post the data to the registry
publish_url = f"{registry_url}/publish"
headers = {
"Authorization": f"Bearer {registry_token}",
"Content-Type": "application/json"
}
async with httpx.AsyncClient() as client:
response = await client.post(publish_url, json=publish_data, headers=headers)
if response.status_code >= 400:
try:
error_detail = response.json().get("detail", response.text)
except:
error_detail = response.text
raise Exception(f"Failed to publish to registry: {response.status_code} - {error_detail}")
return response.json()