@service()
async def has_role(username: str, role: str, user_data_root: str) -> bool:
    """Check whether a user has the specified role.

    Reads ``<user_data_root>/<username>/auth.json``, loads it into a
    ``UserAuth`` record and tests ``role`` for membership in its ``roles``.

    Args:
        username: Name of the user; also the name of the user's directory
            under ``user_data_root``.
        role: Role name to look for.
        user_data_root: Directory that holds the per-user directories.

    Returns:
        ``True`` if the role is listed for the user. ``False`` if the auth
        file does not exist, cannot be parsed into a ``UserAuth``, or does
        not list the role.
    """
    auth_file = os.path.join(user_data_root, username, "auth.json")
    if not os.path.exists(auth_file):
        return False

    with open(auth_file, 'r') as f:
        try:
            auth_data = UserAuth(**json.load(f))
            return role in auth_data.roles
        except Exception:
            # Fail closed: an unreadable or malformed auth record grants no
            # role. Only ``Exception`` is caught so that KeyboardInterrupt
            # and SystemExit are not silently turned into ``False``.
            return False
@service()
async def add_role(username: str, role: str, user_data_root: str) -> bool:
    """Add a role to a user. Should be called only from admin context.

    Loads ``<user_data_root>/<username>/auth.json`` into a ``UserAuth``
    record, adds ``role`` to its ``roles`` if it is not already present, and
    writes the record back to the same file.

    Args:
        username: Name of the user; also the name of the user's directory
            under ``user_data_root``.
        role: Role name to add.
        user_data_root: Directory that holds the per-user directories.

    Returns:
        ``False`` if the auth file does not exist, otherwise ``True``
        (including when the user already had the role).

    Raises:
        Errors from reading the file, decoding the JSON, or constructing
        ``UserAuth`` are not caught here and propagate to the caller.
    """
    auth_file = os.path.join(user_data_root, username, "auth.json")
    if not os.path.exists(auth_file):
        return False

    with open(auth_file, 'r') as f:
        auth_data = UserAuth(**json.load(f))

    # Add the role if it doesn't exist; the file is only rewritten when the
    # record actually changed.
    if role not in auth_data.roles:
        auth_data.roles.add(role)
        # Serialize BEFORE opening the file for writing: mode 'w' truncates
        # immediately, so a serialization error raised after the open would
        # leave the auth file empty or half-written.
        payload = json.dumps(auth_data.dict(), indent=2, default=str)
        with open(auth_file, 'w') as f:
            f.write(payload)

    return True
@service()
async def remove_role(username: str, role: str, user_data_root: str) -> bool:
    """Remove a role from a user. Should be called only from admin context.

    Loads ``<user_data_root>/<username>/auth.json`` into a ``UserAuth``
    record, removes ``role`` from its ``roles`` if present, and writes the
    record back to the same file.

    Args:
        username: Name of the user; also the name of the user's directory
            under ``user_data_root``.
        role: Role name to remove. The ``"user"`` role cannot be removed.
        user_data_root: Directory that holds the per-user directories.

    Returns:
        ``False`` if the auth file does not exist, otherwise ``True``
        (including when the user did not have the role).

    Raises:
        ValueError: If ``role`` is ``"user"``. This is checked before the
            auth file is looked up.
        Errors from reading the file, decoding the JSON, or constructing
        ``UserAuth`` are not caught here and propagate to the caller.
    """
    if role == "user":
        raise ValueError("Cannot remove 'user' role")

    auth_file = os.path.join(user_data_root, username, "auth.json")
    if not os.path.exists(auth_file):
        return False

    with open(auth_file, 'r') as f:
        auth_data = UserAuth(**json.load(f))

    # Remove the role if it exists; the file is only rewritten when the
    # record actually changed.
    if role in auth_data.roles:
        auth_data.roles.remove(role)
        # Serialize BEFORE opening the file for writing: mode 'w' truncates
        # immediately, so a serialization error raised after the open would
        # leave the auth file empty or half-written.
        payload = json.dumps(auth_data.dict(), indent=2, default=str)
        with open(auth_file, 'w') as f:
            f.write(payload)

    return True
@service()
async def get_user_roles(username: str, user_data_root: str) -> List[str]:
    """Get all roles for a user.

    Reads ``<user_data_root>/<username>/auth.json``, loads it into a
    ``UserAuth`` record and returns its ``roles``.

    Args:
        username: Name of the user; also the name of the user's directory
            under ``user_data_root``.
        user_data_root: Directory that holds the per-user directories.

    Returns:
        A list of role names, sorted so the order is stable between calls.
        An empty list if the auth file does not exist, and ``["user"]`` if
        the file exists but cannot be parsed into a ``UserAuth``.
    """
    auth_file = os.path.join(user_data_root, username, "auth.json")
    if not os.path.exists(auth_file):
        # Same "no roles" meaning as before, but as a list so every return
        # path matches the declared ``List[str]``.
        return []

    with open(auth_file, 'r') as f:
        try:
            auth_data = UserAuth(**json.load(f))
            # ``roles`` is mutated with .add()/.remove() elsewhere in this
            # module, so it is presumably a set; convert it to the declared
            # list type instead of leaking the internal collection.
            return sorted(auth_data.roles)
        except Exception:
            # Best-effort fallback for a malformed record. Only ``Exception``
            # is caught so KeyboardInterrupt and SystemExit still propagate.
            return ["user"]