#!/usr/bin/env python3
import asyncio
import sys
from typing import Optional, Callable, Any, List, Dict, AsyncGenerator

# ANSI color codes for terminal output
COLORS = {
    'reset': '\033[0m',
    'green': '\033[32m',   # stdout
    'yellow': '\033[33m',  # stderr/warning
    'blue': '\033[34m',    # info
    'red': '\033[31m',     # error
}
async def read_stream(
    stream: asyncio.StreamReader,
    callback: Callable[[str], Any],
) -> None:
    """Read from stream line by line and call the callback for each line.

    Args:
        stream: Reader that is consumed until it reports end-of-stream.
        callback: Invoked synchronously with each line decoded as UTF-8
            (undecodable bytes are replaced, the line is otherwise passed
            through unmodified). Its return value is ignored.
    """
    while True:
        line = await stream.readline()
        if not line:
            # An empty read means the stream is exhausted.
            break
        callback(line.decode('utf-8', errors='replace'))
async def run_command_with_streaming(
    cmd: List[str],
    stdout_callback: Callable[[str], Any],
    stderr_callback: Callable[[str], Any],
    cwd: Optional[str] = None,
    env: Optional[dict] = None,
) -> int:
    """Run a command asynchronously and stream its output.

    Args:
        cmd: Command to run as a list of strings
        stdout_callback: Callback for stdout lines
        stderr_callback: Callback for stderr lines
        cwd: Working directory for the command
        env: Environment variables for the command

    Returns:
        Exit code of the command, or 1 if the command could not be started
        or streaming its output raised an exception (the failure is printed).
    """
    process = None
    reader_tasks = []
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=env,
        )

        # Create tasks to read from stdout and stderr
        reader_tasks = [
            asyncio.create_task(read_stream(process.stdout, stdout_callback)),
            asyncio.create_task(read_stream(process.stderr, stderr_callback)),
        ]

        # Drain both pipes before waiting on the process so every line has
        # been delivered to the callbacks by the time the exit code is known.
        await asyncio.gather(*reader_tasks)
        return await process.wait()
    except Exception as e:
        print(f"Failed to run command: {e}")
        return 1
    finally:
        # Reached with unfinished work only when a reader failed or this
        # coroutine was cancelled: stop the remaining reader and do not leave
        # the child process running unattended.
        for task in reader_tasks:
            if not task.done():
                task.cancel()
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
async def stream_command_as_events(
    cmd: List[str],
    cwd: Optional[str] = None,
    env: Optional[dict] = None,
) -> AsyncGenerator[Dict[str, str], None]:
    """Run a command and yield its output as events.

    Each event is a dict with an "event" key (the type) and a "data" key.
    A "message" event announcing the command comes first, then one "message"
    event per non-blank stdout line and one "warning" event per non-blank
    stderr line, and finally either a "complete" event (exit code 0) or an
    "error" event (any other exit code). Progress is also printed, colorized,
    to the terminal.

    Args:
        cmd: Command to run as a list of strings
        cwd: Working directory for the command
        env: Environment variables for the command

    Yields:
        Events with type and data
    """
    command_text = ' '.join(cmd)

    # Debug output
    print(f"{COLORS['blue']}[DEBUG] Running command: {command_text}{COLORS['reset']}")

    # Send initial event
    yield {"event": "message", "data": f"Running command: {command_text}"}

    # Single queue shared by stdout and stderr so events keep arrival order
    output_queue: asyncio.Queue = asyncio.Queue()
    # Unique marker enqueued once the command task has finished
    finished = object()

    warning_markers = (
        "WARNING:",
        "DEPRECATION:",
        "A new release of pip is available",
    )

    def stdout_callback(line):
        text = line.strip()
        if text:
            print(f"{COLORS['green']}[STDOUT] {text}{COLORS['reset']}")
            output_queue.put_nowait(("message", text))

    def stderr_callback(line):
        text = line.strip()
        if not text:
            return
        # Determine if this is a warning or an error (affects terminal output only)
        if any(marker in line for marker in warning_markers):
            print(f"{COLORS['yellow']}[WARNING] {text}{COLORS['reset']}")
        else:
            print(f"{COLORS['red']}[ERROR] {text}{COLORS['reset']}")
        # NOTE(review): every stderr line is emitted as a "warning" event, even
        # when printed as [ERROR]; presumably "error" is reserved for the final
        # failure event - confirm with consumers before changing.
        output_queue.put_nowait(("warning", text))

    # Run the command in a separate task
    run_task = asyncio.create_task(
        run_command_with_streaming(cmd, stdout_callback, stderr_callback, cwd, env)
    )
    # The done-callback runs after the task has finished, i.e. after all output
    # callbacks have fired, so the marker is always the last item in the queue.
    run_task.add_done_callback(lambda _task: output_queue.put_nowait(finished))

    # Stream events from the queue until the command task signals completion.
    # Nothing here cancels run_task, so if the consumer stops iterating early
    # the command keeps running in the background.
    while True:
        item = await output_queue.get()
        if item is finished:
            break
        event_type, data = item
        yield {"event": event_type, "data": data}

    # Get the exit code
    exit_code = await run_task

    # Send completion event
    print(f"{COLORS['blue']}[INFO] Command completed with exit code {exit_code}{COLORS['reset']}")
    if exit_code == 0:
        yield {"event": "complete", "data": "Command completed successfully"}
    else:
        print(f"{COLORS['red']}[ERROR] Command failed with exit code {exit_code}{COLORS['reset']}")
        yield {"event": "error", "data": f"Command failed with exit code {exit_code}"}