# pyright: reportAny=false, reportExplicitAny=false
import asyncio
import json
from collections.abc import Callable, Container, Coroutine
from contextlib import ExitStack
from pathlib import Path
from typing import Any, TypeVar
from urllib.parse import quote_plus
from httpx import Response
from sf_toolkit._models import SObjectSaveResult
from sf_toolkit.async_utils import run_concurrently
from sf_toolkit.data.transformers import chunked, flatten
from ..client import AsyncSalesforceClient, SalesforceClient
from ..data.bulk import BulkApiIngestJob
from ..data.fields import (
FIELD_TYPE_LOOKUP,
BlobData,
Field,
FieldConfigurableObject,
FieldFlag,
IdField,
dirty_fields,
object_fields,
query_fields,
serialize_object,
)
from ..data.sobject import SObject, SObjectDescribe, SObjectList
from ..logger import getLogger
_logger = getLogger(__name__)
_sObject = TypeVar("_sObject", bound=SObject)
def resolve_client(
cls: type[_sObject], client: SalesforceClient | str | None = None
) -> SalesforceClient:
if isinstance(client, SalesforceClient):
return client
return SalesforceClient.get_connection(client or cls.attributes.connection)
def resolve_async_client(
cls: type[_sObject], client: AsyncSalesforceClient | str | None = None
):
if isinstance(client, AsyncSalesforceClient):
return client
return AsyncSalesforceClient.get_connection(client or cls.attributes.connection)
def fetch(
cls: type[_sObject],
record_id: str,
sf_client: SalesforceClient | None = None,
) -> _sObject:
sf_client = resolve_client(cls, sf_client)
if cls.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{cls.attributes.type}/{record_id}"
else:
url = f"{sf_client.sobjects_url}/{cls.attributes.type}/{record_id}"
fields = list(object_fields(cls).keys())
response_data = sf_client.get(url, params={"fields": ",".join(fields)}).json()
return cls(**response_data)
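# Usage sketch (illustrative only, not part of the library): fetch a single record by Id.
# The connection name "default" is an assumption; any registered connection name works.
def _example_fetch_contact(record_id: str) -> SObject:
    Contact = sobject_from_description("Contact", connection="default")
    return fetch(Contact, record_id)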
async def fetch_async(
cls: type[_sObject],
record_id: str,
sf_client: AsyncSalesforceClient | None = None,
) -> _sObject:
sf_client = resolve_async_client(cls, sf_client)
if cls.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{cls.attributes.type}/{record_id}"
else:
url = f"{sf_client.sobjects_url}/{cls.attributes.type}/{record_id}"
fields = list(object_fields(cls).keys())
response = await sf_client.get(url, params={"fields": ",".join(fields)})
response_data = response.json()
return cls(**response_data)
def save_insert(
record: SObject,
sf_client: SalesforceClient | None = None,
reload_after_success: bool = False,
):
sf_client = resolve_client(type(record), sf_client)
# Assert that there is no ID on the record
if _id := getattr(record, record.attributes.id_field, None):
raise ValueError(
f"Cannot insert record that already has an {record.attributes.id_field} set: {_id}"
)
# Prepare the payload with all fields
payload = serialize_object(record)
if record.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{record.attributes.type}"
else:
url = f"{sf_client.sobjects_url}/{record.attributes.type}"
blob_data: BlobData | None = None
# Create a new record
if record.attributes.blob_field and (
blob_data := getattr(record, record.attributes.blob_field)
):
with blob_data as blob_payload:
# use BlobData context manager to safely open & close files
response_data = sf_client.post(
url,
files=[
(
"entity_document",
(None, json.dumps(payload), "application/json"),
),
(
record.attributes.blob_field,
(blob_data.filename, blob_payload, blob_data.content_type),
),
],
).json()
else:
response_data = sf_client.post(
url,
json=payload,
).json()
# Set the new ID on the object
_id_val = response_data["id"]
setattr(record, record.attributes.id_field, _id_val)
# Reload the record if requested
if reload_after_success:
reload(record, sf_client)
# Clear dirty fields since we've saved
dirty_fields(record).clear()
return
async def save_insert_async(
record: SObject,
sf_client: AsyncSalesforceClient | None = None,
reload_after_success: bool = False,
):
sf_client = resolve_async_client(type(record), sf_client)
# Assert that there is no ID on the record
if _id := getattr(record, record.attributes.id_field, None):
raise ValueError(
f"Cannot insert record that already has an {record.attributes.id_field} set: {_id}"
)
# Prepare the payload with all fields
payload = serialize_object(record)
if record.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{record.attributes.type}"
else:
url = f"{sf_client.sobjects_url}/{record.attributes.type}"
blob_data: BlobData | None = None
# Create a new record
if record.attributes.blob_field and (
blob_data := getattr(record, record.attributes.blob_field)
):
with blob_data as blob_payload:
# use BlobData context manager to safely open & close files
response_data = (
await sf_client.post(
url,
files=[
(
"entity_document",
(None, json.dumps(payload), "application/json"),
),
(
record.attributes.blob_field,
(blob_data.filename, blob_payload, blob_data.content_type),
),
],
)
).json()
else:
response_data = (
await sf_client.post(
url,
json=payload,
)
).json()
# Set the new ID on the object
_id_val = response_data["id"]
setattr(record, record.attributes.id_field, _id_val)
# Reload the record if requested
if reload_after_success:
await reload_async(record, sf_client)
# Clear dirty fields since we've saved
dirty_fields(record).clear()
return
def save_update(
record: SObject,
sf_client: SalesforceClient | None = None,
only_changes: bool = False,
reload_after_success: bool = False,
only_blob: bool = False, # pyright: ignore[reportUnusedParameter]
):
sf_client = resolve_client(type(record), sf_client)
# Assert that there is an ID on the record
if not (_id_val := getattr(record, record.attributes.id_field, None)):
raise ValueError(f"Cannot update record without {record.attributes.id_field}")
# If only tracking changes and there are no changes, do nothing
if only_changes and not dirty_fields(record):
return
# Prepare the payload
payload = serialize_object(record, only_changes)
payload.pop(record.attributes.id_field, None)
if record.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{record.attributes.type}/{_id_val}"
else:
url = f"{sf_client.sobjects_url}/{record.attributes.type}/{_id_val}"
blob_data: BlobData | None = None
# Update the record, sending blob content when present
if record.attributes.blob_field and (
blob_data := getattr(record, record.attributes.blob_field)
):
with blob_data as blob_payload:
# use BlobData context manager to safely open & close files
sf_client.patch(
url,
files=[
(
"entity_content",
(None, json.dumps(payload), "application/json"),
),
(
record.attributes.blob_field,
(blob_data.filename, blob_payload, blob_data.content_type),
),
],
).json()
elif payload:
_ = sf_client.patch(
url,
json=payload,
headers={"Content-Type": "application/json"},
)
# Reload the record if requested
if reload_after_success:
reload(record, sf_client)
# Clear dirty fields since we've saved
dirty_fields(record).clear()
return
async def save_update_async(
record: SObject,
sf_client: AsyncSalesforceClient | None = None,
only_changes: bool = False,
reload_after_success: bool = False,
only_blob: bool = False, # pyright: ignore[reportUnusedParameter]
):
sf_client = resolve_async_client(type(record), sf_client)
# Assert that there is an ID on the record
if not (_id_val := getattr(record, record.attributes.id_field, None)):
raise ValueError(f"Cannot update record without {record.attributes.id_field}")
# If only tracking changes and there are no changes, do nothing
if only_changes and not dirty_fields(record):
return
# Prepare the payload
payload = serialize_object(record, only_changes)
payload.pop(record.attributes.id_field, None)
if record.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{record.attributes.type}/{_id_val}"
else:
url = f"{sf_client.sobjects_url}/{record.attributes.type}/{_id_val}"
blob_data: BlobData | None = None
# Update the record, sending blob content when present
if record.attributes.blob_field and (
blob_data := getattr(record, record.attributes.blob_field)
):
with blob_data as blob_payload:
# use BlobData context manager to safely open & close files
(
await sf_client.patch(
url,
files=[
(
"entity_content",
(None, json.dumps(payload), "application/json"),
),
(
record.attributes.blob_field,
(blob_data.filename, blob_payload, blob_data.content_type),
),
],
)
).json()
elif payload:
_ = await sf_client.patch(
url,
json=payload,
headers={"Content-Type": "application/json"},
)
# Reload the record if requested
if reload_after_success:
await reload_async(record, sf_client)
# Clear dirty fields since we've saved
dirty_fields(record).clear()
return
def save_upsert(
record: SObject,
external_id_field: str,
sf_client: SalesforceClient | None = None,
reload_after_success: bool = False,
update_only: bool = False,
only_changes: bool = False,
):
if record.attributes.tooling:
raise TypeError("Upsert is not available for Tooling SObjects.")
sf_client = resolve_client(type(record), sf_client)
# Get the external ID value
if not (ext_id_val := getattr(record, external_id_field, None)):
raise ValueError(
f"Cannot upsert record without a value for external ID field: {external_id_field}"
)
# Encode the external ID value in the URL to handle special characters
ext_id_val = quote_plus(str(ext_id_val))
# Prepare the payload
payload = serialize_object(record, only_changes)
payload.pop(external_id_field, None)
# If there's nothing to update when only_changes=True, just return
if only_changes and not payload:
return
# Execute the upsert
response = sf_client.patch(
f"{sf_client.sobjects_url}/{record.attributes.type}/{external_id_field}/{ext_id_val}",
json=payload,
params={"updateOnly": update_only} if update_only else None,
headers={"Content-Type": "application/json"},
)
# For an insert via upsert, the response contains the new ID
if response.is_success:
response_data = response.json()
_id_val = response_data.get("id")
if _id_val:
setattr(record, record.attributes.id_field, _id_val)
elif update_only and response.status_code == 404:
raise ValueError(
f"Record not found for external ID field {external_id_field} with value {ext_id_val}"
)
# Reload the record if requested
if reload_after_success and (
_id_val := getattr(record, record.attributes.id_field, None)
):
reload(record, sf_client)
# Clear dirty fields since we've saved
dirty_fields(record).clear()
return record
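# Usage sketch (illustrative only, not part of the library): upsert keyed on an external Id.
# "Customer_Key__c" is an assumed custom field marked as an external ID in the org.
def _example_upsert_by_external_id(account: SObject) -> SObject | None:
    # Inserts when no match exists, updates otherwise; a new Id is written back onto the record.
    return save_upsert(account, external_id_field="Customer_Key__c", reload_after_success=True)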
async def save_upsert_async(
record: SObject,
external_id_field: str,
sf_client: AsyncSalesforceClient | None = None,
reload_after_success: bool = False,
update_only: bool = False,
only_changes: bool = False,
):
if record.attributes.tooling:
raise TypeError("Upsert is not available for Tooling SObjects.")
sf_client = resolve_async_client(type(record), sf_client)
# Get the external ID value
if not (ext_id_val := getattr(record, external_id_field, None)):
raise ValueError(
f"Cannot upsert record without a value for external ID field: {external_id_field}"
)
# Encode the external ID value in the URL to handle special characters
ext_id_val = quote_plus(str(ext_id_val))
# Prepare the payload
payload = serialize_object(record, only_changes)
payload.pop(external_id_field, None)
# If there's nothing to update when only_changes=True, just return
if only_changes and not payload:
return
# Execute the upsert
response = await sf_client.patch(
f"{sf_client.sobjects_url}/{record.attributes.type}/{external_id_field}/{ext_id_val}",
json=payload,
params={"updateOnly": update_only} if update_only else None,
headers={"Content-Type": "application/json"},
)
# For an insert via upsert, the response contains the new ID
if response.is_success:
response_data = response.json()
_id_val = response_data.get("id")
if _id_val:
setattr(record, record.attributes.id_field, _id_val)
elif update_only and response.status_code == 404:
raise ValueError(
f"Record not found for external ID field {external_id_field} with value {ext_id_val}"
)
# Reload the record if requested
if reload_after_success and (
_id_val := getattr(record, record.attributes.id_field, None)
):
await reload_async(record, sf_client)
# Clear dirty fields since we've saved
dirty_fields(record).clear()
return record
def sobject_save_csv(
record: SObject, filepath: Path | str, encoding: str = "utf-8"
) -> None:
import csv
if isinstance(filepath, str):
filepath = Path(filepath).resolve()
with filepath.open("w+", encoding=encoding) as outfile:
writer = csv.DictWriter(outfile, fieldnames=query_fields(type(record)))
writer.writeheader()
writer.writerow(flatten(serialize_object(record)))
def sobject_save_json(
record: SObject, filepath: Path | str, encoding: str = "utf-8", **json_options: Any
) -> None:
if isinstance(filepath, str):
filepath = Path(filepath).resolve()
with filepath.open("w+", encoding=encoding) as outfile:
json.dump(serialize_object(record), outfile, **json_options)
def save(
self: SObject,
sf_client: SalesforceClient | None = None,
only_changes: bool = False,
reload_after_success: bool = False,
external_id_field: str | None = None,
update_only: bool = False,
):
"""
Generic save function that decides whether to insert, update, or upsert
the record based on its state and provided parameters.
"""
# If we have an ID value, use save_update
if getattr(self, self.attributes.id_field, None) is not None:
return save_update(
self,
sf_client=sf_client,
only_changes=only_changes,
reload_after_success=reload_after_success,
)
# If we have an external ID field, use save_upsert
elif external_id_field:
return save_upsert(
self,
external_id_field=external_id_field,
sf_client=sf_client,
reload_after_success=reload_after_success,
update_only=update_only,
only_changes=only_changes,
)
# Otherwise, if not update_only, use save_insert
elif not update_only:
return save_insert(
self, sf_client=sf_client, reload_after_success=reload_after_success
)
else:
# If update_only is True and there's no ID or external ID, raise an error
raise ValueError("Cannot update record without an ID or external ID")
async def save_async(
self: SObject,
sf_client: AsyncSalesforceClient | None = None,
only_changes: bool = False,
only_blob: bool = False,
reload_after_success: bool = False,
external_id_field: str | None = None,
update_only: bool = False,
):
# If we have an ID value, use save_update
if getattr(self, self.attributes.id_field, None) is not None:
return await save_update_async(
self,
sf_client=sf_client,
only_changes=only_changes,
reload_after_success=reload_after_success,
only_blob=only_blob,
)
# If we have an external ID field, use save_upsert
elif external_id_field:
return await save_upsert_async(
self,
external_id_field=external_id_field,
sf_client=sf_client,
reload_after_success=reload_after_success,
update_only=update_only,
only_changes=only_changes,
)
# Otherwise, if not update_only, use save_insert
elif not update_only:
return await save_insert_async(
self, sf_client=sf_client, reload_after_success=reload_after_success
)
else:
# If update_only is True and there's no ID or external ID, raise an error
raise ValueError("Cannot update record without an ID or external ID")
def delete(
record: SObject,
sf_client: SalesforceClient | None = None,
clear_id_field: bool = True,
):
sf_client = resolve_client(type(record), sf_client)
_id_val = getattr(record, record.attributes.id_field, None)
if not _id_val:
raise ValueError("Cannot delete unsaved record (missing ID to delete)")
if record.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{record.attributes.type}/{_id_val}"
else:
url = f"{sf_client.sobjects_url}/{record.attributes.type}/{_id_val}"
_ = sf_client.delete(url).raise_for_status()
if clear_id_field:
delattr(record, record.attributes.id_field)
async def delete_async(
record: SObject,
sf_client: AsyncSalesforceClient | None = None,
clear_id_field: bool = True,
):
sf_client = resolve_async_client(type(record), sf_client)
_id_val = getattr(record, record.attributes.id_field, None)
if not _id_val:
raise ValueError("Cannot delete unsaved record (missing ID to delete)")
if record.attributes.tooling:
url = f"{sf_client.tooling_sobjects_url}/{record.attributes.type}/{_id_val}"
else:
url = f"{sf_client.sobjects_url}/{record.attributes.type}/{_id_val}"
_ = (await sf_client.delete(url)).raise_for_status()
if clear_id_field:
delattr(record, record.attributes.id_field)
def download_file(
record: SObject, dest: Path | None, sf_client: SalesforceClient | None = None
) -> None | bytes:
"""
Download the file associated with the blob field to the specified destination.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_sobject_blob_retrieve.htm
Args:
dest (Path | None): The destination path to save the file.
If None, file content will be returned as bytes instead.
"""
assert record.attributes.blob_field, "Object type must specify a blob field"
assert not record.attributes.tooling, (
"Cannot download file/BLOB from tooling object"
)
record_id = getattr(record, record.attributes.id_field, None)
assert record_id, "Record ID cannot be None or Empty for file download"
sf_client = resolve_client(type(record), sf_client)
url = (
f"{sf_client.sobjects_url}/{record.attributes.type}"
f"/{record_id}/{record.attributes.blob_field}"
)
with sf_client.stream("GET", url) as response:
if dest:
with dest.open("wb") as file:
for block in response.iter_bytes():
_ = file.write(block)
return None
else:
return response.read()
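# Usage sketch (illustrative only, not part of the library): stream a blob field to disk, or
# load it into memory by passing dest=None. Assumes the record's class declares a blob field.
def _example_download_blob(doc: SObject, target: Path) -> bytes | None:
    _ = download_file(doc, target)   # streamed to the target path
    return download_file(doc, None)  # or returned as raw bytes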
async def download_file_async(
record: SObject, dest: Path | None, sf_client: AsyncSalesforceClient | None = None
) -> None | bytes:
"""
Download the file associated with the blob field to the specified destination.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_sobject_blob_retrieve.htm
Args:
dest (Path | None): The destination path to save the file.
If None, file content will be returned as bytes instead.
"""
assert record.attributes.blob_field, "Object type must specify a blob field"
assert not record.attributes.tooling, (
"Cannot download file/BLOB from tooling object"
)
record_id = getattr(record, record.attributes.id_field, None)
assert record_id, "Record ID cannot be None or Empty for file download"
sf_client = resolve_async_client(type(record), sf_client)
url = (
f"{sf_client.sobjects_url}/{record.attributes.type}"
f"/{record_id}/{record.attributes.blob_field}"
)
async with sf_client.stream("GET", url) as response:
if dest:
with dest.open("wb") as file:
async for block in response.aiter_bytes():
_ = file.write(block)
return None
else:
return await response.aread()
def reload(record: SObject, sf_client: SalesforceClient | None = None):
record_id: str = getattr(record, record.attributes.id_field)
sf_client = resolve_client(type(record), sf_client)
reloaded = fetch(type(record), record_id, sf_client)
record._values.update(reloaded._values) # pyright: ignore[reportPrivateUsage]
async def reload_async(record: SObject, sf_client: AsyncSalesforceClient | None = None):
record_id: str = getattr(record, record.attributes.id_field)
sf_client = resolve_async_client(type(record), sf_client)
reloaded = await fetch_async(type(record), record_id, sf_client)
record._values.update(reloaded._values) # pyright: ignore[reportPrivateUsage]
def update_record(record: FieldConfigurableObject, /, **props: Any):
_fields = object_fields(type(record))
for key, value in props.items():
if key in _fields:
setattr(record, key, value)
def fetch_list(
cls: type[_sObject],
*ids: str,
sf_client: SalesforceClient | None = None,
on_chunk_received: Callable[[Response], None] | None = None,
) -> "SObjectList[_sObject]":
sf_client = resolve_client(cls, sf_client)
if len(ids) == 1:
return SObjectList(
[fetch(cls, ids[0], sf_client)], connection=cls.attributes.connection
)
# pull in batches with composite API
result: SObjectList[_sObject] = SObjectList(connection=cls.attributes.connection)
for chunk in chunked(ids, 2000):
response = sf_client.post(
sf_client.composite_sobjects_url(cls.attributes.type),
json={"ids": chunk, "fields": query_fields(cls)},
)
result.extend([cls(**record) for record in response.json()]) # pyright: ignore[reportUnknownMemberType]
if on_chunk_received:
on_chunk_received(response)
return result
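# Usage sketch (illustrative only, not part of the library): retrieve many records by Id;
# the composite endpoint is called once per chunk of up to 2000 Ids.
def _example_fetch_many(cls: type[SObject], ids: list[str]) -> SObjectList[SObject]:
    def _log_chunk(response: Response) -> None:
        _logger.debug("fetched chunk, status %s", response.status_code)
    return fetch_list(cls, *ids, on_chunk_received=_log_chunk)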
async def fetch_list_async(
cls: type[_sObject],
*ids: str,
sf_client: AsyncSalesforceClient | None = None,
concurrency: int = 1,
on_chunk_received: Callable[[Response], Coroutine[None, None, None] | None]
| None = None,
) -> "SObjectList[_sObject]":
sf_client = resolve_async_client(cls, sf_client)
async with sf_client:
tasks = [
sf_client.post(
sf_client.composite_sobjects_url(cls.attributes.type),
json={"ids": chunk, "fields": query_fields(cls)},
)
for chunk in chunked(ids, 2000)
]
records: SObjectList[_sObject] = SObjectList(
( # type: ignore
cls(**record)
for response in (
await run_concurrently(concurrency, tasks, on_chunk_received)
)
for record in response.json()
),
connection=cls.attributes.connection,
)
return records
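# Usage sketch (illustrative only, not part of the library): the async variant fans the
# chunk requests out with bounded concurrency.
async def _example_fetch_many_async(cls: type[SObject], ids: list[str]) -> SObjectList[SObject]:
    return await fetch_list_async(cls, *ids, concurrency=4)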
def sobject_describe(cls: type[_sObject]):
"""
Retrieves detailed metadata information about the SObject from Salesforce.
Returns:
dict: The full describe result containing metadata about the SObject's
fields, relationships, and other properties.
"""
sf_client = resolve_client(cls, None)
# Use the describe endpoint for this SObject type
describe_url = f"{sf_client.sobjects_url}/{cls.attributes.type}/describe"
# Make the request to get the describe metadata
response = sf_client.get(describe_url)
# Return the describe metadata as a dictionary
return response.json()
def sobject_from_description(
sobject: str,
connection: str = "",
ignore_fields: Container[str] | None = None,
base_class: type[_sObject] = SObject,
) -> type[_sObject]:
"""
Build an SObject type definition for the named SObject based on the object 'describe' from Salesforce
Args:
sobject (str): The API name of the SObject in Salesforce
connection (str): The name of the Salesforce connection to use
ignore_fields (Container[str] | None): Field API names to skip when building the class
base_class (type[SObject]): Base class for the generated type (defaults to SObject)
Returns:
type[SObject]: A dynamically created SObject subclass with fields matching the describe result
"""
sf_client = SalesforceClient.get_connection(connection)
# Get the describe metadata for this SObject
describe_url = f"{sf_client.sobjects_url}/{sobject}/describe"
describe_data = SObjectDescribe.from_dict(sf_client.get(describe_url).json())
# Extract field information
fields = {}
for field in describe_data.fields:
if ignore_fields and field.name in ignore_fields:
continue
if field.type == "reference":
field_cls = IdField
elif field.type in FIELD_TYPE_LOOKUP:
field_cls: type[Field[Any]] = FIELD_TYPE_LOOKUP[field.type]
else:
_logger.error(
"Unsupported field type '%s' for field '%s.%s'",
field.type,
sobject,
field.name,
)
continue
kwargs: dict[str, Any] = {}
flags: list[FieldFlag] = []
if not field.updateable:
flags.append(FieldFlag.readonly)
fields[field.name] = field_cls(*flags, **kwargs) # type: ignore
# Create a new SObject subclass
sobject_class = type(
f"SObject__{sobject}",
(base_class,),
{
"__doc__": f"Auto-generated SObject class for {sobject} ({describe_data.label})",
**fields,
},
api_name=sobject,
connection=connection,
)
return sobject_class # pyright: ignore[reportReturnType]
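# Usage sketch (illustrative only, not part of the library): generate a typed SObject class
# from the org's describe metadata. The connection name and ignored field names are assumptions.
def _example_generated_class() -> type[SObject]:
    return sobject_from_description(
        "Case",
        connection="default",
        ignore_fields=("SystemModstamp", "LastViewedDate"),
    )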
### SOBJECT LIST OPERATORS ###
def _ensure_consistent_sobject_type(
self: SObjectList[_sObject],
) -> type[_sObject] | None:
"""Validate that all SObjects in the list are of the same type."""
if not self:
return None
first_type = type(self[0])
for i, obj in enumerate(self[1:], 1):
if type(obj) is not first_type:
raise TypeError(
(
f"All objects must be of the same type. First item is {first_type.__name__}, "
f"but item at index {i} is {type(obj).__name__}"
)
)
return first_type
def _generate_record_batches(
self: SObjectList[_sObject],
max_batch_size: int = 200,
only_changes: bool = False,
include_fields: list[str] | None = None,
):
"""
Generate batches of records for processing such that Salesforce will not
reject any given batch due to size or type.
Excerpt from https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_create.htm
> If the request body includes objects of more than one type, they are processed as chunks.
> For example, if the incoming objects are {account1, account2, contact1, account3},
> the request is processed in three chunks: {{account1, account2}, {contact1}, {account3}}.
> A single request can process up to 10 chunks.
"""
if max_batch_size > 200:
_logger.warning(
"batch size is %d, but Salesforce only allows 200", max_batch_size
)
max_batch_size = 200
emitted_records: list[_sObject] = []
batches: list[tuple[list[dict[str, Any]], list[tuple[str, BlobData]]]] = []
previous_record = None
batch_records: list[dict[str, Any]] = []
batch_binary_parts: list[tuple[str, BlobData]] = []
batch_chunk_count = 0
for idx, record in enumerate(self):
if only_changes and not dirty_fields(record):
continue
s_record = serialize_object(record, only_changes)
if include_fields:
rec_fields = object_fields(type(record))
for fieldname in include_fields:
s_record[fieldname] = rec_fields[fieldname].format(
getattr(record, fieldname)
)
s_record["attributes"] = {"type": record.attributes.type}
if record.attributes.blob_field and (
blob_value := getattr(record, record.attributes.blob_field)
):
binary_part_name = "binaryPart" + str(idx)
s_record["attributes"].update(
{
"binaryPartName": binary_part_name,
"binaryPartNameAlias": record.attributes.blob_field,
}
)
batch_binary_parts.append((binary_part_name, blob_value))
if len(batch_records) >= max_batch_size:
batches.append((batch_records, batch_binary_parts))
batch_records = []
batch_binary_parts = []  # start fresh lists so blob parts are not shared across batches
batch_chunk_count = 0
previous_record = None
if (
previous_record is None
or previous_record.attributes.type != record.attributes.type
):
batch_chunk_count += 1
if batch_chunk_count > 10:
batches.append((batch_records, batch_binary_parts))
batch_records = []
batch_binary_parts = []  # keep blob parts with the batch they belong to
batch_chunk_count = 0
previous_record = None
batch_records.append(s_record)
emitted_records.append(record)
previous_record = record
if batch_records:
batches.append((batch_records, batch_binary_parts))
return batches, emitted_records
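# Illustrative sketch of the batching rules above: at most 200 records per composite request
# and at most 10 same-type chunks per request, with blob parts kept alongside their batch.
def _example_inspect_batches(records: SObjectList[SObject]) -> int:
    batches, emitted = _generate_record_batches(records, max_batch_size=200)
    _logger.debug("%d records will be sent in %d composite requests", len(emitted), len(batches))
    return len(batches)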
def save_list(
self: SObjectList[_sObject],
external_id_field: str | None = None,
only_changes: bool = False,
batch_size: int = 200,
all_or_none: bool = False,
update_only: bool = False,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Save all SObjects in the list, determining whether to insert, update, or upsert based on the records and parameters.
Args:
external_id_field: Name of the external ID field to use for upserting (if provided)
only_changes: If True, only send changed fields for updates
batch_size: Number of records to include in each batch
all_or_none: If True, all records must succeed or all will fail
update_only: If True with external_id_field, only update existing records
**callout_options: Additional options to pass to the API calls
Returns:
list[SObjectSaveResult]: List of save results
"""
if not self:
return []
# If external_id_field is provided, use upsert
if external_id_field:
# Create a new list to ensure all objects have the external ID field
upsert_objects = SObjectList(
[obj for obj in self if hasattr(obj, external_id_field)],
connection=self.connection,
)
# Check if any objects are missing the external ID field
if len(upsert_objects) != len(self):
missing_ext_ids = sum(
1 for obj in self if not hasattr(obj, external_id_field)
)
raise ValueError(
f"Cannot upsert: {missing_ext_ids} records missing external ID field '{external_id_field}'"
)
return save_upsert_list(
upsert_objects,
external_id_field=external_id_field,
batch_size=batch_size,
only_changes=only_changes,
all_or_none=all_or_none,
**callout_options,
)
# Check if we're dealing with mixed operations (some records have IDs, some don't)
has_ids = [obj for obj in self if getattr(obj, obj.attributes.id_field, None)]
missing_ids = [
obj for obj in self if not getattr(obj, obj.attributes.id_field, None)
]
# If all records have IDs, use update
if len(has_ids) == len(self):
return save_update_list(
self,
only_changes=only_changes,
batch_size=batch_size,
**callout_options,
)
# If all records are missing IDs, use insert
elif len(missing_ids) == len(self):
if update_only:
raise ValueError(
"Cannot perform update_only operation when no records have IDs"
)
return save_insert_list(self, batch_size=batch_size, **callout_options)
# Mixed case - some records have IDs, some don't
else:
if update_only:
# If update_only, we should only process records with IDs
return save_update_list(
SObjectList(has_ids, connection=self.connection),
only_changes=only_changes,
batch_size=batch_size,
**callout_options,
)
# Otherwise, split and process separately
results: list[SObjectSaveResult] = []
# Process updates first
if has_ids:
update_results = save_update_list(
SObjectList(has_ids, connection=self.connection),
only_changes=only_changes,
batch_size=batch_size,
**callout_options,
)
results.extend(update_results)
# Then process inserts
if missing_ids and not update_only:
insert_results = save_insert_list(
SObjectList(missing_ids, connection=self.connection),
batch_size=batch_size,
**callout_options,
)
results.extend(insert_results)
return results
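# Usage sketch (illustrative only, not part of the library): a mixed list is split
# automatically, updating the records that already have Ids and inserting the rest.
def _example_save_mixed_list(records: SObjectList[SObject]) -> list[SObjectSaveResult]:
    return save_list(records, only_changes=True, batch_size=200)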
def save_upsert_bulk(
self: SObjectList[_sObject],
external_id_field: str,
connection: SalesforceClient | str | None = None,
**callout_options: Any,
) -> BulkApiIngestJob:
"""Upsert records in bulk using Salesforce Bulk API 2.0
This method uses the Bulk API 2.0 to upsert records based on an external ID field.
The external ID field must exist on the object and be marked as an external ID.
Args:
external_id_field: The API name of the external ID field to use for the upsert
connection: Client instance or registered connection name to use for the job
Returns:
BulkApiIngestJob: The ingest job created for the upsert
Raises:
SalesforceBulkV2LoadError: If the job fails or times out
ValueError: If the list is empty or the external ID field doesn't exist
"""
assert self, "Cannot upsert empty SObjectList"
if not connection:
connection = self[0].attributes.connection
job = BulkApiIngestJob.init_job(
self[0].attributes.type,
"upsert",
external_id_field=external_id_field,
connection=connection,
**callout_options,
)
_ = job.upload_batches(self, **callout_options)
return job
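# Usage sketch (illustrative only, not part of the library): large volumes go through
# Bulk API 2.0 instead of the composite endpoints. The external Id field name is an assumption.
def _example_bulk_upsert(records: SObjectList[SObject]) -> BulkApiIngestJob:
    return save_upsert_bulk(records, external_id_field="Customer_Key__c")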
async def save_upsert_bulk_async(
self: SObjectList[_sObject],
external_id_field: str,
connection: AsyncSalesforceClient | str | None = None,
) -> BulkApiIngestJob:
"""Upsert records in bulk using Salesforce Bulk API 2.0
This method uses the Bulk API 2.0 to upsert records based on an external ID field.
The external ID field must exist on the object and be marked as an external ID.
Args:
external_id_field: The API name of the external ID field to use for the upsert
connection: Client instance or registered connection name to use for the job
Returns:
BulkApiIngestJob: The ingest job created for the upsert
Raises:
SalesforceBulkV2LoadError: If the job fails or times out
ValueError: If the list is empty or the external ID field doesn't exist
"""
assert self, "Cannot upsert empty SObjectList"
if not connection:
connection = self[0].attributes.connection
job = await BulkApiIngestJob.init_job_async(
self[0].attributes.type,
"upsert",
external_id_field=external_id_field,
connection=connection,
)
_ = await job.upload_batches_async(self)
return job
def save_insert_bulk(
self: SObjectList[_sObject],
connection: SalesforceClient | str | None = None,
**callout_options: Any,
) -> BulkApiIngestJob:
"""Insert records in bulk using Salesforce Bulk API 2.0
This method uses the Bulk API 2.0 to insert records.
Args:
connection: Client instance or registered connection name to use for the job
Returns:
BulkApiIngestJob: The ingest job created for the insert
Raises:
SalesforceBulkV2LoadError: If the job fails or times out
ValueError: If the list is empty
"""
assert self, "Cannot upsert empty SObjectList"
if not connection:
connection = self[0].attributes.connection
job = BulkApiIngestJob.init_job(
self[0].attributes.type, "insert", connection=connection, **callout_options
)
_ = job.upload_batches(self, **callout_options)
return job
async def save_insert_bulk_async(
records: SObjectList[_sObject],
connection: AsyncSalesforceClient | str | None = None,
**callout_options: Any,
) -> BulkApiIngestJob | None:
"""Insert records in bulk using Salesforce Bulk API 2.0
This method uses the Bulk API 2.0 to insert records.
Returns:
BulkApiIngestJob | None: The ingest job, or None if the list is empty
Raises:
SalesforceBulkV2LoadError: If the job fails or times out
"""
if not records:
_logger.warning("Cannot update empty SObjectList")
return None
if not connection:
connection = records[0].attributes.connection
job: BulkApiIngestJob = await BulkApiIngestJob.init_job_async(
records[0].attributes.type, "insert", connection=connection, **callout_options
)
_ = await job.upload_batches_async(records, **callout_options)
return job
def save_update_bulk(
records: SObjectList[_sObject],
connection: SalesforceClient | str | None = None,
**callout_options: Any,
) -> BulkApiIngestJob | None:
"""Update records in bulk using Salesforce Bulk API 2.0
This method uses the Bulk API 2.0 to update records.
Returns:
BulkApiIngestJob | None: The ingest job, or None if the list is empty
Raises:
SalesforceBulkV2LoadError: If the job fails or times out
"""
if not records:
_logger.warning("Cannot update empty SObjectList")
return None
if not connection:
connection = records[0].attributes.connection
job = BulkApiIngestJob.init_job(
records[0].attributes.type, "update", connection=connection, **callout_options
)
_ = job.upload_batches(records, **callout_options)
return job
async def save_update_bulk_async(
records: SObjectList[_sObject],
connection: AsyncSalesforceClient | str | None = None,
**callout_options: Any,
) -> BulkApiIngestJob | None:
"""Update records in bulk using Salesforce Bulk API 2.0
This method uses the Bulk API 2.0 to update records.
Returns:
BulkApiIngestJob | None: The ingest job, or None if the list is empty
Raises:
SalesforceBulkV2LoadError: If the job fails or times out
"""
if not records:
_logger.warning("Cannot update empty SObjectList")
return None
if not connection:
connection = records[0].attributes.connection
job: BulkApiIngestJob = await BulkApiIngestJob.init_job_async(
records[0].attributes.type, "update", connection=connection, **callout_options
)
_ = await job.upload_batches_async(records, **callout_options)
return job
def save_insert_list(
self: SObjectList[_sObject],
batch_size: int = 200,
all_or_none: bool = False,
sf_client: SalesforceClient | None = None,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Insert all SObjects in the list.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_create.htm
Returns:
list[SObjectSaveResult]: Results indicating success or failure of each insert operation
"""
if not self:
return []
sf_client = resolve_client(type(self[0]), sf_client)
# Ensure none of the records have IDs
for obj in self:
if getattr(obj, obj.attributes.id_field, None):
raise ValueError(
f"Cannot insert record that already has an {obj.attributes.id_field} set"
)
# Prepare records for insert
record_chunks, emitted_records = _generate_record_batches(self, batch_size)
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
# execute sync
results: list[SObjectSaveResult] = []
for records, blobs in record_chunks:
if blobs:
with ExitStack() as blob_context:
files: list[tuple[str, tuple[str | None, Any, str | None]]] = [
(
"entity_content",
(None, json.dumps(records), "application/json"),
),
# (
# self.attributes.blob_field,
# (blob_data.filename, blob_payload, blob_data.content_type)
# ),
]
for name, blob_data in blobs:
blob_payload = blob_context.enter_context(blob_data)
files.append(
(
name,
(
blob_data.filename,
blob_payload,
blob_data.content_type,
),
)
)
response = sf_client.post(
sf_client.composite_sobjects_url(), files=files
)
else:
response = sf_client.post(
sf_client.composite_sobjects_url(),
json={"allOrNone": all_or_none, "records": records},
headers=headers,
**callout_options,
)
results.extend([SObjectSaveResult(**result) for result in response.json()])
for record, result in zip(emitted_records, results):
if result.success:
setattr(record, record.attributes.id_field, result.id)
return results
async def save_insert_list_async(
records: SObjectList[_sObject],
concurrency: int = 5,
batch_size: int = 200,
all_or_none: bool = False,
sf_client: AsyncSalesforceClient | None = None,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Insert all SObjects in the list.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_create.htm
Returns:
list[SObjectSaveResult]: Results indicating success or failure of each insert operation
"""
if not records:
return []
sf_client = resolve_async_client(type(records[0]), sf_client)
# Ensure none of the records have IDs
for obj in records:
if getattr(obj, obj.attributes.id_field, None):
raise ValueError(
f"Cannot insert record that already has an {obj.attributes.id_field} set"
)
# Prepare records for insert
record_chunks, _ = _generate_record_batches(records, batch_size)
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
results = await _insert_list_chunks_async(
sf_client,
record_chunks,
headers,
concurrency,
all_or_none,
**callout_options,
)
for record, result in zip(records, results):
if result.success:
setattr(record, record.attributes.id_field, result.id)
return results
async def _insert_list_chunks_async(
sf_client: AsyncSalesforceClient | None,
record_chunks: list[tuple[list[dict[str, Any]], list[tuple[str, BlobData]]]],
headers: dict[str, str],
concurrency: int,
all_or_none: bool,
**callout_options: Any,
) -> list[SObjectSaveResult]:
sf_client = sf_client or AsyncSalesforceClient.get_connection()
if header_options := callout_options.pop("headers", None):
headers.update(header_options)
tasks = [
_save_insert_async_batch(
sf_client,
sf_client.composite_sobjects_url(),
records,
blobs,
all_or_none,
headers,
**callout_options,
)
for records, blobs in record_chunks
]
responses = await run_concurrently(concurrency, tasks)
return [
SObjectSaveResult(**result)
for response in responses
for result in response.json()
]
async def _save_insert_async_batch(
# cls: type[SObjectList[_sObject]],
sf_client: AsyncSalesforceClient,
url: str,
records: list[dict[str, Any]],
blobs: list[tuple[str, BlobData]] | None,
all_or_none: bool,
headers: dict[str, str],
**callout_options: Any,
) -> Response:
if blobs:
with ExitStack() as blob_context:
return await sf_client.post(
url,
files=[
(
"entity_content",
(
None,
json.dumps({"allOrNone": all_or_none, "records": records}),
"application/json",
),
),
*(
(
name,
(
blob_data.filename,
blob_context.enter_context(blob_data),
blob_data.content_type,
),
)
for name, blob_data in blobs
),
],
)
return await sf_client.post(
url,
json={"allOrNone": all_or_none, "records": records},
headers=headers,
**callout_options,
)
def save_update_list(
self: SObjectList[_sObject],
only_changes: bool = False,
all_or_none: bool = False,
batch_size: int = 200,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Update all SObjects in the list.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm
Args:
only_changes: If True, only send changed fields
batch_size: Number of records to include in each batch
**callout_options: Additional options to pass to the API call
Returns:
list[SObjectSaveResult]: List of save results
"""
if not self:
return []
# Ensure all records have IDs
for i, record in enumerate(self):
id_val = getattr(record, record.attributes.id_field, None)
if not id_val:
raise ValueError(
f"Record at index {i} has no {record.attributes.id_field} for update"
)
if record.attributes.blob_field and getattr(
record, record.attributes.blob_field
):
raise ValueError(
(
f"Cannot update files in composite calls. "
f"{type(record).__name__} Record at index {i} has Blob/File "
f"value for field {record.attributes.blob_field}"
)
)
# Prepare records for update
record_chunks, emitted_records = _generate_record_batches(
self, batch_size, only_changes
)
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
sf_client = resolve_client(type(self[0]), None)
# execute sync
results: list[SObjectSaveResult] = []
for records, blobs in record_chunks:
assert not blobs, "Cannot update collections with files"
response = sf_client.patch(
sf_client.composite_sobjects_url(),
json={"allOrNone": all_or_none, "records": records},
headers=headers,
**callout_options,
)
results.extend([SObjectSaveResult(**result) for result in response.json()])
for record, result in zip(emitted_records, results):
if result.success:
dirty_fields(record).clear()
return results
async def save_update_list_async(
self: SObjectList[_sObject],
only_changes: bool = False,
all_or_none: bool = False,
batch_size: int = 200,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Update all SObjects in the list.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm
Args:
only_changes: If True, only send changed fields
batch_size: Number of records to include in each batch
**callout_options: Additional options to pass to the API call
Returns:
list[SObjectSaveResult]: List of save results
"""
if not self:
return []
# Ensure all records have IDs
for i, record in enumerate(self):
id_val = getattr(record, record.attributes.id_field, None)
if not id_val:
raise ValueError(
f"Record at index {i} has no {record.attributes.id_field} for update"
)
if record.attributes.blob_field and getattr(
record, record.attributes.blob_field
):
raise ValueError(
(
f"Cannot update files in composite calls. "
f"{type(record).__name__} Record at index {i} has Blob/File "
f"value for field {record.attributes.blob_field}"
)
)
# Prepare records for update
record_chunks, emitted_records = _generate_record_batches(
self, batch_size, only_changes
)
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
sf_client = resolve_async_client(type(self[0]), None)
# execute async across batches
results: list[SObjectSaveResult] = await _list_save_update_async(
[chunk[0] for chunk in record_chunks],
all_or_none,
headers,
sf_client,
**callout_options,
)
for record, result in zip(emitted_records, results):
if result.success:
dirty_fields(record).clear()
return results
async def _list_save_update_async(
record_chunks: list[list[dict[str, Any]]],
all_or_none: bool,
headers: dict[str, str],
sf_client: AsyncSalesforceClient,
**callout_options: Any,
) -> list[SObjectSaveResult]:
tasks = [
sf_client.post(
sf_client.composite_sobjects_url(),
json={"allOrNone": all_or_none, "records": chunk},
headers=headers,
**callout_options,
)
for chunk in record_chunks
]
responses = await asyncio.gather(*tasks)
return [
SObjectSaveResult(**result)
for response in responses
for result in response.json()
]
def save_upsert_list(
records: SObjectList[_sObject],
external_id_field: str,
concurrency: int = 1,
batch_size: int = 200,
only_changes: bool = False,
all_or_none: bool = False,
sf_client: SalesforceClient | None = None,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Upsert all SObjects in the list using an external ID field.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_upsert.htm
Args:
external_id_field: Name of the external ID field to use for upserting
concurrency: Number of concurrent requests to make
batch_size: Number of records to include in each batch
only_changes: If True, only send changed fields for updates
**callout_options: Additional options to pass to the API call
Returns:
list[SObjectSaveResult]: List of save results
"""
object_type = _ensure_consistent_sobject_type(records)
if not object_type:
# no records to upsert, early return
return []
sf_client = resolve_client(object_type, sf_client or records.connection)
# Ensure all records have the external ID field
for i, record in enumerate(records):
ext_id_val = getattr(record, external_id_field, None)
if not ext_id_val:
raise AssertionError(
f"Record at index {i} has no value for external ID field '{external_id_field}'"
)
if record.attributes.blob_field and getattr(
record, record.attributes.blob_field
):
raise ValueError(
(
f"Cannot update files in composite calls. "
f"{type(record).__name__} Record at index {i} has Blob/File "
f"value for field {record.attributes.blob_field}"
)
)
# Chunk the requests
record_batches, emitted_records = _generate_record_batches(
records, batch_size, only_changes, include_fields=[external_id_field]
)
headers = {"Content-Type": "application/json"}
headers_option: dict[str, str] | None
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
url = (
sf_client.composite_sobjects_url(object_type.attributes.type)
+ "/"
+ external_id_field
)
results: list[SObjectSaveResult]
if concurrency > 1 and len(record_batches) > 1:
sf_client = resolve_async_client(object_type, None)
# execute async
results = asyncio.run(
_save_upsert_list_chunks_async(
sf_client,
url,
[batch[0] for batch in record_batches],
headers,
concurrency,
all_or_none,
**callout_options,
)
)
else:
# execute sync
results = []
for record_batch in record_batches:
response = sf_client.patch(
url,
json={"allOrNone": all_or_none, "records": record_batch[0]},
headers=headers,
)
results.extend([SObjectSaveResult(**result) for result in response.json()])
# Clear dirty fields as operations were successful
for record, result in zip(emitted_records, results):
if result.success:
dirty_fields(record).clear()
return results
async def save_upsert_list_async(
records: SObjectList[_sObject],
external_id_field: str,
concurrency: int = 1,
batch_size: int = 200,
only_changes: bool = False,
all_or_none: bool = False,
sf_client: AsyncSalesforceClient | None = None,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Upsert all SObjects in the list using an external ID field.
https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_upsert.htm
Args:
external_id_field: Name of the external ID field to use for upserting
concurrency: Number of concurrent requests to make
batch_size: Number of records to include in each batch
only_changes: If True, only send changed fields for updates
**callout_options: Additional options to pass to the API call
Returns:
list[SObjectSaveResult]: List of save results
"""
object_type = _ensure_consistent_sobject_type(records)
if not object_type:
# no records to upsert, early return
return []
sf_client = resolve_async_client(object_type, sf_client or records.connection)
# Ensure all records have the external ID field
for i, record in enumerate(records):
ext_id_val = getattr(record, external_id_field, None)
if not ext_id_val:
raise AssertionError(
f"Record at index {i} has no value for external ID field '{external_id_field}'"
)
if record.attributes.blob_field and getattr(
record, record.attributes.blob_field
):
raise ValueError(
(
f"Cannot update files in composite calls. "
f"{type(record).__name__} Record at index {i} has Blob/File "
f"value for field {record.attributes.blob_field}"
)
)
# Chunk the requests
record_batches, emitted_records = _generate_record_batches(
records, batch_size, only_changes, include_fields=[external_id_field]
)
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
url = (
sf_client.composite_sobjects_url(object_type.attributes.type)
+ "/"
+ external_id_field
)
results: list[SObjectSaveResult] = await _save_upsert_list_chunks_async(
sf_client,
url,
[batch[0] for batch in record_batches],
headers,
concurrency,
all_or_none,
**callout_options,
)
# Clear dirty fields as operations were successful
for record, result in zip(emitted_records, results):
if result.success:
dirty_fields(record).clear()
return results
async def _save_upsert_list_chunks_async(
sf_client: AsyncSalesforceClient,
url: str,
record_chunks: list[list[dict[str, Any]]],
headers: dict[str, str],
concurrency: int,
all_or_none: bool,
**callout_options: Any,
) -> list[SObjectSaveResult]:
tasks = [
sf_client.patch(
url,
json={"allOrNone": all_or_none, "records": chunk},
headers=headers,
**callout_options,
)
for chunk in record_chunks
if chunk
]
responses = await run_concurrently(concurrency, tasks)
results = [
SObjectSaveResult(**result)
for response in responses
for result in response.json()
]
return results
def delete_list(
records: SObjectList[_sObject],
clear_id_field: bool = False,
batch_size: int = 200,
all_or_none: bool = False,
sf_client: SalesforceClient | None = None,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Delete all SObjects in the list.
Args:
clear_id_field: If True, clear the ID field on the objects after deletion
Returns:
list[SObjectSaveResult]: Delete results for each record
"""
if not records:
return []
record_id_batches = list(
chunked(
[
record_id
for obj in records
if (record_id := getattr(obj, obj.attributes.id_field, None))
],
batch_size,
)
)
results: list[SObjectSaveResult]
sf_client = resolve_client(type(records[0]), sf_client or records.connection)
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
url = sf_client.composite_sobjects_url()
results = []
for batch in record_id_batches:
response = sf_client.delete(
url,
params={"allOrNone": all_or_none, "ids": ",".join(batch)},
headers=headers,
**callout_options,
)
results.extend([SObjectSaveResult(**result) for result in response.json()])
if clear_id_field:
for record, result in zip(records, results):
if result.success:
delattr(record, record.attributes.id_field)
return results
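# Usage sketch (illustrative only, not part of the library): delete in batches of 200 and
# clear the local Id fields on the records that were removed successfully.
def _example_delete_all(records: SObjectList[SObject]) -> list[SObjectSaveResult]:
    return delete_list(records, clear_id_field=True)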
async def delete_list_async(
records: SObjectList[_sObject],
clear_id_field: bool = False,
batch_size: int = 200,
concurrency: int = 1,
all_or_none: bool = False,
sf_client: AsyncSalesforceClient | None = None,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Delete all SObjects in the list.
Args:
clear_id_field: If True, clear the ID field on the objects after deletion
Returns:
list[SObjectSaveResult]: Delete results for each record
"""
if not records:
return []
record_id_batches = list(
chunked(
[
record_id
for obj in records
if (record_id := getattr(obj, obj.attributes.id_field, None))
],
batch_size,
)
)
results: list[SObjectSaveResult] = []
sf_client = resolve_async_client(type(records[0]), sf_client or records.connection)
results = await _delete_list_chunks_async(
sf_client,
record_id_batches,
all_or_none,
concurrency,
**callout_options,
)
if clear_id_field:
for record, result in zip(records, results):
if result.success:
delattr(record, record.attributes.id_field)
return results
async def _delete_list_chunks_async(
sf_client: AsyncSalesforceClient,
record_id_batches: list[list[str]],
all_or_none: bool,
concurrency: int,
**callout_options: Any,
) -> list[SObjectSaveResult]:
"""
Delete all SObjects in the list asynchronously.
Args:
sf_client: The Salesforce client
record_id_batches: List of batches of record IDs to delete
all_or_none: If True, delete all records or none
callout_options: Additional options for the callout
Returns:
List of SObjectSaveResult objects
"""
url = sf_client.composite_sobjects_url()
headers = {"Content-Type": "application/json"}
if headers_option := callout_options.pop("headers", None):
headers.update(headers_option)
tasks = [
sf_client.delete(
url,
params={"allOrNone": all_or_none, "ids": ",".join(record_id)},
headers=headers,
**callout_options,
)
for record_id in record_id_batches
]
responses = await run_concurrently(concurrency, tasks)
results = [
SObjectSaveResult(**result)
for response in responses
for result in response.json()
]
return results