import inspect
import traceback
import json
import logging
from typing import List, Dict, Optional
from ..db.preferences import find_preferred_models
from ..db.organize_models import uses_models, matching_models
from ..utils.check_args import *
from ..utils.debug import debug_box
import sys
import nanoid
from termcolor import colored
# Import the new v2 preferences system with try/except for backward compatibility
try:
from .model_preferences_v2 import ModelPreferencesV2
except ImportError:
ModelPreferencesV2 = None
[docs]
class ProviderManager:
def __init__(self):
self.functions = {}
[docs]
def register_function(self, name, provider, implementation, signature, docstring, flags):
if name not in self.functions:
self.functions[name] = []
if provider in [func_info['provider'] for func_info in self.functions[name]]:
return
if name == 'say':
debug_box('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<')
print("REGISTER:", name, provider)
self.functions[name].append({'implementation': implementation, 'docstring': docstring, 'flags': flags, 'provider': provider})
[docs]
async def exec_with_provider(self, name, provider, *args, **kwargs):
if name not in self.functions:
raise ValueError(f"function '{name}' not found.")
func_info = None
for f in self.functions[name]:
if f['provider'] == provider:
func_info = f
break
implementation = func_info['implementation']
if implementation is None:
raise ValueError(f"function '{name}' not found for provider '{provider}'.")
try:
result = await implementation(*args, **kwargs)
except Exception as e:
raise e
return result
[docs]
async def execute(self, name, *args, **kwargs):
if check_empty_args(args, kwargs=kwargs):
raise ValueError(f"function '{name}' called with empty arguments.")
if name not in self.functions:
raise ValueError(f"function '{name}' not found.")
preferred_models = None
preferred_provider = None
preferred_providers = None
found_context = False
context = None
for arg in args:
if arg.__class__.__name__ == 'ChatContext' or hasattr(arg, 'agent'):
found_context = True
context = arg
break
if not found_context and 'context' in kwargs:
context = kwargs['context']
found_context = True
if not found_context and (not 'context' in kwargs):
kwargs['context'] = self.context
context = self.context
need_model = await uses_models(name)
# DEBUG: Check what we have for model selection
print(f"\n=== MODEL SELECTION DEBUG for {name} ===")
print(f"args[0]: {args[0] if len(args) > 0 else 'N/A'}")
print(f"kwargs.get('model'): {kwargs.get('model', 'N/A')}")
if context and hasattr(context, 'agent') and context.agent:
print(f"Agent service_models: {context.agent.get('service_models', 'N/A')}")
else:
print("No agent context or service_models")
print("=== END DEBUG ===")
if (len(args) > 0 and args[0] is None) and not 'model' in kwargs or ('model' in kwargs and kwargs['model'] is None):
print("No model specified, checking service_models")
if context is not None and context.agent is not None and 'service_models' in context.agent:
service_models = context.agent['service_models']
if name in service_models:
print("found service_models in agent")
print("set model (args[0]) to", service_models[name]['model'])
for func_info in self.functions[name]:
if func_info['provider'] == service_models[name]['provider']:
args = (service_models[name]['model'], *args[1:])
return await func_info['implementation'](*args, **kwargs)
print(f"WARNING: provider {service_models['name']['model']} specified for service {name}, but provider not enabled not enabled.")
else:
print("did not find service_models in agent")
print('context.agent:', context.agent)
# NEW V2 PREFERENCES LOGIC - Only as fallback when no agent-specific model
if ModelPreferencesV2 is not None:
try:
print("No agent-specific model found, trying V2 system preferences...")
prefs_manager = ModelPreferencesV2()
ordered_providers = prefs_manager.get_ordered_providers_for_service(name)
if ordered_providers:
print(f"Found V2 preferences for {name}: {ordered_providers}")
for provider_name, model_name in ordered_providers:
# Check if this provider is available for this function
if name in self.functions:
for func_info in self.functions[name]:
if func_info['provider'] == provider_name:
try:
print(f"Trying V2 provider {provider_name} with model {model_name}")
# Set the model as first argument if needed
if len(args) > 0 and (args[0] is None or not args[0]):
args = (model_name, *args[1:])
elif 'model' not in kwargs:
kwargs['model'] = model_name
return await func_info['implementation'](*args, **kwargs)
except Exception as e:
print(f"V2 provider {provider_name} failed: {e}, trying next...")
continue
except Exception as e:
print(f"V2 preferences failed: {e}, continuing with existing logic")
else:
print("Found possible model in zeroth arg:")
if len(args) > 0:
print(args[0])
from coreplugins.admin.service_models import cached_get_service_models
all_service_models = await cached_get_service_models()
this_service_models = all_service_models.get(name, {})
model_name = args[0]
try:
if '__' in model_name:
#provider = model_name.split('__')[0]
model_name = model_name.split('__')[1]
for provider, model_list in this_service_models.items():
if model_name in model_list:
print("found provider", provider)
for func_info in self.functions[name]:
if func_info['provider'] == provider:
return await func_info['implementation'](*args, **kwargs)
except Exception as e:
pass
required_plugins = []
if context and hasattr(context, 'agent') and context.agent:
required_plugins = context.agent.get('required_plugins', [])
if required_plugins and name in self.functions:
for plugin in required_plugins:
for func_info in self.functions[name]:
if func_info['provider'] == plugin:
function_info = func_info
return await function_info['implementation'](*args, **kwargs)
preferred_providers_list = []
if context is not None and hasattr(context, 'agent') and context.agent:
preferred_providers = context.agent.get('preferred_providers', [])
if isinstance(preferred_providers, list):
preferred_providers_list = preferred_providers
elif isinstance(preferred_providers, dict):
if name in preferred_providers:
preferred_provider = preferred_providers[name]
for func_info in self.functions[name]:
if func_info['provider'] == preferred_provider:
function_info = func_info
return await function_info['implementation'](*args, **kwargs)
if name == 'stream_chat' and context is None:
raise ValueError('stream_chat, context is None')
if name == 'stream_chat' and context.agent is None:
raise ValueError('stream_chat, context.agent is None')
if name == 'stream_chat':
debug_box('execute stream_chat <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<')
if context is not None and hasattr(context, 'data') and 'PREFERRED_PROVIDER' in context.data:
preferred_providers_list = [ context.data['PREFERRED_PROVIDER'] ]
if preferred_providers_list and name in self.functions:
for func_info in self.functions[name]:
if func_info['provider'] in preferred_providers_list:
return await func_info['implementation'](*args, **kwargs)
if preferred_providers and name in preferred_providers:
preferred_provider = preferred_providers[name]
for func_info in self.functions[name]:
if func_info['provider'] == preferred_provider:
function_info = func_info
return await function_info['implementation'](*args, **kwargs)
if context.__class__.__name__ == 'ChatContext':
preferred_models = await find_preferred_models(name, context.flags)
context.data['model'] = None
if need_model and preferred_models is None:
preferred_models = await matching_models(name, context.flags)
if preferred_models is not None:
if len(preferred_models) > 0:
context.data['model'] = preferred_models[0]
if preferred_models is not None:
if len(preferred_models) > 0:
try:
preferred_provider = preferred_models[0]['provider']
except KeyError:
preferred_provider = None
function_info = None
if not need_model and preferred_provider is None:
preferred_provider = self.functions[name][0]['provider']
if preferred_provider is not None:
for func_info in self.functions[name]:
if func_info['provider'] == preferred_provider:
function_info = func_info
break
function_info = self.functions[name][0]
if function_info is None:
raise ValueError(f"1. function '{name}' not found. preferred_provider is '{preferred_provider}'.")
implementation = function_info['implementation']
if implementation is None:
raise ValueError(f"2. function '{name}' not found. preferred_provider is '{preferred_provider}'.")
try:
result = await implementation(*args, **kwargs)
except Exception as e:
raise e
return result
[docs]
def get_docstring(self, name):
if name not in self.functions:
logging.warning(f"docstring for '{name}' not found.")
return []
return self.functions[name][0]['docstring']
[docs]
def get_detailed_functions(self):
return self.functions
[docs]
def get_functions(self):
return list(self.functions.keys())
[docs]
def get_docstrings(self):
return {name: self.get_docstring(name) for name in self.functions.keys()}
[docs]
def get_some_docstrings(self, names):
debug_box("------------------------->>>>>>>>>>>>>>>>>>>>>>")
print("Get some doc strings")
print("self.functions", self.functions.keys())
filtered = []
for name in names:
if name not in self.functions:
logging.warning(f"agent function '{name}' not found")
else:
filtered.append(name)
return {name: self.get_docstring(name) for name in filtered}
def __getattr__(self, name):
async def method(*args, **kwargs):
return await self.execute(name, *args, **kwargs)
return method
[docs]
class HookManager:
_instance = None
_initialized = False
_hook_manager = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._hook_manager = cls._instance
return cls._instance
def __init__(self):
if not self._initialized:
self.unique_id = nanoid.generate()
self.hooks = {}
self.__class__._initialized = True
[docs]
def register_hook(self, name, implementation, signature, docstring):
if name not in self.hooks:
self.hooks[name] = []
self.hooks[name].append({'implementation': implementation, 'docstring': docstring})
[docs]
async def execute_hooks(self, name, *args, **kwargs):
if name not in self.hooks:
return []
results = []
for hook_info in self.hooks[name]:
implementation = hook_info['implementation']
result = await implementation(*args, **kwargs)
results.append(result)
return results
[docs]
def get_docstring(self, name):
if name not in self.hooks:
raise ValueError(f"hook '{name}' not found.")
return [hook_info['docstring'] for hook_info in self.hooks[name]]
[docs]
def get_hooks(self):
return list(self.hooks.keys())
[docs]
def get_docstrings(self):
return {name: self.get_docstring(name) for name in self.hooks.keys()}
def __getattr__(self, name):
async def method(*args, **kwargs):
return await self.execute_hooks(name, *args, **kwargs)
return method