[docs]asyncdefcreate_sse_client(url:str,access_token:str,queue:asyncio.Queue,conv_id:str):"""Creates internal SSE client that forwards to queue with conversation ID"""headers={'Cookie':f'access_token={access_token}'}asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url,headers=headers)asresponse:current_event=Nonecurrent_data=Noneasyncforlineinresponse.content:ifline:try:decoded_line=line.decode().strip()# Handle event lineifdecoded_line.startswith('event:'):current_event=decoded_line.split(':',1)[1].strip()continue# Handle data lineifdecoded_line.startswith('data:'):current_data=decoded_line.split(':',1)[1].strip()try:# Parse and add conversation IDdata=json.loads(current_data)data['conversation_id']=conv_idcurrent_data=json.dumps(data)exceptExceptionase:print(f"Error processing data JSON: {e}")# Reconstruct SSE messageifcurrent_event:awaitqueue.put(f"event: {current_event}\ndata: {current_data}\n\n")else:awaitqueue.put(f"data: {current_data}\n\n")current_event=Nonecurrent_data=NoneexceptExceptionase:print(f"Error processing SSE line: {e}")
[docs]@router.get("/events/multi")asyncdefmultiplexed_events(request:Request,conversation_ids:List[str]=Query(None)):ifnotconversation_ids:raiseHTTPException(status_code=400,detail="conversation_ids parameter is required")# Get access token from cookieaccess_token=request.cookies.get('access_token')ifnotaccess_token:raiseHTTPException(status_code=401,detail="Access token required")# Create queue for aggregated messagesmain_queue=asyncio.Queue()# Get base URL for chat eventsbase_url=str(request.base_url).rstrip('/')tasks=[]forconv_idinconversation_ids:url=f"{base_url}/chat/{conv_id}/events"task=asyncio.create_task(create_sse_client(url,access_token,main_queue,conv_id))tasks.append(task)asyncdefevent_generator():try:whileTrue:message=awaitmain_queue.get()ifmessage:yieldmessageexceptasyncio.CancelledError:fortaskintasks:task.cancel()raisereturnEventSourceResponse(event_generator())