crossauth_backend package

Subpackages

Submodules

crossauth_backend.apikey module

class crossauth_backend.apikey.ApiKeyManager(key_storage: KeyStorage, options: ApiKeyManagerOptions = {})[source]

Bases: object

Manages API keys.

The caller must pass a KeyStorage object. This must provide a string field called name in the returned Key objects (in other words, the database table behind it must have a name field).

API keys have three forms in their value. The Key object’s value field is a base64-url-encoded random number. When the key is in a header, it is expected to be followed by a dot and a signature to protect against injection attacks. When stored in the key storage, only the unsigned part (before the dot) is used; it is hashed and preceded by prefix. The signature part is dropped for storage economy. This does not compromise security so long as the signature is always validated before comparing with the database.
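The three forms can be illustrated with a minimal standalone sketch. It does not use crossauth_backend itself: the HMAC-SHA256 signature, the SHA-256 storage hash, the "api:" prefix and every function name below are assumptions chosen for illustration, not the library's actual algorithm or values.

```python
import base64
import hashlib
import hmac
import secrets

PREFIX = "api:"  # hypothetical storage prefix, not the library's value


def b64url(raw: bytes) -> str:
    """Base64-url encode without padding."""
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def new_unsigned_key(key_length: int = 16) -> str:
    """Form 1: a base64-url-encoded random value."""
    return b64url(secrets.token_bytes(key_length))


def sign_key(unsigned: str, secret: str) -> str:
    """Form 2: the unsigned value, a dot, then a signature (HMAC assumed)."""
    sig = hmac.new(secret.encode(), unsigned.encode(), hashlib.sha256).digest()
    return unsigned + "." + b64url(sig)


def storage_value(unsigned: str) -> str:
    """Form 3: the prefix followed by a hash of the unsigned part only."""
    return PREFIX + b64url(hashlib.sha256(unsigned.encode()).digest())


def validate_then_storage_value(token: str, secret: str) -> str:
    """Check the signature first, then derive the value to look up."""
    unsigned = token.partition(".")[0]
    expected = sign_key(unsigned, secret)
    if not hmac.compare_digest(expected.encode(), token.encode()):
        raise ValueError("invalid signature")
    return storage_value(unsigned)
```

Because the signature is checked before the storage lookup, the stored record never needs to contain it.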

async create_key(name: str, userid: str | int | None = None, data: Dict[str, Any] | None = None, expiry: int | None = None, extra_fields: Dict[str, Any] = {}) KeyReturn[source]

Creates a new random key and returns it, unsigned. It is also persisted in the key storage as a hash of the unsigned part prefixed with prefix().

Parameters:
  • name – a name for the key. This is for the user to refer to it (eg, for showing the keys the user has created or deleting a key)

  • userid – id for the user who owns this key, which may be None for keys not associated with a user

  • data – any application-specific extra data. If it contains an array called scope and this array contains editUser, the api key can be used for user manipulation functions (eg change password)

  • expiry – expiry as a number of seconds from now

  • extra_fields – any extra fields to save in key storage, and pass back in the Key object.

Returns:

a KeyReturn containing

  • key: the new key as an ApiKey object

  • token: the token for the Authorization header (with the signature appended)

Return type:

KeyReturn

async get_key(signed_value: str) NamedKey[source]

Get API key from signed value.

Parameters:

signed_value – The signed API key value

Returns:

Dict containing the API key data

Raises:

CrossauthError – If the key is invalid

static hashSignedApiKeyValue(unsigned_value: str) str[source]

Returns the hash of the bearer value from the Authorization header.

This has little practical value other than for reporting. Unhashed tokens are never reported.

Parameters:

unsigned_value – the part of the Authorization header after “Bearer “.

Returns:

a hash of the value (without the prefix).

async validate_token(header_value: str) ApiKey[source]

Returns the ApiKey if the token is valid, throws an exception otherwise.

Parameters:

header_value – the token from the Authorization header (after the “Bearer “).

Returns:

The ApiKey object

Raises:

CrossauthError – with code InvalidKey

class crossauth_backend.apikey.ApiKeyManagerOptions[source]

Bases: TypedDict

Configuration options for ApiKeyManager

auth_scheme: str

The token type in the Authorization header. Defaults to “ApiKey”

key_length: int

Length in bytes of the randomly-created key (before Base64 encoding and signature)

prefix: str

The prefix to add to the hashed key in storage. Defaults to Prefix.api_key

secret: str

Server secret. Needed for emailing tokens and for csrf tokens

class crossauth_backend.apikey.KeyReturn(key, token)[source]

Bases: NamedTuple

key: ApiKey

Alias for field number 0

token: str

Alias for field number 1

class crossauth_backend.apikey.NamedKey[source]

Bases: Key

created: datetime
data: NotRequired[str]
expires: datetime | NullType
lastactive: NotRequired[datetime]
name: str
userid: NotRequired[str | int | NullType]
value: str

crossauth_backend.auth module

class crossauth_backend.auth.AuthenticationOptions[source]

Bases: TypedDict

Options to pass to the constructor.

friendly_name: str

If passed, this is what will be displayed to the user when selecting an authentication method.

class crossauth_backend.auth.AuthenticationParameters[source]

Bases: UserSecretsInputFields

Parameters needed for this class to authenticate a user (besides username). An example is password.

expiry: int
otp: str
password: str
totpsecret: str
class crossauth_backend.auth.Authenticator(options: AuthenticationOptions = {})[source]

Bases: ABC

Base class for username/password authentication.

Subclass this if you want something other than PBKDF2 password hashing.

abstractmethod async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]
abstractmethod can_create_user() bool[source]
abstractmethod can_update_secrets() bool[source]
abstractmethod can_update_user() bool[source]
capabilities() AuthenticatorCapabilities[source]
abstractmethod async create_one_time_secrets(user: User) UserSecretsInputFields[source]
abstractmethod async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]
factor_name: str = ''
friendly_name: str
abstractmethod mfa_channel() str[source]
abstractmethod mfa_type() str[source]
abstractmethod async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]
abstractmethod async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]
require_user_entry() bool[source]

If your authenticator doesn’t need a user to be in the table (because it can create one), override this and return False. The default is True.

abstractmethod secret_names() List[str][source]
abstractmethod skip_email_verification_on_signup() bool[source]
abstractmethod transient_secret_names() List[str][source]
abstractmethod validate_secrets(params: AuthenticationParameters) List[str][source]
class crossauth_backend.auth.AuthenticatorCapabilities[source]

Bases: TypedDict

can_create_user: bool
can_update_secrets: bool
can_update_user: bool
class crossauth_backend.auth.PasswordAuthenticator(options: AuthenticationOptions = {})[source]

Bases: Authenticator

Base class for authenticators that validate passwords

mfa_channel() str[source]
mfa_type() str[source]
secret_names() List[str][source]
transient_secret_names() List[str][source]

crossauth_backend.cookieauth module

class crossauth_backend.cookieauth.Cookie[source]

Bases: TypedDict

Object encapsulating a cookie name, value and options.

name: str
options: CookieOptions
value: str
class crossauth_backend.cookieauth.CookieOptions[source]

Bases: TypedDict

Optional parameters when setting cookies.

These match the HTTP cookie parameters of the same name.

domain: str
expires: datetime
httpOnly: bool
maxAge: int
path: str
sameSite: bool | Literal['lax', 'strict', 'none']
secure: bool
class crossauth_backend.cookieauth.CookieReturn(userid, value, created, expires)[source]

Bases: NamedTuple

created: datetime

Alias for field number 2

expires: datetime | None

Alias for field number 3

userid: str | int | None

Alias for field number 0

value: str

Alias for field number 1

class crossauth_backend.cookieauth.DoubleSubmitCsrfToken(options: DoubleSubmitCsrfTokenOptions = {})[source]

Bases: object

Class for creating and validating CSRF tokens according to the double-submit cookie pattern.

The CSRF token is sent as a cookie plus either a header or a hidden form field.

property cookie_name
create_csrf_token() str[source]

Creates a session key and saves in storage

Date created is the current date/time on the server.

Returns:

a random CSRF token.

property domain
property header_name
property httpOnly

Returns a Cookie object with the given session key.

Parameters:

token (str) – the value of the csrf token, with signature

Returns:

a Cookie object.

Takes a session ID and creates a string representation of the cookie (value of the HTTP Cookie header).

Parameters:

cookie_value (str) – the value to put in the cookie

Returns:

a string representation of the cookie and options.

make_csrf_form_or_header_token(token: str) str[source]
mask_csrf_token(token: str) str[source]
property path
property sameSite
property secure
unmask_csrf_token(mask_and_token: str) str[source]

Validates the passed CSRF cookie (doesn’t check it matches the token, just that the cookie is valid).

To be valid:
  • The signature in the cookie must match the token in the cookie

  • The token in the cookie must match the value in the form or header after unmasking

Parameters:

cookie_value (str) – the CSRF cookie value to validate.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of InvalidKey

validate_double_submit_csrf_token(cookie_value: str, form_or_header_name: str) None[source]

Validates the passed CSRF token.

To be valid:
  • The signature in the cookie must match the token in the cookie

  • The token in the cookie must match the value in the form or header after unmasking

Parameters:
  • cookie_value (str) – the CSRF cookie value to validate.

  • form_or_header_name (str) – the value from the csrf_token form field or the X-CROSSAUTH-CSRF header.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of InvalidKey
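The two validity checks can be sketched in plain Python. This is an illustration of the double-submit pattern with a masked form value, not the library's implementation: the HMAC signature, the "mask.masked" layout of the form value and all function names here are assumptions.

```python
import base64
import hashlib
import hmac
import secrets


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def _unb64url(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def make_cookie_value(token: bytes, secret: str) -> str:
    """Cookie value: the token, a dot, then a signature over the token."""
    sig = hmac.new(secret.encode(), token, hashlib.sha256).digest()
    return _b64url(token) + "." + _b64url(sig)


def mask_token(token: bytes) -> str:
    """Form/header value: a fresh random mask plus the token XORed with it."""
    mask = secrets.token_bytes(len(token))
    masked = bytes(t ^ m for t, m in zip(token, mask))
    return _b64url(mask) + "." + _b64url(masked)


def unmask_token(mask_and_token: str) -> bytes:
    mask_part, _, masked_part = mask_and_token.partition(".")
    mask, masked = _unb64url(mask_part), _unb64url(masked_part)
    return bytes(a ^ b for a, b in zip(masked, mask))


def validate_double_submit(cookie_value: str, form_value: str, secret: str) -> None:
    """Raise ValueError unless both checks from the list above pass."""
    token_part, _, sig_part = cookie_value.partition(".")
    token = _unb64url(token_part)
    expected = hmac.new(secret.encode(), token, hashlib.sha256).digest()
    # Check 1: the signature in the cookie matches the token in the cookie.
    if not hmac.compare_digest(expected, _unb64url(sig_part)):
        raise ValueError("invalid cookie signature")
    # Check 2: the unmasked form or header value matches the cookie token.
    if not hmac.compare_digest(token, unmask_token(form_value)):
        raise ValueError("form or header token does not match cookie")
```

Masking means the value embedded in each page differs between responses even though the underlying token stays the same.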

class crossauth_backend.cookieauth.DoubleSubmitCsrfTokenOptions[source]

Bases: CookieOptions

Options for double-submit csrf tokens

cookie_name: NotRequired[str]
domain: str
expires: datetime
header_name: NotRequired[str]
httpOnly: bool
maxAge: int
path: str
sameSite: bool | Literal['lax', 'strict', 'none']
secret: NotRequired[str]
secure: bool
class crossauth_backend.cookieauth.SessionCookie(key_storage: KeyStorage, options: SessionCookieOptions = {})[source]

Bases: object

Class for session management using a session id cookie.

property cookie_name
async create_session_key(userid: str | int | None, extra_fields: Mapping[str, Any] = {}) Key[source]

Creates a session key and saves in storage

Date created is the current date/time on the server.

In the unlikely event of the key already existing, it is retried up to 10 times before throwing an error with ErrorCode.KeyExists

Parameters:
  • userid (str | int | None) – the user ID to store with the session key.

  • extra_fields (Dict[str, Any]|None) – Any fields in here will also be added to the session record

Returns:

the new session key

Raises:

crossauth_backend.CrossauthError – with ErrorCode KeyExists if maximum attempts exceeded trying to create a unique session id
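The retry behaviour described above might look like the following sketch. The in-memory dict standing in for key storage, the KeyExistsError class and the new_id parameter are all hypothetical stand-ins used only to make the example self-contained.

```python
import secrets
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional, Union

MAX_ATTEMPTS = 10  # matches the "up to 10 times" in the description


class KeyExistsError(Exception):
    """Stand-in for a CrossauthError with ErrorCode.KeyExists."""


def create_session_key(
    store: Dict[str, Dict[str, Any]],
    userid: Optional[Union[str, int]],
    extra_fields: Optional[Dict[str, Any]] = None,
    new_id: Callable[[], str] = lambda: secrets.token_urlsafe(16),
) -> Dict[str, Any]:
    """Create a session record, retrying if the random ID already exists."""
    for _ in range(MAX_ATTEMPTS):
        session_id = new_id()
        if session_id in store:
            continue  # collision: try again with a fresh ID
        record = {
            "value": session_id,
            "userid": userid,
            "created": datetime.now(timezone.utc),  # server time
            **(extra_fields or {}),
        }
        store[session_id] = record
        return record
    raise KeyExistsError("could not create a unique session ID")
```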

async delete_all_for_user(userid: str | int, except_key: str | None = None) None[source]

Deletes all keys for the given user

Parameters:
  • userid (str|int) – the user to delete keys for

  • except_key (str|None) – if defined, don’t delete this key

property domain
async get_session_key(session_id: str) Key[source]

Returns the user matching the given session key in session storage, or throws an exception.

Looks the user up in the UserStorage instance passed to the constructor.

This will also fail if CookieAuthOptions.filterFunction is defined and returns false.

Parameters:

session_id (str) – the unsigned value of the session cookie

Returns:

a crossauth_backend.User object, with the password hash removed.

Raises:

crossauth_backend.CrossauthError – with ErrorCode set to InvalidSessionId, Expired or UserNotExist.

async get_user_for_session_id(session_id: str, options: UserStorageGetOptions = {}) UserAndKey[source]

Returns the user matching the given session key in session storage, or throws an exception.

Looks the user up in the crossauth_backend.UserStorage instance passed to the constructor.

This will also fail if CookieAuthOptions.filterFunction is defined and returns false.

Returns:

a crossauth_backend.User object, with the password hash removed, and the crossauth_backend.Key with the unhashed session_id

Raises:

crossauth_backend.CrossauthError – with ErrorCode set to InvalidSessionId or Expired.

static hash_session_id(session_id: str) str[source]

Returns a hash of a session ID, with the session ID prefix for storing in the storage table.

Parameters:

session_id (str) – the session ID to hash

Returns:

a base64-url-encoded string that can go into the storage

property httpOnly
property idle_timeout

Returns a Cookie object with the given session key.

This class is compatible, for example, with Express.

Parameters:
  • session_key (crossauth_backend.Key) – the value of the session key

  • persist (bool|None) – if passed, overrides the persistSessionId setting

Returns:

a Cookie object.

Takes a session ID and creates a string representation of the cookie (value of the HTTP Cookie header).

Parameters:

cookie (Cookie) – the cookie values to make a string from

Returns:

a string representation of the cookie and options.
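As a rough illustration of turning a Cookie-like name, value and options into a header string, the sketch below maps the CookieOptions field names onto standard cookie attributes. The function name, the attribute ordering and the treatment of sameSite=True as Strict are assumptions, not the library's behaviour.

```python
from email.utils import format_datetime
from typing import Any, Dict


def make_cookie_string(name: str, value: str, options: Dict[str, Any]) -> str:
    """Join a cookie name/value pair and its options into one header value."""
    parts = [f"{name}={value}"]
    if "domain" in options:
        parts.append(f"Domain={options['domain']}")
    if "path" in options:
        parts.append(f"Path={options['path']}")
    if "expires" in options:
        # Requires a timezone-aware UTC datetime.
        parts.append("Expires=" + format_datetime(options["expires"], usegmt=True))
    if "maxAge" in options:
        parts.append(f"Max-Age={options['maxAge']}")
    if options.get("httpOnly"):
        parts.append("HttpOnly")
    if options.get("secure"):
        parts.append("Secure")
    same_site = options.get("sameSite")
    if isinstance(same_site, str):
        parts.append(f"SameSite={same_site.capitalize()}")
    elif same_site is True:
        parts.append("SameSite=Strict")  # assumed meaning of True
    return "; ".join(parts)
```

For example, a session cookie with path "/", httpOnly set and sameSite "lax" would produce a string with the attributes Path=/, HttpOnly and SameSite=Lax after the name=value pair.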

property maxAge
property path
property sameSite
property secure

Unsigns a cookie and returns the original value.

Parameters:

cookie_value (str) – the signed cookie value

Returns:

the unsigned value

Raises:

crossauth_backend.CrossauthError – if the signature is invalid.

async update_session_key(session_key: PartialKey) None[source]

Updates a session record in storage

Parameters:

session_key (crossauth_backend.PartialKey) – the fields to update. value must be set, and will not be updated. All other defined fields will be updated.

Raises:

crossauth_backend.CrossauthError – if the session does not exist.

class crossauth_backend.cookieauth.SessionCookieOptions[source]

Bases: CookieOptions, TokenEmailerOptions

Options for session cookies

cookie_name: str

Name of cookie. Defaults to “CSRFTOKEN”

domain: str
email_from: NotRequired[str]
email_verification_html_body: NotRequired[str]
email_verification_subject: NotRequired[str]
email_verification_text_body: NotRequired[str]
expires: datetime
filter_function: Callable[[Key], bool]

This will be called with the session key to filter sessions before returning. The function should return true if the session is valid or false otherwise.

hash_session_id: bool

If true, session IDs are stored in hashed form in the key storage. Default False.

httpOnly: bool
idle_timeout: int

If non-zero, sessions will time out after this number of seconds have elapsed without activity. Default 0 (no timeout)

maxAge: int
password_reset_expires: NotRequired[int]
password_reset_html_body: NotRequired[str]
password_reset_subject: NotRequired[str]
password_reset_text_body: NotRequired[str]
path: str
persist: bool

If true, session cookies will be persisted between browser sessions. Default True

prefix: NotRequired[str]
render: NotRequired[Callable[[str, Dict[str, Any]], str]]
sameSite: bool | Literal['lax', 'strict', 'none']
secret: str

App secret

secure: bool
site_url: NotRequired[str]
smtp_host: NotRequired[str]
smtp_password: NotRequired[str]
smtp_port: NotRequired[int]
smtp_use_tls: NotRequired[bool]
smtp_username: NotRequired[str]
user_storage: UserStorage

If user login is enabled, you must provide the user storage class

verify_email_expires: NotRequired[int]
views: NotRequired[str]
class crossauth_backend.cookieauth.UserAndKey(user, key)[source]

Bases: NamedTuple

key: Key

Alias for field number 1

user: User | None

Alias for field number 0

crossauth_backend.crypto module

class crossauth_backend.crypto.Crypto[source]

Bases: object

Provides cryptographic functions

Base32 = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
static base64_decode(encoded: str) str[source]

Decodes a string from base64 to UTF-8

Parameters:

encoded (str) – base64-encoded text

Returns:

UTF-8 text

static base64_encode(text: str) str[source]

Base64-encodes UTF-8 text

Parameters:

text (str) – UTF-8 text

Returns:

Base64 text

static base64_pad(s: str) str[source]
static base64_to_base64url(s: str) str[source]
static base64url_to_base64(s: str) str[source]
static base64url_to_str(s: str) str[source]
static decode_password_hash(hash: str) PasswordHash[source]

Splits a hashed password into its component parts. Returns it as a PasswordHash.

The format of the hash should be `digest:key_len:iterations:useSecret:salt:hashedPassword`. The hashed password part is the Base64 encoding of the PBKDF2 password.

Parameters:

hash (str) – the hashed password to decode. See above for format

Returns:

PasswordHash object containing the decoded hash components

static encode_password_hash(hashed_password: str, salt: str, use_secret: bool, iterations: int, key_len: int, digest: str) str[source]

Encodes a hashed password into the string format it is stored as.

See decode_password_hash() for the format it is stored in.

Parameters:
  • hashed_password (str) – the Base64-encoded PBKDF2 hash of the password

  • salt (str) – the salt used for the password.

  • use_secret (bool) – whether or not to use the application secret as part of the hash.

  • iterations (int) – the number of PBKDF2 iterations

  • key_len (int) – the key length PBKDF2 parameter - results in a hashed password this length, before Base64,

  • digest (str) – The digest algorithm, eg sha512

Returns:

a string encoding the above parameters.
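A round trip between the colon-separated string and its components can be sketched as follows. The field order follows the format given for decode_password_hash(); writing use_secret as "1" or "0" and the names PasswordHashParts, encode_parts and decode_parts are assumptions for illustration.

```python
from typing import TypedDict


class PasswordHashParts(TypedDict):
    """Local stand-in mirroring the fields of PasswordHash."""
    hashed_password: str
    salt: str
    use_secret: bool
    iterations: int
    key_len: int
    digest: str


def encode_parts(p: PasswordHashParts) -> str:
    """Join the components into digest:key_len:iterations:useSecret:salt:hash."""
    return ":".join([
        p["digest"],
        str(p["key_len"]),
        str(p["iterations"]),
        "1" if p["use_secret"] else "0",  # assumed boolean encoding
        p["salt"],
        p["hashed_password"],
    ])


def decode_parts(encoded: str) -> PasswordHashParts:
    """Split the stored string back into its six components."""
    fields = encoded.split(":")
    if len(fields) != 6:
        raise ValueError("unexpected password hash format")
    digest, key_len, iterations, use_secret, salt, hashed = fields
    return {
        "digest": digest,
        "key_len": int(key_len),
        "iterations": int(iterations),
        "use_secret": use_secret == "1",
        "salt": salt,
        "hashed_password": hashed,
    }
```

Splitting on ":" is safe here because Base64 text never contains a colon.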

static hash(plaintext: str) str[source]

Standard hash using SHA256 (not PBKDF2 or HMAC)

Parameters:

plaintext (str) – text to hash

Returns:

the string containing the hash

async static password_hash(plaintext: str, options: HashOptions = {}) str[source]

Hashes a password and returns it as a base64 or base64url encoded string

Parameters:
  • plaintext (str) – password to hash

  • options (HashOptions) – hashing options, including:

  • salt: salt to use. Make a random one if not passed

  • secret: optional application secret password to apply as a pepper

  • encode: if true, returns the full string as it should be stored in the database.

Returns:

the string containing the hash and the values to decode it

async static passwords_equal(plaintext: str, encoded_hash: str, secret: str | None = None) bool[source]

Returns true if the plaintext password, when hashed, equals the one in the hash, using its hasher settings

Parameters:
  • plaintext (str) – the plaintext password

  • encoded_hash (str) – the previously-hashed version

  • secret (str|None) – if useSecret in encoded_hash is true, uses this as a pepper for the hasher

Returns:

true if they are equal, false otherwise
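The comparison can be sketched with the standard library's PBKDF2 function. This is a simplified stand-in: the salt and settings are passed explicitly rather than parsed from an encoded hash, the secret is appended to the salt as HashOptions describes, and the function names and default settings are assumptions.

```python
import base64
import hashlib
import hmac
from typing import Optional


def derive(
    plaintext: str,
    salt: str,
    secret: Optional[str] = None,
    iterations: int = 100_000,
    key_len: int = 32,
    digest: str = "sha256",
) -> str:
    """PBKDF2-hash the password, appending the optional secret to the salt."""
    salted = (salt + (secret or "")).encode()
    raw = hashlib.pbkdf2_hmac(digest, plaintext.encode(), salted, iterations, key_len)
    return base64.b64encode(raw).decode("ascii")


def passwords_equal(
    plaintext: str,
    stored_hash: str,
    salt: str,
    secret: Optional[str] = None,
    **settings: int,
) -> bool:
    """Re-hash with the same settings and compare in constant time."""
    candidate = derive(plaintext, salt, secret, **settings)
    return hmac.compare_digest(candidate.encode(), stored_hash.encode())
```

Using hmac.compare_digest rather than == avoids leaking how many leading characters matched through timing differences.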

static random_base32(length: int, dash_every: int | None = None) str[source]

Creates a random base-32 string

Parameters:

length (int) – length of the string to create

Returns:

the random value as a string. Number of bytes will be greater as it is base64 encoded.
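A minimal sketch of generating such a string from the Base32 alphabet shown on the Crypto class. Whether length counts the dashes, and how the library actually draws its random characters, are not stated in this reference; here length is assumed to count alphabet characters only.

```python
import secrets
from typing import Optional

# Alphabet copied from Crypto.Base32 (no I, O, 0 or 1).
BASE32 = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"


def random_base32(length: int, dash_every: Optional[int] = None) -> str:
    """Return `length` random alphabet characters, optionally dash-grouped."""
    chars = [secrets.choice(BASE32) for _ in range(length)]
    if not dash_every:
        return "".join(chars)
    groups = [
        "".join(chars[i:i + dash_every]) for i in range(0, length, dash_every)
    ]
    return "-".join(groups)
```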

static random_salt() str[source]

Creates a random salt

Returns:

random salt as a base64 encoded string

static random_value(length: int) str[source]

Creates a random string encoded in base64url

Parameters:

length (int) – length of the string to create

Returns:

the random value as a string. Number of bytes will be greater as it is base64 encoded.
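The note about size can be made concrete with a short sketch. It assumes length is a number of random bytes and that padding is stripped; neither is confirmed by this reference.

```python
import base64
import secrets


def random_value(length: int) -> str:
    """Return `length` random bytes encoded as unpadded base64url text."""
    raw = secrets.token_bytes(length)
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
```

Under these assumptions, 16 random bytes become a 22-character string, since every 3 bytes encode to 4 characters.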

static sha256(plaintext: str) str[source]

Standard hash using SHA256 (not PBKDF2 or HMAC)

Parameters:

plaintext (str) – text to hash

Returns:

the string containing the hash

static sign(payload: Dict[str, Any], secret: str, salt: str | None = None, timestamp: int | None = None) str[source]

Signs a JSON payload by creating a hash, using a secret and optionally also a salt and timestamp

Parameters:
  • payload (Dict[str, Any]) – object to sign (will be stringified as a JSON)

  • secret (str) – secret key, which must be a string

  • salt (str|None) – optionally, a salt to concatenate with the payload (must be a string)

  • timestamp (int|None) – optionally, a timestamp to include in the signed data as a Unix date

Returns:

Base64-url encoded hash

static sign_secure_token(payload: str, secret: str) str[source]

This can be called for a string payload that is a cryptographically secure random string. No salt is added and the token is assumed to be Base64Url already

Parameters:
  • payload (str) – string to sign

  • secret (str) – the secret to sign with

Returns:

Base64-url encoded hash

static signable_token(payload: Dict[str, Any], salt: str | None = None, timestamp: int | None = None) str[source]

The hash is of a JSON containing the payload, timestamp and optionally a salt.

Parameters:
  • payload (Dict[str, Any]) – the payload to hash

  • salt (str|None) – optional salt (use if the payload is small)

  • timestamp (int|None) – time the token will expire

Returns:

a Base64-URL-encoded string that can be hashed.

static str_to_base64url(s: str) str[source]
static symmetric_decrypt(ciphertext: str, key_string: str) str[source]

Symmetric decryption using a key that must be a string

Parameters:
  • ciphertext (str) – Base64-url encoded ciphertext

  • key_string (str) – the symmetric key

Returns:

Decrypted text

static symmetric_encrypt(plaintext: str, key_string: str, iv: bytes | None = None) str[source]

Symmetric encryption using a key that must be a string

Parameters:
  • plaintext (str) – Text to encrypt

  • key_string (str) – the symmetric key

  • iv (bytes|None) – the IV value. If None, a random one is created

Returns:

Encrypted text Base64-url encoded.

static unsign(signed_message: str, secret: str, expiry: int | None = None) Dict[str, Any][source]

Validates a signature and, if valid, returns the unstringified payload

Parameters:
  • signed_message (str) – signed message (base64-url encoded)

  • secret (str) – secret key, which must be a string

  • expiry (int|None) – if set, validation will fail if the timestamp in the payload is after this date

Returns:

if signature is valid, the payload as an object

Raises:

crossauth_backend.CrossauthError – with ErrorCode of InvalidKey if signature is invalid or has expired.
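A sign/unsign round trip can be sketched with HMAC over a JSON body. The token layout ("body.signature"), the "s" and "t" key names, and the max_age and now parameters are assumptions made for this example; in particular, expiry is modelled here as a maximum age in seconds, which may differ from how the library interprets it.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def _unb64url(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: Dict[str, Any], secret: str,
         salt: Optional[str] = None, timestamp: Optional[int] = None) -> str:
    """Serialise the payload (plus salt/timestamp) and append an HMAC."""
    body = dict(payload)
    if salt is not None:
        body["s"] = salt
    if timestamp is not None:
        body["t"] = timestamp
    encoded = _b64url(json.dumps(body).encode())
    sig = hmac.new(secret.encode(), encoded.encode(), hashlib.sha256).digest()
    return encoded + "." + _b64url(sig)


def unsign(signed_message: str, secret: str, max_age: Optional[int] = None,
           now: Optional[float] = None) -> Dict[str, Any]:
    """Verify the HMAC, optionally check age, and return the JSON body."""
    encoded, _, sig = signed_message.partition(".")
    expected = hmac.new(secret.encode(), encoded.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _unb64url(sig)):
        raise ValueError("invalid signature")
    body = json.loads(_unb64url(encoded))
    if max_age is not None:
        current = time.time() if now is None else now
        if "t" not in body or body["t"] + max_age < current:
            raise ValueError("token has expired")
    return body
```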

static unsign_secure_token(signed_message: str, secret: str, expiry: int | None = None) str[source]

Validates a signature signed with sign_secure_token and, if valid, returns the unstringified payload

Parameters:
  • signed_message (str) – signed message (base64-url encoded)

  • secret (str) – secret key, which must be a string

Returns:

if signature is valid, the payload as a string

Raises:

crossauth_backend.CrossauthError – with ErrorCode of InvalidKey if signature is invalid or has expired.

static uuid() str[source]

Creates a UUID

static xor(value: str, mask: str) str[source]

XORs two base64url-encoded strings

Parameters:
  • value (str) – the value to XOR

  • mask (str) – mask to XOR it with

Returns:

the XORed string

class crossauth_backend.crypto.HashOptions[source]

Bases: TypedDict

Option parameters for Crypto.password_hash

digest: str

PBKDF2 digest method

encode: bool

Whether to Base64-URL-encode the result

iterations: int

Number of PBKDF2 iterations

key_len: int

Length (before Base64-encoding) of the PBKDF2 key being generated

salt: str

A salt to prepend to the message before hashing

secret: str

A secret to append to the salt when hashing, or undefined for no secret

class crossauth_backend.crypto.PasswordHash[source]

Bases: TypedDict

An object that contains all components of a hashed password. Hashing is done with PBKDF2

digest: str

The digest algorithm to use, eg sha512

hashed_password: str

The actual hashed password in Base64 format

iterations: int

Number of iterations for PBKDF2

key_len: int

The key length parameter passed to PBKDF2 - hash will be this number of characters long

salt: str

The random salt used to create the hashed password

use_secret: bool

If true, secret (application secret) is also used to hash the password

crossauth_backend.emailtoken module

class crossauth_backend.emailtoken.TokenEmailer(user_storage: UserStorage, key_storage: KeyStorage, options: TokenEmailerOptions = {})[source]

Bases: object

Sends password reset and email verification tokens to an email address

async create_and_save_email_verification_token(userid: str | int, new_email: str = '') str[source]

Create and save email verification token

async create_and_save_password_reset_token(userid: str | int) str[source]

Create and save password reset token

create_emailer() SMTP[source]

Create SMTP emailer

async delete_email_verification_token(token: str)[source]

Delete email verification token

static hash_email_verification_token(token: str) str[source]

Produces a hash of the given email verification token with the correct prefix for inserting into storage.

static hash_password_reset_token(token: str) str[source]

Produces a hash of the given password reset token with the correct prefix for inserting into storage.

static is_email_valid(email: str) bool[source]

Returns true if the given email has a valid format, false otherwise.

Parameters:

email – the email to validate

Returns:

true or false

async send_email_verification_token(userid: str | int, new_email: str = '', extra_data: Dict[str, Any] = {}) None[source]

Send an email verification email using the Jinja2 templates.

The email address to send it to will be taken from the user’s record in user storage. It will first be validated, throwing a CrossauthError with ErrorCode of InvalidEmail if it is not valid.

Parameters:
  • userid – userid to send it for

  • new_email – if this is a token to verify email for account activation, leave this empty. If it is for changing an email, this will be the address it is being changed to.

  • extra_data – these extra variables will be passed to the Jinja2 templates

async send_password_reset_token(userid: int | str, extra_data: Dict[str, Any] = {}, as_admin: bool = False) None[source]

Send a password reset token email using the Jinja2 templates

Parameters:
  • userid – userid to send it for

  • extra_data – these extra variables will be passed to the Jinja2 templates

  • as_admin – whether this is being sent by an admin

static validate_email(email: str | None) None[source]

Returns if the given email has a valid format. Throws a CrossauthError with ErrorCode InvalidEmail otherwise.

Parameters:

email – the email to validate

async verify_email_verification_token(token: str) Dict[str, str | int][source]

Validates an email verification token.

The following must match:
  • expiry date in the key storage record must be later than the current time

  • userid in the token must match the userid in the key storage

  • email address in user storage must match the email in the key. If there is no email address, the username field is set if it is in email format.

  • expiry time in the key storage must match the expiry time in the key

Looks the token up in key storage and verifies it matches and has not expired.

Parameters:

token – the token to validate

Returns:

the userid of the user the token is for and the email address the user is validating

async verify_password_reset_token(token: str) User[source]

Validates a password reset token

The following must match:
  • expiry date in the key storage record must be later than the current time

  • userid in the token must match the userid in the key storage

  • the email in the token matches either the email or username field in user storage

  • the password in user storage must match the password in the key

  • expiry time in the key storage must match the expiry time in the key

Looks the token up in key storage and verifies it matches and has not expired. Also verifies the user exists and password has not changed in the meantime.

Parameters:

token – the token to validate

Returns:

the user that the token is for

class crossauth_backend.emailtoken.TokenEmailerOptions[source]

Bases: TypedDict

Configuration options for TokenEmailer

email_from: NotRequired[str]

Sender for emails

email_verification_html_body: NotRequired[str]

Template file containing page for producing the HTML version of the email verification email body

email_verification_subject: NotRequired[str]

Subject for the email verification email

email_verification_text_body: NotRequired[str]

Template file containing page for producing the text version of the email verification email body

password_reset_expires: NotRequired[int]

Number of seconds before password reset tokens should expire. Default 1 day

password_reset_html_body: NotRequired[str]

Template file containing page for producing the HTML version of the password reset email body

password_reset_subject: NotRequired[str]

Subject for the password reset email

password_reset_text_body: NotRequired[str]

Template file containing page for producing the text version of the password reset email body

prefix: NotRequired[str]

The prefix between the site url and the email verification/password reset link. Default “/”

render: NotRequired[Callable[[str, Dict[str, Any]], str]]

if passed, use this instead of the default nunjucks renderer

site_url: NotRequired[str]

The site url, used to create a link, eg “https://mysite.com:3000”. No default - required parameter

smtp_host: NotRequired[str]

Hostname of the SMTP server. No default - required parameter

smtp_password: NotRequired[str]

Password for connecting to SMTP server. Default undefined

smtp_port: NotRequired[int]

Port the SMTP server is running on. Default 25

smtp_use_tls: NotRequired[bool]

Whether or not TLS is used by the SMTP server. Default false

smtp_username: NotRequired[str]

Username for connecting to SMTP server. Default undefined

verify_email_expires: NotRequired[int]

Number of seconds before email verification tokens should expire. Default 1 day

views: NotRequired[str]

The directory containing views (by default, Nunjucks templates)

crossauth_backend.session module

class crossauth_backend.session.Csrf(csrf_cookie, csrf_form_or_header_value)[source]

Bases: NamedTuple

Alias for field number 0

csrf_form_or_header_value: str

Alias for field number 1

class crossauth_backend.session.SessionManager(key_storage: KeyStorage, authenticators: Mapping[str, Authenticator], options: SessionManagerOptions = {})[source]

Bases: object

Class for managing sessions.

property allowed_factor2
async apply_email_verification_token(token: str) User[source]

Takes an email verification token as input and applies it to the user storage.

The state is reset to active. If the token was for changing the password, the new password is saved to the user in user storage.

Parameters:

token – the token to apply

Returns:

the new user record

property authenticators
async cancel_two_factor_page_visit(session_id: str) Dict[str, Any][source]

Cancels the 2FA that was previously initiated but not completed.

If successful, returns. Otherwise an exception is thrown.

Parameters:

session_id – the session id (unsigned)

Returns:

Dict[str, Any]: the 2FA data that was created on initiation

Raises:

CrossauthError – of Unauthorized if 2FA was not initiated.

async change_secrets(username: str, factor_number: int, new_params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None, old_params: AuthenticationParameters | None = None) User[source]
async complete_two_factor_login(params: AuthenticationParameters, session_id: str, extra_fields: Mapping[str, Any] = {}, persist: bool = False) SessionTokens[source]

Performs the second factor authentication as the second step of the login process

If authentication is successful, the user’s state will be set to active and a new session will be created, bound to the user. The anonymous session will be deleted.

Parameters:
  • params – the user-provided parameters to authenticate with (eg TOTP code).

  • session_id – the user’s anonymous session

  • extra_fields – extra fields to add to the user-bound new session table entry

  • persist – if true, the cookie will be persisted (with an expiry value); otherwise it will be a session-only cookie.

Returns:

AuthResult containing:

  • session_cookie: the new session cookie

  • csrf_cookie: the new CSRF cookie

  • csrf_form_or_header_value: the new CSRF token corresponding to the cookie

  • user: the newly-logged in user.

async complete_two_factor_page_visit(params: AuthenticationParameters, session_id: str)[source]

Completes 2FA when visiting a protected page.

If successful, returns. Otherwise an exception is thrown.

Parameters:
  • params – the parameters from user input needed to authenticate (eg TOTP code). Passed to the authenticator

  • session_id – the session cookie value (ie still signed)

Raises:

CrossauthError – if authentication fails.

async complete_two_factor_setup(params: AuthenticationParameters, session_id: str) User[source]

Authenticates with the second factor.

If successful, the new user object is returned. Otherwise an exception is thrown.

Parameters:
  • params – the parameters from user input needed to authenticate (eg TOTP code)

  • session_id – the session cookie value (ie still signed)

Returns:

the user object

Raises:

CrossauthError – if authentication fails.

async create_anonymous_session(extra_fields: Mapping[str, Any] | None = None) SessionTokens[source]
async create_csrf_form_or_header_value(csrf_cookie_value: str)[source]

Validates the signature on the CSRF cookie value and returns a value that can be put in the form or CSRF header value.

Parameters:

csrf_cookie_value (str) – the value from the CSRF cookie

Returns:

the value to put in the form or CSRF header

async create_csrf_token() Csrf[source]

Creates and returns a signed CSRF token based on the session ID

Returns:

a CSRF cookie and value to put in the form or CSRF header

async create_user(user: UserInputFields, params: UserSecrets, repeat_params: UserSecrets | None = None, skip_email_verification: bool = False, empty_password: bool = False) User[source]

Returns the name used for CSRF token cookies.

property csrf_header_name

Returns the name used for the CSRF header.

property csrf_tokens
async data_for_session_id(session_id: str) Dict[str, Any] | None[source]

Returns the data object for a session ID, or None if there is none.

If the user is not defined, or the key has expired, returns None.

Parameters:

session_id (str) – the session key to look up in session storage

Returns:

the data field decoded into a dict, or None

Raises:

crossauth_backend.CrossauthError – with ErrorCode of Connection, InvalidSessionId, UserNotExist or Expired.

async data_string_for_session_id(session_id: str) str | None[source]

Returns the data for a session ID as a JSON string (which is how it is stored in the session table), or None.

If the user is not defined, or the key has expired, returns None.

Parameters:

session_id (str) – the session id to look up in session storage

Returns:

a string from the data field

Raises:

crossauth_backend.CrossauthError – with ErrorCode of Connection, InvalidSessionId, UserNotExist or Expired.

async delete_session(session_id: str) None[source]

Deletes the given session ID from the key storage (not the cookie)

Parameters:

session_id (str) – the session Id to delete

async delete_session_data(session_id: str, name: str) None[source]

Deletes a field from the session data.

The data field in the session entry is assumed to be a JSON string. The field with the given name is deleted if it is set.

Parameters:
  • session_id (str) – the session ID to update

  • name (str) – the name of the field to delete

async delete_user_by_username(username: str) None[source]
property email_token_storage
get_session_id(session_cookie_value: str)[source]

Returns the session ID from the signed session cookie value

Parameters:

session_cookie_value (str) – value from the session ID cookie

Returns:

the unsigned cookie value.

Raises:

crossauth_backend.CrossauthError – with InvalidKey if the signature is invalid.

async initiate_two_factor_login(user: User) SessionTokens[source]

Initiates the two factor login process.

Creates an anonymous session and corresponding CSRF token

Parameters:

user – the user, which should already have been authenticated with factor1

Returns:

a new anonymous session cookie and corresponding CSRF cookie and token.

async initiate_two_factor_page_visit(user: User, session_id: str, request_body: Mapping[str, Any], url: str | None = None, content_type: str | None = None) SessionTokens[source]

Initiates the two factor process when visiting a protected page.

Creates an anonymous session and corresponding CSRF token

Parameters:
  • user – the user, which should already have been authenticated with factor1

  • session_id – the logged in session associated with the user

  • request_body – the parameters from the request made before being redirected to factor2 authentication

  • url – the requested url, including path and query parameters

  • content_type – optional content type from the request

Returns:

If a token was passed, a new anonymous session cookie and corresponding CSRF cookie and token.

async initiate_two_factor_setup(user: User, new_factor2: str | None, session_id: str) Mapping[str, Any][source]

Begins the process of setting up 2FA for a user which has already been created and activated. Called when changing 2FA or changing its parameters.

Parameters:
  • user – the logged in user

  • new_factor2 – new second factor to change user to

  • session_id – the session cookie for the user

Returns:

the 2FA data that can be displayed to the user in the configure 2FA step (such as the secret and QR code for TOTP).

async initiate_two_factor_signup(user: UserInputFields, params: UserSecrets, session_id: str, repeat_params: UserSecrets | None) UserIdAndData[source]

Creates a user with 2FA enabled.

The user storage entry will be created, with the state set to awaitingtwofactorsetup or awaitingtwofactorsetupandemailverification. The passed session key will be updated to include the username and details needed by 2FA during the configure step.

Parameters:
  • user – details to save in the user table

  • params – the parameters needed to authenticate with factor1 (eg password)

  • session_id – the anonymous session cookie

  • repeat_params – if passed, these will be compared with params and if they don’t match, PasswordMatch is thrown.

Returns:

UserIdAndData containing:

  • userid: the id of the created user

  • user_data: data that can be displayed to the user in the page to complete 2FA set up (eg the secret key and QR code for TOTP)

property key_storage
async login(username: str, params: AuthenticationParameters, extra_fields: Mapping[str, Any] = {}, persist: bool = False, user: User | None = None, bypass_2fa: bool = False) SessionTokens[source]

Performs a user login

  • Authenticates the username and password

  • Creates a session key - if 2FA is enabled, this is an anonymous session, otherwise it is bound to the user

  • Returns the user (without the password hash) and the session cookie.

If the user object is defined, authentication (and 2FA) is bypassed

Parameters:
  • username – the username to validate

  • params – user-provided credentials (eg password) to authenticate with

  • extra_fields – add these extra fields to the session key if authentication is successful

  • persist – if passed, overrides the persistSessionId setting.

  • user – if this is defined, the username and password are ignored and the given user is logged in. The 2FA step is also skipped.

  • bypass_2fa – if true, the 2FA step will be skipped

Returns:

SessionTokens containing the user, user secrets, and session cookie and CSRF cookie and token. If a 2FA step is needed, it will be an anonymous session, otherwise bound to the user.

Raises:

CrossauthError – with ErrorCode of Connection, UserNotValid, PasswordNotMatch or UserNotExist.

async logout(session_id: str)[source]
async logout_from_all(userid: str | int, except_id: str | None = None)[source]
async repeat_two_factor_signup(session_id: str) UserIdAndData[source]

This can be called if the user has finished signing up with factor1 but closed the browser before completing factor2 setup. Call it if the user signs up again with the same factor1 credentials.

Parameters:

session_id – the anonymous session ID for the user

Returns:

UserIdAndData containing:

  • userid: the id of the created user

  • user_data: data that can be displayed to the user in the page to complete 2FA set up (eg the secret key and QR code for TOTP)

  • secrets: data that is saved in the session for factor2. In the case of TOTP, both user_data and secrets contain the shared secret but only user_data has the QR code, since it can be generated from the shared secret.

async request_password_reset(email: str) None[source]

Sends a password reset token.

Parameters:

email – the user’s email (where the token will be sent)

async reset_secret(token: str, factror_number: int, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) User[source]

Resets the secret for factor1 or 2 (eg reset password)

Parameters:
  • token – the reset password token that was emailed

  • factror_number – which factor to reset (1 or 2)

  • params – the new secrets entered by the user (eg new password)

  • repeat_params – optionally, repeat of the secrets. If passed, an exception will be thrown if they do not match

Returns:

the user object

Raises:

CrossauthError – if the repeat_params don’t match params, the token is invalid or the user storage cannot be updated.

property session

Returns the name used for session ID cookies.

async update_many_session_data(session_id: str, data_array: List[KeyDataEntry]) None[source]

Update fields in the session data.

The data field in the session entry is assumed to be a JSON string. Each field named in data_array is updated, or set if not already set.

Parameters:
  • session_id (str) – the session ID to update

  • data_array (List[crossauth_backend.KeyDataEntry]) – names and values

async update_session_activity(session_id: str)[source]

If session_idle_timeout is set, update the last activity time in key storage to the current time.

Parameters:

session_id (str) – the session Id to update.

async update_session_data(session_id: str, name: str, value: Mapping[str, Any]) None[source]

Update a field in the session data.

The data field in the session entry is assumed to be a JSON string. The field with the given name is updated, or set if not already set.

Parameters:
  • session_id (str) – the session ID to update

  • name (str) – the name of the field

  • value (Mapping[str, Any]) – new value to store

async update_user(current_user: User, new_user: User, skip_email_verification: bool = False, as_admin: bool = False) TokensSent[source]

Updates a user entry in storage.

Parameters:
  • current_user – the current user details

  • new_user – the new user details

Returns:

TokensSent with email_verification_token_sent and password_reset_token_sent booleans

async user_for_password_reset_token(token: str) User[source]

Returns the user associated with a password reset token

Parameters:

token – the token that was emailed

Returns:

the user

Raises:

CrossauthError – if the token is not valid.

async user_for_session_id(session_id: str)[source]
property user_storage

Throws crossauth_backend.CrossauthError with InvalidKey if the passed CSRF cookie value is not valid (ie invalid signature).

Parameters:

csrf_cookie_value (str) – the CSRF cookie value

validate_double_submit_csrf_token(csrf_cookie_value: str, csrf_form_or_header_value: str)[source]

Throws crossauth_backend.CrossauthError with InvalidKey if the passed CSRF token is not valid for the given session ID. Otherwise returns without error.

Parameters:
  • csrf_cookie_value (str) – the CSRF cookie value

  • csrf_form_or_header_value (str) – the value from the form field or CSRF header
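
The double-submit check described above can be illustrated with a minimal, self-contained sketch. It is not the crossauth_backend implementation: the token format (a random value, a dot, then an HMAC-SHA256 hex signature), the SECRET constant, the use of ValueError and all function names are assumptions made for illustration only.

```python
import hashlib
import hmac
import secrets

# Hypothetical server secret; stands in for the `secret` option.
SECRET = b"example-server-secret"


def _sign(value: str) -> str:
    """Return a hex HMAC-SHA256 signature of value."""
    return hmac.new(SECRET, value.encode(), hashlib.sha256).hexdigest()


def make_csrf_cookie_value() -> str:
    """Create a signed cookie value of the assumed form '<token>.<signature>'."""
    token = secrets.token_urlsafe(16)
    return f"{token}.{_sign(token)}"


def form_value_from_cookie(cookie_value: str) -> str:
    """Check the cookie signature and return the unsigned token for the form or header."""
    token, _, signature = cookie_value.partition(".")
    if not hmac.compare_digest(signature, _sign(token)):
        raise ValueError("invalid CSRF cookie signature")
    return token


def validate_double_submit(cookie_value: str, form_value: str) -> None:
    """Raise ValueError unless the form or header value matches the signed cookie."""
    expected = form_value_from_cookie(cookie_value)
    if not hmac.compare_digest(expected, form_value):
        raise ValueError("CSRF token does not match cookie")


cookie = make_csrf_cookie_value()
form_value = form_value_from_cookie(cookie)
validate_double_submit(cookie, form_value)  # returns without error
```

The constant-time comparison (hmac.compare_digest) is used in the sketch so that the check does not leak how many leading characters matched.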

class crossauth_backend.session.SessionManagerOptions[source]

Bases: TokenEmailerOptions

Options for SessionManager

allowed_factor2: List[str]

Set of 2FA factor names a user is allowed to set.

The name corresponds to the key you give when adding authenticators. See authenticators in SessionManager.constructor.

Options for csrf cookie manager

email_from: NotRequired[str]
email_token_storage: KeyStorage

Store for password reset and email verification tokens. If not passed, the same store as for sessions is used.

email_verification_html_body: NotRequired[str]
email_verification_subject: NotRequired[str]
email_verification_text_body: NotRequired[str]
enable_email_verification: bool

If true, users will have to verify their email address before account is created or when changing their email address. See class description for details. Default True

enable_password_reset: bool

If true, allow password reset by email token. See class description for details. Default True

password_reset_expires: NotRequired[int]
password_reset_html_body: NotRequired[str]
password_reset_subject: NotRequired[str]
password_reset_text_body: NotRequired[str]
prefix: NotRequired[str]
render: NotRequired[Callable[[str, Dict[str, Any]], str]]
secret: str

Server secret. Needed for emailing tokens and for csrf tokens

options for session cookie manager

site_url: str

Base URL for the site.

This is used when constructing URLs, eg for sending password reset tokens.

smtp_host: NotRequired[str]
smtp_password: NotRequired[str]
smtp_port: NotRequired[int]
smtp_use_tls: NotRequired[bool]
smtp_username: NotRequired[str]
user_storage: UserStorage

If user login is enabled, you must provide the object where users are stored.

verify_email_expires: NotRequired[int]
views: NotRequired[str]
class crossauth_backend.session.SessionTokens(session_cookie, csrf_cookie, csrf_form_or_header_value, user, secrets)[source]

Bases: NamedTuple

csrf_cookie

Alias for field number 1

csrf_form_or_header_value: str | None

Alias for field number 2

secrets: UserSecrets | None

Alias for field number 4

session_cookie

Alias for field number 0

user: User | None

Alias for field number 3

class crossauth_backend.session.TokensSent(email_verification_token_sent, password_reset_token_sent)[source]

Bases: NamedTuple

email_verification_token_sent: bool

Alias for field number 0

password_reset_token_sent: bool

Alias for field number 1
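
Because TokensSent is a NamedTuple, each field can be read by name or by position, which is what "Alias for field number N" refers to. The sketch below defines a local stand-in class so that it runs without the package installed; only the field names and their order are taken from the listing above.

```python
from typing import NamedTuple


class TokensSent(NamedTuple):
    """Local stand-in mirroring the documented field names and order."""

    email_verification_token_sent: bool
    password_reset_token_sent: bool


result = TokensSent(
    email_verification_token_sent=True,
    password_reset_token_sent=False,
)

# Access by name, by index, or by unpacking.
by_name = result.email_verification_token_sent
by_index = result[1]
sent_verification, sent_reset = result
```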

class crossauth_backend.session.UserIdAndData(userid, user_data, secrets)[source]

Bases: NamedTuple

secrets: UserSecrets | None

Alias for field number 2

user_data: Dict[str, Any]

Alias for field number 1

userid: str | int

Alias for field number 0

crossauth_backend.sessionendpoints module

When not overriding which endpoints to enable, all of these will be enabled.

crossauth_backend.sessionendpoints.AllEndpointsMinusOAuth = ['signup', 'api/signup', 'login', 'logout', 'changepassword', 'updateuser', 'deleteuser', 'api/login', 'api/logout', 'api/changepassword', 'api/userforsessionkey', 'api/getcsrftoken', 'api/updateuser', 'api/deleteuser', 'admin/createuser', 'admin/changepassword', 'admin/selectuser', 'admin/updateuser', 'admin/changepassword', 'admin/deleteuser', 'admin/api/createuser', 'admin/api/changepassword', 'admin/api/updateuser', 'admin/api/changepassword', 'admin/api/deleteuser', 'verifyemail', 'emailverified', 'api/verifyemail', 'requestpasswordreset', 'resetpassword', 'api/requestpasswordreset', 'api/resetpassword', 'configurefactor2', 'loginfactor2', 'changefactor2', 'factor2', 'api/configurefactor2', 'api/loginfactor2', 'api/changefactor2', 'api/factor2', 'api/cancelfactor2']

These are all the endpoints

crossauth_backend.sessionendpoints.EmailVerificationApiEndpoints = ['api/verifyemail']

Page endpoints that depend on password reset being enabled

If not overriding which endpoints to enable with endpoints, and if password reset is enabled, then these endpoints will be added.

crossauth_backend.sessionendpoints.EmailVerificationPageEndpoints = ['verifyemail', 'emailverified']

API (JSON) endpoints that depend on email verification.

If not overriding which endpoints to enable with endpoints, and if email verification is enabled, then these endpoints will be added.

crossauth_backend.sessionendpoints.Factor2ApiEndpoints = ['api/configurefactor2', 'api/loginfactor2', 'api/changefactor2', 'api/factor2', 'api/cancelfactor2']

Page endpoints that depend on email verification being enabled.

If not overriding which endpoints to enable with endpoints, and if email verification is enabled, then these endpoints will be added.

crossauth_backend.sessionendpoints.Factor2PageEndpoints = ['configurefactor2', 'loginfactor2', 'changefactor2', 'factor2']

These are the endpoints created if endpoints is set to allMinusOAuth

crossauth_backend.sessionendpoints.PasswordResetApiEndpoints = ['api/requestpasswordreset', 'api/resetpassword']

Endpoints for signing a user up that display HTML

When not overriding which endpoints to enable, all of these will be enabled.

crossauth_backend.sessionendpoints.PasswordResetPageEndpoints = ['requestpasswordreset', 'resetpassword']

API (JSON) endpoints that depend on password reset being enabled

If not overriding which endpoints to enable with endpoints, and if password reset is enabled, then these endpoints will be added.

crossauth_backend.sessionendpoints.SessionAdminApiEndpoints = ['admin/api/createuser', 'admin/api/changepassword', 'admin/api/updateuser', 'admin/api/changepassword', 'admin/api/deleteuser']

When not overriding which endpoints to enable, and with both enableAdminEndpoints and enableAdminClientManagement enabled, all of these will be enabled.

crossauth_backend.sessionendpoints.SessionAdminClientApiEndpoints = ['admin/api/createclient', 'admin/api/deleteclient', 'admin/api/updateclient']

When not overriding which endpoints to enable, and with enableAdminClientManagement enabled, all of these will be enabled.

crossauth_backend.sessionendpoints.SessionAdminClientPageEndpoints = ['admin/selectclient', 'admin/createclient', 'admin/deleteclient', 'admin/updateclient']

When not overriding which endpoints to enable, and with enableAdminClientManagement enabled, all of these will be enabled.

crossauth_backend.sessionendpoints.SessionAdminPageEndpoints = ['admin/createuser', 'admin/changepassword', 'admin/selectuser', 'admin/updateuser', 'admin/changepassword', 'admin/deleteuser']

When not overriding which endpoints to enable, and with both enableAdminEndpoints and enableAdminClientManagement enabled, all of these will be enabled.

crossauth_backend.sessionendpoints.SessionApiEndpoints = ['api/login', 'api/logout', 'api/changepassword', 'api/userforsessionkey', 'api/getcsrftoken', 'api/updateuser', 'api/deleteuser']

When not overriding which endpoints to enable, and with enableAdminEndpoints enabled, all of these will be enabled.

crossauth_backend.sessionendpoints.SessionClientApiEndpoints = ['api/deleteclient', 'api/updateclient', 'api/createclient']

API (JSON) endpoints that depend on 2FA being enabled.

If not overriding which endpoints to enable with endpoints, and if any 2FA factors are enabled, then these endpoints will be added.

crossauth_backend.sessionendpoints.SessionClientPageEndpoints = ['selectclient', 'createclient', 'updateclient', 'deleteclient']

When not overriding which endpoints to enable, all of these will be enabled.

crossauth_backend.sessionendpoints.SessionPageEndpoints = ['login', 'logout', 'changepassword', 'updateuser', 'deleteuser']

When not overriding which endpoints to enable, and with enableAdminEndpoints enabled, all of these will be enabled.

crossauth_backend.sessionendpoints.SignupApiEndpoints = ['api/signup']

Endpoints for signing a user up that display HTML

crossauth_backend.sessionendpoints.SignupPageEndpoints = ['signup']

API (JSON) endpoints for signing a user up that display HTML

When not overriding which endpoints to enable, all of these will be enabled.
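
Concatenating the per-feature lists in the order used below reproduces the 41 entries of AllEndpointsMinusOAuth as listed in this module. The select_endpoints helper and its flag names are hypothetical; the sketch only illustrates how a default endpoint set could be assembled from the feature lists, not how the package itself does it.

```python
from typing import List

# Values copied from the module listing.
SignupPageEndpoints = ["signup"]
SignupApiEndpoints = ["api/signup"]
SessionPageEndpoints = ["login", "logout", "changepassword", "updateuser", "deleteuser"]
SessionApiEndpoints = [
    "api/login", "api/logout", "api/changepassword", "api/userforsessionkey",
    "api/getcsrftoken", "api/updateuser", "api/deleteuser",
]
SessionAdminPageEndpoints = [
    "admin/createuser", "admin/changepassword", "admin/selectuser",
    "admin/updateuser", "admin/changepassword", "admin/deleteuser",
]
SessionAdminApiEndpoints = [
    "admin/api/createuser", "admin/api/changepassword", "admin/api/updateuser",
    "admin/api/changepassword", "admin/api/deleteuser",
]
EmailVerificationPageEndpoints = ["verifyemail", "emailverified"]
EmailVerificationApiEndpoints = ["api/verifyemail"]
PasswordResetPageEndpoints = ["requestpasswordreset", "resetpassword"]
PasswordResetApiEndpoints = ["api/requestpasswordreset", "api/resetpassword"]
Factor2PageEndpoints = ["configurefactor2", "loginfactor2", "changefactor2", "factor2"]
Factor2ApiEndpoints = [
    "api/configurefactor2", "api/loginfactor2", "api/changefactor2",
    "api/factor2", "api/cancelfactor2",
]


def select_endpoints(
    admin: bool = True,
    email_verification: bool = True,
    password_reset: bool = True,
    factor2: bool = True,
) -> List[str]:
    """Hypothetical helper: build the endpoint list from feature flags."""
    endpoints = (
        SignupPageEndpoints + SignupApiEndpoints
        + SessionPageEndpoints + SessionApiEndpoints
    )
    if admin:
        endpoints += SessionAdminPageEndpoints + SessionAdminApiEndpoints
    if email_verification:
        endpoints += EmailVerificationPageEndpoints + EmailVerificationApiEndpoints
    if password_reset:
        endpoints += PasswordResetPageEndpoints + PasswordResetApiEndpoints
    if factor2:
        endpoints += Factor2PageEndpoints + Factor2ApiEndpoints
    return endpoints


all_minus_oauth = select_endpoints()
minimal = select_endpoints(False, False, False, False)
```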

crossauth_backend.storage module

class crossauth_backend.storage.KeyDataEntry[source]

Bases: TypedDict

data_name: str
value: NotRequired[Any]
class crossauth_backend.storage.KeyStorage[source]

Bases: ABC

Base class for storing session and API keys.

This class is subclassed for various types of session key storage. For example, PrismaKeyStorage is for storing sessions in a database table, managed by the Prisma ORM.

static decode_data(data: str | None) Dict[str, Any][source]

Returns an object decoded from the JSON string in the data field

Parameters:

data (str|None) – The JSON string to decode

Returns:

The parsed JSON object

Raises:

json.JSONDecodeError – If data is not a valid JSON string

abstractmethod async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]

Deletes all keys from storage for the given user ID

Parameters:
  • userid (int|str|None) – User ID to delete keys for

  • prefix (str) – Only keys starting with this prefix will be deleted

  • except_key (str|None) – If defined, the key with this value will not be deleted
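
The interaction of userid, prefix and except_key can be sketched against a plain in-memory dict. This is a synchronous illustration rather than a KeyStorage subclass; the store layout (key value mapped to owning user ID) and the "s:" and "api:" prefixes are assumptions.

```python
from typing import Dict, Optional, Union

UserId = Union[str, int, None]


def delete_all_for_user(
    store: Dict[str, UserId],
    userid: UserId,
    prefix: str,
    except_key: Optional[str] = None,
) -> None:
    """Remove every key owned by userid that starts with prefix, sparing except_key."""
    doomed = [
        value
        for value, owner in store.items()
        if owner == userid and value.startswith(prefix) and value != except_key
    ]
    for value in doomed:
        del store[value]


# Hypothetical store: key value -> owning user ID.
store: Dict[str, UserId] = {
    "s:abc": 1,
    "s:def": 1,
    "api:ghi": 1,
    "s:jkl": 2,
}

# Delete user 1's session keys, but keep the one currently in use.
delete_all_for_user(store, 1, "s:", except_key="s:def")
```

A pattern like this is what lets a caller log a user out of every other session while keeping the current one.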

abstractmethod async delete_data(key_name: str, data_name: str) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.

abstractmethod async delete_key(value: str) None[source]

Deletes a key from storage (e.g., the database).

Parameters:

value (str) – The key to delete

abstractmethod async delete_matching(key: PartialKey) None[source]

Deletes all keys matching the given specs

Parameters:

key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted

static encode_data(data: Dict[str, Any] | None = None) str[source]

Returns a JSON string encoded from the given object

Parameters:

data (Dict[str, Any]|None) – The object to encode

Returns:

A JSON string
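
As a rough illustration of the encode and decode round trip, the following sketch uses the standard json module. Treating None as an empty object in both directions is an assumption, not documented behaviour; the functions are local stand-ins, not the library's static methods.

```python
import json
from typing import Any, Dict, Optional


def encode_data(data: Optional[Dict[str, Any]] = None) -> str:
    """Encode a dict as a JSON string (assumption: None becomes '{}')."""
    return json.dumps(data if data is not None else {})


def decode_data(data: Optional[str]) -> Dict[str, Any]:
    """Decode a JSON string (assumption: None or '' becomes an empty dict)."""
    if not data:
        return {}
    return json.loads(data)


encoded = encode_data({"username": "bob", "factor2": "totp"})
decoded = decode_data(encoded)
```

As with the documented decode_data, passing a string that is not valid JSON raises json.JSONDecodeError.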

abstractmethod async get_all_for_user(userid: str | int | None = None) List[Key][source]

Return all keys matching the given user ID

Parameters:

userid (str|int|None) – User to return keys for

Returns:

List[Key]: An array of keys

abstractmethod async get_key(key: str) Key[source]

Returns the matching key in the session storage or raises an exception if it doesn’t exist.

Parameters:

key (str) – The key to look up, as it will appear in this storage (typically unsigned, hashed)

Returns:

The matching Key record.

abstractmethod async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]

Saves a session key in the session storage (e.g., database).

Parameters:
  • userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.

  • value (str) – The key value to store.

  • date_created (datetime) – The date/time the key was created.

  • expires (datetime|None) – The date/time the key expires.

  • data (str|None) – An optional value, specific to the type of key, e.g., new email for email change tokens

  • extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record

abstractmethod async update_data(key_name: str, data_name: str, value: Any | None) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.

  • value (Any|None) – The new value.
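
The dotted data_name convention used by update_data and delete_data can be sketched on a JSON string as follows. The helper names are hypothetical, and creating missing intermediate objects on update is an assumption. A real storage implementation would also need to make the read-modify-write atomic (for example inside a database transaction), which this sketch does not attempt.

```python
import json
from typing import Any, Optional


def set_field(data: Optional[str], data_name: str, value: Any) -> str:
    """Return a new JSON string with the dotted field data_name set to value."""
    obj = json.loads(data) if data else {}
    parts = data_name.split(".")
    target = obj
    for part in parts[:-1]:
        child = target.get(part)
        if not isinstance(child, dict):
            # Assumption: missing or non-object parents are replaced by objects.
            child = {}
            target[part] = child
        target = child
    target[parts[-1]] = value
    return json.dumps(obj)


def delete_field(data: Optional[str], data_name: str) -> str:
    """Return a new JSON string with the dotted field data_name removed, if present."""
    obj = json.loads(data) if data else {}
    parts = data_name.split(".")
    target = obj
    for part in parts[:-1]:
        target = target.get(part)
        if not isinstance(target, dict):
            return json.dumps(obj)  # nothing to delete
    target.pop(parts[-1], None)
    return json.dumps(obj)


stored = set_field(None, "part1.part2", 5)
stored = set_field(stored, "name", "x")
after_delete = delete_field(stored, "part1.part2")
```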

abstractmethod async update_key(key: PartialKey) None[source]

If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.

Parameters:

key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.

abstractmethod async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]

Same as ‘update_data’ but updates several keys.

Ensure it is done as a single transaction.

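Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_array (List[KeyDataEntry]) – The field names and values to update.
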
class crossauth_backend.storage.OAuthAuthorizationStorage(options: OAuthAuthorizationStorageOptions = {})[source]

Bases: ABC

Base class for storing scopes that have been authorized by a user (or for client credentials, for a client).

This class is subclassed for various types of storage. For example, PrismaOAuthAuthorizationStorage is for storing in a database table, managed by the Prisma ORM.

abstractmethod async get_authorizations(client_id: str, userid: str | int | None = None) List[str | None][source]

Returns all scopes authorized for the given client and optionally user.

Parameters:
  • client_id (str) – the client_id to look up

  • userid (int|str|None) – the userid to look up, None for a client authorization not user authorization

Returns:

The authorized scopes as a list.

abstractmethod async update_authorizations(client_id: str, userid: str | int | None, authorizations: List[str | None]) None[source]

Saves a new set of authorizations for the given client and optionally user.

Deletes the old ones.

Parameters:
  • client_id (str) – the client_id to look up

  • userid (str|int|None) – the userid to look up, None for a client authorization not user authorization

  • authorizations (List[str|None]) – new set of authorized scopes, which may be empty

class crossauth_backend.storage.OAuthAuthorizationStorageOptions[source]

Bases: TypedDict

class crossauth_backend.storage.OAuthClientStorage(options: OAuthClientStorageOptions = {})[source]

Bases: ABC

Base class for storing OAuth clients.

This class is subclassed for various types of client storage. For example, PrismaOAuthClientStorage is for storing clients in a database table, managed by the Prisma ORM.

abstractmethod async create_client(client: OAuthClient) OAuthClient[source]

Creates and returns a new client with random ID and optionally secret.

Saves in the database.

Parameters:

client (crossauth_backend.OAuthClient) – the client to save.

Returns:

The new client.

abstractmethod async delete_client(client_id: str) None[source]

Deletes a client from storage.

Parameters:

client_id (str) – the client to delete

abstractmethod async get_client_by_id(client_id: str) OAuthClient[source]

Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.

Parameters:

client_id (str) – the client_id to look up

Returns:

The matching OAuthClient object.

abstractmethod async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.

Parameters:
  • name (str) – the client name to look up

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.

Returns:

A list of OAuthClient objects.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of ‘InvalidSessionId’ if a match was not found in session storage.

abstractmethod async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns all clients in alphabetical order of client name.

Parameters:
  • skip (int|None) – skip this number of records from the start in alphabetical order

  • take (int|None) – return at most this number of records

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.

Returns:

A list of OAuthClient objects.

abstractmethod async update_client(client: PartialOAuthClient) None[source]

If the given client exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.

Parameters:

client (crossauth_backend.PartialOAuthClient) – all fields to update (client_id must be set but will not be updated)

Raises:

crossauth_backend.CrossauthError – with ‘InvalidClient’ if the client doesn’t exist.

class crossauth_backend.storage.OAuthClientStorageOptions[source]

Bases: TypedDict

class crossauth_backend.storage.UserAndSecrets[source]

Bases: TypedDict

secrets: UserSecrets
user: User
class crossauth_backend.storage.UserStorage(options: UserStorageOptions = {})[source]

Bases: ABC

Base class for the place where user details are stored.

This class is subclassed for various types of user storage, e.g. PrismaUserStorage is for storing username and password in a database table, managed by the Prisma ORM.

Username and email searches should be case insensitive, as should their unique constraints. ID searches need not be case insensitive.

property admin_editable_fields
async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]

Creates a user with the given details and secrets.

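Parameters:
  • user (UserInputFields) – the user details to save

  • secrets (UserSecretsInputFields|None) – the secrets to save for the user, if any

Returns:

the new user as a User object

Raises:

crossauth_backend.CrossauthError – with ErrorCode Configuration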

abstractmethod async delete_user_by_id(id: str | int) None[source]

If the storage supports this, delete the user with the given ID from storage.

Parameters:

id (str|int) – id of user to delete

abstractmethod async delete_user_by_username(username: str) None[source]

If the storage supports this, delete the named user from storage.

Parameters:

username (str) – username to delete

abstractmethod async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given field, or throws an exception.

This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.

Parameters:
  • field (str) – the field to match

  • value (str) – the value to match

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given email address, or throws an exception.

If normalize_email is true, email should be matched normalized and lowercased (using normalize()). If the email field doesn’t exist, username is assumed to be the email column.

Parameters:
  • email – the email address to return the user of

  • options – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given user id, or throws an exception.

Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.

Parameters:
  • id (str|int) – the user id to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given username, or throws an exception.

If normalize_username is true, the username should be matched normalized and lowercased (using normalize())

Parameters:
  • username (str) – the username to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_users(skip: int | None = None, take: int | None = None) List[User][source]

Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)

Parameters:
  • skip (int|None) – skip this number of records from the start of the set

  • take (int|None) – only return at most this number of records

Returns:

an array of User objects
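
The skip and take parameters behave like offset and limit. A minimal sketch over an in-memory, already-sorted list (the page helper is hypothetical, and treating None as "no skip" and "no limit" is an assumption) might look like this:

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


def page(records: List[T], skip: Optional[int] = None, take: Optional[int] = None) -> List[T]:
    """Return at most take records after skipping the first skip records."""
    start = skip or 0
    end = None if take is None else start + take
    return records[start:end]


usernames = sorted(["dave", "alice", "carol", "bob"])
second_page = page(usernames, skip=2, take=2)
everyone = page(usernames)
```

The records must come back in a fixed order for paging to be stable, which is why the storage is required to define one.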

static normalize(string: str) str[source]

By default, usernames and emails are stored in lowercase, normalized format. This function returns that normalization.

Parameters:

string (str) – the string to normalize

Returns:

the normalized string, in lowercase with diacritics removed
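
One common way to lowercase a string and strip diacritics uses the standard unicodedata module, as sketched below. This is an illustration of the described result, not necessarily the method the library uses; the function here is a local stand-in.

```python
import unicodedata


def normalize(string: str) -> str:
    """Lowercase the string and remove diacritics via NFD decomposition."""
    decomposed = unicodedata.normalize("NFD", string)
    # Drop combining marks (accents) left over after decomposition.
    stripped = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return stripped.lower()


a = normalize("José")
b = normalize("ÉCOLE")
```

With this approach "José" and "jose" compare equal after normalization, which is the behaviour needed for case-insensitive, accent-insensitive username and email lookups.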

property normalize_email
property normalize_username
abstractmethod async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]

Updates an existing user with the given details and secrets.

If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.

Parameters:
  • user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.

  • secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update

property user_editable_fields
class crossauth_backend.storage.UserStorageGetOptions[source]

Bases: TypedDict

Passed to the get methods of UserStorage.

skip_active_check: bool

If true, a valid user will be returned even if state is not set to active

skip_email_verified_check: bool

If true, a valid user will be returned even if state is set to awaitingemailverification

class crossauth_backend.storage.UserStorageOptions[source]

Bases: TypedDict

Options passed to the UserStorage constructor

admin_editable_fields: List[str]

Fields that admins are allowed to edit (in addition to user_editable_fields)

normalize_email: bool

If true, email addresses (in the email column not in the username column) will be matched as lowercase and with diacritics removed. Default true.

normalize_username: bool

If true, usernames will be matched as lowercase and with diacritics removed. Default true.

Note: this doesn’t apply to the ID column

user_editable_fields: List[str]

Fields that users are allowed to edit. Any fields passed to a create or update call that are not in this list will be ignored.

crossauth_backend.utils module

class crossauth_backend.utils.MapGetter[source]

Bases: Generic[T]

classmethod get(mapping: Mapping[str, Any], field: str, default: T) T[source]
classmethod get_or_none(mapping: Mapping[str, Any], field: str) T | None[source]
classmethod get_or_null(mapping: Mapping[str, Any], field: str) T | NullType[source]
classmethod get_or_raise(mapping: Mapping[str, Any], field: str) T[source]
class crossauth_backend.utils.ParamType(*values)[source]

Bases: Enum

Type of parameter that can be parsed from an option value or environment variable

Boolean = 4
Integer = 3
Json = 5
JsonArray = 6
Number = 2
String = 1
crossauth_backend.utils.attr_name(instance: Any, param: str, public: bool = False, protected: bool = False)[source]
crossauth_backend.utils.get_option(param: str, options: Mapping[str, Any]) Any[source]
crossauth_backend.utils.has_option(param: str, options: Mapping[str, Any]) bool[source]
crossauth_backend.utils.set_from_env(instance: Any, param: str, param_type: ParamType, name_in_env_file: str, public: bool = False, protected: bool = False) None[source]
crossauth_backend.utils.set_from_option(instance: Any, param: str, options: Mapping[str, Any], public: bool = False, protected: bool = False) None[source]
crossauth_backend.utils.set_parameter(param: str, param_type: ParamType, instance: Any, options: Mapping[str, Any], env_name: str | None = None, required: bool = False, public: bool = False, protected: bool = False) None[source]

Sets an instance variable in the passed object from the passed options object and environment variable.

If the named parameter exists in the options object, then the instance variable is set to that value. Otherwise, if the named environment variable exists, it is set from that. Otherwise, the instance variable is not updated.

Parameters:
  • param (str) – The name of the parameter in the options variable and the name of the variable in the instance.

  • param_type (ParamType) – The type of variable. If the value is JsonArray or Json, both the option and the environment variable value should be a string, which will be parsed.

  • instance (Any) – Options present in the options or environment variables will be set on a corresponding instance variable in this class or object.

  • options (Dict[str, Any]) – Object containing options as key/value pairs.

  • env_name (str) – Name of environment variable.

  • required (bool) – If true, an exception will be thrown if the variable is not present in options or the environment variable.

  • public (bool) – If false, _ will be prepended to the field name in the target. Default False.

  • protected (bool) – If true, __ will be prepended to the field name in the target. Default False. Don’t use this and public together.

Raises:

crossauth_backend.CrossauthError: with ErrorCode Configuration if required is set but the option was not present, or if there was a parsing error.
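
The lookup order described above (options first, then the environment variable, otherwise leave the attribute alone) can be sketched as below. The function set_parameter_sketch, its as_json flag and the use of ValueError in place of CrossauthError are all assumptions made for the example.

```python
import json
import os
from typing import Any, Mapping, Optional


def set_parameter_sketch(param: str, instance: Any, options: Mapping[str, Any],
                         env_name: Optional[str] = None, as_json: bool = False,
                         required: bool = False) -> None:
    """Hypothetical stand-in showing only the precedence rules."""
    if param in options:
        value = options[param]            # 1. explicit option wins
    elif env_name is not None and env_name in os.environ:
        value = os.environ[env_name]      # 2. then the environment variable
    elif required:
        # The real function raises CrossauthError with ErrorCode Configuration.
        raise ValueError(f"{param} is required but was not provided")
    else:
        return                            # 3. otherwise leave the attribute untouched
    if as_json and isinstance(value, str):
        value = json.loads(value)         # Json and JsonArray values arrive as strings
    # Assumes public=False and protected=False, so a single underscore is prepended.
    setattr(instance, "_" + param, value)


class ExampleConfig:
    def __init__(self) -> None:
        self._smtp_port = 25


config = ExampleConfig()
set_parameter_sketch("smtp_port", config, {"smtp_port": 587}, env_name="SKETCH_SMTP_PORT")
```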

Module contents

class crossauth_backend.ApiKey[source]

Bases: Key

An API key is a string that can be used in place of a username and password. Unlike OAuth access tokens, API keys are not created automatically.

created: datetime
data: NotRequired[str]
expires: datetime | NullType
lastactive: NotRequired[datetime]
name: str

A name for the key, unique to the user

userid: NotRequired[str | int | NullType]
value: str
class crossauth_backend.AuthenticationOptions[source]

Bases: TypedDict

Options to pass to the constructor.

friendly_name: str

If passed, this is what will be displayed to the user when selecting an authentication method.

class crossauth_backend.AuthenticationParameters[source]

Bases: UserSecretsInputFields

Parameters needed for this class to authenticate a user (besides username). An example is password.

expiry: int
otp: str
password: str
totpsecret: str
class crossauth_backend.Authenticator(options: AuthenticationOptions = {})[source]

Bases: ABC

Base class for username/password authentication.

Subclass this if you want something other than PBKDF2 password hashing.

abstractmethod async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]
abstractmethod can_create_user() bool[source]
abstractmethod can_update_secrets() bool[source]
abstractmethod can_update_user() bool[source]
capabilities() AuthenticatorCapabilities[source]
abstractmethod async create_one_time_secrets(user: User) UserSecretsInputFields[source]
abstractmethod async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]
factor_name: str = ''
friendly_name: str
abstractmethod mfa_channel() str[source]
abstractmethod mfa_type() str[source]
abstractmethod async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]
abstractmethod async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]
require_user_entry() bool[source]

If your authenticator doesn’t need a user to be in the table (because it can create one), override this and return false. Default is true

abstractmethod secret_names() List[str][source]
abstractmethod skip_email_verification_on_signup() bool[source]
abstractmethod transient_secret_names() List[str][source]
abstractmethod validate_secrets(params: AuthenticationParameters) List[str][source]
class crossauth_backend.AuthenticatorCapabilities[source]

Bases: TypedDict

can_create_user: bool
can_update_secrets: bool
can_update_user: bool
class crossauth_backend.AuthorizeQueryType[source]

Bases: TypedDict

client_id: str
code_challenge: str
code_challenge_method: str
redirect_uri: str
response_type: Required[str]
scope: str
state: str
user: User
exception crossauth_backend.CrossauthError(code: ErrorCode, message: str | list[str] | None = None)[source]

Bases: Exception

Thrown by Crossauth functions whenever they encounter an error.

static as_crossauth_error(e: Any, default_message: str | None = None) CrossauthError[source]

If the passed object is a crossauth_backend.CrossauthError instance, simply returns it. If not and it is an object with errorCode in it, creates a CrossauthError from that and errorMessage, if present. Otherwise creates a crossauth_backend.CrossauthError object with ErrorCode of UnknownError from it, setting the message if possible.

Parameters:
  • e (Any) – the error to convert.

  • default_message (str|None) – message to use if there was none in the original exception.

Returns:

a crossauth_backend.CrossauthError instance.

property code_name

Return the name of the error code

static from_oauth_error(error: str, error_description: str | None = None) CrossauthError[source]

OAuth defines certain error types. To convert the error in an OAuth response into a CrossauthError object, call this function.

Parameters:
  • error (str) – as returned by an OAuth call (converted to an ErrorCode).

  • error_description (str|None) – as returned by an OAuth call (put in the message)

Returns:

a crossauth_backend.CrossauthError instance.

static http_status_name(status: str | int) str[source]

Returns the friendly name for an HTTP response code.

If it is not a recognized one, returns the friendly name for 500.

Parameters:

status (str|int) – the HTTP response code, which, while being numeric, can be in a string or number.

Returns:

the string version of the response code.

property oauthErrorCode: str

Return the OAuth name of an error code (eg “server_error”)

class crossauth_backend.CrossauthLogger(level: int | None = None)[source]

Bases: CrossauthLoggerInterface

A very simple logging class with no dependencies.

Logs to console.

The logging API is designed so that you can replace this with other common loggers, eg Pino. To change it, use the global CrossauthLogger.set_logger() function. This has a parameter to tell Crossauth whether your logger accepts JSON input or not.

When writing logs, we use the helper function j() to send JSON to the logger if it is supported, and stringified JSON otherwise.

Crossauth logs

All Crossauth log messages are JSON (or stringified JSON, depending on whether the logger supports JSON input - this one does). The following fields may be present depending on context (msg is always present):

  • msg : main contents of the log

  • err : an error object. If a subclass of Error, it will contain at least message and a stack trace in stack. If the error is of type crossauth_backend.CrossauthError it will also contain code and http_status.

  • hashedSessionCookie : for security reasons, session cookies are not included in logs. However, so that errors can be correlated with each other, a hash of it is included in errors originating from a session.

  • hashedCsrfCookie : for security reasons, CSRF cookies are not included in logs. However, so that errors can be correlated with each other, a hash of it is included in errors originating from a session.

  • user : username

  • emailMessageId : internal id of any email that is sent

  • email : email address

  • userid : sometimes provided in addition to username, or when username not available

  • hahedApiKey : a hash of an API key. The unhashed version is not logged for security, but a hash of it is logged for correlation purposes.

  • header : an HTTP header that relates to an error (eg Authorization), only if it is non-secret or invalid

  • accessTokenHash : hash of the JTI of an access token. For security reasons, the unhashed version is not logged.

  • method : request method (GET, PUT etc)

  • url : relevant URL

  • ip : relevant IP address

  • scope : OAuth scope

  • error_code : Crossauth error code

  • error_code_name : String version of Crossauth error code

  • http_status : HTTP status that will be returned

  • port : port service is running on (only for starting a service)

  • prefix : prefix for endpoints (only when starting a service)

  • authorized : whether or not a valid OAuth access token was provided

debug(output: Any) None[source]

Log a debug message

error(output: Any) None[source]

Log an error

info(output: Any) None[source]

Log an info message

levelName = ['NONE', 'ERROR', 'WARN', 'INFO', 'DEBUG']
static logger() CrossauthLoggerInterface[source]

Returns the static logger instance

rossauth_logger_accepts_json = True
set_level(level: int) None[source]

Set the level to report down to

static set_logger(logger: CrossauthLoggerInterface, accepts_json: bool) None[source]

Set the static logger instance

warn(output: Any) None[source]

Log a warning

class crossauth_backend.Crypto[source]

Bases: object

Provides cryptographic functions

Base32 = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
static base64_decode(encoded: str) str[source]

Decodes a string from base64 to UTF-8

Parameters:

encoded (str) – base64-encoded text

Returns:

UTF-8 text

static base64_encode(text: str) str[source]

Base64-encodes UTF-8 text

Parameters:

text (str) – UTF-8 text

Returns:

Base64 text

static base64_pad(s: str) str[source]
static base64_to_base64url(s: str) str[source]
static base64url_to_base64(s: str) str[source]
static base64url_to_str(s: str) str[source]
static decode_password_hash(hash: str) PasswordHash[source]

Splits a hashed password into its component parts. Return it as a PasswordHash.

The format of the hash should be `digest:int key_len:iterations:useSecret:salt:hashedPassword`. The hashed password part is the Base64 encoding of the PBKDF2 password.

Parameters:

hash (str) – the hashed password to decode. See above for format

Returns:

PasswordHash object containing the decoded hash components

static encode_password_hash(hashed_password: str, salt: str, use_secret: bool, iterations: int, key_len: int, digest: str) str[source]

Encodes a hashed password into the string format it is stored as.

See decode_password_hash() for the format it is stored in.

Parameters:
  • hashed_password (str) – the Base64-encoded PBKDF2 hash of the password

  • salt (str) – the salt used for the password.

  • use_secret (bool) – whether or not to use the application secret as part of the hash.

  • iterations (int) – the number of PBKDF2 iterations

  • key_len (int) – the key length PBKDF2 parameter - results in a hashed password this length, before Base64,

  • digest (str) – The digest algorithm, eg pbkdf2

Returns:

a string encoding the above parameters.

static hash(plaintext: str) str[source]

Standard hash using SHA256 (not PBKDF2 or HMAC)

Parameters:

plaintext (str) – text to hash

Returns:

the string containing the hash

async static password_hash(plaintext: str, options: HashOptions = {}) str[source]

Hashes a password and returns it as a base64 or base64url encoded string

Parameters:
  • plaintext (str) – password to hash

  • options (HashOptions) – hashing options, including:

      • salt: salt to use. A random one is made if not passed

      • secret: optional application secret to apply as a pepper

      • encode: if true, returns the full string as it should be stored in the database.

Returns:

the string containing the hash and the values to decode it

async static passwords_equal(plaintext: str, encoded_hash: str, secret: str | None = None) bool[source]

Returns true if the plaintext password, when hashed, equals the one in the hash, using its hasher settings

Parameters:
  • plaintext (str) – the plaintext password

  • encoded_hash (str) – the previously-hashed version

  • secret (str|None) – if useSecret in encoded_hash is true, used as a pepper for the hasher

Returns:

true if they are equal, false otherwise
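
To make the salt, pepper and comparison steps concrete, here is a minimal PBKDF2 sketch using only the standard library. The function names, the default iteration count, key length and digest, and the way the secret is appended to the salt are assumptions for illustration; the library's stored format and defaults may differ.

```python
import base64
import hashlib
import hmac
from typing import Optional


def pbkdf2_hash_sketch(plaintext: str, salt: str, secret: Optional[str] = None,
                       iterations: int = 100_000, key_len: int = 32,
                       digest: str = "sha256") -> str:
    """Derive a PBKDF2 key and return it Base64-URL encoded (illustrative)."""
    # Assumption: the optional application secret (pepper) is appended to the salt.
    salt_bytes = (salt + (secret or "")).encode("utf-8")
    derived = hashlib.pbkdf2_hmac(digest, plaintext.encode("utf-8"),
                                  salt_bytes, iterations, dklen=key_len)
    return base64.urlsafe_b64encode(derived).decode("ascii").rstrip("=")


def passwords_equal_sketch(plaintext: str, stored_hash: str, salt: str,
                           secret: Optional[str] = None) -> bool:
    """Re-hash the candidate with the same settings and compare in constant time."""
    candidate = pbkdf2_hash_sketch(plaintext, salt, secret)
    return hmac.compare_digest(candidate, stored_hash)


stored = pbkdf2_hash_sketch("correct horse", salt="c2FsdHNhbHQ", secret="pepper")
matches = passwords_equal_sketch("correct horse", stored, "c2FsdHNhbHQ", secret="pepper")
```

The comparison uses hmac.compare_digest rather than == so that the time taken does not depend on how many leading characters match.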

static random_base32(length: int, dash_every: int | None = None) str[source]

Creates a random base-32 string

Parameters:

length (int) – length of the string to create

Returns:

the random value as a string. Number of bytes will be greater as it is base64 encoded.
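
A random string over the Base32 alphabet listed above might be produced as in the sketch below. The grouping behaviour shown for dash_every (a dash between groups of that many characters) is an assumption, since the parameter is not documented here, and random_base32_sketch is a hypothetical name.

```python
import secrets
from typing import Optional

# Same value as Crypto.Base32 above: no I, O, 0 or 1, which are easily confused.
BASE32_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"


def random_base32_sketch(length: int, dash_every: Optional[int] = None) -> str:
    """Return `length` random characters, optionally grouped with dashes."""
    # secrets.choice draws from a cryptographically secure random source.
    text = "".join(secrets.choice(BASE32_ALPHABET) for _ in range(length))
    if dash_every:
        # Assumed behaviour: "ABCDEFGH" with dash_every=4 becomes "ABCD-EFGH".
        groups = [text[i:i + dash_every] for i in range(0, length, dash_every)]
        text = "-".join(groups)
    return text


code = random_base32_sketch(8, dash_every=4)
```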

static random_salt() str[source]

Creates a random salt

Returns:

random salt as a base64 encoded string

static random_value(length: int) str[source]

Creates a random string encoded as base64url

Parameters:

length (int) – length of the string to create

Returns:

the random value as a string. Number of bytes will be greater as it is base64 encoded.

static sha256(plaintext: str) str[source]

Standard hash using SHA256 (not PBKDF2 or HMAC)

Parameters:

plaintext (str) – text to hash

Returns:

the string containing the hash

static sign(payload: Dict[str, Any], secret: str, salt: str | None = None, timestamp: int | None = None) str[source]

Signs a JSON payload by creating a hash, using a secret and optionally also a salt and timestamp

Parameters:
  • payload (Dict[str, Any]) – object to sign (will be stringified as a JSON)

  • secret (str) – secret key, which must be a string

  • salt (str|None) – optionally, a salt to concatenate with the payload (must be a string)

  • timestamp (int|None) – optionally, a timestamp to include in the signed data as a Unix date

Returns:

Base64-url encoded hash

static sign_secure_token(payload: str, secret: str) str[source]

This can be called for a string payload that is a cryptographically secure random string. No salt is added and the token is assumed to be Base64Url already

Parameters:
  • payload (str) – string to sign

  • secret (str) – the secret to sign with

Returns:

Base64-url encoded hash

static signable_token(payload: Dict[str, Any], salt: str | None = None, timestamp: int | None = None) str[source]

The hash is of a JSON containing the payload, timestamp and optionally a salt.

Parameters:
  • payload (Dict[str, Any]) – the payload to hash

  • salt (str|None) – optional salt (use if the payload is small)

  • timestamp (int|None) – time the token will expire

Returns:

a Base64-URL-encoded string that can be hashed.

static str_to_base64url(s: str) str[source]
static symmetric_decrypt(ciphertext: str, key_string: str) str[source]

Symmetric decryption using a key that must be a string

Parameters:
  • ciphertext (str) – Base64-url encoded ciphertext

  • key_string (str) – the symmetric key

Returns:

Decrypted text

static symmetric_encrypt(plaintext: str, key_string: str, iv: bytes | None = None) str[source]

Symmetric encryption using a key that must be a string

Parameters:
  • plaintext (str) – Text to encrypt

  • key_string (str) – the symmetric key

  • iv (bytes|None) – the IV value. If None, a random one is created

Returns:

Encrypted text Base64-url encoded.

static unsign(signed_message: str, secret: str, expiry: int | None = None) Dict[str, Any][source]

Validates a signature and, if valid, returns the unstringified payload

Parameters:
  • signed_message (str) – signed message (base64-url encoded)

  • secret (str) – secret key, which must be a string

  • expiry (int|None) – if set, validation will fail if the timestamp in the payload is after this date

Returns:

if signature is valid, the payload as an object

Raises:

crossauth_backend.CrossauthError: with ErrorCode of InvalidKey if signature is invalid or has expired.
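
The sign and unsign round trip can be illustrated with a small HMAC-based sketch. The use of HMAC-SHA256, the payload.signature layout, the field name t for the timestamp, and the function names are assumptions for the example; salt and expiry handling are left out, and ValueError stands in for CrossauthError.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped when encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _mac(encoded: str, secret: str) -> str:
    digest = hmac.new(secret.encode("utf-8"), encoded.encode("utf-8"), hashlib.sha256)
    return _b64url(digest.digest())


def sign_sketch(payload: Dict[str, Any], secret: str,
                timestamp: Optional[int] = None) -> str:
    """Serialise the payload with a timestamp and append a keyed hash."""
    body = dict(payload, t=int(time.time()) if timestamp is None else timestamp)
    encoded = _b64url(json.dumps(body, sort_keys=True).encode("utf-8"))
    return encoded + "." + _mac(encoded, secret)


def unsign_sketch(signed_message: str, secret: str) -> Dict[str, Any]:
    """Check the signature first, and only then decode the payload."""
    encoded, _, signature = signed_message.partition(".")
    if not hmac.compare_digest(signature.encode("utf-8"),
                               _mac(encoded, secret).encode("utf-8")):
        raise ValueError("invalid signature")
    return json.loads(_b64url_decode(encoded))


token = sign_sketch({"user": "bob"}, "app-secret", timestamp=1700000000)
payload = unsign_sketch(token, "app-secret")
```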

static unsign_secure_token(signed_message: str, secret: str, expiry: int | None = None) str[source]

Validates a signature signed with sign_secure_token and, if valid, returns the unstringified payload

Parameters:
  • signed_message (str) – signed message (base64-url encoded)

  • secret (str) – secret key, which must be a string

Returns:

if signature is valid, the payload as a string

Raises:

crossauth_backend.CrossauthError: with ErrorCode of InvalidKey if signature is invalid or has expired.

static uuid() str[source]

Creates a UUID

static xor(value: str, mask: str) str[source]

XORs two base64url-encoded strings

Parameters:
  • value (str) – value to XOR

  • mask (str) – mask to XOR it with

Returns:

the XOR'd string

class crossauth_backend.DoubleSubmitCsrfToken(options: DoubleSubmitCsrfTokenOptions = {})[source]

Bases: object

Class for creating and validating CSRF tokens according to the double-submit cookie pattern.

The CSRF token is sent as a cookie plus either a header or a hidden form field.

property cookie_name
create_csrf_token() str[source]

Creates a session key and saves in storage

Date created is the current date/time on the server.

Returns:

a random CSRF token.

property domain
property header_name
property httpOnly

Returns a Cookie object with the given session key.

Parameters:

token (str) – the value of the csrf token, with signature

Returns:

a Cookie object.

Takes a session ID and creates a string representation of the cookie (value of the HTTP Cookie header).

Parameters:

cookie_value (str) – the value to put in the cookie

Returns:

a string representation of the cookie and options.

make_csrf_form_or_header_token(token: str) str[source]
mask_csrf_token(token: str) str[source]
property path
property sameSite
property secure
unmask_csrf_token(mask_and_token: str) str[source]

Validates the passed CSRF cookie (doesn’t check it matches the token, just that the cookie is valid).

To be valid:
  • The signature in the cookie must match the token in the cookie

  • The token in the cookie must match the value in the form or header after unmasking

Parameters:

cookie_value (str) – the CSRF cookie value to validate.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of InvalidKey

validate_double_submit_csrf_token(cookie_value: str, form_or_header_name: str) None[source]

Validates the passed CSRF token.

To be valid:
  • The signature in the cookie must match the token in the cookie

  • The token in the cookie must match the value in the form or header after unmasking

Parameters:
  • cookie_value (str) – the CSRF cookie value to validate.

  • form_or_header_name (str) – the value from the csrf_token form field or the X-CROSSAUTH-CSRF header.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of InvalidKey

class crossauth_backend.DoubleSubmitCsrfTokenOptions[source]

Bases: CookieOptions

Options for double-submit csrf tokens

cookie_name: NotRequired[str]
domain: str
expires: datetime
header_name: NotRequired[str]
httpOnly: bool
maxAge: int
path: str
sameSite: bool | Literal['lax', 'strict', 'none']
secret: NotRequired[str]
secure: bool
class crossauth_backend.DummyFactor2Authenticator(code: str, options: DummyFactor2AuthenticatorOptions = {})[source]

Bases: PasswordAuthenticator

This authenticator creates a fixed one-time code

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user by comparing the user-provided otp with the one in secrets.

Validation fails if the otp is incorrect or has expired.

Parameters:
  • user – ignored

  • secrets – taken from the session and should contain otp and expiry

  • params – user input and should contain otp

Raises:

CrossauthError – with ErrorCode InvalidToken or Expired.
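
The check described above (compare the supplied code with the stored one, then check expiry) might look like the sketch below. The exception classes stand in for CrossauthError with the corresponding ErrorCode, and treating expiry as Unix time in seconds, as well as the order of the two checks, are assumptions.

```python
import hmac
import time
from typing import Any, Mapping, Optional


class InvalidTokenSketch(Exception):
    """Stand-in for CrossauthError with ErrorCode InvalidToken."""


class ExpiredSketch(Exception):
    """Stand-in for CrossauthError with ErrorCode Expired."""


def check_otp_sketch(secrets: Mapping[str, Any], params: Mapping[str, Any],
                     now: Optional[float] = None) -> None:
    """Raise if the user-supplied otp is wrong or the stored one has expired."""
    stored = str(secrets.get("otp", ""))
    supplied = str(params.get("otp", ""))
    # An absent stored code never matches; the comparison is constant-time.
    if not stored or not hmac.compare_digest(stored.encode("utf-8"),
                                             supplied.encode("utf-8")):
        raise InvalidTokenSketch("one-time code does not match")
    current = time.time() if now is None else now
    if current > secrets["expiry"]:
        raise ExpiredSketch("one-time code has expired")


# Session secrets created earlier, and the value typed in by the user.
check_otp_sketch({"otp": "123456", "expiry": 2000}, {"otp": "123456"}, now=1000)
```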

can_create_user() bool[source]

returns false

can_update_secrets() bool[source]

returns false

can_update_user() bool[source]

returns false

property code
async create_one_time_secrets(user: User) UserSecretsInputFields[source]

Creates and emails a new one-time code.

Parameters:

user – ignored

Returns:

Dictionary with ‘otp’ and ‘expiry’ as a Unix time (number).

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]

Does nothing for this class

mfa_channel() str[source]

Returns email

mfa_type() str[source]

Returns oob

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]
async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]

Reprepare configuration for 2FA authentication

Parameters:
  • username – Username (unused parameter indicated by underscore prefix)

  • session_key – Key object containing session data

Returns:

Dictionary with user_data, secrets, and new_session_data, or None

secret_names() List[str][source]

Returns empty list

skip_email_verification_on_signup() bool[source]

Returns false

transient_secret_names() List[str][source]

Returns otp

validate_secrets(params: AuthenticationParameters) List[str][source]

Nothing to do for this class. Returns empty set

class crossauth_backend.DummyFactor2AuthenticatorOptions[source]

Bases: AuthenticationOptions

Optional parameters for DummyFactor2Authenticator.

See DummyFactor2Authenticator.__init__ for details

friendly_name: str
class crossauth_backend.DummySmsAuthenticator(options: SmsAuthenticatorOptions = {})[source]

Bases: SmsAuthenticator

This authenticator mocks sending an SMS OTP

class crossauth_backend.EmailAuthenticator(options: EmailAuthenticatorOptions = {})[source]

Bases: Authenticator

This authenticator sends a one-time code by email

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user by comparing the user-provided otp with the one in secrets.

Validation fails if the otp is incorrect or has expired.

Parameters:
  • user – ignored

  • secrets – taken from the session and should contain otp and expiry

  • params – user input and should contain otp

Raises:

CrossauthError – with ErrorCode InvalidToken or Expired.

can_create_user() bool[source]

Returns true: this class can create users

can_update_secrets() bool[source]

Returns false: users cannot update secrets

can_update_user() bool[source]

Returns true: this class can update users

async create_one_time_secrets(user: User) UserSecretsInputFields[source]

Creates and emails a new one-time code.

Parameters:

user – the user to create it for. Uses the email field if present, username otherwise (which in this case is expected to contain an email address)

Returns:

otp and expiry as a Unix time (number).

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]

Does nothing for this class

static is_email_valid(email: str) bool[source]

Returns whether or not the passed email has a valid form.

Parameters:

email – the email address to validate

Returns:

true if it is valid, false otherwise

mfa_channel() Literal['none', 'email', 'sms'][source]

Used by the OAuth password_mfa grant type.

mfa_type() Literal['none', 'oob', 'otp'][source]

Used by the OAuth password_mfa grant type.

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]

Creates and emails the one-time code

Parameters:

user – the user to create it for. Uses the email field if present, username otherwise (which in this case is expected to contain an email address)

Returns:

userData containing username, email, factor2; sessionData containing the same plus otp and expiry, which is a Unix time (number).

async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]

Creates and emails a new one-time code.

Parameters:
  • username – ignored

  • session_key – the session containing the previously created data.

Returns:

dict

secret_names() List[str][source]

Returns an empty list: this authenticator has no persistent secrets

skip_email_verification_on_signup() bool[source]

Returns true: as a code is sent to the registered email address, no additional email verification is needed

transient_secret_names() List[str][source]

Returns otp

static validate_email(email: str | None) None[source]

Throws an exception if an email address doesn’t have a valid form.

Parameters:

email – the email address to validate

Raises:

CrossauthError – with ErrorCode InvalidEmail.

validate_secrets(params: AuthenticationParameters) List[str][source]

Does nothing for this class

static zero_pad(num: int, places: int) str[source]

Takes a number and turns it into a zero-padded string

Parameters:
  • num – number to pad

  • places – total number of required digits

Returns:

zero-padded string
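
For illustration, zero-padding of this kind can be done with str.zfill. The name zero_pad_sketch is hypothetical and this is not necessarily how the library implements it.

```python
def zero_pad_sketch(num: int, places: int) -> str:
    """Left-pad the decimal form of num with zeros up to `places` digits."""
    # zfill leaves the string unchanged if it is already long enough.
    return str(num).zfill(places)


padded = zero_pad_sketch(42, 6)
```

Padding like this keeps numeric one-time codes at a fixed width even when the random number has leading zeros.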

class crossauth_backend.EmailAuthenticatorOptions[source]

Bases: AuthenticationOptions

Optional parameters for EmailAuthenticator.

See EmailAuthenticator.__init__ for details

email_authenticator_html_body: str

Template file containing page for producing the HTML version of the email verification email body

email_authenticator_subject: str

Subject for the email verification email

email_authenticator_text_bod: str

Template file containing page for producing the text version of the email verification email body

email_authenticator_token_expires: int

Number of seconds before otps should expire. Default 5 minutes

email_from: str

Sender for emails

friendly_name: str
render: Callable[[str, Dict[str, Any]], str]

if passed, use this instead of the default nunjucks renderer

smtp_host: str

Hostname of the SMTP server. No default - required parameter

smtp_password: str

Password for connecting to SMTP server. Default undefined

smtp_port: int

Port the SMTP server is running on. Default 25

smtp_use_tls: bool

Whether or not TLS is used by the SMTP server. Default false

smtp_username: str

Username for connecting to SMTP server. Default undefined

views: str

The directory containing views (by default, Jinja2 templates)

class crossauth_backend.ErrorCode(*values)[source]

Bases: Enum

Indicates the type of error reported by crossauth_backend.CrossauthError

AuthorizationPending = 44

Thrown in the OAuth device code flow

BadRequest = 43

Thrown when an invalid request is made, eg configure 2FA when 2FA is switched off for user

ClientExists = 6

This is returned if attempting to make a client which already exists (client_id or name/userid)

Configuration = 31

Thrown when something is missing or inconsistent in configuration

Connection = 24

Thrown when there is a connection error, eg to a database

ConstraintViolation = 47

Thrown in database handlers where an insert causes a constraint violation

DataFormat = 39

Thrown when the data field of key storage is not valid JSON

EmailNotExist = 3

Thrown when a password reset is requested and the email does not exist

EmailNotVerified = 12

Thrown on login attempt with a user account marked not having had the email address validated

Expired = 23

Thrown when a session or API key has expired

ExpiredToken = 46

Thrown in the OAuth device code flow

Factor2ResetNeeded = 30

Thrown if the user needs to reset factor2 before logging in

FetchError = 40

Thrown if a fetch failed

Forbidden = 19

Returned with an HTTP 403 response

FormEntry = 42

Thrown by user-supplied validation functions if a user details form was incorrectly filled out

InsufficientPriviledges = 18

Returned if user is valid but doesn’t have permission to access resource

InsufficientScope = 17

Thrown for the OAuth insufficient_scope error

InvalidClientId = 5

This is returned if an OAuth2 client id is invalid

InvalidClientIdOrSecret = 8

Server endpoints in this package will return this instead of InvalidClientId or InvalidClientSecret for security purposes

InvalidClientSecret = 7

This is returned if an OAuth2 client secret is invalid

InvalidCsrf = 21

Thrown if the CSRF token is invalid

InvalidEmail = 32

Thrown if an email address is invalid

InvalidHash = 25

Thrown when a hash, eg password, is not in the given format

InvalidKey = 20

Thrown when a session or API key was provided that is not in the key table. For CSRF and session keys, an InvalidCsrf or InvalidSession will be thrown instead

InvalidOAuthFlow = 10

This is returned if a request is made with an OAuth flow name that is not recognized

InvalidPhoneNumber = 33

Thrown if a phone number is invalid

InvalidRedirectUri = 9

This is returned if a request is made with a redirect URI that is not registered

InvalidScope = 16

Thrown for the OAuth invalid_scope error

InvalidSession = 22

Thrown if the session cookie is invalid

InvalidToken = 36

Thrown when a token (eg TOTP or OTP) is invalid

InvalidUsername = 34

Thrown if a username is invalid

KeyExists = 27

Thrown if you try to create a key which already exists in key storage

MfaRequired = 37

Thrown during OAuth password flow if an MFA step is needed

NotImplemented = 48

Thrown if a method is unimplemented, typically when a feature is not yet supported in this language.

PasswordChangeNeeded = 28

Thrown if the user needs to reset his or her password

PasswordFormat = 38

Thrown when a password does not match rules (length, uppercase/lowercase/digits)

PasswordInvalid = 2

Thrown when a password does not match, eg during login or signup

PasswordMatch = 35

Thrown when two passwords do not match each other (eg signup)

PasswordResetNeeded = 29

Thrown if the user needs to reset his or her password

SlowDown = 45

Thrown in the OAuth device code flow

TwoFactorIncomplete = 13

Thrown on login attempt with a user account marked not having completed 2FA setup

Unauthorized = 14

Thrown when a resource expecting user authorization was accessed and the authorization was missing or wrong

UnauthorizedClient = 15

Thrown for the OAuth unauthorized_client error (when the client is unauthorized as opposed to the user)

UnknownError = 50

Thrown for a condition not covered above.

UnsupportedAlgorithm = 26

Thrown when an algorithm is requested but not supported, eg hashing algorithm

UserExists = 41

Thrown when attempting to create a user that already exists

UserNotActive = 11

Thrown on login attempt with a user account marked inactive

UserNotExist = 1

Thrown when a given username does not exist, eg during login

UsernameOrPasswordInvalid = 4

For endpoints provided by servers in this package, this is returned instead of UserNotExist or PasswordInvalid, for security reasons

ValueError = 49

Thrown if a dict field is unexpectedly missing or of the wrong type
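The combined codes above (UsernameOrPasswordInvalid, InvalidClientIdOrSecret) exist so that a response does not reveal which half of a credential was wrong. The sketch below illustrates that idea with a local copy of a few members, using the numeric values listed on this page. The helper mask_login_error is hypothetical and is not part of crossauth_backend.

```python
from enum import IntEnum


class ErrorCode(IntEnum):
    """Local copy of a few members documented above, for illustration only."""
    UserNotExist = 1
    PasswordInvalid = 2
    UsernameOrPasswordInvalid = 4
    InvalidClientId = 5
    InvalidClientSecret = 7
    InvalidClientIdOrSecret = 8


# Specific codes that would tell a caller which part of a credential was wrong
_MASKED = {
    ErrorCode.UserNotExist: ErrorCode.UsernameOrPasswordInvalid,
    ErrorCode.PasswordInvalid: ErrorCode.UsernameOrPasswordInvalid,
    ErrorCode.InvalidClientId: ErrorCode.InvalidClientIdOrSecret,
    ErrorCode.InvalidClientSecret: ErrorCode.InvalidClientIdOrSecret,
}


def mask_login_error(code: ErrorCode) -> ErrorCode:
    """Hypothetical helper: return the generic code to expose to the client."""
    return _MASKED.get(code, code)
```

Codes that are not in the mapping pass through unchanged, so the helper can be applied to any error before it is serialised.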

class crossauth_backend.HashOptions[source]

Bases: TypedDict

Option parameters for Crypto.passwordHash

digest: str

PBKDF2 digest method

encode: bool

Whether to Base64-URL-encode the result

iterations: int

Number of PBKDF2 iterations

key_len: int

Length (before Base64-encoding) of the PBKDF2 key being generated

salt: str

A salt to prepend to the message before hashing

secret: str

A secret to append to the salt when hashing, or undefined for no secret
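To make the roles of these options concrete, here is a minimal sketch of a PBKDF2 hash built with the standard library. It is not the Crypto.passwordHash implementation. The function name pbkdf2_hash, the unpadded Base64-URL output, and the hex output when encode is false are all assumptions of this example. The defaults are the ones documented for LocalPasswordAuthenticatorOptions.

```python
import base64
import hashlib


def pbkdf2_hash(password: str, salt: str, secret: str = "",
                digest: str = "sha256", iterations: int = 600_000,
                key_len: int = 32, encode: bool = True) -> str:
    """Derive a PBKDF2 key; the secret, if any, is appended to the salt."""
    salt_bytes = (salt + secret).encode("utf-8")
    key = hashlib.pbkdf2_hmac(digest, password.encode("utf-8"),
                              salt_bytes, iterations, key_len)
    if encode:
        # Unpadded Base64-URL is an assumption of this sketch
        return base64.urlsafe_b64encode(key).rstrip(b"=").decode("ascii")
    # Hex output for the unencoded case is also an assumption
    return key.hex()
```

The same password, salt and secret always give the same output, which is what allows a stored hash to be checked later.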

class crossauth_backend.InMemoryKeyStorage[source]

Bases: KeyStorage

Implementation of KeyStorage where keys are stored in memory. Intended for testing.

async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]

Deletes all keys from storage for the given user ID

Parameters:
  • userid (int|str|None) – User ID to delete keys for

  • prefix (str) – Only keys starting with this prefix will be deleted

  • except_key (str|None) – If defined, the key with this value will not be deleted
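The three parameters combine as a filter: a key is removed only if it belongs to the user, starts with the prefix, and is not the excepted key. A minimal synchronous sketch follows, assuming keys are held in a dict keyed by their stored value. That layout is an assumption and may not match the actual in-memory storage, and the real method is async.

```python
from typing import Dict, Optional, Union

UserId = Union[str, int, None]


def delete_all_for_user(keys: Dict[str, dict], userid: UserId,
                        prefix: str, except_key: Optional[str] = None) -> None:
    """Remove every entry for `userid` whose value starts with `prefix`."""
    # Collect first, then delete, to avoid mutating the dict while iterating
    doomed = [
        value for value, record in keys.items()
        if record.get("userid") == userid
        and value.startswith(prefix)
        and value != except_key
    ]
    for value in doomed:
        del keys[value]
```

A typical use would be logging a user out of every session except the current one, by passing the session prefix and the current session key as except_key.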

async delete_data(key_name: str, data_name: str) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.

async delete_key(value: str) None[source]

Deletes a key from storage (e.g., the database).

Parameters:

value (str) – The key to delete

async delete_matching(key: PartialKey) None[source]

Deletes all keys matching the given specs

Parameters:

key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted

async get_all_for_user(userid: str | int | None = None) List[Key][source]

Return all keys matching the given user ID

Parameters:

userid (str|int|None) – User to return keys for

Returns:

List[Key]: An array of keys

async get_key(key: str) Key[source]

Returns the matching key in the session storage or raises an exception if it doesn’t exist.

Parameters:

key (str) – The key to look up, as it will appear in this storage (typically unsigned, hashed)

Returns:

The matching Key record.

print()[source]

async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]

Saves a session key in the session storage (e.g., database).

Parameters:
  • userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.

  • value (str) – The key value to store.

  • date_created (datetime) – The date/time the key was created.

  • expires (datetime|None) – The date/time the key expires.

  • extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record

  • data (str|None) – An optional value, specific to the type of key, e.g., new email for email change tokens

async update_data(key_name: str, data_name: str, value: Any | None) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.

  • value (Any|None) – The new value.
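The dotted data_name can be read as a path into the decoded JSON object. The sketch below shows one way such an update might work on the JSON string itself. The function name update_data_field is hypothetical, creating missing intermediate objects is an assumption of this example, and the atomicity the real method requires is not addressed here.

```python
import json
from typing import Any, Optional


def update_data_field(data: Optional[str], data_name: str, value: Any) -> str:
    """Return a new JSON string with the dotted field `data_name` set to `value`."""
    root = json.loads(data) if data else {}
    node = root
    parts = data_name.split(".")
    for part in parts[:-1]:
        # Create intermediate objects as needed (an assumption of this sketch)
        child = node.get(part)
        if not isinstance(child, dict):
            child = {}
            node[part] = child
        node = child
    node[parts[-1]] = value
    return json.dumps(root)
```

Sibling fields under the same parent are left untouched, so 'part1.part2' only changes 'part2'.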

async update_key(key: PartialKey) None[source]

If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.

Parameters:

key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.

async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]

Same as ‘update_data’ but updates several fields.

Ensure it is done as a single transaction.

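Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_array (List[KeyDataEntry]) – The fields to update, each given as a data_name and value pair.

class crossauth_backend.InMemoryOAuthAuthorizationStorage(options: OAuthAuthorizationStorageOptions = {})[source]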

Bases: OAuthAuthorizationStorage

Implementation of OAuthAuthorizationStorage where authorizations are stored in memory. Intended for testing.

async get_authorizations(client_id: str, userid: str | int | None = None) List[str | None][source]

Returns all scopes authorized for the given client and optionally user.

Parameters:
  • client_id (str) – the client_id to look up

  • userid (int|str|None) – the userid to look up, None for a client authorization not user authorization

Returns:

The authorized scopes as a list.

async update_authorizations(client_id: str, userid: str | int | None, authorizations: List[str | None]) None[source]

Saves a new set of authorizations for the given client and optionally user.

Deletes the old ones.

Parameters:
  • client_id (str) – the client_id to look up

  • userid (str|int|None) – the userid to look up, None for a client authorization not user authorization

  • authorizations (List[str|None]) – new set of authorized scopes, which may be empty
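The replace-on-update behaviour can be illustrated with a small in-memory store keyed by the pair (client_id, userid). This is a sketch under that assumption, not the class documented here. AuthorizationStore is a hypothetical name and the methods are synchronous for brevity.

```python
from typing import Dict, List, Optional, Tuple, Union

UserId = Union[str, int, None]


class AuthorizationStore:
    """Hypothetical in-memory store keyed by (client_id, userid)."""

    def __init__(self) -> None:
        self._scopes: Dict[Tuple[str, UserId], List[Optional[str]]] = {}

    def get_authorizations(self, client_id: str,
                           userid: UserId = None) -> List[Optional[str]]:
        # Return a copy so callers cannot mutate stored state
        return list(self._scopes.get((client_id, userid), []))

    def update_authorizations(self, client_id: str, userid: UserId,
                              authorizations: List[Optional[str]]) -> None:
        # Assigning the new list discards whatever was stored before
        self._scopes[(client_id, userid)] = list(authorizations)
```

Using None as the userid part of the key keeps client-only authorizations separate from those granted by a user.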

class crossauth_backend.InMemoryOAuthClientStorage(options: OAuthClientStorageOptions = {})[source]

Bases: OAuthClientStorage

Implementation of OAuthClientStorage where clients are stored in memory. Intended for testing.

async create_client(client: OAuthClient) OAuthClient[source]

Creates and returns a new client with random ID and optionally secret.

Saves in the database.

Parameters:

client (crossauth_backend.OAuthClient) – the client to save.

Returns:

The new client.

async delete_client(client_id: str) None[source]

Deletes a client from storage.

Parameters:

client_id (str) – the client to delete

async get_client_by_id(client_id: str) OAuthClient[source]

Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.

Parameters:

client_id (str) – the client_id to look up

Returns:

The matching OAuthClient object.

async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.

Parameters:
  • name (str) – the client name to look up

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.

Returns:

A list of OAuthClient objects.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of ‘InvalidSessionId’ if a match was not found in session storage.

async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns all clients in alphabetical order of client name.

Parameters:
  • skip (int|None) – skip this number of records from the start in alphabetical order

  • take (int|None) – return at most this number of records

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.

Returns:

A list of OAuthClient objects.
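The skip and take parameters describe ordinary offset pagination over the name-sorted list. A minimal sketch follows, assuming clients are plain dicts with a client_name field. The function page_clients is hypothetical, and filtering by userid is left out.

```python
from typing import List, Optional


def page_clients(clients: List[dict], skip: Optional[int] = None,
                 take: Optional[int] = None) -> List[dict]:
    """Sort by client_name, then apply skip/take."""
    ordered = sorted(clients, key=lambda c: c["client_name"])
    start = skip or 0
    # A take of None means "no upper limit"
    end = None if take is None else start + take
    return ordered[start:end]
```

For example, skip=20 and take=10 would return the third page when pages hold ten clients each.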

async update_client(client: PartialOAuthClient) None[source]

If the given client exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.

Parameters:

client (crossauth_backend.OAuthClient) – all fields to update (client_id must be set but will not be updated)

Raises:

crossauth_backend.CrossauthError – with ‘InvalidClient’ if the client doesn’t exist.

class crossauth_backend.InMemoryUserStorage(options: InMemoryUserStorageOptions = {})[source]

Bases: UserStorage

Implementation of UserStorage where users are stored in memory. Intended for testing.

async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]

Creates a user with the given details and secrets.

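Parameters:
  • user (UserInputFields) – the details of the user to create

  • secrets (UserSecretsInputFields|None) – the secrets to store for the user, if any

Returns:

the new user as a User object

Raises:

crossauth_backend.CrossauthError – with ErrorCode Configuration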

async delete_user_by_id(id: str | int) None[source]

If the storage supports this, delete the user with the given ID from storage.

Parameters:

id (str|int) – id of user to delete

async delete_user_by_username(username: str) None[source]

If the storage supports this, delete the named user from storage.

Parameters:

username (str) – username to delete

async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given field, or throws an exception.

This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.

Parameters:
  • field (str) – the field to match

  • value (str) – the value to match

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given email address, or throws an exception.

If normalize_email is true, email should be matched normalized and lowercased (using normalize()). If the email field doesn’t exist, username is assumed to be the email column.

Parameters:
  • email – the email address to return the user of

  • options – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given user id, or throws an exception.

Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.

Parameters:
  • id (str|int) – the user id to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given username, or throws an exception.

If normalize_username is true, the username should be matched normalized and lowercased (using normalize())

Parameters:
  • username (str) – the username to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_users(skip: int | None = None, take: int | None = None) List[User][source]

Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)

Parameters:
  • skip (int|None) – skip this number of records from the start of the set

  • take (int|None) – only return at most this number of records

Returns:

an array of User objects

async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]

Updates an existing user with the given details and secrets.

If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.

Parameters:
  • user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.

  • secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update

class crossauth_backend.JWT(token: str | None = None, payload: Dict[str, Any] = {})[source]

Bases: object

Encapsulates the payload of a JWT, with both the token and decoded JSON payload.

class crossauth_backend.Jwks[source]

Bases: TypedDict

class crossauth_backend.Key[source]

Bases: TypedDict

A key (eg session ID, email reset token) as stored in a database table.

The fields defined here are the ones used by Crossauth. You may add others.

created: datetime

The date/time the key was created, in local time on the server

data: NotRequired[str]

Additional key-specific data (eg new email address for email change).

While application specific, any data Crossauth puts in this field is stringified JSON, with its own key so it can co-exist with other data.

expires: datetime | NullType

The date/time the key expires

lastactive: NotRequired[datetime]

The date/time the key was last used (eg last time a request was made with this value as a session ID)

userid: NotRequired[str | int | NullType]

The user this key is for (or undefined for an anonymous session ID)

It accepts the value null as usually this is the value stored in the database, rather than undefined. Some functions need to differentiate between a null value as opposed to the value not being defined (eg for a partial update).

value: str

The value of the key. In a cookie, the value part of cookiename=value; options…

class crossauth_backend.KeyDataEntry[source]

Bases: TypedDict

data_name: str
value: NotRequired[Any]

class crossauth_backend.KeyPrefix[source]

Bases: object

These are used as prefixes for keys in a key storage entry

access_token = 'access:'
api_key = 'api:'
authorization_code = 'authz:'
device_code = 'dc:'
email_verification_token = 'e:'
mfa_token = 'omfa:'
password_reset_token = 'p:'
refresh_token = 'refresh:'
session = 's:'
user_code = 'uc:'
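
As described for ApiKeyManager, a key is stored as a hash of its unsigned part, preceded by one of these prefixes. The sketch below shows how such a storage name might be formed. The choice of SHA-256 and unpadded Base64-URL is an assumption of this example, storage_name is a hypothetical helper, and the signature must still be validated separately before any lookup.

```python
import base64
import hashlib

API_KEY_PREFIX = "api:"  # value of KeyPrefix.api_key listed above


def storage_name(token: str, prefix: str = API_KEY_PREFIX) -> str:
    """Build the name under which a key might be stored."""
    # Drop any ".signature" suffix; only the unsigned part is hashed
    unsigned = token.split(".", 1)[0]
    digest = hashlib.sha256(unsigned.encode("utf-8")).digest()
    hashed = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return prefix + hashed
```

Because the prefix is part of the stored name, the same random value used as a session key and as an API key would map to different entries.
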
class crossauth_backend.KeyStorage[source]

Bases: ABC

Base class for storing session and API keys.

This class is subclassed for various types of session key storage. For example, PrismaKeyStorage is for storing sessions in a database table, managed by the Prisma ORM.

static decode_data(data: str | None) Dict[str, Any][source]

Returns an object decoded from the data field as a JSON string

Parameters:

data (str|None) – The JSON string to decode

Returns:

The parsed JSON object

Raises:

json.JSONDecodeError – If data is not a valid JSON string

abstractmethod async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]

Deletes all keys from storage for the given user ID

Parameters:
  • userid (int|str|None) – User ID to delete keys for

  • prefix (str) – Only keys starting with this prefix will be deleted

  • except_key (str|None) – If defined, the key with this value will not be deleted

abstractmethod async delete_data(key_name: str, data_name: str) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.
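Deleting by dotted name means walking to the parent object and removing the last path component. A minimal sketch on the JSON string follows, with a hypothetical function name delete_data_field. Treating a missing parent as a no-op is an assumption of this example, and a real implementation would also need to make the operation atomic.

```python
import json
from typing import Optional


def delete_data_field(data: Optional[str], data_name: str) -> str:
    """Return a new JSON string with the dotted field `data_name` removed."""
    root = json.loads(data) if data else {}
    node = root
    parts = data_name.split(".")
    for part in parts[:-1]:
        node = node.get(part)
        if not isinstance(node, dict):
            # Parent path is absent: nothing to delete
            return json.dumps(root)
    node.pop(parts[-1], None)
    return json.dumps(root)
```

Only the named leaf is removed, and the parent object stays in place even if it becomes empty.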

abstractmethod async delete_key(value: str) None[source]

Deletes a key from storage (e.g., the database).

Parameters:

value (str) – The key to delete

abstractmethod async delete_matching(key: PartialKey) None[source]

Deletes all keys matching the given specs

Parameters:

key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted

static encode_data(data: Dict[str, Any] | None = None) str[source]

Returns a JSON string encoded from the given object

Parameters:

data (Dict[str, Any]|None) – The object to encode

Returns:

A JSON string
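The two static helpers are thin wrappers around JSON serialisation. The sketch below uses the standard json module. Mapping None or an empty string to an empty object is an assumption of this example and may differ from the library's behaviour.

```python
import json
from typing import Any, Dict, Optional


def encode_data(data: Optional[Dict[str, Any]] = None) -> str:
    # Treating None as an empty object is an assumption of this sketch
    return json.dumps(data if data is not None else {})


def decode_data(data: Optional[str]) -> Dict[str, Any]:
    # json.loads raises json.JSONDecodeError on malformed input
    if data is None or data == "":
        return {}
    return json.loads(data)
```

Encoding and then decoding returns the original dict, provided it only holds JSON-serialisable values.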

abstractmethod async get_all_for_user(userid: str | int | None = None) List[Key][source]

Return all keys matching the given user ID

Parameters:

userid (str|int|None) – User to return keys for

Returns:

List[Key]: An array of keys

abstractmethod async get_key(key: str) Key[source]

Returns the matching key in the session storage or raises an exception if it doesn’t exist.

Parameters:

key (str) – The key to look up, as it will appear in this storage (typically unsigned, hashed)

Returns:

The matching Key record.

abstractmethod async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]

Saves a session key in the session storage (e.g., database).

Parameters:
  • userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.

  • value (str) – The key value to store.

  • date_created (datetime) – The date/time the key was created.

  • expires (datetime|None) – The date/time the key expires.

  • extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record

  • data (str|None) – An optional value, specific to the type of key, e.g., new email for email change tokens

abstractmethod async update_data(key_name: str, data_name: str, value: Any | None) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.

  • value (Any|None) – The new value.

abstractmethod async update_key(key: PartialKey) None[source]

If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.

Parameters:

key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.

abstractmethod async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]

Same as ‘update_data’ but updates several fields.

Ensure it is done as a single transaction.

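Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_array (List[KeyDataEntry]) – The fields to update, each given as a data_name and value pair.

class crossauth_backend.LdapAuthenticator(ldap_storage: LdapUserStorage, options: LdapAuthenticatorOptions = {})[source]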

Bases: PasswordAuthenticator

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user, returning the user as a User object.

Parameters:
  • user – the username field is required and is used for LDAP authentication. If ldap_auto_create_account is true, these attributes are used for user creation (see LdapUserStorage.create_user).

  • secrets – ignored, as secrets are stored in LDAP

  • params – the password field is expected to contain the LDAP password.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of Connection or UsernameOrPasswordInvalid.

can_create_user() bool[source]

Returns true

can_update_secrets() bool[source]

Returns false

can_update_user() bool[source]

Returns true

async create_one_time_secrets(user: User) UserSecretsInputFields[source]

Does nothing in this class

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]

Does nothing in this class

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]

Nothing to do in this class

async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]

Nothing to do in this class

skip_email_verification_on_signup() bool[source]

Returns false

validate_secrets(params: AuthenticationParameters) List[str][source]

Does nothing as LDAP is responsible for password format (this class doesn’t create password entries)

class crossauth_backend.LdapAuthenticatorOptions[source]

Bases: AuthenticationOptions

Optional parameters for LdapAuthenticator.

See LdapAuthenticator.__init__ for details

friendly_name: str

ldap_auto_create_account: bool

If true, an account will automatically be created (with factor1 taken from ldap_auto_create_factor1) when a user logs in with LDAP

ldap_auto_create_factor1: str

See crossauth_backend.LdapAuthenticatorOptions

class crossauth_backend.LdapUser[source]

Bases: TypedDict

dn: str

The user’s dn in LDAP

uid: NotRequired[str | List[str]]

class crossauth_backend.LdapUserStorage(local_storage: UserStorage, options: LdapUserStorageOptions = {})[source]

Bases: UserStorage

Wraps another user storage but with the authentication done in LDAP.

This class still needs a user to be created in another database, with for example a user id that can be referenced in key storage, and a state variable.

An admin account is not used. Searches are done as the user, with the user’s password.

async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]

Creates a user with the given details and secrets.

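Parameters:
  • user (UserInputFields) – the details of the user to create

  • secrets (UserSecretsInputFields|None) – the secrets to store for the user, if any

Returns:

the new user as a User object

Raises:

crossauth_backend.CrossauthError – with ErrorCode Configuration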

async delete_user_by_id(id: str | int) None[source]

If the storage supports this, delete the user with the given ID from storage.

Parameters:

id (str|int) – id of user to delete

async delete_user_by_username(username: str) None[source]

If the storage supports this, delete the named user from storage.

Parameters:

username (str) – username to delete

async get_ldap_user(username: str, password: str) LdapUser[source]

Gets the user from LDAP. Does not check local storage.

If the user doesn’t exist or authentication fails, an exception is thrown.

Parameters:
  • username (str) – the username to fetch

  • password (str) – the LDAP password

Returns:

the matching LdapUser

Raises:

CrossauthError – with ErrorCode UsernameOrPasswordInvalid or Connection

async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given field, or throws an exception.

This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.

Parameters:
  • field (str) – the field to match

  • value (str) – the value to match

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given email address, or throws an exception.

If normalize_email is true, email should be matched normalized and lowercased (using normalize()). If the email field doesn’t exist, username is assumed to be the email column.

Parameters:
  • email – the email address to return the user of

  • options – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given user id, or throws an exception.

Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.

Parameters:
  • id (str|int) – the user id to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given username, or throws an exception.

If normalize_username is true, the username should be matched normalized and lowercased (using normalize())

Parameters:
  • username (str) – the username to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_users(skip: int | None = None, take: int | None = None) List[User][source]

Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)

Parameters:
  • skip (int|None) – skip this number of records from the start of the set

  • take (int|None) – only return at most this number of records

Returns:

an array of User objects

async ldap_bind(dn: str, password: str) Connection[source]

Binds and returns the LDAP client. Based on https://github.com/shaozi/ldap-authentication/blob/master/index.js

require_user_entry() bool[source]

Returns false

async search_user(ldap_client: Connection, user_dn: str, attributes: List[str] | None = None) LdapUser[source]

Search for user in LDAP

async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]

Updates an existing user with the given details and secrets.

If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.

Parameters:
  • user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.

  • secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update

class crossauth_backend.LdapUserStorageOptions[source]

Bases: UserStorageOptions

Optional parameters for LdapUserStorage.

admin_editable_fields: List[str]

create_user_fn: Callable[[UserInputFields, LdapUser], UserInputFields]

A function to create a user object given the entry in LDAP and additional fields. The additional fields might be useful for attributes that aren’t in LDAP and the user needs to be prompted for, for example email address. The default function sets username to uid from ldapUser, state to active and takes every field for user (overriding status and username if present).
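The default behaviour described above can be sketched with plain dicts standing in for UserInputFields and LdapUser. This is an illustration, not the library's default function. The name default_create_user is hypothetical, and taking the first entry when uid is a list is an assumption.

```python
from typing import Any, Dict


def default_create_user(user: Dict[str, Any],
                        ldap_user: Dict[str, Any]) -> Dict[str, Any]:
    """Merge prompted-for fields with the identity taken from LDAP."""
    uid = ldap_user["uid"]
    if isinstance(uid, list):
        # LdapUser.uid may be a list; taking the first entry is an assumption
        uid = uid[0]
    # Copy every supplied field, then override username and state
    return {**user, "username": uid, "state": "active"}
```

A custom create_user_fn would follow the same shape, for example to copy an email address from an LDAP attribute instead of prompting the user for it.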

ldap_urls: List[str]

URLs of the running LDAP server, eg ldap://ldap.example.com or ldaps://ldap.example.com:1636. No default (required)

ldap_user_search_base: str

Search base, for user queries, eg ou=users,dc=example,dc=com. Default empty

ldap_username_attribute: str

Username attribute for searches. Default “cn”.
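One plausible way the username attribute and the search base combine is as a distinguished name of the form attribute=username,search_base. The sketch below assumes that composition, which this page does not confirm. The helpers escape_dn_value and user_dn are hypothetical, and the escaping follows the RFC 4514 rules for attribute values so that a username cannot alter the DN structure.

```python
def escape_dn_value(value: str) -> str:
    """Escape characters that are special in an RFC 4514 attribute value."""
    escaped = []
    last = len(value) - 1
    for i, ch in enumerate(value):
        if ch in '"+,;<>\\':
            escaped.append("\\" + ch)
        elif ch == "\0":
            escaped.append("\\00")
        elif (i == 0 and ch in " #") or (i == last and ch == " "):
            # Leading space or '#', and trailing space, must be escaped
            escaped.append("\\" + ch)
        else:
            escaped.append(ch)
    return "".join(escaped)


def user_dn(username: str, search_base: str = "", attribute: str = "cn") -> str:
    """Compose a DN from the username attribute and the search base."""
    rdn = f"{attribute}={escape_dn_value(username)}"
    return f"{rdn},{search_base}" if search_base else rdn
```

With the documented defaults (attribute "cn", empty search base), user_dn("bob") is simply "cn=bob".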

normalize_email: bool

normalize_username: bool

user_editable_fields: List[str]

class crossauth_backend.LocalPasswordAuthenticator(user_storage: UserStorage, options: LocalPasswordAuthenticatorOptions = {})[source]

Bases: PasswordAuthenticator

NoPassword = '********'

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user, returning the user as a User object.

If you set extraFields when constructing the UserStorage instance passed to the constructor, these will be included in the returned User object. hashedPassword, if present in the User object, will be removed.

Parameters:
  • user – the username field should contain the username

  • secrets – from the UserSecrets table. password is expected to be present

  • params – the user input. password is expected to be present

Raises:

crossauth_backend.CrossauthError – with crossauth_backend.ErrorCode of Connection, UserNotExist or PasswordInvalid, TwoFactorIncomplete, EmailNotVerified or UserNotActive.

can_create_user() bool[source]

Returns true

can_update_secrets() bool[source]

Returns true

can_update_user() bool[source]

Returns true

async create_one_time_secrets(user: User) UserSecretsInputFields[source]

Does nothing for this class.

async create_password_for_storage(password: str) str[source]

Just calls create_password_hash with encode set to true.

Parameters:

password (str) – the password to hash

Returns:

a string for storing in storage

async create_password_hash(password: str, salt: str | None = None, encode: bool = True) str[source]

Creates and returns a hash of the passed password, with the hashing parameters encoded ready for storage.

If salt is not provided, a random one is created. If secret was passed to the constructor or in the .env, and enable_secret_for_password_hash was set to true, it is used as the pepper.

Parameters:
  • password – the password to hash

  • salt – the salt to use. If None, a random one will be generated.

  • encode – if true, a hash suitable for DB storage is created (algorithm etc prefixed to the hash)

Returns:

the encoded hash string.

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]

async password_matches_hash(password: str, passwordHash: str, secret: str | None = None)[source]

A static version of the password hasher, provided for convenience

Parameters:
  • password – unhashed password

  • passwordHash – hashed password

  • secret – secret, if used when hashing passwords, or None if not

Returns:

true if match, false otherwise
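Checking a password against a stored hash amounts to re-deriving the key with the same parameters and comparing the two in constant time. The sketch below shows that comparison with the standard library only. It does not parse the encoded storage format used by this class, which is not described here. The function password_matches and its raw-bytes arguments are assumptions of the example.

```python
import hashlib
import hmac


def password_matches(password: str, stored_key: bytes, salt: bytes,
                     iterations: int = 600_000, digest: str = "sha256",
                     secret: str = "") -> bool:
    """Re-derive the PBKDF2 key and compare it with the stored one."""
    candidate = hashlib.pbkdf2_hmac(digest, password.encode("utf-8"),
                                    salt + secret.encode("utf-8"),
                                    iterations, len(stored_key))
    # compare_digest avoids leaking the mismatch position through timing
    return hmac.compare_digest(candidate, stored_key)
```

If a secret was used when the hash was created, the same secret has to be supplied here, otherwise the derived key will not match.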

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]

Does nothing for this class

async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]

Does nothing for this class

skip_email_verification_on_signup() bool[source]

Returns false: if email verification is enabled, it should apply to this authenticator too

validate_secrets(params: AuthenticationParameters) List[str][source]

Calls the implementor-provided validate_password_fn

This function is called to apply local password policy (password length, uppercase/lowercase etc)

Parameters:

params – the password should be in password

Returns:

an array of errors

class crossauth_backend.LocalPasswordAuthenticatorOptions[source]

Bases: AuthenticationOptions

Optional parameters for LocalPasswordAuthenticator.

See LocalPasswordAuthenticator.__init__ for details

enable_secret_for_password_hash: bool

If true, the secret will be concatenated to the salt when generating a hash for storing the password

friendly_name: str

pbkdf2_digest: str

Digest method for PBKDF2 hasher. Default sha256

pbkdf2_iterations: int

Number of PBKDF2 iterations. Default 600_000

pbkdf2_key_length: int

Length of the PBKDF2 key to generate, before base64-url encoding. Default 32

pbkdf2_salt_length: int

Number of characters for salt, before base64-encoding. Default 16

secret: str

Application secret. If defined, it is used as the secret in PBKDF2 to hash passwords

validate_password_fn: Callable[[AuthenticationParameters], List[str]]

Function that throws a crossauth_backend.CrossauthError with crossauth_backend.ErrorCode PasswordFormat if the password doesn’t conform to local rules (eg number of characters)
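As an illustration of a local password policy, the sketch below follows the declared type of validate_password_fn and returns a list of error messages, which is empty when the password is acceptable. The specific rules (minimum length of 8, lowercase, uppercase, digit) and the treatment of AuthenticationParameters as a mapping with a password entry are assumptions of this example.

```python
from typing import Any, List, Mapping


def validate_password(params: Mapping[str, Any]) -> List[str]:
    """Return a list of policy violations for params['password']."""
    password = params.get("password") or ""
    errors: List[str] = []
    if len(password) < 8:
        errors.append("Password must be at least 8 characters")
    if not any(c.islower() for c in password):
        errors.append("Password must contain a lowercase letter")
    if not any(c.isupper() for c in password):
        errors.append("Password must contain an uppercase letter")
    if not any(c.isdigit() for c in password):
        errors.append("Password must contain a digit")
    return errors
```

Returning every violation at once lets a signup form show the user all the problems in a single round trip.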

class crossauth_backend.MapGetter[source]

Bases: Generic[T]

classmethod get(mapping: Mapping[str, Any], field: str, default: T) T[source]
classmethod get_or_none(mapping: Mapping[str, Any], field: str) T | None[source]
classmethod get_or_null(mapping: Mapping[str, Any], field: str) T | NullType[source]
classmethod get_or_raise(mapping: Mapping[str, Any], field: str) T[source]
class crossauth_backend.OAuthAuthorizationStorage(options: OAuthAuthorizationStorageOptions = {})[source]

Bases: ABC

Base class for storing scopes that have been authorized by a user (or for client credentials, for a client).

This class is subclassed for various types of storage. For example, PrismaOAuthAuthorizationStorage is for storing in a database table, managed by the Prisma ORM.

abstractmethod async get_authorizations(client_id: str, userid: str | int | None = None) List[str | None][source]

Returns all scopes authorized for the given client and, optionally, user.

Parameters:
  • client_id (str) – the client_id to look up

  • userid (int|str|None) – the userid to look up, None for a client authorization not user authorization

Returns:

The authorized scopes as a list.

abstractmethod async update_authorizations(client_id: str, userid: str | int | None, authorizations: List[str | None]) None[source]

Saves a new set of authorizations for the given client and optionally user.

Deletes the old ones.

Parameters:
  • client_id (str) – the client_id to look up

  • userid (str|int|None) – the userid to look up, None for a client authorization not user authorization

  • authorizations (List[str|None]) – new set of authorized scopes, which may be empty
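
The replace-on-update semantics of these two methods can be illustrated with an in-memory sketch. The class below is hypothetical and deliberately does not subclass OAuthAuthorizationStorage, so that it stays self-contained; it only mirrors the two documented method signatures.

```python
import asyncio
from typing import Dict, List, Optional, Tuple, Union

UserId = Union[str, int, None]


class InMemoryAuthorizationStore:
    """Keeps authorized scopes per (client_id, userid) pair in a dict."""

    def __init__(self) -> None:
        self._scopes: Dict[Tuple[str, UserId], List[Optional[str]]] = {}

    async def get_authorizations(self, client_id: str,
                                 userid: UserId = None) -> List[Optional[str]]:
        # Unknown client/user pairs simply have no authorized scopes
        return list(self._scopes.get((client_id, userid), []))

    async def update_authorizations(self, client_id: str, userid: UserId,
                                    authorizations: List[Optional[str]]) -> None:
        # Assigning the new list discards the old set, as documented
        self._scopes[(client_id, userid)] = list(authorizations)


async def demo() -> List[Optional[str]]:
    store = InMemoryAuthorizationStore()
    await store.update_authorizations("client1", None, ["read"])
    await store.update_authorizations("client1", None, ["write"])
    return await store.get_authorizations("client1")
```

Calling asyncio.run(demo()) returns only the scopes from the second update, because the first set is deleted rather than merged.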

class crossauth_backend.OAuthAuthorizationStorageOptions[source]

Bases: TypedDict

class crossauth_backend.OAuthClient[source]

Bases: TypedDict

OAuth client data as stored in a database table

client_id: str

The client_id, which is auto-generated and immutable

client_name: str

A user-friendly name for the client (not used as part of the OAuth API).

client_secret: NotRequired[str | None]

Client secret, which is autogenerated.

If there is no client secret, it should be set to undefined.

This field allows null as well as undefined. This is used, for example, when partially updating a client and you specifically want to set the secret to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or undefined.

confidential: bool

Whether or not the client is confidential (and can therefore keep the client secret secret)

redirect_uri: List[str]

An array of valid redirect URIs for the client.

userid: NotRequired[str | int | None]

ID of the user who owns this client, which may be undefined for not being owned by a specific user.

This field allows null as well as undefined. This is used, for example, when partially updating a client and you specifically want to set the user ID to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or int (depending on the ID type in your user storage) or undefined.

valid_flow: List[str]

An array of OAuth flows allowed for this client.

See OAuthFlows.

class crossauth_backend.OAuthClientStorage(options: OAuthClientStorageOptions = {})[source]

Bases: ABC

Base class for storing OAuth clients.

This class is subclassed for various types of client storage. For example, PrismaOAuthClientStorage is for storing clients in a database table, managed by the Prisma ORM.

abstractmethod async create_client(client: OAuthClient) OAuthClient[source]

Creates and returns a new client with random ID and optionally secret.

Saves in the database.

Parameters:

client (crossauth_backend.OAuthClient) – the client to save.

Returns:

The new client.

abstractmethod async delete_client(client_id: str) None[source]

Deletes a client from storage.

Parameters:

client_id (str) – the client to delete

abstractmethod async get_client_by_id(client_id: str) OAuthClient[source]

Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.

Parameters:

client_id (str) – the client_id to look up

Returns:

The matching OAuthClient object.

abstractmethod async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.

Parameters:
  • name (str) – the client name to look up

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.

Returns:

A list of OAuthClient objects.

:raises crossauth_backend.CrossauthError: with ErrorCode of ‘InvalidSessionId’ if a match was not found in session storage.

abstractmethod async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns all clients in alphabetical order of client name.

Parameters:
  • skip (int|None) – skip this number of records from the start in alphabetical order

  • take (int|None) – return at most this number of records

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.

Returns:

A list of OAuthClient objects.
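
The skip and take parameters describe offset-style pagination over the alphabetically ordered clients. A minimal sketch of that behaviour over a plain list of dicts follows; the function name is hypothetical, and a real storage backend would normally do this in its query rather than in Python.

```python
from typing import Any, Dict, List, Optional


def page_clients(clients: List[Dict[str, Any]],
                 skip: Optional[int] = None,
                 take: Optional[int] = None) -> List[Dict[str, Any]]:
    """Order clients by client_name, drop the first `skip`, return at most `take`."""
    ordered = sorted(clients, key=lambda client: client["client_name"])
    start = skip or 0
    end = None if take is None else start + take
    return ordered[start:end]
```

With both parameters left as None, the whole ordered list is returned.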

abstractmethod async update_client(client: PartialOAuthClient) None[source]

If the given client exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.

Parameters:

client (crossauth_backend.PartialOAuthClient) – all fields to update (client_id must be set but will not be updated)

:raises crossauth_backend.CrossauthError: with ‘InvalidClient’ if the client doesn’t exist.

class crossauth_backend.OAuthClientStorageOptions[source]

Bases: TypedDict

class crossauth_backend.OAuthTokenConsumer(audience: str, options: OAuthTokenConsumerOptions = {})[source]

Bases: object

This abstract class is for validating OAuth JWTs.

property audience
property auth_server_base_url
static decode_header(token: str)[source]
hash(plaintext: str) str[source]
property keys
async load_config(oidc_config: Dict[str, Any] | None = None)[source]

Loads the OpenID Connect configuration, or fetches it from the authorization server (using the well-known endpoint appended to auth_server_base_url).

Parameters:

oidc_config (Dict[str, Any]|None) – the configuration, or None to load it from the authorization server

:raises crossauth_backend.CrossauthError: with ErrorCode of Connection if the fetch to the authorization server failed.

async load_jwks(jwks: Keys | None = None)[source]

Loads the JWT signature validation keys, or fetches them from the authorization server (using the URL in the OIDC configuration).

Parameters:

jwks (Keys|None) – the keys to load, or None to fetch them from the authorization server.

:raises crossauth_backend.CrossauthError: with ErrorCode of Connection if the fetch to the authorization server failed, the OIDC configuration wasn’t set or the keys could not be parsed.

async load_keys()[source]

The RSA public keys or symmetric keys for the authorization server, either passed to the constructor or fetched from the authorization server.

property oidc_config
async token_authorized(token: str, tokenType: Literal['access', 'refresh', 'id']) Dict[str, Any] | None[source]

If the given token is valid, the payload is returned. Otherwise None is returned.

The signature must be valid, the expiry must not have passed and, if tokenType is defined, the type claim in the payload must match it.

Doesn’t throw exceptions.

Parameters:
  • token (str) – The token to validate

  • tokenType (Literal["access", "refresh", "id"]) – If defined, the type claim in the payload must match this value
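
The claim checks described above can be sketched on an already-decoded payload. This is an illustration only: it does not verify the signature (which the real method requires), the function name is hypothetical, and the handling of a list-valued aud claim is an assumption based on the JWT specification rather than on this library.

```python
import time
from typing import Any, Dict, Optional


def payload_if_acceptable(payload: Dict[str, Any], audience: str, issuer: str,
                          token_type: Optional[str] = None,
                          clock_tolerance: int = 10,
                          now: Optional[float] = None) -> Optional[Dict[str, Any]]:
    """Return the payload if its claims pass the checks, otherwise None."""
    current = time.time() if now is None else now
    expiry = payload.get("exp")
    # Allow a few seconds of clock skew when checking expiry
    if expiry is not None and current > expiry + clock_tolerance:
        return None
    aud = payload.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if audience not in audiences:
        return None
    if payload.get("iss") != issuer:
        return None
    # The type claim is only checked when a token type is requested
    if token_type is not None and payload.get("type") != token_type:
        return None
    return payload
```

Returning None instead of raising mirrors the documented behaviour that token_authorized doesn’t throw exceptions.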

class crossauth_backend.OAuthTokenConsumerOptions[source]

Bases: TypedDict

Options that can be passed to OAuthTokenConsumerBase.

audience: str

The aud claim needs to match this value. No default (required)

auth_server_base_url: str

The value to expect in the iss claim. If the iss does not match this, the token is rejected. No default (required)

clock_tolerance: int

Number of seconds tolerance when checking expiration. Default 10

jwt_key_type: str

Secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key_file is required when using this kind of cipher

jwt_public_key: str

The public key if using a public key cipher for signing the JWT. Either this or jwt_public_key_file is required when using this kind of cipher. privateKey or privateKeyFile is also required.

jwt_public_key_file: str

Filename for the public key if using a public key cipher for signing the JWT. Either this or jwt_public_key is required when using this kind of cipher. privateKey or privateKeyFile is also required.

jwt_secret_key: str

Secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key_file is required when using this kind of cipher

jwt_secret_key_file: str

Filename with secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key is required when using this kind of cipher

key_storage: KeyStorage | None

If persisting tokens, you need to provide a storage to persist them to

oidc_config: OpenIdConfiguration | Dict[str, Any] | None

For initializing the token consumer with a static OpenID Connect configuration.

persist_access_token: bool

Whether to persist access tokens in key storage. Default false.

If you set this to True, you must also set key_storage.

class crossauth_backend.OidcAuthenticator(options: OidcAuthenticatorOptions = {})[source]

Bases: Authenticator

OIDC Authenticator class

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user, returning the user as a User object.

If you set extraFields when constructing the {@link UserStorage} instance passed to the constructor, these will be included in the returned User object. hashedPassword, if present in the User object, will be removed.

Parameters:
  • user – the username field should contain the username

  • secrets – from the UserSecrets table. password is expected to be present

  • params – the user input. password is expected to be present

:raises crossauth_backend.CrossauthError: with crossauth_backend.ErrorCode of Connection, UserNotExist, PasswordInvalid, TwoFactorIncomplete, EmailNotVerified or UserNotActive.

can_create_user() bool[source]

Returns:

True – this class can create users

can_update_secrets() bool[source]

Returns:

True – users can update secrets

can_update_user() bool[source]

Returns:

True – this class can update users

async create_one_time_secrets(user: User) UserSecretsInputFields[source]

Does nothing for this class.

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]

This will return a hash of the passed password.

Parameters:
  • username – ignored

  • params – expected to contain password

  • repeat_params – if defined, this is expected to also contain password and is checked to match the one in params

Returns:

the newly created password in the password field.

mfa_channel() Literal['none', 'email', 'sms'][source]

Returns:

none

mfa_type() Literal['none', 'oob', 'otp'][source]

Returns:

none

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]

Does nothing for this class.

async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]

Does nothing for this class.

secret_names() List[str][source]

Returns:

an empty array

skip_email_verification_on_signup() bool[source]

Returns:

False – if email verification is enabled, it should apply to this authenticator too

transient_secret_names() List[str][source]

Returns:

an empty array

validate_secrets(params: AuthenticationParameters) List[str][source]

Does nothing for this class

class crossauth_backend.OidcAuthenticatorOptions[source]

Bases: AuthenticationOptions

friendly_name: str
class crossauth_backend.OpenIdConfiguration[source]

Bases: TypedDict

This class encapsulates the data returned by the openid-configuration well-known endpoint. For further details, see the OpenID Connect specification.

acr_values_supported: NotRequired[list[str]]
authorization_endpoint: str
check_session_iframe: NotRequired[str]
claim_types_supported: NotRequired[list[ClaimType]]
claims_locales_supported: NotRequired[list[str]]
claims_parameter_supported: NotRequired[bool]
claims_supported: NotRequired[list[str]]
display_values_supported: NotRequired[list[str]]
end_session_endpoint: NotRequired[str]
grant_types_supported: list[GrantType]
id_token_encryption_alg_values_supported: NotRequired[list[str]]
id_token_encryption_enc_values_supported: NotRequired[list[str]]
id_token_signing_alg_values_supported: list[str]
issuer: str
jwks_uri: str
op_policy_uri: NotRequired[str]
op_tos_uri: NotRequired[str]
registration_endpoint: NotRequired[str]
request_object_encryption_alg_values_supported: NotRequired[list[str]]
request_object_encryption_enc_values_supported: NotRequired[list[str]]
request_object_signing_alg_values_supported: NotRequired[list[str]]
request_parameter_supported: NotRequired[bool]
request_uri_parameter_supported: NotRequired[bool]
require_request_uri_registration: NotRequired[bool]
response_modes_supported: list[ResponseMode]
response_types_supported: list[str]
scopes_supported: NotRequired[list[str]]
service_documentation: NotRequired[str]
subject_types_supported: list[SubjectType]
token_endpoint: str
token_endpoint_auth_methods_supported: NotRequired[list[TokenEndpointAuthMethod]]
token_endpoint_auth_signing_alg_values_supported: NotRequired[list[str]]
ui_locales_supported: NotRequired[list[str]]
userinfo_encryption_alg_values_supported: NotRequired[list[str]]
userinfo_encryption_enc_values_supported: NotRequired[list[str]]
userinfo_endpoint: NotRequired[str]
userinfo_signing_alg_values_supported: NotRequired[list[str]]
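
As a rough illustration of how such a configuration might be located and sanity-checked, the sketch below builds the discovery URL and lists any fields that are not marked NotRequired above but are missing from a dict. Both helper names are hypothetical; the path suffix is the one defined by OpenID Connect Discovery.

```python
from typing import Any, Dict, List

# Fields listed above without NotRequired
REQUIRED_FIELDS = (
    "authorization_endpoint",
    "grant_types_supported",
    "id_token_signing_alg_values_supported",
    "issuer",
    "jwks_uri",
    "response_modes_supported",
    "response_types_supported",
    "subject_types_supported",
    "token_endpoint",
)


def well_known_url(auth_server_base_url: str) -> str:
    """Append the OpenID Connect discovery path to the base URL."""
    return auth_server_base_url.rstrip("/") + "/.well-known/openid-configuration"


def missing_required_fields(config: Dict[str, Any]) -> List[str]:
    """Return the names of required fields absent from the configuration."""
    return [name for name in REQUIRED_FIELDS if name not in config]
```

An empty list from missing_required_fields means the dict has at least the keys this TypedDict requires; it does not validate the values.
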
class crossauth_backend.ParamType(*values)[source]

Bases: Enum

Type of parameter that can be parsed from an option value or environment variable

Boolean = 4
Integer = 3
Json = 5
JsonArray = 6
Number = 2
String = 1
class crossauth_backend.PartialKey[source]

Bases: TypedDict

Same as crossauth_backend.Key but all fields are NotRequired

created: datetime
data: str | None
expires: datetime | NullType
lastactive: datetime | None
userid: str | int | NullType | None
value: str
class crossauth_backend.PartialOAuthClient[source]

Bases: TypedDict

Same as OAuthClient but for partial updates

client_id: str

The client_id, which is auto-generated and immutable

client_name: str

A user-friendly name for the client (not used as part of the OAuth API).

client_secret: NotRequired[str | NullType]

Client secret, which is autogenerated.

If there is no client secret, it should be set to undefined.

This field allows null as well as undefined. This is used, for example, when partially updating a client and you specifically want to set the secret to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or undefined.

confidential: bool

Whether or not the client is confidential (and can therefore keep the client secret secret)

redirect_uri: List[str]

An array of valid redirect URIs for the client.

userid: NotRequired[str | int | NullType]

ID of the user who owns this client, which may be undefined for not being owned by a specific user.

This field allows null as well as undefined. This is used, for example, when partially updating a client and you specifically want to set the user ID to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or int (depending on the ID type in your user storage) or undefined.

valid_flow: List[str]

An array of OAuth flows allowed for this client.

See OAuthFlows.

class crossauth_backend.PartialUser[source]

Bases: PartialUserInputFields

Same as User but all fields are not required

admin: NotRequired[bool]
email: NotRequired[str]
email_normalized: NotRequired[str]

Email lowercased and non language-normalized

factor1: NotRequired[str]
factor2: NotRequired[str]
id: str | int
state: str
username: str
username_normalized: str

Username lowercased and non language-normalized

class crossauth_backend.PartialUserInputFields[source]

Bases: TypedDict

Same as UserInputFields but all fields are not required

admin: NotRequired[bool]
email: NotRequired[str]
factor1: NotRequired[str]
factor2: NotRequired[str]
state: str
username: str
class crossauth_backend.PartialUserSecrets[source]

Bases: UserSecretsInputFields

Same as UserSecrets except all fields are NotRequired

expiry: int
otp: str
password: str
totpsecret: str
userid: str | int
class crossauth_backend.PasswordAuthenticator(options: AuthenticationOptions = {})[source]

Bases: Authenticator

Base class for authenticators that validate passwords

mfa_channel() str[source]
mfa_type() str[source]
secret_names() List[str][source]
transient_secret_names() List[str][source]
class crossauth_backend.SMSUser[source]

Bases: UserInputFields

admin: NotRequired[bool]
email: NotRequired[str]
factor1: str
factor2: NotRequired[str]
phone: str
state: str
username: str
class crossauth_backend.SessionCookie(key_storage: KeyStorage, options: SessionCookieOptions = {})[source]

Bases: object

Class for session management using a session id cookie.

property cookie_name
async create_session_key(userid: str | int | None, extra_fields: Mapping[str, Any] = {}) Key[source]

Creates a session key and saves in storage

Date created is the current date/time on the server.

In the unlikely event of the key already existing, it is retried up to 10 times before throwing an error with ErrorCode.KeyExists

Parameters:
  • userid (str | int | None) – the user ID to store with the session key.

  • extra_fields (Dict[str, Any]|None) – Any fields in here will also be added to the session record

Returns:

the new session key

:raises crossauth_backend.CrossauthError: with ErrorCode KeyExists if the maximum number of attempts is exceeded while trying to create a unique session id.
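
The retry-on-collision behaviour can be sketched as follows. The set of stored IDs stands in for key storage, KeyExists is a hypothetical stand-in for a CrossauthError carrying ErrorCode.KeyExists, and the ID generator is an assumption (the library's own ID format is not shown here).

```python
import secrets
from typing import Callable, Set

MAX_ATTEMPTS = 10  # the documented retry limit


class KeyExists(Exception):
    """Hypothetical stand-in for CrossauthError with ErrorCode.KeyExists."""


def random_session_id() -> str:
    # Assumption: any sufficiently long random URL-safe string will do
    return secrets.token_urlsafe(16)


def create_unique_session_id(stored_ids: Set[str],
                             generate: Callable[[], str] = random_session_id) -> str:
    """Try up to MAX_ATTEMPTS random IDs, storing the first one not already taken."""
    for _ in range(MAX_ATTEMPTS):
        candidate = generate()
        if candidate not in stored_ids:
            stored_ids.add(candidate)
            return candidate
    raise KeyExists("could not create a unique session id")
```

With random IDs of this length a collision is extremely unlikely, so the loop almost always succeeds on the first attempt.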

async delete_all_for_user(userid: str | int, except_key: str | None = None) None[source]

Deletes all keys for the given user

Parameters:
  • userid (str|int) – the user to delete keys for

  • except_key (str|None) – if defined, don’t delete this key

property domain
async get_session_key(session_id: str) Key[source]

Returns the user matching the given session key in session storage, or throws an exception.

Looks the user up in the UserStorage instance passed to the constructor.

The lookup will also fail if CookieAuthOptions.filterFunction is defined and returns false.

Parameters:

session_id (str) – the unsigned value of the session cookie

Returns:

a crossauth_backend.User object, with the password hash removed.

:raises crossauth_backend.CrossauthError: with ErrorCode set to InvalidSessionId, Expired or UserNotExist.

async get_user_for_session_id(session_id: str, options: UserStorageGetOptions = {}) UserAndKey[source]

Returns the user matching the given session key in session storage, or throws an exception.

Looks the user up in the crossauth_backend.UserStorage instance passed to the constructor.

The lookup will also fail if CookieAuthOptions.filterFunction is defined and returns false.

Parameters:

session_id (str) – the unsigned value of the session cookie

Returns:

a crossauth_backend.User object, with the password hash removed, and the crossauth_backend.Key with the unhashed session_id

:raises crossauth_backend.CrossauthError: with ErrorCode set to InvalidSessionId or Expired.

static hash_session_id(session_id: str) str[source]

Returns a hash of a session ID, with the session ID prefix for storing in the storage table.

Parameters:

session_id (str) – the session ID to hash

Returns:

a base64-url-encoded string that can go into the storage
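
A minimal sketch of this kind of hashing is shown below. The hash algorithm (SHA-256), the prefix value and the unpadded encoding are all assumptions made for illustration; the library's actual choices are not stated in this section.

```python
import base64
import hashlib

SESSION_ID_PREFIX = "s:"  # hypothetical prefix, for illustration only


def hash_session_id_sketch(session_id: str) -> str:
    """Hash a session ID and return prefix + base64-url digest."""
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    encoded = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return SESSION_ID_PREFIX + encoded
```

Storing only the hash means that someone who can read the storage table cannot reuse the values as session cookies.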

property httpOnly
property idle_timeout

Returns a Cookie object with the given session key.

This class is compatible, for example, with Express.

Parameters:
  • session_key (crossauth_backend.Key) – the value of the session key

  • persist (bool|None) – if passed, overrides the persistSessionId setting

Returns:

a Cookie object.

Takes a session ID and creates a string representation of the cookie (value of the HTTP Cookie header).

Parameters:

cookie (Cookie) – the cookie values to make a string from

Returns:

a string representation of the cookie and options.

property maxAge
property path
property sameSite
property secure

Unsigns a cookie and returns the original value.

Parameters:

cookie_value (str) – the signed cookie value

Returns:

the unsigned value

:raises crossauth_backend.CrossauthError: if the signature is invalid.
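
Signing and unsigning a cookie value can be sketched with an HMAC. The scheme here (value, a dot, then an unpadded base64-url HMAC-SHA256 of the value keyed with the app secret) is an assumption for illustration, and ValueError stands in for the CrossauthError the real method raises.

```python
import base64
import hashlib
import hmac


def sign_value(value: str, secret: str) -> str:
    """Append a dot and an HMAC-SHA256 signature to the value."""
    mac = hmac.new(secret.encode("utf-8"), value.encode("utf-8"),
                   hashlib.sha256).digest()
    signature = base64.urlsafe_b64encode(mac).rstrip(b"=").decode("ascii")
    return value + "." + signature


def unsign_value(signed: str, secret: str) -> str:
    """Return the original value, or raise ValueError if the signature is invalid."""
    value, separator, _ = signed.rpartition(".")
    expected = sign_value(value, secret)
    # Compare in constant time to avoid leaking how much of the signature matched
    if not separator or not hmac.compare_digest(expected.encode("utf-8"),
                                                signed.encode("utf-8")):
        raise ValueError("invalid cookie signature")
    return value
```

Validating the signature before any storage lookup means tampered cookie values are rejected without touching the database.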

async update_session_key(session_key: PartialKey) None[source]

Updates a session record in storage

Parameters:

session_key (crossauth_backend.PartialKey) – the fields to update. value must be set, and will not be updated. All other defined fields will be updated.

:raises crossauth_backend.CrossauthError: if the session does not exist.

class crossauth_backend.SessionCookieOptions[source]

Bases: CookieOptions, TokenEmailerOptions

Options for double-submit csrf tokens

cookie_name: str

Name of cookie. Defaults to “CSRFTOKEN”

domain: str
email_from: NotRequired[str]
email_verification_html_body: NotRequired[str]
email_verification_subject: NotRequired[str]
email_verification_text_body: NotRequired[str]
expires: datetime
filter_function: Callable[[Key], bool]

This will be called with the session key to filter sessions before returning. Function should return true if the session is valid or false otherwise.

hash_session_id: bool

If true, session IDs are stored in hashed form in the key storage. Default False.

httpOnly: bool
idle_timeout: int

If non zero, sessions will time out after this number of seconds have elapsed without activity. Default 0 (no timeout)

maxAge: int
password_reset_expires: NotRequired[int]
password_reset_html_body: NotRequired[str]
password_reset_subject: NotRequired[str]
password_reset_text_body: NotRequired[str]
path: str
persist: bool

If true, session cookies will be persisted between browser sessions. Default True

prefix: NotRequired[str]
render: NotRequired[Callable[[str, Dict[str, Any]], str]]
sameSite: bool | Literal['lax', 'strict', 'none']
secret: str

App secret

secure: bool
site_url: NotRequired[str]
smtp_host: NotRequired[str]
smtp_password: NotRequired[str]
smtp_port: NotRequired[int]
smtp_use_tls: NotRequired[bool]
smtp_username: NotRequired[str]
user_storage: UserStorage

If user login is enabled, you must provide the user storage class

verify_email_expires: NotRequired[int]
views: NotRequired[str]
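
The idle_timeout option above amounts to comparing the time since the session was last active with the configured number of seconds. A minimal sketch, assuming the last-activity time is available as a datetime (as in the lastactive field of a key); the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_idle(lastactive: Optional[datetime], idle_timeout: int,
            now: Optional[datetime] = None) -> bool:
    """Return True if the session has been inactive for longer than idle_timeout seconds."""
    # A timeout of 0 disables idle expiry, matching the documented default
    if idle_timeout <= 0 or lastactive is None:
        return False
    current = now if now is not None else datetime.now(timezone.utc)
    return current - lastactive > timedelta(seconds=idle_timeout)
```

Both datetimes need to be either timezone-aware or naive; mixing the two raises a TypeError in Python.
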
class crossauth_backend.SessionManager(key_storage: KeyStorage, authenticators: Mapping[str, Authenticator], options: SessionManagerOptions = {})[source]

Bases: object

Class for managing sessions.

property allowed_factor2
async apply_email_verification_token(token: str) User[source]

Takes an email verification token as input and applies it to the user storage.

The state is reset to active. If the token was for changing the password, the new password is saved to the user in user storage.

Parameters:

token – the token to apply

Returns:

the new user record

property authenticators
async cancel_two_factor_page_visit(session_id: str) Dict[str, Any][source]

Cancels the 2FA that was previously initiated but not completed.

If successful, returns. Otherwise an exception is thrown.

Parameters:

session_id – the session id (unsigned)

Returns:

the 2FA data that was created on initiation

:raises crossauth_backend.CrossauthError: with ErrorCode of Unauthorized if 2FA was not initiated.

async change_secrets(username: str, factor_number: int, new_params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None, old_params: AuthenticationParameters | None = None) User[source]
async complete_two_factor_login(params: AuthenticationParameters, session_id: str, extra_fields: Mapping[str, Any] = {}, persist: bool = False) SessionTokens[source]

Performs the second factor authentication as the second step of the login process

If authentication is successful, the user’s state will be set to active and a new session will be created, bound to the user. The anonymous session will be deleted.

Parameters:
  • params – the user-provided parameters to authenticate with (eg TOTP code).

  • session_id – the user’s anonymous session

  • extra_fields – extra fields to add to the user-bound new session table entry

  • persist – if true, the cookie will be persisted (with an expiry value); otherwise it will be a session-only cookie.

Returns:

AuthResult containing:
  • session_cookie – the new session cookie

  • csrf_cookie – the new CSRF cookie

  • csrf_form_or_header_value – the new CSRF token corresponding to the cookie

  • user – the newly-logged in user.

async complete_two_factor_page_visit(params: AuthenticationParameters, session_id: str)[source]

Completes 2FA when visiting a protected page.

If successful, returns. Otherwise an exception is thrown.

Parameters:
  • params – the parameters from user input needed to authenticate (eg TOTP code). Passed to the authenticator

  • session_id – the session cookie value (ie still signed)

:raises crossauth_backend.CrossauthError: if authentication fails.

async complete_two_factor_setup(params: AuthenticationParameters, session_id: str) User[source]

Authenticates with the second factor.

If successful, the new user object is returned. Otherwise an exception is thrown.

Parameters:
  • params – the parameters from user input needed to authenticate (eg TOTP code)

  • session_id – the session cookie value (ie still signed)

Returns:

the user object

:raises crossauth_backend.CrossauthError: if authentication fails.

async create_anonymous_session(extra_fields: Mapping[str, Any] | None = None) SessionTokens[source]
async create_csrf_form_or_header_value(csrf_cookie_value: str)[source]

Validates the signature on the CSRF cookie value and returns a value that can be put in the form or CSRF header value.

Parameters:

csrf_cookie_value (str) – the value from the CSRF cookie

Returns:

the value to put in the form or CSRF header

async create_csrf_token() Csrf[source]

Creates and returns a signed CSRF token based on the session ID

Returns:

a CSRF cookie and value to put in the form or CSRF header

async create_user(user: UserInputFields, params: UserSecrets, repeat_params: UserSecrets | None = None, skip_email_verification: bool = False, empty_password: bool = False) User[source]

Returns the name used for CSRF token cookies.

property csrf_header_name

Returns the name used for CSRF token cookies

property csrf_tokens
async data_for_session_id(session_id: str) Dict[str, Any] | None[source]

Returns the data object for a session id, or undefined, as an object.

If the user is undefined, or the key has expired, returns undefined.

Parameters:

session_id (str) – the session key to look up in session storage

Returns:

a string from the data field

:raises crossauth_backend.CrossauthError: with ErrorCode of Connection, InvalidSessionId, UserNotExist or Expired.

async data_string_for_session_id(session_id: str) str | None[source]

Returns the data object for a session key, or undefined, as a JSON string (which is how it is stored in the session table)

If the user is undefined, or the key has expired, returns undefined.

Parameters:

session_id (str) – the session id to look up in session storage

Returns:

a string from the data field

:raises crossauth_backend.CrossauthError: with ErrorCode of Connection, InvalidSessionId, UserNotExist or Expired.

async delete_session(session_id: str) None[source]

Deletes the given session ID from the key storage (not the cookie)

Parameters:

session_id (str) – the session Id to delete

async delete_session_data(session_id: str, name: str) None[source]

Deletes a field from the session data.

The data field in the session entry is assumed to be a JSON string. The field with the given name is removed from it.

Parameters:
  • session_id (str) – the session ID to update.

  • name (str) – the name of the field to delete
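
Since the session data is held as a JSON string, reading it and deleting a field both come down to a parse, modify and serialize cycle. The helpers below are a hypothetical sketch of that cycle on the string alone; they do not touch key storage.

```python
import json
from typing import Any, Dict, Optional


def parse_session_data(data: Optional[str]) -> Optional[Dict[str, Any]]:
    """Turn the stored JSON string into a dict, or None if nothing is stored."""
    if data is None or data == "":
        return None
    return json.loads(data)


def delete_session_field(data: Optional[str], name: str) -> str:
    """Return the JSON string with the named field removed, if it was present."""
    parsed = parse_session_data(data) or {}
    parsed.pop(name, None)  # deleting a missing field is not an error here
    return json.dumps(parsed)
```

The first helper corresponds to the difference between the string and object forms of the session data; the second shows the deletion on the string form.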

async delete_user_by_username(username: str) None[source]
property email_token_storage
get_session_id(session_cookie_value: str)[source]

Returns the session ID from the signed session cookie value

Parameters:

session_cookie_value (str) – value from the session ID cookie

Returns:

the unsigned cookie value.

:raises crossauth_backend.CrossauthError: with InvalidKey if the signature is invalid.

async initiate_two_factor_login(user: User) SessionTokens[source]

Initiates the two factor login process.

Creates an anonymous session and corresponding CSRF token

Parameters:

user – the user, which should already have been authenticated with factor1

Returns:

a new anonymous session cookie and corresponding CSRF cookie and token.

async initiate_two_factor_page_visit(user: User, session_id: str, request_body: Mapping[str, Any], url: str | None = None, content_type: str | None = None) SessionTokens[source]

Initiates the two factor process when visiting a protected page.

Creates an anonymous session and corresponding CSRF token

Parameters:
  • user – the user, which should already have been authenticated with factor1

  • session_id – the logged in session associated with the user

  • request_body – the parameters from the request made before being redirected to factor2 authentication

  • url – the requested url, including path and query parameters

  • content_type – optional content type from the request

Returns:

If a token was passed, a new anonymous session cookie and corresponding CSRF cookie and token.

async initiate_two_factor_setup(user: User, new_factor2: str | None, session_id: str) Mapping[str, Any][source]

Begins the process of setting up 2FA for a user which has already been created and activated. Called when changing 2FA or changing its parameters.

Parameters:
  • user – the logged in user

  • new_factor2 – new second factor to change user to

  • session_id – the session cookie for the user

Returns:

the 2FA data that can be displayed to the user in the configure 2FA step (such as the secret and QR code for TOTP).

async initiate_two_factor_signup(user: UserInputFields, params: UserSecrets, session_id: str, repeat_params: UserSecrets | None) UserIdAndData[source]

Creates a user with 2FA enabled.

The user storage entry will be created, with the state set to awaitingtwofactorsetup or awaitingtwofactorsetupandemailverification. The passed session key will be updated to include the username and details needed by 2FA during the configure step.

Parameters:
  • user – details to save in the user table

  • params – the parameters needed to authenticate with factor1 (eg password)

  • session_id – the anonymous session cookie

  • repeat_params – if passed, these will be compared with params and if they don’t match, PasswordMatch is thrown.

Returns:

Dict containing:
  • userid – the id of the created user.

  • userData – data that can be displayed to the user in the page to complete 2FA set up (eg the secret key and QR code for TOTP)

property key_storage
async login(username: str, params: AuthenticationParameters, extra_fields: Mapping[str, Any] = {}, persist: bool = False, user: User | None = None, bypass_2fa: bool = False) SessionTokens[source]

Performs a user login

  • Authenticates the username and password

  • Creates a session key - if 2FA is enabled, this is an anonymous session, otherwise it is bound to the user

  • Returns the user (without the password hash) and the session cookie.

If the user object is defined, authentication (and 2FA) is bypassed

Parameters:
  • username – the username to validate

  • params – user-provided credentials (eg password) to authenticate with

  • extra_fields – add these extra fields to the session key if authentication is successful

  • persist – if passed, overrides the persistSessionId setting.

  • user – if this is defined, the username and password are ignored and the given user is logged in. The 2FA step is also skipped.

  • bypass_2fa – if true, the 2FA step will be skipped

Returns:

Dict containing the user, user secrets, session cookie, and CSRF cookie and token. If a 2FA step is needed, it will be an anonymous session, otherwise it is bound to the user.

Raises:

CrossauthError – with ErrorCode of Connection, UserNotValid, PasswordNotMatch or UserNotExist.

async logout(session_id: str)[source]
async logout_from_all(userid: str | int, except_id: str | None = None)[source]
async repeat_two_factor_signup(session_id: str) UserIdAndData[source]

This can be called if the user has finished signing up with factor1 but closed the browser before completing factor2 setup. Call it if the user signs up again with the same factor1 credentials.

Parameters:

session_id – the anonymous session ID for the user

Returns:

Dict containing:

  • userid: the id of the created user.

  • userData: data that can be displayed to the user in the page to complete 2FA set up (eg the secret key and QR code for TOTP).

  • secrets: data that is saved in the session for factor2. In the case of TOTP, both userData and secrets contain the shared secret but only userData has the QR code, since it can be generated from the shared secret.

async request_password_reset(email: str) None[source]

Sends a password reset token.

Parameters:

email – the user’s email (where the token will be sent)

async reset_secret(token: str, factror_number: int, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) User[source]

Resets the secret for factor1 or 2 (eg reset password)

Parameters:
  • token – the reset password token that was emailed

  • factror_number – which factor to reset (1 or 2)

  • params – the new secrets entered by the user (eg new password)

  • repeat_params – optionally, repeat of the secrets. If passed, an exception will be thrown if they do not match

Returns:

the user object

Raises:

CrossauthError – if the repeat_params don’t match params, the token is invalid or the user storage cannot be updated.

property session

Returns the name used for session ID cookies.

async update_many_session_data(session_id: str, data_array: List[KeyDataEntry]) None[source]

Update fields in the session data.

The data field in the session entry is assumed to be a JSON string. Each field named in data_array is updated, or set if not already set.

Parameters:
  • session_id (str) – the session Id to update.

  • data_array (List[crossauth_backend.KeyDataEntry]) – names and values.

async update_session_activity(session_id: str)[source]

If session_idle_timeout is set, update the last activity time in key storage to the current time.

Parameters:

session_id (str) – the session Id to update.

async update_session_data(session_id: str, name: str, value: Mapping[str, Any]) None[source]

Update a field in the session data.

The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set.

Parameters:
  • session_id (str) – the session Id to update.

  • name (str) – name of the field.

  • value (Mapping[str, Any]) – new value to store
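As a rough illustration of the update described above, the following sketch treats the data field as a JSON string, sets one named field, and serialises the result. The helper name set_session_field is hypothetical and is not part of crossauth_backend; the real method also persists the result to key storage.

```python
import json
from typing import Any, Mapping, Optional


def set_session_field(data: Optional[str], name: str, value: Mapping[str, Any]) -> str:
    """Return a new JSON string with `name` set to `value` (hypothetical helper)."""
    # An empty or missing data field is treated as an empty JSON object.
    parsed = json.loads(data) if data else {}
    # Existing fields are kept; the named field is overwritten or created.
    parsed[name] = dict(value)
    return json.dumps(parsed)


updated = set_session_field('{"theme": "dark"}', "2fa", {"otp": "123456"})
```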

async update_user(current_user: User, new_user: User, skip_email_verification: bool = False, as_admin: bool = False) TokensSent[source]

Updates a user entry in storage.

Parameters:
  • current_user – the current user details

  • new_user – the new user details

Returns:

dict with emailVerificationTokenSent and passwordResetTokenSent booleans

async user_for_password_reset_token(token: str) User[source]

Returns the user associated with a password reset token

Parameters:

token – the token that was emailed

Returns:

the user

Raises:

CrossauthError – if the token is not valid.

async user_for_session_id(session_id: str)[source]
property user_storage

Throws crossauth_backend.CrossauthError with InvalidKey if the passed CSRF cookie value is not valid (ie invalid signature).

Parameters:

csrf_cookie_value (str) – the CSRF cookie value

validate_double_submit_csrf_token(csrf_cookie_value: str, csrf_form_or_header_value: str)[source]

Throws crossauth_backend.CrossauthError with InvalidKey if the passed CSRF token is not valid for the given session ID. Otherwise returns without error

Parameters:
  • csrf_cookie_value (str) – the CSRF cookie value

  • csrf_form_or_header_value (str) – the value from the form field or CSRF header
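The double-submit pattern behind this check can be sketched as follows. This is a generic illustration only: the cookie layout (token, dot, HMAC signature), the function names and the use of ValueError are all assumptions made for the example, and the actual crossauth_backend token format may differ.

```python
import base64
import hashlib
import hmac
import secrets
from typing import Tuple


def _sign(token: str, secret: str) -> str:
    """Return a URL-safe base64 HMAC-SHA256 signature of the token."""
    digest = hmac.new(secret.encode(), token.encode(), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode()


def make_csrf_pair(secret: str) -> Tuple[str, str]:
    """Return (cookie value, form or header value) for a new random token."""
    token = secrets.token_urlsafe(16)
    return f"{token}.{_sign(token, secret)}", token


def check_double_submit(cookie_value: str, form_value: str, secret: str) -> None:
    """Raise ValueError unless the cookie is correctly signed and matches the form value."""
    token, _, signature = cookie_value.partition(".")
    # Step 1: the cookie must carry a valid signature made with the server secret.
    if not hmac.compare_digest(signature.encode(), _sign(token, secret).encode()):
        raise ValueError("invalid CSRF cookie signature")
    # Step 2: the value submitted in the form or header must match the cookie token.
    if not hmac.compare_digest(token.encode(), form_value.encode()):
        raise ValueError("CSRF token does not match cookie")
```

Both comparisons use hmac.compare_digest so that the time taken does not reveal how many leading characters matched.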

class crossauth_backend.SessionManagerOptions[source]

Bases: TokenEmailerOptions

Options for SessionManager

allowed_factor2: List[str]

Set of 2FA factor names a user is allowed to set.

The name corresponds to the key you give when adding authenticators. See authenticators in SessionManager.constructor.

Options for csrf cookie manager

email_from: NotRequired[str]
email_token_storage: KeyStorage

Store for password reset and email verification tokens. If not passed, the same store as for sessions is used.

email_verification_html_body: NotRequired[str]
email_verification_subject: NotRequired[str]
email_verification_text_body: NotRequired[str]
enable_email_verification: bool

If true, users will have to verify their email address before account is created or when changing their email address. See class description for details. Default True

enable_password_reset: bool

If true, allow password reset by email token. See class description for details. Default True

password_reset_expires: NotRequired[int]
password_reset_html_body: NotRequired[str]
password_reset_subject: NotRequired[str]
password_reset_text_body: NotRequired[str]
prefix: NotRequired[str]
render: NotRequired[Callable[[str, Dict[str, Any]], str]]
secret: str

Server secret. Needed for emailing tokens and for csrf tokens

options for session cookie manager

site_url: str

Base URL for the site.

This is used when constructing URLs, eg for sending password reset tokens.

smtp_host: NotRequired[str]
smtp_password: NotRequired[str]
smtp_port: NotRequired[int]
smtp_use_tls: NotRequired[bool]
smtp_username: NotRequired[str]
user_storage: UserStorage

If user login is enabled, you must provide the object where users are stored.

verify_email_expires: NotRequired[int]
views: NotRequired[str]
class crossauth_backend.SmsAuthenticator(options: SmsAuthenticatorOptions = {})[source]

Bases: Authenticator

Abstract base class for sending OTP by SMS

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user by comparing the user-provided otp with the one in secrets.

Validation fails if the otp is incorrect or has expired.

Parameters:
  • _user – ignored

  • secrets – taken from the session and should contain otp and expiry

  • params – user input and should contain otp

Raises:

CrossauthError – with ErrorCode InvalidToken or Expired.
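The check described above might look roughly like the following. It is a sketch under stated assumptions: expiry is taken to be a Unix time in seconds, ValueError stands in for CrossauthError, and check_otp is a hypothetical name rather than a library function.

```python
import hmac
import time
from typing import Any, Mapping, Optional


def check_otp(secrets: Mapping[str, Any], params: Mapping[str, Any],
              now: Optional[float] = None) -> None:
    """Raise ValueError if the submitted otp has expired or does not match."""
    now = time.time() if now is None else now
    # Assumption: expiry is a Unix timestamp in seconds.
    if now > secrets["expiry"]:
        raise ValueError("Expired")
    expected = str(secrets["otp"]).encode()
    submitted = str(params.get("otp", "")).encode()
    # Constant-time comparison of the stored and user-provided codes.
    if not hmac.compare_digest(expected, submitted):
        raise ValueError("InvalidToken")
```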

can_create_user() bool[source]
Returns:

true - this class can create users

can_update_secrets() bool[source]
Returns:

false - users cannot update secrets

can_update_user() bool[source]
Returns:

true - this class can update users

async create_one_time_secrets(user: User) Dict[str, Any][source]

Creates and sends a new one-time code.

Parameters:

user – the user to create it for. Uses the phone field which should start with +

Returns:

otp and expiry as a Unix time (number).

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) Dict[str, Any][source]

Does nothing for this class

static is_phone_valid(number: str) bool[source]

Returns whether or not the passed phone number has a valid form.

Parameters:

number – the phone number to validate

Returns:

true if it is valid, false otherwise

mfa_channel() str[source]

Used by the OAuth password_mfa grant type.

mfa_type() str[source]

Used by the OAuth password_mfa grant type.

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]

Creates and sends the one-time code

Parameters:

user – the user to create it for. Uses the phone field which is expected to be a phone number starting with +

Returns:

  • userData containing username, phone, factor2

  • sessionData containing the same plus otp and expiry, which is a Unix time (number).

async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]

Creates and sends a new one-time code.

Parameters:
  • _username – ignored

  • session_key – the session containing the previously created data.

Returns:

Dictionary containing userData, secrets, and newSessionData

secret_names() List[str][source]
Returns:

empty - this authenticator has no persistent secrets

skip_email_verification_on_signup() bool[source]
Returns:

false - doesn’t replace email verification

transient_secret_names() List[str][source]
Returns:

otp

static validate_phone(number: str | None) None[source]

Throws an exception if a phone number doesn’t have a valid form.

It must start with a + and be 8 to 15 digits

Parameters:

number – the phone number to validate

Raises:

CrossauthError – with ErrorCode InvalidPhoneNumber.
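The rule stated above (a leading + followed by 8 to 15 digits) can be expressed as a regular expression. The sketch below is an illustration with a hypothetical function name; the library's own check may be more lenient, for example about spaces or separators.

```python
import re

# A plus sign followed by 8 to 15 ASCII digits, and nothing else.
_PHONE_RE = re.compile(r"\+[0-9]{8,15}")


def looks_like_valid_phone(number: str) -> bool:
    """Return True if the number matches the documented form (hypothetical helper)."""
    return _PHONE_RE.fullmatch(number) is not None
```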

validate_secrets(params: AuthenticationParameters) List[str][source]

Does nothing for this class

static zero_pad(num: int, places: int) str[source]

Takes a number and turns it into a zero-padded string

Parameters:
  • num – number to pad

  • places – total number of required digits

Returns:

zero-padded string
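Zero-padding matters when generating numeric one-time codes, since a random number such as 42 must still be sent as a fixed-length string. A minimal sketch, using hypothetical names and assuming a six-digit code, could be:

```python
import secrets


def zero_pad_sketch(num: int, places: int) -> str:
    """Left-pad a non-negative integer with zeros to `places` digits."""
    return str(num).zfill(places)


# Assumption for illustration: a six-digit code drawn from a secure random source.
otp = zero_pad_sketch(secrets.randbelow(10**6), 6)
```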

class crossauth_backend.SmsAuthenticatorOptions[source]

Bases: AuthenticationOptions

Optional parameters for SmsAuthenticator.

See SmsAuthenticator.__init__ for details

friendly_name: str
render: Callable[[str, Dict[str, Any]], str]

if passed, use this instead of the default jinja2 renderer

sms_authenticator_body: str

Template file containing text for producing SMS messages. Default smsauthenticationbody.njk

sms_authenticator_from: str

Sender for SMSs

sms_authenticator_token_expires: int

Number of seconds before otps should expire. Default 5 minutes

views: str

The directory containing views (by default, Jinja2 templates)

class crossauth_backend.SqlAlchemyKeyStorage(engine: AsyncEngine, options: SqlAlchemyKeyStorageOptions = {})[source]

Bases: KeyStorage

async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]

Deletes all keys from storage for the given user ID

Parameters:
  • userid (int|str|None) – User ID to delete keys for

  • prefix (str) – Only keys starting with this prefix will be deleted

  • except_key (str|None) – If defined, the key with this value will not be deleted

async delete_data(key_name: str, data_name: str) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.
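The dotted-name convention can be illustrated on a plain JSON string. The helper below is hypothetical and works in memory only; it says nothing about how SqlAlchemyKeyStorage makes the change atomic in the database.

```python
import json
from typing import Any, Dict


def delete_dotted(data: str, data_name: str) -> str:
    """Return `data` with the field at the dotted path removed, if present."""
    doc: Dict[str, Any] = json.loads(data) if data else {}
    *parents, leaf = data_name.split(".")
    node: Any = doc
    for part in parents:
        node = node.get(part) if isinstance(node, dict) else None
        if node is None:
            # The path does not exist, so there is nothing to delete.
            return json.dumps(doc)
    if isinstance(node, dict):
        node.pop(leaf, None)
    return json.dumps(doc)
```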

async delete_key(value: str) None[source]

Deletes a key from storage (e.g., the database).

Parameters:

value (str) – The key to delete

async delete_matching(key: PartialKey) None[source]

Deletes all matching the given specs

Parameters:

key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted

async delete_with_prefix(userid: str | int | None, prefix: str) None[source]
async get_all_for_user(userid: str | int | None = None) List[Key][source]

Return all keys matching the given user ID

Parameters:

userid (str|int|None) – User to return keys for

Returns:

List[Key]: An array of keys

async get_key(key: str) Key[source]

Returns the matching key in the session storage or raises an exception if it doesn’t exist.

Parameters:

key (str) – The key to look up, as it will appear in this storage (typically unsigned, hashed)

Returns:

The matching Key record.

async get_key_in_transaction(conn: AsyncConnection, keyValue: str) Key[source]
async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]

Saves a session key in the session storage (e.g., database).

Parameters:
  • userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.

  • value (str) – The key value to store.

  • date_created (datetime) – The date/time the key was created.

  • expires (datetime|None) – The date/time the key expires.

  • extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record

  • data (str|None) – An optional value, specific to the type of key, e.g., new email for email change tokens

to_dict(row: Row[Any], with_relationships: bool = True) dict[str, Any][source]
async update_data(key_name: str, data_name: str, value: Any) None[source]

The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.

  • value (Any|None) – The new value.
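A dotted data_name addresses a nested field inside the JSON data. The sketch below shows one way to apply such an update to a JSON string in memory. The function name is hypothetical, and creating missing intermediate objects is an assumption of this example rather than documented library behaviour.

```python
import json
from typing import Any, Dict


def update_dotted(data: str, data_name: str, value: Any) -> str:
    """Return `data` with the field at the dotted path set to `value`."""
    doc: Dict[str, Any] = json.loads(data) if data else {}
    *parents, leaf = data_name.split(".")
    node = doc
    for part in parents:
        child = node.get(part)
        if not isinstance(child, dict):
            # Assumption: missing (or non-object) parents are replaced by empty objects.
            child = {}
            node[part] = child
        node = child
    node[leaf] = value
    return json.dumps(doc)
```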

async update_key(key: PartialKey) None[source]

If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.

Parameters:

key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.

async update_key_in_transaction(conn: AsyncConnection, key: PartialKey) None[source]
async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]

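Same as ‘update_data’ but updates several fields.

Ensure it is done as a single transaction.

Parameters:
  • key_name (str) – The name of the key to update, as it appears in the table.

  • data_array (List[KeyDataEntry]) – The names and new values of the fields to update.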
class crossauth_backend.SqlAlchemyKeyStorageOptions[source]

Bases: TypedDict

Optional parameters for SqlAlchemyKeyStorage.

See SqlAlchemyKeyStorage.__init__ for details

key_table: str
userid_foreign_key_column: str
class crossauth_backend.SqlAlchemyOAuthClientStorage(engine: AsyncEngine, options: SqlAlchemyOAuthClientStorageOptions = {})[source]

Bases: OAuthClientStorage

async create_client(client: OAuthClient) OAuthClient[source]

Creates and returns a new client with random ID and optionally secret.

Saves in the database.

Parameters:

client (crossauth_backend.OAuthClient) – the client to save.

Returns:

The new client.

async delete_client(client_id: str) None[source]

Deletes a client from storage.

Parameters:

client_id (str) – the client to delete

async get_client_by_id(client_id: str) OAuthClient[source]

Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.

Parameters:

client_id (str) – the client_id to look up

Returns:

The matching OAuthClient object.

async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.

Parameters:
  • name (str) – the client name to look up

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.

Returns:

A list of OAuthClient objects.

Raises:

crossauth_backend.CrossauthError – with ErrorCode of ‘InvalidSessionId’ if a match was not found in session storage.

async get_client_in_transaction(conn: AsyncConnection, field: str | None, value: str | None, userid: int | str | NullType | None = None, skip: int | None = None, take: int | None = None) List[OAuthClient][source]
async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]

Returns all clients in alphabetical order of client name.

Parameters:
  • skip (int|None) – skip this number of records from the start in alphabetical order

  • take (int|None) – return at most this number of records

  • userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.

Returns:

A list of OAuthClient objects.
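The skip and take parameters follow the usual offset and limit pattern. The following sketch applies them to an in-memory list sorted by name; the page helper and the sample names are invented for illustration, whereas the storage class would apply the equivalent in its database query.

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


def page(items: List[T], skip: Optional[int] = None, take: Optional[int] = None) -> List[T]:
    """Return at most `take` items after skipping the first `skip` items."""
    start = skip or 0
    end = None if take is None else start + take
    return items[start:end]


# Hypothetical client names, in alphabetical order.
names = sorted(["delta", "alpha", "charlie", "bravo"])
second_page = page(names, skip=2, take=2)
```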

async update_client(client: PartialOAuthClient) None[source]

If the given client exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.

Parameters:

client (crossauth_backend.OAuthClient) – all fields to update (client_id must be set but will not be updated)

Raises:

crossauth_backend.CrossauthError – with ‘InvalidClient’ if the client doesn’t exist.

class crossauth_backend.SqlAlchemyOAuthClientStorageOptions[source]

Bases: OAuthClientStorageOptions

Optional parameters for SqlAlchemyOAuthClientStorage.

See SqlAlchemyOAuthClientStorage.__init__ for details

client_table: str

Name of client table. Default OAuthClient

redirect_uri_table: str

Name of the redirect uri table. Default OAuthClientRedirectUri.

userid_foreign_key_column: str

Column name for the userid field in the client table. Default userid

valid_flow_table: str

Name of the valid flows table. Default OAuthClientValidFlow

class crossauth_backend.SqlAlchemyUserStorage(engine: AsyncEngine, options: SqlAlchemyUserStorageOptions = {})[source]

Bases: UserStorage

async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]

Creates a user with the given details and secrets.

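Parameters:
  • user (UserInputFields) – the details of the user to create

  • secrets (UserSecretsInputFields|None) – optional secrets to save for the user

Returns:

the new user as a User object

Raises:

crossauth_backend.CrossauthError – with ErrorCode Configuration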

async delete_user_by_id(id: str | int) None[source]

If the storage supports this, delete the user with the given ID from storage.

Parameters:

id (str|int) – id of user to delete

async delete_user_by_username(username: str) None[source]

If the storage supports this, delete the named user from storage.

Parameters:

username (str) – username to delete

async get_user_by(field: str, value: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given field, or throws an exception.

This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.

Parameters:
  • field (str) – the field to match

  • value (str) – the value to match

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given email address, or throws an exception.

If normalize_email is true, email should be matched normalized and lowercased (using normalize()). If the email field doesn’t exist, username is assumed to be the email column.

Parameters:
  • email – the email address to return the user of

  • options – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given user id, or throws an exception.

Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.

Parameters:
  • id (str|int) – the user id to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_user_by_in_transaction(conn: AsyncConnection, field: str, value: str | int) UserAndSecrets[source]
async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given username, or throws an exception.

If normalize_username is true, the username should be matched normalized and lowercased (using normalize())

Parameters:
  • username (str) – the username to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

async get_users(skip: int | None = None, take: int | None = None) List[User][source]

Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)

Parameters:
  • skip (int|None) – skip this number of records from the start of the set

  • take (int|None) – only return at most this number of records

Returns:

an array of User objects

to_dict(row: Row[Any]) dict[str, Any][source]
async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]

Updates an existing user with the given details and secrets.

If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.

Parameters:
  • user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.

  • secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update

class crossauth_backend.SqlAlchemyUserStorageOptions[source]

Bases: UserStorageOptions

Optional parameters for SqlAlchemyUserStorage.

See SqlAlchemyUserStorage.__init__ for details

admin_editable_fields: List[str]
force_id_to_number: bool

This works around a Fastify and Sveltekit limitation. If the id passed to get_user_by_id() is a string but is numeric, first try forcing it to a number before selecting. If that fails, try it as the string. Default true.
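The coercion step can be sketched as below. The function name coerce_id is hypothetical, and the sketch covers only the conversion; the fallback to a string lookup when the numeric lookup finds nothing is left to the storage class.

```python
from typing import Union


def coerce_id(user_id: Union[str, int]) -> Union[str, int]:
    """Convert an all-digit string id to int; leave anything else unchanged."""
    if isinstance(user_id, str) and user_id.isascii() and user_id.isdigit():
        return int(user_id)
    return user_id
```

A web framework typically delivers route and query parameters as strings, which is why an id such as "42" may need converting before it is compared with an integer primary key.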

id_column: str

Name of the id column in the user table. Can be set to username if that is your primary key. Default id.

joins: List[str]

Other tables to join. Default is [] (UserSecrets is always joined)

normalize_email: bool
normalize_username: bool
user_editable_fields: List[str]
user_secrets_table: str

Name of user secrets table. Default UserSecrets

user_table: str

Name of user table. Default User

userid_foreign_key_column: str

Name of the user id column in the user secrets. Default userid.

class crossauth_backend.TokenBodyType[source]

Bases: TypedDict

binding_code: str
client_id: Required[str]
client_secret: str
code: str
code_verifier: str
device_code: str
grant_type: Required[str]
mfa_token: str
oobCode: str
otp: str
password: str
refresh_token: str
scope: str
username: str
class crossauth_backend.TotpAuthenticator(app_name: str, options: AuthenticationOptions = {})[source]

Bases: Authenticator

This authenticator validates time-based one-time passwords (TOTP), using a shared secret that is typically set up by scanning a QR code.

async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]

Authenticates the user using the saved TOTP parameters and the passed code.

Parameters:
  • _user – ignored

  • secrets – should contain totpsecret that was saved in the session data

  • params – should contain otp
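For readers unfamiliar with TOTP, the sketch below computes a code following RFC 6238 (HMAC-SHA1, 30-second steps) using only the standard library. It is an illustration, not the library's implementation: the function names are hypothetical, the secret is taken as raw bytes (stored secrets are often base32-encoded strings), and only the current time step is checked.

```python
import hashlib
import hmac
import struct


def totp(secret: bytes, at: float, digits: int = 6, step: int = 30) -> str:
    """Compute the RFC 6238 TOTP code for Unix time `at`."""
    counter = int(at // step)
    mac = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226): the low 4 bits of the last byte pick an offset.
    offset = mac[-1] & 0x0F
    code = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10**digits).zfill(digits)


def check_totp(secret: bytes, otp: str, at: float) -> bool:
    """Return True if `otp` matches the code for the time step containing `at`."""
    return hmac.compare_digest(totp(secret, at).encode(), otp.encode())
```

Real deployments often also accept the adjacent time steps to tolerate clock drift between the server and the user's device.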

can_create_user() bool[source]
Returns:

true - this class can create users

can_update_secrets() bool[source]
Returns:

false - users cannot update secrets

can_update_user() bool[source]
Returns:

true - this class can update users

async create_one_time_secrets(user: User) UserSecretsInputFields[source]

Does nothing for this class

async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]

Creates and returns a totpsecret

allow_empty_secrets is ignored.

Parameters:
  • username – the user to create these for

  • _params – ignored

  • _repeat_params – ignored

Returns:

Dictionary where the totpsecret field will be populated.

mfa_channel() str[source]

Used by the OAuth password_mfa grant type.

mfa_type() str[source]

Used by the OAuth password_mfa grant type.

async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]

Creates a shared secret and returns it, along with image data for the QR code to display.

Parameters:

user – the username is expected to be present. All other fields are ignored.

Returns:

Dictionary containing

  • userData containing username, totpsecret, factor2 and qr.

  • sessionData containing the same except qr.

async reprepare_configuration(username: str, session_key: Key) Dict[str, Any] | None[source]

For cases when the 2FA page was closed without completing. Returns the same data as prepare_configuration, without generating a new secret.

Parameters:
  • username – user to return this for

  • session_key – the session key, which should contain the sessionData from prepare_configuration

Returns:

Dictionary containing

  • userData containing totpsecret, factor2 and qr.

  • secrets containing totpsecret.

  • newSessionData containing the same except qr.

secret_names() List[str][source]
Returns:

List containing totpsecret

skip_email_verification_on_signup() bool[source]
Returns:

false - if email verification is enabled, it should be used for this class

transient_secret_names() List[str][source]
Returns:

List containing otp

validate_secrets(params: AuthenticationParameters) List[str][source]

Does nothing for this class

class crossauth_backend.User[source]

Bases: UserInputFields

This adds ID to UserInputFields.

If your username field is unique and immutable, you can omit ID (passing username anywhere an ID is expected). However, if you want users to be able to change their username, you should include an ID field and make that immutable instead.

admin: NotRequired[bool]
email: NotRequired[str]
email_normalized: NotRequired[str]

Email lowercased and non language-normalized

factor1: str
factor2: NotRequired[str]
id: str | int

ID field, which may be auto-generated

state: str
username: str
username_normalized: NotRequired[str]

Username lowercased and non language-normalized

class crossauth_backend.UserAndSecrets[source]

Bases: TypedDict

secrets: UserSecrets
user: User
class crossauth_backend.UserInputFields[source]

Bases: TypedDict

Describes a user as fetched from the user storage (eg, database table), excluding auto-generated fields such as an auto-generated ID

This is extendible with additional fields - provide them to the UserStorage class as extraFields.

You may want to do this if you want to pass additional user data back to the caller, eg real name.

The fields defined here are the ones used by Crossauth. You may add others.

admin: NotRequired[bool]

Whether or not the user has administrator privileges (and can access admin-only functions).

email: NotRequired[str]

You can optionally include an email address field in your user table. If your username is an email address, you do not need a separate field.

factor1: str

Factor for primary authentication.

Should match the name of an authenticator

factor2: NotRequired[str]

Factor for second factor authentication.

Should match the name of an authenticator

state: str

You are free to define your own states. The ones Crossauth recognises are defined in UserState.

username: str

The username. This may be an email address or anything else, application-specific.

class crossauth_backend.UserSecrets[source]

Bases: UserSecretsInputFields

This adds the user ID to UserSecretsInputFields.

expiry: int
otp: str
password: str
totpsecret: str
userid: str | int
class crossauth_backend.UserSecretsInputFields[source]

Bases: TypedDict

Secrets, such as a password, are not in the User object to prevent them accidentally being leaked to the frontend. All functions that return secrets return them in this separate object.

The fields in this class are the ones that are not autogenerated by the database.

expiry: int
otp: str
password: str
totpsecret: str
class crossauth_backend.UserState[source]

Bases: object

These are the values used for state in a crossauth_backend.User object

active = 'active'
awaiting_email_verification = 'awaitingemailverification'
awaiting_two_factor_setup = 'awaitingtwofactorsetup'
awaiting_two_factor_setup_and_email_verification = 'awaitingtwofactorsetupandemailverification'
disabled = 'disabled'
factor2_reset_needed = 'factor2resetneeded'
password_and_factor2_reset_needed = 'passwordandfactor2resetneeded'
password_change_needed = 'passwordchangeneeded'
password_reset_needed = 'passwordresetneeded'
class crossauth_backend.UserStorage(options: UserStorageOptions = {})[source]

Bases: ABC

Base class for place where user details are stored.

This class is subclassed for various types of user storage, e.g. SqlAlchemyUserStorage is for storing username and password in a database table, accessed through SQLAlchemy.

Username and email searches should be case insensitive, as should their unique constraints. ID searches need not be case insensitive.

property admin_editable_fields
async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]

Creates a user with the given details and secrets.

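Parameters:
  • user (UserInputFields) – the details of the user to create

  • secrets (UserSecretsInputFields|None) – optional secrets to save for the user

Returns:

the new user as a User object

Raises:

crossauth_backend.CrossauthError – with ErrorCode Configuration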

abstractmethod async delete_user_by_id(id: str | int) None[source]

If the storage supports this, delete the user with the given ID from storage.

Parameters:

id (str|int) – id of user to delete

abstractmethod async delete_user_by_username(username: str) None[source]

If the storage supports this, delete the named user from storage.

Parameters:

username (str) – username to delete

abstractmethod async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given field, or throws an exception.

This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.

Parameters:
  • field (str) – the field to match

  • value (str) – the value to match

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given email address, or throws an exception.

If normalize_email is true, email should be matched normalized and lowercased (using normalize()). If the email field doesn’t exist, username is assumed to be the email column.

Parameters:
  • email – the email address to return the user of

  • options – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given user id, or throws an exception.

Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.

Parameters:
  • id (str|int) – the user id to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]

Returns user matching the given username, or throws an exception.

If normalize_username is true, the username should be matched normalized and lowercased (using normalize())

Parameters:
  • username (str) – the username to return the user of

  • options (UserStorageGetOptions) – optionally turn off checks. Used internally

Raises:

crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection

abstractmethod async get_users(skip: int | None = None, take: int | None = None) List[User][source]

Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)

Parameters:
  • skip (int|None) – skip this number of records from the start of the set

  • take (int|None) – only return at most this number of records

Returns:

an array of User objects
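The skip and take parameters act like an offset and a limit over the storage's fixed ordering. The following is a minimal sketch of that behaviour, assuming an in-memory list in place of a real UserStorage; `USERS` and `page_users` are hypothetical names used only for illustration and are not part of crossauth_backend.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory data; a real UserStorage would query a database.
USERS: List[Dict[str, str]] = [
    {"username": "carol"},
    {"username": "alice"},
    {"username": "bob"},
]


def page_users(skip: Optional[int] = None,
               take: Optional[int] = None) -> List[Dict[str, str]]:
    """Return users in a fixed order (alphabetical by username here)."""
    ordered = sorted(USERS, key=lambda u: u["username"])
    start = skip or 0                               # None means start at the first record
    end = None if take is None else start + take    # None means no upper limit
    return ordered[start:end]


# Second page when paging one record at a time
second_page = page_users(skip=1, take=1)
```

Because the order is fixed, repeated calls with increasing skip values walk through the whole set without duplicates, provided the underlying data does not change between calls.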

static normalize(string: str) str[source]

By default, usernames and emails are stored in lowercase, normalized format. This function returns that normalization.

Parameters:

string (str) – the string to normalize

Returns:

the normalized string, in lowercase with diacritics removed
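One common way to obtain a lowercase string with diacritics removed is Unicode decomposition followed by dropping combining marks. The sketch below illustrates that technique with the standard library only; `normalize_sketch` is a hypothetical name, and the actual implementation of normalize() may differ in its details.

```python
import unicodedata


def normalize_sketch(string: str) -> str:
    """Lowercase a string and strip diacritics (illustrative only)."""
    # NFKD splits accented characters into a base character plus combining marks.
    decomposed = unicodedata.normalize("NFKD", string)
    # Drop the combining marks, keeping only the base characters.
    stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
    return stripped.lower()
```

With normalization enabled, two usernames that differ only in case or accents are treated as the same user when matching.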

property normalize_email
property normalize_username
abstractmethod async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]

Updates an existing user with the given details and secrets.

If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.

Parameters:
  • user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.

  • secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update

property user_editable_fields
class crossauth_backend.UserStorageGetOptions[source]

Bases: TypedDict

Passed to the get methods of UserStorage.

skip_active_check: bool

If true, a valid user will be returned even if state is not set to active

skip_email_verified_check: bool

If true, a valid user will be returned even if state is set to awaitingemailverification

class crossauth_backend.UserStorageOptions[source]

Bases: TypedDict

Options passed to the UserStorage constructor.

admin_editable_fields: List[str]

Fields that admins are allowed to edit (in addition to user_editable_fields)

normalize_email: bool

If true, email addresses (in the email column not in the username column) will be matched as lowercase and with diacritics removed. Default true.

normalize_username: bool

If true, usernames will be matched as lowercase and with diacritics removed. Default true.

Note: this doesn’t apply to the ID column

user_editable_fields: List[str]

Fields that users are allowed to edit. Any fields passed to a create or update call that are not in this list will be ignored.
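The effect of this option can be pictured as a whitelist filter applied to the submitted fields. This is a minimal sketch under that assumption; `filter_editable` is a hypothetical helper, not a function provided by crossauth_backend.

```python
from typing import Any, Dict, List


def filter_editable(fields: Dict[str, Any],
                    user_editable_fields: List[str]) -> Dict[str, Any]:
    """Keep only the fields named in user_editable_fields; ignore the rest."""
    allowed = set(user_editable_fields)
    return {name: value for name, value in fields.items() if name in allowed}


# "admin" is not in the editable list, so it is silently dropped.
submitted = {"email": "bob@example.com", "admin": True}
accepted = filter_editable(submitted, ["email", "phone"])
```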

crossauth_backend.default_create_user_dn(user: UserInputFields, ldap_user: LdapUser) UserInputFields[source]
crossauth_backend.default_password_validator(params: AuthenticationParameters) List[str][source]
crossauth_backend.j(arg: Mapping[str, Any] | str) Dict[str, Any] | str[source]

Helper function that returns JSON if the error log supports it, otherwise a string

crossauth_backend.register_sqlite_datetime()[source]

SQLite has no date column types. As of Python 3.12, the default sqlite3 adapters and converters that made this seamless are deprecated.

If you don’t have your own adapters and you want to store dates etc as real, call this function to register the ones supplied with Crossauth and declare your columns as date/datetime/timestamp as normal.
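For readers who prefer to register their own adapters, the sketch below shows the general mechanism using only the standard sqlite3 module. It stores datetimes as ISO 8601 text, which is an assumption made for illustration and may differ from what the adapters supplied with Crossauth store; `register_datetime_sketch` is a hypothetical name.

```python
import sqlite3
from datetime import datetime


def register_datetime_sketch() -> None:
    """Register a datetime adapter and a matching converter (illustrative)."""
    # Adapter: Python datetime -> value written to SQLite (ISO 8601 text here).
    sqlite3.register_adapter(datetime, lambda d: d.isoformat(" "))
    # Converter: bytes read from a column declared as "timestamp" -> datetime.
    sqlite3.register_converter(
        "timestamp", lambda b: datetime.fromisoformat(b.decode())
    )


register_datetime_sketch()

# PARSE_DECLTYPES makes sqlite3 pick the converter from the declared column type.
conn = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
conn.execute("CREATE TABLE t (created timestamp)")
now = datetime(2024, 1, 2, 3, 4, 5)
conn.execute("INSERT INTO t VALUES (?)", (now,))
row = conn.execute("SELECT created FROM t").fetchone()
```

Converters only run when the connection is opened with detect_types set, so columns must be declared with the matching type name for values to come back as datetime objects.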

crossauth_backend.set_parameter(param: str, param_type: ParamType, instance: Any, options: Mapping[str, Any], env_name: str | None = None, required: bool = False, public: bool = False, protected: bool = False) None[source]

Sets an instance variable in the passed object from the passed options object and environment variable.

If the named parameter exists in the options object, then the instance variable is set to that value. Otherwise, if the named environment variable exists, it is set from that. Otherwise, the instance variable is not updated.

Parameters:
  • param (str) – The name of the parameter in the options variable and the name of the variable in the instance.

  • param_type (ParamType) – The type of variable. If the value is JsonArray or Json, both the option and the environment variable value should be a string, which will be parsed.

  • instance (Any) – Options present in the options or environment variables will be set on a corresponding instance variable in this class or object.

  • options (Dict[str, Any]) – Object containing options as key/value pairs.

  • env_name (str) – Name of environment variable.

  • required (bool) – If true, an exception will be thrown if the variable is not present in options or the environment variable.

  • public (bool) – If false, _ will be prepended to the field name in the target. Default False.

  • protected (bool) – If true, __ will be prepended to the field name in the target. Default False. Don’t use this and public together.

Raises:

crossauth_backend.CrossauthError – with ErrorCode Configuration if required is set but the option was not present, or if there was a parsing error.
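The lookup order described above (options first, then the environment variable, otherwise leave the instance untouched) can be sketched as follows. This is a simplified stand-in, not the library's code: `set_parameter_sketch` and `ConfigError` are hypothetical names, and type parsing via param_type and the protected flag are deliberately omitted.

```python
import os
from typing import Any, Mapping, Optional


class ConfigError(Exception):
    """Hypothetical stand-in for CrossauthError with ErrorCode Configuration."""


def set_parameter_sketch(param: str,
                         instance: Any,
                         options: Mapping[str, Any],
                         env_name: Optional[str] = None,
                         required: bool = False,
                         public: bool = False) -> None:
    # Non-public parameters are stored under a leading underscore.
    attr = param if public else "_" + param
    if param in options:
        # 1. A value in the options object takes precedence.
        setattr(instance, attr, options[param])
    elif env_name is not None and env_name in os.environ:
        # 2. Otherwise fall back to the named environment variable.
        setattr(instance, attr, os.environ[env_name])
    elif required:
        # 3. A required value found in neither place is a configuration error.
        raise ConfigError(f"{param} is required but was not provided")
    # 4. Otherwise the instance variable keeps its current value.


class Holder:
    """Hypothetical target object with a pre-set default."""

    def __init__(self) -> None:
        self._secret = "default"


holder = Holder()
set_parameter_sketch("secret", holder, {"secret": "from-options"})
```

Setting defaults on the instance before the call, as Holder does, means an absent optional parameter simply leaves the default in place.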