Skip to content
This repository was archived by the owner on May 20, 2024. It is now read-only.

Commit 3c82c8b

Browse files
committed
feat: behave according to the result of Readable.push()
1 parent f5b6544 commit 3c82c8b

File tree

2 files changed

+24
-15
lines changed

2 files changed

+24
-15
lines changed

index.js

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,33 @@ class AWSApiReadStream extends Readable {
99
}
1010

1111
async _read(size) {
12-
try {
13-
const res = await this._fn(this._nextToken)
14-
if (res) {
12+
let shouldContinue = false
13+
do {
14+
try {
15+
const res = await this._fn(this._nextToken)
16+
17+
if (!res) {
18+
this.push(null)
19+
return
20+
}
21+
1522
if (this._isInBufferMode()) {
1623
this._buffer.push(res)
1724
}
1825

19-
this.push(res)
26+
shouldContinue = this.push(res)
27+
this._nextToken = res.NextToken || res.NextContinuationToken
2028

21-
if (res.NextToken !== undefined) {
22-
this._nextToken = res.NextToken
29+
if (!this._nextToken) {
30+
this.push(null)
2331
return
2432
}
2533

26-
if (res.NextContinuationToken !== undefined) {
27-
this._nextToken = res.NextContinuationToken
28-
return
29-
}
34+
} catch (e) {
35+
this.destroy(e)
36+
return
3037
}
31-
32-
this.push(null)
33-
} catch (e) {
34-
this.destroy(e)
35-
}
38+
} while (shouldContinue)
3639
}
3740

3841
stop() {
@@ -45,6 +48,8 @@ class AWSApiReadStream extends Readable {
4548
}
4649

4750
// can probably come up with a better name...
51+
// also, not sure if I should also use this.push in _read
52+
// while in this mode...
4853
readAll() {
4954
this._buffer = []
5055
return new Promise((res, rej) => {

test.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ test('AWSApiReadStream - returning null or undefined will stop execution', async
4646
t.deepEqual(results, [0, 1])
4747
})
4848

49+
test.skip('AWSApiReadStream - backpressure', async t => {
50+
51+
})
52+
4953

5054
class TestAPI {
5155
constructor() {

0 commit comments

Comments
 (0)