Source code for mindroot.coreplugins.subscriptions.webhook_handler

from typing import Dict, Optional, Any
from datetime import datetime
from loguru import logger
from .subscription_manager import SubscriptionManager
from .credit_integration import CreditIntegration

[docs] class WebhookHandler: """Handles webhook events from payment providers for subscription events""" def __init__(self, subscription_manager: SubscriptionManager, credit_integration: CreditIntegration): self.subscription_manager = subscription_manager self.credit_integration = credit_integration self.processed_events = set() # For idempotency
[docs] async def handle_stripe_event(self, event: Dict[str, Any]) -> Optional[Dict]: """Process Stripe webhook events related to subscriptions Args: event: Stripe event object Returns: Optional[Dict]: Processing result """ event_id = event.get('id') event_type = event.get('type') logger.info(f"Processing Stripe event: {event_type} ({event_id})") # Ensure idempotency if event_id in self.processed_events: logger.info(f"Event already processed: {event_id}") return {'status': 'already_processed', 'event_id': event_id} try: if event_type == 'checkout.session.completed': await self._handle_checkout_completed(event) elif event_type == 'invoice.paid': await self._handle_invoice_paid(event) elif event_type == 'customer.subscription.updated': await self._handle_subscription_updated(event) elif event_type == 'customer.subscription.deleted': await self._handle_subscription_deleted(event) else: logger.info(f"Ignoring unhandled event type: {event_type}") return {'status': 'ignored', 'event_id': event_id, 'event_type': event_type} # Mark as processed self.processed_events.add(event_id) return {'status': 'success', 'event_id': event_id, 'event_type': event_type} except Exception as e: logger.error(f"Error processing event {event_id}: {str(e)}") return {'status': 'error', 'event_id': event_id, 'error': str(e)}
async def _handle_checkout_completed(self, event: Dict[str, Any]) -> None: """Handle initial subscription creation and credit allocation""" session = event['data']['object'] # Only process subscription checkouts if session.get('mode') != 'subscription': logger.info(f"Ignoring non-subscription checkout: {session.get('id')}") return username = session.get('client_reference_id') subscription_id = session.get('subscription') metadata = session.get('metadata', {}) plan_id = metadata.get('plan_id') if not all([username, subscription_id, plan_id]): logger.error(f"Missing required subscription data: username={username}, " f"subscription_id={subscription_id}, plan_id={plan_id}") raise ValueError("Missing required subscription data") logger.info(f"Creating subscription for {username}, plan: {plan_id}") # Create subscription record subscription = await self.subscription_manager.create_subscription( username=username, plan_id=plan_id, provider_data={ 'provider': 'stripe', 'provider_subscription_id': subscription_id } ) # Allocate initial credits plan = await self.subscription_manager.get_plan(plan_id) if plan: logger.info(f"Allocating {plan.credits_per_cycle} credits to {username}") await self.credit_integration.allocate_subscription_credits( username=username, amount=plan.credits_per_cycle, subscription_id=subscription.subscription_id, metadata={ 'event': 'subscription_created', 'plan_id': plan_id, 'plan_name': plan.name, 'stripe_session_id': session.get('id') } ) else: logger.error(f"Plan not found: {plan_id}") async def _handle_invoice_paid(self, event: Dict[str, Any]) -> None: """Handle subscription renewal and credit allocation""" invoice = event['data']['object'] subscription_id = invoice.get('subscription') if not subscription_id: logger.info(f"Invoice has no subscription: {invoice.get('id')}") return logger.info(f"Processing paid invoice for subscription: {subscription_id}") # Get subscription details from Stripe import stripe try: stripe_subscription = stripe.Subscription.retrieve(subscription_id) except Exception as e: logger.error(f"Failed to retrieve Stripe subscription: {str(e)}") raise # Find our internal subscription subscriptions = await self.subscription_manager.get_subscriptions_by_provider_id( provider='stripe', provider_subscription_id=subscription_id ) if not subscriptions: logger.warning(f"No matching subscription found for Stripe ID: {subscription_id}") return subscription = subscriptions[0] # Update subscription period period_start = datetime.fromtimestamp(stripe_subscription.current_period_start) period_end = datetime.fromtimestamp(stripe_subscription.current_period_end) logger.info(f"Updating subscription period: {period_start} to {period_end}") subscription = await self.subscription_manager.update_subscription_period( subscription_id=subscription.subscription_id, period_start=period_start, period_end=period_end ) # Allocate renewal credits plan = await self.subscription_manager.get_plan(subscription.plan_id) if plan: logger.info(f"Allocating renewal credits: {plan.credits_per_cycle} to {subscription.username}") await self.credit_integration.allocate_subscription_credits( username=subscription.username, amount=plan.credits_per_cycle, subscription_id=subscription.subscription_id, metadata={ 'event': 'subscription_renewed', 'plan_id': plan.plan_id, 'plan_name': plan.name, 'invoice_id': invoice.get('id'), 'period_start': period_start.isoformat(), 'period_end': period_end.isoformat() } ) else: logger.error(f"Plan not found for renewal: {subscription.plan_id}") async def _handle_subscription_updated(self, event: Dict[str, Any]) -> None: """Handle subscription updates (plan changes, etc.)""" stripe_subscription = event['data']['object'] subscription_id = stripe_subscription.get('id') logger.info(f"Processing subscription update: {subscription_id}") subscriptions = await self.subscription_manager.get_subscriptions_by_provider_id( provider='stripe', provider_subscription_id=subscription_id ) if not subscriptions: logger.warning(f"No matching subscription found for update: {subscription_id}") return subscription = subscriptions[0] # Update status status = stripe_subscription.get('status') cancel_at_period_end = stripe_subscription.get('cancel_at_period_end', False) period_end = datetime.fromtimestamp(stripe_subscription.current_period_end) logger.info(f"Updating subscription status: {status}, " f"cancel_at_period_end: {cancel_at_period_end}") subscription.status = status subscription.cancel_at_period_end = cancel_at_period_end subscription.current_period_end = period_end await self.subscription_manager.update_subscription_status( subscription_id=subscription.subscription_id, status=status ) async def _handle_subscription_deleted(self, event: Dict[str, Any]) -> None: """Handle subscription cancellation""" stripe_subscription = event['data']['object'] subscription_id = stripe_subscription.get('id') logger.info(f"Processing subscription deletion: {subscription_id}") subscriptions = await self.subscription_manager.get_subscriptions_by_provider_id( provider='stripe', provider_subscription_id=subscription_id ) if not subscriptions: logger.warning(f"No matching subscription found for deletion: {subscription_id}") return subscription = subscriptions[0] logger.info(f"Cancelling subscription: {subscription.subscription_id}") await self.subscription_manager.update_subscription_status( subscription_id=subscription.subscription_id, status='canceled' )