Skip to content

Commit 0d9eeba

Browse files
committed
use if condition as < 3.10 doesn't support
1 parent c5deebb commit 0d9eeba

1 file changed

Lines changed: 50 additions & 53 deletions

File tree

wherobots/db/connection.py

Lines changed: 50 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -114,60 +114,57 @@ def __listen(self):
114114
)
115115
return
116116

117-
match kind:
118-
case EventKind.STATE_UPDATED:
119-
try:
120-
query.state = ExecutionState[message["state"].upper()]
121-
logging.info("Query %s is now %s.", execution_id, query.state)
122-
except KeyError:
123-
logging.warning("Invalid state update message for %s", execution_id)
124-
return
125-
126-
# Incoming state transitions are handled here.
127-
match query.state:
128-
case ExecutionState.SUCCEEDED:
129-
self.__request_results(execution_id)
130-
case ExecutionState.FAILED:
131-
query.handler(OperationalError("Query execution failed"))
132-
133-
case EventKind.EXECUTION_RESULT:
134-
results = message.get("results")
135-
if not results or not isinstance(results, dict):
136-
logging.warning("Got no results back from %s.", execution_id)
137-
return
138-
139-
result_bytes = results.get("result_bytes")
140-
result_format = results.get("format")
141-
result_compression = results.get("compression")
142-
logging.info(
143-
"Received %d bytes of %s-compressed %s results from %s.",
144-
len(result_bytes),
145-
result_compression,
146-
result_format,
147-
execution_id,
148-
)
117+
if kind == EventKind.STATE_UPDATED:
118+
try:
119+
query.state = ExecutionState[message["state"].upper()]
120+
logging.info("Query %s is now %s.", execution_id, query.state)
121+
except KeyError:
122+
logging.warning("Invalid state update message for %s", execution_id)
123+
return
124+
125+
# Incoming state transitions are handled here.
126+
if query.state == ExecutionState.SUCCEEDED:
127+
self.__request_results(execution_id)
128+
elif query.state == ExecutionState.FAILED:
129+
query.handler(OperationalError("Query execution failed"))
130+
131+
elif kind == EventKind.EXECUTION_RESULT:
132+
results = message.get("results")
133+
if not results or not isinstance(results, dict):
134+
logging.warning("Got no results back from %s.", execution_id)
135+
return
136+
137+
result_bytes = results.get("result_bytes")
138+
result_format = results.get("format")
139+
result_compression = results.get("compression")
140+
logging.info(
141+
"Received %d bytes of %s-compressed %s results from %s.",
142+
len(result_bytes),
143+
result_compression,
144+
result_format,
145+
execution_id,
146+
)
149147

150-
query.state = ExecutionState.COMPLETED
151-
match result_format:
152-
case ResultsFormat.JSON:
153-
query.handler(json.loads(result_bytes.decode("utf-8")))
154-
case ResultsFormat.ARROW:
155-
buffer = pyarrow.py_buffer(result_bytes)
156-
stream = pyarrow.input_stream(buffer, result_compression)
157-
with pyarrow.ipc.open_stream(stream) as reader:
158-
query.handler(reader.read_pandas())
159-
case _:
160-
query.handler(
161-
OperationalError(
162-
f"Unsupported results format {result_format}"
163-
)
164-
)
165-
case EventKind.ERROR:
166-
query.state = ExecutionState.FAILED
167-
error = message.get("message")
168-
query.handler(OperationalError(error))
169-
case _:
170-
logging.warning("Received unknown %s event!", kind)
148+
query.state = ExecutionState.COMPLETED
149+
if result_format == ResultsFormat.JSON:
150+
query.handler(json.loads(result_bytes.decode("utf-8")))
151+
elif result_format == ResultsFormat.ARROW:
152+
buffer = pyarrow.py_buffer(result_bytes)
153+
stream = pyarrow.input_stream(buffer, result_compression)
154+
with pyarrow.ipc.open_stream(stream) as reader:
155+
query.handler(reader.read_pandas())
156+
else:
157+
query.handler(
158+
OperationalError(
159+
f"Unsupported results format {result_format}"
160+
)
161+
)
162+
elif kind == EventKind.ERROR:
163+
query.state = ExecutionState.FAILED
164+
error = message.get("message")
165+
query.handler(OperationalError(error))
166+
else:
167+
logging.warning("Received unknown %s event!", kind)
171168

172169
def __send(self, message: dict[str, Any]) -> None:
173170
self.__ws.send(json.dumps(message))

0 commit comments

Comments
 (0)