Skip to content

Commit 9cb7dc6

Browse files
committed
chore: small refactor to get ready for binary messages
1 parent 2d246d8 commit 9cb7dc6

1 file changed

Lines changed: 21 additions & 13 deletions

File tree

wherobots/db/connection.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ def __listen(self):
6868
The code in this method is purposefully defensive to avoid unexpected situations killing the thread.
6969
"""
7070
while True:
71-
message = json.loads(self.__ws.recv())
72-
logging.debug("Received message: %s", message)
71+
message = self.__recv()
7372

7473
execution_id = message.get("execution_id")
7574
if not execution_id:
@@ -103,30 +102,39 @@ def __listen(self):
103102

104103
case EventKind.EXECUTION_RESULT:
105104
results_format = message.get("results_format")
106-
107-
# TODO: Support other results formats.
108-
if results_format != "json":
109-
query.handler(
110-
OperationalError(
111-
f"Unsupported results format {results_format}"
112-
)
113-
)
114-
continue
115-
116105
logging.info(
117106
"Received %s results from %s.",
118107
results_format,
119108
execution_id,
120109
)
110+
121111
query.state = ExecutionState.COMPLETED
122-
query.handler([json.loads(message.get("results"))])
112+
match results_format:
113+
case "json":
114+
query.handler(json.loads(message.get("results")))
115+
case "arrow":
116+
pass
117+
case _:
118+
query.handler(
119+
OperationalError(
120+
f"Unsupported results format {results_format}"
121+
)
122+
)
123123
case _:
124124
logging.warning("Received unknown %s event!", kind)
125125

126126
def __send(self, message: dict[str, Any]) -> None:
127127
logging.debug("Sending %s", message)
128128
self.__ws.send(json.dumps(message))
129129

130+
def __recv(self) -> dict[str, Any]:
131+
frame = self.__ws.recv()
132+
if isinstance(frame, bytes):
133+
frame = frame.decode("utf-8")
134+
message = json.loads(frame)
135+
logging.debug("Received message: %s", message)
136+
return message
137+
130138
def __execute_sql(
131139
self, sql: str, handler: Callable[[list[Any] | DatabaseError], None]
132140
) -> str:

0 commit comments

Comments
 (0)