RFC-0004: Streaming Atomic Writes¶
Status¶
Accepted
Summary¶
Add open_atomic() to Backend and Store, returning a context manager that
yields a writable file object. Data is written to a temporary location; on
successful exit the file is atomically promoted to its final path. On failure
the temporary artifact is cleaned up and the target path is never modified.
This eliminates the memory-buffering requirement of write_atomic() for
multi-GB workloads (Parquet exports, log rotation, report generation).
Motivation¶
write_atomic(path, content) accepts WritableContent = BinaryIO | bytes.
Even the BinaryIO variant requires the caller to have the complete content
available as a seekable stream before the call — there is no way to
incrementally produce content and have the backend handle atomicity.
Real-world data-lake workflows produce large files incrementally:
# Current: caller must buffer everything
buf = io.BytesIO()
pq.write_table(table, buf) # entire table in memory
store.write_atomic("out.parquet", buf.getvalue(), overwrite=True)
For a 2 GB Parquet file this requires 2 GB+ of Python heap. The desired API:
# Proposed: streaming, constant memory
with store.open_atomic("out.parquet", overwrite=True) as f:
    pq.write_table(table, f)  # PyArrow writes directly to f
The context manager owns the temp-path lifecycle: on __exit__(None) it
promotes the temp file; on __exit__(exc) it deletes the temp file. The
caller never sees the temporary artifact.
Prior art inside the codebase¶
ext.arrow._StoreSink already implements a write-buffer-then-flush pattern
using SpooledTemporaryFile. However it always calls Store.write() (not
write_atomic()) on close, so it provides no atomicity guarantee. It also
unconditionally uses overwrite=True, ignoring the caller's intent.
open_atomic() formalises and generalises this pattern at the backend level
where temp-path strategies can be backend-native.
Proposal¶
1. New abstract method on Backend¶
# src/remote_store/_backend.py
@abc.abstractmethod
@contextlib.contextmanager
def open_atomic(
    self, path: str, *, overwrite: bool = False
) -> Iterator[BinaryIO]:
    """Yield a writable file object backed by a temporary location.

    On successful exit the temp file is atomically promoted to *path*.
    On exception the temp file is removed and *path* is untouched.

    :raises AlreadyExists: If *path* exists and *overwrite* is ``False``.
    :raises CapabilityNotSupported: If the backend lacks ``ATOMIC_WRITE``.
    """
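The method is decorated with @contextmanager, so each backend implements it
as a generator function annotated Iterator[BinaryIO] that yields exactly once;
calling it returns a context manager. Callers use it as
with backend.open_atomic(...) as f:.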
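The commit-or-abort behaviour that @contextmanager gives a generator can be seen in a small standalone sketch. The `staged` function and the `events` list are illustrative names only, not part of the library; the point is that an exception raised in the caller's `with` body resurfaces at the `yield`, so the code after the `yield` runs only on a clean exit.

```python
import contextlib
from typing import Iterator, List

events: List[str] = []  # records what the toy context manager did


@contextlib.contextmanager
def staged(name: str) -> Iterator[List[str]]:
    """Toy stand-in for open_atomic(): stage items, commit only on clean exit."""
    staging: List[str] = []
    try:
        yield staging  # the caller's `with` body runs while we are suspended here
    except BaseException:
        # An exception raised in the `with` body is re-raised at the yield.
        events.append(f"abort {name}")
        raise
    else:
        events.append(f"commit {name} {staging}")


with staged("a") as s:
    s.append("x")

try:
    with staged("b") as s:
        s.append("y")
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

After both blocks run, `events` holds one commit entry for `"a"` and one abort entry for `"b"`.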
2. New public method on Store¶
# src/remote_store/_store.py
@contextlib.contextmanager
def open_atomic(
    self, path: str, *, overwrite: bool = False
) -> Iterator[BinaryIO]:
    """Open a file for streaming atomic writing.

    Yields a writable file object. Content written to it is staged in a
    backend-specific temporary location. On successful context exit the
    file is atomically promoted to *path*. On exception the temporary
    artifact is removed and *path* is never modified.

    :param path: Store-relative key for the target file.
    :param overwrite: If ``False``, raise if the file already exists.
    :raises AlreadyExists: If *path* exists and *overwrite* is ``False``.
    :raises CapabilityNotSupported: If the backend lacks ``ATOMIC_WRITE``.
    :raises InvalidPath: If *path* is empty.
    """
    _bk = self._backend.name
    log.debug(
        "open_atomic path=%r overwrite=%r", path, overwrite,
        extra={"op": "open_atomic", "path": path, "backend": _bk},
    )
    self._backend.capabilities.require(Capability.ATOMIC_WRITE, backend=_bk)
    with self._backend.open_atomic(
        self._require_file_path(path), overwrite=overwrite
    ) as f:
        yield f
    # Reached only when the backend promoted the file without raising.
    log.info(
        "open_atomic complete path=%r", path,
        extra={"op": "open_atomic", "path": path, "backend": _bk},
    )
3. Per-backend temp-path strategies¶
LocalBackend¶
Write to tempfile.mkstemp(dir=parent), os.replace() on success,
os.unlink() on failure. Identical to the existing write_atomic() strategy.
@contextlib.contextmanager
def open_atomic(self, path: str, *, overwrite: bool = False) -> Iterator[BinaryIO]:
    full = self._resolve(path)
    if not overwrite and full.exists():
        raise AlreadyExists(...)
    full.parent.mkdir(parents=True, exist_ok=True)
    # Temp file lives next to the target so os.replace() stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=str(full.parent))
    try:
        with os.fdopen(fd, "wb") as f:
            yield f
        os.replace(tmp_path, str(full))
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
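The strategy above can be exercised outside the library with a minimal standalone sketch. It assumes nothing about LocalBackend internals: `open_atomic_local` and `demo` are hypothetical names, and `FileExistsError` stands in for `AlreadyExists`. The demo writes two chunks successfully, then fails mid-write on an overwrite and checks that the target still holds the first version and that no temp file is left behind.

```python
import contextlib
import os
import tempfile
from pathlib import Path
from typing import BinaryIO, Dict, Iterator


@contextlib.contextmanager
def open_atomic_local(target: Path, *, overwrite: bool = False) -> Iterator[BinaryIO]:
    """Standalone sketch of mkstemp + os.replace (not the library code)."""
    if not overwrite and target.exists():
        raise FileExistsError(str(target))  # stand-in for AlreadyExists
    target.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target, so the final rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=str(target.parent))
    try:
        with os.fdopen(fd, "wb") as f:
            yield f
        os.replace(tmp_path, target)  # promote only after the file is closed
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_path)
        raise


def demo() -> Dict[str, object]:
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "out.bin"
        with open_atomic_local(target) as f:
            f.write(b"chunk-1")
            f.write(b"chunk-2")
        first = target.read_bytes()
        try:
            with open_atomic_local(target, overwrite=True) as f:
                f.write(b"partial")
                raise RuntimeError("producer failed")
        except RuntimeError:
            pass
        after_failure = target.read_bytes()
        leftovers = sorted(p.name for p in Path(d).iterdir())
    return {"first": first, "after_failure": after_failure, "leftovers": leftovers}


result = demo()
```

Note that the existence check and the final `os.replace()` are separate steps, so in this sketch a file created by another process in between would still be overwritten.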
SFTPBackend¶
Write to .~tmp.<name>.<random> in the same directory, posix_rename() on
success (with rename() fallback), remove() on failure. Matches existing
write_atomic().
@contextlib.contextmanager
def open_atomic(self, path: str, *, overwrite: bool = False) -> Iterator[BinaryIO]:
    with self._errors(path):
        sftp_path = self._sftp_path(path)
        if not overwrite:
            # existence check (same as write_atomic)
            ...
        self._ensure_parent_dirs(sftp_path)
        tmp_path = self._make_tmp_path(sftp_path)
        try:
            with self._sftp.file(tmp_path, "w") as f:
                yield f
            # rename (same posix_rename + fallback as write_atomic)
            self._atomic_rename(tmp_path, sftp_path, overwrite)
        except BaseException:
            # BaseException (as in LocalBackend) so KeyboardInterrupt and
            # GeneratorExit also remove the temp file.
            with contextlib.suppress(Exception):
                self._sftp.remove(tmp_path)
            raise
Private helpers _make_tmp_path and _atomic_rename can be extracted from
the existing write_atomic() to share logic (DRY, not a new public surface).
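As an illustration of what such a helper might look like, here is a minimal sketch of a temp-path builder following the .~tmp.<name>.<random> naming described above. The standalone function `make_tmp_path`, the use of `secrets.token_hex`, and the 16-character suffix length are assumptions for this example; the existing write_atomic() may generate its suffix differently.

```python
import posixpath
import secrets


def make_tmp_path(final_path: str) -> str:
    """Return a sibling temp path of the form <dir>/.~tmp.<name>.<random>.

    Hypothetical helper: SFTP paths are POSIX-style, so posixpath is used
    regardless of the client's operating system.
    """
    parent, name = posixpath.split(final_path)
    suffix = secrets.token_hex(8)  # 16 hex characters; length chosen arbitrarily
    return posixpath.join(parent, f".~tmp.{name}.{suffix}")


tmp_a = make_tmp_path("/data/exports/out.parquet")
tmp_b = make_tmp_path("/data/exports/out.parquet")
```

Keeping the temp file in the same directory as the target matters because the later rename is only expected to be atomic within one directory tree on the server.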
S3Backend¶
S3 PUT is atomic. There is no server-side temp file concept. Buffer locally
using SpooledTemporaryFile (same pattern as _StoreSink), then upload on
close via self.write().
@contextlib.contextmanager
def open_atomic(self, path: str, *, overwrite: bool = False) -> Iterator[BinaryIO]:
    with self._errors(path):
        if not overwrite and self._fs.exists(self._s3_path(path)):
            raise AlreadyExists(...)
        buf: tempfile.SpooledTemporaryFile[bytes] = tempfile.SpooledTemporaryFile(
            max_size=8 * 1024 * 1024  # 8 MB before spilling to disk
        )
        try:
            yield cast("BinaryIO", buf)
            # Reached only if the caller's block did not raise.
            buf.seek(0)
            self.write(path, cast("BinaryIO", buf), overwrite=overwrite)
        finally:
            buf.close()
Note on S3 multipart upload: S3 multipart upload would enable true streaming
without local buffering. However, it introduces significant complexity (part
tracking, abort on failure, a 5 MiB minimum size for every part except the
last) and s3fs already handles multipart transparently inside pipe_file() /
open("wb"). Adding our own multipart layer is out of scope for this RFC. If
profiling shows local spill is a bottleneck, a follow-up RFC can address
S3-native streaming.
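The buffer-then-upload flow can be tried without any S3 dependency. In the sketch below, `open_buffered` and `fake_upload` are hypothetical stand-ins (the latter plays the role of self.write()), and the tiny `max_size=16` is chosen only so the spool rolls over to a real temp file during the demo. It shows that the upload callable runs once on a clean exit and not at all when the caller's block raises.

```python
import contextlib
import tempfile
from typing import BinaryIO, Callable, Iterator, List, cast


@contextlib.contextmanager
def open_buffered(
    upload: Callable[[BinaryIO], None], *, max_size: int = 8 * 1024 * 1024
) -> Iterator[BinaryIO]:
    """Stage writes in a SpooledTemporaryFile, then hand the stream to upload."""
    buf = tempfile.SpooledTemporaryFile(max_size=max_size)
    try:
        yield cast(BinaryIO, buf)
        buf.seek(0)  # rewind so the uploader reads from the start
        upload(cast(BinaryIO, buf))
    finally:
        buf.close()  # also deletes the on-disk spill file, if any


uploaded: List[bytes] = []


def fake_upload(stream: BinaryIO) -> None:
    """Stand-in for the backend's single atomic PUT."""
    uploaded.append(stream.read())


with open_buffered(fake_upload, max_size=16) as f:
    f.write(b"a" * 10)
    f.write(b"b" * 10)  # total exceeds max_size, so the spool rolls over to disk

try:
    with open_buffered(fake_upload) as f:
        f.write(b"never uploaded")
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Because the upload happens in one call after the block exits, readers of the target key see either the old object or the complete new one, which is the property the RFC relies on for S3.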
S3PyArrowBackend¶
Same SpooledTemporaryFile strategy as S3Backend. PyArrow's
open_output_stream writes synchronously and doesn't support incremental
commit, so buffering is necessary.
AzureBackend¶
- Non-HNS (flat namespace): Same SpooledTemporaryFile strategy; Azure Blob PUT is atomic.
- HNS (Data Lake Storage Gen2): Write to temp blob via DFS, then atomic rename. Matches the existing write_atomic() HNS path.
MemoryBackend¶
Buffer in BytesIO, commit to the internal tree on successful exit.
@contextlib.contextmanager
def open_atomic(self, path: str, *, overwrite: bool = False) -> Iterator[BinaryIO]:
    segments = self._split_path(path)
    if not segments:
        raise InvalidPath(...)
    if not overwrite and self._find_file(segments):
        raise AlreadyExists(...)
    buf = io.BytesIO()
    yield buf
    # Reached only if the caller's block did not raise.
    buf.seek(0)
    self.write(path, buf.getvalue(), overwrite=overwrite)
4. Observe and OTel integration¶
ext.observe.ObservedBackend wraps open_atomic the same way it wraps other
methods:
@contextlib.contextmanager
def open_atomic(self, path: str, *, overwrite: bool = False) -> Iterator[BinaryIO]:
    with self._observe_op("open_atomic", path, {"overwrite": overwrite}):
        with self._inner.open_atomic(path, overwrite=overwrite) as f:
            yield f
The _OP_TO_HOOK mapping gains "open_atomic": "on_write" (same hook as
write and write_atomic).
ext.otel gains a corresponding span via the same backend-wrapper pattern.
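The ordering implied by this nesting (promotion first, hook second, no hook on failure) can be checked with a toy model. Everything here is hypothetical: `observe_op`, `on_write`, and `inner_open_atomic` are simplified stand-ins, and the real hook signature in ext.observe may differ.

```python
import contextlib
import io
from typing import BinaryIO, Callable, Dict, Iterator, List

events: List[str] = []
committed: Dict[str, bytes] = {}


def on_write(op: str, path: str) -> None:
    """Hypothetical hook; records that it fired."""
    events.append(f"hook {op} {path}")


@contextlib.contextmanager
def observe_op(op: str, path: str, hook: Callable[[str, str], None]) -> Iterator[None]:
    yield
    hook(op, path)  # reached only when the wrapped block exits cleanly


@contextlib.contextmanager
def inner_open_atomic(path: str) -> Iterator[BinaryIO]:
    buf = io.BytesIO()
    yield buf
    committed[path] = buf.getvalue()  # "promotion" for this toy backend
    events.append(f"promote {path}")


@contextlib.contextmanager
def observed_open_atomic(path: str) -> Iterator[BinaryIO]:
    # Same nesting as the wrapper above: observer outside, backend inside.
    with observe_op("open_atomic", path, on_write):
        with inner_open_atomic(path) as f:
            yield f


with observed_open_atomic("ok.bin") as f:
    f.write(b"data")

try:
    with observed_open_atomic("bad.bin") as f:
        f.write(b"partial")
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

The inner context manager exits before the outer one resumes, so the hook observes a target that has already been promoted.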
5. Proposed spec IDs¶
New spec 022-streaming-atomic-writes.md, with the SAW-prefixed IDs listed in the table below.
Capacity note (S3-010, AZ-007 scope): S3Backend, S3PyArrowBackend, and
AzureBackend (non-HNS) buffer the entire file via SpooledTemporaryFile
before uploading. Files <= 8 MB are held in memory; larger files spill to
disk. For streams exceeding ~10 GB, callers should consider native multipart
methods or splitting the file. A follow-up RFC may add S3-native streaming
to eliminate this constraint.
Spec IDs:
| ID | Requirement |
|---|---|
| SAW-001 | Backend.open_atomic() is abstract, returns Iterator[BinaryIO] |
| SAW-002 | Store.open_atomic() gates on Capability.ATOMIC_WRITE |
| SAW-003 | On successful exit, file is atomically visible at target path |
| SAW-004 | On exception, target path is unchanged (no partial file) |
| SAW-005 | Temp artifact is cleaned up on both success and failure |
| SAW-006 | AlreadyExists raised if file exists and overwrite=False |
| SAW-007 | InvalidPath raised if path is empty |
| SAW-008 | LocalBackend uses mkstemp + os.replace |
| SAW-009 | SFTPBackend uses .~tmp.* + posix_rename (with fallback) |
| SAW-010 | S3Backend / S3PyArrowBackend buffer then PUT (atomic by nature) |
| SAW-011 | AzureBackend non-HNS buffers then PUT; HNS uses temp + DFS rename |
| SAW-012 | MemoryBackend buffers in BytesIO then commits |
| SAW-013 | Yielded file object supports write() and tell(); seekability is backend-dependent and MUST NOT be relied upon by callers |
| SAW-014 | ext.observe fires on_write hook after successful promotion |
| SAW-015 | ext.otel emits a span covering the full open-write-promote lifecycle |
6. Refactoring opportunity¶
With open_atomic() in place, write_atomic() can be reimplemented in terms
of it on backends where the two share identical temp-path logic (Local, SFTP):
def write_atomic(self, path, content, *, overwrite=False):
    with self.open_atomic(path, overwrite=overwrite) as f:
        if isinstance(content, bytes):
            f.write(content)
        else:
            # Streams the source in chunks instead of reading it all at once.
            shutil.copyfileobj(content, f)
This is optional — it reduces duplication but adds a function-call layer. The spec should not require this; backends may keep separate implementations if profiling shows the overhead matters.
Alternatives Considered¶
- Extension-only (ext.atomic). Put the context manager in ext/ rather than on Store. Rejected: atomicity is a core backend concern. The temp-path strategy is backend-specific (mkstemp vs .~tmp.* vs PUT semantics). An extension can't access backend internals and would have to buffer-then-call write_atomic(), defeating the purpose for backends that support true streaming (Local, SFTP).
- Generator-based protocol (no context manager). Have open_atomic() return a special writer object with explicit .commit() / .abort(). Rejected: context managers are idiomatic Python for resource lifecycle. An explicit commit API risks forgotten abort() calls on error paths. contextlib.contextmanager gives us try/finally cleanup for free.
- S3 multipart upload. Use S3's native multipart upload to avoid local buffering entirely. Rejected for this RFC: significant complexity (part tracking, 5 MB minimum part size, explicit abort). s3fs handles multipart internally for large pipe_file() calls, so the SpooledTemporaryFile approach gets multipart upload "for free" once it flushes. A follow-up RFC can add native multipart if profiling shows the spill-to-disk is a bottleneck.
- Add open_write() (non-atomic) instead. A streaming write without atomicity guarantees. Rejected: the primary motivation is safely producing large files. A non-atomic streaming write leaves partial files on failure, which is the exact problem users want to avoid in data-lake workflows. Non-atomic streaming can be added separately if needed.
Impact¶
- Public API: Store.open_atomic() added. Backend.open_atomic() added (abstract). __all__ unchanged (Store is already exported; Backend is not in __all__).
- Backwards compatibility: Non-breaking for callers: a new method, with no changes to existing methods. Third-party Backend subclasses will need to implement open_atomic(). This is a breaking change for external backend implementations, but the library is Beta and the Backend ABC is documented as unstable.
- Performance: For Local and SFTP, streaming avoids buffering the entire file in memory; peak memory drops from O(file_size) to O(chunk_size). For S3/Azure (non-HNS), the memory profile is similar to write_atomic(), with SpooledTemporaryFile providing disk spill for files > 8 MB.
- Testing: Spec 022-streaming-atomic-writes.md must include tests for:
  - Success path (SAW-003): temp file created during write, promoted to target on __exit__(None), temp artifact no longer exists, target readable with correct content.
  - Exception path (SAW-004, SAW-005): exception inside with block, target path unchanged (no partial file), temp artifact cleaned up.
  - Early close (SAW-005): caller calls f.close() before exiting context; behaviour defined (promote or raise).
  - AlreadyExists guard (SAW-006): overwrite=False raises when target exists; overwrite=True succeeds and replaces content.
  - InvalidPath (SAW-007): empty path raises InvalidPath.
  - Per-backend temp-path validation (SAW-008/009/010/011/012):
    - Local: mkstemp temp file exists in parent dir during write.
    - SFTP: .~tmp.* file exists on server during write, removed after.
    - S3/S3-PyArrow: SpooledTemporaryFile used (no server-side temp).
    - Azure non-HNS: buffered PUT; Azure HNS: temp blob + DFS rename.
    - Memory: BytesIO buffer, committed on success.
  - Content correctness: multi-chunk write, content matches after promotion (all 6 backends).
  - Observe hook (SAW-014): on_write fires after successful promotion, does not fire on exception path.
  - OTel span (SAW-015): span covers full open-write-promote lifecycle.
  - Integration with PyArrow pq.write_table() (example, not unit test).
Open Questions¶
- Spill threshold for cloud backends. The proposal uses 8 MB as the SpooledTemporaryFile threshold for S3/Azure. Should this be configurable? The _StoreSink in ext.arrow uses a configurable write_spill_threshold (default 5 MB). Using the same default and making it a backend config option adds complexity. Recommendation: hardcode 8 MB for now, add configurability if users request it.
- Should open_atomic be on Store or in ext/? This RFC proposes Store.open_atomic() (core). The argument: atomicity is already a core concern (write_atomic is on Store), and the temp-path strategy requires backend internals. Counter-argument: it adds surface area to the core API. Recommendation: core, matching write_atomic().
- Seekability of the yielded stream. Should callers be able to seek() on the yielded file object? For Local/SFTP, the underlying temp file is seekable. For cloud backends using SpooledTemporaryFile, it's also seekable. However, seeking mid-write is an unusual pattern, and documenting seekability as guaranteed would constrain future backends (e.g. native multipart). Resolution: SAW-013 declares seekability as backend-dependent; callers MUST NOT rely on it. All current backends happen to yield seekable streams, but this is an implementation detail, not a contract. Future backends (e.g. native S3 multipart) may yield non-seekable streams.
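One way callers could verify that their producers respect the seekability resolution is to run them against a stream that refuses seek(). The wrapper below is a hypothetical test helper, not something this RFC proposes adding: it forwards write(), tracks the position for tell(), and inherits the default io.RawIOBase seek(), which raises io.UnsupportedOperation.

```python
import io
from typing import BinaryIO


class WriteOnlyStream(io.RawIOBase):
    """Hypothetical helper: supports write() and tell(), refuses seek()."""

    def __init__(self, inner: BinaryIO) -> None:
        super().__init__()
        self._inner = inner
        self._pos = 0

    def writable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def write(self, data) -> int:
        written = self._inner.write(data)
        # Some file objects return None from write(); assume a full write then.
        count = len(data) if written is None else written
        self._pos += count
        return count

    def tell(self) -> int:
        # Tracked locally, because the inherited tell() is implemented via seek().
        return self._pos


sink = io.BytesIO()
stream = WriteOnlyStream(sink)
stream.write(b"abc")
stream.write(b"de")

try:
    stream.seek(0)
    seek_refused = False
except io.UnsupportedOperation:
    seek_refused = True
```

A producer that completes against such a wrapper uses only the operations SAW-013 guarantees.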
References¶
- Backlog: ID-026 (Streaming atomic writes)
- ADR-0008 (Extension architecture) — mentions ID-026 needs context manager protocol
- Existing implementation: write_atomic() on all 6 backends
- Related pattern: ext.arrow._StoreSink (buffer-then-flush)
- Data lake guide: guides/data-lake-patterns.md (references ID-026)
- Spec prefix: SAW (Streaming Atomic Writes)