Skip to content

Commit ddbf94e

Browse files
authored
Merge pull request #59 from james-willis/fix/empty-store-results-hang
fix: prevent hang when store-configured query returns zero rows
2 parents cf7a177 + a749080 commit ddbf94e

File tree

4 files changed

+243
-3
lines changed

4 files changed

+243
-3
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "wherobots-python-dbapi"
3-
version = "0.25.0"
3+
version = "0.25.1"
44
description = "Python DB-API driver for Wherobots DB"
55
authors = [{ name = "Maxime Petazzoni", email = "max@wherobots.com" }]
66
requires-python = ">=3.10, <4"

tests/test_empty_store_results.py

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
"""Tests for the empty store results fix.
2+
3+
These tests verify that:
4+
1. A store-configured query with empty results (result_uri=null) completes
5+
promptly instead of hanging.
6+
2. An execution_result event with results=null unblocks the cursor instead
7+
of hanging forever.
8+
"""
9+
10+
import json
11+
import queue
12+
from unittest.mock import MagicMock, patch
13+
14+
from wherobots.db.connection import Connection, Query
15+
from wherobots.db.models import ExecutionResult, Store
16+
from wherobots.db.types import ExecutionState, StorageFormat
17+
18+
19+
class TestEmptyStoreResults:
20+
"""Tests for the primary fix: store configured, result_uri is null."""
21+
22+
def _make_connection_and_cursor(self):
23+
"""Create a Connection with a mocked WebSocket and return (connection, cursor)."""
24+
mock_ws = MagicMock()
25+
# Prevent the background thread from running the main loop
26+
mock_ws.protocol.state = 4 # CLOSED state, so __main_loop exits immediately
27+
conn = Connection(mock_ws)
28+
cursor = conn.cursor()
29+
return conn, cursor
30+
31+
def test_store_configured_empty_results_completes(self):
32+
"""When a store-configured query succeeds with result_uri=null,
33+
the cursor should receive an empty ExecutionResult (not hang)."""
34+
result_queue = queue.Queue()
35+
36+
def handler(result):
37+
result_queue.put(result)
38+
39+
store = Store.for_download(format=StorageFormat.GEOJSON)
40+
query = Query(
41+
sql="SELECT * FROM t WHERE 1=0",
42+
execution_id="exec-1",
43+
state=ExecutionState.RUNNING,
44+
handler=handler,
45+
store=store,
46+
)
47+
48+
conn, _ = self._make_connection_and_cursor()
49+
conn._Connection__queries["exec-1"] = query
50+
51+
# Simulate receiving a state_updated message with result_uri=null
52+
message = {
53+
"kind": "state_updated",
54+
"execution_id": "exec-1",
55+
"state": "succeeded",
56+
"result_uri": None,
57+
"size": None,
58+
}
59+
conn._Connection__ws.recv.return_value = json.dumps(message)
60+
conn._Connection__listen()
61+
62+
# The handler should have been called
63+
result = result_queue.get(timeout=1)
64+
assert isinstance(result, ExecutionResult)
65+
assert result.results is None
66+
assert result.error is None
67+
assert result.store_result is None
68+
assert query.state == ExecutionState.COMPLETED
69+
70+
def test_store_configured_with_result_uri_produces_store_result(self):
71+
"""When a store-configured query succeeds with result_uri set,
72+
the cursor should receive an ExecutionResult with store_result."""
73+
result_queue = queue.Queue()
74+
75+
def handler(result):
76+
result_queue.put(result)
77+
78+
store = Store.for_download(format=StorageFormat.GEOJSON)
79+
query = Query(
80+
sql="SELECT * FROM t",
81+
execution_id="exec-2",
82+
state=ExecutionState.RUNNING,
83+
handler=handler,
84+
store=store,
85+
)
86+
87+
conn, _ = self._make_connection_and_cursor()
88+
conn._Connection__queries["exec-2"] = query
89+
90+
message = {
91+
"kind": "state_updated",
92+
"execution_id": "exec-2",
93+
"state": "succeeded",
94+
"result_uri": "https://presigned-url.example.com/results.geojson",
95+
"size": 12345,
96+
}
97+
conn._Connection__ws.recv.return_value = json.dumps(message)
98+
conn._Connection__listen()
99+
100+
result = result_queue.get(timeout=1)
101+
assert isinstance(result, ExecutionResult)
102+
assert result.results is None
103+
assert result.error is None
104+
assert result.store_result is not None
105+
assert (
106+
result.store_result.result_uri
107+
== "https://presigned-url.example.com/results.geojson"
108+
)
109+
assert result.store_result.size == 12345
110+
assert query.state == ExecutionState.COMPLETED
111+
112+
def test_no_store_configured_calls_request_results(self):
113+
"""When no store is configured and result_uri is null,
114+
__request_results should be called (existing behavior)."""
115+
result_queue = queue.Queue()
116+
117+
def handler(result):
118+
result_queue.put(result)
119+
120+
query = Query(
121+
sql="SELECT 1",
122+
execution_id="exec-3",
123+
state=ExecutionState.RUNNING,
124+
handler=handler,
125+
store=None, # No store configured
126+
)
127+
128+
conn, _ = self._make_connection_and_cursor()
129+
conn._Connection__queries["exec-3"] = query
130+
131+
message = {
132+
"kind": "state_updated",
133+
"execution_id": "exec-3",
134+
"state": "succeeded",
135+
"result_uri": None,
136+
"size": None,
137+
}
138+
conn._Connection__ws.recv.return_value = json.dumps(message)
139+
140+
with patch.object(conn, "_Connection__request_results") as mock_request:
141+
conn._Connection__listen()
142+
mock_request.assert_called_once_with("exec-3")
143+
144+
# Handler should NOT have been called (waiting for execution_result)
145+
assert result_queue.empty()
146+
147+
148+
class TestDefensiveNullResults:
149+
"""Tests for the defensive fix: execution_result with results=null."""
150+
151+
def _make_connection_and_cursor(self):
152+
mock_ws = MagicMock()
153+
mock_ws.protocol.state = 4
154+
conn = Connection(mock_ws)
155+
cursor = conn.cursor()
156+
return conn, cursor
157+
158+
def test_null_results_in_execution_result_unblocks_cursor(self):
159+
"""When execution_result arrives with results=null,
160+
the cursor should be unblocked with an empty ExecutionResult."""
161+
result_queue = queue.Queue()
162+
163+
def handler(result):
164+
result_queue.put(result)
165+
166+
query = Query(
167+
sql="SELECT 1",
168+
execution_id="exec-4",
169+
state=ExecutionState.RESULTS_REQUESTED,
170+
handler=handler,
171+
store=None,
172+
)
173+
174+
conn, _ = self._make_connection_and_cursor()
175+
conn._Connection__queries["exec-4"] = query
176+
177+
# Simulate execution_result with results=null
178+
# (this is what the server sends for store-only executions
179+
# when retrieve_results is erroneously called)
180+
message = {
181+
"kind": "execution_result",
182+
"execution_id": "exec-4",
183+
"state": "succeeded",
184+
"results": None,
185+
}
186+
conn._Connection__ws.recv.return_value = json.dumps(message)
187+
conn._Connection__listen()
188+
189+
result = result_queue.get(timeout=1)
190+
assert isinstance(result, ExecutionResult)
191+
assert result.results is None
192+
assert result.error is None
193+
assert result.store_result is None
194+
assert query.state == ExecutionState.COMPLETED
195+
196+
def test_empty_dict_results_in_execution_result_unblocks_cursor(self):
197+
"""When execution_result arrives with results={} (empty dict),
198+
the cursor should be unblocked with an empty ExecutionResult."""
199+
result_queue = queue.Queue()
200+
201+
def handler(result):
202+
result_queue.put(result)
203+
204+
query = Query(
205+
sql="SELECT 1",
206+
execution_id="exec-5",
207+
state=ExecutionState.RESULTS_REQUESTED,
208+
handler=handler,
209+
store=None,
210+
)
211+
212+
conn, _ = self._make_connection_and_cursor()
213+
conn._Connection__queries["exec-5"] = query
214+
215+
message = {
216+
"kind": "execution_result",
217+
"execution_id": "exec-5",
218+
"state": "succeeded",
219+
"results": {},
220+
}
221+
conn._Connection__ws.recv.return_value = json.dumps(message)
222+
conn._Connection__listen()
223+
224+
result = result_queue.get(timeout=1)
225+
assert isinstance(result, ExecutionResult)
226+
assert result.results is None
227+
assert result.error is None
228+
assert query.state == ExecutionState.COMPLETED

uv.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

wherobots/db/connection.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ def __listen(self) -> None:
192192
query.handler(ExecutionResult(store_result=store_result))
193193
return
194194

195+
if query.store is not None:
196+
# Store was configured but produced no results (empty result set)
197+
logging.info(
198+
"Query %s completed with store configured but no results to store.",
199+
execution_id,
200+
)
201+
query.state = ExecutionState.COMPLETED
202+
query.handler(ExecutionResult())
203+
return
204+
195205
# No store configured, request results normally
196206
self.__request_results(execution_id)
197207
return
@@ -200,6 +210,8 @@ def __listen(self) -> None:
200210
results = message.get("results")
201211
if not results or not isinstance(results, dict):
202212
logging.warning("Got no results back from %s.", execution_id)
213+
query.state = ExecutionState.COMPLETED
214+
query.handler(ExecutionResult())
203215
return
204216

205217
query.state = ExecutionState.COMPLETED

0 commit comments

Comments
 (0)