"""The endpoints for oauth_token objects.
SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""
from __future__ import annotations
import os
import typing as t
import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable
from .. import paginated, parsers, utils
if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
from .. import client
from ..models.oauth_token import OAuthToken
from ..models.post_oauth_token_data import PostOAuthTokenData
from ..models.setup_oauth_result import SetupOAuthResult
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")
[docs]class OAuthTokenService(t.Generic[_ClientT]):
__slots__ = ("__client",)
def __init__(self, client: _ClientT) -> None:
self.__client = client
[docs] def get_all(
self: OAuthTokenService[client.AuthenticatedClient],
*,
page_size: int = 20,
) -> paginated.Response[OAuthToken]:
"""Get all OAuth tokens of the current user.
:param page_size: The size of a single page, maximum is 50.
:returns: The token.
"""
url = "/api/v1/oauth_tokens/"
params: t.Dict[str, str | int | bool] = {
"page-size": page_size,
}
if t.TYPE_CHECKING:
import httpx
def do_request(next_token: str | None) -> httpx.Response:
if next_token is None:
params.pop("next-token", "")
else:
params["next-token"] = next_token
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
return resp
def parse_response(resp: httpx.Response) -> t.Sequence[OAuthToken]:
if utils.response_code_matches(resp.status_code, 200):
from ..models.oauth_token import OAuthToken
return parsers.JsonResponseParser(
rqa.List(parsers.ParserFor.make(OAuthToken))
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
return paginated.Response(do_request, parse_response)
[docs] def post(
self: OAuthTokenService[client.AuthenticatedClient],
json_body: PostOAuthTokenData,
) -> SetupOAuthResult:
"""Create an OAuth token for the specified provider.
:param json_body: The body of the request. See
:class:`.PostOAuthTokenData` for information about the possible
fields. You can provide this data as a :class:`.PostOAuthTokenData`
or as a dictionary.
:returns: The OAuth callback URL.
"""
url = "/api/v1/oauth_tokens/"
params = None
with self.__client as client:
resp = client.http.post(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.setup_oauth_result import SetupOAuthResult
return parsers.JsonResponseParser(
parsers.ParserFor.make(SetupOAuthResult)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs] def get(
self: OAuthTokenService[client.AuthenticatedClient],
*,
token_id: str,
is_temp_id: bool = False,
) -> OAuthToken:
"""Get an OAuth token by id.
:param token_id: The id of the token to get.
:param is_temp_id: Whether the id in the request is a `temp_id` or an
`id`.
:returns: The requested token.
"""
url = "/api/v1/oauth_tokens/{tokenId}".format(tokenId=token_id)
params: t.Dict[str, str | int | bool] = {
"is_temp_id": is_temp_id,
}
with self.__client as client:
resp = client.http.get(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.oauth_token import OAuthToken
return parsers.JsonResponseParser(
parsers.ParserFor.make(OAuthToken)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs] def delete(
self: OAuthTokenService[client.AuthenticatedClient],
*,
token_id: str,
) -> None:
"""Delete an OAuth token.
:param token_id: The id of the token you want to delete.
:returns: Nothing.
"""
url = "/api/v1/oauth_tokens/{tokenId}".format(tokenId=token_id)
params = None
with self.__client as client:
resp = client.http.delete(url=url, params=params)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 204):
return parsers.ConstantlyParser(None).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)