@@ -114,60 +114,57 @@ def __listen(self):
114114 )
115115 return
116116
117- match kind :
118- case EventKind .STATE_UPDATED :
119- try :
120- query .state = ExecutionState [message ["state" ].upper ()]
121- logging .info ("Query %s is now %s." , execution_id , query .state )
122- except KeyError :
123- logging .warning ("Invalid state update message for %s" , execution_id )
124- return
125-
126- # Incoming state transitions are handled here.
127- match query .state :
128- case ExecutionState .SUCCEEDED :
129- self .__request_results (execution_id )
130- case ExecutionState .FAILED :
131- query .handler (OperationalError ("Query execution failed" ))
132-
133- case EventKind .EXECUTION_RESULT :
134- results = message .get ("results" )
135- if not results or not isinstance (results , dict ):
136- logging .warning ("Got no results back from %s." , execution_id )
137- return
138-
139- result_bytes = results .get ("result_bytes" )
140- result_format = results .get ("format" )
141- result_compression = results .get ("compression" )
142- logging .info (
143- "Received %d bytes of %s-compressed %s results from %s." ,
144- len (result_bytes ),
145- result_compression ,
146- result_format ,
147- execution_id ,
148- )
117+ if kind == EventKind .STATE_UPDATED :
118+ try :
119+ query .state = ExecutionState [message ["state" ].upper ()]
120+ logging .info ("Query %s is now %s." , execution_id , query .state )
121+ except KeyError :
122+ logging .warning ("Invalid state update message for %s" , execution_id )
123+ return
124+
125+ # Incoming state transitions are handled here.
126+ if query .state == ExecutionState .SUCCEEDED :
127+ self .__request_results (execution_id )
128+ elif query .state == ExecutionState .FAILED :
129+ query .handler (OperationalError ("Query execution failed" ))
130+
131+ elif kind == EventKind .EXECUTION_RESULT :
132+ results = message .get ("results" )
133+ if not results or not isinstance (results , dict ):
134+ logging .warning ("Got no results back from %s." , execution_id )
135+ return
136+
137+ result_bytes = results .get ("result_bytes" )
138+ result_format = results .get ("format" )
139+ result_compression = results .get ("compression" )
140+ logging .info (
141+ "Received %d bytes of %s-compressed %s results from %s." ,
142+ len (result_bytes ),
143+ result_compression ,
144+ result_format ,
145+ execution_id ,
146+ )
149147
150- query .state = ExecutionState .COMPLETED
151- match result_format :
152- case ResultsFormat .JSON :
153- query .handler (json .loads (result_bytes .decode ("utf-8" )))
154- case ResultsFormat .ARROW :
155- buffer = pyarrow .py_buffer (result_bytes )
156- stream = pyarrow .input_stream (buffer , result_compression )
157- with pyarrow .ipc .open_stream (stream ) as reader :
158- query .handler (reader .read_pandas ())
159- case _:
160- query .handler (
161- OperationalError (
162- f"Unsupported results format { result_format } "
163- )
164- )
165- case EventKind .ERROR :
166- query .state = ExecutionState .FAILED
167- error = message .get ("message" )
168- query .handler (OperationalError (error ))
169- case _:
170- logging .warning ("Received unknown %s event!" , kind )
148+ query .state = ExecutionState .COMPLETED
149+ if result_format == ResultsFormat .JSON :
150+ query .handler (json .loads (result_bytes .decode ("utf-8" )))
151+ elif result_format == ResultsFormat .ARROW :
152+ buffer = pyarrow .py_buffer (result_bytes )
153+ stream = pyarrow .input_stream (buffer , result_compression )
154+ with pyarrow .ipc .open_stream (stream ) as reader :
155+ query .handler (reader .read_pandas ())
156+ else :
157+ query .handler (
158+ OperationalError (
159+ f"Unsupported results format { result_format } "
160+ )
161+ )
162+ elif kind == EventKind .ERROR :
163+ query .state = ExecutionState .FAILED
164+ error = message .get ("message" )
165+ query .handler (OperationalError (error ))
166+ else :
167+ logging .warning ("Received unknown %s event!" , kind )
171168
172169 def __send (self , message : dict [str , Any ]) -> None :
173170 self .__ws .send (json .dumps (message ))
0 commit comments