"""The endpoints for oauth_provider objects.
SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""
import os
import typing as t
import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable
from .. import parsers, utils
if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
from .. import client
from ..models.oauth_provider import OAuthProvider
from ..models.oauth_token import OAuthToken
from ..models.setup_oauth_result import SetupOAuthResult
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")
[docs]class OAuthProviderService(t.Generic[_ClientT]):
__slots__ = ("__client",)
def __init__(self, client: _ClientT) -> None:
self.__client = client
[docs] def get_all(
self,
*,
extra_parameters: t.Optional[
t.Mapping[str, t.Union[str, bool, int, float]]
] = None,
) -> "t.Sequence[OAuthProvider]":
"""Get all OAuth providers connected to this instance.
:param extra_parameters: The extra query parameters you might want to
add. By default no extra query parameters are added.
:returns: All connected providers.
"""
url = "/api/v1/oauth_providers/"
params = extra_parameters or {}
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.oauth_provider import OAuthProvider
return parsers.JsonResponseParser(
rqa.List(parsers.ParserFor.make(OAuthProvider))
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs] def get_oauth_token(
self: "OAuthProviderService[client.AuthenticatedClient]",
*,
provider_id: "str",
extra_parameters: t.Optional[
t.Mapping[str, t.Union[str, bool, int, float]]
] = None,
) -> "OAuthToken":
"""Get an OAuth token for the specified provider.
:param provider_id: The provider for which you want to get a token.
:param extra_parameters: The extra query parameters you might want to
add. By default no extra query parameters are added.
:returns: The token.
"""
url = "/api/v1/oauth_providers/{providerId}/token".format(
providerId=provider_id
)
params = extra_parameters or {}
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.oauth_token import OAuthToken
return parsers.JsonResponseParser(
parsers.ParserFor.make(OAuthToken)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs] def put_oauth_token(
self: "OAuthProviderService[client.AuthenticatedClient]",
*,
provider_id: "str",
extra_parameters: t.Optional[
t.Mapping[str, t.Union[str, bool, int, float]]
] = None,
) -> "t.Union[OAuthToken, SetupOAuthResult]":
"""Get or create an OAuth token for the specified provider.
:param provider_id: The provider for which you want to get/create a
token.
:param extra_parameters: The extra query parameters you might want to
add. By default no extra query parameters are added.
:returns: The token.
"""
url = "/api/v1/oauth_providers/{providerId}/token".format(
providerId=provider_id
)
params = extra_parameters or {}
with self.__client as client:
resp = client.http.put(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.oauth_token import OAuthToken
from ..models.setup_oauth_result import SetupOAuthResult
return parsers.JsonResponseParser(
parsers.make_union(
parsers.ParserFor.make(OAuthToken),
parsers.ParserFor.make(SetupOAuthResult),
)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs] def callback(
self,
*,
provider_id: "str",
state: Maybe["str"] = Nothing,
code: Maybe["str"] = Nothing,
error: Maybe["str"] = Nothing,
error_description: Maybe["str"] = Nothing,
extra_parameters: t.Optional[
t.Mapping[str, t.Union[str, bool, int, float]]
] = None,
) -> "bytes":
"""The method that is used by OAuth providers after they completed the
login flow.
For the precise meaning of the query parameters see RFC 6749.
:param provider_id: The id of the OAuthProvider that completed the
flow.
:param state: The state.
:param code: The code.
:param error: The error.
:param error_description: The description of the error.
:param extra_parameters: The extra query parameters you might want to
add. By default no extra query parameters are added.
:returns: A response that is not in any defined format.
"""
url = "/api/v1/oauth_providers/{providerId}/callback".format(
providerId=provider_id
)
params: t.Dict[str, t.Any] = {
**(extra_parameters or {}),
}
maybe_from_nullable(t.cast(t.Any, state)).if_just(
lambda val: params.__setitem__("state", val)
)
maybe_from_nullable(t.cast(t.Any, code)).if_just(
lambda val: params.__setitem__("code", val)
)
maybe_from_nullable(t.cast(t.Any, error)).if_just(
lambda val: params.__setitem__("error", val)
)
maybe_from_nullable(t.cast(t.Any, error_description)).if_just(
lambda val: params.__setitem__("error_description", val)
)
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
return parsers.ResponsePropertyParser("content", bytes).try_parse(
resp
)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)