/**
 * Adapts the different body representations a fetch-like response can carry
 * (async iterables, WHATWG readable streams, Blobs, Buffers, event-based Node
 * streams) to a single `AsyncIterableIterator`.
 *
 * Original sources:
 * - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts
 * - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts
 */
import { Readable as NodeReadableStream } from 'stream';
import type { Response as NodeResponse } from 'undici';

/**
 * Iterator shape produced by the adapters in this file.
 * `Symbol.asyncIterator` is optional because it is attached only when the
 * runtime provides that symbol (see `attachAsyncIterator`).
 */
interface PartialAsyncIterator<T> {
  next(): Promise<IteratorResult<T, undefined>>;
  return?(): Promise<IteratorResult<T, undefined>>;
  [Symbol.asyncIterator]?(): AsyncIterator<T, undefined>;
}

const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function';
const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;

/**
 * Returns true when `value` is neither null nor undefined and `value[key]`
 * is truthy. Shared by the duck-typing guards below so that none of them
 * throws on a null/undefined input.
 */
function hasTruthyProperty(value: unknown, key: PropertyKey): boolean {
  return value != null && Boolean((value as Record<PropertyKey, unknown>)[key]);
}

/** Detects Buffers through the static `isBuffer` function on the value's constructor. */
function isBuffer(value: unknown): value is Buffer {
  if (value == null) return false;
  const ctor = (value as { constructor?: { isBuffer?: unknown } }).constructor;
  return ctor != null && typeof ctor.isBuffer === 'function' && Boolean(ctor.isBuffer(value));
}

/** A value counts as a response when it has a truthy `body` property. */
function isNodeResponse(value: unknown): value is NodeResponse {
  return hasTruthyProperty(value, 'body');
}

/** Duck-types a WHATWG readable stream by its `getReader` member. The chunk type `T` is asserted by the caller. */
function isReadableStream<T>(value: unknown): value is ReadableStream<T> {
  return hasTruthyProperty(value, 'getReader');
}

/** Duck-types an async iterable by its `Symbol.asyncIterator` member. The item type `T` is asserted by the caller. */
function isAsyncIterableIterator<T>(value: unknown): value is AsyncIterableIterator<T> {
  return Boolean(canUseAsyncIteratorSymbol) && hasTruthyProperty(value, Symbol.asyncIterator);
}

/** Duck-types a Blob that exposes `stream`. */
function isStreamableBlob(value: unknown): value is Blob {
  return hasTruthyProperty(value, 'stream');
}

/** Duck-types a Blob (or anything else exposing `arrayBuffer`). */
function isBlob(value: unknown): value is Blob {
  return hasTruthyProperty(value, 'arrayBuffer');
}

/** Duck-types a Node readable stream by its `pipe` member. */
function isNodeReadableStream(value: unknown): value is NodeReadableStream {
  return hasTruthyProperty(value, 'pipe');
}

/**
 * Makes `iterator` self-iterable by attaching a `Symbol.asyncIterator` method
 * that returns the iterator itself. Nothing is attached when the runtime has
 * no `Symbol.asyncIterator`.
 */
function attachAsyncIterator<T>(iterator: PartialAsyncIterator<T>): AsyncIterableIterator<T> {
  if (canUseAsyncIteratorSymbol) {
    iterator[Symbol.asyncIterator] = function (this: PartialAsyncIterator<T>) {
      return this;
    };
  }
  return iterator as AsyncIterableIterator<T>;
}

/**
 * Wraps a stream reader: every `next()` performs one `reader.read()`.
 * The read result is normalised to an `IteratorResult`, so no type
 * suppression is needed.
 */
function readerIterator<T>(reader: ReadableStreamDefaultReader<T>): AsyncIterableIterator<T> {
  return attachAsyncIterator<T>({
    async next(): Promise<IteratorResult<T, undefined>> {
      const result = await reader.read();
      return result.done
        ? { value: undefined, done: true }
        : { value: result.value, done: false };
    },
  });
}

/**
 * Wraps a promise as an iterator that yields the resolved value exactly once
 * and reports `done` on every later call. A rejection of `promise` rejects
 * the first `next()`.
 */
function promiseIterator<T = ArrayBuffer>(promise: Promise<ArrayBuffer>): AsyncIterableIterator<T> {
  let consumed = false;
  return attachAsyncIterator<T>({
    async next(): Promise<IteratorResult<T, undefined>> {
      if (consumed) return { value: undefined, done: true };
      consumed = true;
      const value = await promise;
      // The caller picks T; this adapter cannot verify it at runtime.
      return { value: value as unknown as T, done: false };
    },
  });
}

/**
 * Adapts an event-based Node stream ('data' / 'error' / 'end' / 'finish' /
 * 'close') to an async iterator.
 *
 * Chunks that arrive while nobody is waiting are buffered; `next()` calls made
 * while the buffer is empty are parked until a chunk, an error, or the end of
 * the stream arrives. `return()` detaches the listeners this adapter
 * registered, so abandoning the iteration early does not leave them (and the
 * growing buffer) behind.
 */
function nodeStreamIterator<T>(stream: NodeReadableStream): AsyncIterableIterator<T> {
  type Waiter = {
    resolve: (result: IteratorResult<T, undefined>) => void;
    reject: (reason: unknown) => void;
  };

  let error: Error | null = null;
  let done = false;
  let attached = true;
  const data: T[] = [];
  const waiting: Waiter[] = [];

  // Idempotent: may be reached from 'error', any of the end events, and return().
  function detach(): void {
    if (!attached) return;
    attached = false;
    stream.removeListener('data', onData);
    stream.removeListener('error', onError);
    stream.removeListener('end', onEnd);
    stream.removeListener('finish', onEnd);
    stream.removeListener('close', onEnd);
  }

  // Empties the queue of parked next() calls, settling each one.
  function settleWaiting(settle: (waiter: Waiter) => void): void {
    for (const waiter of waiting.splice(0)) settle(waiter);
  }

  function onData(chunk: T): void {
    if (error) return;
    const waiter = waiting.shift();
    if (waiter) {
      // Hand the chunk straight to the oldest parked consumer.
      waiter.resolve({ value: chunk, done: false });
      return;
    }
    data.push(chunk);
  }

  function onError(err: Error): void {
    error = err;
    settleWaiting((waiter) => waiter.reject(err));
    detach();
  }

  function onEnd(): void {
    done = true;
    settleWaiting((waiter) => waiter.resolve({ value: undefined, done: true }));
    detach();
  }

  stream.on('data', onData);
  stream.on('error', onError);
  stream.on('end', onEnd);
  stream.on('finish', onEnd);
  stream.on('close', onEnd);

  return attachAsyncIterator<T>({
    next(): Promise<IteratorResult<T, undefined>> {
      return new Promise((resolve, reject) => {
        // Precedence: a recorded error, then buffered chunks, then end-of-stream.
        if (error) return reject(error);
        if (data.length) return resolve({ value: data.shift() as T, done: false });
        if (done) return resolve({ value: undefined, done: true });
        waiting.push({ resolve, reject });
      });
    },
    async return(): Promise<IteratorResult<T, undefined>> {
      // The consumer is finished: drop buffered chunks so later next() calls
      // report done, then finish exactly as if the stream had ended.
      data.length = 0;
      onEnd();
      return { value: undefined, done: true };
    },
  });
}

/**
 * Obtains an iterator from `source` once and exposes it as an object that is
 * itself async-iterable, forwarding every `next()` call.
 */
function asyncIterator<T>(source: AsyncIterableIterator<T>): AsyncIterableIterator<T> {
  const iterator = source[Symbol.asyncIterator]();
  return {
    next(): Promise<IteratorResult<T>> {
      return iterator.next();
    },
    [Symbol.asyncIterator](): AsyncIterableIterator<T> {
      return this;
    },
  };
}

/**
 * Returns an async iterator over the chunks of a response body.
 *
 * The body is `response.body` when that is truthy, otherwise `response`
 * itself. A Buffer body is first wrapped with `Readable.from`. The first
 * matching shape wins, checked in this order: async iterable, WHATWG readable
 * stream, streamable Blob, Blob (yielded once as an ArrayBuffer), Node
 * readable stream.
 *
 * @param response - A response, or a bare body such as a Buffer.
 * @returns An async iterable iterator over the body's chunks. The chunk type
 *   `T` is chosen by the caller and is not checked at runtime.
 * @throws Error when the body matches none of the supported shapes, including
 *   a null or undefined `response`.
 */
export function responseIterator<T>(
  response: Response | NodeResponse | Buffer
): AsyncIterableIterator<T> {
  let body: unknown = response;

  if (isNodeResponse(response)) body = response.body;

  if (isBuffer(body)) body = NodeReadableStream.from(body);

  if (isAsyncIterableIterator<T>(body)) return asyncIterator(body);

  if (isReadableStream<T>(body)) return readerIterator(body.getReader());

  // this errors without casting to ReadableStream<T>
  // because Blob.stream() returns a NodeJS ReadableStream
  if (isStreamableBlob(body)) {
    return readerIterator((body.stream() as unknown as ReadableStream<T>).getReader());
  }

  if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());

  if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);

  throw new Error('Unknown body type for responseIterator. Please pass a streamable response.');
}