Research: Dagster Extension (ext.dagster, ID-075)
Date: 2026-03-13
Backlog item: ID-075 — Dagster integration (ext.dagster)
Status: Research complete — ready for design decisions
1. Problem Statement
Dagster already provides backend-specific IO managers (dagster-aws, dagster-azure,
dagster-gcp) that handle S3/Azure/GCS portability natively — swapping the "io_manager"
resource is all that's needed; asset code stays unchanged.
The gap this extension fills is for teams already using remote-store who adopt Dagster.
These teams have an existing Store (or Registry) with configured backends, credentials,
retry policy, caching, and observability wrapping. They should not need to duplicate that
config into dagster-aws/dagster-azure alongside what they already have.
A thin adapter — remote_store_io_manager(store) — lets any existing Store object serve
as a Dagster IO manager with zero duplication. This is genuinely useful and not covered by
any existing Dagster library. Additionally, Dagster has no native SFTP IO manager, so
remote-store covers that backend directly.
2. Dagster IO Manager API (1.7–1.12+)
2.1 Core interface
| Class | Use when |
|---|---|
| IOManager | Raw base class; implement handle_output(context, obj) and load_input(context) |
| ConfigurableIOManager | Preferred when config fields and I/O live in the same class |
| ConfigurableIOManagerFactory | Preferred when the factory class is separate from the actual IOManager implementation (e.g., it wraps a stateful connection) |
| UPathIOManager | Subclass of ConfigurableIOManager; filesystem-agnostic via universal_pathlib/fsspec; define dump_to_path(ctx, obj, path) and load_from_path(ctx, path) |
2.2 OutputContext: key attributes
| Attribute | Type | Notes |
|---|---|---|
| asset_key | AssetKey | .path is Sequence[str] |
| has_partition_key | bool | True for partitioned assets |
| asset_partition_key | str | Current partition key (single) |
| asset_partition_keys | Sequence[str] | All partition keys for this run |
| definition_metadata | dict | Metadata from AssetOut / Out (replaces the deprecated metadata attribute) |
| add_output_metadata(dict) | method | Adds materialization metadata visible in the UI |
2.3 InputContext: key attributes
| Attribute | Type | Notes |
|---|---|---|
| asset_key | AssetKey | |
| has_partition_key | bool | |
| asset_partition_key | str | |
| asset_partition_keys | Sequence[str] | |
| upstream_output | OutputContext | Access upstream output definition metadata |
get_asset_identifier() returns [*asset_key.path, partition_key] when partitioned —
useful for deriving storage paths.
2.4 Testing helpers

    from dagster import AssetKey, build_input_context, build_output_context

    # io_manager and obj stand for the IO manager under test and the value to store.
    ctx = build_output_context(asset_key=AssetKey(["my", "asset"]))
    io_manager.handle_output(ctx, obj)

    ctx = build_input_context(asset_key=AssetKey(["my", "asset"]))
    result = io_manager.load_input(ctx)

No running orchestrator needed; these cover unit tests completely.
2.5 Version landscape
| Dagster | Python | Notes |
|---|---|---|
| 1.7.x | 3.8–3.12 | Introduced ConfigurableResource / ConfigurableIOManager |
| 1.9.x | 3.9–3.12 | Dropped Python 3.8 |
| 1.12.x+ | 3.10–3.14 | Dropped Python 3.9; aligns with remote-store's requires-python = ">=3.10" |
Recommendation: Floor at dagster>=1.9. remote-store requires Python 3.10+ so our
users will effectively be on Dagster 1.12+, but 1.9 is the lowest stable version that
aligns with our Python floor without imposing an artificially tight pin.
3. Why NOT UPathIOManager
UPathIOManager is the right base for fsspec/universal_pathlib backends (S3, GCS,
Azure via fsspec). However:
- Dependency leak. UPathIOManager requires universal_pathlib (which requires fsspec). remote-store explicitly avoids fsspec as a user-visible dependency (ADR-0003). Pulling it in through Dagster would leak a dependency we committed to hiding.
- Conceptual mismatch. UPathIOManager operates on UPath objects (fsspec paths). remote-store's Store API is its own abstraction, not an fsspec filesystem. Bridging through ext.arrow's pyarrow_fs adapter would add pyarrow as a required dependency just to make path objects work.
- Path-kwargs stripping bug. UPathIOManager._with_extension() instantiates a new UPath without preserving auth kwargs, a known issue that would affect cloud backends.
- Extension is the right fit. The remote-store public API (read_bytes, write, exists) is exactly the interface needed. There is no value in the fsspec abstraction layer.
Decision: Implement against the public Store API directly. v1 wraps the
raw IOManager base class; v2 adds ConfigurableIOManagerFactory (two-class
pattern) for Dagster-config-driven Store construction.
4. Proposed Design
4.1 Scope split: v1 vs v2
v1 — adapter for existing Store users (primary audience):
- remote_store_io_manager(store, serializer="pickle") factory function
- _RemoteStoreIOManagerImpl — internal IOManager subclass
- Built-in serializers: pickle, JSON, Parquet (optional PyArrow)
- Caller owns Store lifecycle (no teardown responsibility); the adapter does
not close the Store — see §4.3 lifecycle note
v2 (deferred) — Dagster-config-driven Store construction:
- DagsterStoreResource — ConfigurableResource that builds a Store from
Dagster config fields (backend_type, backend_options, root_path)
- DagsterStoreResource.teardown_after_execution() → calls store.close()
to prevent connection leaks on connection-based backends (SFTP, S3)
- RemoteStoreIOManager — ConfigurableIOManagerFactory wrapping the resource
- Targets Dagster-first users who don't already have a Store
This split ships value to the primary audience (existing remote-store users) sooner, while deferring the higher-effort Dagster-native config integration.
4.2 v1 architecture

    remote_store_io_manager(store, serializer)   → factory function (exported)
        Returns a _RemoteStoreIOManagerImpl

    _RemoteStoreIOManagerImpl                    → IOManager (internal, not exported)
        __init__(store: Store, serializer: Serializer)
        handle_output(context, obj) → None
        load_input(context) → Any

Only remote_store_io_manager is exported in __all__ for v1.
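
The v1 shape above could be sketched roughly as follows. To keep the sketch runnable without Dagster or remote-store installed, `DictStore` and `FakeContext` are hypothetical stand-ins: the real `_RemoteStoreIOManagerImpl` would subclass `dagster.IOManager` and receive real `OutputContext` / `InputContext` objects, and the `write` / `read_bytes` signatures are assumed from the public Store API named in §3.

```python
import pickle
from dataclasses import dataclass
from typing import Any


class DictStore:
    """Hypothetical stand-in for remote_store.Store (write / read_bytes only)."""

    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}

    def write(self, path: str, data: bytes) -> None:
        self.blobs[path] = data

    def read_bytes(self, path: str) -> bytes:
        return self.blobs[path]


@dataclass
class FakeContext:
    """Hypothetical stand-in for Dagster's OutputContext / InputContext."""

    identifier: list[str]

    def get_asset_identifier(self) -> list[str]:
        return self.identifier


class PickleSerializer:
    extension = ".pkl"

    def serialize(self, obj: Any) -> bytes:
        return pickle.dumps(obj)

    def deserialize(self, data: bytes) -> Any:
        return pickle.loads(data)


class RemoteStoreIOManagerSketch:
    """Sketch only; the real class would subclass dagster.IOManager."""

    def __init__(self, store: DictStore, serializer: PickleSerializer) -> None:
        self._store = store
        self._serializer = serializer

    def _path(self, context: FakeContext) -> str:
        # Asset key segments (plus partition key, if any) become the object path.
        return "/".join(context.get_asset_identifier()) + self._serializer.extension

    def handle_output(self, context: FakeContext, obj: Any) -> None:
        self._store.write(self._path(context), self._serializer.serialize(obj))

    def load_input(self, context: FakeContext) -> Any:
        return self._serializer.deserialize(self._store.read_bytes(self._path(context)))


store = DictStore()
manager = RemoteStoreIOManagerSketch(store, PickleSerializer())
manager.handle_output(FakeContext(["my", "asset"]), {"rows": [1, 2, 3]})
loaded = manager.load_input(FakeContext(["my", "asset"]))
```

The adapter holds a reference to the Store but never closes it, which matches the caller-owns-lifecycle rule for v1.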
4.3 v1 usage

    from dagster import Definitions, IOManager, asset, io_manager
    from remote_store import Store
    from remote_store.backends import LocalBackend
    from remote_store.ext.dagster import remote_store_io_manager


    @io_manager
    def my_io_manager() -> IOManager:
        store = Store(LocalBackend(root="/data/dagster"))
        return remote_store_io_manager(store, serializer="pickle")


    @asset
    def raw_data() -> dict:
        return {"rows": [1, 2, 3]}


    defs = Definitions(
        assets=[raw_data],
        resources={"io_manager": my_io_manager},
    )

For teams using Registry:

    @io_manager
    def production_io_manager() -> IOManager:
        # Registry and config come from the team's existing remote-store setup.
        registry = Registry(config)
        store = registry.get_store("production")
        return remote_store_io_manager(store, serializer="pickle")

Lifecycle note (v1): The caller owns the Store. The adapter does not close it. If the Store was created inline (not from a long-lived Registry), the caller is responsible for cleanup.
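
One way a caller might honour this ownership rule is to scope an inline Store with `contextlib.closing`. This is a minimal sketch, not the library's API: `ClosableStore` and `run_job` are hypothetical stand-ins, and the only assumption taken from the design is that a Store exposes `close()` (see §4.1).

```python
from contextlib import closing


class ClosableStore:
    """Hypothetical stand-in for a Store that holds a connection."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def run_job(store: ClosableStore) -> str:
    # Placeholder for building the IO manager and executing a Dagster job.
    return "closed" if store.closed else "open"


# The caller created the Store, so the caller closes it once the run is done.
with closing(ClosableStore()) as store:
    state_during_run = run_job(store)

state_after_run = "closed" if store.closed else "open"
```

A Store obtained from a long-lived Registry would normally skip this step and leave cleanup to whatever owns the Registry.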
4.4 v2 architecture (deferred)

    DagsterStoreResource            → ConfigurableResource that builds a Store
        backend_type: str
        backend_options: dict
        root_path: str = ""
        teardown_after_execution() → store.close()

    RemoteStoreIOManager            → ConfigurableIOManagerFactory
        store_resource: ResourceDependency[DagsterStoreResource]
        serializer: str = "pickle"  # "pickle" | "json" | "parquet"
        extension: str = ""         # override file extension (default: serializer default)

DagsterStoreResource and RemoteStoreIOManager would be added to __all__
in v2. teardown_after_execution() calls store.close() to prevent connection
leaks on SFTP/S3 backends.
4.5 v2 usage (deferred)

    from dagster import Definitions, asset
    from remote_store.ext.dagster import DagsterStoreResource, RemoteStoreIOManager


    @asset
    def raw_data() -> dict:
        return {"rows": [1, 2, 3]}


    defs = Definitions(
        assets=[raw_data],
        resources={
            "io_manager": RemoteStoreIOManager(
                store_resource=DagsterStoreResource(
                    backend_type="s3",
                    backend_options={"bucket": "my-bucket"},
                    root_path="dagster",
                ),
                serializer="pickle",
            )
        },
    )

4.6 Path generation strategy
Storage path derived from context.get_asset_identifier() (includes partition key when
partitioned):
| Asset / Partition | Derived path |
|---|---|
| AssetKey(["foo", "bar"]), no partition | foo/bar.<ext> |
| AssetKey(["foo", "bar"]), partition "2026-01" | foo/bar/2026-01.<ext> |
| AssetKey(["ns", "report"]), no partition | ns/report.<ext> |
Implementation:

    from dagster import InputContext, OutputContext


    def _asset_path(context: OutputContext | InputContext, ext: str) -> str:
        # get_asset_identifier() yields ["foo", "bar"] or ["foo", "bar", "2026-01"].
        identifier = context.get_asset_identifier()
        # Joining every segment is equivalent to splitting off the last one first.
        return "/".join(identifier) + ext

The root_path of the Store (set on DagsterStoreResource) acts as the namespace
prefix — no need to embed it in the path generation logic.
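
As a quick check of the mapping in the table above, here is a dependency-free restatement of the path rule. `derive_path` is a hypothetical helper that takes the identifier list directly instead of a Dagster context, so it can run without Dagster installed.

```python
from typing import Sequence


def derive_path(identifier: Sequence[str], ext: str) -> str:
    """Join the asset identifier segments and append the serializer extension."""
    return "/".join(identifier) + ext


# Identifier as get_asset_identifier() would return it for each table row.
unpartitioned = derive_path(["foo", "bar"], ".pkl")
partitioned = derive_path(["foo", "bar", "2026-01"], ".pkl")
namespaced = derive_path(["ns", "report"], ".json")
```

The three results correspond to the three table rows, with `<ext>` filled in by the chosen serializer.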
4.7 Serializer protocol

    from typing import Any, Protocol, runtime_checkable


    @runtime_checkable
    class Serializer(Protocol):
        extension: str  # e.g., ".pkl", ".json", ".parquet"

        def serialize(self, obj: Any) -> bytes: ...

        def deserialize(self, data: bytes) -> Any: ...

Built-in serializers:
| Name | serializer= string | extension | Dependency | Notes |
|---|---|---|---|---|
| PickleSerializer | "pickle" (default) | .pkl | stdlib | Universal; opaque |
| JsonSerializer | "json" | .json | stdlib | JSON-serializable objects only |
| ParquetSerializer | "parquet" | .parquet | pyarrow>=14.0 | DataFrames (pandas, polars) |
ParquetSerializer is guarded behind the same try/except pattern as ext.arrow.
Users who pass serializer="parquet" without PyArrow installed get a clear error:
"PyArrow is required for the parquet serializer. Install it with: pip install 'remote-store[dagster,arrow]'".
Custom serializers can be passed as objects:

    RemoteStoreIOManager(
        store_resource=...,
        serializer=MyCustomSerializer(),  # any object matching the Serializer protocol
    )

Ideally the serializer field would be typed str | Serializer. In practice, Dagster config
fields must be Pydantic-compatible types, so the serializer field of RemoteStoreIOManager
stores only a string ("pickle" | "json" | "parquet"). A separate custom_serializer field
can accept an arbitrary Serializer object at construction time, but that field cannot be
driven by config. This is a known trade-off of Pydantic-based config.
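
The serializer pieces described in this section might fit together along these lines. This is a sketch under assumptions, not the planned implementation: `resolve_serializer` and `require_optional` are hypothetical helper names, the stdlib serializers are minimal, and `ParquetSerializer` itself is left out so that only its dependency guard is shown.

```python
import importlib
import json
import pickle
from typing import Any, Protocol, Union, runtime_checkable


@runtime_checkable
class Serializer(Protocol):
    extension: str

    def serialize(self, obj: Any) -> bytes: ...

    def deserialize(self, data: bytes) -> Any: ...


class PickleSerializer:
    extension = ".pkl"

    def serialize(self, obj: Any) -> bytes:
        return pickle.dumps(obj)

    def deserialize(self, data: bytes) -> Any:
        return pickle.loads(data)


class JsonSerializer:
    extension = ".json"

    def serialize(self, obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")

    def deserialize(self, data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))


def require_optional(module_name: str, feature: str, extras: str) -> Any:
    """Import an optional dependency or fail with an actionable message."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required for the {feature}. "
            f"Install it with: pip install 'remote-store[{extras}]'"
        ) from exc


_BUILTIN = {"pickle": PickleSerializer, "json": JsonSerializer}


def resolve_serializer(spec: Union[str, Serializer]) -> Serializer:
    """Turn a serializer= argument (name or object) into a Serializer."""
    if isinstance(spec, str):
        if spec == "parquet":
            # Only the guard is demonstrated; the serializer is not implemented here.
            require_optional("pyarrow", "parquet serializer", "dagster,arrow")
            raise NotImplementedError("ParquetSerializer is omitted from this sketch")
        if spec not in _BUILTIN:
            raise ValueError(f"Unknown serializer {spec!r}")
        return _BUILTIN[spec]()
    if isinstance(spec, Serializer):
        return spec  # custom object that structurally matches the protocol
    raise TypeError(f"Expected a name or Serializer, got {type(spec).__name__}")
```

Because the protocol is `runtime_checkable`, a custom object only needs the `extension` attribute and the two methods; it does not have to inherit from anything.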
5. Design Decision: Single file vs. separate distribution
| Option | Pros | Cons |
|---|---|---|
| ext/dagster.py in remote-store | Consistent with existing extensions; no extra PyPI package to maintain; single install | Adds dagster as an optional dep to the main package |
| Separate remote-store-dagster PyPI package | Completely separate release cycle; dagster dependency not in remote-store | Two packages, doubled maintenance; harder discovery |
Recommendation: ext/dagster.py in the main package — consistent with
ext/arrow.py, ext/otel.py, ext/pydantic.py. Dagster is declared as an optional
dep under the dagster extra. Users install pip install "remote-store[dagster]".
The backlog's third-party naming convention (remote-store-<name>) is for external
packages, not for first-party extensions — this is first-party.
6. Design Decision: DagsterStoreResource vs. RegistryConfig YAML (v2)
Note: This section applies to the v2 DagsterStoreResource scope.
The backlog mentions "wraps RegistryConfig". Two approaches:
Option A: Direct backend fields (recommended)
DagsterStoreResource has backend_type, backend_options, root_path fields.
Simple, transparent, no RegistryConfig indirection.
Option B: Profile-based via RegistryConfig
DagsterStoreResource has profile_name and points to a YAML/env-var config.
More powerful but complex; requires ext.yaml or ext.toml.
Recommendation: Option A for the first v2 release, Option B as a follow-up. Ship simple
first. A profile field can be added later without breaking Option A.
7. Design Decision: DagsterStoreResource backend_options schema (v2)
Note: This section applies to the v2 DagsterStoreResource scope.
backend_options is dict[str, Any] — same as BackendConfig.options in RegistryConfig.
Secret wrapping (e.g., account_key, password) should use the same logic as
RegistryConfig.from_dict — wrap sensitive keys in Secret.
Dagster's EnvVar("MY_SECRET") can be used for the string values, which Dagster
resolves at runtime. The DagsterStoreResource.get_store() method calls
RegistryConfig.from_dict({"backends": {...}, "stores": {...}}) to build the Store.
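
To illustrate the kind of secret wrapping described above, here is a small self-contained sketch. Everything in it is an assumption made for illustration: `Secret` is a stand-in for remote-store's wrapper (its `reveal()` method is invented here), and `SENSITIVE_KEYS` only lists the two example keys mentioned in this section. The real logic lives in RegistryConfig.from_dict.

```python
from typing import Any


class Secret:
    """Hypothetical stand-in for remote-store's Secret wrapper."""

    def __init__(self, value: str) -> None:
        self._value = value

    def reveal(self) -> str:
        return self._value

    def __repr__(self) -> str:
        # Never expose the wrapped value in logs or tracebacks.
        return "Secret('***')"


# Assumed set of sensitive option names, taken from the examples in the text.
SENSITIVE_KEYS = frozenset({"account_key", "password"})


def wrap_secrets(options: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of backend options with sensitive string values wrapped."""
    return {
        key: Secret(value) if key in SENSITIVE_KEYS and isinstance(value, str) else value
        for key, value in options.items()
    }


wrapped = wrap_secrets({"host": "sftp.example.com", "password": "hunter2"})
```

Delegating to the existing wrapping code, rather than re-implementing something like this in the resource, keeps secret handling identical across entry points.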
8. Testing Strategy
Unit tests (no Dagster orchestrator)

    from dagster import AssetKey, build_input_context, build_output_context
    from remote_store import Store
    from remote_store.backends import MemoryBackend
    from remote_store.ext.dagster import remote_store_io_manager


    def test_roundtrip_pickle():
        store = Store(MemoryBackend())
        mgr = remote_store_io_manager(store, serializer="pickle")
        obj = {"key": "value"}

        out_ctx = build_output_context(asset_key=AssetKey(["my", "asset"]))
        mgr.handle_output(out_ctx, obj)

        in_ctx = build_input_context(
            asset_key=AssetKey(["my", "asset"]),
            upstream_output=out_ctx,
        )
        assert mgr.load_input(in_ctx) == obj

build_output_context / build_input_context are sufficient — no dagster-daemon or
dagster-webserver needed.
What must be tested
| Test | Spec ID to assign | Scope |
|---|---|---|
| Pickle roundtrip (MemoryBackend) | DAG-001 | v1 |
| JSON roundtrip | DAG-002 | v1 |
| Parquet roundtrip (pandas DataFrame) | DAG-003 | v1 |
| Partitioned asset: path includes partition key | DAG-004 | v1 |
| Multi-segment asset key maps to nested path | DAG-005 | v1 |
| DagsterStoreResource.get_store() builds correct Store | DAG-006 | v2 |
| Missing file raises NotFound on load | DAG-007 | v1 |
| handle_output adds metadata to context | DAG-008 | v1 |
| Custom serializer protocol respected | DAG-009 | v1 |
| Missing PyArrow for parquet gives helpful error | DAG-010 | v1 |
9. Interaction with Existing Extensions
- ext.arrow: ParquetSerializer can reuse pyarrow directly; there is no need to go through StoreFileSystemHandler. The serializer just produces bytes and store.write() handles the rest.
- ext.observe: Users who want observability on their Dagster Store operations can wrap the Store with observe() before passing it to remote_store_io_manager. The IO manager only sees the public Store API, so any wrapping is transparent.
- ext.cache: Same pattern: wrap with cached_store() before passing. No special integration needed.
- ext.partition: ext.partition provides partition_path() for date-based paths. This is complementary; users can override path logic via a custom _get_path hook if needed, but the default asset-key-based path is sufficient for most cases.
10. Open Questions (Resolved)
| Question | Resolution |
|---|---|
| Single package vs. separate distribution? | ext/dagster.py in remote-store; consistent with existing extensions |
| Serialization default? | "pickle" (universal); Parquet behind optional PyArrow guard |
| Dagster version floor? | dagster>=1.9; aligns with the Python 3.8 drop and ConfigurableResource stability |
| Testing without orchestrator? | build_output_context / build_input_context are sufficient |
| Use UPathIOManager? | No; it introduces the fsspec/universal_pathlib dep and violates ADR-0003 |
| Path for multi-segment asset keys? | "/".join(asset_key.path) + ext when unpartitioned; "/".join(asset_key.path) + "/" + partition_key + ext when partitioned |
| Secret handling in backend_options? | Delegate to RegistryConfig.from_dict; same Secret wrapping as everywhere else |
11. Remaining Open Questions (for owner to decide)
- DagsterStoreResource.backend_options typing (v2): Should sensitive keys in backend_options accept Dagster's EnvVar("...") type directly, or require users to use Dagster's EnvVar string resolution before passing? The simplest approach is plain strings (users set env vars themselves). Dagster EnvVar integration would be a follow-up. Deferred to v2 scope.
- Multi-partition loading: When a downstream asset has multiple upstream partitions (e.g., time-window aggregation), load_input is called once with asset_partition_keys containing N keys. The IO manager should return a dict[str, Any] mapping partition key → deserialized object. This is the Dagster convention. Confirm this behavior is in scope for v1 or explicitly deferred.
- extension override: Should users be able to override the file extension independently of the serializer (e.g., serializer="pickle" but extension=".dat")? Probably not; keep it simple and let the serializer determine the extension.
- handle_output for None: When an asset returns None (e.g., a sink asset that writes to a DB), should handle_output skip writing (no-op) or write the pickled None? The convention in Dagster's own IO managers is to write it, so downstream can distinguish "never materialized" from "materialized as None". Confirm.
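
If multi-partition loading is accepted into scope, the dict-returning behaviour from the second question could look roughly like this. The sketch makes several assumptions: `blobs` is a plain dict standing in for the Store, pickle is the serializer, and `load_partitions` is a hypothetical helper that receives the keys a real `load_input` would read from `context.asset_partition_keys`.

```python
import pickle
from typing import Any, Sequence


def load_partitions(
    blobs: dict[str, bytes],
    asset_path: Sequence[str],
    partition_keys: Sequence[str],
    ext: str = ".pkl",
) -> dict[str, Any]:
    """Return {partition_key: object} for every requested upstream partition."""
    prefix = "/".join(asset_path)
    return {key: pickle.loads(blobs[f"{prefix}/{key}{ext}"]) for key in partition_keys}


blobs = {
    "sales/daily/2026-01-01.pkl": pickle.dumps(10),
    # A pickled None is a real object on disk, distinct from a missing file.
    "sales/daily/2026-01-02.pkl": pickle.dumps(None),
}
loaded = load_partitions(blobs, ["sales", "daily"], ["2026-01-01", "2026-01-02"])
```

The second entry also shows why writing a pickled None (the last question) keeps "materialized as None" distinguishable from "never materialized": the key exists and deserializes to None, whereas a missing partition would raise a lookup error.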
12. Proposed Spec IDs
sdd/specs/030-ext-dagster.md: DAG-NNN series, DAG-001 through DAG-010+ (see §8).
13. Maintenance & Risk
- Dagster API churn: Dagster actively renames classes and changes metadata APIs (e.g., ConfigurablePickledObjectS3IOManager renamed; definition_metadata replaced metadata). Each Dagster major bump may require adapter updates.
- v1 scope minimises exposure: remote_store_io_manager(store) wraps only the stable IOManager base class (handle_output / load_input). No dependency on ConfigurableResource or Dagster config plumbing in v1.
- Version floor at dagster>=1.9 (not 1.7): aligns with Python 3.10+ floor and avoids supporting the transitional 1.7–1.8 API surface.
- Alternative for SFTP-only users: A 40-line custom ConfigurableIOManager with paramiko directly is viable. The extension's value is avoiding that boilerplate and reusing existing Store config/wrapping.
14. Implementation Checklist (SDD pipeline)
v1: remote_store_io_manager(store)
- [ ] RFC: sdd/rfcs/rfc-0005-dagster-extension.md
- [ ] Spec: sdd/specs/030-ext-dagster.md (DAG-001 … DAG-010+)
- [ ] Implementation: src/remote_store/ext/dagster.py (remote_store_io_manager, serializers, _RemoteStoreIOManagerImpl)
- [ ] Tests: tests/test_dagster.py with @pytest.mark.spec("DAG-NNN")
- [ ] Guide: guides/dagster.md (setup, usage patterns, serializer guide)
- [ ] Docs wiring: docs-src/api/ext-dagster.md, docs-src/_nav.yml
- [ ] pyproject.toml: add dagster extra (dagster>=1.9)
- [ ] CHANGELOG, BACKLOG updates (same commit)
v2 (deferred): DagsterStoreResource + RemoteStoreIOManager
- [ ] DagsterStoreResource: ConfigurableResource with backend_type, backend_options, root_path
- [ ] DagsterStoreResource.teardown_after_execution() → store.close()
- [ ] RemoteStoreIOManager: ConfigurableIOManagerFactory wrapping the resource
- [ ] remote_store/__init__.py: conditional re-export of v2 classes
- [ ] Additional tests: DAG-006 (DagsterStoreResource.get_store()), teardown test
- [ ] Guide update: Dagster-config-driven usage section
References
- Dagster IO Managers API
- Defining a Custom IO Manager
- dagster-aws S3 IO Manager source
- UPathIOManager PR
- Dagster releases and compatibility
- ADR-0003: fsspec is an implementation detail (sdd/adrs/0003-fsspec-is-implementation-detail.md)
- ADR-0008: Extension architecture (sdd/adrs/0008-extension-architecture.md)