Add response iterator to node adapter (#5418)
* add response iterator to node adapter * changeset * add node-fetch types * fix @types/node-fetch as a dev dep
This commit is contained in:
parent
d017701ae6
commit
aa16b6cebc
5 changed files with 289 additions and 3 deletions
9
.changeset/blue-rabbits-hear.md
Normal file
9
.changeset/blue-rabbits-hear.md
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
---
|
||||||
|
'@astrojs/node': minor
|
||||||
|
---
|
||||||
|
|
||||||
|
Sometimes Astro sends a ReadableStream as a response and it raise an error **TypeError: body is not async iterable.**
|
||||||
|
|
||||||
|
I added a function to get a response iterator from different response types (sourced from apollo-client).
|
||||||
|
|
||||||
|
With this, node adapter can handle all the Astro response types.
|
|
@ -37,6 +37,7 @@
|
||||||
"astro": "^1.6.9"
|
"astro": "^1.6.9"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
"@types/node-fetch": "^2.6.2",
|
||||||
"@types/send": "^0.17.1",
|
"@types/send": "^0.17.1",
|
||||||
"astro": "workspace:*",
|
"astro": "workspace:*",
|
||||||
"astro-scripts": "workspace:*",
|
"astro-scripts": "workspace:*",
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import type { NodeApp } from 'astro/app/node';
|
import type { NodeApp } from 'astro/app/node';
|
||||||
import type { IncomingMessage, ServerResponse } from 'http';
|
import type { IncomingMessage, ServerResponse } from 'http';
|
||||||
import type { Readable } from 'stream';
|
import type { Readable } from 'stream';
|
||||||
|
import { responseIterator } from './response-iterator';
|
||||||
|
|
||||||
export default function (app: NodeApp) {
|
export default function (app: NodeApp) {
|
||||||
return async function (
|
return async function (
|
||||||
|
@ -38,7 +39,7 @@ export default function (app: NodeApp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: Response) {
|
async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: Response) {
|
||||||
const { status, headers, body } = webResponse;
|
const { status, headers } = webResponse;
|
||||||
|
|
||||||
if (app.setCookieHeaders) {
|
if (app.setCookieHeaders) {
|
||||||
const setCookieHeaders: Array<string> = Array.from(app.setCookieHeaders(webResponse));
|
const setCookieHeaders: Array<string> = Array.from(app.setCookieHeaders(webResponse));
|
||||||
|
@ -48,8 +49,8 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse:
|
||||||
}
|
}
|
||||||
|
|
||||||
res.writeHead(status, Object.fromEntries(headers.entries()));
|
res.writeHead(status, Object.fromEntries(headers.entries()));
|
||||||
if (body) {
|
if (webResponse.body) {
|
||||||
for await (const chunk of body as unknown as Readable) {
|
for await (const chunk of responseIterator(webResponse) as unknown as Readable) {
|
||||||
res.write(chunk);
|
res.write(chunk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
241
packages/integrations/node/src/response-iterator.ts
Normal file
241
packages/integrations/node/src/response-iterator.ts
Normal file
|
@ -0,0 +1,241 @@
|
||||||
|
/**
|
||||||
|
* Original sources:
|
||||||
|
* - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts
|
||||||
|
* - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { Response as NodeResponse } from "node-fetch";
|
||||||
|
import { Readable as NodeReadableStream } from "stream";
|
||||||
|
|
||||||
|
interface NodeStreamIterator<T> {
|
||||||
|
next(): Promise<IteratorResult<T, boolean | undefined>>;
|
||||||
|
[Symbol.asyncIterator]?(): AsyncIterator<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface PromiseIterator<T> {
|
||||||
|
next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>;
|
||||||
|
[Symbol.asyncIterator]?(): AsyncIterator<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ReaderIterator<T> {
|
||||||
|
next(): Promise<ReadableStreamDefaultReadResult<T>>;
|
||||||
|
[Symbol.asyncIterator]?(): AsyncIterator<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
const canUseSymbol =
|
||||||
|
typeof Symbol === 'function' &&
|
||||||
|
typeof Symbol.for === 'function';
|
||||||
|
|
||||||
|
const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;
|
||||||
|
|
||||||
|
function isBuffer(value: any): value is Buffer {
|
||||||
|
return value != null && value.constructor != null &&
|
||||||
|
typeof value.constructor.isBuffer === 'function' && value.constructor.isBuffer(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
function isNodeResponse(value: any): value is NodeResponse {
|
||||||
|
return !!(value as NodeResponse).body;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isReadableStream(value: any): value is ReadableStream<any> {
|
||||||
|
return !!(value as ReadableStream<any>).getReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isAsyncIterableIterator(
|
||||||
|
value: any
|
||||||
|
): value is AsyncIterableIterator<any> {
|
||||||
|
return !!(
|
||||||
|
canUseAsyncIteratorSymbol &&
|
||||||
|
(value as AsyncIterableIterator<any>)[Symbol.asyncIterator]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isStreamableBlob(value: any): value is Blob {
|
||||||
|
return !!(value as Blob).stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isBlob(value: any): value is Blob {
|
||||||
|
return !!(value as Blob).arrayBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isNodeReadableStream(value: any): value is NodeReadableStream {
|
||||||
|
return !!(value as NodeReadableStream).pipe;
|
||||||
|
}
|
||||||
|
|
||||||
|
function readerIterator<T>(
|
||||||
|
reader: ReadableStreamDefaultReader<T>
|
||||||
|
): AsyncIterableIterator<T> {
|
||||||
|
const iterator: ReaderIterator<T> = {
|
||||||
|
next() {
|
||||||
|
return reader.read();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if (canUseAsyncIteratorSymbol) {
|
||||||
|
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
|
||||||
|
//@ts-ignore
|
||||||
|
return this;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return iterator as AsyncIterableIterator<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function promiseIterator<T = ArrayBuffer>(
|
||||||
|
promise: Promise<ArrayBuffer>
|
||||||
|
): AsyncIterableIterator<T> {
|
||||||
|
let resolved = false;
|
||||||
|
|
||||||
|
const iterator: PromiseIterator<T> = {
|
||||||
|
next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> {
|
||||||
|
if (resolved)
|
||||||
|
return Promise.resolve({
|
||||||
|
value: undefined,
|
||||||
|
done: true,
|
||||||
|
});
|
||||||
|
resolved = true;
|
||||||
|
return new Promise(function (resolve, reject) {
|
||||||
|
promise
|
||||||
|
.then(function (value) {
|
||||||
|
resolve({ value: value as unknown as T, done: false });
|
||||||
|
})
|
||||||
|
.catch(reject);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if (canUseAsyncIteratorSymbol) {
|
||||||
|
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
|
||||||
|
return this;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return iterator as AsyncIterableIterator<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function nodeStreamIterator<T>(
|
||||||
|
stream: NodeReadableStream
|
||||||
|
): AsyncIterableIterator<T> {
|
||||||
|
let cleanup: (() => void) | null = null;
|
||||||
|
let error: Error | null = null;
|
||||||
|
let done = false;
|
||||||
|
const data: unknown[] = [];
|
||||||
|
|
||||||
|
const waiting: [
|
||||||
|
(
|
||||||
|
value:
|
||||||
|
| IteratorResult<T, boolean | undefined>
|
||||||
|
| PromiseLike<IteratorResult<T, boolean | undefined>>
|
||||||
|
) => void,
|
||||||
|
(reason?: any) => void
|
||||||
|
][] = [];
|
||||||
|
|
||||||
|
function onData(chunk: any) {
|
||||||
|
if (error) return;
|
||||||
|
if (waiting.length) {
|
||||||
|
const shiftedArr = waiting.shift();
|
||||||
|
if (Array.isArray(shiftedArr) && shiftedArr[0]) {
|
||||||
|
return shiftedArr[0]({ value: chunk, done: false });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
data.push(chunk);
|
||||||
|
}
|
||||||
|
function onError(err: Error) {
|
||||||
|
error = err;
|
||||||
|
const all = waiting.slice();
|
||||||
|
all.forEach(function (pair) {
|
||||||
|
pair[1](err);
|
||||||
|
});
|
||||||
|
!cleanup || cleanup();
|
||||||
|
}
|
||||||
|
function onEnd() {
|
||||||
|
done = true;
|
||||||
|
const all = waiting.slice();
|
||||||
|
all.forEach(function (pair) {
|
||||||
|
pair[0]({ value: undefined, done: true });
|
||||||
|
});
|
||||||
|
!cleanup || cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup = function () {
|
||||||
|
cleanup = null;
|
||||||
|
stream.removeListener("data", onData);
|
||||||
|
stream.removeListener("error", onError);
|
||||||
|
stream.removeListener("end", onEnd);
|
||||||
|
stream.removeListener("finish", onEnd);
|
||||||
|
stream.removeListener("close", onEnd);
|
||||||
|
};
|
||||||
|
stream.on("data", onData);
|
||||||
|
stream.on("error", onError);
|
||||||
|
stream.on("end", onEnd);
|
||||||
|
stream.on("finish", onEnd);
|
||||||
|
stream.on("close", onEnd);
|
||||||
|
|
||||||
|
function getNext(): Promise<IteratorResult<T, boolean | undefined>> {
|
||||||
|
return new Promise(function (resolve, reject) {
|
||||||
|
if (error) return reject(error);
|
||||||
|
if (data.length) return resolve({ value: data.shift() as T, done: false });
|
||||||
|
if (done) return resolve({ value: undefined, done: true });
|
||||||
|
waiting.push([resolve, reject]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const iterator: NodeStreamIterator<T> = {
|
||||||
|
next(): Promise<IteratorResult<T, boolean | undefined>> {
|
||||||
|
return getNext();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if (canUseAsyncIteratorSymbol) {
|
||||||
|
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
|
||||||
|
return this;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return iterator as AsyncIterableIterator<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function asyncIterator<T>(
|
||||||
|
source: AsyncIterableIterator<T>
|
||||||
|
): AsyncIterableIterator<T> {
|
||||||
|
const iterator = source[Symbol.asyncIterator]();
|
||||||
|
return {
|
||||||
|
next(): Promise<IteratorResult<T, boolean>> {
|
||||||
|
return iterator.next();
|
||||||
|
},
|
||||||
|
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
|
||||||
|
return this;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function responseIterator<T>(
|
||||||
|
response: Response | NodeResponse | Buffer
|
||||||
|
): AsyncIterableIterator<T> {
|
||||||
|
let body: unknown = response;
|
||||||
|
|
||||||
|
if (isNodeResponse(response)) body = response.body;
|
||||||
|
|
||||||
|
if (isBuffer(body)) body = NodeReadableStream.from(body);
|
||||||
|
|
||||||
|
if (isAsyncIterableIterator(body)) return asyncIterator<T>(body);
|
||||||
|
|
||||||
|
if (isReadableStream(body)) return readerIterator<T>(body.getReader());
|
||||||
|
|
||||||
|
// this errors without casting to ReadableStream<T>
|
||||||
|
// because Blob.stream() returns a NodeJS ReadableStream
|
||||||
|
if (isStreamableBlob(body)) {
|
||||||
|
return readerIterator<T>(
|
||||||
|
(body.stream() as unknown as ReadableStream<T>).getReader()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());
|
||||||
|
|
||||||
|
if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);
|
||||||
|
|
||||||
|
throw new Error(
|
||||||
|
"Unknown body type for responseIterator. Please pass a streamable response."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
|
@ -2982,6 +2982,7 @@ importers:
|
||||||
packages/integrations/node:
|
packages/integrations/node:
|
||||||
specifiers:
|
specifiers:
|
||||||
'@astrojs/webapi': ^1.1.1
|
'@astrojs/webapi': ^1.1.1
|
||||||
|
'@types/node-fetch': ^2.6.2
|
||||||
'@types/send': ^0.17.1
|
'@types/send': ^0.17.1
|
||||||
astro: workspace:*
|
astro: workspace:*
|
||||||
astro-scripts: workspace:*
|
astro-scripts: workspace:*
|
||||||
|
@ -2993,6 +2994,7 @@ importers:
|
||||||
'@astrojs/webapi': link:../../webapi
|
'@astrojs/webapi': link:../../webapi
|
||||||
send: 0.18.0
|
send: 0.18.0
|
||||||
devDependencies:
|
devDependencies:
|
||||||
|
'@types/node-fetch': 2.6.2
|
||||||
'@types/send': 0.17.1
|
'@types/send': 0.17.1
|
||||||
astro: link:../../astro
|
astro: link:../../astro
|
||||||
astro-scripts: link:../../../scripts
|
astro-scripts: link:../../../scripts
|
||||||
|
@ -9712,6 +9714,13 @@ packages:
|
||||||
'@types/unist': 2.0.6
|
'@types/unist': 2.0.6
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/@types/node-fetch/2.6.2:
|
||||||
|
resolution: {integrity: sha512-DHqhlq5jeESLy19TYhLakJ07kNumXWjcDdxXsLUMJZ6ue8VZJj4kLPQVE/2mdHh3xZziNF1xppu5lwmS53HR+A==}
|
||||||
|
dependencies:
|
||||||
|
'@types/node': 18.11.9
|
||||||
|
form-data: 3.0.1
|
||||||
|
dev: true
|
||||||
|
|
||||||
/@types/node/12.20.55:
|
/@types/node/12.20.55:
|
||||||
resolution: {integrity: sha512-J8xLz7q2OFulZ2cyGTLE1TbbZcjpno7FaN6zdJNrgAdrJ+DZzh/uFR6YrTb4C+nXakvud8Q4+rbhoIWlYQbUFQ==}
|
resolution: {integrity: sha512-J8xLz7q2OFulZ2cyGTLE1TbbZcjpno7FaN6zdJNrgAdrJ+DZzh/uFR6YrTb4C+nXakvud8Q4+rbhoIWlYQbUFQ==}
|
||||||
dev: true
|
dev: true
|
||||||
|
@ -10581,6 +10590,10 @@ packages:
|
||||||
resolution: {integrity: sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==}
|
resolution: {integrity: sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==}
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/asynckit/0.4.0:
|
||||||
|
resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==}
|
||||||
|
dev: true
|
||||||
|
|
||||||
/at-least-node/1.0.0:
|
/at-least-node/1.0.0:
|
||||||
resolution: {integrity: sha512-+q/t7Ekv1EDY2l6Gda6LLiX14rU9TV20Wa3ofeQmwPFZbOMo9DXrLbOjFaaclkXKWidIaopwAObQDqwWtGUjqg==}
|
resolution: {integrity: sha512-+q/t7Ekv1EDY2l6Gda6LLiX14rU9TV20Wa3ofeQmwPFZbOMo9DXrLbOjFaaclkXKWidIaopwAObQDqwWtGUjqg==}
|
||||||
engines: {node: '>= 4.0.0'}
|
engines: {node: '>= 4.0.0'}
|
||||||
|
@ -11097,6 +11110,13 @@ packages:
|
||||||
resolution: {integrity: sha512-3tlv/dIP7FWvj3BsbHrGLJ6l/oKh1O3TcgBqMn+yyCagOxc23fyzDS6HypQbgxWbkpDnf52p1LuR4eWDQ/K9WQ==}
|
resolution: {integrity: sha512-3tlv/dIP7FWvj3BsbHrGLJ6l/oKh1O3TcgBqMn+yyCagOxc23fyzDS6HypQbgxWbkpDnf52p1LuR4eWDQ/K9WQ==}
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/combined-stream/1.0.8:
|
||||||
|
resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==}
|
||||||
|
engines: {node: '>= 0.8'}
|
||||||
|
dependencies:
|
||||||
|
delayed-stream: 1.0.0
|
||||||
|
dev: true
|
||||||
|
|
||||||
/comma-separated-tokens/2.0.3:
|
/comma-separated-tokens/2.0.3:
|
||||||
resolution: {integrity: sha512-Fu4hJdvzeylCfQPp9SGWidpzrMs7tTrlu6Vb8XGaRGck8QSNZJJp538Wrb60Lax4fPwR64ViY468OIUTbRlGZg==}
|
resolution: {integrity: sha512-Fu4hJdvzeylCfQPp9SGWidpzrMs7tTrlu6Vb8XGaRGck8QSNZJJp538Wrb60Lax4fPwR64ViY468OIUTbRlGZg==}
|
||||||
dev: false
|
dev: false
|
||||||
|
@ -11458,6 +11478,11 @@ packages:
|
||||||
slash: 4.0.0
|
slash: 4.0.0
|
||||||
dev: true
|
dev: true
|
||||||
|
|
||||||
|
/delayed-stream/1.0.0:
|
||||||
|
resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==}
|
||||||
|
engines: {node: '>=0.4.0'}
|
||||||
|
dev: true
|
||||||
|
|
||||||
/delegates/1.0.0:
|
/delegates/1.0.0:
|
||||||
resolution: {integrity: sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==}
|
resolution: {integrity: sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==}
|
||||||
dev: false
|
dev: false
|
||||||
|
@ -12751,6 +12776,15 @@ packages:
|
||||||
resolution: {integrity: sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==}
|
resolution: {integrity: sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==}
|
||||||
dev: true
|
dev: true
|
||||||
|
|
||||||
|
/form-data/3.0.1:
|
||||||
|
resolution: {integrity: sha512-RHkBKtLWUVwd7SqRIvCZMEvAMoGUp0XU+seQiZejj0COz3RI3hWP4sCv3gZWWLjJTd7rGwcsF5eKZGii0r/hbg==}
|
||||||
|
engines: {node: '>= 6'}
|
||||||
|
dependencies:
|
||||||
|
asynckit: 0.4.0
|
||||||
|
combined-stream: 1.0.8
|
||||||
|
mime-types: 2.1.35
|
||||||
|
dev: true
|
||||||
|
|
||||||
/format/0.2.2:
|
/format/0.2.2:
|
||||||
resolution: {integrity: sha512-wzsgA6WOq+09wrU1tsJ09udeR/YZRaeArL9e1wPbFg3GG2yDnC2ldKpxs4xunpFF9DgqCqOIra3bc1HWrJ37Ww==}
|
resolution: {integrity: sha512-wzsgA6WOq+09wrU1tsJ09udeR/YZRaeArL9e1wPbFg3GG2yDnC2ldKpxs4xunpFF9DgqCqOIra3bc1HWrJ37Ww==}
|
||||||
engines: {node: '>=0.4.x'}
|
engines: {node: '>=0.4.x'}
|
||||||
|
|
Loading…
Reference in a new issue