Here’s a story about falling into a rabbit hole. We’ve all been there I’m sure. I had a simple feature to upload a text file and render each line to the DOM. I figure how tough could it be. I attached a change event to an <input type="file"> which grabbed the first file off the .files list. await file.text() and then processed the line.
uploadField.addEventListener('change', async () => { let [file] = uploadField.files; if (!file) return; let text = await file.text(); renderLines(text.split('\n')); });
Easy right? And then someone tried to upload a 10MB file and froze their browser. Ugh. Ok so we check the file size right?
if (file.size > MAX_FILE_SIZE) throw new Error('Yo dawg, that file is way too big!');
Then you realize that the file size is in bytes and what you really want to to limit the number of lines the user can upload. In other words it would be nice if we could scan each line and bail when we reach a limit. Maybe even chomp lines that are too long?
Luckily we have the ability to process data in a stream native in browsers now. By means of the Stream API. This example will have three parts: input (ReadableStream), batch as lines (TransformStream), and render management (WritableStream).
Reading in data
The File API extends from Blob which has a .stream() which returns a ReadableStream that we can use as the start of our pipeline. But this is a binary stream of data. We need to convert it into text data and we can use the TextDecoderStream.
let reader = file .stream() .pipeThrough(new TextDecoderStream());
There is a lot going on here. The TextDecoderStream is what we call a TransformStream and it offers two properties readable and writable which .pipeThrough() uses to pass the data from the reader into the transformer and soon back out again.
Transforming the data
We did a bit of transform converting the input data from binary to text but we still want to process things on a line by line basis. We can do that by means of our first custom stream.
There isn’t much of a convention concerning OOP with streams. The API is more about creating new Streams with hooks passed into the constructor then it is about class inheritance. The confusing bit for me was thinking that I could reimplement the readable and writable properties (or the .getWriter() in the case of WritableStream) thinking if I did I could pass the instance directly to .pipeThrough() (or .pipeTo()) but that didn't work for me and I suspect wasn’t design that way either. Instead I realized that my classes are not actual streams like I would think but wrapper or better managers the expose a stream for use. That is how I came to use the .stream() method convention. I'll lead with an example:
class CustomConsoleWriter { counter = 0; stream() { return new WritableStream({ write: (data) => console.log(this.counter++, data), }); } } file .stream() .pipeThrough(new TextDecoderStream()) .pipeTo(new CustomConsoleWriter().stream());
Would result in printing out to the console as the writer received data from the reader. Notice how we could still have encapsulated instance data while also exposing a stream to pipe to.
There is an alternative using closures which I’ve provided an example of in my Split by line stream implementation.
Anyway―after that divergence into Object Oriented Design and personal style―our line batcher can collect chunks of data as strings from the input. Split them by new lines and spit them one by one back out.
class LineChunks { buffer = ''; flushBuffer(controller) { if (this.buffer === '') return; controller.enqueue(this.buffer); } processChunk(chunk, controller) { for (let char of chunk) { if (char === '\r') continue; if (char === '\n') { controller.enqueue(this.buffer); this.buffer = ''; continue; } buffer += char; } } stream() { return new TransformStream({ transform: (chunk, controller) => this.processChunk(chunk, controller), flush: (controller) => this.flushBuffer(controller), }); } } file .stream() .pipeThrough(new TextDecoderStream()) .pipeThrough(new LineChunks().stream()) .pipeTo(new CustomConsoleWriter().stream());
Writing the data
Now that we have each line being sent to us on a chunk by chunk basis we can make a writer stream that understands how to render each line but also monitor the amount of lines being processed. And later we will add the ability for it to pause each batch long enough for the browser’s main thread to have a chance to do other thing before continuing.
It also offers an opportunity to inspect the line to see if it is too large and if we have processed too many lines so far. And this is where the streaming API really shines. Once an exception criteria is met we can abort the stream and it will propagate along the pipeline till t hits the reader and the reader knows to stop doing File I/O and close the file. That means if the user were to provide a huge file like 100MB and we abort after only reading in a 1MB the other 99MB won’t have been loaded into memory.
First the rendering logic (renderLines(…) from original).
class LineRenderer { buffer = []; constructor({ batchSize = 1 } = {}) { this.batchSize = batchSize; } queueLine(line) { this.buffer.push(line); if (this.buffer.length < batchSize) return; renderLines(this.buffer); this.buffer = []; } flushLines() { if (this.buffer.length === 0) return; renderLines(this.buffer); } stream() { return new WritableStream({ write: (chunk) => this.queueLine(chunk), close: () => this.flushLines(), }); } } file .stream() .pipeThrough(new TextDecoderStream()) .pipeThrough(new LineChunks().stream()) .pipeTo(new LineRenderer({ batchSize: 50 }).stream());
Guarding for exceptions
To keep the LineRenderer focused on rendering we can implement the line size guards as yet another stream that we add to the pipeline.
class LineGuardError extends Error { name = 'LineGuardError'; constructor(lines) { super(`Exceeded max lines (${lines}). File truncated`); } } class LineGuard { lineCount = 0; constructor(columns, lines) { this.columns = columns; this.lines = lines; } processLine(line, controller) { this.lineCount++; if (this.lineCount > this.lines) throw new LineGuardError(this.lines); controller.enqueue(line.slice(0, this.columns)); } stream() { return new TransformStream({ transform: (chunk, controller) => this.processLine(chunk, controller), }); } } file .stream() .pipeThrough(new TextDecoderStream()) .pipeThrough(new LineChunks().stream()) .pipeThrough(new LineGuard(80, 24).stream()) .pipeTo(new LineRenderer({ batchSize: 50 }).stream());
Back pressure
As it is now the pipeline will run to completion without giving any breaks for the browser’s main thread which in the case of a large file and a high enough guard setting it is possible to freeze the browser.
To prevent that we can introduce some back pressure when the pipeline has process each batch.
When a hook like transform / write returns a promise the pipeline will apply some back pressure till the promise resolves.
class LineRenderer { buffer = []; constructor({ batchSize = 1 } = {}) { … } queueLine(line) { this.buffer.push(line); if (this.buffer.length < this.batchSize) return; renderLines(this.buffer); this.buffer = []; return new Promise( resolve => setTimeout(resolve, 100), ); } flushLines() { … } stream() { return new WritableStream({ … }); } }
See it running in the Live DEMO.