207 lines
5.8 KiB
JavaScript
207 lines
5.8 KiB
JavaScript
'use strict';
|
|
|
|
var _Object$setPrototypeO;
|
|
|
|
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
|
|
|
|
var finished = require('./end-of-stream');
|
|
|
|
var kLastResolve = Symbol('lastResolve');
|
|
var kLastReject = Symbol('lastReject');
|
|
var kError = Symbol('error');
|
|
var kEnded = Symbol('ended');
|
|
var kLastPromise = Symbol('lastPromise');
|
|
var kHandlePromise = Symbol('handlePromise');
|
|
var kStream = Symbol('stream');
|
|
|
|
function createIterResult(value, done) {
|
|
return {
|
|
value: value,
|
|
done: done
|
|
};
|
|
}
|
|
|
|
function readAndResolve(iter) {
|
|
var resolve = iter[kLastResolve];
|
|
|
|
if (resolve !== null) {
|
|
var data = iter[kStream].read(); // we defer if data is null
|
|
// we can be expecting either 'end' or
|
|
// 'error'
|
|
|
|
if (data !== null) {
|
|
iter[kLastPromise] = null;
|
|
iter[kLastResolve] = null;
|
|
iter[kLastReject] = null;
|
|
resolve(createIterResult(data, false));
|
|
}
|
|
}
|
|
}
|
|
|
|
function onReadable(iter) {
|
|
// we wait for the next tick, because it might
|
|
// emit an error with process.nextTick
|
|
process.nextTick(readAndResolve, iter);
|
|
}
|
|
|
|
function wrapForNext(lastPromise, iter) {
|
|
return function (resolve, reject) {
|
|
lastPromise.then(function () {
|
|
if (iter[kEnded]) {
|
|
resolve(createIterResult(undefined, true));
|
|
return;
|
|
}
|
|
|
|
iter[kHandlePromise](resolve, reject);
|
|
}, reject);
|
|
};
|
|
}
|
|
|
|
var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
|
|
var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = {
|
|
get stream() {
|
|
return this[kStream];
|
|
},
|
|
|
|
next: function next() {
|
|
var _this = this;
|
|
|
|
// if we have detected an error in the meanwhile
|
|
// reject straight away
|
|
var error = this[kError];
|
|
|
|
if (error !== null) {
|
|
return Promise.reject(error);
|
|
}
|
|
|
|
if (this[kEnded]) {
|
|
return Promise.resolve(createIterResult(undefined, true));
|
|
}
|
|
|
|
if (this[kStream].destroyed) {
|
|
// We need to defer via nextTick because if .destroy(err) is
|
|
// called, the error will be emitted via nextTick, and
|
|
// we cannot guarantee that there is no error lingering around
|
|
// waiting to be emitted.
|
|
return new Promise(function (resolve, reject) {
|
|
process.nextTick(function () {
|
|
if (_this[kError]) {
|
|
reject(_this[kError]);
|
|
} else {
|
|
resolve(createIterResult(undefined, true));
|
|
}
|
|
});
|
|
});
|
|
} // if we have multiple next() calls
|
|
// we will wait for the previous Promise to finish
|
|
// this logic is optimized to support for await loops,
|
|
// where next() is only called once at a time
|
|
|
|
|
|
var lastPromise = this[kLastPromise];
|
|
var promise;
|
|
|
|
if (lastPromise) {
|
|
promise = new Promise(wrapForNext(lastPromise, this));
|
|
} else {
|
|
// fast path needed to support multiple this.push()
|
|
// without triggering the next() queue
|
|
var data = this[kStream].read();
|
|
|
|
if (data !== null) {
|
|
return Promise.resolve(createIterResult(data, false));
|
|
}
|
|
|
|
promise = new Promise(this[kHandlePromise]);
|
|
}
|
|
|
|
this[kLastPromise] = promise;
|
|
return promise;
|
|
}
|
|
}, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
|
|
return this;
|
|
}), _defineProperty(_Object$setPrototypeO, "return", function _return() {
|
|
var _this2 = this;
|
|
|
|
// destroy(err, cb) is a private API
|
|
// we can guarantee we have that here, because we control the
|
|
// Readable class this is attached to
|
|
return new Promise(function (resolve, reject) {
|
|
_this2[kStream].destroy(null, function (err) {
|
|
if (err) {
|
|
reject(err);
|
|
return;
|
|
}
|
|
|
|
resolve(createIterResult(undefined, true));
|
|
});
|
|
});
|
|
}), _Object$setPrototypeO), AsyncIteratorPrototype);
|
|
|
|
var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
|
|
var _Object$create;
|
|
|
|
var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
|
|
value: stream,
|
|
writable: true
|
|
}), _defineProperty(_Object$create, kLastResolve, {
|
|
value: null,
|
|
writable: true
|
|
}), _defineProperty(_Object$create, kLastReject, {
|
|
value: null,
|
|
writable: true
|
|
}), _defineProperty(_Object$create, kError, {
|
|
value: null,
|
|
writable: true
|
|
}), _defineProperty(_Object$create, kEnded, {
|
|
value: stream._readableState.endEmitted,
|
|
writable: true
|
|
}), _defineProperty(_Object$create, kHandlePromise, {
|
|
value: function value(resolve, reject) {
|
|
var data = iterator[kStream].read();
|
|
|
|
if (data) {
|
|
iterator[kLastPromise] = null;
|
|
iterator[kLastResolve] = null;
|
|
iterator[kLastReject] = null;
|
|
resolve(createIterResult(data, false));
|
|
} else {
|
|
iterator[kLastResolve] = resolve;
|
|
iterator[kLastReject] = reject;
|
|
}
|
|
},
|
|
writable: true
|
|
}), _Object$create));
|
|
iterator[kLastPromise] = null;
|
|
finished(stream, function (err) {
|
|
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
|
|
var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
|
|
// returned by next() and store the error
|
|
|
|
if (reject !== null) {
|
|
iterator[kLastPromise] = null;
|
|
iterator[kLastResolve] = null;
|
|
iterator[kLastReject] = null;
|
|
reject(err);
|
|
}
|
|
|
|
iterator[kError] = err;
|
|
return;
|
|
}
|
|
|
|
var resolve = iterator[kLastResolve];
|
|
|
|
if (resolve !== null) {
|
|
iterator[kLastPromise] = null;
|
|
iterator[kLastResolve] = null;
|
|
iterator[kLastReject] = null;
|
|
resolve(createIterResult(undefined, true));
|
|
}
|
|
|
|
iterator[kEnded] = true;
|
|
});
|
|
stream.on('readable', onReadable.bind(null, iterator));
|
|
return iterator;
|
|
};
|
|
|
|
module.exports = createReadableStreamAsyncIterator; |