Spaces:
Runtime error
Runtime error
| const { finished, PassThrough } = require('stream') | |
| const { | |
| InvalidArgumentError, | |
| InvalidReturnValueError, | |
| RequestAbortedError | |
| } = require('../core/errors') | |
| const util = require('../core/util') | |
| const { getResolveErrorBodyCallback } = require('./util') | |
| const { AsyncResource } = require('async_hooks') | |
| const { addSignal, removeSignal } = require('./abort-signal') | |
| class StreamHandler extends AsyncResource { | |
| constructor (opts, factory, callback) { | |
| if (!opts || typeof opts !== 'object') { | |
| throw new InvalidArgumentError('invalid opts') | |
| } | |
| const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts | |
| try { | |
| if (typeof callback !== 'function') { | |
| throw new InvalidArgumentError('invalid callback') | |
| } | |
| if (typeof factory !== 'function') { | |
| throw new InvalidArgumentError('invalid factory') | |
| } | |
| if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { | |
| throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') | |
| } | |
| if (method === 'CONNECT') { | |
| throw new InvalidArgumentError('invalid method') | |
| } | |
| if (onInfo && typeof onInfo !== 'function') { | |
| throw new InvalidArgumentError('invalid onInfo callback') | |
| } | |
| super('UNDICI_STREAM') | |
| } catch (err) { | |
| if (util.isStream(body)) { | |
| util.destroy(body.on('error', util.nop), err) | |
| } | |
| throw err | |
| } | |
| this.responseHeaders = responseHeaders || null | |
| this.opaque = opaque || null | |
| this.factory = factory | |
| this.callback = callback | |
| this.res = null | |
| this.abort = null | |
| this.context = null | |
| this.trailers = null | |
| this.body = body | |
| this.onInfo = onInfo || null | |
| this.throwOnError = throwOnError || false | |
| if (util.isStream(body)) { | |
| body.on('error', (err) => { | |
| this.onError(err) | |
| }) | |
| } | |
| addSignal(this, signal) | |
| } | |
| onConnect (abort, context) { | |
| if (!this.callback) { | |
| throw new RequestAbortedError() | |
| } | |
| this.abort = abort | |
| this.context = context | |
| } | |
| onHeaders (statusCode, rawHeaders, resume, statusMessage) { | |
| const { factory, opaque, context, callback, responseHeaders } = this | |
| const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) | |
| if (statusCode < 200) { | |
| if (this.onInfo) { | |
| this.onInfo({ statusCode, headers }) | |
| } | |
| return | |
| } | |
| this.factory = null | |
| let res | |
| if (this.throwOnError && statusCode >= 400) { | |
| const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers | |
| const contentType = parsedHeaders['content-type'] | |
| res = new PassThrough() | |
| this.callback = null | |
| this.runInAsyncScope(getResolveErrorBodyCallback, null, | |
| { callback, body: res, contentType, statusCode, statusMessage, headers } | |
| ) | |
| } else { | |
| if (factory === null) { | |
| return | |
| } | |
| res = this.runInAsyncScope(factory, null, { | |
| statusCode, | |
| headers, | |
| opaque, | |
| context | |
| }) | |
| if ( | |
| !res || | |
| typeof res.write !== 'function' || | |
| typeof res.end !== 'function' || | |
| typeof res.on !== 'function' | |
| ) { | |
| throw new InvalidReturnValueError('expected Writable') | |
| } | |
| // TODO: Avoid finished. It registers an unnecessary amount of listeners. | |
| finished(res, { readable: false }, (err) => { | |
| const { callback, res, opaque, trailers, abort } = this | |
| this.res = null | |
| if (err || !res.readable) { | |
| util.destroy(res, err) | |
| } | |
| this.callback = null | |
| this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) | |
| if (err) { | |
| abort() | |
| } | |
| }) | |
| } | |
| res.on('drain', resume) | |
| this.res = res | |
| const needDrain = res.writableNeedDrain !== undefined | |
| ? res.writableNeedDrain | |
| : res._writableState && res._writableState.needDrain | |
| return needDrain !== true | |
| } | |
| onData (chunk) { | |
| const { res } = this | |
| return res ? res.write(chunk) : true | |
| } | |
| onComplete (trailers) { | |
| const { res } = this | |
| removeSignal(this) | |
| if (!res) { | |
| return | |
| } | |
| this.trailers = util.parseHeaders(trailers) | |
| res.end() | |
| } | |
| onError (err) { | |
| const { res, callback, opaque, body } = this | |
| removeSignal(this) | |
| this.factory = null | |
| if (res) { | |
| this.res = null | |
| util.destroy(res, err) | |
| } else if (callback) { | |
| this.callback = null | |
| queueMicrotask(() => { | |
| this.runInAsyncScope(callback, null, err, { opaque }) | |
| }) | |
| } | |
| if (body) { | |
| this.body = null | |
| util.destroy(body, err) | |
| } | |
| } | |
| } | |
| function stream (opts, factory, callback) { | |
| if (callback === undefined) { | |
| return new Promise((resolve, reject) => { | |
| stream.call(this, opts, factory, (err, data) => { | |
| return err ? reject(err) : resolve(data) | |
| }) | |
| }) | |
| } | |
| try { | |
| this.dispatch(opts, new StreamHandler(opts, factory, callback)) | |
| } catch (err) { | |
| if (typeof callback !== 'function') { | |
| throw err | |
| } | |
| const opaque = opts && opts.opaque | |
| queueMicrotask(() => callback(err, { opaque })) | |
| } | |
| } | |
| module.exports = stream | |