Source code for mindroot.coreplugins.admin.registry_settings_routes
import json
import os

from fastapi import APIRouter, HTTPException

from lib.route_decorators import requires_role

# Router-level dependency: requires_role('admin') is attached to every route
# registered on this router.
router = APIRouter(dependencies=[requires_role('admin')])

# --- Registry Settings Routes ---
@router.get("/registry/settings")
async def get_registry_settings():
    """Report the registry configuration without exposing the token itself.

    Reads ``data/registry_settings.json`` when it exists and returns the
    configured registry URL (falling back to ``https://registry.mindroot.io``),
    whether a token is stored in that file, and where a token would come
    from: ``"file"``, ``"env"`` (the ``REGISTRY_TOKEN`` environment variable)
    or ``"none"``.

    Raises:
        HTTPException: status 500 carrying the error text if anything fails.
    """
    try:
        settings_file = 'data/registry_settings.json'
        if os.path.exists(settings_file):
            with open(settings_file, 'r') as f:
                stored = json.load(f)
        else:
            stored = {}

        # The token value is never returned; only its presence and origin.
        file_token = stored.get("registry_token")
        if file_token:
            token_source = "file"
        elif os.getenv('REGISTRY_TOKEN'):
            token_source = "env"
        else:
            token_source = "none"

        return {
            "success": True,
            "data": {
                "registry_url": stored.get("registry_url", "https://registry.mindroot.io"),
                "has_token": bool(file_token),
                "token_source": token_source,
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/registry/settings")
async def update_registry_settings(settings_data: dict):
    """Merge ``settings_data`` into the stored registry settings and save them.

    The ``data`` directory is created if missing. Existing values from
    ``data/registry_settings.json`` are loaded first, then overwritten key by
    key with ``settings_data``, and the result is written back as indented
    JSON. The response summarises the saved state without echoing the token.

    Args:
        settings_data: Keys and values to store; they replace any existing
            entries with the same key.

    Raises:
        HTTPException: status 500 carrying the error text if anything fails.
    """
    try:
        settings_file = 'data/registry_settings.json'

        # The settings file lives under data/, which may not exist yet.
        os.makedirs('data', exist_ok=True)

        current = {}
        if os.path.exists(settings_file):
            with open(settings_file, 'r') as f:
                current = json.load(f)

        # Incoming values win over whatever was stored before.
        current.update(settings_data)

        with open(settings_file, 'w') as f:
            json.dump(current, f, indent=2)

        registry_url = current.get("registry_url", "https://registry.mindroot.io")
        file_token = current.get("registry_token")
        if file_token:
            token_source = "file"
        elif os.getenv('REGISTRY_TOKEN'):
            token_source = "env"
        else:
            token_source = "none"

        return {
            "success": True,
            "message": "Registry settings updated successfully.",
            "data": {
                "registry_url": registry_url,
                "has_token": bool(file_token),
                "token_source": token_source,
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/registry/settings/token")
async def clear_registry_token():
    """Remove ``registry_token`` from the stored registry settings.

    Succeeds whether or not a settings file or a stored token exists; the
    file is rewritten only when a token was actually removed.

    Raises:
        HTTPException: status 500 carrying the error text if anything fails.
    """
    outcome = {
        "success": True,
        "message": "Registry token cleared successfully.",
    }
    try:
        settings_file = 'data/registry_settings.json'

        # Nothing stored at all: there is no token to clear.
        if not os.path.exists(settings_file):
            return outcome

        with open(settings_file, 'r') as f:
            stored = json.load(f)

        # File exists but holds no token: leave it untouched.
        if 'registry_token' not in stored:
            return outcome

        del stored['registry_token']
        with open(settings_file, 'w') as f:
            json.dump(stored, f, indent=2)

        return outcome
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/registry/test-connection")
async def test_registry_connection():
    """Test connection to the registry.

    Issues ``GET {registry_url}/stats`` with a 10 second timeout, where
    ``registry_url`` comes from ``data/registry_settings.json`` when that
    file exists and otherwise defaults to ``https://registry.mindroot.io``.

    Returns:
        A dict with ``success``, ``message`` and ``data``. On HTTP 200 the
        decoded JSON body is returned under ``data["stats"]``; on any other
        status the status code is reported; on any exception the error text
        is reported. Failures are returned as a payload, never raised.
    """
    # Bound before the try block so the error handler below can always
    # report it. Previously it was first assigned after `import httpx`, so a
    # failing import made the handler itself raise UnboundLocalError.
    registry_url = "https://registry.mindroot.io"
    try:
        import httpx

        settings_file = 'data/registry_settings.json'
        if os.path.exists(settings_file):
            with open(settings_file, 'r') as f:
                settings = json.load(f)
            registry_url = settings.get("registry_url", registry_url)

        # Test connection to registry
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{registry_url}/stats")

            if response.status_code == 200:
                stats = response.json()
                return {
                    "success": True,
                    "message": "Successfully connected to registry.",
                    "data": {
                        "registry_url": registry_url,
                        "stats": stats,
                    },
                }
            else:
                return {
                    "success": False,
                    "message": f"Registry returned status code {response.status_code}",
                    "data": {
                        "registry_url": registry_url,
                        "status_code": response.status_code,
                    },
                }
    except Exception as e:
        return {
            "success": False,
            "message": f"Failed to connect to registry: {str(e)}",
            "data": {
                "registry_url": registry_url,
                "error": str(e),
            },
        }