|
| 1 | +import * as fs from 'fs'; |
| 2 | +import * as path from 'path'; |
1 | 3 | import { stringify, NIL, parse } from 'uuid'; |
| 4 | +import fetch, { HeadersInit } from 'node-fetch'; |
2 | 5 | import { |
3 | 6 | TSessionHandle, |
4 | 7 | TStatus, |
@@ -30,6 +33,7 @@ import CloseableCollection from './utils/CloseableCollection'; |
30 | 33 | import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger'; |
31 | 34 | import HiveDriverError from './errors/HiveDriverError'; |
32 | 35 | import globalConfig from './globalConfig'; |
| 36 | +import StagingError from './errors/StagingError'; |
33 | 37 | import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter'; |
34 | 38 |
|
35 | 39 | const defaultMaxRows = 100000; |
@@ -163,7 +167,110 @@ export default class DBSQLSession implements IDBSQLSession { |
163 | 167 | parameters: getQueryParameters(options.namedParameters), |
164 | 168 | }); |
165 | 169 | const response = await this.handleResponse(operationPromise); |
166 | | - return this.createOperation(response); |
| 170 | + const operation = this.createOperation(response); |
| 171 | + |
| 172 | + // If `stagingAllowedLocalPath` is provided - assume that operation possibly may be a staging operation. |
| 173 | + // To know for sure, fetch metadata and check a `isStagingOperation` flag. If it happens that it wasn't |
| 174 | + // a staging operation - not a big deal, we just fetched metadata earlier, but operation is still usable |
| 175 | + // and user can get data from it. |
| 176 | + // If `stagingAllowedLocalPath` is not provided - don't do anything to the operation. In a case of regular |
| 177 | + // operation, everything will work as usual. In a case of staging operation, it will be processed like any |
| 178 | + // other query - it will be possible to get data from it as usual, or use other operation methods. |
| 179 | + if (options.stagingAllowedLocalPath !== undefined) { |
| 180 | + const metadata = await operation.getMetadata(); |
| 181 | + if (metadata.isStagingOperation) { |
| 182 | + const allowedLocalPath = Array.isArray(options.stagingAllowedLocalPath) |
| 183 | + ? options.stagingAllowedLocalPath |
| 184 | + : [options.stagingAllowedLocalPath]; |
| 185 | + return this.handleStagingOperation(operation, allowedLocalPath); |
| 186 | + } |
| 187 | + } |
| 188 | + return operation; |
| 189 | + } |
| 190 | + |
| 191 | + private async handleStagingOperation(operation: IOperation, allowedLocalPath: Array<string>): Promise<IOperation> { |
| 192 | + type StagingResponse = { |
| 193 | + presignedUrl: string; |
| 194 | + localFile?: string; |
| 195 | + headers: HeadersInit; |
| 196 | + operation: string; |
| 197 | + }; |
| 198 | + const rows = await operation.fetchAll(); |
| 199 | + if (rows.length !== 1) { |
| 200 | + throw new StagingError('Staging operation: expected only one row in result'); |
| 201 | + } |
| 202 | + const row = rows[0] as StagingResponse; |
| 203 | + |
| 204 | + // For REMOVE operation local file is not available, so no need to validate it |
| 205 | + if (row.localFile !== undefined) { |
| 206 | + let allowOperation = false; |
| 207 | + |
| 208 | + for (const filepath of allowedLocalPath) { |
| 209 | + const relativePath = path.relative(filepath, row.localFile); |
| 210 | + |
| 211 | + if (!relativePath.startsWith('..') && !path.isAbsolute(relativePath)) { |
| 212 | + allowOperation = true; |
| 213 | + } |
| 214 | + } |
| 215 | + |
| 216 | + if (!allowOperation) { |
| 217 | + throw new StagingError('Staging path not a subset of allowed local paths.'); |
| 218 | + } |
| 219 | + } |
| 220 | + |
| 221 | + const { localFile, presignedUrl, headers } = row; |
| 222 | + |
| 223 | + switch (row.operation) { |
| 224 | + case 'GET': |
| 225 | + await this.handleStagingGet(localFile, presignedUrl, headers); |
| 226 | + return operation; |
| 227 | + case 'PUT': |
| 228 | + await this.handleStagingPut(localFile, presignedUrl, headers); |
| 229 | + return operation; |
| 230 | + case 'REMOVE': |
| 231 | + await this.handleStagingRemove(presignedUrl, headers); |
| 232 | + return operation; |
| 233 | + default: |
| 234 | + throw new StagingError(`Staging query operation is not supported: ${row.operation}`); |
| 235 | + } |
| 236 | + } |
| 237 | + |
| 238 | + private async handleStagingGet( |
| 239 | + localFile: string | undefined, |
| 240 | + presignedUrl: string, |
| 241 | + headers: HeadersInit, |
| 242 | + ): Promise<void> { |
| 243 | + if (localFile === undefined) { |
| 244 | + throw new StagingError('Local file path not provided'); |
| 245 | + } |
| 246 | + const response = await fetch(presignedUrl, { method: 'GET', headers }); |
| 247 | + if (!response.ok) { |
| 248 | + throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); |
| 249 | + } |
| 250 | + const buffer = await response.arrayBuffer(); |
| 251 | + fs.writeFileSync(localFile, Buffer.from(buffer)); |
| 252 | + } |
| 253 | + |
| 254 | + private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> { |
| 255 | + const response = await fetch(presignedUrl, { method: 'DELETE', headers }); |
| 256 | + if (!response.ok) { |
| 257 | + throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); |
| 258 | + } |
| 259 | + } |
| 260 | + |
| 261 | + private async handleStagingPut( |
| 262 | + localFile: string | undefined, |
| 263 | + presignedUrl: string, |
| 264 | + headers: HeadersInit, |
| 265 | + ): Promise<void> { |
| 266 | + if (localFile === undefined) { |
| 267 | + throw new StagingError('Local file path not provided'); |
| 268 | + } |
| 269 | + const data = fs.readFileSync(localFile); |
| 270 | + const response = await fetch(presignedUrl, { method: 'PUT', headers, body: data }); |
| 271 | + if (!response.ok) { |
| 272 | + throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); |
| 273 | + } |
167 | 274 | } |
168 | 275 |
|
169 | 276 | /** |
@@ -362,7 +469,7 @@ export default class DBSQLSession implements IDBSQLSession { |
362 | 469 | return new Status(response.status); |
363 | 470 | } |
364 | 471 |
|
365 | | - private createOperation(response: OperationResponseShape): IOperation { |
| 472 | + private createOperation(response: OperationResponseShape): DBSQLOperation { |
366 | 473 | Status.assert(response.status); |
367 | 474 | const handle = definedOrError(response.operationHandle); |
368 | 475 | const operation = new DBSQLOperation( |
|
0 commit comments