Skip to content

Streaming I/O

Streaming writes and reads with BytesIO.

"""Streaming I/O — Streaming writes and reads with `BytesIO`.

Demonstrates streaming-first I/O patterns with remote-store.
"""

from __future__ import annotations

import io
import tempfile

from remote_store import BackendConfig, Registry, RegistryConfig, Store, StoreProfile


def demo(store: Store) -> None:
    """Walk through the store's streaming-oriented read/write calls.

    Four steps run in order against *store*, each printing a short report:

    1. Write ``streamed.txt`` from an in-memory ``io.BytesIO`` stream.
    2. Re-open it with ``store.read`` and iterate the reader line by line.
    3. Write ``large.bin`` (10,000 bytes) and consume it through
       ``read(4096)`` calls, counting chunks and bytes.
    4. Write ``direct.txt`` from a plain ``bytes`` value and echo it back
       via ``store.read_bytes``.
    """
    # Step 1: the source is a file-like object rather than raw bytes.
    payload = io.BytesIO(b"line1\nline2\nline3\nline4\nline5\n")
    written = store.write("streamed.txt", payload)
    print(f"Wrote {written.size} bytes from BytesIO stream.")

    # Step 2: iterate the reader; each item is printed without its
    # trailing newline bytes.
    with store.read("streamed.txt") as src:
        print(f"\nStreaming read (type: {type(src).__name__}):")
        for raw_line in src:
            stripped = raw_line.rstrip(b"\n")
            print(f"  {stripped!r}")

    # Step 3: pull fixed-size pieces until the reader hands back a falsy
    # (empty) chunk, tallying as we go.
    written = store.write("large.bin", b"X" * 10_000)
    byte_total = 0
    n_chunks = 0
    with store.read("large.bin") as src:
        piece = src.read(4096)
        while piece:
            byte_total += len(piece)
            n_chunks += 1
            piece = src.read(4096)
    print(f"\nRead large.bin in {n_chunks} chunk(s), {byte_total}/{written.size} bytes total.")

    # Step 4: plain bytes in, plain bytes out.
    written = store.write("direct.txt", b"Written as raw bytes")
    direct_size = written.size
    echoed = store.read_bytes("direct.txt").decode()
    print(f"\nDirect write ({direct_size} bytes): {echoed}")


if __name__ == "__main__":
    # Run the demo against a local backend rooted in a throwaway directory;
    # the directory is removed when the outer ``with`` exits.
    with tempfile.TemporaryDirectory() as scratch_dir:
        local_backend = BackendConfig(type="local", options={"root": scratch_dir})
        backend_map = {"local": local_backend}
        # The "files" store profile points at the "local" backend above.
        store_map = {"files": StoreProfile(backend="local")}
        cfg = RegistryConfig(backends=backend_map, stores=store_map)

        # The registry is used as a context manager around the whole demo.
        with Registry(cfg) as reg:
            demo(reg.get_store("files"))

    print("\nDone!")

See also