Source code for codegrade._api.user

"""The endpoints for user objects.

SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""

from __future__ import annotations

import os
import typing as t

import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable

from .. import paginated, parsers, utils

if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
    from .. import client
    from ..models.course_coupon_usage import CourseCouponUsage
    from ..models.extended_user import ExtendedUser
    from ..models.login_user_data import LoginUserData
    from ..models.logout_response import LogoutResponse
    from ..models.logout_user_data import LogoutUserData
    from ..models.patch_user_data import PatchUserData
    from ..models.register_user_data import RegisterUserData
    from ..models.session_restriction_data import SessionRestrictionData
    from ..models.tenant_coupon_usage import TenantCouponUsage
    from ..models.transaction import Transaction
    from ..models.user import User
    from ..models.user_login_response import UserLoginResponse


# Type variable for the client a service is bound to. The bound is given as a
# string forward reference because ``client`` is only imported when type
# checking or when the ``CG_EAGERIMPORT`` environment variable is set.
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")


class UserService(t.Generic[_ClientT]):
    """Endpoint wrappers for user objects.

    Each method builds the request for one ``/api/v1`` endpoint, sends it
    through the client given at construction, logs warnings found on the
    response via ``utils.log_warnings`` and parses the body into the
    matching model. When the status code does not match 200, the error
    produced by ``utils.get_error`` is raised instead.
    """

    __slots__ = ("__client",)

    def __init__(self, client: _ClientT) -> None:
        """Remember the client that is used for every request.

        :param client: The client to send the requests with.
        """
        self.__client = client

    def patch(
        self: UserService[client.AuthenticatedClient],
        json_body: PatchUserData,
        *,
        user_id: int,
    ) -> ExtendedUser:
        """Update the attributes of a user.

        :param json_body: The body of the request. See
            :class:`.PatchUserData` for information about the possible
            fields. You can provide this data as a :class:`.PatchUserData`
            or as a dictionary.
        :param user_id: The id of the user you want to change. Currently
            this can only be your own user id.

        :returns: The updated user.
        """
        url = "/api/v1/users/{userId}".format(userId=user_id)

        with self.__client as conn:
            send = conn.http.patch
            # The body is serialized while the client context is active.
            resp = send(url=url, json=utils.to_dict(json_body), params=None)
        utils.log_warnings(resp)

        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyError

            error_codes = (400, 409, 401, 403, 404, 429, 500)
            raise utils.get_error(
                resp, ((error_codes, utils.unpack_union(AnyError)),)
            )

        from ..models.extended_user import ExtendedUser

        model_parser = parsers.ParserFor.make(ExtendedUser)
        return parsers.JsonResponseParser(model_parser).try_parse(resp)

    def get_coupon_usages(
        self: UserService[client.AuthenticatedClient],
        *,
        user_id: int,
        page_size: int = 20,
    ) -> paginated.Response[t.Union[TenantCouponUsage, CourseCouponUsage]]:
        """Get all the coupons used for the specified user.

        :param user_id: The user to get the coupons used for.
        :param page_size: The size of a single page, maximum is 50.

        :returns: All coupons used for the given user, as a paginated
            response whose items are tenant or course coupon usages.
        """
        url = "/api/v1/users/{userId}/coupon_usages/".format(userId=user_id)
        params: t.Dict[str, str | int | bool] = {
            "page-size": page_size,
        }

        if t.TYPE_CHECKING:
            import httpx

        def fetch_page(next_token: str | None) -> httpx.Response:
            # ``params`` is shared between calls, so a token left over from
            # an earlier call is either replaced or removed.
            if next_token is not None:
                params["next-token"] = next_token
            else:
                params.pop("next-token", "")

            with self.__client as conn:
                resp = conn.http.get(url=url, params=params)
            utils.log_warnings(resp)
            return resp

        def parse_page(
            resp: httpx.Response,
        ) -> t.Sequence[t.Union[TenantCouponUsage, CourseCouponUsage]]:
            if not utils.response_code_matches(resp.status_code, 200):
                from ..models.any_error import AnyError

                error_codes = (400, 409, 401, 403, 404, 429, 500)
                raise utils.get_error(
                    resp, ((error_codes, utils.unpack_union(AnyError)),)
                )

            from ..models.course_coupon_usage import CourseCouponUsage
            from ..models.tenant_coupon_usage import TenantCouponUsage

            # Each item of the returned list is parsed as one of the two
            # usage models.
            usage_parser = parsers.make_union(
                parsers.ParserFor.make(TenantCouponUsage),
                parsers.ParserFor.make(CourseCouponUsage),
            )
            list_parser = rqa.List(usage_parser)
            return parsers.JsonResponseParser(list_parser).try_parse(resp)

        return paginated.Response(fetch_page, parse_page)

    def get_transactions(
        self: UserService[client.AuthenticatedClient],
        *,
        user_id: int,
        page_size: int = 20,
    ) -> paginated.Response[Transaction]:
        """Get all transactions for the specified user.

        :param user_id: The user to get the transactions for.
        :param page_size: The size of a single page, maximum is 50.

        :returns: All transactions for the given user, as a paginated
            response.
        """
        url = "/api/v1/users/{userId}/transactions/".format(userId=user_id)
        params: t.Dict[str, str | int | bool] = {
            "page-size": page_size,
        }

        if t.TYPE_CHECKING:
            import httpx

        def fetch_page(next_token: str | None) -> httpx.Response:
            # ``params`` is shared between calls, so a token left over from
            # an earlier call is either replaced or removed.
            if next_token is not None:
                params["next-token"] = next_token
            else:
                params.pop("next-token", "")

            with self.__client as conn:
                resp = conn.http.get(url=url, params=params)
            utils.log_warnings(resp)
            return resp

        def parse_page(resp: httpx.Response) -> t.Sequence[Transaction]:
            if not utils.response_code_matches(resp.status_code, 200):
                from ..models.any_error import AnyError

                error_codes = (400, 409, 401, 403, 404, 429, 500)
                raise utils.get_error(
                    resp, ((error_codes, utils.unpack_union(AnyError)),)
                )

            from ..models.transaction import Transaction

            list_parser = rqa.List(parsers.ParserFor.make(Transaction))
            return parsers.JsonResponseParser(list_parser).try_parse(resp)

        return paginated.Response(fetch_page, parse_page)

    def get(
        self: UserService[client.AuthenticatedClient],
    ) -> ExtendedUser:
        """Get the info of the currently logged in user.

        :returns: The currently logged in user, parsed from the JSON
            response.
        """
        url = "/api/v1/login"

        with self.__client as conn:
            resp = conn.http.get(url=url, params=None)
        utils.log_warnings(resp)

        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyError

            error_codes = (400, 409, 401, 403, 404, 429, 500)
            raise utils.get_error(
                resp, ((error_codes, utils.unpack_union(AnyError)),)
            )

        from ..models.extended_user import ExtendedUser

        model_parser = parsers.ParserFor.make(ExtendedUser)
        return parsers.JsonResponseParser(model_parser).try_parse(resp)

    def login(
        self,
        json_body: LoginUserData,
    ) -> UserLoginResponse:
        """Login using your username and password.

        :param json_body: The body of the request. See
            :class:`.LoginUserData` for information about the possible
            fields. You can provide this data as a :class:`.LoginUserData`
            or as a dictionary.

        :returns: The parsed login response for the user.
        """
        url = "/api/v1/login"

        with self.__client as conn:
            send = conn.http.post
            # The body is serialized while the client context is active.
            resp = send(url=url, json=utils.to_dict(json_body), params=None)
        utils.log_warnings(resp)

        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyError

            error_codes = (400, 409, 401, 403, 404, 429, 500)
            raise utils.get_error(
                resp, ((error_codes, utils.unpack_union(AnyError)),)
            )

        from ..models.user_login_response import UserLoginResponse

        model_parser = parsers.ParserFor.make(UserLoginResponse)
        return parsers.JsonResponseParser(model_parser).try_parse(resp)

    def register(
        self,
        json_body: RegisterUserData,
    ) -> UserLoginResponse:
        """Create a new user.

        :param json_body: The body of the request. See
            :class:`.RegisterUserData` for information about the possible
            fields. You can provide this data as a
            :class:`.RegisterUserData` or as a dictionary.

        :returns: The registered user and an `access_token` that can be
            used to perform requests as this new user.
        """
        url = "/api/v1/user"

        with self.__client as conn:
            send = conn.http.post
            # The body is serialized while the client context is active.
            resp = send(url=url, json=utils.to_dict(json_body), params=None)
        utils.log_warnings(resp)

        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyError

            error_codes = (400, 409, 401, 403, 404, 429, 500)
            raise utils.get_error(
                resp, ((error_codes, utils.unpack_union(AnyError)),)
            )

        from ..models.user_login_response import UserLoginResponse

        model_parser = parsers.ParserFor.make(UserLoginResponse)
        return parsers.JsonResponseParser(model_parser).try_parse(resp)

    def restrict(
        self: UserService[client.AuthenticatedClient],
        json_body: SessionRestrictionData,
    ) -> UserLoginResponse:
        """Request a token with additional session restrictions.

        :param json_body: The body of the request. See
            :class:`.SessionRestrictionData` for information about the
            possible fields. You can provide this data as a
            :class:`.SessionRestrictionData` or as a dictionary.

        :returns: A new token that has the given restrictions added.
        """
        url = "/api/v1/token/restrict"

        with self.__client as conn:
            send = conn.http.post
            # The body is serialized while the client context is active.
            resp = send(url=url, json=utils.to_dict(json_body), params=None)
        utils.log_warnings(resp)

        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyError

            error_codes = (400, 409, 401, 403, 404, 429, 500)
            raise utils.get_error(
                resp, ((error_codes, utils.unpack_union(AnyError)),)
            )

        from ..models.user_login_response import UserLoginResponse

        model_parser = parsers.ParserFor.make(UserLoginResponse)
        return parsers.JsonResponseParser(model_parser).try_parse(resp)

    def logout(
        self,
        multipart_data: LogoutUserData,
    ) -> LogoutResponse:
        """Revoke a given token.

        :param multipart_data: The data that should form the body of the
            request. See :class:`.LogoutUserData` for information about the
            possible fields.

        :returns: The parsed :class:`.LogoutResponse`.
        """
        url = "/api/v1/token/revoke"

        # The multipart body is prepared before the client context is
        # entered.
        form_data, form_files = utils.to_multipart(
            utils.to_dict(multipart_data)
        )
        with self.__client as conn:
            resp = conn.http.post(
                url=url, files=form_files, data=form_data, params=None
            )
        utils.log_warnings(resp)

        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyError

            error_codes = (400, 409, 401, 403, 404, 429, 500)
            raise utils.get_error(
                resp, ((error_codes, utils.unpack_union(AnyError)),)
            )

        from ..models.logout_response import LogoutResponse

        model_parser = parsers.ParserFor.make(LogoutResponse)
        return parsers.JsonResponseParser(model_parser).try_parse(resp)

    def search(
        self: UserService[client.AuthenticatedClient],
        *,
        q: str,
        exclude_course: Maybe[int] = Nothing,
        page_size: int = 20,
    ) -> paginated.Response[User]:
        """Search for a user by name and username.

        :param q: The string to search for, all SQL wildcard are escaped
            and spaces are replaced by wildcards.
        :param exclude_course: Exclude all users that are in the given
            course from the search results. You need the permission
            `can_list_course_users` on this course to use this parameter.
        :param page_size: The size of a single page, maximum is 50.

        :returns: The users that match the given query string, as a
            paginated response.
        """
        url = "/api/v1/users/"
        params: t.Dict[str, str | int | bool] = {
            "q": q,
            "page-size": page_size,
        }
        # The course filter is only sent when a value was actually given.
        course_filter = maybe_from_nullable(exclude_course)
        if course_filter.is_just:
            params["exclude_course"] = course_filter.value

        if t.TYPE_CHECKING:
            import httpx

        def fetch_page(next_token: str | None) -> httpx.Response:
            # ``params`` is shared between calls, so a token left over from
            # an earlier call is either replaced or removed.
            if next_token is not None:
                params["next-token"] = next_token
            else:
                params.pop("next-token", "")

            with self.__client as conn:
                resp = conn.http.get(url=url, params=params)
            utils.log_warnings(resp)
            return resp

        def parse_page(resp: httpx.Response) -> t.Sequence[User]:
            if not utils.response_code_matches(resp.status_code, 200):
                from ..models.any_error import AnyError

                error_codes = (400, 409, 401, 403, 404, 429, 500)
                raise utils.get_error(
                    resp, ((error_codes, utils.unpack_union(AnyError)),)
                )

            from ..models.user import UserParser

            list_parser = rqa.List(UserParser)
            return parsers.JsonResponseParser(list_parser).try_parse(resp)

        return paginated.Response(fetch_page, parse_page)