// Adapted from https://github.com/canjs/can-ndjson-stream/blob/master/can-ndjson-stream.js // Did not use the package because it had a useless dependency on can-namespace which was unlicensed // The MIT License (MIT) // // Copyright 2017 Justin Meyer (justinbmeyer@gmail.com), Fang Lu // (cc2lufang@gmail.com), Siyao Wu (wusiyao@umich.edu), Shang Jiang // (mrjiangshang@gmail.com) // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. export default function ndjsonStream(stream: ReadableStream) { // For cancellation let is_reader: ReadableStreamDefaultReader | undefined = undefined; let cancellationRequest = false; return new ReadableStream({ start: (controller) => { const reader = stream.getReader(); is_reader = reader; const decoder = new TextDecoder(); let data_buf = ""; reader.read().then(function processResult(result) { if (result.done) { if (cancellationRequest) { // Immediately exit return; } data_buf = data_buf.trim(); if (data_buf.length !== 0) { try { const data_l = JSON.parse(data_buf); controller.enqueue(data_l); } catch (e) { controller.error(e); return; } } controller.close(); return; } const data = decoder.decode(result.value, { stream: true }); data_buf += data; const lines = data_buf.split("\n"); for (let i = 0; i < lines.length - 1; ++i) { const l = lines[i].trim(); if (l.length > 0) { try { const data_line = JSON.parse(l); controller.enqueue(data_line); } catch (e) { controller.error(e); cancellationRequest = true; reader.cancel(); return; } } } data_buf = lines[lines.length - 1]; return reader.read().then(processResult); }); }, cancel: (reason) => { console.log("Cancel registered due to ", reason); cancellationRequest = true; is_reader.cancel(); }, }); }