Source code for mindroot.coreplugins.subscriptions.credit_integration

from typing import Dict, Any
from loguru import logger

[docs] class CreditIntegration: """Handles integration with the credits plugin for subscription-related credit operations""" def __init__(self): pass
[docs] async def allocate_subscription_credits(self, username: str, amount: float, subscription_id: str, metadata: Dict[str, Any]) -> float: """Allocate credits to a user based on their subscription Args: username: User to allocate credits to amount: Amount of credits to allocate subscription_id: Subscription ID for reference metadata: Additional information about the allocation Returns: float: New credit balance """ try: # Import here to avoid circular imports from mindroot.coreplugins.credits.mod import allocate_credits # Add subscription-specific metadata allocation_metadata = { 'source_type': 'subscription', **metadata } # Allocate credits new_balance = await allocate_credits( username=username, amount=amount, source='subscription', reference_id=subscription_id, metadata=allocation_metadata ) logger.info(f"Allocated {amount} credits to {username} for subscription {subscription_id}") return new_balance except Exception as e: logger.error(f"Failed to allocate subscription credits: {str(e)}") raise
[docs] async def check_credit_balance(self, username: str) -> float: """Get current credit balance for a user""" try: from mindroot.coreplugins.credits.mod import get_credit_balance return await get_credit_balance(username) except Exception as e: logger.error(f"Failed to check credit balance: {str(e)}") raise