Source code for mindroot.coreplugins.mcp_.dynamic_commands
from typing import Dict, List, Any
import json
import inspect
from lib.providers.commands import command_manager
[docs]
class MCPDynamicCommands:
"""Handles dynamic registration of MCP tools as MindRoot commands"""
def __init__(self):
self.registered_commands: Dict[str, str] = {} # cmd_name -> server_name
self.sessions: Dict[str, Any] = {} # Reference to sessions
[docs]
def set_sessions(self, sessions: Dict[str, Any]):
"""Set reference to MCP sessions"""
self.sessions = sessions
[docs]
async def register_tools(self, server_name: str, tools: List[Any]):
"""Register MCP tools as dynamic MindRoot commands"""
for tool in tools:
try:
tool_name = tool.name
#cmd_name = f"mcp_{server_name}_{tool_name}"
cmd_name = "mcp_"+tool_name
self.registered_commands[cmd_name] = server_name
# Create wrapper function with closure to capture current values
def create_wrapper(srv_name, tl_name):
async def mcp_tool_wrapper(*args, context=None, **kwargs):
if srv_name not in self.sessions:
return f"MCP server {srv_name} not connected"
session = self.sessions[srv_name]
# Convert arguments to MCP format
if len(args) == 1 and isinstance(args[0], dict):
arguments = args[0]
elif kwargs:
# Filter out context from kwargs
arguments = {k: v for k, v in kwargs.items() if k != 'context'}
else:
arguments = {}
try:
result = await session.call_tool(tl_name, arguments)
# Extract content from CallToolResult object
if hasattr(result, 'content'):
if isinstance(result.content, list) and len(result.content) > 0:
# Return the text content of the first item
first_item = result.content[0]
return first_item.text if hasattr(first_item, 'text') else str(first_item)
else:
return str(result.content)
else:
return str(result)
except Exception as e:
return f"Error calling MCP tool {tl_name}: {str(e)}"
return mcp_tool_wrapper
wrapper = create_wrapper(server_name, tool_name)
# Create docstring with parameter information
description = getattr(tool, 'description', 'No description available')
# Extract parameter schema if available
param_info = ""
example_args = {}
if hasattr(tool, 'inputSchema') and tool.inputSchema:
schema = tool.inputSchema
if isinstance(schema, dict) and 'properties' in schema:
properties = schema['properties']
required = schema.get('required', [])
param_lines = []
for param_name, param_def in properties.items():
param_type = param_def.get('type', 'any')
param_desc = param_def.get('description', '')
is_required = param_name in required
req_text = ' (required)' if is_required else ' (optional)'
param_lines.append(f" {param_name} ({param_type}){req_text}: {param_desc}")
# Create example value based on type
if param_type == 'string':
example_args[param_name] = f"example_{param_name}"
elif param_type == 'number':
example_args[param_name] = 42
elif param_type == 'boolean':
example_args[param_name] = True
else:
example_args[param_name] = f"value_for_{param_name}"
if param_lines:
param_info = "\n\nParameters:\n" + "\n".join(param_lines)
# Use actual parameter names in example if available
example_json = json.dumps(example_args, indent=2) if example_args else '{"arg1": "value1"}'
docstring = f"""MCP Tool: {tool_name} from {server_name}
{description}{param_info}
{description}
Example:
{{ "{cmd_name}": {example_json} }}
"""
# Register with command manager
try:
command_manager.register_function(
name=cmd_name,
provider="mcp",
implementation=wrapper,
signature=inspect.signature(wrapper),
docstring=docstring,
flags=[]
)
print(f"Registered MCP command: {cmd_name}")
# Verify registration
if cmd_name in command_manager.functions:
print(f" ✅ Verified: {cmd_name} is in command manager")
else:
print(f" ❌ Warning: {cmd_name} not found in command manager after registration")
except Exception as e:
print(f"Error registering {cmd_name}: {e}")
except Exception as e:
print(f"Error processing tool {getattr(tool, 'name', 'unknown')}: {e}")
continue
[docs]
async def unregister_server_tools(self, server_name: str):
"""Unregister all tools for a server"""
commands_to_remove = []
for cmd_name, srv_name in self.registered_commands.items():
if srv_name == server_name:
commands_to_remove.append(cmd_name)
for cmd_name in commands_to_remove:
try:
if cmd_name in command_manager.functions:
del command_manager.functions[cmd_name]
del self.registered_commands[cmd_name]
print(f"Unregistered MCP command: {cmd_name}")
except Exception as e:
print(f"Error unregistering {cmd_name}: {e}")
[docs]
def get_registered_commands(self) -> List[str]:
"""Get list of registered command names"""
return list(self.registered_commands.keys())