Source code for mindroot.coreplugins.usage.storage

from pathlib import Path
import json
from datetime import datetime, date
from typing import List, Dict, Optional, Iterator, Any
import aiofiles
import asyncio
from .models import UsageEvent
from loguru import logger
import traceback

[docs] class UsageStorage: def __init__(self, base_path: str): self.base_path = Path(base_path) self.config_path = self.base_path / 'config' / 'usage' self.data_path = self.base_path / 'data' / 'usage' logger.info(f"Usage storage initialized at {self.base_path}") self._ensure_paths() def _ensure_paths(self): """Create necessary directories if they don't exist""" self.config_path.mkdir(parents=True, exist_ok=True) self.data_path.mkdir(parents=True, exist_ok=True) async def _atomic_write_json(self, path: Path, data: Any): """Atomically write JSON data to a file""" temp_path = path.with_suffix('.tmp') async with aiofiles.open(temp_path, 'w') as f: await f.write(json.dumps(data, indent=2)) temp_path.rename(path) async def _read_json(self, path: Path, default: Any = None) -> Any: """Read JSON data from a file, return default if file doesn't exist""" try: async with aiofiles.open(path, 'r') as f: content = await f.read() return json.loads(content) except (FileNotFoundError, json.JSONDecodeError): logger.error(f"Failed to read JSON from {path}") return default
[docs] async def load_cost_types(self) -> Dict: """Load cost type definitions""" return await self._read_json(self.config_path / 'cost_types.json', {})
[docs] async def save_cost_type(self, cost_type_id: str, info: Dict): """Save a cost type definition""" cost_types = await self.load_cost_types() cost_types[cost_type_id] = info await self._atomic_write_json(self.config_path / 'cost_types.json', cost_types)
[docs] async def load_costs(self) -> Dict: """Load cost configurations""" trace = traceback.format_stack() logger.info(f"Loading costs from {self.config_path} - {trace}") costs = await self._read_json(self.config_path / 'costs.json', {}) logger.info(f"Loaded cost configurations: {costs}") return costs
[docs] async def save_cost(self, plugin_id: str, cost_type_id: str, unit_cost: float, model_id: Optional[str] = None): """Save a cost configuration""" costs = await self.load_costs() if plugin_id not in costs: costs[plugin_id] = {} if cost_type_id not in costs[plugin_id]: costs[plugin_id][cost_type_id] = {} if model_id: if 'model_specific' not in costs[plugin_id][cost_type_id]: costs[plugin_id][cost_type_id]['model_specific'] = {} costs[plugin_id][cost_type_id]['model_specific'][model_id] = unit_cost else: costs[plugin_id][cost_type_id]['default'] = unit_cost await self._atomic_write_json(self.config_path / 'costs.json', costs)
def _get_user_dir(self, username: str) -> Path: """Get the directory for a user's usage data""" user_dir = self.data_path / username user_dir.mkdir(exist_ok=True) return user_dir def _get_date_file(self, username: str, date_obj: date) -> Path: """Get the file path for a user's usage data on a specific date""" return self._get_user_dir(username) / f"usage_{date_obj.isoformat()}.jsonl"
[docs] async def store_event(self, event: UsageEvent, cost: float): """Store a usage event with its calculated cost""" file_path = self._get_date_file(event.username, event.timestamp.date()) event_dict = event.to_dict() event_dict['cost'] = cost async with aiofiles.open(file_path, 'a') as f: await f.write(json.dumps(event_dict) + '\n')
[docs] async def get_usage(self, username: str, start_date: Optional[date] = None, end_date: Optional[date] = None) -> List[Dict]: """Get usage records for a user within date range""" usage_records = [] user_dir = self._get_user_dir(username) if not user_dir.exists(): return [] for file_path in sorted(user_dir.glob("usage_*.jsonl")): try: file_date = date.fromisoformat(file_path.stem.replace('usage_', '')) if start_date and file_date < start_date: continue if end_date and file_date > end_date: continue async with aiofiles.open(file_path, 'r') as f: async for line in f: try: record = json.loads(line) record_date = datetime.fromisoformat(record['timestamp']).date() if start_date and record_date < start_date: continue if end_date and record_date > end_date: continue usage_records.append(record) except (json.JSONDecodeError, KeyError): continue except ValueError: continue return usage_records
[docs] async def get_total_cost(self, username: str, start_date: Optional[date] = None, end_date: Optional[date] = None) -> float: """Calculate total cost for a user within date range""" total = 0.0 records = await self.get_usage(username, start_date, end_date) for record in records: total += record.get('cost', 0.0) return total