◐ Shell
clean mode source ↗

feat(client): replace basic auth with OAuth ROPC flow by nejch · Pull Request #2422 · python-gitlab/python-gitlab

Expand Up @@ -12,7 +12,7 @@ import gitlab.config import gitlab.const import gitlab.exceptions from gitlab import _backends, utils from gitlab import _backends, oauth, utils
REDIRECT_MSG = ( "python-gitlab detected a {status_code} ({reason!r}) redirection. You must update " Expand Down Expand Up @@ -41,8 +41,6 @@ class Gitlab: the value is a string, it is the path to a CA file used for certificate validation. timeout: Timeout to use for requests to the GitLab server. http_username: Username for HTTP authentication http_password: Password for HTTP authentication api_version: Gitlab API version to use (support for 4 only) pagination: Can be set to 'keyset' to use keyset pagination order_by: Set order_by globally Expand All @@ -51,6 +49,7 @@ class Gitlab: or 52x responses. Defaults to False. keep_base_url: keep user-provided base URL for pagination if it differs from response headers oauth_credentials: Password credentials for authenticating via OAuth ROPC flow
Keyword Args: requests.Session session: HTTP Requests Session Expand All @@ -64,8 +63,6 @@ def __init__( oauth_token: Optional[str] = None, job_token: Optional[str] = None, ssl_verify: Union[bool, str] = True, http_username: Optional[str] = None, http_password: Optional[str] = None, timeout: Optional[float] = None, api_version: str = "4", per_page: Optional[int] = None, Expand All @@ -74,6 +71,8 @@ def __init__( user_agent: str = gitlab.const.USER_AGENT, retry_transient_errors: bool = False, keep_base_url: bool = False, *, oauth_credentials: Optional[oauth.PasswordCredentials] = None, **kwargs: Any, ) -> None: self._api_version = str(api_version) Expand All @@ -92,11 +91,9 @@ def __init__( self.ssl_verify = ssl_verify
self.private_token = private_token self.http_username = http_username self.http_password = http_password self.oauth_token = oauth_token self.job_token = job_token self._set_auth_info() self.oauth_credentials = oauth_credentials
#: Create a session object for requests _backend: Type[_backends.DefaultBackend] = kwargs.pop( Expand All @@ -105,6 +102,7 @@ def __init__( self._backend = _backend(**kwargs) self.session = self._backend.client
self._set_auth_info() self.per_page = per_page self.pagination = pagination self.order_by = order_by Expand Down Expand Up @@ -271,8 +269,6 @@ def from_config( job_token=config.job_token, ssl_verify=config.ssl_verify, timeout=config.timeout, http_username=config.http_username, http_password=config.http_password, api_version=config.api_version, per_page=config.per_page, pagination=config.pagination, Expand Down Expand Up @@ -471,41 +467,51 @@ def set_license(self, license: str, **kwargs: Any) -> Dict[str, Any]: return result
def _set_auth_info(self) -> None: tokens = [ token for token in [self.private_token, self.oauth_token, self.job_token] if token auth_types = [ auth for auth in [ self.private_token, self.oauth_token, self.oauth_credentials, self.job_token, ] if auth ] if len(tokens) > 1: if len(auth_types) > 1: raise ValueError( "Only one of private_token, oauth_token or job_token should " "be defined" ) if (self.http_username and not self.http_password) or ( not self.http_username and self.http_password ): raise ValueError("Both http_username and http_password should be defined") if tokens and self.http_username: raise ValueError( "Only one of token authentications or http " "authentication should be defined" "Only one of private_token, oauth_token, oauth_credentials" "or job_token should be defined" )
self._auth: Optional[requests.auth.AuthBase] = None if self.private_token: self._auth = _backends.PrivateTokenAuth(self.private_token) return
if self.oauth_token: self._auth = _backends.OAuthTokenAuth(self.oauth_token) return
if self.oauth_credentials: post_data = { "grant_type": self.oauth_credentials.grant_type, "scope": self.oauth_credentials.scope, "username": self.oauth_credentials.username, "password": self.oauth_credentials.password, } response = self.http_post( f"{self._base_url}/oauth/token", post_data=post_data ) if isinstance(response, dict): self.oauth_token = response["access_token"] else: self.oauth_token = response.json()["access_token"] self._auth = self.oauth_credentials.basic_auth return
if self.job_token: self._auth = _backends.JobTokenAuth(self.job_token)
if self.http_username and self.http_password: self._auth = requests.auth.HTTPBasicAuth( self.http_username, self.http_password )
@staticmethod def enable_debug() -> None: import logging Expand Down