# Source code for sf_toolkit.auth.login_soap

"""Login classes and functions for salesforce-toolkit

Based on simple-salesforce 1.12.5
"""

import lxml.etree as etree
from html import escape

import httpx

from .types import (
    AuthMissingResponse,
    SalesforceLogin,
    SalesforceToken,
    SalesforceTokenGenerator,
)

from ..exceptions import SalesforceAuthenticationFailed

# Client name sent in the SOAP `CallOptions` header by the login builders in
# this module; a caller-supplied client_id is appended after a "/".
DEFAULT_CLIENT_ID_PREFIX = "sf-toolkit"


def get_xml_element_value(xmlString: bytes | str, elementName: str) -> str | None:
    """
    Extracts an element value from an XML string.

    For example, invoking
    get_element_value(
        '<?xml version="1.0" encoding="UTF-8"?><foo>bar</foo>', 'foo')
    should return the value 'bar'.
    """
    if isinstance(xmlString, str):
        xmlString = xmlString.encode("utf-8")

    root = etree.fromstring(xmlString)

    elements = root.findall(f".//{{*}}{elementName}")

    if elements and elements[0].text:
        return elements[0].text
    return None


# Value sent as `<urn:defaultNamespace>` in the CallOptions header of the SOAP
# login envelopes built in this module.
XML_NS = "sf"


def soap_login(
    domain: str, request_body: str | None, sf_version: float | int | None = None
) -> SalesforceTokenGenerator:
    """Process SOAP specific login workflow.

    This is a generator: it yields ``httpx.Request`` objects and expects the
    driver to ``send()`` back the response for each one. When ``sf_version``
    is falsy, one GET to ``/services/data`` is yielded first to discover the
    API version; the SOAP login POST is always yielded.

    Args:
        domain: ``login``, ``test``, or an org domain. Unless the value is one
            of the shared domains or already ends in ``.my`` (compared
            case-insensitively), ``.my`` is appended, preceded by ``.sandbox``
            when the name contains ``--`` and does not already end with it.
        request_body: SOAP envelope used as the body of the login POST.
        sf_version: API version for the SOAP endpoint. When falsy, the
            numerically highest version advertised by the org is used.

    Returns:
        As the generator's return value, a ``SalesforceToken`` built from the
        ``serverUrl`` and ``sessionId`` elements of the login response.

    Raises:
        ValueError: the version must be inferred but the domain is the shared
            ``login``/``test`` host, or no response was sent for the version
            request.
        AuthMissingResponse: no response was sent for the login request.
        SalesforceAuthenticationFailed: the login response was not successful.
        AssertionError: a successful response lacks ``serverUrl`` or
            ``sessionId``.
    """
    folded_domain = domain.casefold()
    is_shared_domain = folded_domain in ("login", "test")
    if not is_shared_domain and not folded_domain.endswith(".my"):
        if "--" in domain and not domain.endswith(".sandbox"):
            domain = domain + ".sandbox"
        domain = domain + ".my"
    full_domain = f"https://{domain}.salesforce.com"

    if not sf_version:
        # Same case-insensitive test as above, so "Login" is also rejected.
        if is_shared_domain:
            raise ValueError(
                "Cannot infer API version using the shared `login` or `test` domains"
            )
        response = yield httpx.Request(
            "GET",
            f"{full_domain}/services/data",
            headers={"Accept": "application/json"},
        )
        if not response:
            raise ValueError("Unable to infer API version from response")
        response.raise_for_status()
        # Compare numerically: as strings, "100.0" would sort below "99.0".
        sf_version = max(float(entry["version"]) for entry in response.json())

    soap_url = httpx.URL(f"{full_domain}/services/Soap/u/{sf_version:.01f}")

    response = yield httpx.Request(
        "POST",
        soap_url,
        content=request_body,
        headers={
            "content-type": "text/xml",
            "charset": "UTF-8",
            "SOAPAction": "login",
        },
    )
    if not response:
        raise AuthMissingResponse("No response provided for SOAP login")

    if not response.is_success:
        # Look the fault fields up by local name: get_xml_element_value matches
        # with a namespace wildcard, which a "prefix:" would defeat.
        except_code = get_xml_element_value(response.text, "exceptionCode")
        except_msg = get_xml_element_value(response.text, "exceptionMessage")
        raise SalesforceAuthenticationFailed(except_code, except_msg)

    session_id = get_xml_element_value(response.text, "sessionId")
    server_url = get_xml_element_value(response.text, "serverUrl")
    # Raised explicitly (not via `assert`) so the checks survive `python -O`.
    if server_url is None:
        raise AssertionError("Unable to find Server URL")
    if session_id is None:
        raise AssertionError("Unable to find Session ID")

    return SalesforceToken(httpx.URL(server_url), session_id)


def security_token_login(
    username: str,
    password: str,
    security_token: str,
    client_id: str | None = None,
    domain: str = "login",
    api_version: float | int | None = None,
) -> SalesforceLogin:
    """Build a SOAP login for username, password and security token.

    The security token is appended directly to the password inside the
    ``<n1:password>`` element. Every caller-supplied value placed in the
    envelope is escaped first so that characters such as ``&`` or ``<`` cannot
    break or alter the XML.

    Args:
        username: Login name.
        password: Account password.
        security_token: Token appended to the password.
        client_id: Optional suffix for the client name; when given, the client
            is reported as ``DEFAULT_CLIENT_ID_PREFIX/client_id``.
        domain: Passed through to ``soap_login``.
        api_version: Passed through to ``soap_login`` as ``sf_version``.

    Returns:
        A zero-argument callable; each call starts a new ``soap_login``
        generator for the prepared envelope.
    """
    if client_id:
        client_name = DEFAULT_CLIENT_ID_PREFIX + "/" + client_id
    else:
        client_name = DEFAULT_CLIENT_ID_PREFIX

    client_name = escape(client_name)
    safe_username = escape(username)
    safe_password = escape(password)
    safe_token = escape(security_token)

    # Adjacent literals are joined by single spaces, matching the envelope
    # text this function has always sent.
    login_soap_request_body = (
        '<?xml version="1.0" encoding="utf-8" ?> '
        "<env:Envelope "
        'xmlns:xsd="http://www.w3.org/2001/XMLSchema" '
        'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" '
        'xmlns:env="http://schemas.xmlsoap.org/soap/envelope/" '
        'xmlns:urn="urn:partner.soap.sforce.com"> '
        "<env:Header> "
        "<urn:CallOptions> "
        f"<urn:client>{client_name}</urn:client> "
        f"<urn:defaultNamespace>{XML_NS}</urn:defaultNamespace> "
        "</urn:CallOptions> "
        "</env:Header> "
        "<env:Body> "
        '<n1:login xmlns:n1="urn:partner.soap.sforce.com"> '
        f"<n1:username>{safe_username}</n1:username> "
        f"<n1:password>{safe_password}{safe_token}</n1:password> "
        "</n1:login> "
        "</env:Body> "
        "</env:Envelope>"
    )

    return lambda: soap_login(
        domain,
        login_soap_request_body,
        api_version,
    )


def ip_filtering_org_login(
    username: str,
    password: str,
    organizationId: str,
    client_id: str | None = None,
    domain: str = "login",
    api_version: float | int | None = None,
) -> SalesforceLogin:
    """Build a SOAP login scoped to an organization id, without a security token.

    The organization id is sent in a ``LoginScopeHeader``. Every
    caller-supplied value placed in the envelope is escaped first so that
    characters such as ``&`` or ``<`` cannot break or alter the XML.

    Args:
        username: Login name.
        password: Account password.
        organizationId: Value of ``<urn:organizationId>`` in the scope header.
        client_id: Optional suffix for the client name; when given, the client
            is reported as ``DEFAULT_CLIENT_ID_PREFIX/client_id``.
        domain: Passed through to ``soap_login``.
        api_version: Passed through to ``soap_login`` as ``sf_version``.

    Returns:
        A zero-argument callable; each call starts a new ``soap_login``
        generator for the prepared envelope.
    """
    if client_id:
        client_name = DEFAULT_CLIENT_ID_PREFIX + "/" + client_id
    else:
        client_name = DEFAULT_CLIENT_ID_PREFIX

    client_name = escape(client_name)
    safe_username = escape(username)
    safe_password = escape(password)
    safe_org_id = escape(organizationId)

    # Adjacent literals are joined by single spaces, matching the envelope
    # text this function has always sent.
    login_soap_request_body = (
        '<?xml version="1.0" encoding="utf-8" ?> '
        "<soapenv:Envelope "
        'xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
        'xmlns:urn="urn:partner.soap.sforce.com"> '
        "<soapenv:Header> "
        "<urn:CallOptions> "
        f"<urn:client>{client_name}</urn:client> "
        f"<urn:defaultNamespace>{XML_NS}</urn:defaultNamespace> "
        "</urn:CallOptions> "
        "<urn:LoginScopeHeader> "
        f"<urn:organizationId>{safe_org_id}</urn:organizationId> "
        "</urn:LoginScopeHeader> "
        "</soapenv:Header> "
        "<soapenv:Body> "
        "<urn:login> "
        f"<urn:username>{safe_username}</urn:username> "
        f"<urn:password>{safe_password}</urn:password> "
        "</urn:login> "
        "</soapenv:Body> "
        "</soapenv:Envelope>"
    )

    return lambda: soap_login(
        domain,
        login_soap_request_body,
        api_version,
    )


def ip_filtering_non_service_login(
    username: str,
    password: str,
    client_id: str | None = None,
    domain: str = "login",
    api_version: float | int | None = None,
) -> SalesforceLogin:
    """Build a SOAP login that sends only username and password.

    Every caller-supplied value placed in the envelope is escaped first so
    that characters such as ``&`` or ``<`` cannot break or alter the XML.

    Args:
        username: Login name.
        password: Account password.
        client_id: Optional suffix for the client name; when given, the client
            is reported as ``DEFAULT_CLIENT_ID_PREFIX/client_id``.
        domain: Passed through to ``soap_login``.
        api_version: Passed through to ``soap_login`` as ``sf_version``.

    Returns:
        A zero-argument callable; each call starts a new ``soap_login``
        generator for the prepared envelope.
    """
    if client_id:
        client_name = DEFAULT_CLIENT_ID_PREFIX + "/" + client_id
    else:
        client_name = DEFAULT_CLIENT_ID_PREFIX

    client_name = escape(client_name)
    safe_username = escape(username)
    safe_password = escape(password)

    # Adjacent literals are joined by single spaces, matching the envelope
    # text this function has always sent.
    login_soap_request_body = (
        '<?xml version="1.0" encoding="utf-8" ?> '
        "<soapenv:Envelope "
        'xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
        'xmlns:urn="urn:partner.soap.sforce.com"> '
        "<soapenv:Header> "
        "<urn:CallOptions> "
        f"<urn:client>{client_name}</urn:client> "
        f"<urn:defaultNamespace>{XML_NS}</urn:defaultNamespace> "
        "</urn:CallOptions> "
        "</soapenv:Header> "
        "<soapenv:Body> "
        "<urn:login> "
        f"<urn:username>{safe_username}</urn:username> "
        f"<urn:password>{safe_password}</urn:password> "
        "</urn:login> "
        "</soapenv:Body> "
        "</soapenv:Envelope>"
    )

    return lambda: soap_login(
        domain,
        login_soap_request_body,
        api_version,
    )


def lazy_soap_login(**kwargs):
    """
    Infer which SOAP login flow should be used based on the parameters provided.

    This function examines the kwargs to determine whether to use
    security_token_login, ip_filtering_org_login, or
    ip_filtering_non_service_login. A parameter that is absent and one that is
    explicitly passed as None are treated the same way.

    Parameters are the same as the underlying login functions, with required
    parameters determined by the login flow chosen:

    - ``security_token`` given: security_token_login
    - otherwise ``organizationId`` given: ip_filtering_org_login
    - otherwise: ip_filtering_non_service_login

    Returns:
        SalesforceLogin: A callable that will perform the login workflow

    Raises:
        ValueError: ``username`` or ``password`` is missing or None.
    """
    # Username and password are always required
    username = kwargs.get("username")
    password = kwargs.get("password")
    if username is None or password is None:
        raise ValueError("Username and password are required parameters")

    # Arguments common to all three flows.
    shared = {
        "username": username,
        "password": password,
        "client_id": kwargs.get("client_id"),
        # An explicit domain=None falls back to the default as well.
        "domain": kwargs.get("domain") or "login",
        "api_version": kwargs.get("api_version"),
    }

    # `is not None` rather than `in kwargs`: an explicit None must not pick a
    # flow that would interpolate the text "None" into the request.
    security_token = kwargs.get("security_token")
    if security_token is not None:
        return security_token_login(security_token=security_token, **shared)

    organization_id = kwargs.get("organizationId")
    if organization_id is not None:
        return ip_filtering_org_login(organizationId=organization_id, **shared)

    return ip_filtering_non_service_login(**shared)