Observe hooks¶
Callback-based instrumentation for Store operations — logging, metrics, auditing, and error tracking.
"""Observe hooks — Callback-based instrumentation for Store operations — logging, metrics, auditing, and error tracking.
Demonstrates:
- observe(): wrapping a Store with callbacks
- on_write / on_read / on_any: per-operation and catch-all hooks
- StoreEvent: inspecting operation details
- around: context-manager hook for before/after instrumentation
- BufferedObserver: batched async event delivery
---
see_also:
- label: Observe
url: ../../guides/observe.md
note: instrumentation guide
"""
from __future__ import annotations
import contextlib
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Iterator
from remote_store import (
BackendConfig,
BufferedObserver,
Registry,
RegistryConfig,
Store,
StoreEvent,
StoreProfile,
observe,
)
def demo(store: Store) -> dict[str, list[Any]]:
"""Hook-based observation. Returns collected events for test verification."""
results: dict[str, list[Any]] = {
"write_events": [],
"read_events": [],
"any_events": [],
"around_ops": [],
"buffered_events": [],
}
# --- Per-operation hooks ---
print("=== Per-operation hooks ===")
def on_write(event: StoreEvent) -> None:
results["write_events"].append(event)
print(f" [on_write] {event.operation} {event.path} ({event.duration_ms:.2f}ms)")
def on_read(event: StoreEvent) -> None:
results["read_events"].append(event)
print(f" [on_read] {event.operation} {event.path} ({event.duration_ms:.2f}ms)")
observed = observe(store, on_write=on_write, on_read=on_read)
observed.write_text("hello.txt", "Hello, world!")
observed.write("data.csv", b"a,b,c\n1,2,3", overwrite=False)
_ = observed.read_bytes("hello.txt")
print()
# --- Catch-all hook ---
print("=== Catch-all (on_any) ===")
def on_any(event: StoreEvent) -> None:
results["any_events"].append(event)
status = "OK" if event.error is None else f"ERR: {event.error}"
print(f" [{event.operation}] path={event.path!r} {status} ({event.duration_ms:.2f}ms)")
observed = observe(store, on_any=on_any)
observed.exists("hello.txt")
observed.copy("hello.txt", "hello_copy.txt")
observed.delete("hello_copy.txt")
print()
# --- Around hook ---
print("=== Around hook (before/after) ===")
@contextlib.contextmanager
def trace(op: str, path: str, backend: str) -> Iterator[None]:
results["around_ops"].append(op)
print(f" >> BEFORE {op}({path!r}) on {backend}")
yield
print(f" << AFTER {op}({path!r}) on {backend}")
observed = observe(store, around=trace)
observed.is_file("hello.txt")
print()
# --- BufferedObserver ---
print("=== BufferedObserver ===")
def handle_batch(events: list[StoreEvent]) -> None:
results["buffered_events"].extend(events)
print(f" Flushed batch of {len(events)} events:")
for e in events:
print(f" - {e.operation} {e.path}")
observer = BufferedObserver(handle_batch, flush_interval=60.0)
observed = observe(store, on_any=observer.on_event)
observed.write("batch1.txt", b"one", overwrite=True)
observed.write("batch2.txt", b"two", overwrite=True)
observed.exists("batch1.txt")
observer.flush()
observer.close()
return results
if __name__ == "__main__":
config = RegistryConfig(
backends={"mem": BackendConfig(type="memory", options={})},
stores={"data": StoreProfile(backend="mem", root_path="data")},
)
with Registry(config) as registry:
store = registry.get_store("data")
demo(store)
print("\nDone!")
See also¶
- Observe — instrumentation guide
- Source:
examples/extensions/observe_hooks.py