Source code for mindroot.coreplugins.mcp_.oauth_storage
"""OAuth token storage implementation for MCP servers."""fromtypingimportOptionalfromdatetimeimportdatetimeimportjsontry:frommcp.client.authimportTokenStoragefrommcp.shared.authimportOAuthToken,OAuthClientInformationFullexceptImportError:# Fallback if MCP not availableclassTokenStorage:passOAuthToken=NoneOAuthClientInformationFull=None
[docs]classMCPTokenStorage(TokenStorage):"""Token storage implementation for MCP OAuth flows. Stores tokens and client information in the MCP server configuration. """def__init__(self,server_name:str,mcp_manager):self.server_name=server_nameself.mcp_manager=mcp_manager
[docs]asyncdefget_tokens(self)->Optional[OAuthToken]:"""Get stored OAuth tokens for the server."""ifOAuthTokenisNone:returnNoneifself.server_namenotinself.mcp_manager.servers:returnNoneserver=self.mcp_manager.servers[self.server_name]ifnotserver.access_token:returnNone# Parse token expirationexpires_at=Noneifserver.token_expires_at:try:expires_at=datetime.fromisoformat(server.token_expires_at.replace('Z','+00:00'))exceptValueError:passreturnOAuthToken(access_token=server.access_token,refresh_token=server.refresh_token,expires_at=expires_at,scope=" ".join(server.scopes)ifserver.scopeselseNone)
[docs]asyncdefset_tokens(self,tokens:OAuthToken)->None:"""Store OAuth tokens for the server."""ifself.server_namenotinself.mcp_manager.servers:returnserver=self.mcp_manager.servers[self.server_name]# Update server with token informationserver.access_token=tokens.access_tokenserver.refresh_token=tokens.refresh_tokenifhasattr(tokens,'expires_at')andtokens.expires_at:server.token_expires_at=tokens.expires_at.isoformat()elifhasattr(tokens,'expires_in')andtokens.expires_in:# Calculate expires_at from expires_infromdatetimeimportdatetime,timedeltaexpires_at=datetime.now()+timedelta(seconds=tokens.expires_in)server.token_expires_at=expires_at.isoformat()else:server.token_expires_at=Noneiftokens.scope:server.scopes=tokens.scope.split(" ")# Save configurationself.mcp_manager.save_config()
[docs]asyncdefget_client_info(self)->Optional[OAuthClientInformationFull]:"""Get stored OAuth client information."""ifOAuthClientInformationFullisNone:returnNoneifself.server_namenotinself.mcp_manager.servers:returnNoneserver=self.mcp_manager.servers[self.server_name]ifnotserver.client_id:returnNone# Create client info from server configurationclient_info_data={"client_id":server.client_id,"client_name":f"MindRoot MCP Client - {server.name}","redirect_uris":[server.redirect_uri]ifserver.redirect_urielse[],"grant_types":["authorization_code","refresh_token"],"response_types":["code"],"scope":" ".join(server.scopes)ifserver.scopeselse"user"}ifserver.client_secret:client_info_data["client_secret"]=server.client_secretreturnOAuthClientInformationFull(**client_info_data)
[docs]asyncdefset_client_info(self,client_info:OAuthClientInformationFull)->None:"""Store OAuth client information."""ifself.server_namenotinself.mcp_manager.servers:returnserver=self.mcp_manager.servers[self.server_name]# Update server with client informationserver.client_id=client_info.client_idifhasattr(client_info,'client_secret')andclient_info.client_secret:server.client_secret=client_info.client_secretifclient_info.redirect_uris:server.redirect_uri=client_info.redirect_uris[0]ifclient_info.scope:server.scopes=client_info.scope.split(" ")# Save configurationself.mcp_manager.save_config()
[docs]defclear_tokens(self):"""Clear stored tokens for the server."""ifself.server_namenotinself.mcp_manager.servers:returnserver=self.mcp_manager.servers[self.server_name]server.access_token=Noneserver.refresh_token=Noneserver.token_expires_at=None# Save configurationself.mcp_manager.save_config()