import asyncio
import json
import os
import re
import time
from typing import Dict, List, Optional

import aiofiles
import aiofiles.os


async def find_chatlog_file(log_id: str) -> Optional[str]:
    """
    Find a chatlog file by its log_id.

    Args:
        log_id: The log ID to search for

    Returns:
        The full path to the chatlog file if found, None otherwise
    """
    chat_dir = os.environ.get('CHATLOG_DIR', 'data/chat')
    # Walk all subdirectories in a worker thread; materialize the listing so
    # the blocking traversal itself runs off the event loop (os.walk returns
    # a lazy generator, so passing it to to_thread alone would not help).
    entries = await asyncio.to_thread(lambda: list(os.walk(chat_dir)))
    for root, dirs, files in entries:
        for file in files:
            if file == f"chatlog_{log_id}.json":
                return os.path.join(root, file)
    return None
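
# For reference, the filename convention the search above matches on, as a
# small sketch (not part of the original module): one JSON file per log,
# named after its log ID, nested arbitrarily under CHATLOG_DIR.
def _chatlog_filename(log_id: str) -> str:
    return f"chatlog_{log_id}.json"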


def extract_delegate_task_log_ids(messages: List[Dict]) -> List[str]:
    """
    Extract log IDs from delegate_task commands in messages.

    Args:
        messages: List of chat messages

    Returns:
        List of log IDs found in delegate_task commands
    """
    log_ids = []
    for message in messages:
        if message['role'] != 'assistant':
            continue
        content = message['content']
        # Handle both string and list content formats
        if isinstance(content, str):
            text = content
        elif (isinstance(content, list) and len(content) > 0
                and isinstance(content[0], dict) and 'text' in content[0]):
            text = content[0]['text']
        else:
            continue
        # Try to parse the assistant output as a JSON command (or list of them)
        try:
            commands = json.loads(text)
            if not isinstance(commands, list):
                commands = [commands]
            for cmd in commands:
                for key, value in cmd.items():
                    if key == 'delegate_task' and isinstance(value, dict) and 'log_id' in value:
                        log_ids.append(value['log_id'])
        except (json.JSONDecodeError, TypeError, KeyError, AttributeError):
            # Not valid JSON (or not command-shaped): fall back to a regex
            # for log_ids inside delegate_task commands
            matches = re.findall(
                r'"delegate_task"\s*:\s*{\s*"log_id"\s*:\s*"([^"]+)"', text)
            log_ids.extend(matches)
    return log_ids
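
# A minimal sketch of the message shape extract_delegate_task_log_ids expects,
# inferred from the parser above; the log ID and task text are illustrative.
_EXAMPLE_MESSAGES = [
    {
        "role": "assistant",
        "content": '[{"delegate_task": {"log_id": "sub-001", "task": "summarize"}}]',
    },
    {"role": "user", "content": "please continue"},  # non-assistant: skipped
]
# extract_delegate_task_log_ids(_EXAMPLE_MESSAGES) -> ["sub-001"]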


async def get_cache_dir() -> str:
    """
    Get the directory for token count cache files.

    Creates the directory if it doesn't exist.
    """
    cache_dir = os.environ.get('TOKEN_CACHE_DIR', 'data/token_cache')
    if not await aiofiles.os.path.exists(cache_dir):
        await aiofiles.os.makedirs(cache_dir)
    return cache_dir


async def get_cache_path(log_id: str) -> str:
    """
    Get the path to the cache file for a specific log_id.
    """
    cache_dir = await get_cache_dir()
    return os.path.join(cache_dir, f"tokens_{log_id}.json")


async def get_cached_token_counts(log_id: str, log_path: str) -> Optional[Dict[str, int]]:
    """
    Get cached token counts if available and valid.

    Args:
        log_id: The log ID
        log_path: Path to the actual log file

    Returns:
        Cached token counts if valid, None otherwise
    """
    cache_path = await get_cache_path(log_id)

    # If the cache doesn't exist, return None
    if not await aiofiles.os.path.exists(cache_path):
        return None

    try:
        # Get modification times
        log_mtime = await aiofiles.os.path.getmtime(log_path)
        cache_mtime = await aiofiles.os.path.getmtime(cache_path)
        current_time = time.time()

        # If the log was modified after the cache was written, the cache is stale
        if log_mtime > cache_mtime:
            return None

        # Don't recalculate sooner than 3 minutes after the last calculation
        if current_time - cache_mtime < 180:  # 3 minutes in seconds
            async with aiofiles.open(cache_path, 'r') as f:
                content = await f.read()
            return json.loads(content)

        # Logs that haven't been modified in over an hour are considered
        # "finished"; use the cache regardless of when it was last calculated
        if current_time - log_mtime > 3600:  # 1 hour in seconds
            async with aiofiles.open(cache_path, 'r') as f:
                content = await f.read()
            return json.loads(content)
    except (json.JSONDecodeError, IOError) as e:
        print(f"Error reading token cache: {e}")

    return None
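
# The cache-freshness policy above, restated as a pure predicate for clarity
# (a sketch, not part of the module's public API): a cache entry is used if it
# postdates the log, and either it was computed within the last 3 minutes or
# the log has been idle for over an hour.
def _cache_is_fresh(log_mtime: float, cache_mtime: float, now: float) -> bool:
    if log_mtime > cache_mtime:
        return False  # log changed after the cache was written: stale
    if now - cache_mtime < 180:
        return True   # computed recently; don't churn
    return now - log_mtime > 3600  # log looks finished; trust the cache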


async def save_token_counts_to_cache(log_id: str, token_counts: Dict[str, int]) -> None:
    """
    Save token counts to cache.
    """
    cache_path = await get_cache_path(log_id)
    async with aiofiles.open(cache_path, 'w') as f:
        await f.write(json.dumps(token_counts))


async def count_tokens_for_log_id(log_id: str) -> Optional[Dict[str, int]]:
    """
    Count tokens for a chat log identified by log_id, including any delegated tasks.

    Args:
        log_id: The log ID to count tokens for

    Returns:
        Dictionary with token counts, or None if the log is not found
    """
    # Find the chatlog file
    chatlog_path = await find_chatlog_file(log_id)
    if not chatlog_path:
        return None

    # Check the cache first
    cached_counts = await get_cached_token_counts(log_id, chatlog_path)
    if cached_counts is not None:
        print(f"Using cached token counts for {log_id}")
        return cached_counts

    print(f"Calculating token counts for {log_id}")

    # Load the chat log
    async with aiofiles.open(chatlog_path, 'r') as f:
        content = await f.read()
    log_data = json.loads(content)

    # Create a temporary ChatLog instance to count tokens
    temp_log = ChatLog(log_id=log_id, user="system",
                       agent=log_data.get('agent', 'unknown'))
    temp_log.messages = log_data.get('messages', [])

    # Count tokens for this log on its own
    parent_counts = temp_log.count_tokens()

    # Combined counts start from the parent's counts
    combined_counts = {
        'input_tokens_sequence': parent_counts['input_tokens_sequence'],
        'output_tokens_sequence': parent_counts['output_tokens_sequence'],
        'input_tokens_total': parent_counts['input_tokens_total'],
    }

    # Find delegated task log IDs and recursively count tokens for each;
    # use each subtask's combined counts so nested delegations roll up too
    delegated_log_ids = extract_delegate_task_log_ids(temp_log.messages)
    for delegated_id in delegated_log_ids:
        delegated_counts = await count_tokens_for_log_id(delegated_id)
        if delegated_counts:
            combined_counts['input_tokens_sequence'] += delegated_counts['combined_input_tokens_sequence']
            combined_counts['output_tokens_sequence'] += delegated_counts['combined_output_tokens_sequence']
            combined_counts['input_tokens_total'] += delegated_counts['combined_input_tokens_total']

    # Final result carries both the parent-only and the combined counts
    token_counts = {
        # Parent session only
        'input_tokens_sequence': parent_counts['input_tokens_sequence'],
        'output_tokens_sequence': parent_counts['output_tokens_sequence'],
        'input_tokens_total': parent_counts['input_tokens_total'],
        # Combined (parent + all subtasks)
        'combined_input_tokens_sequence': combined_counts['input_tokens_sequence'],
        'combined_output_tokens_sequence': combined_counts['output_tokens_sequence'],
        'combined_input_tokens_total': combined_counts['input_tokens_total'],
    }

    # Save to cache
    await save_token_counts_to_cache(log_id, token_counts)

    return token_counts
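
# Usage sketch (the log ID here is hypothetical; substitute one that exists
# under CHATLOG_DIR). Prints the parent-only and combined input totals.
async def _demo() -> None:
    counts = await count_tokens_for_log_id("abc123")
    if counts is None:
        print("log not found")
    else:
        print(f"parent input total:   {counts['input_tokens_total']}")
        print(f"combined input total: {counts['combined_input_tokens_total']}")

# To run: asyncio.run(_demo())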