Skip to content

Commit b370028

Browse files
committed
stream: simplify Transform
1 parent 065a6b2 commit b370028

File tree

5 files changed

+42
-55
lines changed

5 files changed

+42
-55
lines changed

lib/_stream_readable.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
315315
}
316316

317317
function addChunk(stream, state, chunk, addToFront) {
318-
console.log('addChunk', state.flowing, state.length, state.sync)
319318
if (state.flowing && state.length === 0 && !state.sync) {
320319
// Use the guard to avoid creating `Set()` repeatedly
321320
// when we have multiple pipes.

lib/_stream_transform.js

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ const { Object } = primordials;
6868
module.exports = Transform;
6969
const {
7070
ERR_METHOD_NOT_IMPLEMENTED,
71-
ERR_MULTIPLE_CALLBACK,
72-
ERR_TRANSFORM_ALREADY_TRANSFORMING,
73-
ERR_TRANSFORM_WITH_LENGTH_0
71+
ERR_MULTIPLE_CALLBACK
7472
} = require('internal/errors').codes;
7573
const Duplex = require('_stream_duplex');
7674
Object.setPrototypeOf(Transform.prototype, Duplex.prototype);
@@ -90,35 +88,50 @@ function Transform(options) {
9088
this._flush = options.flush;
9189
}
9290

93-
const final = this._final || (cb => cb());
94-
const flush = this._flush || (cb => cb());
91+
this._readableState.sync = false;
92+
this._resume = null;
93+
};
9594

96-
this._final = function (cb) {
97-
final(() => process.nextTick(flush, () => {
98-
cb();
99-
this.push(null);
100-
}));
95+
Transform.prototype._final = function (cb) {
96+
if (this._flush) {
97+
this._flush((err) => {
98+
if (err) {
99+
cb(err);
100+
} else {
101+
this.push(null);
102+
cb();
103+
}
104+
})
105+
} else {
106+
this.push(null);
107+
cb();
101108
}
102-
103-
this._readableState.sync = false;
104109
};
105110

106111
Transform.prototype._read = function (n) {
107112
if (this._resume) {
108113
this._resume();
109114
this._resume = null;
110115
}
111-
}
116+
};
112117

113118
Transform.prototype._write = function (chunk, encoding, callback) {
114-
this._transform.call(this, chunk, encoding, (...args) => {
115-
if (args[0]) {
116-
callback(args[0]);
119+
let called = false;
120+
this._transform.call(this, chunk, encoding, (err, val) => {
121+
if (err) {
122+
callback(err);
117123
return;
118124
}
119125

120-
if (args.length > 1) {
121-
this.push(args[1]);
126+
if (called) {
127+
callback(new ERR_MULTIPLE_CALLBACK());
128+
return;
129+
} else {
130+
called = true;
131+
}
132+
133+
if (val !== undefined) {
134+
this.push(val);
122135
}
123136

124137
const r = this._readableState;

test/parallel/test-stream-transform-constructor-set-methods.js

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,13 @@ const _transform = common.mustCall((chunk, _, next) => {
1818
next();
1919
});
2020

21-
const _final = common.mustCall((next) => {
22-
next();
23-
});
24-
2521
const _flush = common.mustCall((next) => {
2622
next();
2723
});
2824

2925
const t2 = new Transform({
3026
transform: _transform,
31-
flush: _flush,
32-
final: _final
27+
flush: _flush
3328
});
3429

3530
strictEqual(t2._transform, _transform);

test/parallel/test-stream-transform-final-sync.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,36 +68,27 @@ const t = new stream.Transform({
6868
assert.strictEqual(++state, chunk + 2);
6969
process.nextTick(next);
7070
}, 3),
71-
final: common.mustCall(function(done) {
72-
state++;
73-
// finalCallback part 1
74-
assert.strictEqual(state, 10);
75-
state++;
76-
// finalCallback part 2
77-
assert.strictEqual(state, 11);
78-
done();
79-
}, 1),
8071
flush: common.mustCall(function(done) {
8172
state++;
8273
// fluchCallback part 1
83-
assert.strictEqual(state, 12);
74+
assert.strictEqual(state, 10);
8475
process.nextTick(function() {
8576
state++;
8677
// fluchCallback part 2
87-
assert.strictEqual(state, 13);
78+
assert.strictEqual(state, 11);
8879
done();
8980
});
9081
}, 1)
9182
});
9283
t.on('finish', common.mustCall(function() {
9384
state++;
9485
// finishListener
95-
assert.strictEqual(state, 14);
86+
assert.strictEqual(state, 12);
9687
}, 1));
9788
t.on('end', common.mustCall(function() {
9889
state++;
9990
// endEvent
100-
assert.strictEqual(state, 16);
91+
assert.strictEqual(state, 14);
10192
}, 1));
10293
t.on('data', common.mustCall(function(d) {
10394
// dataListener
@@ -109,5 +100,5 @@ t.write(4);
109100
t.end(7, common.mustCall(function() {
110101
state++;
111102
// endMethodCallback
112-
assert.strictEqual(state, 15);
103+
assert.strictEqual(state, 13);
113104
}, 1));

test/parallel/test-stream-transform-final.js

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,27 +68,16 @@ const t = new stream.Transform({
6868
assert.strictEqual(++state, chunk + 2);
6969
process.nextTick(next);
7070
}, 3),
71-
final: common.mustCall(function(done) {
72-
state++;
73-
// finalCallback part 1
74-
assert.strictEqual(state, 10);
75-
setTimeout(function() {
76-
state++;
77-
// finalCallback part 2
78-
assert.strictEqual(state, 11);
79-
done();
80-
}, 100);
81-
}, 1),
8271
flush: common.mustCall(function(done) {
8372
state++;
8473
// flushCallback part 1
8574
console.log('flushCallback part 1');
86-
assert.strictEqual(state, 12);
75+
assert.strictEqual(state, 10);
8776
process.nextTick(function() {
8877
state++;
8978
// flushCallback part 2
9079
console.log('flushCallback part 2');
91-
assert.strictEqual(state, 13);
80+
assert.strictEqual(state, 11);
9281
done();
9382
});
9483
}, 1)
@@ -97,12 +86,12 @@ t.on('finish', common.mustCall(function() {
9786
state++;
9887
// finishListener
9988
console.log('finishListener');
100-
assert.strictEqual(state, 14);
89+
assert.strictEqual(state, 12);
10190
}, 1));
10291
t.on('end', common.mustCall(function() {
10392
state++;
10493
// end event
105-
assert.strictEqual(state, 16);
94+
assert.strictEqual(state, 14);
10695
}, 1));
10796
t.on('data', common.mustCall(function(d) {
10897
// dataListener
@@ -114,5 +103,5 @@ t.write(4);
114103
t.end(7, common.mustCall(function() {
115104
state++;
116105
// endMethodCallback
117-
assert.strictEqual(state, 15);
106+
assert.strictEqual(state, 13);
118107
}, 1));

0 commit comments

Comments
 (0)