From 560e45924622141206ff5b47d134cb343d6d2a71 Mon Sep 17 00:00:00 2001 From: hbgl Date: Tue, 15 Aug 2023 16:26:18 +0200 Subject: [PATCH] Stream request body instead of buffering it in memory (#8084) Co-authored-by: Matthew Phillips --- .changeset/lemon-lobsters-do.md | 6 + packages/astro/src/core/app/node.ts | 135 +++++++++++------- packages/integrations/node/package.json | 2 +- .../integrations/node/test/api-route.test.js | 45 +++++- .../test/fixtures/api-route/src/pages/hash.ts | 16 +++ pnpm-lock.yaml | 20 ++- 6 files changed, 167 insertions(+), 57 deletions(-) create mode 100644 .changeset/lemon-lobsters-do.md create mode 100644 packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts diff --git a/.changeset/lemon-lobsters-do.md b/.changeset/lemon-lobsters-do.md new file mode 100644 index 000000000..cfe50300c --- /dev/null +++ b/.changeset/lemon-lobsters-do.md @@ -0,0 +1,6 @@ +--- +'@astrojs/node': patch +'astro': patch +--- + +Stream request body instead of buffering it in memory. diff --git a/packages/astro/src/core/app/node.ts b/packages/astro/src/core/app/node.ts index 2cfc686a2..4ae6e98a9 100644 --- a/packages/astro/src/core/app/node.ts +++ b/packages/astro/src/core/app/node.ts @@ -9,20 +9,33 @@ import { App, type MatchOptions } from './index.js'; const clientAddressSymbol = Symbol.for('astro.clientAddress'); -function createRequestFromNodeRequest(req: NodeIncomingMessage, body?: Uint8Array): Request { +type CreateNodeRequestOptions = { + emptyBody?: boolean; +}; + +type BodyProps = Partial; + +function createRequestFromNodeRequest( + req: NodeIncomingMessage, + options?: CreateNodeRequestOptions +): Request { const protocol = req.socket instanceof TLSSocket || req.headers['x-forwarded-proto'] === 'https' ? 'https' : 'http'; const hostname = req.headers.host || req.headers[':authority']; const url = `${protocol}://${hostname}${req.url}`; - const rawHeaders = req.headers as Record; - const entries = Object.entries(rawHeaders); + const headers = makeRequestHeaders(req); const method = req.method || 'GET'; + let bodyProps: BodyProps = {}; + const bodyAllowed = method !== 'HEAD' && method !== 'GET' && !options?.emptyBody; + if (bodyAllowed) { + bodyProps = makeRequestBody(req); + } const request = new Request(url, { method, - headers: new Headers(entries), - body: ['HEAD', 'GET'].includes(method) ? null : body, + headers, + ...bodyProps, }); if (req.socket?.remoteAddress) { Reflect.set(request, clientAddressSymbol, req.socket.remoteAddress); @@ -30,63 +43,83 @@ function createRequestFromNodeRequest(req: NodeIncomingMessage, body?: Uint8Arra return request; } +function makeRequestHeaders(req: NodeIncomingMessage): Headers { + const headers = new Headers(); + for (const [name, value] of Object.entries(req.headers)) { + if (value === undefined) { + continue; + } + if (Array.isArray(value)) { + for (const item of value) { + headers.append(name, item); + } + } else { + headers.append(name, value); + } + } + return headers; +} + +function makeRequestBody(req: NodeIncomingMessage): BodyProps { + if (req.body !== undefined) { + if (typeof req.body === 'string' && req.body.length > 0) { + return { body: Buffer.from(req.body) }; + } + + if (typeof req.body === 'object' && req.body !== null && Object.keys(req.body).length > 0) { + return { body: Buffer.from(JSON.stringify(req.body)) }; + } + + // This covers all async iterables including Readable and ReadableStream. + if ( + typeof req.body === 'object' && + req.body !== null && + typeof (req.body as any)[Symbol.asyncIterator] !== 'undefined' + ) { + return asyncIterableToBodyProps(req.body as AsyncIterable); + } + } + + // Return default body. + return asyncIterableToBodyProps(req); +} + +function asyncIterableToBodyProps(iterable: AsyncIterable): BodyProps { + return { + // Node uses undici for the Request implementation. Undici accepts + // a non-standard async iterable for the body. + // @ts-expect-error + body: iterable, + // The duplex property is required when using a ReadableStream or async + // iterable for the body. The type definitions do not include the duplex + // property because they are not up-to-date. + // @ts-expect-error + duplex: 'half', + } satisfies BodyProps; +} + class NodeIncomingMessage extends IncomingMessage { /** - * The read-only body property of the Request interface contains a ReadableStream with the body contents that have been added to the request. + * Allow the request body to be explicitly overridden. For example, this + * is used by the Express JSON middleware. */ body?: unknown; } export class NodeApp extends App { match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) { - return super.match(req instanceof Request ? req : createRequestFromNodeRequest(req), opts); + if (!(req instanceof Request)) { + req = createRequestFromNodeRequest(req, { + emptyBody: true, + }); + } + return super.match(req, opts); } render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) { - if (typeof req.body === 'string' && req.body.length > 0) { - return super.render( - req instanceof Request ? req : createRequestFromNodeRequest(req, Buffer.from(req.body)), - routeData, - locals - ); + if (!(req instanceof Request)) { + req = createRequestFromNodeRequest(req); } - - if (typeof req.body === 'object' && req.body !== null && Object.keys(req.body).length > 0) { - return super.render( - req instanceof Request - ? req - : createRequestFromNodeRequest(req, Buffer.from(JSON.stringify(req.body))), - routeData, - locals - ); - } - - if ('on' in req) { - let body = Buffer.from([]); - let reqBodyComplete = new Promise((resolve, reject) => { - req.on('data', (d) => { - body = Buffer.concat([body, d]); - }); - req.on('end', () => { - resolve(body); - }); - req.on('error', (err) => { - reject(err); - }); - }); - - return reqBodyComplete.then(() => { - return super.render( - req instanceof Request ? req : createRequestFromNodeRequest(req, body), - routeData, - locals - ); - }); - } - return super.render( - req instanceof Request ? req : createRequestFromNodeRequest(req), - routeData, - locals - ); + return super.render(req, routeData, locals); } } diff --git a/packages/integrations/node/package.json b/packages/integrations/node/package.json index 988246f10..0d7689298 100644 --- a/packages/integrations/node/package.json +++ b/packages/integrations/node/package.json @@ -49,7 +49,7 @@ "chai": "^4.3.7", "cheerio": "1.0.0-rc.12", "mocha": "^9.2.2", - "node-mocks-http": "^1.12.2", + "node-mocks-http": "^1.13.0", "undici": "^5.22.1" } } diff --git a/packages/integrations/node/test/api-route.test.js b/packages/integrations/node/test/api-route.test.js index 7fbd95776..c830eee2d 100644 --- a/packages/integrations/node/test/api-route.test.js +++ b/packages/integrations/node/test/api-route.test.js @@ -1,6 +1,7 @@ import nodejs from '../dist/index.js'; import { loadFixture, createRequestAndResponse } from './test-utils.js'; import { expect } from 'chai'; +import crypto from 'node:crypto'; describe('API routes', () => { /** @type {import('./test-utils').Fixture} */ @@ -22,9 +23,11 @@ describe('API routes', () => { url: '/recipes', }); - handler(req, res); + req.once('async_iterator', () => { + req.send(JSON.stringify({ id: 2 })); + }); - req.send(JSON.stringify({ id: 2 })); + handler(req, res); let [buffer] = await done; @@ -43,11 +46,47 @@ describe('API routes', () => { url: '/binary', }); + req.once('async_iterator', () => { + req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5]))); + }); + handler(req, res); - req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5]))); let [out] = await done; let arr = Array.from(new Uint8Array(out.buffer)); expect(arr).to.deep.equal([5, 4, 3, 2, 1]); }); + + it('Can post large binary data', async () => { + const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs'); + + let { req, res, done } = createRequestAndResponse({ + method: 'POST', + url: '/hash', + }); + + handler(req, res); + + let expectedDigest = null; + req.once('async_iterator', () => { + // Send 256MB of garbage data in 256KB chunks. This should be fast (< 1sec). + let remainingBytes = 256 * 1024 * 1024; + const chunkSize = 256 * 1024; + + const hash = crypto.createHash('sha256'); + while (remainingBytes > 0) { + const size = Math.min(remainingBytes, chunkSize); + const chunk = Buffer.alloc(size, Math.floor(Math.random() * 256)); + hash.update(chunk); + req.emit('data', chunk); + remainingBytes -= size; + } + + req.emit('end'); + expectedDigest = hash.digest(); + }); + + let [out] = await done; + expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest); + }); }); diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts new file mode 100644 index 000000000..fbf44c547 --- /dev/null +++ b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts @@ -0,0 +1,16 @@ +import crypto from 'node:crypto'; + +export async function post({ request }: { request: Request }) { + const hash = crypto.createHash('sha256'); + + const iterable = request.body as unknown as AsyncIterable; + for await (const chunk of iterable) { + hash.update(chunk); + } + + return new Response(hash.digest(), { + headers: { + 'Content-Type': 'application/octet-stream' + } + }); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0f43ca59c..e564de45f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4584,8 +4584,8 @@ importers: specifier: ^9.2.2 version: 9.2.2 node-mocks-http: - specifier: ^1.12.2 - version: 1.12.2 + specifier: ^1.13.0 + version: 1.13.0 undici: specifier: ^5.22.1 version: 5.22.1 @@ -14665,6 +14665,22 @@ packages: type-is: 1.6.18 dev: true + /node-mocks-http@1.13.0: + resolution: {integrity: sha512-lArD6sJMPJ53WF50GX0nJ89B1nkV1TdMvNwq8WXXFrUXF80ujSyye1T30mgiHh4h2It0/svpF3C4kZ2OAONVlg==} + engines: {node: '>=14'} + dependencies: + accepts: 1.3.8 + content-disposition: 0.5.4 + depd: 1.1.2 + fresh: 0.5.2 + merge-descriptors: 1.0.1 + methods: 1.1.2 + mime: 1.6.0 + parseurl: 1.3.3 + range-parser: 1.2.1 + type-is: 1.6.18 + dev: true + /node-releases@2.0.10: resolution: {integrity: sha512-5GFldHPXVG/YZmFzJvKK2zDSzPKhEp0+ZR5SVaoSag9fsL5YgHbUHDfnG5494ISANDcK4KwPXAx2xqVEydmd7w==}