Stream request body instead of buffering it in memory (#8084)
Co-authored-by: Matthew Phillips <matthew@matthewphillips.info>
This commit is contained in:
parent
3755424f93
commit
560e459246
6 changed files with 167 additions and 57 deletions
6
.changeset/lemon-lobsters-do.md
Normal file
6
.changeset/lemon-lobsters-do.md
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
---
|
||||||
|
'@astrojs/node': patch
|
||||||
|
'astro': patch
|
||||||
|
---
|
||||||
|
|
||||||
|
Stream request body instead of buffering it in memory.
|
|
@ -9,20 +9,33 @@ import { App, type MatchOptions } from './index.js';
|
||||||
|
|
||||||
const clientAddressSymbol = Symbol.for('astro.clientAddress');
|
const clientAddressSymbol = Symbol.for('astro.clientAddress');
|
||||||
|
|
||||||
function createRequestFromNodeRequest(req: NodeIncomingMessage, body?: Uint8Array): Request {
|
type CreateNodeRequestOptions = {
|
||||||
|
emptyBody?: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
type BodyProps = Partial<RequestInit>;
|
||||||
|
|
||||||
|
function createRequestFromNodeRequest(
|
||||||
|
req: NodeIncomingMessage,
|
||||||
|
options?: CreateNodeRequestOptions
|
||||||
|
): Request {
|
||||||
const protocol =
|
const protocol =
|
||||||
req.socket instanceof TLSSocket || req.headers['x-forwarded-proto'] === 'https'
|
req.socket instanceof TLSSocket || req.headers['x-forwarded-proto'] === 'https'
|
||||||
? 'https'
|
? 'https'
|
||||||
: 'http';
|
: 'http';
|
||||||
const hostname = req.headers.host || req.headers[':authority'];
|
const hostname = req.headers.host || req.headers[':authority'];
|
||||||
const url = `${protocol}://${hostname}${req.url}`;
|
const url = `${protocol}://${hostname}${req.url}`;
|
||||||
const rawHeaders = req.headers as Record<string, any>;
|
const headers = makeRequestHeaders(req);
|
||||||
const entries = Object.entries(rawHeaders);
|
|
||||||
const method = req.method || 'GET';
|
const method = req.method || 'GET';
|
||||||
|
let bodyProps: BodyProps = {};
|
||||||
|
const bodyAllowed = method !== 'HEAD' && method !== 'GET' && !options?.emptyBody;
|
||||||
|
if (bodyAllowed) {
|
||||||
|
bodyProps = makeRequestBody(req);
|
||||||
|
}
|
||||||
const request = new Request(url, {
|
const request = new Request(url, {
|
||||||
method,
|
method,
|
||||||
headers: new Headers(entries),
|
headers,
|
||||||
body: ['HEAD', 'GET'].includes(method) ? null : body,
|
...bodyProps,
|
||||||
});
|
});
|
||||||
if (req.socket?.remoteAddress) {
|
if (req.socket?.remoteAddress) {
|
||||||
Reflect.set(request, clientAddressSymbol, req.socket.remoteAddress);
|
Reflect.set(request, clientAddressSymbol, req.socket.remoteAddress);
|
||||||
|
@ -30,63 +43,83 @@ function createRequestFromNodeRequest(req: NodeIncomingMessage, body?: Uint8Arra
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function makeRequestHeaders(req: NodeIncomingMessage): Headers {
|
||||||
|
const headers = new Headers();
|
||||||
|
for (const [name, value] of Object.entries(req.headers)) {
|
||||||
|
if (value === undefined) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
for (const item of value) {
|
||||||
|
headers.append(name, item);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
headers.append(name, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeRequestBody(req: NodeIncomingMessage): BodyProps {
|
||||||
|
if (req.body !== undefined) {
|
||||||
|
if (typeof req.body === 'string' && req.body.length > 0) {
|
||||||
|
return { body: Buffer.from(req.body) };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof req.body === 'object' && req.body !== null && Object.keys(req.body).length > 0) {
|
||||||
|
return { body: Buffer.from(JSON.stringify(req.body)) };
|
||||||
|
}
|
||||||
|
|
||||||
|
// This covers all async iterables including Readable and ReadableStream.
|
||||||
|
if (
|
||||||
|
typeof req.body === 'object' &&
|
||||||
|
req.body !== null &&
|
||||||
|
typeof (req.body as any)[Symbol.asyncIterator] !== 'undefined'
|
||||||
|
) {
|
||||||
|
return asyncIterableToBodyProps(req.body as AsyncIterable<any>);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return default body.
|
||||||
|
return asyncIterableToBodyProps(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
function asyncIterableToBodyProps(iterable: AsyncIterable<any>): BodyProps {
|
||||||
|
return {
|
||||||
|
// Node uses undici for the Request implementation. Undici accepts
|
||||||
|
// a non-standard async iterable for the body.
|
||||||
|
// @ts-expect-error
|
||||||
|
body: iterable,
|
||||||
|
// The duplex property is required when using a ReadableStream or async
|
||||||
|
// iterable for the body. The type definitions do not include the duplex
|
||||||
|
// property because they are not up-to-date.
|
||||||
|
// @ts-expect-error
|
||||||
|
duplex: 'half',
|
||||||
|
} satisfies BodyProps;
|
||||||
|
}
|
||||||
|
|
||||||
class NodeIncomingMessage extends IncomingMessage {
|
class NodeIncomingMessage extends IncomingMessage {
|
||||||
/**
|
/**
|
||||||
* The read-only body property of the Request interface contains a ReadableStream with the body contents that have been added to the request.
|
* Allow the request body to be explicitly overridden. For example, this
|
||||||
|
* is used by the Express JSON middleware.
|
||||||
*/
|
*/
|
||||||
body?: unknown;
|
body?: unknown;
|
||||||
}
|
}
|
||||||
|
|
||||||
export class NodeApp extends App {
|
export class NodeApp extends App {
|
||||||
match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) {
|
match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) {
|
||||||
return super.match(req instanceof Request ? req : createRequestFromNodeRequest(req), opts);
|
if (!(req instanceof Request)) {
|
||||||
|
req = createRequestFromNodeRequest(req, {
|
||||||
|
emptyBody: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return super.match(req, opts);
|
||||||
}
|
}
|
||||||
render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) {
|
render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) {
|
||||||
if (typeof req.body === 'string' && req.body.length > 0) {
|
if (!(req instanceof Request)) {
|
||||||
return super.render(
|
req = createRequestFromNodeRequest(req);
|
||||||
req instanceof Request ? req : createRequestFromNodeRequest(req, Buffer.from(req.body)),
|
|
||||||
routeData,
|
|
||||||
locals
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
return super.render(req, routeData, locals);
|
||||||
if (typeof req.body === 'object' && req.body !== null && Object.keys(req.body).length > 0) {
|
|
||||||
return super.render(
|
|
||||||
req instanceof Request
|
|
||||||
? req
|
|
||||||
: createRequestFromNodeRequest(req, Buffer.from(JSON.stringify(req.body))),
|
|
||||||
routeData,
|
|
||||||
locals
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ('on' in req) {
|
|
||||||
let body = Buffer.from([]);
|
|
||||||
let reqBodyComplete = new Promise((resolve, reject) => {
|
|
||||||
req.on('data', (d) => {
|
|
||||||
body = Buffer.concat([body, d]);
|
|
||||||
});
|
|
||||||
req.on('end', () => {
|
|
||||||
resolve(body);
|
|
||||||
});
|
|
||||||
req.on('error', (err) => {
|
|
||||||
reject(err);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
return reqBodyComplete.then(() => {
|
|
||||||
return super.render(
|
|
||||||
req instanceof Request ? req : createRequestFromNodeRequest(req, body),
|
|
||||||
routeData,
|
|
||||||
locals
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return super.render(
|
|
||||||
req instanceof Request ? req : createRequestFromNodeRequest(req),
|
|
||||||
routeData,
|
|
||||||
locals
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@
|
||||||
"chai": "^4.3.7",
|
"chai": "^4.3.7",
|
||||||
"cheerio": "1.0.0-rc.12",
|
"cheerio": "1.0.0-rc.12",
|
||||||
"mocha": "^9.2.2",
|
"mocha": "^9.2.2",
|
||||||
"node-mocks-http": "^1.12.2",
|
"node-mocks-http": "^1.13.0",
|
||||||
"undici": "^5.22.1"
|
"undici": "^5.22.1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import nodejs from '../dist/index.js';
|
import nodejs from '../dist/index.js';
|
||||||
import { loadFixture, createRequestAndResponse } from './test-utils.js';
|
import { loadFixture, createRequestAndResponse } from './test-utils.js';
|
||||||
import { expect } from 'chai';
|
import { expect } from 'chai';
|
||||||
|
import crypto from 'node:crypto';
|
||||||
|
|
||||||
describe('API routes', () => {
|
describe('API routes', () => {
|
||||||
/** @type {import('./test-utils').Fixture} */
|
/** @type {import('./test-utils').Fixture} */
|
||||||
|
@ -22,9 +23,11 @@ describe('API routes', () => {
|
||||||
url: '/recipes',
|
url: '/recipes',
|
||||||
});
|
});
|
||||||
|
|
||||||
handler(req, res);
|
req.once('async_iterator', () => {
|
||||||
|
|
||||||
req.send(JSON.stringify({ id: 2 }));
|
req.send(JSON.stringify({ id: 2 }));
|
||||||
|
});
|
||||||
|
|
||||||
|
handler(req, res);
|
||||||
|
|
||||||
let [buffer] = await done;
|
let [buffer] = await done;
|
||||||
|
|
||||||
|
@ -43,11 +46,47 @@ describe('API routes', () => {
|
||||||
url: '/binary',
|
url: '/binary',
|
||||||
});
|
});
|
||||||
|
|
||||||
handler(req, res);
|
req.once('async_iterator', () => {
|
||||||
req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5])));
|
req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5])));
|
||||||
|
});
|
||||||
|
|
||||||
|
handler(req, res);
|
||||||
|
|
||||||
let [out] = await done;
|
let [out] = await done;
|
||||||
let arr = Array.from(new Uint8Array(out.buffer));
|
let arr = Array.from(new Uint8Array(out.buffer));
|
||||||
expect(arr).to.deep.equal([5, 4, 3, 2, 1]);
|
expect(arr).to.deep.equal([5, 4, 3, 2, 1]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('Can post large binary data', async () => {
|
||||||
|
const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');
|
||||||
|
|
||||||
|
let { req, res, done } = createRequestAndResponse({
|
||||||
|
method: 'POST',
|
||||||
|
url: '/hash',
|
||||||
|
});
|
||||||
|
|
||||||
|
handler(req, res);
|
||||||
|
|
||||||
|
let expectedDigest = null;
|
||||||
|
req.once('async_iterator', () => {
|
||||||
|
// Send 256MB of garbage data in 256KB chunks. This should be fast (< 1sec).
|
||||||
|
let remainingBytes = 256 * 1024 * 1024;
|
||||||
|
const chunkSize = 256 * 1024;
|
||||||
|
|
||||||
|
const hash = crypto.createHash('sha256');
|
||||||
|
while (remainingBytes > 0) {
|
||||||
|
const size = Math.min(remainingBytes, chunkSize);
|
||||||
|
const chunk = Buffer.alloc(size, Math.floor(Math.random() * 256));
|
||||||
|
hash.update(chunk);
|
||||||
|
req.emit('data', chunk);
|
||||||
|
remainingBytes -= size;
|
||||||
|
}
|
||||||
|
|
||||||
|
req.emit('end');
|
||||||
|
expectedDigest = hash.digest();
|
||||||
|
});
|
||||||
|
|
||||||
|
let [out] = await done;
|
||||||
|
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
16
packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts
vendored
Normal file
16
packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts
vendored
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
import crypto from 'node:crypto';
|
||||||
|
|
||||||
|
export async function post({ request }: { request: Request }) {
|
||||||
|
const hash = crypto.createHash('sha256');
|
||||||
|
|
||||||
|
const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
|
||||||
|
for await (const chunk of iterable) {
|
||||||
|
hash.update(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Response(hash.digest(), {
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/octet-stream'
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
|
@ -4584,8 +4584,8 @@ importers:
|
||||||
specifier: ^9.2.2
|
specifier: ^9.2.2
|
||||||
version: 9.2.2
|
version: 9.2.2
|
||||||
node-mocks-http:
|
node-mocks-http:
|
||||||
specifier: ^1.12.2
|
specifier: ^1.13.0
|
||||||
version: 1.12.2
|
version: 1.13.0
|
||||||
undici:
|
undici:
|
||||||
specifier: ^5.22.1
|
specifier: ^5.22.1
|
||||||
version: 5.22.1
|
version: 5.22.1
|
||||||
|
@ -14665,6 +14665,22 @@ packages:
|
||||||
type-is: 1.6.18
|
type-is: 1.6.18
|
||||||
dev: true
|
dev: true
|
||||||
|
|
||||||
|
/node-mocks-http@1.13.0:
|
||||||
|
resolution: {integrity: sha512-lArD6sJMPJ53WF50GX0nJ89B1nkV1TdMvNwq8WXXFrUXF80ujSyye1T30mgiHh4h2It0/svpF3C4kZ2OAONVlg==}
|
||||||
|
engines: {node: '>=14'}
|
||||||
|
dependencies:
|
||||||
|
accepts: 1.3.8
|
||||||
|
content-disposition: 0.5.4
|
||||||
|
depd: 1.1.2
|
||||||
|
fresh: 0.5.2
|
||||||
|
merge-descriptors: 1.0.1
|
||||||
|
methods: 1.1.2
|
||||||
|
mime: 1.6.0
|
||||||
|
parseurl: 1.3.3
|
||||||
|
range-parser: 1.2.1
|
||||||
|
type-is: 1.6.18
|
||||||
|
dev: true
|
||||||
|
|
||||||
/node-releases@2.0.10:
|
/node-releases@2.0.10:
|
||||||
resolution: {integrity: sha512-5GFldHPXVG/YZmFzJvKK2zDSzPKhEp0+ZR5SVaoSag9fsL5YgHbUHDfnG5494ISANDcK4KwPXAx2xqVEydmd7w==}
|
resolution: {integrity: sha512-5GFldHPXVG/YZmFzJvKK2zDSzPKhEp0+ZR5SVaoSag9fsL5YgHbUHDfnG5494ISANDcK4KwPXAx2xqVEydmd7w==}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue