Source code for sf_toolkit.data.sobject

import asyncio
from contextlib import ExitStack
import json
from pathlib import Path
from typing import (
    Any,
    Callable,
    Iterable,
    AsyncIterable,
    TypeVar,
    Coroutine,
)

from urllib.parse import quote_plus
import warnings
from httpx import Response

from .. import client as sftk_client

from more_itertools import chunked

from ..async_utils import run_concurrently
from .._models import SObjectAttributes, SObjectSaveResult
from ..interfaces import I_AsyncSalesforceClient, I_SObject, I_SalesforceClient
from .fields import (
    BlobData,
    BlobField,
    Field,
    FIELD_TYPE_LOOKUP,
    FieldConfigurableObject,
    FieldFlag,
    SObjectFieldDescribe,
)
from .transformers import flatten, unflatten

_sObject = TypeVar("_sObject", bound=("SObject"))

_T = TypeVar("_T")


class SObjectDescribe:
    """Represents metadata about a Salesforce SObject from a describe call"""

    def __init__(
        self,
        *,
        name: str = "",
        label: str = "",
        labelPlural: str = "",
        keyPrefix: str = "",
        custom: bool = False,
        customSetting: bool = False,
        createable: bool = False,
        updateable: bool = False,
        deletable: bool = False,
        undeletable: bool = False,
        mergeable: bool = False,
        queryable: bool = False,
        feedEnabled: bool = False,
        searchable: bool = False,
        layoutable: bool = False,
        activateable: bool = False,
        fields: list[SObjectFieldDescribe] | None = None,
        childRelationships: list[dict] | None = None,
        recordTypeInfos: list[dict] | None = None,
        **additional_properties,
    ):
        self.name = name
        self.label = label
        self.labelPlural = labelPlural
        self.keyPrefix = keyPrefix
        self.custom = custom
        self.customSetting = customSetting
        self.createable = createable
        self.updateable = updateable
        self.deletable = deletable
        self.undeletable = undeletable
        self.mergeable = mergeable
        self.queryable = queryable
        self.feedEnabled = feedEnabled
        self.searchable = searchable
        self.layoutable = layoutable
        self.activateable = activateable
        self.fields = fields or []
        self.childRelationships = childRelationships or []
        self.recordTypeInfos = recordTypeInfos or []
        self._raw_data = {**additional_properties}
        # Add all explicit properties to _raw_data too
        for key, value in self.__dict__.items():
            if not key.startswith("_"):
                self._raw_data[key] = value

    @classmethod
    def from_dict(cls, data: dict) -> "SObjectDescribe":
        """Create an SObjectDescribe instance from a dictionary
        (typically from a Salesforce API response)"""
        # Extract fields specifically to convert them to SObjectFieldDescribe objects
        fields_data = data.pop("fields", []) if "fields" in data else []
        # Create SObjectFieldDescribe instances for each field
        fields = [
            SObjectFieldDescribe(
                **{
                    k: v
                    for k, v in field_data.items()
                    if k in SObjectFieldDescribe._fields
                }
            )
            for field_data in fields_data
        ]
        # Create the SObjectDescribe with all remaining properties
        return cls(fields=fields, **data)

    def get_field(self, field_name: str) -> SObjectFieldDescribe | None:
        """Get the field metadata for a specific field by name"""
        for field in self.fields:
            if field.name == field_name:
                return field
        return None

    def get_raw_data(self) -> dict:
        """Get the raw JSON data from the describe call"""
        return self._raw_data

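
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library source): how an SObjectDescribe
# can be built from a dict shaped like a Salesforce describe response. A real
# payload would come from an SObject.describe() call; the values below are
# made up for demonstration, and the helper name is hypothetical.
def _example_sobject_describe_usage() -> None:
    describe = SObjectDescribe.from_dict(
        {"name": "Account", "label": "Account", "queryable": True, "custom": False}
    )
    # explicit properties are exposed as attributes and mirrored in the raw data
    assert describe.queryable
    assert describe.get_raw_data()["label"] == "Account"
    # no field metadata was supplied, so field lookups return None
    assert describe.get_field("Name") is None
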
class SObject(FieldConfigurableObject, I_SObject):
    def __init_subclass__(
        cls,
        api_name: str | None = None,
        connection: str = "",
        id_field: str = "Id",
        tooling: bool = False,
        **kwargs,
    ) -> None:
        super().__init_subclass__(**kwargs)
        if not api_name:
            api_name = cls.__name__
        blob_field = None
        connection = connection or I_SalesforceClient.DEFAULT_CONNECTION_NAME
        for name, field in cls._fields.items():
            if isinstance(field, BlobField):
                assert blob_field is None, (
                    "Cannot have multiple Field/Blob fields on a single object"
                )
                blob_field = name
        if blob_field:
            del cls._fields[blob_field]
        cls.attributes = SObjectAttributes(
            api_name, connection, id_field, blob_field, tooling
        )

    def __init__(self, /, _strict_fields: bool = True, **fields):
        fields.pop("attributes", None)
        blob_value = None
        if self.attributes.blob_field:
            blob_value = fields.pop(self.attributes.blob_field, None)
        super().__init__(_strict_fields=_strict_fields, **fields)
        if self.attributes.blob_field and blob_value is not None:
            setattr(self, self.attributes.blob_field, blob_value)

    @classmethod
    def query(
        cls: type[_sObject], include_deleted: bool = False
    ) -> "SoqlQuery[_sObject]":
        """Create a new SoqlQuery builder for this SObject type.

        Args:
            include_deleted (bool, optional): Whether to include deleted records
                in the query. Defaults to False.

        Returns:
            SoqlQuery: A new query builder instance for this SObject type.

        Example:
            ```python
            # Create a query builder for Contact
            query = Contact.query()
            # Add conditions and execute the query with the returned builder
            ```
        """
        # delayed import to avoid circular imports
        if "SoqlQuery" not in globals():
            global SoqlQuery
            from .query_builder import SoqlQuery
        return SoqlQuery(cls, include_deleted)

    @classmethod
    def from_file(cls, filepath: Path | str):
        """Load records from a .csv or .json file, dispatching on the file extension."""
        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        file_extension = filepath.suffix.lower()
        if file_extension == ".csv":
            return cls.from_csv_file(filepath)
        if file_extension == ".json":
            return cls.from_json_file(filepath)
        raise ValueError(f"Unknown file extension {file_extension}")

    @classmethod
    def from_csv_file(
        cls,
        filepath: Path | str,
        file_encoding="utf-8",
        fieldnames: list[str] | None = None,
    ):
        """Load records from a CSV file whose columns map to fields on this SObject type."""
        import csv

        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        with filepath.open(encoding=file_encoding) as csv_file:
            reader = csv.DictReader(csv_file, fieldnames=fieldnames)
            assert reader.fieldnames, "no fieldnames found for reader."
            object_fields = set(cls.query_fields())
            for field in reader.fieldnames:
                if field not in object_fields:
                    raise KeyError(
                        f"Field {field} in {filepath} not found for SObject "
                        f"{cls.__qualname__} ({cls.attributes.type})"
                    )
            return SObjectList(
                (cls(**unflatten(row)) for row in reader),
                connection=cls.attributes.connection,
            )  # type: ignore

    @classmethod
    def from_json_file(cls, filepath: Path | str, file_encoding="utf-8"):
        """Load one record or a list of records from a JSON file."""
        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        with filepath.open(encoding=file_encoding) as json_file:
            data = json.load(json_file)
        if isinstance(data, list):
            return SObjectList(
                (cls(**record) for record in data),
                connection=cls.attributes.connection,
            )
        elif isinstance(data, dict):
            return SObjectList([cls(**data)], connection=cls.attributes.connection)
        raise TypeError(
            f"Unexpected {type(data).__name__} value "
            f"{str(data)[:50] + '...' if len(str(data)) > 50 else str(data)} "
            f"while attempting to load {cls.__qualname__} from {filepath}"
        )

    @classmethod
    def _client_connection(cls) -> I_SalesforceClient:
        return sftk_client.SalesforceClient.get_connection(cls.attributes.connection)

    def _has_blob_content(self) -> bool:
        """
        Check if the SObject instance has any BlobFields with content set
        """
        if not self.attributes.blob_field:
            return False
        if self.attributes.blob_field in self._values:
            return True
        return False

    @classmethod
    def read(
        cls: type[_sObject],
        record_id: str,
        sf_client: I_SalesforceClient | None = None,
    ) -> _sObject:
        if sf_client is None:
            sf_client = cls._client_connection()
        if cls.attributes.tooling:
            url = f"{sf_client.tooling_sobjects_url}/{cls.attributes.type}/{record_id}"
        else:
            url = f"{sf_client.sobjects_url}/{cls.attributes.type}/{record_id}"
        fields = list(cls.keys())
        response_data = sf_client.get(url, params={"fields": ",".join(fields)}).json()
        return cls(**response_data)

    def save_insert(
        self,
        sf_client: I_SalesforceClient | None = None,
        reload_after_success: bool = False,
    ):
        if sf_client is None:
            sf_client = self._client_connection()
        # Assert that there is no ID on the record
        if _id := getattr(self, self.attributes.id_field, None):
            raise ValueError(
                f"Cannot insert record that already has an "
                f"{self.attributes.id_field} set: {_id}"
            )
        # Prepare the payload with all fields
        payload = self.serialize()
        if self.attributes.tooling:
            url = f"{sf_client.tooling_sobjects_url}/{self.attributes.type}"
        else:
            url = f"{sf_client.sobjects_url}/{self.attributes.type}"
        blob_data: BlobData | None = None
        # Create a new record
        if self.attributes.blob_field and (
            blob_data := getattr(self, self.attributes.blob_field)
        ):
            # use BlobData context manager to safely open & close files
            with blob_data as blob_payload:
                response_data = sf_client.post(
                    url,
                    files=[
                        (
                            "entity_document",
                            (None, json.dumps(payload), "application/json"),
                        ),
                        (
                            self.attributes.blob_field,
                            (blob_data.filename, blob_payload, blob_data.content_type),
                        ),
                    ],
                ).json()
        else:
            response_data = sf_client.post(
                url,
                json=payload,
            ).json()
        # Set the new ID on the object
        _id_val = response_data["id"]
        setattr(self, self.attributes.id_field, _id_val)
        # Reload the record if requested
        if reload_after_success:
            self.reload(sf_client)
        # Clear dirty fields since we've saved
        self.dirty_fields.clear()
        return

    def save_update(
        self,
        sf_client: I_SalesforceClient | None = None,
        only_changes: bool = False,
        reload_after_success: bool = False,
        only_blob: bool = False,
    ):
        if sf_client is None:
            sf_client = self._client_connection()
        # Assert that there is an ID on the record
        if not (_id_val := getattr(self, self.attributes.id_field, None)):
            raise ValueError(f"Cannot update record without {self.attributes.id_field}")
        # If only tracking changes and there are no changes, do nothing
        if only_changes and not self.dirty_fields:
            return
        # Prepare the payload
        payload = self.serialize(only_changes)
        payload.pop(self.attributes.id_field, None)
        if self.attributes.tooling:
            url = f"{sf_client.tooling_sobjects_url}/{self.attributes.type}/{_id_val}"
        else:
            url = f"{sf_client.sobjects_url}/{self.attributes.type}/{_id_val}"
        blob_data: BlobData | None = None
        # Update the existing record
        if self.attributes.blob_field and (
            blob_data := getattr(self, self.attributes.blob_field)
        ):
            # use BlobData context manager to safely open & close files
            with blob_data as blob_payload:
                sf_client.patch(
                    url,
                    files=[
                        (
                            "entity_content",
                            (None, json.dumps(payload), "application/json"),
                        ),
                        (
                            self.attributes.blob_field,
                            (blob_data.filename, blob_payload, blob_data.content_type),
                        ),
                    ],
                ).json()
        elif payload:
            sf_client.patch(
                url,
                json=payload,
                headers={"Content-Type": "application/json"},
            )
        # Reload the record if requested
        if reload_after_success:
            self.reload(sf_client)
        # Clear dirty fields since we've saved
        self.dirty_fields.clear()
        return

    def save_upsert(
        self,
        external_id_field: str,
        sf_client: I_SalesforceClient | None = None,
        reload_after_success: bool = False,
        update_only: bool = False,
        only_changes: bool = False,
    ):
        if self.attributes.tooling:
            raise TypeError("Upsert is not available for Tooling SObjects.")
        if sf_client is None:
            sf_client = self._client_connection()
        # Get the external ID value
        if not (ext_id_val := getattr(self, external_id_field, None)):
            raise ValueError(
                f"Cannot upsert record without a value for external ID field: "
                f"{external_id_field}"
            )
        # Encode the external ID value in the URL to handle special characters
        ext_id_val = quote_plus(str(ext_id_val))
        # Prepare the payload
        payload = self.serialize(only_changes)
        payload.pop(external_id_field, None)
        # If there's nothing to update when only_changes=True, just return
        if only_changes and not payload:
            return
        # Execute the upsert
        response = sf_client.patch(
            f"{sf_client.sobjects_url}/{self.attributes.type}"
            f"/{external_id_field}/{ext_id_val}",
            json=payload,
            params={"updateOnly": update_only} if update_only else None,
            headers={"Content-Type": "application/json"},
        )
        # For an insert via upsert, the response contains the new ID
        if response.is_success:
            response_data = response.json()
            _id_val = response_data.get("id")
            if _id_val:
                setattr(self, self.attributes.id_field, _id_val)
        elif update_only and response.status_code == 404:
            raise ValueError(
                f"Record not found for external ID field {external_id_field} "
                f"with value {ext_id_val}"
            )
        # Reload the record if requested
        if reload_after_success and (
            _id_val := getattr(self, self.attributes.id_field, None)
        ):
            self.reload(sf_client)
        # Clear dirty fields since we've saved
        self.dirty_fields.clear()
        return self

    def save_csv(self, filepath: Path | str, encoding="utf-8") -> None:
        import csv

        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        with filepath.open("w+", encoding=encoding) as outfile:
            writer = csv.DictWriter(outfile, fieldnames=self.query_fields())
            writer.writeheader()
            writer.writerow(flatten(self.serialize()))

    def save_json(self, filepath: Path | str, encoding="utf-8", **json_options) -> None:
        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        with filepath.open("w+", encoding=encoding) as outfile:
            json.dump(self.serialize(), outfile, **json_options)

    def save(
        self,
        sf_client: I_SalesforceClient | None = None,
        only_changes: bool = False,
        reload_after_success: bool = False,
        external_id_field: str | None = None,
        update_only: bool = False,
    ):
        # If we have an ID value, use save_update
        if getattr(self, self.attributes.id_field, None) is not None:
            return self.save_update(
                sf_client=sf_client,
                only_changes=only_changes,
                reload_after_success=reload_after_success,
            )
        # If we have an external ID field, use save_upsert
        elif external_id_field:
            return self.save_upsert(
                external_id_field=external_id_field,
                sf_client=sf_client,
                reload_after_success=reload_after_success,
                update_only=update_only,
                only_changes=only_changes,
            )
        # Otherwise, if not update_only, use save_insert
        elif not update_only:
            return self.save_insert(
                sf_client=sf_client, reload_after_success=reload_after_success
            )
        else:
            # If update_only is True and there's no ID or external ID, raise an error
            raise ValueError("Cannot update record without an ID or external ID")

    def delete(
        self, sf_client: I_SalesforceClient | None = None, clear_id_field: bool = True
    ):
        if sf_client is None:
            sf_client = self._client_connection()
        _id_val = getattr(self, self.attributes.id_field, None)
        if not _id_val:
            raise ValueError("Cannot delete unsaved record (missing ID to delete)")
        if self.attributes.tooling:
            url = f"{sf_client.tooling_sobjects_url}/{self.attributes.type}/{_id_val}"
        else:
            url = f"{sf_client.sobjects_url}/{self.attributes.type}/{_id_val}"
        sf_client.delete(url)
        if clear_id_field:
            delattr(self, self.attributes.id_field)

    def download_file(
        self, dest: Path | None, sf_client: I_SalesforceClient | None = None
    ) -> None | bytes:
        """
        Download the file associated with the blob field to the specified destination.

        https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_sobject_blob_retrieve.htm

        Args:
            dest (Path | None): The destination path to save the file. If None,
                file content will be returned as bytes instead.
        """
        assert self.attributes.blob_field, "Object type must specify a blob field"
        assert not self.attributes.tooling, (
            "Cannot download file/BLOB from tooling object"
        )
        record_id = getattr(self, self.attributes.id_field, None)
        assert record_id, "Record ID cannot be None or Empty for file download"
        if sf_client is None:
            sf_client = self._client_connection()
        url = (
            f"{sf_client.sobjects_url}/{self.attributes.type}"
            f"/{record_id}/{self.attributes.blob_field}"
        )
        with sf_client.stream("GET", url) as response:
            if dest:
                with dest.open("wb") as file:
                    for block in response.iter_bytes():
                        file.write(block)
                return None
            else:
                return response.read()

    def reload(self, sf_client: I_SalesforceClient | None = None):
        record_id: str = getattr(self, self.attributes.id_field)
        if sf_client is None:
            sf_client = self._client_connection()
        reloaded = type(self).read(record_id, sf_client)
        self._values.update(reloaded._values)

    def update_values(self, /, **kwargs):
        for key, value in kwargs.items():
            if key in self.keys():
                self[key] = value

    @classmethod
    def list(
        cls: type[_sObject],
        *ids: str,
        sf_client: I_SalesforceClient | None = None,
        concurrency: int = 1,
        on_chunk_received: Callable[[Response], None] | None = None,
    ) -> "SObjectList[_sObject]":
        if sf_client is None:
            sf_client = cls._client_connection()
        if len(ids) == 1:
            return SObjectList(
                [cls.read(ids[0], sf_client)], connection=cls.attributes.connection
            )
        # pull in batches with the composite API
        if concurrency > 1 and len(ids) > 2000:
            # fan the batches out concurrently on the async client
            return asyncio.run(
                cls.read_async(
                    *ids,
                    sf_client=sf_client.as_async,
                    concurrency=concurrency,
                    on_chunk_received=on_chunk_received,
                )
            )
        else:
            result: SObjectList[_sObject] = SObjectList(
                connection=cls.attributes.connection
            )
            for chunk in chunked(ids, 2000):
                response = sf_client.post(
                    sf_client.composite_sobjects_url(cls.attributes.type),
                    json={"ids": chunk, "fields": list(cls.query_fields())},
                )
                chunk_result: list[_sObject] = [
                    cls(**record) for record in response.json()
                ]
                result.extend(chunk_result)
                if on_chunk_received:
                    on_chunk_received(response)
            return result

    @classmethod
    async def read_async(
        cls: type[_sObject],
        *ids: str,
        sf_client: I_AsyncSalesforceClient | None = None,
        concurrency: int = 1,
        on_chunk_received: Callable[[Response], Coroutine | None] | None = None,
    ) -> "SObjectList[_sObject]":
        if sf_client is None:
            sf_client = cls._client_connection().as_async
        async with sf_client:
            tasks = [
                sf_client.post(
                    sf_client.composite_sobjects_url(cls.attributes.type),
                    json={"ids": chunk, "fields": list(cls.query_fields())},
                )
                for chunk in chunked(ids, 2000)
            ]
            records: SObjectList[_sObject] = SObjectList(
                (  # type: ignore
                    cls(**record)
                    for response in (
                        await run_concurrently(concurrency, tasks, on_chunk_received)
                    )
                    for record in response.json()
                ),
                connection=cls.attributes.connection,
            )
            return records

    @classmethod
    def describe(cls):
        """
        Retrieves detailed metadata information about the SObject from Salesforce.

        Returns:
            dict: The full describe result containing metadata about the SObject's
                fields, relationships, and other properties.
        """
        sf_client = cls._client_connection()
        # Use the describe endpoint for this SObject type
        describe_url = f"{sf_client.sobjects_url}/{cls.attributes.type}/describe"
        # Make the request to get the describe metadata
        response = sf_client.get(describe_url)
        # Return the describe metadata as a dictionary
        return response.json()

    @classmethod
    def from_description(cls, sobject: str, connection: str = "") -> type["SObject"]:
        """
        Build an SObject type definition for the named SObject based on the
        object 'describe' from Salesforce.

        Args:
            sobject (str): The API name of the SObject in Salesforce
            connection (str): The name of the Salesforce connection to use

        Returns:
            type[SObject]: A dynamically created SObject subclass with fields
                matching the describe result
        """
        sf_client = sftk_client.SalesforceClient.get_connection(connection)
        # Get the describe metadata for this SObject
        describe_url = f"{sf_client.sobjects_url}/{sobject}/describe"
        describe_data = SObjectDescribe.from_dict(sf_client.get(describe_url).json())
        # Extract field information
        fields = {}
        for field in describe_data.fields:
            field_name = field.name
            field_type = field.type
            field_cls: type[Field] = FIELD_TYPE_LOOKUP[field_type]
            kwargs: dict[str, Any] = {}
            flags: list[FieldFlag] = []
            if not field.updateable:
                flags.append(FieldFlag.readonly)
            fields[field_name] = field_cls(*flags, **kwargs)  # type: ignore
        # Create a new SObject subclass
        sobject_class = type(
            f"SObject__{sobject}",
            (SObject,),
            {
                "__doc__": f"Auto-generated SObject class for {sobject} "
                f"({describe_data.label})",
                **fields,
            },
            api_name=sobject,
            connection=connection,
        )
        return sobject_class

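
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library source): typical single-record
# usage of the SObject API defined above. It assumes a SalesforceClient
# connection has already been registered under the default connection name and
# that the org contains the record used here; the record ID and field values
# are placeholders, and the helper name is hypothetical.
def _example_single_record_usage() -> None:
    # build a typed SObject subclass from the org's describe metadata
    Contact = SObject.from_description("Contact")
    # fetch a record by ID, change a field, and push only the change back;
    # save() dispatches to save_update here because the record already has an ID
    contact = Contact.read("003000000000001AAA")
    contact.update_values(LastName="Updated")
    contact.save(only_changes=True)
    # a record built locally with no ID would be routed to save_insert instead,
    # and passing external_id_field routes ID-less records through save_upsert
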
def _is_sobject(value):
    return isinstance(value, SObject)


def _is_sobject_subclass(cls):
    return issubclass(cls, SObject)

class SObjectList(list[_sObject]):
    """A list that contains SObject instances and provides bulk operations
    via Salesforce's composite API."""

    def __init__(self, iterable: Iterable[_sObject] = (), *, connection: str = ""):
        """
        Initialize an SObjectList.

        Args:
            iterable: An optional iterable of SObject instances
            connection: Optional name of the Salesforce connection to use
        """
        # Capture the items first: the iterable may be a generator, so it cannot
        # be iterated once for validation and again to populate the list.
        super().__init__(iterable)
        # Validate all items are SObjects
        for item in self:
            if not isinstance(item, SObject):
                raise TypeError(
                    f"All items must be SObject instances, got {type(item)}"
                )
        self.connection = connection

    @classmethod
    async def async_init(
        cls, a_iterable: AsyncIterable[_sObject], connection: str = ""
    ):
        collected_records = [record async for record in a_iterable]
        return cls(collected_records, connection=connection)

    def append(self, item):
        """Add an SObject to the list."""
        if not isinstance(item, SObject):
            raise TypeError(f"Can only append SObject instances, got {type(item)}")
        super().append(item)  # type: ignore

    def extend(self, iterable):
        """Extend the list with an iterable of SObjects."""
        if not isinstance(iterable, (tuple, list, set)):
            # materialize first so validation doesn't exhaust a generator and lose items
            iterable = tuple(iterable)
        for item in iterable:
            if not isinstance(item, SObject):
                raise TypeError(
                    f"All items must be SObject instances, got {type(item)}"
                )
        super().extend(iterable)

    def _get_client(self):
        """Get the Salesforce client to use for operations."""
        if self.connection:
            return sftk_client.SalesforceClient.get_connection(self.connection)
        elif self:
            return self[0]._client_connection()
        else:
            raise ValueError(
                "Cannot determine Salesforce connection: "
                "list is empty and no connection specified"
            )

    def _ensure_consistent_sobject_type(self) -> type[SObject] | None:
        """Validate that all SObjects in the list are of the same type."""
        if not self:
            return None
        first_type = type(self[0])
        for i, obj in enumerate(self[1:], 1):
            if type(obj) is not first_type:
                raise TypeError(
                    f"All objects must be of the same type. First item is "
                    f"{first_type.__name__}, but item at index {i} is "
                    f"{type(obj).__name__}"
                )
        return first_type

    def _generate_record_batches(
        self,
        max_batch_size: int = 200,
        only_changes: bool = False,
        include_fields: list[str] | None = None,
    ):
        """
        Generate batches of records for processing such that Salesforce will not
        reject any given batch due to size or type.

        Excerpt from
        https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_create.htm

        > If the request body includes objects of more than one type, they are
        > processed as chunks. For example, if the incoming objects are
        > {account1, account2, contact1, account3}, the request is processed in
        > three chunks: {{account1, account2}, {contact1}, {account3}}.
        > A single request can process up to 10 chunks.
        """
        if max_batch_size > 200:
            warnings.warn(
                f"batch size is {max_batch_size}, but Salesforce only allows 200",
            )
            max_batch_size = 200
        emitted_records: list[_sObject] = []
        batches: list[tuple[list[dict[str, Any]], list[tuple[str, BlobData]]]] = []
        previous_record = None
        batch_records: list[dict[str, Any]] = []
        batch_binary_parts: list[tuple[str, BlobData]] = []
        batch_chunk_count = 0
        for idx, record in enumerate(self):
            if only_changes and not record.dirty_fields:
                continue
            s_record = record.serialize(only_changes)
            if include_fields:
                for fieldname in include_fields:
                    s_record[fieldname] = record._fields[fieldname].format(
                        record._values.get(fieldname)
                    )
            s_record["attributes"] = {"type": record.attributes.type}
            if record.attributes.blob_field and (
                blob_value := getattr(record, record.attributes.blob_field)
            ):
                binary_part_name = "binaryPart" + str(idx)
                s_record["attributes"].update(
                    {
                        "binaryPartName": binary_part_name,
                        "binaryPartNameAlias": record.attributes.blob_field,
                    }
                )
                batch_binary_parts.append((binary_part_name, blob_value))
            if len(batch_records) >= max_batch_size:
                batches.append((batch_records, batch_binary_parts))
                batch_records = []
                batch_chunk_count = 0
                previous_record = None
            if (
                previous_record is None
                or previous_record.attributes.type != record.attributes.type
            ):
                batch_chunk_count += 1
                if batch_chunk_count > 10:
                    batches.append((batch_records, batch_binary_parts))
                    batch_records = []
                    batch_chunk_count = 0
                    previous_record = None
            batch_records.append(s_record)
            emitted_records.append(record)
            previous_record = record
        if batch_records:
            batches.append((batch_records, batch_binary_parts))
        return batches, emitted_records

    def save(
        self,
        external_id_field: str | None = None,
        only_changes: bool = False,
        concurrency: int = 1,
        batch_size: int = 200,
        all_or_none: bool = False,
        update_only: bool = False,
        **callout_options,
    ) -> list[SObjectSaveResult]:
        """
        Save all SObjects in the list, determining whether to insert, update, or
        upsert based on the records and parameters.

        Args:
            external_id_field: Name of the external ID field to use for upserting (if provided)
            only_changes: If True, only send changed fields for updates
            concurrency: Number of concurrent requests to make
            batch_size: Number of records to include in each batch
            all_or_none: If True, all records must succeed or all will fail
            update_only: If True with external_id_field, only update existing records
            **callout_options: Additional options to pass to the API calls

        Returns:
            list[SObjectSaveResult]: List of save results
        """
        if not self:
            return []
        # If external_id_field is provided, use upsert
        if external_id_field:
            # Create a new list to ensure all objects have the external ID field
            upsert_objects = SObjectList(
                [obj for obj in self if hasattr(obj, external_id_field)],
                connection=self.connection,
            )
            # Check if any objects are missing the external ID field
            if len(upsert_objects) != len(self):
                missing_ext_ids = sum(
                    1 for obj in self if not hasattr(obj, external_id_field)
                )
                raise ValueError(
                    f"Cannot upsert: {missing_ext_ids} records missing external "
                    f"ID field '{external_id_field}'"
                )
            return upsert_objects.save_upsert(
                external_id_field=external_id_field,
                concurrency=concurrency,
                batch_size=batch_size,
                only_changes=only_changes,
                all_or_none=all_or_none,
                **callout_options,
            )
        # Check if we're dealing with mixed operations (some records have IDs, some don't)
        has_ids = [obj for obj in self if getattr(obj, obj.attributes.id_field, None)]
        missing_ids = [
            obj for obj in self if not getattr(obj, obj.attributes.id_field, None)
        ]
        # If all records have IDs, use update
        if len(has_ids) == len(self):
            return self.save_update(
                only_changes=only_changes,
                concurrency=concurrency,
                batch_size=batch_size,
                **callout_options,
            )
        # If all records are missing IDs, use insert
        elif len(missing_ids) == len(self):
            if update_only:
                raise ValueError(
                    "Cannot perform update_only operation when no records have IDs"
                )
            return self.save_insert(
                concurrency=concurrency, batch_size=batch_size, **callout_options
            )
        # Mixed case - some records have IDs, some don't
        else:
            if update_only:
                # If update_only, we should only process records with IDs
                return SObjectList(has_ids, connection=self.connection).save_update(
                    only_changes=only_changes,
                    concurrency=concurrency,
                    batch_size=batch_size,
                    **callout_options,
                )
            # Otherwise, split and process separately
            results = []
            # Process updates first
            if has_ids:
                update_results = SObjectList(
                    has_ids, connection=self.connection
                ).save_update(
                    only_changes=only_changes,
                    concurrency=concurrency,
                    batch_size=batch_size,
                    **callout_options,
                )
                results.extend(update_results)
            # Then process inserts
            if missing_ids and not update_only:
                insert_results = SObjectList(
                    missing_ids, connection=self.connection
                ).save_insert(
                    concurrency=concurrency, batch_size=batch_size, **callout_options
                )
                results.extend(insert_results)
            return results

    def save_csv(self, filepath: Path | str, encoding="utf-8") -> None:
        import csv

        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        assert self, "Cannot save an empty list"
        fieldnames = self[0].query_fields()
        with filepath.open("w+", encoding=encoding) as outfile:
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(flatten(row.serialize()) for row in self)

    def save_upsert_bulk(
        self,
        external_id_field: str,
        timeout: int = 600,
        connection: I_SalesforceClient | str | None = None,
    ) -> "BulkApiIngestJob":
        """Upsert records in bulk using Salesforce Bulk API 2.0

        This method uses the Bulk API 2.0 to upsert records based on an external
        ID field. The external ID field must exist on the object and be marked
        as an external ID.

        Args:
            external_id_field: The API name of the external ID field to use for the upsert
            timeout: Maximum time in seconds to wait for the job to complete
            connection: The Salesforce client or connection name to use

        Returns:
            BulkApiIngestJob: The ingest job created for the upsert

        Raises:
            SalesforceBulkV2LoadError: If the job fails or times out
            ValueError: If the list is empty or the external ID field doesn't exist
        """
        assert self, "Cannot upsert empty SObjectList"
        global BulkApiIngestJob
        try:
            _ = BulkApiIngestJob
        except NameError:
            from .bulk import BulkApiIngestJob
        if not connection:
            connection = self[0].attributes.connection
        job = BulkApiIngestJob.init_job(
            self[0].attributes.type,
            "upsert",
            external_id_field=external_id_field,
            connection=connection,
        )
        job.upload_batches(self)
        return job

    def save_insert_bulk(
        self, connection: I_SalesforceClient | str | None = None, **callout_options
    ) -> "BulkApiIngestJob":
        """Insert records in bulk using Salesforce Bulk API 2.0

        This method uses the Bulk API 2.0 to insert records.

        Args:
            connection: The Salesforce client or connection name to use
            **callout_options: Additional options passed to job creation and upload

        Returns:
            BulkApiIngestJob: The ingest job created for the insert

        Raises:
            SalesforceBulkV2LoadError: If the job fails or times out
            ValueError: If the list is empty
        """
        assert self, "Cannot insert empty SObjectList"
        global BulkApiIngestJob
        try:
            _ = BulkApiIngestJob
        except NameError:
            from .bulk import BulkApiIngestJob
        if not connection:
            connection = self[0].attributes.connection
        job = BulkApiIngestJob.init_job(
            self[0].attributes.type, "insert", connection=connection, **callout_options
        )
        job.upload_batches(self, **callout_options)
        return job

    def save_update_bulk(
        self, connection: I_SalesforceClient | str | None = None, **callout_options
    ) -> "BulkApiIngestJob":
        """Update records in bulk using Salesforce Bulk API 2.0

        This method uses the Bulk API 2.0 to update records.

        Args:
            connection: The Salesforce client or connection name to use
            **callout_options: Additional options passed to job creation and upload

        Returns:
            BulkApiIngestJob: The ingest job created for the update

        Raises:
            SalesforceBulkV2LoadError: If the job fails or times out
            ValueError: If the list is empty
        """
        assert self, "Cannot update empty SObjectList"
        global BulkApiIngestJob
        try:
            _ = BulkApiIngestJob
        except NameError:
            from .bulk import BulkApiIngestJob
        if not connection:
            connection = self[0].attributes.connection
        job = BulkApiIngestJob.init_job(
            self[0].attributes.type, "update", connection=connection, **callout_options
        )
        job.upload_batches(self, **callout_options)
        return job

    def save_json(self, filepath: Path | str, encoding="utf-8", **json_options) -> None:
        if isinstance(filepath, str):
            filepath = Path(filepath).resolve()
        with filepath.open("w+", encoding=encoding) as outfile:
            json.dump([record.serialize() for record in self], outfile, **json_options)

    def save_insert(
        self,
        concurrency: int = 1,
        batch_size: int = 200,
        all_or_none: bool = False,
        **callout_options,
    ) -> list[SObjectSaveResult]:
        """
        Insert all SObjects in the list.

        https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_create.htm

        Returns:
            list[SObjectSaveResult]: The results indicating success or failure of
                each insert operation
        """
        if not self:
            return []
        sf_client = self._get_client()
        # Ensure none of the records have IDs
        for obj in self:
            if getattr(obj, obj.attributes.id_field, None):
                raise ValueError(
                    f"Cannot insert record that already has an "
                    f"{obj.attributes.id_field} set"
                )
        # Prepare records for insert
        record_chunks, emitted_records = self._generate_record_batches(batch_size)
        headers = {"Content-Type": "application/json"}
        if headers_option := callout_options.pop("headers", None):
            headers.update(headers_option)
        if concurrency > 1 and len(record_chunks) > 1:
            # execute async
            return asyncio.run(
                self.save_insert_async(
                    sf_client,
                    record_chunks,
                    headers,
                    concurrency,
                    all_or_none,
                    **callout_options,
                )
            )
        # execute sync
        results = []
        for records, blobs in record_chunks:
            if blobs:
                with ExitStack() as blob_context:
                    files: list[tuple[str, tuple[str | None, Any, str | None]]] = [
                        (
                            "entity_content",
                            (None, json.dumps(records), "application/json"),
                        ),
                    ]
                    for name, blob_data in blobs:
                        blob_payload = blob_context.enter_context(blob_data)
                        files.append(
                            (
                                name,
                                (
                                    blob_data.filename,
                                    blob_payload,
                                    blob_data.content_type,
                                ),
                            )
                        )
                    response = sf_client.post(
                        sf_client.composite_sobjects_url(), files=files
                    )
            else:
                response = sf_client.post(
                    sf_client.composite_sobjects_url(),
                    json={"allOrNone": all_or_none, "records": records},
                    headers=headers,
                    **callout_options,
                )
            results.extend([SObjectSaveResult(**result) for result in response.json()])
        for record, result in zip(emitted_records, results):
            if result.success:
                setattr(record, record.attributes.id_field, result.id)
        return results

    @classmethod
    async def save_insert_async(
        cls,
        sf_client: I_SalesforceClient,
        record_chunks: list[tuple[list[dict[str, Any]], list[tuple[str, BlobData]]]],
        headers: dict[str, str],
        concurrency: int,
        all_or_none: bool,
        **callout_options,
    ):
        if header_options := callout_options.pop("headers", None):
            headers.update(header_options)
        async with sf_client.as_async as a_client:
            tasks = [
                cls._save_insert_async_batch(
                    a_client,
                    sf_client.composite_sobjects_url(),
                    records,
                    blobs,
                    all_or_none,
                    headers,
                    **callout_options,
                )
                for records, blobs in record_chunks
            ]
            responses = await run_concurrently(concurrency, tasks)
            return [
                SObjectSaveResult(**result)
                for response in responses
                for result in response.json()
            ]

    @classmethod
    async def _save_insert_async_batch(
        cls,
        sf_client: I_AsyncSalesforceClient,
        url: str,
        records: list[dict[str, Any]],
        blobs: list[tuple[str, BlobData]] | None,
        all_or_none: bool,
        headers: dict[str, str],
        **callout_options,
    ):
        if blobs:
            with ExitStack() as blob_context:
                return await sf_client.post(
                    url,
                    files=[
                        (
                            "entity_content",
                            (
                                None,
                                json.dumps(
                                    {"allOrNone": all_or_none, "records": records}
                                ),
                                "application/json",
                            ),
                        ),
                        *(
                            (
                                name,
                                (
                                    blob_data.filename,
                                    blob_context.enter_context(blob_data),
                                    blob_data.content_type,
                                ),
                            )
                            for name, blob_data in blobs
                        ),
                    ],
                )
        return await sf_client.post(
            sf_client.composite_sobjects_url(),
            json={"allOrNone": all_or_none, "records": records},
            headers=headers,
            **callout_options,
        )

    def save_update(
        self,
        only_changes: bool = False,
        all_or_none: bool = False,
        concurrency: int = 1,
        batch_size: int = 200,
        **callout_options,
    ) -> list[SObjectSaveResult]:
        """
        Update all SObjects in the list.

        https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_update.htm

        Args:
            only_changes: If True, only send changed fields
            concurrency: Number of concurrent requests to make
            batch_size: Number of records to include in each batch
            **callout_options: Additional options to pass to the API call

        Returns:
            list[SObjectSaveResult]: List of save results
        """
        if not self:
            return []
        sf_client = self._get_client()
        # Ensure all records have IDs
        for i, record in enumerate(self):
            id_val = getattr(record, record.attributes.id_field, None)
            if not id_val:
                raise ValueError(
                    f"Record at index {i} has no {record.attributes.id_field} for update"
                )
            if record.attributes.blob_field and getattr(
                record, record.attributes.blob_field
            ):
                raise ValueError(
                    f"Cannot update files in composite calls. "
                    f"{type(record).__name__} Record at index {i} has Blob/File "
                    f"value for field {record.attributes.blob_field}"
                )
        # Prepare records for update
        record_chunks, emitted_records = self._generate_record_batches(
            batch_size, only_changes
        )
        headers = {"Content-Type": "application/json"}
        if headers_option := callout_options.pop("headers", None):
            headers.update(headers_option)
        if concurrency > 1:
            # execute async
            return asyncio.run(
                self.save_update_async(
                    [chunk[0] for chunk in record_chunks],
                    all_or_none,
                    headers,
                    sf_client,
                    **callout_options,
                )
            )
        # execute sync
        results: list[SObjectSaveResult] = []
        for records, blobs in record_chunks:
            assert not blobs, "Cannot update collections with files"
            response = sf_client.patch(
                sf_client.composite_sobjects_url(),
                json={"allOrNone": all_or_none, "records": records},
                headers=headers,
                **callout_options,
            )
            results.extend([SObjectSaveResult(**result) for result in response.json()])
        for record, result in zip(emitted_records, results):
            if result.success:
                record.dirty_fields.clear()
        return results

    @staticmethod
    async def save_update_async(
        record_chunks: list[list[dict[str, Any]]],
        all_or_none: bool,
        headers: dict[str, str],
        sf_client: I_SalesforceClient,
        **callout_options,
    ) -> list[SObjectSaveResult]:
        async with sf_client.as_async as a_client:
            tasks = [
                a_client.post(
                    sf_client.composite_sobjects_url(),
                    json={"allOrNone": all_or_none, "records": chunk},
                    headers=headers,
                    **callout_options,
                )
                for chunk in record_chunks
            ]
            responses = await asyncio.gather(*tasks)
            return [
                SObjectSaveResult(**result)
                for response in responses
                for result in response.json()
            ]

    def save_upsert(
        self,
        external_id_field: str,
        concurrency: int = 1,
        batch_size: int = 200,
        only_changes: bool = False,
        all_or_none: bool = False,
        **callout_options,
    ):
        """
        Upsert all SObjects in the list using an external ID field.

        https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections_upsert.htm

        Args:
            external_id_field: Name of the external ID field to use for upserting
            concurrency: Number of concurrent requests to make
            batch_size: Number of records to include in each batch
            only_changes: If True, only send changed fields for updates
            **callout_options: Additional options to pass to the API call

        Returns:
            list[SObjectSaveResult]: List of save results
        """
        object_type = self._ensure_consistent_sobject_type()
        if not object_type:
            # no records to upsert, early return
            return []
        sf_client = self._get_client()
        # Ensure all records have the external ID field
        for i, record in enumerate(self):
            ext_id_val = getattr(record, external_id_field, None)
            if not ext_id_val:
                raise AssertionError(
                    f"Record at index {i} has no value for external ID field "
                    f"'{external_id_field}'"
                )
            if record.attributes.blob_field and getattr(
                record, record.attributes.blob_field
            ):
                raise ValueError(
                    f"Cannot update files in composite calls. "
                    f"{type(record).__name__} Record at index {i} has Blob/File "
                    f"value for field {record.attributes.blob_field}"
                )
        # Chunk the requests
        record_batches, emitted_records = self._generate_record_batches(
            batch_size, only_changes, include_fields=[external_id_field]
        )
        headers = {"Content-Type": "application/json"}
        if headers_option := callout_options.pop("headers", None):
            headers.update(headers_option)
        url = (
            sf_client.composite_sobjects_url(object_type.attributes.type)
            + "/"
            + external_id_field
        )
        results: list[SObjectSaveResult]
        if concurrency > 1 and len(record_batches) > 1:
            # execute async
            results = asyncio.run(
                self.save_upsert_async(
                    sf_client,
                    url,
                    [batch[0] for batch in record_batches],
                    headers,
                    concurrency,
                    all_or_none,
                    **callout_options,
                )
            )
        else:
            # execute sync
            results = []
            for record_batch in record_batches:
                response = sf_client.patch(
                    url,
                    json={"allOrNone": all_or_none, "records": record_batch[0]},
                    headers=headers,
                )
                results.extend(
                    [SObjectSaveResult(**result) for result in response.json()]
                )
        # Clear dirty fields as operations were successful
        for record, result in zip(emitted_records, results):
            if result.success:
                record.dirty_fields.clear()
        return results

    @staticmethod
    async def save_upsert_async(
        sf_client: I_SalesforceClient,
        url: str,
        record_chunks: list[list[dict[str, Any]]],
        headers: dict[str, str],
        concurrency: int,
        all_or_none: bool,
        **callout_options,
    ):
        async with sf_client.as_async as a_client:
            tasks = [
                a_client.patch(
                    url,
                    json={"allOrNone": all_or_none, "records": chunk},
                    headers=headers,
                    **callout_options,
                )
                for chunk in record_chunks
                if chunk
            ]
            responses = await run_concurrently(concurrency, tasks)
            results = [
                SObjectSaveResult(**result)
                for response in responses
                for result in response.json()
            ]
            return results

    def delete(
        self,
        clear_id_field: bool = False,
        batch_size: int = 200,
        concurrency: int = 1,
        all_or_none: bool = False,
        **callout_options,
    ):
        """
        Delete all SObjects in the list.

        Args:
            clear_id_field: If True, clear the ID field on the objects after deletion
            batch_size: Number of record IDs to include in each batch
            concurrency: Number of concurrent requests to make
            all_or_none: If True, delete all records or none
            **callout_options: Additional options to pass to the API call

        Returns:
            list[SObjectSaveResult]: List of delete results
        """
        if not self:
            return []
        record_id_batches = list(
            chunked(
                [
                    record_id
                    for obj in self
                    if (record_id := getattr(obj, obj.attributes.id_field, None))
                ],
                batch_size,
            )
        )
        sf_client = self._get_client()
        results: list[SObjectSaveResult]
        if len(record_id_batches) > 1 and concurrency > 1:
            results = asyncio.run(
                self.delete_async(
                    sf_client,
                    record_id_batches,
                    all_or_none,
                    concurrency,
                    **callout_options,
                )
            )
        else:
            headers = {"Content-Type": "application/json"}
            if headers_option := callout_options.pop("headers", None):
                headers.update(headers_option)
            url = sf_client.composite_sobjects_url()
            results = []
            for batch in record_id_batches:
                response = sf_client.delete(
                    url,
                    params={"allOrNone": all_or_none, "ids": ",".join(batch)},
                    headers=headers,
                    **callout_options,
                )
                results.extend(
                    [SObjectSaveResult(**result) for result in response.json()]
                )
        if clear_id_field:
            for record, result in zip(self, results):
                if result.success:
                    delattr(record, record.attributes.id_field)
        return results

    @staticmethod
    async def delete_async(
        sf_client: I_SalesforceClient,
        record_id_batches: list[list[str]],
        all_or_none: bool,
        concurrency: int,
        **callout_options,
    ):
        """
        Delete all SObjects in the list asynchronously.

        Args:
            sf_client: The Salesforce client
            record_id_batches: List of batches of record IDs to delete
            all_or_none: If True, delete all records or none
            **callout_options: Additional options for the callout

        Returns:
            List of SObjectSaveResult objects
        """
        url = sf_client.composite_sobjects_url()
        headers = {"Content-Type": "application/json"}
        if headers_option := callout_options.pop("headers", None):
            headers.update(headers_option)
        async with sf_client.as_async as async_client:
            tasks = [
                async_client.delete(
                    url,
                    params={"allOrNone": all_or_none, "ids": ",".join(id_batch)},
                    headers=headers,
                    **callout_options,
                )
                for id_batch in record_id_batches
            ]
            responses = await run_concurrently(concurrency, tasks)
            results = [
                SObjectSaveResult(**result)
                for response in responses
                for result in response.json()
            ]
            return results

    def assert_single_type(self):
        """Assert there is exactly one type of record in the list"""
        assert len(self) > 0, "There must be at least one record."
        record_type = type(self[0])
        assert all(isinstance(record, record_type) for record in self), (
            "Records must be of the same type."
        )
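

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library source): collection-level usage
# of SObjectList with the composite API. It assumes a registered
# SalesforceClient connection and a Contact object in the org; the record IDs
# and field values are placeholders, and the helper name is hypothetical.
def _example_collection_usage() -> None:
    Contact = SObject.from_description("Contact")
    # Contact.list() pulls records through the composite API in chunks of up to
    # 2000 IDs; concurrency > 1 fans large ID sets out over the async client
    contacts = Contact.list("003000000000001AAA", "003000000000002AAA")
    # edit a field on every record, then send only the dirty fields back in
    # composite batches of up to 200 records each
    for contact in contacts:
        contact.update_values(Description="Touched by sf_toolkit")
    results = contacts.save(only_changes=True, batch_size=200)
    print(sum(1 for result in results if not result.success), "records failed")
    # very large lists can go through Bulk API 2.0 instead, e.g.
    # contacts.save_update_bulk()
    # composite delete accepts the same batching and concurrency options
    contacts.delete(clear_id_field=True)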