Source code for mindroot.coreplugins.user_service.mod

from lib.providers.services import service
from .models import UserAuth, UserCreate, UserBase
from .email_service import send_verification_email, setup_verification
from .role_service import has_role, add_role, remove_role, get_user_roles
#from .admin_init import initialize_admin
import bcrypt
import json
import os
from datetime import datetime
from typing import Optional, List 

USER_DATA_ROOT = "data/users"


[docs] @service() async def create_user(user_data: UserCreate, roles: List[str] = None, skip_verification: bool = False, context=None) -> UserBase: """Create new user directory and auth file""" user_dir = os.path.join(USER_DATA_ROOT, user_data.username) # Ensure user data root exists os.makedirs(USER_DATA_ROOT, exist_ok=True) # Check if user exists if os.path.exists(user_dir): raise ValueError("Username already exists") # Create user directory os.makedirs(user_dir) # Handle verification verification_token, verification_expires, email_verified = setup_verification() if verification_token and not skip_verification: await send_verification_email(user_data.email, verification_token) # Setup roles (always include 'user' role) if roles is None: roles = ["user"] else: roles.append("user") # Create auth data now = datetime.utcnow().isoformat() auth_data = UserAuth( username=user_data.username, email=user_data.email, password_hash=bcrypt.hashpw(user_data.password.encode(), bcrypt.gensalt()).decode(), created_at=now, last_login=None, email_verified=email_verified, verification_token=verification_token, verification_expires=verification_expires, roles=roles ) # Save auth data with open(os.path.join(user_dir, "auth.json"), 'w') as f: json.dump(auth_data.dict(), f, indent=2, default=str) # Initialize empty settings and workspace with open(os.path.join(user_dir, "settings.json"), 'w') as f: json.dump({}, f) with open(os.path.join(user_dir, "workspace.json"), 'w') as f: json.dump({}, f) # Return safe user data return UserBase(**auth_data.dict())
[docs] @service() async def verify_user(username: str, password: str, context=None) -> bool: """Verify user credentials and update last login""" auth_file = os.path.join(USER_DATA_ROOT, username, "auth.json") if not os.path.exists(auth_file): print("user not found") print("path", auth_file) print("working dir", os.getcwd()) return False with open(auth_file, 'r') as f: auth_data = UserAuth(**json.load(f)) if bcrypt.checkpw(password.encode(), auth_data.password_hash.encode()): print("check pw passed") # Update last login auth_data.last_login = datetime.utcnow().isoformat() with open(auth_file, 'w') as f: json.dump(auth_data.dict(), f, indent=2, default=str) return True else: print("checkpw failed,") print("auth_data", auth_data) print("password:", password.encode()) return False
[docs] @service() async def get_user_data(username: str, include_email=False, context=None) -> Optional[UserBase]: """Get user data excluding sensitive info""" auth_file = os.path.join(USER_DATA_ROOT, username, "auth.json") if not os.path.exists(auth_file): return None with open(auth_file, 'r') as f: auth_data = UserAuth(**json.load(f)) if not include_email: auth_data.email = None return UserBase(**auth_data.dict())
[docs] @service() async def verify_email(token: str, context=None) -> bool: """Verify a user's email using their verification token""" if not os.environ.get('REQUIRE_EMAIL_VERIFY', '').lower() == 'true': return True # Search through user directories for matching token for username in os.listdir(USER_DATA_ROOT): auth_file = os.path.join(USER_DATA_ROOT, username, "auth.json") if os.path.exists(auth_file): with open(auth_file, 'r') as f: auth_data = UserAuth(**json.load(f)) if (auth_data.verification_token == token and auth_data.verification_expires and datetime.fromisoformat(auth_data.verification_expires) > datetime.utcnow()): # Update user as verified auth_data.email_verified = True auth_data.verification_token = None auth_data.verification_expires = None with open(auth_file, 'w') as f: json.dump(auth_data.dict(), f, indent=2, default=str) return True return False
[docs] @service() async def list_users( context=None ) -> list[str]: """List all usernames""" if not os.path.exists(USER_DATA_ROOT): return [] return [d for d in os.listdir(USER_DATA_ROOT) if os.path.isdir(os.path.join(USER_DATA_ROOT, d))]