From cfc89e61557fb81276b502c9f96edb4d77d90f43 Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Sun, 7 Jun 2026 17:22:31 -0500 Subject: [PATCH] fix(staged_insert): converge metadata shape with ObjectCodec.encode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bring the staged-insert metadata dict into structural equality with the ordinary insert1 path, per the Staged Insert Specification (datajoint-docs#177). Without this change the same content stored via the two paths yields different column dicts: ObjectCodec.encode -> {path, store, size, ext, is_dir, item_count, timestamp} staged (directory) -> {path, size, hash, ext, is_dir, item_count, timestamp} staged (single file)-> {path, size, hash, ext, is_dir, timestamp, mime_type?} The staged path is now the canonical shape. Drops: - hash: None (never carried information) - mime_type (file case) (not in encode shape) The file case now also carries item_count: None (matching encode). Also fix the store_name divergence: staged_insert resolved the backend from stores.default regardless of the field's type spec. A field declared would write through stores.default — and the store key recorded in the metadata column would point at the wrong store. Now resolve store_name from attr.store via resolve_dtype() and use that for both path/backend resolution and the metadata's store field. Drop the .manifest.json sidecar that the staged path wrote and the encode path didn't. The metadata dict already records total size and item_count; per-file listings are recoverable by walking the canonical directory if ever needed. Fix docstrings that showed `staged.rec['raw_data'] = z` — the framework computes the metadata; the caller does not assign anything to the staged field on staged.rec. Add tests/integration/test_object.py::TestStagedInsert:: test_staged_insert_metadata_shape_matches_encode covering both the single-file and directory cases against ObjectCodec.encode for equivalent content. Slated for DataJoint 2.3. --- src/datajoint/staged_insert.py | 218 +++++++++++++------------------ tests/integration/test_object.py | 98 ++++++++++++++ 2 files changed, 189 insertions(+), 127 deletions(-) diff --git a/src/datajoint/staged_insert.py b/src/datajoint/staged_insert.py index 1f6ee7afb..ffbe8a8f2 100644 --- a/src/datajoint/staged_insert.py +++ b/src/datajoint/staged_insert.py @@ -5,16 +5,19 @@ to object storage before finalizing the database insert. """ -import json -import mimetypes from contextlib import contextmanager from datetime import datetime, timezone -from typing import IO, Any +from typing import IO, TYPE_CHECKING, Any import fsspec +from .codecs import resolve_dtype from .errors import DataJointError -from .storage import StorageBackend, build_object_path +from .hash_registry import get_store_backend +from .storage import build_object_path + +if TYPE_CHECKING: + from .storage import StorageBackend class StagedInsert: @@ -30,15 +33,14 @@ class StagedInsert: staged.rec['subject_id'] = 123 staged.rec['session_id'] = 45 - # Create object storage directly + # Write directly to object storage z = zarr.open(staged.store('raw_data', '.zarr'), mode='w', shape=(1000, 1000)) z[:] = data - # Assign to record - staged.rec['raw_data'] = z - - # On successful exit: metadata computed, record inserted - # On exception: storage cleaned up, no record inserted + # On clean exit: metadata is computed and the row is inserted. + # The caller does NOT assign anything to staged.rec[] — + # the framework computes the metadata dict. + # On exception: storage cleaned up, no row inserted. """ def __init__(self, table): @@ -50,8 +52,7 @@ def __init__(self, table): """ self._table = table self._rec: dict[str, Any] = {} - self._staged_objects: dict[str, dict] = {} # field -> {path, ext, token} - self._backend: StorageBackend | None = None + self._staged_objects: dict[str, dict] = {} # field -> {relative_path, ext, token, store_name} @property def rec(self) -> dict[str, Any]: @@ -60,47 +61,39 @@ def rec(self) -> dict[str, Any]: @property def fs(self) -> fsspec.AbstractFileSystem: - """Return fsspec filesystem for advanced operations.""" - self._ensure_backend() - return self._backend.fs + """ + Return fsspec filesystem for the default store, for advanced operations. - def _ensure_backend(self): - """Ensure storage backend is initialized.""" - if self._backend is None: - try: - spec = self._table.connection._config.get_store_spec() # Uses stores.default - self._backend = StorageBackend(spec) - except DataJointError: - raise DataJointError( - "Storage is not configured. Set stores.default and stores. settings in datajoint.json." - ) - - def _get_storage_path(self, field: str, ext: str = "") -> str: + For per-field access, prefer ``staged.store(field)`` or ``staged.open(field)`` — + those route to the store resolved from the field's type spec. """ - Get or create the storage path for a field. + return self._default_backend().fs - Args: - field: Name of the object attribute - ext: Optional extension (e.g., ".zarr") + def _default_backend(self): + """Return the StorageBackend for the default store, or raise a clear error.""" + try: + return get_store_backend(None, config=self._table.connection._config) + except DataJointError: + raise DataJointError("Storage is not configured. Set stores.default and stores. settings in datajoint.json.") - Returns: - Full storage path + def _resolve_field(self, field: str, ext: str) -> tuple[str, "StorageBackend"]: """ - self._ensure_backend() + Resolve a field to its (relative_path, backend), caching on first call. + Validates the field is an ```` attribute and that the full + primary key is set on ``staged.rec``. + """ if field in self._staged_objects: - return self._staged_objects[field]["full_path"] + info = self._staged_objects[field] + return info["relative_path"], self._field_backend(info["store_name"]) - # Validate field is an object attribute if field not in self._table.heading: raise DataJointError(f"Attribute '{field}' not found in table heading") attr = self._table.heading[field] - # Check if this is an object Codec (has codec with "object" as name) if not (attr.codec and attr.codec.name == "object"): raise DataJointError(f"Attribute '{field}' is not an type") - # Extract primary key from rec primary_key = {k: self._rec[k] for k in self._table.primary_key if k in self._rec} if len(primary_key) != len(self._table.primary_key): raise DataJointError( @@ -108,12 +101,17 @@ def _get_storage_path(self, field: str, ext: str = "") -> str: f"Missing: {set(self._table.primary_key) - set(primary_key)}" ) - # Get storage spec (uses stores.default) - spec = self._table.connection._config.get_store_spec() + # Resolve the store name from the field's type spec (e.g., -> "local") + _, _, store_name = resolve_dtype(f"<{attr.codec.name}>", store_name=attr.store) + + config = self._table.connection._config + try: + spec = config.get_store_spec(store_name) + except DataJointError: + raise DataJointError("Storage is not configured. Set stores.default and stores. settings in datajoint.json.") partition_pattern = spec.get("partition_pattern") token_length = spec.get("token_length", 8) - # Build storage path (relative - StorageBackend will add location prefix) relative_path, token = build_object_path( schema=self._table.database, table=self._table.class_name, @@ -124,18 +122,25 @@ def _get_storage_path(self, field: str, ext: str = "") -> str: token_length=token_length, ) - # Store staged object info (all paths are relative, backend adds location) self._staged_objects[field] = { "relative_path": relative_path, "ext": ext if ext else None, "token": token, + "store_name": store_name, } - return relative_path + return relative_path, self._field_backend(store_name) + + def _field_backend(self, store_name: str | None): + """Return the StorageBackend for the named store.""" + try: + return get_store_backend(store_name, config=self._table.connection._config) + except DataJointError: + raise DataJointError("Storage is not configured. Set stores.default and stores. settings in datajoint.json.") def store(self, field: str, ext: str = "") -> fsspec.FSMap: """ - Get an FSMap store for direct writes to an object field. + Get an FSMap for direct writes to an ```` field. Args: field: Name of the object attribute @@ -144,12 +149,12 @@ def store(self, field: str, ext: str = "") -> fsspec.FSMap: Returns: fsspec.FSMap suitable for Zarr/xarray """ - path = self._get_storage_path(field, ext) - return self._backend.get_fsmap(path) + relative_path, backend = self._resolve_field(field, ext) + return backend.get_fsmap(relative_path) def open(self, field: str, ext: str = "", mode: str = "wb") -> IO: """ - Open a file for direct writes to an object field. + Open a file for direct writes to an ```` field. Args: field: Name of the object attribute @@ -159,127 +164,86 @@ def open(self, field: str, ext: str = "", mode: str = "wb") -> IO: Returns: File-like object for writing """ - path = self._get_storage_path(field, ext) - return self._backend.open(path, mode) + relative_path, backend = self._resolve_field(field, ext) + return backend.open(relative_path, mode) def _compute_metadata(self, field: str) -> dict: """ - Compute metadata for a staged object after writing is complete. + Compute the canonical ```` metadata dict for a staged write. - Args: - field: Name of the object attribute + The returned dict is structurally equal to what ``ObjectCodec.encode`` + would produce for the same content, modulo ``timestamp``. - Returns: - JSON-serializable metadata dict + Returns + ------- + dict + ``{path, store, size, ext, is_dir, item_count, timestamp}`` """ info = self._staged_objects[field] relative_path = info["relative_path"] ext = info["ext"] + store_name = info["store_name"] + backend = self._field_backend(store_name) - # Check if it's a directory (multiple files) or single file - # _full_path adds the location prefix - full_remote_path = self._backend._full_path(relative_path) + full_remote_path = backend._full_path(relative_path) try: - is_dir = self._backend.fs.isdir(full_remote_path) + is_dir = backend.fs.isdir(full_remote_path) except Exception: is_dir = False if is_dir: - # Calculate total size and file count total_size = 0 item_count = 0 - files = [] - - for root, dirs, filenames in self._backend.fs.walk(full_remote_path): + for root, _dirs, filenames in backend.fs.walk(full_remote_path): for filename in filenames: - file_path = f"{root}/{filename}" try: - file_size = self._backend.fs.size(file_path) - rel_path = file_path[len(full_remote_path) :].lstrip("/") - files.append({"path": rel_path, "size": file_size}) - total_size += file_size + total_size += backend.fs.size(f"{root}/{filename}") item_count += 1 except Exception: pass - - # Create manifest - manifest = { - "files": files, - "total_size": total_size, - "item_count": item_count, - "created": datetime.now(timezone.utc).isoformat(), - } - - # Write manifest alongside folder - manifest_path = f"{relative_path}.manifest.json" - self._backend.put_buffer(json.dumps(manifest, indent=2).encode(), manifest_path) - - metadata = { - "path": relative_path, - "size": total_size, - "hash": None, - "ext": ext, - "is_dir": True, - "timestamp": datetime.now(timezone.utc).isoformat(), - "item_count": item_count, - } + size = total_size else: - # Single file try: - size = self._backend.size(relative_path) + size = backend.size(relative_path) except Exception: size = 0 - - metadata = { - "path": relative_path, - "size": size, - "hash": None, - "ext": ext, - "is_dir": False, - "timestamp": datetime.now(timezone.utc).isoformat(), - } - - # Add mime_type for files - if ext: - mime_type, _ = mimetypes.guess_type(f"file{ext}") - if mime_type: - metadata["mime_type"] = mime_type - - return metadata + item_count = None + + return { + "path": relative_path, + "store": store_name, + "size": size, + "ext": ext, + "is_dir": is_dir, + "item_count": item_count, + "timestamp": datetime.now(timezone.utc).isoformat(), + } def _finalize(self): """ - Finalize the staged insert by computing metadata and inserting the record. + Compute metadata for each staged object and insert the row. """ - # Process each staged object for field in list(self._staged_objects.keys()): - metadata = self._compute_metadata(field) - # Store metadata dict in the record (ObjectType.encode handles it) - self._rec[field] = metadata - - # Insert the record + self._rec[field] = self._compute_metadata(field) self._table.insert1(self._rec) def _cleanup(self): """ - Clean up staged objects on failure. + Best-effort removal of staged objects on failure. """ - if self._backend is None: - return - for field, info in self._staged_objects.items(): relative_path = info["relative_path"] try: - # Check if it's a directory - full_remote_path = self._backend._full_path(relative_path) - if self._backend.fs.exists(full_remote_path): - if self._backend.fs.isdir(full_remote_path): - self._backend.remove_folder(relative_path) + backend = self._field_backend(info["store_name"]) + full_remote_path = backend._full_path(relative_path) + if backend.fs.exists(full_remote_path): + if backend.fs.isdir(full_remote_path): + backend.remove_folder(relative_path) else: - self._backend.remove(relative_path) + backend.remove(relative_path) except Exception: - pass # Best effort cleanup + pass # Best-effort cleanup @contextmanager @@ -299,7 +263,7 @@ def staged_insert1(table): staged.rec['session_id'] = 45 z = zarr.open(staged.store('raw_data', '.zarr'), mode='w') z[:] = data - staged.rec['raw_data'] = z + # Metadata for 'raw_data' is computed on clean exit; do not assign it here. """ staged = StagedInsert(table) try: diff --git a/tests/integration/test_object.py b/tests/integration/test_object.py index f0ac8c1d9..c03540b58 100644 --- a/tests/integration/test_object.py +++ b/tests/integration/test_object.py @@ -758,3 +758,101 @@ def test_staged_insert_missing_pk_raises(self, schema_obj, mock_object_storage): with table.staged_insert1 as staged: # Don't set primary key staged.store("data_file", ".dat") + + def test_staged_insert_metadata_shape_matches_encode(self, schema_obj, mock_object_storage, tmpdir_factory): + """ + Spec contract: the metadata dict produced by a staged insert is structurally + equal to what ObjectCodec.encode would produce for equivalent content. + + Per the Staged Insert Specification, both paths converge on the shape + {path, store, size, ext, is_dir, item_count, timestamp}. + """ + from datajoint.builtin_codecs.object import ObjectCodec + + # ---- Single-file case ---- + captured = {} + table = ObjectFile() + original_insert1 = table.insert1 + + def capture_file(row, **kwargs): + captured["file"] = dict(row) + return original_insert1(row, **kwargs) + + table.insert1 = capture_file + with table.staged_insert1 as staged: + staged.rec["file_id"] = 800 + with staged.open("data_file", ".dat") as f: + f.write(b"staged content") + + staged_file_meta = captured["file"]["data_file"] + + ref_path = Path(str(tmpdir_factory.mktemp("encode_ref_file"))) / "ref.dat" + ref_path.write_bytes(b"reference content") + codec = ObjectCodec() + encode_file_meta = codec.encode( + ref_path, + key={ + "_schema": table.database, + "_table": table.class_name, + "_field": "data_file", + "_config": table.connection._config, + "file_id": 801, + }, + store_name="local", + ) + + assert set(staged_file_meta.keys()) == set(encode_file_meta.keys()), ( + f"file metadata keys mismatch: " f"staged={sorted(staged_file_meta)}, encode={sorted(encode_file_meta)}" + ) + for key in ("store", "ext", "is_dir", "item_count"): + assert staged_file_meta[key] == encode_file_meta[key], ( + f"file {key} mismatch: " f"staged={staged_file_meta[key]!r}, encode={encode_file_meta[key]!r}" + ) + + table.delete() + + # ---- Directory case ---- + captured.clear() + table_folder = ObjectFolder() + original_folder_insert1 = table_folder.insert1 + + def capture_folder(row, **kwargs): + captured["folder"] = dict(row) + return original_folder_insert1(row, **kwargs) + + table_folder.insert1 = capture_folder + with table_folder.staged_insert1 as staged: + staged.rec["folder_id"] = 802 + fsmap = staged.store("data_folder") + fsmap["a.bin"] = b"aaa" + fsmap["b.bin"] = b"bbbb" + + staged_dir_meta = captured["folder"]["data_folder"] + + ref_dir = Path(str(tmpdir_factory.mktemp("encode_ref_dir"))) + (ref_dir / "x.bin").write_bytes(b"x") + (ref_dir / "y.bin").write_bytes(b"yy") + encode_dir_meta = codec.encode( + ref_dir, + key={ + "_schema": table_folder.database, + "_table": table_folder.class_name, + "_field": "data_folder", + "_config": table_folder.connection._config, + "folder_id": 803, + }, + store_name="local", + ) + + assert set(staged_dir_meta.keys()) == set(encode_dir_meta.keys()), ( + f"directory metadata keys mismatch: " f"staged={sorted(staged_dir_meta)}, encode={sorted(encode_dir_meta)}" + ) + for key in ("store", "ext", "is_dir"): + assert staged_dir_meta[key] == encode_dir_meta[key], ( + f"directory {key} mismatch: " f"staged={staged_dir_meta[key]!r}, encode={encode_dir_meta[key]!r}" + ) + # item_count must be a non-negative integer for directories on both paths + assert isinstance(staged_dir_meta["item_count"], int) and staged_dir_meta["item_count"] >= 0 + assert isinstance(encode_dir_meta["item_count"], int) and encode_dir_meta["item_count"] >= 0 + + table_folder.delete()