Spaces:
Runtime error
Runtime error
| // Ported from https://github.com/nodejs/undici/pull/907 | |
| const assert = require('assert') | |
| const { Readable } = require('stream') | |
| const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors') | |
| const util = require('../core/util') | |
| const { ReadableStreamFrom, toUSVString } = require('../core/util') | |
| let Blob | |
| const kConsume = Symbol('kConsume') | |
| const kReading = Symbol('kReading') | |
| const kBody = Symbol('kBody') | |
| const kAbort = Symbol('abort') | |
| const kContentType = Symbol('kContentType') | |
| module.exports = class BodyReadable extends Readable { | |
| constructor ({ | |
| resume, | |
| abort, | |
| contentType = '', | |
| highWaterMark = 64 * 1024 // Same as nodejs fs streams. | |
| }) { | |
| super({ | |
| autoDestroy: true, | |
| read: resume, | |
| highWaterMark | |
| }) | |
| this._readableState.dataEmitted = false | |
| this[kAbort] = abort | |
| this[kConsume] = null | |
| this[kBody] = null | |
| this[kContentType] = contentType | |
| // Is stream being consumed through Readable API? | |
| // This is an optimization so that we avoid checking | |
| // for 'data' and 'readable' listeners in the hot path | |
| // inside push(). | |
| this[kReading] = false | |
| } | |
| destroy (err) { | |
| if (this.destroyed) { | |
| // Node < 16 | |
| return this | |
| } | |
| if (!err && !this._readableState.endEmitted) { | |
| err = new RequestAbortedError() | |
| } | |
| if (err) { | |
| this[kAbort]() | |
| } | |
| return super.destroy(err) | |
| } | |
| emit (ev, ...args) { | |
| if (ev === 'data') { | |
| // Node < 16.7 | |
| this._readableState.dataEmitted = true | |
| } else if (ev === 'error') { | |
| // Node < 16 | |
| this._readableState.errorEmitted = true | |
| } | |
| return super.emit(ev, ...args) | |
| } | |
| on (ev, ...args) { | |
| if (ev === 'data' || ev === 'readable') { | |
| this[kReading] = true | |
| } | |
| return super.on(ev, ...args) | |
| } | |
| addListener (ev, ...args) { | |
| return this.on(ev, ...args) | |
| } | |
| off (ev, ...args) { | |
| const ret = super.off(ev, ...args) | |
| if (ev === 'data' || ev === 'readable') { | |
| this[kReading] = ( | |
| this.listenerCount('data') > 0 || | |
| this.listenerCount('readable') > 0 | |
| ) | |
| } | |
| return ret | |
| } | |
| removeListener (ev, ...args) { | |
| return this.off(ev, ...args) | |
| } | |
| push (chunk) { | |
| if (this[kConsume] && chunk !== null && this.readableLength === 0) { | |
| consumePush(this[kConsume], chunk) | |
| return this[kReading] ? super.push(chunk) : true | |
| } | |
| return super.push(chunk) | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-text | |
| async text () { | |
| return consume(this, 'text') | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-json | |
| async json () { | |
| return consume(this, 'json') | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-blob | |
| async blob () { | |
| return consume(this, 'blob') | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-arraybuffer | |
| async arrayBuffer () { | |
| return consume(this, 'arrayBuffer') | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-formdata | |
| async formData () { | |
| // TODO: Implement. | |
| throw new NotSupportedError() | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-bodyused | |
| get bodyUsed () { | |
| return util.isDisturbed(this) | |
| } | |
| // https://fetch.spec.whatwg.org/#dom-body-body | |
| get body () { | |
| if (!this[kBody]) { | |
| this[kBody] = ReadableStreamFrom(this) | |
| if (this[kConsume]) { | |
| // TODO: Is this the best way to force a lock? | |
| this[kBody].getReader() // Ensure stream is locked. | |
| assert(this[kBody].locked) | |
| } | |
| } | |
| return this[kBody] | |
| } | |
| async dump (opts) { | |
| let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144 | |
| const signal = opts && opts.signal | |
| const abortFn = () => { | |
| this.destroy() | |
| } | |
| let signalListenerCleanup | |
| if (signal) { | |
| if (typeof signal !== 'object' || !('aborted' in signal)) { | |
| throw new InvalidArgumentError('signal must be an AbortSignal') | |
| } | |
| util.throwIfAborted(signal) | |
| signalListenerCleanup = util.addAbortListener(signal, abortFn) | |
| } | |
| try { | |
| for await (const chunk of this) { | |
| util.throwIfAborted(signal) | |
| limit -= Buffer.byteLength(chunk) | |
| if (limit < 0) { | |
| return | |
| } | |
| } | |
| } catch { | |
| util.throwIfAborted(signal) | |
| } finally { | |
| if (typeof signalListenerCleanup === 'function') { | |
| signalListenerCleanup() | |
| } else if (signalListenerCleanup) { | |
| signalListenerCleanup[Symbol.dispose]() | |
| } | |
| } | |
| } | |
| } | |
| // https://streams.spec.whatwg.org/#readablestream-locked | |
| function isLocked (self) { | |
| // Consume is an implicit lock. | |
| return (self[kBody] && self[kBody].locked === true) || self[kConsume] | |
| } | |
| // https://fetch.spec.whatwg.org/#body-unusable | |
| function isUnusable (self) { | |
| return util.isDisturbed(self) || isLocked(self) | |
| } | |
| async function consume (stream, type) { | |
| if (isUnusable(stream)) { | |
| throw new TypeError('unusable') | |
| } | |
| assert(!stream[kConsume]) | |
| return new Promise((resolve, reject) => { | |
| stream[kConsume] = { | |
| type, | |
| stream, | |
| resolve, | |
| reject, | |
| length: 0, | |
| body: [] | |
| } | |
| stream | |
| .on('error', function (err) { | |
| consumeFinish(this[kConsume], err) | |
| }) | |
| .on('close', function () { | |
| if (this[kConsume].body !== null) { | |
| consumeFinish(this[kConsume], new RequestAbortedError()) | |
| } | |
| }) | |
| process.nextTick(consumeStart, stream[kConsume]) | |
| }) | |
| } | |
| function consumeStart (consume) { | |
| if (consume.body === null) { | |
| return | |
| } | |
| const { _readableState: state } = consume.stream | |
| for (const chunk of state.buffer) { | |
| consumePush(consume, chunk) | |
| } | |
| if (state.endEmitted) { | |
| consumeEnd(this[kConsume]) | |
| } else { | |
| consume.stream.on('end', function () { | |
| consumeEnd(this[kConsume]) | |
| }) | |
| } | |
| consume.stream.resume() | |
| while (consume.stream.read() != null) { | |
| // Loop | |
| } | |
| } | |
| function consumeEnd (consume) { | |
| const { type, body, resolve, stream, length } = consume | |
| try { | |
| if (type === 'text') { | |
| resolve(toUSVString(Buffer.concat(body))) | |
| } else if (type === 'json') { | |
| resolve(JSON.parse(Buffer.concat(body))) | |
| } else if (type === 'arrayBuffer') { | |
| const dst = new Uint8Array(length) | |
| let pos = 0 | |
| for (const buf of body) { | |
| dst.set(buf, pos) | |
| pos += buf.byteLength | |
| } | |
| resolve(dst.buffer) | |
| } else if (type === 'blob') { | |
| if (!Blob) { | |
| Blob = require('buffer').Blob | |
| } | |
| resolve(new Blob(body, { type: stream[kContentType] })) | |
| } | |
| consumeFinish(consume) | |
| } catch (err) { | |
| stream.destroy(err) | |
| } | |
| } | |
| function consumePush (consume, chunk) { | |
| consume.length += chunk.length | |
| consume.body.push(chunk) | |
| } | |
| function consumeFinish (consume, err) { | |
| if (consume.body === null) { | |
| return | |
| } | |
| if (err) { | |
| consume.reject(err) | |
| } else { | |
| consume.resolve() | |
| } | |
| consume.type = null | |
| consume.stream = null | |
| consume.resolve = null | |
| consume.reject = null | |
| consume.length = 0 | |
| consume.body = null | |
| } | |