Source code for mindroot.coreplugins.subscriptions.storage

from pathlib import Path
import json
from datetime import datetime, date
from typing import List, Dict, Optional, Iterator
import os
from .models import SubscriptionPlan, UserSubscription, PlanFeature, PageTemplate

[docs] class SubscriptionStorage: def __init__(self, base_path: str): self.base_path = Path(base_path) self.plans_dir = self.base_path / 'plans' self.subscriptions_dir = self.base_path / 'subscriptions' self.features_dir = self.base_path / 'features' self.templates_dir = self.base_path / 'templates' # Create directories if they don't exist self.plans_dir.mkdir(parents=True, exist_ok=True) self.subscriptions_dir.mkdir(parents=True, exist_ok=True) self.features_dir.mkdir(parents=True, exist_ok=True) self.templates_dir.mkdir(parents=True, exist_ok=True) # Plan management methods
[docs] async def store_plan(self, plan: SubscriptionPlan) -> None: """Store a subscription plan""" plan_file = self.plans_dir / f"{plan.plan_id}.json" with open(plan_file, 'w') as f: json.dump(plan.to_dict(), f, indent=2)
[docs] async def get_plan(self, plan_id: str) -> Optional[SubscriptionPlan]: """Get a subscription plan by ID""" plan_file = self.plans_dir / f"{plan_id}.json" if not plan_file.exists(): return None try: with open(plan_file, 'r') as f: data = json.load(f) return SubscriptionPlan.from_dict(data) except (json.JSONDecodeError, KeyError): return None
[docs] async def get_all_plans(self, active_only: bool = True) -> List[SubscriptionPlan]: """Get all subscription plans""" plans = [] for plan_file in self.plans_dir.glob('*.json'): try: with open(plan_file, 'r') as f: data = json.load(f) plan = SubscriptionPlan.from_dict(data) if active_only and not plan.active: continue plans.append(plan) except (json.JSONDecodeError, KeyError): continue return sorted(plans, key=lambda p: p.price)
[docs] async def update_plan(self, plan: SubscriptionPlan) -> None: """Update a subscription plan""" await self.store_plan(plan)
# Subscription management methods
[docs] async def store_subscription(self, subscription: UserSubscription) -> None: """Store a user subscription""" user_dir = self.subscriptions_dir / subscription.username user_dir.mkdir(parents=True, exist_ok=True) subscription_file = user_dir / f"{subscription.subscription_id}.json" with open(subscription_file, 'w') as f: json.dump(subscription.to_dict(), f, indent=2)
[docs] async def get_subscription(self, subscription_id: str) -> Optional[UserSubscription]: """Get a subscription by ID""" # We need to search all user directories for user_dir in self.subscriptions_dir.iterdir(): if not user_dir.is_dir(): continue subscription_file = user_dir / f"{subscription_id}.json" if subscription_file.exists(): try: with open(subscription_file, 'r') as f: data = json.load(f) return UserSubscription.from_dict(data) except (json.JSONDecodeError, KeyError): return None return None
[docs] async def get_user_subscriptions(self, username: str) -> List[UserSubscription]: """Get all subscriptions for a user""" user_dir = self.subscriptions_dir / username if not user_dir.exists(): return [] subscriptions = [] for subscription_file in user_dir.glob('*.json'): try: with open(subscription_file, 'r') as f: data = json.load(f) subscription = UserSubscription.from_dict(data) subscriptions.append(subscription) except (json.JSONDecodeError, KeyError): continue return sorted(subscriptions, key=lambda s: s.created_at, reverse=True)
[docs] async def update_subscription(self, subscription: UserSubscription) -> None: """Update a user subscription""" subscription.updated_at = datetime.now() await self.store_subscription(subscription)
[docs] async def get_subscriptions_by_provider_id(self, provider: str, provider_subscription_id: str) -> List[UserSubscription]: """Get subscriptions by provider subscription ID""" result = [] # Search all user directories for user_dir in self.subscriptions_dir.iterdir(): if not user_dir.is_dir(): continue for subscription_file in user_dir.glob('*.json'): try: with open(subscription_file, 'r') as f: data = json.load(f) if (data.get('payment_provider') == provider and data.get('provider_subscription_id') == provider_subscription_id): subscription = UserSubscription.from_dict(data) result.append(subscription) except (json.JSONDecodeError, KeyError): continue return result
# Feature management methods
[docs] async def store_feature(self, feature: PlanFeature) -> None: """Store a plan feature""" feature_file = self.features_dir / f"{feature.feature_id}.json" with open(feature_file, 'w') as f: json.dump(feature.to_dict(), f, indent=2)
[docs] async def get_feature(self, feature_id: str) -> Optional[PlanFeature]: """Get a plan feature by ID""" feature_file = self.features_dir / f"{feature_id}.json" if not feature_file.exists(): return None try: with open(feature_file, 'r') as f: data = json.load(f) return PlanFeature.from_dict(data) except (json.JSONDecodeError, KeyError): return None
[docs] async def get_all_features(self, active_only: bool = True) -> List[PlanFeature]: """Get all plan features""" features = [] for feature_file in self.features_dir.glob('*.json'): try: with open(feature_file, 'r') as f: data = json.load(f) feature = PlanFeature.from_dict(data) if active_only and not feature.active: continue features.append(feature) except (json.JSONDecodeError, KeyError): continue return sorted(features, key=lambda f: f.display_order)
[docs] async def update_feature(self, feature: PlanFeature) -> None: """Update a plan feature""" await self.store_feature(feature)
# Template management methods
[docs] async def store_template(self, template: PageTemplate) -> None: """Store a page template""" template_file = self.templates_dir / f"{template.template_id}.json" with open(template_file, 'w') as f: json.dump(template.to_dict(), f, indent=2)
[docs] async def get_template(self, template_id: str) -> Optional[PageTemplate]: """Get a page template by ID""" template_file = self.templates_dir / f"{template_id}.json" if not template_file.exists(): return None try: with open(template_file, 'r') as f: data = json.load(f) return PageTemplate.from_dict(data) except (json.JSONDecodeError, KeyError): return None
[docs] async def get_all_templates(self) -> List[PageTemplate]: """Get all page templates""" templates = [] for template_file in self.templates_dir.glob('*.json'): try: with open(template_file, 'r') as f: data = json.load(f) template = PageTemplate.from_dict(data) templates.append(template) except (json.JSONDecodeError, KeyError): continue return templates
[docs] async def get_default_template(self) -> Optional[PageTemplate]: """Get the default page template""" templates = await self.get_all_templates() for template in templates: if template.is_default: return template return templates[0] if templates else None
[docs] async def set_default_template(self, template_id: str) -> bool: """Set a template as the default""" templates = await self.get_all_templates() # First, unset default on all templates for template in templates: if template.is_default: template.is_default = False await self.store_template(template) # Then set the new default template = await self.get_template(template_id) if template: template.is_default = True await self.store_template(template) return True return False