Skip to content

Commit bdc7126

Browse files
[PECO-1261] Improve CloudFetch downloading queue (#209)
[PECO-1261] Wait only for the currently needed link, download the rest in background Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 47d7151 commit bdc7126

3 files changed

Lines changed: 34 additions & 24 deletions

File tree

lib/result/CloudFetchResultHandler.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
1111

1212
private pendingLinks: Array<TSparkArrowResultLink> = [];
1313

14-
private downloadedBatches: Array<Buffer> = [];
14+
private downloadTasks: Array<Promise<Buffer>> = [];
1515

1616
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>) {
1717
this.context = context;
1818
this.source = source;
1919
}
2020

2121
public async hasMore() {
22-
if (this.pendingLinks.length > 0 || this.downloadedBatches.length > 0) {
22+
if (this.pendingLinks.length > 0 || this.downloadTasks.length > 0) {
2323
return true;
2424
}
2525
return this.source.hasMore();
@@ -32,15 +32,17 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
3232
this.pendingLinks.push(link);
3333
});
3434

35-
if (this.downloadedBatches.length === 0) {
36-
const clientConfig = this.context.getConfig();
37-
const links = this.pendingLinks.splice(0, clientConfig.cloudFetchConcurrentDownloads);
35+
const clientConfig = this.context.getConfig();
36+
const freeTaskSlotsCount = clientConfig.cloudFetchConcurrentDownloads - this.downloadTasks.length;
37+
38+
if (freeTaskSlotsCount > 0) {
39+
const links = this.pendingLinks.splice(0, freeTaskSlotsCount);
3840
const tasks = links.map((link) => this.downloadLink(link));
39-
const batches = await Promise.all(tasks);
40-
this.downloadedBatches.push(...batches);
41+
this.downloadTasks.push(...tasks);
4142
}
4243

43-
return this.downloadedBatches.splice(0, 1);
44+
const batch = await this.downloadTasks.shift();
45+
return batch ? [batch] : [];
4446
}
4547

4648
private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {

tests/e2e/cloudfetch.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ describe('CloudFetch', () => {
6464
// result handler should download 5 of them and schedule the rest
6565
expect(await cfResultHandler.hasMore()).to.be.false;
6666
expect(cfResultHandler.pendingLinks.length).to.be.equal(0);
67-
expect(cfResultHandler.downloadedBatches.length).to.be.equal(0);
67+
expect(cfResultHandler.downloadTasks.length).to.be.equal(0);
6868

6969
sinon.spy(operation._data, 'fetchNext');
7070

@@ -76,7 +76,7 @@ describe('CloudFetch', () => {
7676
expect(await cfResultHandler.hasMore()).to.be.true;
7777
// expected batches minus first 5 already fetched
7878
expect(cfResultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
79-
expect(cfResultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
79+
expect(cfResultHandler.downloadTasks.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
8080

8181
let fetchedRowCount = chunk.length;
8282
while (await operation.hasMoreRows()) {

tests/unit/result/CloudFetchResultHandler.test.js

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,19 @@ describe('CloudFetchResultHandler', () => {
8989

9090
case1: {
9191
result.pendingLinks = [];
92-
result.downloadedBatches = [];
92+
result.downloadTasks = [];
9393
expect(await result.hasMore()).to.be.false;
9494
}
9595

9696
case2: {
9797
result.pendingLinks = [{}]; // just anything here
98-
result.downloadedBatches = [];
98+
result.downloadTasks = [];
9999
expect(await result.hasMore()).to.be.true;
100100
}
101101

102102
case3: {
103103
result.pendingLinks = [];
104-
result.downloadedBatches = [{}]; // just anything here
104+
result.downloadTasks = [{}]; // just anything here
105105
expect(await result.hasMore()).to.be.true;
106106
}
107107
});
@@ -134,19 +134,19 @@ describe('CloudFetchResultHandler', () => {
134134
} while (await rowSetProvider.hasMore());
135135

136136
expect(result.pendingLinks.length).to.be.equal(expectedLinksCount);
137-
expect(result.downloadedBatches.length).to.be.equal(0);
137+
expect(result.downloadTasks.length).to.be.equal(0);
138138
expect(result.fetch.called).to.be.false;
139139
});
140140

141141
it('should download batches according to settings', async () => {
142142
const clientConfig = DBSQLClient.getDefaultConfig();
143-
clientConfig.cloudFetchConcurrentDownloads = 2;
143+
clientConfig.cloudFetchConcurrentDownloads = 3;
144144

145145
const rowSet = {
146146
startRowOffset: 0,
147147
resultLinks: [...sampleRowSet1.resultLinks, ...sampleRowSet2.resultLinks],
148148
};
149-
const expectedLinksCount = rowSet.resultLinks.length;
149+
const expectedLinksCount = rowSet.resultLinks.length; // 5
150150
const rowSetProvider = new ResultsProviderMock([rowSet]);
151151
const context = {
152152
getConfig: () => clientConfig,
@@ -166,24 +166,28 @@ describe('CloudFetchResultHandler', () => {
166166
expect(await rowSetProvider.hasMore()).to.be.true;
167167

168168
initialFetch: {
169+
// `cloudFetchConcurrentDownloads` out of `expectedLinksCount` links should be scheduled immediately
170+
// first one should be `await`-ed and returned from `fetchNext`
169171
const items = await result.fetchNext({ limit: 10000 });
170172
expect(items.length).to.be.gt(0);
171173
expect(await rowSetProvider.hasMore()).to.be.false;
172174

173175
expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads);
174176
expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads);
175-
expect(result.downloadedBatches.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 1);
177+
expect(result.downloadTasks.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 1);
176178
}
177179

178180
secondFetch: {
179-
// It should return previously fetched batch, not performing additional network requests
181+
// It should return previously fetched batch, and schedule one more
180182
const items = await result.fetchNext({ limit: 10000 });
181183
expect(items.length).to.be.gt(0);
182184
expect(await rowSetProvider.hasMore()).to.be.false;
183185

184-
expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads); // no new fetches
185-
expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads);
186-
expect(result.downloadedBatches.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 2);
186+
expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads + 1);
187+
expect(result.pendingLinks.length).to.be.equal(
188+
expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads - 1,
189+
);
190+
expect(result.downloadTasks.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 1);
187191
}
188192

189193
thirdFetch: {
@@ -192,11 +196,11 @@ describe('CloudFetchResultHandler', () => {
192196
expect(items.length).to.be.gt(0);
193197
expect(await rowSetProvider.hasMore()).to.be.false;
194198

195-
expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads * 2);
199+
expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads + 2);
196200
expect(result.pendingLinks.length).to.be.equal(
197-
expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads * 2,
201+
expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads - 2,
198202
);
199-
expect(result.downloadedBatches.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 1);
203+
expect(result.downloadTasks.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 1);
200204
}
201205
});
202206

@@ -251,6 +255,10 @@ describe('CloudFetchResultHandler', () => {
251255
}),
252256
);
253257

258+
// There are two link in the batch - first one is valid and second one is expired
259+
// The first fetch has to be successful, and the second one should fail
260+
await result.fetchNext({ limit: 10000 });
261+
254262
try {
255263
await result.fetchNext({ limit: 10000 });
256264
expect.fail('It should throw an error');

0 commit comments

Comments
 (0)