Source code for mindroot.coreplugins.subscriptions.credit_integration
from typing import Any, Dict, Optional

from loguru import logger
class CreditIntegration:
    """Handles integration with the credits plugin for subscription-related credit operations.

    The credits plugin is imported inside each method rather than at module
    level to avoid circular imports between the two plugins.
    """

    def __init__(self):
        """Create the integration helper; no instance state is set up."""

    @staticmethod
    def _build_allocation_metadata(metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return a new metadata dict tagged with ``source_type='subscription'``.

        Args:
            metadata: Caller-supplied metadata, or ``None`` for no extra data.

        Returns:
            A fresh dict; the caller's dict is never mutated.
        """
        # Caller keys are spread last, so an explicit 'source_type' supplied
        # by the caller still overrides the default (original behavior).
        # `or {}` keeps a missing/None metadata from raising TypeError.
        return {'source_type': 'subscription', **(metadata or {})}

    async def allocate_subscription_credits(
        self,
        username: str,
        amount: float,
        subscription_id: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> float:
        """Allocate credits to a user based on their subscription.

        Args:
            username: User to allocate credits to.
            amount: Amount of credits to allocate.
            subscription_id: Subscription ID, passed to the credits plugin as
                the allocation's ``reference_id``.
            metadata: Additional information about the allocation. May be
                ``None`` or omitted, in which case only the subscription
                source tag is sent.

        Returns:
            The value returned by the credits plugin's ``allocate_credits``
            (the new credit balance).

        Raises:
            Exception: Any error raised while importing or calling the credits
                plugin is logged and then re-raised unchanged.
        """
        try:
            # Import here to avoid circular imports
            from mindroot.coreplugins.credits.mod import allocate_credits

            new_balance = await allocate_credits(
                username=username,
                amount=amount,
                source='subscription',
                reference_id=subscription_id,
                metadata=self._build_allocation_metadata(metadata),
            )
            logger.info(f"Allocated {amount} credits to {username} for subscription {subscription_id}")
            return new_balance
        except Exception as e:
            logger.error(f"Failed to allocate subscription credits: {str(e)}")
            raise

    async def check_credit_balance(self, username: str) -> float:
        """Get the current credit balance for a user.

        Args:
            username: User whose balance is looked up.

        Returns:
            The value returned by the credits plugin's ``get_credit_balance``.

        Raises:
            Exception: Any error raised while importing or calling the credits
                plugin is logged and then re-raised unchanged.
        """
        try:
            # Import here to avoid circular imports
            from mindroot.coreplugins.credits.mod import get_credit_balance

            return await get_credit_balance(username)
        except Exception as e:
            logger.error(f"Failed to check credit balance: {str(e)}")
            raise