Source code for mindroot.coreplugins.chat.services

from lib.providers.services import service, service_manager
from lib.providers.commands import command_manager, command
from lib.providers.hooks import hook
from lib.pipelines.pipe import pipeline_manager, pipe
from lib.chatcontext import ChatContext
from lib.chatlog import ChatLog
from typing import List
from lib.utils.dataurl import dataurl_to_pil
from .models import MessageParts
from coreplugins.agent import agent
from lib.utils.debug import debug_box
import os
import sys
import colored
import time
import traceback
import asyncio
import json
import termcolor
from PIL import Image
from io import BytesIO
import base64
import nanoid
sse_clients = {}

@service()
async def prompt(model: str, instructions: str, temperature=0, max_tokens=400, json=False, context=None):
    messages = [
        {"role": "system", "content": "Respond to prompt with no extraneous commentary."},
        {"role": "user", "content": instructions}
    ]
    stream = await context.stream_chat(model, temperature=temperature, max_tokens=max_tokens,
                                       messages=messages, json=False, context=context)
    text = ""
    if os.environ.get("AH_DEBUG") == "True":
        print("Prompting, instructions ", instructions)
    async for chunk in stream:
        # print("Chunk received: ", chunk)
        if chunk is None or chunk == "":
            continue
        else:
            text += chunk
            if os.environ.get("AH_DEBUG") == "True":
                print(chunk, end='', flush=True)
    return text

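# Illustrative sketch (not part of the original module): a minimal caller for the
# prompt() service above. The model identifier and the context object are assumptions;
# any context exposing stream_chat() with the signature used above will work.
async def _example_prompt_usage(context):
    # Request a short completion and collect the streamed chunks into one string.
    text = await prompt(
        "example-model",                    # hypothetical model identifier
        "List three colors, one per line.",
        temperature=0,
        max_tokens=100,
        context=context,
    )
    return text
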
def results_text(results):
    text = ""
    for result in results:
        if 'text' in result['args']:
            text += result['args']['text'] + "\n"
        elif 'markdown' in result['args']:
            text += result['args']['markdown'] + "\n"
    text = text.rstrip()
    return text

def results_output(results):
    # Return the first explicit 'output' argument found in the command results.
    for result in results:
        if 'output' in result['args']:
            return result['args']['output']

def results_text_output(results):
    # Returns the first explicit 'output' argument found; the text accumulation
    # below never runs as written.
    text = ""
    for result in results:
        if 'output' in result['args']:
            return result['args']['output']
            text += result['args']['output'] + "\n"
    text = text.rstrip()
    return text

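# Illustrative sketch (not part of the original module): the results_* helpers above
# expect a list of command dicts whose 'args' may carry 'text', 'markdown', or 'output'.
# The sample values below are made up purely to show the expected shape.
def _example_results_helpers():
    sample_results = [
        {"cmd": "say", "args": {"text": "Hello"}},
        {"cmd": "say", "args": {"markdown": "**Done**"}},
        {"cmd": "task_result", "args": {"output": {"status": "ok"}}},
    ]
    combined_text = results_text(sample_results)   # "Hello\n**Done**"
    final_output = results_output(sample_results)  # {"status": "ok"}
    return combined_text, final_output
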
@service()
async def run_task(instructions: str, agent_name: str = None, user: str = None,
                   log_id=None, parent_log_id=None, llm=None, retries=3, context=None):
    """
    Run a task with the given instructions

    IMPORTANT NOTE: agent must have the task_result() command enabled.
    """
    if context is None:
        debug_box("Context is none")
        debug_box("agent_name: " + agent_name)
        if log_id is None:
            log_id = nanoid.generate()
        if user is None:
            raise Exception("chat: run_task: user required")
        if agent_name is None:
            raise Exception("chat: run_task: agent_name required")
        context = ChatContext(command_manager, service_manager, user)
        context.agent_name = agent_name
        context.username = user
        context.name = agent_name
        context.log_id = log_id
        context.parent_log_id = parent_log_id
        context.agent = await service_manager.get_agent_data(agent_name)
        context.data['llm'] = llm
        context.current_model = llm
        context.chat_log = ChatLog(log_id=log_id, agent=agent_name, user=user, parent_log_id=parent_log_id)
        await context.save_context()
    else:
        debug_box("Context is not none")
        print(context)

    print("run_task: ", instructions, "log_id: ", context.log_id)
    await init_chat_session(context.username, context.agent_name, context.log_id, context)

    retried = 0
    msg = """
# SYSTEM NOTE

This task is being run via API and requires a textual or structured output.
If your instructions indicate multiple steps with multiple function calls, wait for
the system results as you process each step in turn, then call task_result() with the
final output after all steps are truly complete.

You MUST call task_result() with the final output if you are completing the task.
For multi-stage tasks, do not call task_result until the final step is complete.
"""
    instructions = instructions + msg

    while retried < retries:
        [results, full_results] = await send_message_to_agent(context.log_id, instructions, context=context)
        text = results_output(full_results)
        if text == "":
            retried += 1
            debug_box(f"No output found, retrying task: {retried}")
            instructions += f"\n\nNo output found (call task_result()!), retrying task: {retried}"
        else:
            debug_box(f"Task output found: {text}")
            break

    return (text, full_results, context.log_id)

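# Illustrative sketch (not part of the original module): invoking run_task() from
# another async service. The agent name, user, and model identifier are placeholders;
# per the docstring above, the agent must have the task_result() command enabled.
async def _example_run_task():
    text, full_results, log_id = await run_task(
        "Summarize the project README and report the summary with task_result().",
        agent_name="ExampleAgent",   # hypothetical agent
        user="example_user",         # hypothetical user
        llm="example-model",         # hypothetical model identifier
        retries=2,
    )
    return text, log_id
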
@service()
async def init_chat_session(user: str, agent_name: str, log_id: str, context=None):
    if agent_name is None or agent_name == "" or log_id is None or log_id == "":
        print("Invalid agent_name or log_id")
        print("agent_name: ", agent_name)
        print("log_id: ", log_id)
        raise Exception("Invalid agent_name or log_id")

    if context is None:
        context = ChatContext(command_manager, service_manager, user)
        context.agent_name = agent_name
        context.name = agent_name
        context.log_id = log_id
        context.agent = await service_manager.get_agent_data(agent_name)
        context.chat_log = ChatLog(log_id=log_id, agent=agent_name, user=user)

    print("context.agent_name: ", context.agent_name)
    await context.save_context()
    print("initiated_chat_session: ", log_id, agent_name, context.agent_name, context.agent)
    return log_id

@service()
async def get_chat_history(agent_name: str, session_id: str, user: str):
    print("-----------------")
    print("get_chat_history: ", agent_name, session_id)
    # context = ChatContext(command_manager, service_manager)
    # await context.load_context(session_id)
    agent = await service_manager.get_agent_data(agent_name)
    persona = agent['persona']['name']
    chat_log = ChatLog(log_id=session_id, agent=agent_name, user=user)
    print("Got chat log")
    messages = chat_log.get_recent()
    print("messages length: ", len(messages))
    for message in messages:
        if message['role'] == 'user':
            message['persona'] = 'user'
        else:
            message['persona'] = persona
    return messages

def process_result(result, formatted_results):
    print("type of result is ", type(result))
    if 'result' in result and type(result['result']) is dict and 'type' in result['result'] and 'image' in result['result']['type']:
        print("A")
        img_data = result['result']
        result['result'] = '...'
        new_result = {"type": "text", "text": json.dumps(result)}
        formatted_results.append(new_result)
        formatted_results.append(img_data)
    elif 'result' in result and type(result['result']) is list:
        print("B")
        found_image = json.dumps(result['result']).find('"image"') > -1
        if found_image:
            print("Found image")
            for item in result['result']:
                process_result({"result": item}, formatted_results)
        else:
            new_result = {"type": "text", "text": json.dumps(result['result'])}
            formatted_results.append(new_result)
    else:
        print("C")
        new_result = {"type": "text", "text": json.dumps(result)}
        formatted_results.append(new_result)
    print("length of results is ", len(formatted_results))
    # with open("output/processed_results.json", "w") as f:
    #     f.write(json.dumps(formatted_results) + "\n")
    return formatted_results

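# Illustrative sketch (not part of the original module): process_result() splits an
# image-bearing command result into a JSON text summary plus the raw image dict, so the
# chat log keeps a compact textual record alongside the image payload. The sample
# command name and data value below are made up.
def _example_process_result():
    formatted = []
    result = {"cmd": "screenshot",
              "result": {"type": "image", "data": "<image data url here>"}}
    process_result(result, formatted)
    # formatted now holds two entries: a {"type": "text", ...} summary in which the
    # image was replaced with '...', followed by the original {"type": "image", ...} dict.
    return formatted
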
@service()
async def send_message_to_agent(session_id: str, message: str | List[MessageParts], max_iterations=35, context=None, user=None):
    if os.environ.get("MR_MAX_ITERATIONS") is not None:
        max_iterations = int(os.environ.get("MR_MAX_ITERATIONS"))

    if not user:
        # check context
        if not context.username:
            raise Exception("User required")
        else:
            user = {"user": context.username}
    else:
        if hasattr(user, "dict"):
            user = user.dict()

    try:
        if type(message) is list:
            message = [m.dict() for m in message]

        if session_id is None or session_id == "" or message is None or message == "":
            print("Invalid session_id or message")
            return []

        print("send_message_to_agent: ", session_id, message, max_iterations)

        if context is None:
            context = ChatContext(command_manager, service_manager, user)
            await context.load_context(session_id)

        agent_ = agent.Agent(agent=context.agent)

        if user is not None and hasattr(user, "keys"):
            for key in user.keys():
                context.data[key] = user[key]

        context.data['finished_conversation'] = False

        tmp_data = {"message": message}
        tmp_data = await pipeline_manager.pre_process_msg(tmp_data, context=context)
        message = tmp_data['message']
        termcolor.cprint("Final message: " + str(message), "yellow")

        if type(message) is str:
            # context.chat_log.add_message({"role": "user", "content": [{"type": "text", "text": message}]})
            context.chat_log.add_message({"role": "user", "content": message})
        else:
            new_parts = []
            has_image = False
            for part in message:
                if part['type'] == 'image':
                    has_image = True
                    img = dataurl_to_pil(part['data'])
                    img_msg = await context.format_image_message(img)
                    new_parts.append(img_msg)
                elif part['type'] == 'text' and '[UPLOADED FILE]' in part['text']:
                    # Ensure we don't duplicate file entries
                    if not any('[UPLOADED FILE]' in p.get('text', '') for p in new_parts):
                        new_parts.append(part)
                else:
                    new_parts.append(part)
            msg_to_add = {"role": "user", "content": new_parts}
            has_image = has_image or str(msg_to_add).find("image") > -1
            context.chat_log.add_message(msg_to_add)

        await context.save_context()

        continue_processing = True
        iterations = 0
        results = []
        full_results = []
        invalid = "ERROR, invalid response format."
        consecutive_parse_errors = 0

        while continue_processing and iterations < max_iterations:
            iterations += 1
            continue_processing = False
            try:
                if context.current_model is None:
                    if 'llm' in context.data:
                        context.current_model = context.data['llm']

                parse_error = False
                max_tokens = int(os.environ.get("MR_MAX_TOKENS", 4000))
                results, full_cmds = await agent_.chat_commands(context.current_model, context,
                                                                messages=context.chat_log.get_recent(),
                                                                max_tokens=max_tokens)
                if results is not None:
                    try:
                        for result in results:
                            if result['cmd'] == 'UNKNOWN':
                                consecutive_parse_errors += 1
                                parse_error = True
                    except Exception as e:
                        pass
                if not parse_error:
                    consecutive_parse_errors = 0
                else:
                    await asyncio.sleep(1)

                if consecutive_parse_errors > 6:
                    raise Exception("Too many consecutive parse errors, stopping processing.")
                elif consecutive_parse_errors > 3:
                    results.append({"cmd": "UNKNOWN", "args": {
                        "SYSTEM WARNING: Issue valid command list or task; processing will be halted. Simplify output."}})

                try:
                    tmp_data3 = {"results": full_cmds}
                    tmp_data3 = await pipeline_manager.process_results(tmp_data3, context=context)
                    out_results = tmp_data3['results']
                except Exception as e:
                    print("Error processing results: ", e)
                    print(traceback.format_exc())

                for cmd in full_cmds:
                    full_results.append(cmd)

                out_results = []
                stop_requested = False
                actual_results = False
                await asyncio.sleep(0.001)

                for result in results:
                    if 'result' in result and result['result'] is not None:
                        if result['result'] == 'continue':
                            out_results.append(result)
                            continue_processing = True
                        elif result['result'] == 'stop':
                            continue_processing = False
                            stop_requested = True
                        else:
                            out_results.append(result)
                            # only print up to 200 characters
                            truncated_result = str(result)[:200] + '...'
                            termcolor.cprint("Found result: " + truncated_result, "magenta")
                            actual_results = True
                            continue_processing = True
                    else:
                        continue_processing = False

                if actual_results and not stop_requested:
                    continue_processing = True

                if len(out_results) > 0:
                    try:
                        tmp_data2 = {"results": out_results}
                        tmp_data2 = await pipeline_manager.process_results(tmp_data2, context=context)
                        out_results = tmp_data2['results']
                    except Exception as e:
                        print("Error processing results: ", e)
                        print(traceback.format_exc())
                    formatted_results = []
                    st_process = time.time()
                    for result in out_results:
                        process_result(result, formatted_results)
                    print("Time to process results: ", time.time() - st_process)
                    context.chat_log.add_message({"role": "user", "content": formatted_results})
                    results.append(out_results)
                else:
                    print("Processing iteration: ", iterations, "no message added")

                if context.data.get('finished_conversation') is True:
                    termcolor.cprint("Finished conversation, exiting send_message_to_agent", "red")
                    continue_processing = False

            except Exception as e:
                continue_processing = False
                await asyncio.sleep(1)
                trace = traceback.format_exc()
                msg = str(e)
                descr = msg + "\n\n" + trace
                print(descr)
                print('------')
                print(msg)
                try:
                    persona = agent_['persona']['name']
                except Exception as e:
                    persona = "System error"
                context.chat_log.add_message({"role": "user", "content": msg})
                await context.agent_output("system_error", {"error": msg})

            await asyncio.sleep(0.001)

        print("Exiting send_message_to_agent: ", session_id, message, max_iterations)
        await context.finished_chat()
        return [results, full_results]

    except Exception as e:
        print("Error in send_message_to_agent: ", e)
        print(traceback.format_exc())
        return []

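# Illustrative sketch (not part of the original module): sending a mixed text/image
# message into an existing session. The session comes from context.log_id; the data URL
# is a placeholder, and the MessageParts field names are inferred from how the parts are
# read above (part['type'], part['text'], part['data']). A plain string may be passed
# instead of a parts list.
async def _example_send_message(context):
    parts = [
        MessageParts(type="text", text="What is in this picture?"),
        MessageParts(type="image", data="data:image/png;base64,..."),  # hypothetical data URL
    ]
    results, full_results = await send_message_to_agent(context.log_id, parts, context=context)
    return results_output(full_results)
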
@pipe(name='process_results', priority=5)
def add_current_time(data: dict, context=None) -> dict:
    # Placeholder pipe: passes the results through unchanged.
    data['results'] = data['results']
    return data

@service()
async def finished_chat(context=None):
    await context.agent_output("finished_chat", {"persona": context.agent['persona']['name']})

@hook()
async def quit(context=None):
    print("Chat service is quitting..")
    # Close all existing SSE connections
    for session_id, queues in sse_clients.items():
        for queue in queues.copy():  # Use copy to avoid modification during iteration
            try:
                await queue.put({'event': 'close', 'data': 'Server shutting down'})
            except:
                pass
    # Clear the global sse_clients
    sse_clients.clear()
    # Give clients a moment to receive the close message
    await asyncio.sleep(1)
    print("Chat service finished.")
    return {"status": "shutdown_complete"}

@service()
async def subscribe_to_agent_messages(session_id: str, context=None):
    async def event_generator():
        queue = asyncio.Queue()
        if session_id not in sse_clients:
            sse_clients[session_id] = set()
        sse_clients[session_id].add(queue)
        try:
            while True:
                data = await queue.get()
                await asyncio.sleep(0.001)
                print('.', end='', flush=True)
                yield data
        except asyncio.CancelledError:
            sse_clients[session_id].remove(queue)
            if not sse_clients[session_id]:
                del sse_clients[session_id]

    return event_generator()

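# Illustrative sketch (not part of the original module): draining the async generator
# returned by subscribe_to_agent_messages(). A real route would hand this generator to
# a streaming (SSE) response; here we simply print the first few queued events.
async def _example_consume_events(session_id):
    generator = await subscribe_to_agent_messages(session_id)
    count = 0
    async for event in generator:
        print("SSE event:", event)
        count += 1
        if count >= 3:
            break
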
@service()
async def close_chat_session(session_id: str, context=None):
    if session_id in sse_clients:
        del sse_clients[session_id]
    # Any additional cleanup needed

@service()
async def agent_output(event: str, data: dict, context=None):
    log_id = context.log_id
    if log_id in sse_clients:
        for queue in sse_clients[log_id]:
            await queue.put({"event": event, "data": json.dumps(data)})

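# Illustrative sketch (not part of the original module): pushing a custom event to every
# SSE subscriber of the current chat log. The event name and payload are made up;
# nothing is delivered unless context.log_id matches a key in sse_clients.
async def _example_emit_status(context):
    await agent_output("status_update", {"stage": "indexing", "percent": 42}, context=context)
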
@service()
async def append_message(role: str, content, context=None):
    await context.chat_log.add_message({"role": role, "content": content})

@service()
async def partial_command(command: str, chunk: str, params, context=None):
    agent_ = context.agent
    await context.agent_output("partial_command", {
        "command": command,
        "chunk": chunk,
        "params": params,
        "persona": agent_['persona']['name']
    })

@service()
async def running_command(command: str, args, context=None):
    agent_ = context.agent
    await context.agent_output("running_command", {
        "command": command,
        "args": args,
        "persona": agent_['persona']['name']
    })

@service()
async def command_result(command: str, result, context=None):
    agent_ = context.agent
    await context.agent_output("command_result", {
        "command": command,
        "result": result,
        "persona": agent_['persona']['name']
    })