/** * Original sources: * - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts * - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts */ import type { Response as NodeResponse } from "node-fetch"; import { Readable as NodeReadableStream } from "stream"; interface NodeStreamIterator { next(): Promise>; [Symbol.asyncIterator]?(): AsyncIterator; } interface PromiseIterator { next(): Promise>; [Symbol.asyncIterator]?(): AsyncIterator; } interface ReaderIterator { next(): Promise>; [Symbol.asyncIterator]?(): AsyncIterator; } const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function'; const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator; function isBuffer(value: any): value is Buffer { return value != null && value.constructor != null && typeof value.constructor.isBuffer === 'function' && value.constructor.isBuffer(value) } function isNodeResponse(value: any): value is NodeResponse { return !!(value as NodeResponse).body; } function isReadableStream(value: any): value is ReadableStream { return !!(value as ReadableStream).getReader; } function isAsyncIterableIterator( value: any ): value is AsyncIterableIterator { return !!( canUseAsyncIteratorSymbol && (value as AsyncIterableIterator)[Symbol.asyncIterator] ); } function isStreamableBlob(value: any): value is Blob { return !!(value as Blob).stream; } function isBlob(value: any): value is Blob { return !!(value as Blob).arrayBuffer; } function isNodeReadableStream(value: any): value is NodeReadableStream { return !!(value as NodeReadableStream).pipe; } function readerIterator( reader: ReadableStreamDefaultReader ): AsyncIterableIterator { const iterator: ReaderIterator = { next() { return reader.read(); }, }; if (canUseAsyncIteratorSymbol) { iterator[Symbol.asyncIterator] = function (): AsyncIterator { //@ts-ignore return this; }; } return iterator as AsyncIterableIterator; } function promiseIterator( promise: Promise ): AsyncIterableIterator { let resolved = false; const iterator: PromiseIterator = { next(): Promise> { if (resolved) return Promise.resolve({ value: undefined, done: true, }); resolved = true; return new Promise(function (resolve, reject) { promise .then(function (value) { resolve({ value: value as unknown as T, done: false }); }) .catch(reject); }); }, }; if (canUseAsyncIteratorSymbol) { iterator[Symbol.asyncIterator] = function (): AsyncIterator { return this; }; } return iterator as AsyncIterableIterator; } function nodeStreamIterator( stream: NodeReadableStream ): AsyncIterableIterator { let cleanup: (() => void) | null = null; let error: Error | null = null; let done = false; const data: unknown[] = []; const waiting: [ ( value: | IteratorResult | PromiseLike> ) => void, (reason?: any) => void ][] = []; function onData(chunk: any) { if (error) return; if (waiting.length) { const shiftedArr = waiting.shift(); if (Array.isArray(shiftedArr) && shiftedArr[0]) { return shiftedArr[0]({ value: chunk, done: false }); } } data.push(chunk); } function onError(err: Error) { error = err; const all = waiting.slice(); all.forEach(function (pair) { pair[1](err); }); !cleanup || cleanup(); } function onEnd() { done = true; const all = waiting.slice(); all.forEach(function (pair) { pair[0]({ value: undefined, done: true }); }); !cleanup || cleanup(); } cleanup = function () { cleanup = null; stream.removeListener("data", onData); stream.removeListener("error", onError); stream.removeListener("end", onEnd); stream.removeListener("finish", onEnd); stream.removeListener("close", onEnd); }; stream.on("data", onData); stream.on("error", onError); stream.on("end", onEnd); stream.on("finish", onEnd); stream.on("close", onEnd); function getNext(): Promise> { return new Promise(function (resolve, reject) { if (error) return reject(error); if (data.length) return resolve({ value: data.shift() as T, done: false }); if (done) return resolve({ value: undefined, done: true }); waiting.push([resolve, reject]); }); } const iterator: NodeStreamIterator = { next(): Promise> { return getNext(); }, }; if (canUseAsyncIteratorSymbol) { iterator[Symbol.asyncIterator] = function (): AsyncIterator { return this; }; } return iterator as AsyncIterableIterator; } function asyncIterator( source: AsyncIterableIterator ): AsyncIterableIterator { const iterator = source[Symbol.asyncIterator](); return { next(): Promise> { return iterator.next(); }, [Symbol.asyncIterator](): AsyncIterableIterator { return this; }, }; } export function responseIterator( response: Response | NodeResponse | Buffer ): AsyncIterableIterator { let body: unknown = response; if (isNodeResponse(response)) body = response.body; if (isBuffer(body)) body = NodeReadableStream.from(body); if (isAsyncIterableIterator(body)) return asyncIterator(body); if (isReadableStream(body)) return readerIterator(body.getReader()); // this errors without casting to ReadableStream // because Blob.stream() returns a NodeJS ReadableStream if (isStreamableBlob(body)) { return readerIterator( (body.stream() as unknown as ReadableStream).getReader() ); } if (isBlob(body)) return promiseIterator(body.arrayBuffer()); if (isNodeReadableStream(body)) return nodeStreamIterator(body); throw new Error( "Unknown body type for responseIterator. Please pass a streamable response." ); }