From f385f290e23eb4e600d209337b9e9b2e43266bd2 Mon Sep 17 00:00:00 2001 From: John Kerl Date: Fri, 12 Apr 2024 12:24:03 -0400 Subject: [PATCH] [python] Ingestion performance (#2434) * AlreadyExistsError; use for DataFrame in tiledbsoma.io * apply in `_create_or_open_collection` * apply in _create_from_matrix * apply in _ingest_uns_ndarray * lint * Run SO copying workflow on macos-13 to avoid SIP (#2435) * AlreadyExistsError; use for DataFrame in tiledbsoma.io * apply in `_create_or_open_collection` * apply in _create_from_matrix * apply in _ingest_uns_ndarray * lint * neaten * neaten * Update raises-notes * code-review feedback Co-authored-by: nguyenv --------- Co-authored-by: John Blischak Co-authored-by: nguyenv --- apis/python/src/tiledbsoma/__init__.py | 3 +- apis/python/src/tiledbsoma/_collection.py | 28 +++++++++---- .../python/src/tiledbsoma/_common_nd_array.py | 18 ++++++--- apis/python/src/tiledbsoma/_dataframe.py | 18 ++++++--- apis/python/src/tiledbsoma/_exception.py | 30 ++++++++++++++ apis/python/src/tiledbsoma/io/ingest.py | 40 +++++++++---------- 6 files changed, 98 insertions(+), 39 deletions(-) diff --git a/apis/python/src/tiledbsoma/__init__.py b/apis/python/src/tiledbsoma/__init__.py index 337aebce38..03a67e72a7 100644 --- a/apis/python/src/tiledbsoma/__init__.py +++ b/apis/python/src/tiledbsoma/__init__.py @@ -147,7 +147,7 @@ from ._constants import SOMA_JOINID from ._dataframe import DataFrame from ._dense_nd_array import DenseNDArray -from ._exception import DoesNotExistError, SOMAError +from ._exception import AlreadyExistsError, DoesNotExistError, SOMAError from ._experiment import Experiment from ._factory import open from ._general_utilities import ( @@ -171,6 +171,7 @@ __version__ = get_implementation_version() __all__ = [ + "AlreadyExistsError", "AxisColumnNames", "AxisQuery", "Collection", diff --git a/apis/python/src/tiledbsoma/_collection.py b/apis/python/src/tiledbsoma/_collection.py index d74bcb11e8..922e2f9817 100644 --- 
a/apis/python/src/tiledbsoma/_collection.py +++ b/apis/python/src/tiledbsoma/_collection.py @@ -37,7 +37,12 @@ from ._common_nd_array import NDArray from ._dataframe import DataFrame from ._dense_nd_array import DenseNDArray -from ._exception import SOMAError, is_does_not_exist_error +from ._exception import ( + AlreadyExistsError, + SOMAError, + is_already_exists_error, + is_does_not_exist_error, +) from ._funcs import typeguard_ignore from ._sparse_nd_array import SparseNDArray from ._tiledb_object import AnyTileDBObject, TileDBObject @@ -112,6 +117,8 @@ def create( the context. Raises: + tiledbsoma.AlreadyExistsError: + If the underlying object already exists at the given URI. TileDBError: If unable to create the underlying object. @@ -119,13 +126,18 @@ def create( Experimental. """ context = _validate_soma_tiledb_context(context) - tiledb.group_create(uri=uri, ctx=context.tiledb_ctx) - handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp) - cls._set_create_metadata(handle) - return cls( - handle, - _dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code", - ) + try: + tiledb.group_create(uri=uri, ctx=context.tiledb_ctx) + handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp) + cls._set_create_metadata(handle) + return cls( + handle, + _dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code", + ) + except tiledb.TileDBError as tdbe: + if is_already_exists_error(tdbe): + raise AlreadyExistsError(f"{uri!r} already exists") + raise @classmethod def open( diff --git a/apis/python/src/tiledbsoma/_common_nd_array.py b/apis/python/src/tiledbsoma/_common_nd_array.py index 3f476de60f..adc6e8c94a 100644 --- a/apis/python/src/tiledbsoma/_common_nd_array.py +++ b/apis/python/src/tiledbsoma/_common_nd_array.py @@ -16,6 +16,7 @@ import tiledb from . 
import _arrow_types, _util +from ._exception import AlreadyExistsError, is_already_exists_error from ._tiledb_array import TileDBArray from ._types import OpenTimestamp from .options._soma_tiledb_context import ( @@ -77,6 +78,8 @@ def create( If the ``type`` is unsupported. ValueError: If the ``shape`` is unsupported. + tiledbsoma.AlreadyExistsError: + If the underlying object already exists at the given URI. TileDBError: If unable to create the underlying object. @@ -91,11 +94,16 @@ def create( context, is_sparse=cls.is_sparse, ) - handle = cls._create_internal(uri, schema, context, tiledb_timestamp) - return cls( - handle, - _dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code", - ) + try: + handle = cls._create_internal(uri, schema, context, tiledb_timestamp) + return cls( + handle, + _dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code", + ) + except tiledb.TileDBError as tdbe: + if is_already_exists_error(tdbe): + raise AlreadyExistsError(f"{uri!r} already exists") + raise @property def shape(self) -> Tuple[int, ...]: diff --git a/apis/python/src/tiledbsoma/_dataframe.py b/apis/python/src/tiledbsoma/_dataframe.py index 4daae22ec8..76c76cb691 100644 --- a/apis/python/src/tiledbsoma/_dataframe.py +++ b/apis/python/src/tiledbsoma/_dataframe.py @@ -20,6 +20,7 @@ from . import _arrow_types, _util from . import pytiledbsoma as clib from ._constants import SOMA_JOINID +from ._exception import AlreadyExistsError, is_already_exists_error from ._query_condition import QueryCondition from ._read_iters import TableReadIter from ._tdb_handles import DataFrameWrapper @@ -187,6 +188,8 @@ def create( an undefined column name. ValueError: If the ``schema`` specifies illegal column names. + tiledbsoma.AlreadyExistsError: + If the underlying object already exists at the given URI. TileDBError: If unable to create the underlying object. 
@@ -217,11 +220,16 @@ def create( TileDBCreateOptions.from_platform_config(platform_config), context, ) - handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp) - return cls( - handle, - _dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code", - ) + try: + handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp) + return cls( + handle, + _dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code", + ) + except tiledb.TileDBError as tdbe: + if is_already_exists_error(tdbe): + raise AlreadyExistsError(f"{uri!r} already exists") + raise def keys(self) -> Tuple[str, ...]: """Returns the names of the columns when read back as a dataframe. diff --git a/apis/python/src/tiledbsoma/_exception.py b/apis/python/src/tiledbsoma/_exception.py index 109a42e72c..10a024855d 100644 --- a/apis/python/src/tiledbsoma/_exception.py +++ b/apis/python/src/tiledbsoma/_exception.py @@ -55,6 +55,36 @@ def is_does_not_exist_error(e: tiledb.TileDBError) -> bool: return False +class AlreadyExistsError(SOMAError): + """Raised when attempting to create an already existing SOMA object. + + Lifecycle: experimental + """ + + pass + + +def is_already_exists_error(e: tiledb.TileDBError) -> bool: + """Given a TileDBError, return true if it indicates the object already exists + + Lifecycle: experimental + + Example: + try: + tiledb.Array.create(uri, schema, ctx=ctx) + ... + except tiledb.TileDBError as e: + if is_already_exists_error(e): + ... + raise e + """ + stre = str(e) + # Local-disk, S3, and TileDB Cloud exceptions all have the substring + # "already exists". Here we lower-case the exception message just + # in case someone ever uppercases it on the other end. + return "already exists" in stre.lower() + + def is_duplicate_group_key_error(e: tiledb.TileDBError) -> bool: """Given a TileDBError, return try if it indicates a duplicate member add request in a tiledb.Group. 
diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index 80f74c396c..7863da60f4 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -58,7 +58,11 @@ from .._collection import AnyTileDBCollection, CollectionBase from .._common_nd_array import NDArray from .._constants import SOMA_JOINID -from .._exception import DoesNotExistError, SOMAError +from .._exception import ( + AlreadyExistsError, + DoesNotExistError, + SOMAError, +) from .._tdb_handles import RawHandle from .._tiledb_array import TileDBArray from .._tiledb_object import AnyTileDBObject, TileDBObject @@ -984,17 +988,13 @@ def _create_or_open_collection( additional_metadata: AdditionalMetadata = None, ) -> CollectionBase[_TDBO]: try: - thing = cls.open(uri, "w", context=context) - except DoesNotExistError: - pass # This is always OK; make a new one. - else: + coll = cls.create(uri, context=context) + except AlreadyExistsError: # It already exists. Are we resuming? 
if ingestion_params.error_if_already_exists: raise SOMAError(f"{uri} already exists") - add_metadata(thing, additional_metadata) - return thing + coll = cls.open(uri, "w", context=context) - coll = cls.create(uri, context=context) add_metadata(coll, additional_metadata) return coll @@ -1194,15 +1194,18 @@ def _write_dataframe_impl( ) try: - soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context) - except DoesNotExistError: soma_df = DataFrame.create( df_uri, schema=arrow_table.schema, platform_config=platform_config, context=context, ) - else: + except AlreadyExistsError: + if ingestion_params.error_if_already_exists: + raise SOMAError(f"{df_uri} already exists") + + soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context) + if ingestion_params.skip_existing_nonempty_domain: storage_ned = _read_nonempty_domain(soma_df) dim_range = ((int(df.index.min()), int(df.index.max())),) @@ -1212,8 +1215,6 @@ def _write_dataframe_impl( _util.format_elapsed(s, f"SKIPPED {soma_df.uri}"), ) return soma_df - elif ingestion_params.error_if_already_exists: - raise SOMAError(f"{soma_df.uri} already exists") if ingestion_params.write_schema_no_data: logging.log_io( @@ -1291,10 +1292,6 @@ def _create_from_matrix( logging.log_io(None, f"START WRITING {uri}") try: - soma_ndarray = cls.open( - uri, "w", platform_config=platform_config, context=context - ) - except DoesNotExistError: # A SparseNDArray must be appendable in soma.io. 
shape = [None for _ in matrix.shape] if cls.is_sparse else matrix.shape soma_ndarray = cls.create( @@ -1304,9 +1301,12 @@ def _create_from_matrix( platform_config=platform_config, context=context, ) - else: + except AlreadyExistsError: if ingestion_params.error_if_already_exists: - raise SOMAError(f"{soma_ndarray.uri} already exists") + raise SOMAError(f"{uri} already exists") + soma_ndarray = cls.open( + uri, "w", platform_config=platform_config, context=context + ) if ingestion_params.write_schema_no_data: logging.log_io( @@ -2749,8 +2749,6 @@ def _ingest_uns_ndarray( logging.log_io(msg, msg) return try: - soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context) - except DoesNotExistError: soma_arr = DenseNDArray.create( arr_uri, type=pa_dtype, @@ -2758,6 +2756,8 @@ def _ingest_uns_ndarray( platform_config=platform_config, context=context, ) + except AlreadyExistsError: + soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context) # If resume mode: don't re-write existing data. This is the user's explicit request # that we not re-write things that have already been written.