Source code for mindroot.coreplugins.credits.storage

from pathlib import Path
import json
from datetime import datetime, date
from typing import List, Dict, Optional, Iterator
from .models import CreditTransaction

[docs] class CreditStorage: def __init__(self, base_path: str): self.base_path = Path(base_path) def _get_user_dir(self, username: str) -> Path: """Get the user's credit directory""" return self.base_path / 'data' / 'credits' / username def _get_month_dir(self, username: str, timestamp: datetime) -> Path: """Get the monthly transaction directory""" user_dir = self._get_user_dir(username) return user_dir / 'transactions' / timestamp.strftime('%Y-%m') def _get_balance_file(self, username: str, month_date: date) -> Path: """Get the monthly balance snapshot file""" user_dir = self._get_user_dir(username) balance_dir = user_dir / 'balances' balance_dir.mkdir(parents=True, exist_ok=True) return balance_dir / f"{month_date.strftime('%Y-%m')}.json"
[docs] async def store_transaction(self, transaction: CreditTransaction) -> None: """Store a credit transaction""" month_dir = self._get_month_dir(transaction.username, transaction.timestamp) month_dir.mkdir(parents=True, exist_ok=True) transaction_file = month_dir / 'transactions.jsonl' with open(transaction_file, 'a') as f: f.write(json.dumps(transaction.to_dict()) + '\n') # Update monthly balance snapshot balance_file = self._get_balance_file( transaction.username, transaction.timestamp.date().replace(day=1) ) try: with open(balance_file, 'r') as f: balance_data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): balance_data = {'balance': 0.0, 'last_transaction': None} balance_data['balance'] = transaction.balance balance_data['last_transaction'] = transaction.transaction_id with open(balance_file, 'w') as f: json.dump(balance_data, f)
def _iter_month_files(self, username: str, start_date: Optional[date] = None, end_date: Optional[date] = None) -> Iterator[Path]: """Iterate through monthly transaction files""" user_dir = self._get_user_dir(username) transactions_dir = user_dir / 'transactions' if not transactions_dir.exists(): return for month_dir in sorted(transactions_dir.iterdir()): if not month_dir.is_dir(): continue try: month_date = datetime.strptime(month_dir.name, '%Y-%m').date() if start_date and month_date < start_date.replace(day=1): continue if end_date and month_date > end_date.replace(day=1): continue transaction_file = month_dir / 'transactions.jsonl' if transaction_file.exists(): yield transaction_file except ValueError: continue
[docs] async def get_transactions(self, username: str, start_date: Optional[date] = None, end_date: Optional[date] = None) -> List[CreditTransaction]: """Get credit transactions for a user within date range""" transactions = [] for file_path in self._iter_month_files(username, start_date, end_date): with open(file_path, 'r') as f: for line in f: try: data = json.loads(line) transaction = CreditTransaction.from_dict(data) if start_date and transaction.timestamp.date() < start_date: continue if end_date and transaction.timestamp.date() > end_date: continue transactions.append(transaction) except (json.JSONDecodeError, KeyError): continue return sorted(transactions, key=lambda t: t.timestamp)
[docs] async def get_latest_balance(self, username: str) -> float: """Get the user's latest balance""" user_dir = self._get_user_dir(username) balance_dir = user_dir / 'balances' if not balance_dir.exists(): return 0.0 # Find most recent balance file balance_files = sorted(balance_dir.glob('*.json'), reverse=True) for file_path in balance_files: try: with open(file_path, 'r') as f: data = json.load(f) return data['balance'] except (json.JSONDecodeError, KeyError, FileNotFoundError): continue return 0.0
[docs] async def get_balance_at(self, username: str, at_date: date) -> float: """Get the user's balance at a specific date""" # Find the most recent balance snapshot before the date balance_file = self._get_balance_file(username, at_date.replace(day=1)) try: with open(balance_file, 'r') as f: data = json.load(f) return data['balance'] except (FileNotFoundError, json.JSONDecodeError): # If no snapshot, calculate from all previous transactions transactions = await self.get_transactions( username, end_date=at_date ) return transactions[-1].balance if transactions else 0.0