Observability Hooks
The ext.observe extension wraps a Store in a proxy that fires
user-defined callbacks after each operation. This enables logging,
metrics collection, auditing, and tracing without modifying business
code.
Quick Start
```python
from remote_store import observe

# Assumes store is an existing Store and csv_bytes holds the bytes to write.
def on_write(event):
    print(f"Wrote {event.path} in {event.duration_ms:.1f}ms")

observed = observe(store, on_write=on_write)
observed.write("data/report.csv", csv_bytes)
# prints, e.g.: Wrote data/report.csv in 12.3ms
```
The returned ObservedStore is a full Store subclass — it passes
isinstance(observed, Store) and works everywhere a Store is expected.
Hook Types
Per-operation hooks (after-only)
Each hook fires after the operation completes (success or failure):
| Hook | Operations |
|---|---|
| on_read | read, read_bytes |
| on_write | write, write_atomic, open_atomic |
| on_delete | delete, delete_folder |
| on_copy | copy |
| on_move | move |
| on_list | list_files, list_folders, iter_children, glob, get_file_info, get_folder_info, exists, is_file, is_folder |
| on_ping | ping |
| on_error | Any operation that raises an exception |
| on_any | Every operation (catch-all) |
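The routing in the table can be pictured as a lookup from operation name to hook name. The sketch below is a hypothetical illustration, not the library's code: HOOK_FOR_OP, hooks_to_fire, and the firing order it returns (specific hook, then on_error, then on_any) are assumptions.

```python
# Hypothetical sketch of hook routing; the firing order is an assumption.
HOOK_FOR_OP = {
    "read": "on_read", "read_bytes": "on_read",
    "write": "on_write", "write_atomic": "on_write", "open_atomic": "on_write",
    "delete": "on_delete", "delete_folder": "on_delete",
    "copy": "on_copy",
    "move": "on_move",
    "ping": "on_ping",
}
# All query-style operations share the on_list hook.
for _op in ("list_files", "list_folders", "iter_children", "glob",
            "get_file_info", "get_folder_info", "exists", "is_file", "is_folder"):
    HOOK_FOR_OP[_op] = "on_list"


def hooks_to_fire(operation, failed):
    """Return the names of the hooks that would fire for one operation."""
    names = []
    specific = HOOK_FOR_OP.get(operation)
    if specific is not None:
        names.append(specific)    # per-operation hook: fires on success and failure
    if failed:
        names.append("on_error")  # only when the operation raised
    names.append("on_any")        # catch-all: every operation
    return names


print(hooks_to_fire("write_atomic", failed=False))  # ['on_write', 'on_any']
print(hooks_to_fire("exists", failed=True))         # ['on_list', 'on_error', 'on_any']
```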
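Around hook (context manager)
The around parameter accepts a factory that returns a context manager
wrapping the entire operation:
```python
import contextlib

from opentelemetry import trace
from remote_store import observe

tracer = trace.get_tracer(__name__)

@contextlib.contextmanager
def trace_span(op, path, backend):
    # Open a span before the operation runs and always close it afterwards.
    span = tracer.start_span(f"store.{op}")
    span.set_attribute("path", path)
    try:
        yield
    finally:
        span.end()

observed = observe(store, around=trace_span)
```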
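For a dependency-free variant, an around factory can be any callable that returns a context manager. The sketch below records wall-clock timings into a list. run_with_around is a hypothetical stand-in for what the proxy does around each call, and the (op, path, backend) arguments follow the trace_span signature above.

```python
import contextlib
import time

timings = []  # (op, path, backend, seconds) tuples collected by the hook


@contextlib.contextmanager
def timed(op, path, backend):
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs whether the wrapped operation succeeded or raised.
        timings.append((op, path, backend, time.perf_counter() - start))


def run_with_around(around, op, path, backend, fn):
    """Hypothetical stand-in for the proxy: wrap one call in the around hook."""
    with around(op, path, backend):
        return fn()


result = run_with_around(timed, "read_bytes", "data.csv", "local", lambda: b"a,b\n")
```

With a real store, the same factory would be passed as observe(store, around=timed).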
StoreEvent
Each callback hook receives a StoreEvent frozen dataclass:
```python
from __future__ import annotations

import dataclasses
from typing import Any

@dataclasses.dataclass(frozen=True)
class StoreEvent:
    operation: str              # "read", "write", "delete", ...
    path: str                   # store-relative key
    backend: str                # backend name
    started_at: float           # time.monotonic()
    duration_ms: float          # elapsed milliseconds
    error: Exception | None     # None on success
    metadata: dict[str, Any]    # op-specific: overwrite, dst, recursive, ...
    correlation_id: str | None  # from set_correlation_id(), None if not set
```
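Because StoreEvent is a plain frozen dataclass, a hook can turn it into a JSON-friendly record for structured logs. The sketch below repeats the class definition so it runs standalone; event_to_record is a hypothetical helper, and rendering error as "ClassName: message" is one choice among many.

```python
from __future__ import annotations

import dataclasses
import json
from typing import Any


@dataclasses.dataclass(frozen=True)
class StoreEvent:  # copy of the documented fields, so the sketch runs standalone
    operation: str
    path: str
    backend: str
    started_at: float
    duration_ms: float
    error: Exception | None
    metadata: dict[str, Any]
    correlation_id: str | None


def event_to_record(event):
    """Hypothetical helper: flatten an event into JSON-serializable fields."""
    record = {f.name: getattr(event, f.name) for f in dataclasses.fields(event)}
    err = record.pop("error")
    record["status"] = "ok" if err is None else "error"
    record["error"] = None if err is None else f"{type(err).__name__}: {err}"
    return record


event = StoreEvent("write", "out.csv", "local", 0.0, 12.3,
                   FileExistsError("out.csv"), {"overwrite": False}, "req-1")
print(json.dumps(event_to_record(event)))
```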
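Correlation IDs
Use set_correlation_id() to tag all events within a scope with
a shared identifier (e.g., a request ID):
```python
from remote_store import observe, set_correlation_id

observed = observe(store, on_any=lambda e: print(e.correlation_id))

token = set_correlation_id("req-abc-123")
observed.read_bytes("data.csv")    # correlation_id="req-abc-123"
observed.write("out.csv", b"...")  # correlation_id="req-abc-123"
set_correlation_id(None)  # clear it (alternatively, pass token to the underlying ContextVar's reset())
```
The correlation ID is stored in a contextvars.ContextVar, so each thread and
each asyncio task sees its own value and concurrent requests do not overwrite
each other's IDs. A task created with asyncio.create_task() inherits the ID
that was current when it was created; a new thread does not inherit it by
default, so run worker code through contextvars.copy_context().run() if it
should carry the caller's ID.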
BufferedObserver
For high-throughput scenarios, BufferedObserver collects events in a
thread-safe queue and flushes them in batches from a background thread:
```python
from remote_store import BufferedObserver, observe

def send_to_analytics(events):
    # Called with a batch of StoreEvent objects.
    # analytics is a placeholder for your own telemetry client.
    for event in events:
        analytics.track("store_op", {
            "op": event.operation,
            "path": event.path,
            "duration_ms": event.duration_ms,
        })

observer = BufferedObserver(send_to_analytics, flush_interval=10.0)
observed = observe(store, on_any=observer.on_event)
try:
    ...  # use the observed store
finally:
    observer.close()  # final flush + stop background thread
```
Parameters:
- max_queue (default 1000): maximum number of buffered events; events are dropped when the queue is full.
- flush_interval (default 5.0): seconds between automatic flushes.
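The batching pattern can be sketched with a bounded queue.Queue and a daemon thread. MiniBufferedObserver below is hypothetical and only mirrors the documented behaviour (drop when full, periodic flush, final flush on close); the real BufferedObserver may differ in details such as batch size and drop accounting.

```python
import queue
import threading


class MiniBufferedObserver:
    """Hypothetical sketch: buffer events and hand them to sink in batches."""

    def __init__(self, sink, max_queue=1000, flush_interval=5.0):
        self._sink = sink
        self._queue = queue.Queue(maxsize=max_queue)
        self._interval = flush_interval
        self._stop = threading.Event()
        self._lock = threading.Lock()
        self.dropped = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def on_event(self, event):
        # Called from the hook: never block the store operation.
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            with self._lock:
                self.dropped += 1  # queue full: the event is dropped

    def _drain(self):
        batch = []
        while True:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        if batch:
            self._sink(batch)

    def _run(self):
        # Event.wait() returns True once close() sets the flag, ending the loop.
        while not self._stop.wait(self._interval):
            self._drain()

    def close(self):
        self._stop.set()
        self._thread.join()
        self._drain()  # final flush of anything still queued
```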
Intrinsic Logging
The library ships with built-in stdlib logging. Every module uses
logging.getLogger(__name__), so all loggers live under the
remote_store namespace (e.g. remote_store._store,
remote_store.backends._local, remote_store.ext.observe).
A NullHandler is attached to the top-level "remote_store" logger (not the
Python root logger), so the library stays silent until your application
configures logging.
Showing only warnings
Set the level on the "remote_store" logger; every child logger in the
hierarchy that has no level of its own inherits it:
```python
import logging

logging.basicConfig()  # ensure at least one handler on the root logger
logging.getLogger("remote_store").setLevel(logging.WARNING)
```
Verbose debug output
```python
import logging

logging.basicConfig(level=logging.WARNING)  # keep other libraries quiet
logging.getLogger("remote_store").setLevel(logging.DEBUG)
# Sample output (the exact layout depends on your formatter):
# DEBUG remote_store._store: read path='data/file.csv'
# INFO remote_store._store: write complete path='output.csv'
```
Logging levels used
| Level | When |
|---|---|
| DEBUG | Method entry (every Store operation) |
| INFO | Completion of mutating operations (write, delete, move, copy) |
| WARNING | Suppressed hook exceptions, fallback behaviour |
| ERROR | Before re-raising backend errors |
Structured extra fields
Log records include structured extra fields, exposed as attributes on each
LogRecord, which custom formatters or structlog processors can read:
| Field | Description |
|---|---|
| op | Operation name ("read", "write", ...) |
| path | Store-relative key |
| backend | Backend name ("local", "s3", ...) |
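Since extra fields become ordinary LogRecord attributes, a plain logging.Formatter subclass can render them. StoreFieldsFormatter below is a hypothetical example; it checks hasattr because records from other libraries will not carry these fields. The demo emits a record by hand with extra=... on a hypothetical demo logger; the library's own records would carry the same attributes.

```python
import io
import logging


class StoreFieldsFormatter(logging.Formatter):
    """Append op/path/backend to the message when a record carries them."""

    FIELDS = ("op", "path", "backend")

    def format(self, record):
        base = super().format(record)
        extras = " ".join(
            f"{name}={getattr(record, name)!r}"
            for name in self.FIELDS
            if hasattr(record, name)
        )
        return f"{base} [{extras}]" if extras else base


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(StoreFieldsFormatter("%(levelname)s %(name)s: %(message)s"))

log = logging.getLogger("store_fields_demo")  # hypothetical demo logger
log.addHandler(handler)
log.setLevel(logging.DEBUG)
log.propagate = False  # keep the demo output away from the root logger

log.info("write complete", extra={"op": "write", "path": "output.csv", "backend": "local"})
```

To apply it to the library itself, attach the handler to logging.getLogger("remote_store") instead.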
Error Handling
- Operation errors always propagate. The proxy catches exceptions only to build the StoreEvent (with error set) and fire hooks, then re-raises.
- Hook exceptions are suppressed. A failing hook never breaks the observed operation; hook errors are logged at WARNING level.
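The two rules above amount to a small wrapper around every call. observed_call below is a hypothetical illustration of that pattern, not the library's implementation, and it uses a plain dict in place of StoreEvent: the operation's exception is re-raised unchanged, while an exception from a hook is logged at WARNING and swallowed.

```python
import logging
import time

logger = logging.getLogger("observe_sketch")  # hypothetical logger name


def _fire(hook, event):
    """Call one hook; a failing hook is logged, never propagated."""
    try:
        hook(event)
    except Exception:
        logger.warning("observer hook %r failed", hook, exc_info=True)


def observed_call(fn, hooks, operation, path):
    start = time.monotonic()
    error = None
    try:
        return fn()
    except Exception as exc:
        error = exc
        raise  # operation errors always propagate
    finally:
        # Runs on success and on failure, before the result or error leaves.
        event = {
            "operation": operation,
            "path": path,
            "duration_ms": (time.monotonic() - start) * 1000,
            "error": error,
        }
        for hook in hooks:
            _fire(hook, event)
```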
Composing with Other Extensions
ext.observe wraps at the Store level, so it composes naturally with
other extensions:
```python
from remote_store import observe, batch_delete

def print_event(event):
    print(event.operation, event.path)

observed = observe(store, on_any=print_event)
# batch_delete calls observed.delete() for each path,
# so each individual delete fires the hooks
batch_delete(observed, ["a.txt", "b.txt", "c.txt"])
```
The ext.transfer on_progress callback stays separate from these hooks
because it serves a different concern: UI progress rather than telemetry.
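In the examples above, each hook parameter receives a single callable. To attach several callbacks to the same hook, a small fan-out helper is enough. fan_out is hypothetical and relies only on hooks being called with one event argument; it also isolates the callbacks from each other so one failure does not skip the rest.

```python
import logging

logger = logging.getLogger(__name__)


def fan_out(*callbacks):
    """Combine several hook callbacks into one (hypothetical helper)."""

    def hook(event):
        for cb in callbacks:
            try:
                cb(event)
            except Exception:
                # Keep going so one broken callback does not starve the rest.
                logger.warning("callback %r failed", cb, exc_info=True)

    return hook


calls = []
combined = fan_out(calls.append, lambda e: 1 / 0, lambda e: calls.append(("seen", e)))
combined("evt")
```

With a real store this would read, for example, observe(store, on_any=fan_out(print_event, observer.on_event)).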
Layer 3: OpenTelemetry Bridge
The ext.otel module provides pre-built hooks that emit
OpenTelemetry spans and metrics. Install
the optional dependency:
This depends only on opentelemetry-api (not the SDK). If no SDK is
configured at runtime, the API falls back to its built-in no-op
implementations, so the OTel calls add only negligible overhead.
Quick start
```python
from remote_store.ext.otel import otel_observe

observed = otel_observe(store)  # store: an existing Store
observed.write("data/report.csv", csv_bytes)
# -> OTel span "store.write" + metrics recorded automatically
```
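Or compose with other hooks using otel_hooks():
```python
from remote_store import observe
from remote_store.ext.otel import otel_hooks

def my_error_alerter(event):
    # Placeholder: forward failures to your alerting system.
    print(f"{event.operation} failed on {event.path}: {event.error!r}")

observed = observe(
    store,
    **otel_hooks(),
    on_error=my_error_alerter,
)
```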
Spans
Each operation creates a span with:
- Name: store.{operation} (e.g. store.read, store.write)
- Kind: SpanKind.CLIENT (outbound call to storage)
- Attributes: remote_store.operation, remote_store.backend, remote_store.path
- On error: status set to ERROR, exception recorded, error.type attribute added
Metrics
Three instruments are created:
| Type | Name | Unit | Attributes |
|---|---|---|---|
| Counter | remote_store.operations | 1 | operation, backend, status |
| Counter | remote_store.errors | 1 | operation, backend, error.type |
| Histogram | remote_store.operation.duration | s | operation, backend; plus error.type on error |
Note: path is intentionally excluded from metric attributes to
avoid high-cardinality issues with metric backends like Prometheus.
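The attribute sets in the table can be derived from an event with two small helpers. counter_attributes and duration_attributes are hypothetical, and the "ok"/"error" values for status are assumptions. Note that the duration histogram uses seconds while StoreEvent carries milliseconds, so duration_ms is divided by 1000 before recording.

```python
from types import SimpleNamespace


def counter_attributes(event):
    """Attributes for remote_store.operations; path is deliberately omitted."""
    return {
        "operation": event.operation,
        "backend": event.backend,
        "status": "ok" if event.error is None else "error",  # assumed values
    }


def duration_attributes(event):
    """Attributes for remote_store.operation.duration."""
    attrs = {"operation": event.operation, "backend": event.backend}
    if event.error is not None:
        attrs["error.type"] = type(event.error).__qualname__
    return attrs


def duration_seconds(event):
    # The histogram unit is "s"; StoreEvent.duration_ms is in milliseconds.
    return event.duration_ms / 1000.0


# SimpleNamespace stands in for a StoreEvent in this standalone sketch.
failed = SimpleNamespace(operation="read", backend="s3", path="logs/2024/01/a.json",
                         duration_ms=250.0, error=TimeoutError("slow"))
```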
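Configuring the SDK
The bridge works with any OpenTelemetry SDK configuration. A typical production setup exports to an OTLP-compatible collector:
```python
# Requires the opentelemetry-sdk and opentelemetry-exporter-otlp-proto-grpc packages.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

from remote_store.ext.otel import otel_observe

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter())  # default endpoint: localhost:4317
)
# Metrics additionally need a MeterProvider (opentelemetry.metrics.set_meter_provider).

# Now otel_observe() spans flow to your collector
observed = otel_observe(store)
```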