from lib.providers.services import service
from .models import UserAuth, UserCreate, UserBase
from .email_service import send_verification_email, setup_verification
from .role_service import has_role, add_role, remove_role, get_user_roles
#from .admin_init import initialize_admin
import bcrypt
import json
import os
from datetime import datetime
from typing import Optional, List
USER_DATA_ROOT = "data/users"
[docs]
@service()
async def create_user(user_data: UserCreate, roles: List[str] = None, skip_verification: bool = False, context=None) -> UserBase:
"""Create new user directory and auth file"""
user_dir = os.path.join(USER_DATA_ROOT, user_data.username)
# Ensure user data root exists
os.makedirs(USER_DATA_ROOT, exist_ok=True)
# Check if user exists
if os.path.exists(user_dir):
raise ValueError("Username already exists")
# Create user directory
os.makedirs(user_dir)
# Handle verification
verification_token, verification_expires, email_verified = setup_verification()
if verification_token and not skip_verification:
await send_verification_email(user_data.email, verification_token)
# Setup roles (always include 'user' role)
if roles is None:
roles = ["user"]
else:
roles.append("user")
# Create auth data
now = datetime.utcnow().isoformat()
auth_data = UserAuth(
username=user_data.username,
email=user_data.email,
password_hash=bcrypt.hashpw(user_data.password.encode(), bcrypt.gensalt()).decode(),
created_at=now,
last_login=None,
email_verified=email_verified,
verification_token=verification_token,
verification_expires=verification_expires,
roles=roles
)
# Save auth data
with open(os.path.join(user_dir, "auth.json"), 'w') as f:
json.dump(auth_data.dict(), f, indent=2, default=str)
# Initialize empty settings and workspace
with open(os.path.join(user_dir, "settings.json"), 'w') as f:
json.dump({}, f)
with open(os.path.join(user_dir, "workspace.json"), 'w') as f:
json.dump({}, f)
# Return safe user data
return UserBase(**auth_data.dict())
[docs]
@service()
async def verify_user(username: str, password: str, context=None) -> bool:
"""Verify user credentials and update last login"""
auth_file = os.path.join(USER_DATA_ROOT, username, "auth.json")
if not os.path.exists(auth_file):
print("user not found")
print("path", auth_file)
print("working dir", os.getcwd())
return False
with open(auth_file, 'r') as f:
auth_data = UserAuth(**json.load(f))
if bcrypt.checkpw(password.encode(), auth_data.password_hash.encode()):
print("check pw passed")
# Update last login
auth_data.last_login = datetime.utcnow().isoformat()
with open(auth_file, 'w') as f:
json.dump(auth_data.dict(), f, indent=2, default=str)
return True
else:
print("checkpw failed,")
print("auth_data", auth_data)
print("password:", password.encode())
return False
[docs]
@service()
async def get_user_data(username: str, include_email=False, context=None) -> Optional[UserBase]:
"""Get user data excluding sensitive info"""
auth_file = os.path.join(USER_DATA_ROOT, username, "auth.json")
if not os.path.exists(auth_file):
return None
with open(auth_file, 'r') as f:
auth_data = UserAuth(**json.load(f))
if not include_email:
auth_data.email = None
return UserBase(**auth_data.dict())
[docs]
@service()
async def verify_email(token: str, context=None) -> bool:
"""Verify a user's email using their verification token"""
if not os.environ.get('REQUIRE_EMAIL_VERIFY', '').lower() == 'true':
return True
# Search through user directories for matching token
for username in os.listdir(USER_DATA_ROOT):
auth_file = os.path.join(USER_DATA_ROOT, username, "auth.json")
if os.path.exists(auth_file):
with open(auth_file, 'r') as f:
auth_data = UserAuth(**json.load(f))
if (auth_data.verification_token == token and
auth_data.verification_expires and
datetime.fromisoformat(auth_data.verification_expires) > datetime.utcnow()):
# Update user as verified
auth_data.email_verified = True
auth_data.verification_token = None
auth_data.verification_expires = None
with open(auth_file, 'w') as f:
json.dump(auth_data.dict(), f, indent=2, default=str)
return True
return False
[docs]
@service()
async def list_users( context=None ) -> list[str]:
"""List all usernames"""
if not os.path.exists(USER_DATA_ROOT):
return []
return [d for d in os.listdir(USER_DATA_ROOT)
if os.path.isdir(os.path.join(USER_DATA_ROOT, d))]