1+ from io import BufferedReader , BufferedWriter
12import ssl
2- from typing import Optional , Union
3+ from typing import Callable , Optional , Union
34import lz4 .block
45from sqlitecloud .resultset import SQCloudResult
56from sqlitecloud .types import (
67 SQCLOUD_CMD ,
8+ SQCLOUD_DEFAULT ,
79 SQCLOUD_INTERNAL_ERRCODE ,
810 SQCLOUD_ROWSET ,
911 SQCloudConfig ,
1719
1820
1921class Driver :
22+ SQCLOUD_DEFAULT_UPLOAD_SIZE = 512 * 1024
23+
2024 def __init__ (self ) -> None :
2125 # Used while parsing chunked rowset
2226 self ._rowset : SQCloudResult = None
@@ -77,7 +81,7 @@ def disconnect(self, conn: SQCloudConnect):
7781 def execute (self , command : str , connection : SQCloudConnect ) -> SQCloudResult :
7882 return self ._internal_run_command (connection , command )
7983
80- def sendblob (self , blob : bytes , conn : SQCloudConnect ) -> SQCloudResult :
84+ def send_blob (self , blob : bytes , conn : SQCloudConnect ) -> SQCloudResult :
8185 try :
8286 conn .isblob = True
8387 return self ._internal_run_command (conn , blob )
@@ -90,6 +94,113 @@ def _internal_reconnect(self, buffer: bytes) -> bool:
9094 def _internal_setup_pubsub (self , buffer : bytes ) -> bool :
9195 return True
9296
97+ def upload_database (
98+ self ,
99+ connection : SQCloudConnect ,
100+ dbname : str ,
101+ key : Optional [str ],
102+ is_file_transfer : bool ,
103+ snapshot_id : int ,
104+ is_internal_db : bool ,
105+ fd : BufferedReader ,
106+ dbsize : int ,
107+ xCallback : Callable [[BufferedReader , int , int , int ], bytes ],
108+ ) -> None :
109+ keyarg = "KEY " if key else ""
110+ keyvalue = key if key else ""
111+
112+ # prepare command to execute
113+ command = ""
114+ if is_file_transfer :
115+ internalarg = "INTERNAL" if is_internal_db else ""
116+ command = f"TRANSFER DATABASE '{ dbname } ' { keyarg } { keyvalue } SNAPSHOT { snapshot_id } { internalarg } "
117+ else :
118+ command = f"UPLOAD DATABASE '{ dbname } ' { keyarg } { keyvalue } "
119+
120+ # execute command on server side
121+ result = self ._internal_run_command (connection , command )
122+ if not result .data [0 ]:
123+ raise SQCloudException (
124+ "An error occurred while initializing the upload of the database."
125+ )
126+
127+ buffer : bytes = b""
128+ blen = 0
129+ nprogress = 0
130+ try :
131+ while True :
132+ # execute callback to read buffer
133+ blen = SQCLOUD_DEFAULT .UPLOAD_SIZE .value
134+ try :
135+ buffer = xCallback (fd , blen , dbsize , nprogress )
136+ blen = len (buffer )
137+ except Exception as e :
138+ raise SQCloudException (
139+ "An error occurred while reading the file."
140+ ) from e
141+
142+ try :
143+ # send also the final confirmation blob of zero bytes
144+ self .send_blob (buffer , connection )
145+ except Exception as e :
146+ raise SQCloudException (
147+ "An error occurred while uploading the file."
148+ ) from e
149+
150+ # update progress
151+ nprogress += blen
152+
153+ if blen == 0 :
154+ # Upload completed
155+ break
156+ except Exception as e :
157+ self ._internal_run_command (connection , "UPLOAD ABORT" )
158+ raise e
159+
160+ def download_database (
161+ self ,
162+ connection : SQCloudConnect ,
163+ dbname : str ,
164+ fd : BufferedWriter ,
165+ xCallback : Callable [[BufferedWriter , int , int , int ], bytes ],
166+ if_exists : bool ,
167+ ) -> None :
168+ exists_cmd = " IF EXISTS" if if_exists else ""
169+ result = self ._internal_run_command (
170+ connection , f"DOWNLOAD DATABASE { dbname } { exists_cmd } ;"
171+ )
172+
173+ if result .nrows == 0 :
174+ raise SQCloudException (
175+ "An error occurred while initializing the download of the database."
176+ )
177+
178+ # result is an ARRAY (database size, number of pages, raft_index)
179+ download_info = result .data [0 ]
180+ db_size = int (download_info [0 ])
181+
182+ # loop to download
183+ progress_size = 0
184+
185+ try :
186+ while progress_size < db_size :
187+ result = self ._internal_run_command (connection , "DOWNLOAD STEP" )
188+
189+ # res is BLOB, decode it
190+ data = result .data [0 ]
191+ data_len = len (data )
192+
193+ # execute callback (with progress_size updated)
194+ progress_size += data_len
195+ xCallback (fd , data , data_len , db_size , progress_size )
196+
197+ # check exit condition
198+ if data_len == 0 :
199+ break
200+ except Exception as e :
201+ self ._internal_run_command (connection , "DOWNLOAD ABORT" )
202+ raise e
203+
93204 def _internal_config_apply (
94205 self , connection : SQCloudConnect , config : SQCloudConfig
95206 ) -> None :
@@ -136,7 +247,7 @@ def _internal_config_apply(
136247
137248 def _internal_run_command (
138249 self , connection : SQCloudConnect , command : Union [str , bytes ]
139- ) -> None :
250+ ) -> SQCloudResult :
140251 self ._internal_socket_write (connection , command )
141252 return self ._internal_socket_read (connection )
142253
0 commit comments