Skip to content

Commit a7be067

Browse files
committed
feat: add options support to Store for format-specific Spark DataFrameWriter options
1 parent 88f7474 commit a7be067

File tree

4 files changed

+189
-2
lines changed

4 files changed

+189
-2
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,27 @@ The `Store` class supports the following options:
9999
Use `Store.for_download()` as a convenient shorthand for storing results
100100
as a single Parquet file with a presigned URL.
101101

102+
#### Store options
103+
104+
You can pass format-specific Spark write options through the `options`
105+
parameter. These correspond to the options available in Spark's
106+
`DataFrameWriter` and are applied after the server's default options,
107+
so any option you set overrides the server's default for that same option.
108+
109+
```python
110+
# CSV without a header row, using a custom delimiter
111+
store = Store.for_download(
112+
format=StorageFormat.CSV,
113+
options={"header": "false", "delimiter": "|"},
114+
)
115+
116+
# GeoJSON preserving null fields
117+
store = Store.for_download(
118+
format=StorageFormat.GEOJSON,
119+
options={"ignoreNullFields": "false"},
120+
)
121+
```
122+
102123
### Execution progress
103124

104125
You can monitor the progress of running queries by registering a

tests/test_store_options.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""Tests for Store options support."""
2+
3+
import json
4+
5+
from wherobots.db.models import Store
6+
from wherobots.db.types import StorageFormat
7+
8+
9+
class TestStoreOptions:
    """Behaviour of the ``options`` attribute on directly constructed Stores."""

    def test_default_options_is_none(self):
        """Leaving ``options`` out yields ``None``."""
        assert Store(format=StorageFormat.PARQUET).options is None

    def test_options_set(self):
        """A non-empty mapping passed at construction is readable afterwards."""
        expected = {"header": "false", "delimiter": "|"}
        configured = Store(
            format=StorageFormat.CSV,
            options={"header": "false", "delimiter": "|"},
        )
        assert configured.options == expected

    def test_empty_options_normalized_to_none(self):
        """An empty mapping is collapsed to ``None``."""
        configured = Store(format=StorageFormat.PARQUET, options={})
        assert configured.options is None

    def test_none_options_stays_none(self):
        """An explicit ``None`` is preserved as ``None``."""
        configured = Store(format=StorageFormat.PARQUET, options=None)
        assert configured.options is None

    def test_options_defensively_copied(self):
        """Changes to the caller's dict after construction are not visible."""
        caller_dict = {"header": "false"}
        configured = Store(format=StorageFormat.CSV, options=caller_dict)
        # Grow the caller's dict only after the Store has been built.
        caller_dict["delimiter"] = "|"
        assert "delimiter" not in configured.options

    def test_options_dict_is_mutable(self):
        """The stored dict itself accepts new keys after construction."""
        configured = Store(format=StorageFormat.CSV, options={"header": "false"})
        configured.options["delimiter"] = "|"
        expected = {"header": "false", "delimiter": "|"}
        assert configured.options == expected
43+
44+
45+
class TestStoreForDownloadWithOptions:
    """Behaviour of the ``options`` argument of ``Store.for_download()``."""

    def test_for_download_default_no_options(self):
        """Calling the factory with no arguments leaves ``options`` as ``None``."""
        assert Store.for_download().options is None

    def test_for_download_with_options(self):
        """Options are kept and the download flags are both switched on."""
        download = Store.for_download(options={"header": "false"})
        assert download.options == {"header": "false"}
        assert download.single is True
        assert download.generate_presigned_url is True

    def test_for_download_with_format_and_options(self):
        """An explicit format and options are both carried onto the result."""
        requested = {"header": "false", "delimiter": "|"}
        download = Store.for_download(
            format=StorageFormat.CSV,
            options={"header": "false", "delimiter": "|"},
        )
        assert download.format == StorageFormat.CSV
        assert download.options == requested

    def test_for_download_empty_options_normalized(self):
        """An empty mapping passed to the factory ends up as ``None``."""
        download = Store.for_download(options={})
        assert download.options is None
69+
70+
71+
class TestStoreSerializationWithOptions:
    """Checks on the dict form of a Store as built by the local helper below."""

    def _serialize_store(self, store: Store) -> dict:
        """Build the request-side dict for ``store``.

        The format is emitted as its enum value, the two booleans as
        lowercase strings, and ``options`` only when it is truthy.
        """
        serialized = {
            "format": store.format.value,
            "single": str(store.single).lower(),
            "generate_presigned_url": str(store.generate_presigned_url).lower(),
        }
        if not store.options:
            return serialized
        serialized["options"] = store.options
        return serialized

    def test_serialize_without_options(self):
        """With no options, the dict has exactly the three base keys."""
        serialized = self._serialize_store(
            Store.for_download(format=StorageFormat.GEOJSON)
        )
        expected = {
            "format": "geojson",
            "single": "true",
            "generate_presigned_url": "true",
        }
        assert serialized == expected
        assert "options" not in serialized

    def test_serialize_with_options(self):
        """Supplied options appear under the ``options`` key unchanged."""
        download = Store.for_download(
            format=StorageFormat.CSV,
            options={"header": "false", "delimiter": "|"},
        )
        expected = {
            "format": "csv",
            "single": "true",
            "generate_presigned_url": "true",
            "options": {"header": "false", "delimiter": "|"},
        }
        assert self._serialize_store(download) == expected

    def test_serialize_empty_options_omitted(self):
        """An empty options mapping produces no ``options`` key."""
        serialized = self._serialize_store(
            Store(format=StorageFormat.PARQUET, options={})
        )
        assert "options" not in serialized

    def test_json_roundtrip_with_options(self):
        """Options survive a JSON encode/decode cycle."""
        download = Store.for_download(
            format=StorageFormat.GEOJSON,
            options={"ignoreNullFields": "false"},
        )
        encoded = json.dumps(self._serialize_store(download))
        decoded = json.loads(encoded)
        assert decoded["options"] == {"ignoreNullFields": "false"}

    def test_full_request_shape(self):
        """Verify the whole execute_sql request dict when options are present."""
        download = Store.for_download(
            format=StorageFormat.CSV,
            options={"header": "false"},
        )
        request = {
            "kind": "execute_sql",
            "execution_id": "test-id",
            "statement": "SELECT 1",
            "store": self._serialize_store(download),
        }

        expected_store = {
            "format": "csv",
            "single": "true",
            "generate_presigned_url": "true",
            "options": {"header": "false"},
        }
        assert request == {
            "kind": "execute_sql",
            "execution_id": "test-id",
            "statement": "SELECT 1",
            "store": expected_store,
        }

wherobots/db/connection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,14 @@ def __execute_sql(
279279
request["enable_progress_events"] = True
280280

281281
if store:
282-
request["store"] = {
282+
store_dict: dict[str, Any] = {
283283
"format": store.format.value,
284284
"single": str(store.single).lower(),
285285
"generate_presigned_url": str(store.generate_presigned_url).lower(),
286286
}
287+
if store.options:
288+
store_dict["options"] = store.options
289+
request["store"] = store_dict
287290

288291
self.__queries[execution_id] = Query(
289292
sql=sql,

wherobots/db/models.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,40 @@ class Store:
3131
single: If True, store as a single file. If False, store as multiple files.
3232
generate_presigned_url: If True, generate a presigned URL for the result.
3333
Requires single=True.
34+
options: Optional dict of format-specific Spark DataFrameWriter options
35+
(e.g. ``{"header": "false", "delimiter": "|"}`` for CSV). These are
36+
applied after the server's default options, so they can override them.
37+
An empty dict is normalized to None.
3438
"""
3539

3640
format: StorageFormat
3741
single: bool = False
3842
generate_presigned_url: bool = False
43+
options: dict[str, str] | None = None
3944

4045
def __post_init__(self) -> None:
    """Validate the flag combination and normalize ``options``.

    Raises:
        ValueError: If ``generate_presigned_url`` is set while ``single``
            is not.
    """
    presign_without_single = self.generate_presigned_url and not self.single
    if presign_without_single:
        raise ValueError("Presigned URL can only be generated when single=True")
    # A truthy mapping is shallow-copied so later changes to the caller's
    # dict do not show up here; anything falsy (None or empty) becomes None.
    self.options = dict(self.options) if self.options else None
4353

4454
@classmethod
45-
def for_download(cls, format: StorageFormat | None = None) -> "Store":
55+
def for_download(
56+
cls,
57+
format: StorageFormat | None = None,
58+
options: dict[str, str] | None = None,
59+
) -> "Store":
4660
"""Create a configuration for downloading results via a presigned URL.
4761
4862
This is a convenience method that creates a configuration with
4963
single file mode and presigned URL generation enabled.
5064
5165
Args:
5266
format: The storage format.
67+
options: Optional format-specific Spark DataFrameWriter options.
5368
5469
Returns:
5570
A Store configured for single-file download with presigned URL.
@@ -58,6 +73,7 @@ def for_download(cls, format: StorageFormat | None = None) -> "Store":
5873
format=format or DEFAULT_STORAGE_FORMAT,
5974
single=True,
6075
generate_presigned_url=True,
76+
options=options,
6177
)
6278

6379

0 commit comments

Comments
 (0)