Source code for sf_toolkit.auth.login_oauth

import base64
from html import unescape
from json import JSONDecodeError
import time
import warnings

import httpx
import jwt

from sf_toolkit.exceptions import SalesforceAuthenticationFailed

from .types import (
    AuthMissingResponse,
    LazyParametersMissing,
    SalesforceLogin,
    SalesforceToken,
    SalesforceTokenGenerator,
)


def token_login(
    domain: str,
    token_data: dict[str, str],
    consumer_key: str,
    headers: dict[str, str] | None = None,
) -> SalesforceTokenGenerator:
    """Process OAuth 2.0 JWT Bearer Token Flow."""
    response = yield httpx.Request(
        "POST",
        f"https://{domain}.salesforce.com/services/oauth2/token",
        data=token_data,
        headers=headers,
    )

    if not response:
        raise AuthMissingResponse("No response received")

    try:
        json_response = response.json()
    except JSONDecodeError as exc:
        raise SalesforceAuthenticationFailed(
            response.status_code, response.text
        ) from exc

    if response.status_code != 200:
        except_code = json_response.get("error")
        except_msg = json_response.get("error_description")
        if except_msg == "user hasn't approved this consumer":
            auth_url = (
                f"https://{domain}.salesforce.com/services/oauth2/"
                "authorize?response_type=code&client_id="
                f"{consumer_key}&redirect_uri=<approved URI>"
            )
            warnings.warn(
                f"""
    If your connected app policy is set to "All users may
    self-authorize", you may need to authorize this
    application first. Browse to
    {auth_url}
    in order to Allow Access. Check first to ensure you have a valid
    <approved URI>."""
            )
        raise SalesforceAuthenticationFailed(except_code, except_msg)

    access_token = json_response.get("access_token")
    instance_url = json_response.get("instance_url")
    return SalesforceToken(httpx.URL(instance_url), access_token)


[docs] def password_login( username: str, password: str, consumer_key: str, consumer_secret: str, domain: str = "login", ) -> SalesforceLogin: """Process OAuth 2.0 Password Flow.""" domain = domain.removesuffix(".salesforce.com") return lambda: token_login( domain, { "grant_type": "password", "username": unescape(username), "password": unescape(password) if password else "", "client_id": consumer_key, "client_secret": consumer_secret, }, consumer_key, None, )
[docs] def client_credentials_flow_login( consumer_key: str, consumer_secret: str, domain: str ) -> SalesforceLogin: """Process OAuth 2.0 Client Credentials Flow.""" domain = domain.removesuffix(".salesforce.com") token_data = {"grant_type": "client_credentials"} authorization = f"{consumer_key}:{consumer_secret}" encoded = base64.b64encode(authorization.encode()).decode() return lambda: token_login( domain, token_data, consumer_key, headers={"Authorization": f"Basic {encoded}"} )
[docs] def public_key_auth_login( username: str, consumer_key: str, private_key: bytes | str, domain: str = "login" ) -> SalesforceLogin: """Process OAuth 2.0 Public Key JWT Flow.""" domain = domain.removesuffix(".salesforce.com") def _token_login(): return token_login( domain, { "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": jwt.encode( { "iss": consumer_key, "sub": username, "aud": f"https://{domain}.salesforce.com", "exp": int(time.time()) + 3600, }, private_key, algorithm="RS256", ), }, consumer_key, ) return _token_login
[docs] def lazy_oauth_login(**kwargs): """Determine which login method to use based on the provided kwargs.""" if "private_key" in kwargs: # Public Key JWT Flow return public_key_auth_login( username=kwargs["username"], consumer_key=kwargs["consumer_key"], private_key=kwargs["private_key"], domain=kwargs.get("domain", "login"), ) elif "consumer_secret" in kwargs and "username" in kwargs and "password" in kwargs: # Password Flow return password_login( username=kwargs["username"], password=kwargs["password"], consumer_key=kwargs["consumer_key"], consumer_secret=kwargs["consumer_secret"], domain=kwargs.get("domain", "login"), ) elif "consumer_secret" in kwargs and "username" not in kwargs: # Client Credentials Flow return client_credentials_flow_login( consumer_key=kwargs["consumer_key"], consumer_secret=kwargs["consumer_secret"], domain=kwargs.get("domain", "login"), ) else: raise LazyParametersMissing( "Unable to determine authentication method from provided parameters. " "Please provide appropriate parameters for one of the supported flows." )