Source code for mindroot.lib.streamcmd

#!/usr/bin/env python3

import asyncio
import sys
from typing import Optional, Callable, Any, List, Dict, AsyncGenerator

# ANSI color codes for terminal output
COLORS = {
    'reset': '\033[0m',
    'green': '\033[32m',  # stdout
    'yellow': '\033[33m',  # stderr/warning
    'blue': '\033[34m',    # info
    'red': '\033[31m'      # error
}


[docs] async def read_stream(stream: asyncio.StreamReader, callback: Callable[[str], Any]): """Read from stream line by line and call the callback for each line.""" while True: line = await stream.readline() if not line: break callback(line.decode('utf-8', errors='replace'))
[docs] async def run_command_with_streaming( cmd: List[str], stdout_callback: Callable[[str], Any], stderr_callback: Callable[[str], Any], cwd: Optional[str] = None, env: Optional[dict] = None ) -> int: """Run a command asynchronously and stream its output. Args: cmd: Command to run as a list of strings stdout_callback: Callback for stdout lines stderr_callback: Callback for stderr lines cwd: Working directory for the command env: Environment variables for the command Returns: Exit code of the command """ try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env ) # Create tasks to read from stdout and stderr stdout_task = asyncio.create_task( read_stream(process.stdout, stdout_callback) ) stderr_task = asyncio.create_task( read_stream(process.stderr, stderr_callback) ) # Wait for the process to complete and streams to be fully read await asyncio.gather(stdout_task, stderr_task) exit_code = await process.wait() return exit_code except Exception as e: print(f"Failed to run command: {e}") return 1
[docs] async def stream_command_as_events( cmd: List[str], cwd: Optional[str] = None, env: Optional[dict] = None ) -> AsyncGenerator[Dict[str, str], None]: """Run a command and yield its output as events. Args: cmd: Command to run as a list of strings cwd: Working directory for the command env: Environment variables for the command Yields: Events with type and data """ # Debug output print(f"{COLORS['blue']}[DEBUG] Running command: {' '.join(cmd)}{COLORS['reset']}") # Send initial event yield {"event": "message", "data": f"Running command: {' '.join(cmd)}"} # Create queues for stdout and stderr output_queue = asyncio.Queue() # Define callbacks for stdout and stderr def stdout_callback(line): if line.strip(): print(f"{COLORS['green']}[STDOUT] {line.strip()}{COLORS['reset']}") output_queue.put_nowait(("message", line.strip())) def stderr_callback(line): if line.strip(): # Determine if this is a warning or an error if ("WARNING:" in line or "DEPRECATION:" in line or "A new release of pip is available" in line): print(f"{COLORS['yellow']}[WARNING] {line.strip()}{COLORS['reset']}") output_queue.put_nowait(("warning", line.strip())) else: print(f"{COLORS['red']}[ERROR] {line.strip()}{COLORS['reset']}") output_queue.put_nowait(("warning", line.strip())) # Run the command in a separate task run_task = asyncio.create_task( run_command_with_streaming(cmd, stdout_callback, stderr_callback, cwd, env) ) # Stream events from the queue while the command is running while not run_task.done() or not output_queue.empty(): try: event_type, data = await asyncio.wait_for(output_queue.get(), timeout=0.1) yield {"event": event_type, "data": data} except asyncio.TimeoutError: # No output available, just continue await asyncio.sleep(0.01) # Get the exit code exit_code = await run_task # Send completion event print(f"{COLORS['blue']}[INFO] Command completed with exit code {exit_code}{COLORS['reset']}") if exit_code == 0: yield {"event": "complete", "data": "Command completed successfully"} else: print(f"{COLORS['red']}[ERROR] Command failed with exit code {exit_code}{COLORS['reset']}") yield {"event": "error", "data": f"Command failed with exit code {exit_code}"}