crossauth_backend.oauth package

Submodules

crossauth_backend.oauth.client module

class crossauth_backend.oauth.client.IdTokenReturn[source]

Bases: TypedDict

error: str
error_description: str
id_payload: Mapping[str, Any]
class crossauth_backend.oauth.client.OAuthClient(auth_server_base_url: str, options: OAuthClientOptions)[source]

Bases: object

Base class for OAuth clients.

Flows supported are Authorization Code Flow with and without PKCE, Client Credentials, Refresh Token, Password and Password MFA. The latter is defined at [auth0.com](https://auth0.com/docs/secure/multi-factor-authentication/multi-factor-authentication-factors).

It also supports the OpenID Connect Authorization Code Flow, with and without PKCE.

async client_credentials_flow(scope: str | None = None) OAuthTokenResponse[source]

Start the client credentials flow

:param str|None scope:, which can be None

Returns:

an OAuth token endpoint response

async code_challenge_and_verifier()[source]
get_oidc_config()[source]
get_token_payload(token: str) Dict[str, Any][source]

Validates a token and, if valid, returns the payload

async id_token_authorized(id_token: str) Dict[str, Any] | None[source]

Validates a token using the token consumer.

Parameters:

(str) (id_token) – the token to validate

:returns the parsed JSON of the payload, or None if it is not valid.

async load_config(oidc_config: OpenIdConfiguration | None = None)[source]

Loads OpenID Connect configuration so that the client can determine the URLs it can call and the features the authorization server provides.

:param oidc_config if defined, loadsa the config from this object.

Otherwise, performs a fetch by appending /.well-known/openid-configuration to the auth_server_base_url.

Throws :class:

crossauth_backend.CrossauthError} with the following :attr: crossauth_backend.ErrorCode.Connection if data from the URL

could not be fetched or parsed.

async mfa_authenticators(mfa_token: str) OAuthMfaAuthenticatorsResponse[source]

Fields that canb be returned by the mfaAuthenticators function call See Auth0’s documentation for the password MFA flow.

:param str mfa_token:, The MFA token returned when the flow was initiated

Returns:

See OAuthMfaAuthenticatorsResponse

async mfa_oob_complete(mfa_token: str, oobCode: str, bindingCode: str, scope: str | None = None) OAuthTokenResponse[source]

Completes the Password MFA OTP flow.

Does not throw exceptions.

Parameters:
  • mfa_token (str) – the MFA token that was returned by the authorization server in the response from the Password Flow.

  • oob_code – the code entered by the user

Returns:

an OAuthTokenResponse object, which may contain an error instead of the response fields.

async mfa_oob_request(mfa_token: str, authenticator_id: str) OAuthMfaAuthenticatorsResponse[source]

This is part of the Auth0 Password MFA flow. Once the client has received a list of valid authenticators, if it wishes to initiate OOB (out of band) login, call this function

Does not throw exceptions.

Parameters:
  • mfa_token (str) – the MFA token that was returned by the authorization server in the response from the Password Flow.

  • authenticator_id (str) – the authenticator ID, as returned in the response

from the mfa_authenticators() request.

Returns:

an object with one or more of the following defined: - challenge_type as per the Auth0 MFA documentation - oob_code as per the Auth0 MFA documentation - binding_method as per the Auth0 MFA documentation - error as per Auth0 Password MFA documentation - error_description friendly error message

async mfa_otp_complete(mfa_token: str, otp: str, scope: str | None = None) OAuthTokenResponse[source]

Completes the Password MFA OTP flow.

Parameters:
  • mfa_token (str) – the MFA token that was returned by the authorization server in the response from the Password Flow.

  • otp (str) – the OTP entered by the user

Returns:

an object with some of the following fields, depending on

authorization server configuration and whether there were errors:

  • access_token an OAuth access token

  • refresh_token an OAuth access token

  • id_token an OpenID Connect ID token

  • expires_in number of seconds when the access token expires

  • scope the scopes the user authorized

  • token_type the OAuth token type

  • error as per Auth0 Password MFA documentation

  • error_description friendly error message

async mfa_otp_request(mfa_token: str, authenticator_id: str) OAuthMfaChallengeResponse[source]

This is part of the Auth0 Password MFA flow. Once the client has received a list of valid authenticators, if it wishes to initiate OTP, call this function

Does not throw exceptions.

Parameters:
  • mfa_token (str) – the MFA token that was returned by the authorization server in the response from the Password Flow.

  • authenticator_id (str) – the authenticator ID, as returned in the response

from the mfaAuthenticators() request.

async password_flow(username: str, password: str, scope: str | None = None) OAuthTokenResponse[source]

Start the password flow

:param str username:, user’s username :param str password:, user’s plaintext password :param str|None scope:, which can be None

Returns:

an OAuth token endpoint response

async poll_device_code_flow(device_code: str) OAuthDeviceResponse[source]

Polls the device endpoint to check if the device code flow has been authorized by the user.

Parameters:

device_code (str) – the device code to poll

Returns:

See OAuthDeviceResponse

abstractmethod random_value(length: int) str[source]

Produce a random Base64-url-encoded str, whose length before base64-url-encoding is the given length, @param length the length of the random array before base64-url-encoding. @returns the random value as a Base64-url-encoded srting

async redirect_endpoint(code: str | None = None, state: str | None = None, code_verifier: str | None = None, error: str | None = None, error_description: str | None = None) OAuthTokenResponse[source]

For calling in a Redirect Uri endpoint

Parameters:

code (str|None) – the authorization code

:param str|None the state, if one is used by the authorization server :param str|None any error: error message returned by the authorization server. It is passed through in the returned value :param str|None any error_description: error description returned by the authorization server. It is passed through in the returned value

Returns:

an OAuth token endpoint response

async refresh_token_flow(refresh_token: str) OAuthTokenResponse[source]

Starts the refresh token flow

Parameters:

refresh_token (str) – the refresh token to exchange

Returns:

a :class:`OAuthTokenResponse? response

abstractmethod async sha256(plaintext: str) str[source]

SHA256 and Base64-url-encodes the given test @param plaintext the text to encode @returns the SHA256 hash, Base64-url-encode

async start_authorization_code_flow(state: str, scope: str | None = None, code_challenge: str | None = None, pkce: bool = False)[source]

Initiates the authorization code flow

:param str|None scope:, which can be None :param bool pkce: if True, start the flow with PKCE (for public clients). Default False

async start_device_code_flow(url: str, scope: str | None = None) OAuthDeviceAuthorizationResponse[source]

Starts the Device Code Flow on the primary device (the one wanting an access token) :param str url: The URl for the device_authorization endpoint, as it is not defined in the OIDC configuration :param str|None scope: optional scope to request authorization for

Returns:

See OAuthDeviceAuthorizationResponse

async user_info_endpoint(access_token: str) Mapping[str, Any][source]
async validate_id_token(token: str) Dict[str, Any] | None[source]

Validates an OpenID ID token, returning None if it is invalid.

Does not raise exceptions.

Parameters:

token – the token to validate. To be valid, the signature must be valid and the type claim in the payload must be set to id.

:returns

the parsed payload or None if the token is invalid.

class crossauth_backend.oauth.client.OAuthClientOptions[source]

Bases: OAuthTokenConsumerOptions

Options for :class: OAuthClientBase

audience: str
auth_server_base_url: str
client_id: str

Client ID for this client

client_secret: str

Client secret for this client (can be undefined for no secret)

clock_tolerance: int
code_challenge_method: Literal['plain', 'S256']

Type of code challenge for PKCE

device_authorization_url: str

URL to call for the device_authorization endpoint, relative to the auth_server_base_url.

Default device_authorization

jwt_key_type: str
jwt_public_key: str
jwt_public_key_file: str
jwt_secret_key: str
jwt_secret_key_file: str
key_storage: KeyStorage | None
oauth_authorize_redirect: str | None

In the special case where you are running this in Docker on a private machine, the client cannot redirect to the authorization endpoint given in the OIDC configuration. You will typically set the auth_server_base_url to the name of the docker host in this case, and set oauth_authorize_redirect to localhost. Default None

oauth_post_type: Literal['json', 'form']

If set to JSON, make calls to the token endpoint as JSON, otherwise as x-www-form-urlencoded.

oauth_use_user_info_endpoint: bool

If your authorization server only returns certain claims in the userinfo endpoint, rather than in the id token, set this to true

oidc_config: OpenIdConfiguration | Dict[str, Any] | None
persist_access_token: bool
redirect_uri: str

Redirect URI to send in authorize requests

state_length: int

Length of random state variable for passing to authorize endpoint (before bsae64-url-encoding)

verifier_length: int

Length of random code verifier to generate (before bsae64-url-encoding)

class crossauth_backend.oauth.client.OAuthDeviceAuthorizationResponse[source]

Bases: TypedDict

These are the fields that can be returned in the device_authorization device code flow endpoint.

device_code: str
error: str
error_description: str
expires_in: str
interval: str
user_code: str
verification_uri: str
verification_uri_complete: str
class crossauth_backend.oauth.client.OAuthDeviceResponse[source]

Bases: TypedDict

These are the fields that can be returned in the device device code flow endpoint.

client_id: str
error: str
error_description: str
scope: str
scope_authorization_needed: bool
class crossauth_backend.oauth.client.OAuthFlows[source]

Bases: object

Crossauth allows you to define which flows are valid for a given client.

All = 'all'

All flows are allowed

AuthorizationCode = 'authorizationCode'

OAuth authorization code flow (without PKCE)

AuthorizationCodeWithPKCE = 'authorizationCodeWithPKCE'

OAuth authorization code flow with PKCE

ClientCredentials = 'clientCredentials'

Auth client credentials flow

DeviceCode = 'device_code'

OAuth device code flow

OidcAuthorizationCode = 'oidcAuthorizationCode'

The OpenID Connect authorization code flow, with or without PKCE

Password = 'password'

OAuth password flow

PasswordMfa = 'passwordMfa'

The Auth0 password MFA extension to the password flow

RefreshToken = 'refresh_token'

OAuth refresh token flow

static all_flows() List[str][source]

Returns a lsit of all possible OAuth flows

static are_valid_flows(flows: List[str]) bool[source]

Returns true only if all given strs are valid flows :param List[str] flows: the flows to check

Returns:

true or false.

flow_name = {'authorizationCode': 'Authorization Code', 'authorizationCodeWithPKCE': 'Authorization Code with PKCE', 'clientCredentials': 'Client Credentials', 'device_code': 'Device Code', 'oidcAuthorizationCode': 'OIDC Authorization Code', 'password': 'Password', 'passwordMfa': 'Password MFA', 'refresh_token': 'Refresh Token'}

A user friendly name for the given flow ID

static flow_names(flows: List[str]) Dict[str, str][source]

Returns a user-friendly name for the given flow strs.

The value returned is the one in flow_name. :param List[str] flows: the flows to return the names of

Returns:

a dictionary of strs

static grant_type(oauthFlow: str) List[str] | None[source]

Returns the OAuth grant types that are valid for a given flow, or None if it is not a valid flow. :param str oauthFlow: the flow to get the grant type for.

Returns:

a list of grant type strs or None

static is_valid_flow(flow: str) bool[source]

Returns true if the given str is a valid flow name. :param str flow: the flow to check

Returns:

true or false.

class crossauth_backend.oauth.client.OAuthMfaAuthenticator[source]

Bases: TypedDict

active: bool
authenticator_type: str
error: str
error_description: str
id: str
name: str
oob_channel: str
class crossauth_backend.oauth.client.OAuthMfaAuthenticatorsOrTokenResponse[source]

Bases: OAuthMfaAuthenticatorsResponse, OAuthTokenResponse

access_token: str
authenticators: List[OAuthMfaAuthenticator]
binding_method: str
challenge_type: str
error: str
error_description: str
expires_in: int
id_payload: Mapping[str, Any]
id_token: str
mfa_token: str
name: str
oob_channel: str
oob_code: str
refresh_token: str
scope: str
token_type: str
class crossauth_backend.oauth.client.OAuthMfaAuthenticatorsResponse[source]

Bases: TypedDict

authenticators: List[OAuthMfaAuthenticator]
error: str
error_description: str
class crossauth_backend.oauth.client.OAuthMfaChallengeResponse[source]

Bases: TypedDict

binding_method: str
challenge_type: str
error: str
error_description: str
oob_code: str
class crossauth_backend.oauth.client.OAuthTokenResponse[source]

Bases: TypedDict

These are the fields that can be returned in the JSON from an OAuth call.

access_token: str
binding_method: str
challenge_type: str
error: str
error_description: str
expires_in: int
id_payload: Mapping[str, Any]
id_token: str
mfa_token: str
name: str
oob_channel: str
oob_code: str
refresh_token: str
scope: str
token_type: str

crossauth_backend.oauth.clientmanager module

class crossauth_backend.oauth.clientmanager.OAuthClientManager(options: OAuthClientManagerOptions)[source]

Bases: object

async create_client(client_name: str, redirect_uri: List[str], valid_flow: List[str] | None = None, confidential: bool = True, userid: str | int | None = None) OAuthClient[source]
static random_client_id() str[source]
static random_client_secret() str[source]
async update_client(client_id: str, client: OAuthClient, reset_secret: bool = False) UpdateClientReturn[source]
static validate_uri(uri: str)[source]
class crossauth_backend.oauth.clientmanager.OAuthClientManagerOptions[source]

Bases: TypedDict

Options for OAuthClientManager

client_storage: Required[OAuthClientStorage]

Database for storage clients

oauth_pbkdf2digest: str

PBKDF2 HMAC for hashing client secret

oauth_pbkdf2iterations: int

PBKDF2 iterations for hashing client secret

oauth_pbkdf2key_length: int

PBKDF2 key length for hashing client secret

class crossauth_backend.oauth.clientmanager.UpdateClientReturn(client, new_secret)[source]

Bases: NamedTuple

client: OAuthClient

Alias for field number 0

new_secret: bool

Alias for field number 1

crossauth_backend.oauth.resserver module

class crossauth_backend.oauth.resserver.OAuthResourceServer(token_consumers: list[OAuthTokenConsumer], options: OAuthResourceServerOptions = {})[source]

Bases: object

async access_token_authorized(access_token: str) Dict[str, Any] | None[source]
property token_consumers
class crossauth_backend.oauth.resserver.OAuthResourceServerOptions[source]

Bases: TypedDict

crossauth_backend.oauth.tokenconsumer module

class crossauth_backend.oauth.tokenconsumer.Keys[source]

Bases: TypedDict

class crossauth_backend.oauth.tokenconsumer.OAuthTokenConsumer(audience: str, options: OAuthTokenConsumerOptions = {})[source]

Bases: object

This abstract class is for validating OAuth JWTs.

property audience
property auth_server_base_url
static decode_header(token: str)[source]
hash(plaintext: str) str[source]
property keys
async load_config(oidc_config: Dict[str, Any] | None = None)[source]

Loads OpenID Connect configuration, or fetches it from the authorization server (using the well-known enpoint appended to authServerBaseUrl ) :param Dict[str, Any]|None oidcConfig: the configuration, or undefined to load it from

the authorization server

:raises common!CrossauthError: object with ErrorCode of
  • Connection if the fetch to the authorization server failed.

async load_jwks(jwks: Keys | None = None)[source]

Loads the JWT signature validation keys, or fetches them from the authorization server (using the URL in the OIDC configuration). :param Dict[str, Any]|None jwks: the keys to load, or undefined to fetch them from

the authorization server.

:raises crossauth_backend.CrossauthError: object with ErrorCode of
  • Connection if the fetch to the authorization server failed, the OIDC configuration wasn’t set or the keys could not be parsed.

async load_keys()[source]

The RSA public keys or symmetric keys for the authorization server, either passed to the constructor or fetched from the authorization server.

property oidc_config
async token_authorized(token: str, tokenType: Literal['access', 'refresh', 'id']) Dict[str, Any] | None[source]

If the given token is valid, the payload is returned. Otherwise undefined is returned.

The signature must be valid, the expiry must not have passed and, if tokenType is defined,. the type claim in the payload must match it.

Doesn’t throw exceptions.

Parameters:
  • token (str) – The token to validate

  • token_type (Literal["access", "refresh", "id"]) – If defined, the type claim in the payload must match this value

class crossauth_backend.oauth.tokenconsumer.OAuthTokenConsumerOptions[source]

Bases: TypedDict

Options that can be passed to: OAuthTokenConsumerBase.

audience: str

The aud claim needs to match self value. No default (required)

auth_server_base_url: str

The value to expect in the iss claim. If the iss does not match self, the token is rejected. No default (required)

clock_tolerance: int

Number of seconds tolerance when checking expiration. Default 10

jwt_key_type: str

Secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key_file is required when using self kind of cipher

jwt_public_key: str

The public key if using a public key cipher for signing the JWT. Either this or jwt_public_key_file is required when using self kind of cipher. privateKey or privateKeyFile is also required.

jwt_public_key_file: str

Filename for the public key if using a public key cipher for signing the JWT. Either self or jwt_public_key is required when using self kind of cipher. privateKey or privateKeyFile is also required.

jwt_secret_key: str

Secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key_file is required when using self kind of cipher

jwt_secret_key_file: str

Filename with secret key if using a symmetric cipher for signing the JWT. Either self or jwt_secret_key is required when using self kind of cipher

key_storage: KeyStorage | None

If persisting tokens, you need to provide a storage to persist them to

oidc_config: OpenIdConfiguration | Dict[str, Any] | None

For initializing the token consumer with a static OpenID Connect configuration.

persist_access_token: bool

Whether to persist access tokens in key storage. Default false.

If you set self to True, you must also set key_storage.

crossauth_backend.oauth.wellknown module

class crossauth_backend.oauth.wellknown.AuthorizeQueryType[source]

Bases: TypedDict

client_id: str
code_challenge: str
code_challenge_method: str
redirect_uri: str
response_type: Required[str]
scope: str
state: str
user: User
crossauth_backend.oauth.wellknown.DEFAULT_OIDCCONFIG: OpenIdConfiguration = {'authorization_endpoint': '', 'claim_types_supported': ['normal'], 'claims_parameter_supported': False, 'grant_types_supported': ['authorization_code', 'implicit'], 'id_token_signing_alg_values_supported': [], 'issuer': '', 'jwks_uri': '', 'request_parameter_supported': False, 'request_uri_parameter_supported': True, 'require_request_uri_registration': False, 'response_modes_supported': ['query', 'fragment'], 'response_types_supported': [], 'subject_types_supported': [], 'token_endpoint': ''}

This is the detault configuration for :class: OAuthAuthorizationServer.

class crossauth_backend.oauth.wellknown.Jwks[source]

Bases: TypedDict

class crossauth_backend.oauth.wellknown.OpenIdConfiguration[source]

Bases: TypedDict

This class encapsulate the data returned by the oidc-configuration well-known endpoint. For further details, see the OpenID Connect specification.

acr_values_supported: NotRequired[list[str]]
authorization_endpoint: str
check_session_iframe: NotRequired[str]
claim_types_supported: NotRequired[list[ClaimType]]
claims_locales_supported: NotRequired[list[str]]
claims_parameter_supported: NotRequired[bool]
claims_supported: NotRequired[list[str]]
display_values_supported: NotRequired[list[str]]
end_session_endpoint: NotRequired[str]
grant_types_supported: list[GrantType]
id_token_encryption_alg_values_supported: NotRequired[list[str]]
id_token_encryption_enc_values_supported: NotRequired[list[str]]
id_token_signing_alg_values_supported: list[str]
issuer: str
jwks_uri: str
op_policy_uri: NotRequired[str]
op_tos_uri: NotRequired[str]
registration_endpoint: NotRequired[str]
request_object_encryption_alg_values_supported: NotRequired[list[str]]
request_object_encryption_enc_values_supported: NotRequired[list[str]]
request_object_signing_alg_values_supported: NotRequired[list[str]]
request_parameter_supported: NotRequired[bool]
request_uri_parameter_supported: NotRequired[bool]
require_request_uri_registration: NotRequired[bool]
response_modes_supported: list[ResponseMode]
response_types_supported: list[str]
scopes_supported: NotRequired[list[str]]
service_documentation: NotRequired[str]
subject_types_supported: list[SubjectType]
token_endpoint: str
token_endpoint_auth_methods_supported: NotRequired[list[TokenEndpointAuthMethod]]
token_endpoint_auth_signing_alg_values_supported: NotRequired[list[str]]
ui_locales_supported: NotRequired[list[str]]
userinfo_encryption_alg_values_supported: NotRequired[list[str]]
userinfo_encryption_enc_values_supported: NotRequired[list[str]]
userinfo_endpoint: NotRequired[str]
userinfo_signing_alg_values_supported: NotRequired[list[str]]
class crossauth_backend.oauth.wellknown.TokenBodyType[source]

Bases: TypedDict

binding_code: str
client_id: Required[str]
client_secret: str
code: str
code_verifier: str
device_code: str
grant_type: Required[str]
mfa_token: str
oobCode: str
otp: str
password: str
refresh_token: str
scope: str
username: str

Module contents