[docs]classCreditStorage:def__init__(self,base_path:str):self.base_path=Path(base_path)def_get_user_dir(self,username:str)->Path:"""Get the user's credit directory"""returnself.base_path/'data'/'credits'/usernamedef_get_month_dir(self,username:str,timestamp:datetime)->Path:"""Get the monthly transaction directory"""user_dir=self._get_user_dir(username)returnuser_dir/'transactions'/timestamp.strftime('%Y-%m')def_get_balance_file(self,username:str,month_date:date)->Path:"""Get the monthly balance snapshot file"""user_dir=self._get_user_dir(username)balance_dir=user_dir/'balances'balance_dir.mkdir(parents=True,exist_ok=True)returnbalance_dir/f"{month_date.strftime('%Y-%m')}.json"
[docs]asyncdefstore_transaction(self,transaction:CreditTransaction)->None:"""Store a credit transaction"""month_dir=self._get_month_dir(transaction.username,transaction.timestamp)month_dir.mkdir(parents=True,exist_ok=True)transaction_file=month_dir/'transactions.jsonl'withopen(transaction_file,'a')asf:f.write(json.dumps(transaction.to_dict())+'\n')# Update monthly balance snapshotbalance_file=self._get_balance_file(transaction.username,transaction.timestamp.date().replace(day=1))try:withopen(balance_file,'r')asf:balance_data=json.load(f)except(FileNotFoundError,json.JSONDecodeError):balance_data={'balance':0.0,'last_transaction':None}balance_data['balance']=transaction.balancebalance_data['last_transaction']=transaction.transaction_idwithopen(balance_file,'w')asf:json.dump(balance_data,f)
def_iter_month_files(self,username:str,start_date:Optional[date]=None,end_date:Optional[date]=None)->Iterator[Path]:"""Iterate through monthly transaction files"""user_dir=self._get_user_dir(username)transactions_dir=user_dir/'transactions'ifnottransactions_dir.exists():returnformonth_dirinsorted(transactions_dir.iterdir()):ifnotmonth_dir.is_dir():continuetry:month_date=datetime.strptime(month_dir.name,'%Y-%m').date()ifstart_dateandmonth_date<start_date.replace(day=1):continueifend_dateandmonth_date>end_date.replace(day=1):continuetransaction_file=month_dir/'transactions.jsonl'iftransaction_file.exists():yieldtransaction_fileexceptValueError:continue
[docs]asyncdefget_transactions(self,username:str,start_date:Optional[date]=None,end_date:Optional[date]=None)->List[CreditTransaction]:"""Get credit transactions for a user within date range"""transactions=[]forfile_pathinself._iter_month_files(username,start_date,end_date):withopen(file_path,'r')asf:forlineinf:try:data=json.loads(line)transaction=CreditTransaction.from_dict(data)ifstart_dateandtransaction.timestamp.date()<start_date:continueifend_dateandtransaction.timestamp.date()>end_date:continuetransactions.append(transaction)except(json.JSONDecodeError,KeyError):continuereturnsorted(transactions,key=lambdat:t.timestamp)
[docs]asyncdefget_latest_balance(self,username:str)->float:"""Get the user's latest balance"""user_dir=self._get_user_dir(username)balance_dir=user_dir/'balances'ifnotbalance_dir.exists():return0.0# Find most recent balance filebalance_files=sorted(balance_dir.glob('*.json'),reverse=True)forfile_pathinbalance_files:try:withopen(file_path,'r')asf:data=json.load(f)returndata['balance']except(json.JSONDecodeError,KeyError,FileNotFoundError):continuereturn0.0
[docs]asyncdefget_balance_at(self,username:str,at_date:date)->float:"""Get the user's balance at a specific date"""# Find the most recent balance snapshot before the datebalance_file=self._get_balance_file(username,at_date.replace(day=1))try:withopen(balance_file,'r')asf:data=json.load(f)returndata['balance']except(FileNotFoundError,json.JSONDecodeError):# If no snapshot, calculate from all previous transactionstransactions=awaitself.get_transactions(username,end_date=at_date)returntransactions[-1].balanceiftransactionselse0.0