You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
614 lines
21 KiB
614 lines
21 KiB
3 years ago
|
# minipass
|
||
|
|
||
|
A _very_ minimal implementation of a [PassThrough
|
||
|
stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough)
|
||
|
|
||
|
[It's very
|
||
|
fast](https://docs.google.com/spreadsheets/d/1oObKSrVwLX_7Ut4Z6g3fZW-AX1j1-k6w-cDsrkaSbHM/edit#gid=0)
|
||
|
for objects, strings, and buffers.
|
||
|
|
||
|
Supports `pipe()`ing (including multi-`pipe()` and backpressure transmission),
|
||
|
buffering data until either a `data` event handler or `pipe()` is added (so
|
||
|
you don't lose the first chunk), and most other cases where PassThrough is
|
||
|
a good idea.
|
||
|
|
||
|
There is a `read()` method, but it's much more efficient to consume data
|
||
|
from this stream via `'data'` events or by calling `pipe()` into some other
|
||
|
stream. Calling `read()` requires the buffer to be flattened in some
|
||
|
cases, which requires copying memory.
|
||
|
|
||
|
There is also no `unpipe()` method. Once you start piping, there is no
|
||
|
stopping it!
|
||
|
|
||
|
If you set `objectMode: true` in the options, then whatever is written will
|
||
|
be emitted. Otherwise, it'll do a minimal amount of Buffer copying to
|
||
|
ensure proper Streams semantics when `read(n)` is called.
|
||
|
|
||
|
`objectMode` can also be set by doing `stream.objectMode = true`, or by
|
||
|
writing any non-string/non-buffer data. `objectMode` cannot be set to
|
||
|
false once it is set.
|
||
|
|
||
|
This is not a `through` or `through2` stream. It doesn't transform the
|
||
|
data, it just passes it right through. If you want to transform the data,
|
||
|
extend the class, and override the `write()` method. Once you're done
|
||
|
transforming the data however you want, call `super.write()` with the
|
||
|
transform output.
|
||
|
|
||
|
For some examples of streams that extend Minipass in various ways, check
|
||
|
out:
|
||
|
|
||
|
- [minizlib](http://npm.im/minizlib)
|
||
|
- [fs-minipass](http://npm.im/fs-minipass)
|
||
|
- [tar](http://npm.im/tar)
|
||
|
- [minipass-collect](http://npm.im/minipass-collect)
|
||
|
- [minipass-flush](http://npm.im/minipass-flush)
|
||
|
- [minipass-pipeline](http://npm.im/minipass-pipeline)
|
||
|
- [tap](http://npm.im/tap)
|
||
|
- [tap-parser](http://npm.im/tap-parser)
|
||
|
- [treport](http://npm.im/treport)
|
||
|
- [minipass-fetch](http://npm.im/minipass-fetch)
|
||
|
- [pacote](http://npm.im/pacote)
|
||
|
- [make-fetch-happen](http://npm.im/make-fetch-happen)
|
||
|
- [cacache](http://npm.im/cacache)
|
||
|
- [ssri](http://npm.im/ssri)
|
||
|
- [npm-registry-fetch](http://npm.im/npm-registry-fetch)
|
||
|
- [minipass-json-stream](http://npm.im/minipass-json-stream)
|
||
|
- [minipass-sized](http://npm.im/minipass-sized)
|
||
|
|
||
|
## Differences from Node.js Streams
|
||
|
|
||
|
There are several things that make Minipass streams different from (and in
|
||
|
some ways superior to) Node.js core streams.
|
||
|
|
||
|
Please read these caveats if you are familiar with node-core streams and
|
||
|
intend to use Minipass streams in your programs.
|
||
|
|
||
|
### Timing
|
||
|
|
||
|
Minipass streams are designed to support synchronous use-cases. Thus, data
|
||
|
is emitted as soon as it is available, always. It is buffered until read,
|
||
|
but no longer. Another way to look at it is that Minipass streams are
|
||
|
exactly as synchronous as the logic that writes into them.
|
||
|
|
||
|
This can be surprising if your code relies on `PassThrough.write()` always
|
||
|
providing data on the next tick rather than the current one, or being able
|
||
|
to call `resume()` and not have the entire buffer disappear immediately.
|
||
|
|
||
|
However, without this synchronicity guarantee, there would be no way for
|
||
|
Minipass to achieve the speeds it does, or support the synchronous use
|
||
|
cases that it does. Simply put, waiting takes time.
|
||
|
|
||
|
This non-deferring approach makes Minipass streams much easier to reason
|
||
|
about, especially in the context of Promises and other flow-control
|
||
|
mechanisms.
|
||
|
|
||
|
### No High/Low Water Marks
|
||
|
|
||
|
Node.js core streams will optimistically fill up a buffer, returning `true`
|
||
|
on all writes until the limit is hit, even if the data has nowhere to go.
|
||
|
Then, they will not attempt to draw more data in until the buffer size dips
|
||
|
below a minimum value.
|
||
|
|
||
|
Minipass streams are much simpler. The `write()` method will return `true`
|
||
|
if the data has somewhere to go (which is to say, given the timing
|
||
|
guarantees, that the data is already there by the time `write()` returns).
|
||
|
|
||
|
If the data has nowhere to go, then `write()` returns false, and the data
|
||
|
sits in a buffer, to be drained out immediately as soon as anyone consumes
|
||
|
it.
|
||
|
|
||
|
### Hazards of Buffering (or: Why Minipass Is So Fast)
|
||
|
|
||
|
Since data written to a Minipass stream is immediately written all the way
|
||
|
through the pipeline, and `write()` always returns true/false based on
|
||
|
whether the data was fully flushed, backpressure is communicated
|
||
|
immediately to the upstream caller. This minimizes buffering.
|
||
|
|
||
|
Consider this case:
|
||
|
|
||
|
```js
|
||
|
const {PassThrough} = require('stream')
|
||
|
const p1 = new PassThrough({ highWaterMark: 1024 })
|
||
|
const p2 = new PassThrough({ highWaterMark: 1024 })
|
||
|
const p3 = new PassThrough({ highWaterMark: 1024 })
|
||
|
const p4 = new PassThrough({ highWaterMark: 1024 })
|
||
|
|
||
|
p1.pipe(p2).pipe(p3).pipe(p4)
|
||
|
p4.on('data', () => console.log('made it through'))
|
||
|
|
||
|
// this returns false and buffers, then writes to p2 on next tick (1)
|
||
|
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
|
||
|
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
|
||
|
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
|
||
|
// on next tick (4)
|
||
|
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
|
||
|
// 'drain' on next tick (5)
|
||
|
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
|
||
|
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
|
||
|
// tick (7)
|
||
|
|
||
|
p1.write(Buffer.alloc(2048)) // returns false
|
||
|
```
|
||
|
|
||
|
Along the way, the data was buffered and deferred at each stage, and
|
||
|
multiple event deferrals happened, for an unblocked pipeline where it was
|
||
|
perfectly safe to write all the way through!
|
||
|
|
||
|
Furthermore, setting a `highWaterMark` of `1024` might lead someone reading
|
||
|
the code to think an advisory maximum of 1KiB is being set for the
|
||
|
pipeline. However, the actual advisory buffering level is the _sum_ of
|
||
|
`highWaterMark` values, since each one has its own bucket.
|
||
|
|
||
|
Consider the Minipass case:
|
||
|
|
||
|
```js
|
||
|
const m1 = new Minipass()
|
||
|
const m2 = new Minipass()
|
||
|
const m3 = new Minipass()
|
||
|
const m4 = new Minipass()
|
||
|
|
||
|
m1.pipe(m2).pipe(m3).pipe(m4)
|
||
|
m4.on('data', () => console.log('made it through'))
|
||
|
|
||
|
// m1 is flowing, so it writes the data to m2 immediately
|
||
|
// m2 is flowing, so it writes the data to m3 immediately
|
||
|
// m3 is flowing, so it writes the data to m4 immediately
|
||
|
// m4 is flowing, so it fires the 'data' event immediately, returns true
|
||
|
// m4's write returned true, so m3 is still flowing, returns true
|
||
|
// m3's write returned true, so m2 is still flowing, returns true
|
||
|
// m2's write returned true, so m1 is still flowing, returns true
|
||
|
// No event deferrals or buffering along the way!
|
||
|
|
||
|
m1.write(Buffer.alloc(2048)) // returns true
|
||
|
```
|
||
|
|
||
|
It is extremely unlikely that you _don't_ want to buffer any data written,
|
||
|
or _ever_ buffer data that can be flushed all the way through. Neither
|
||
|
node-core streams nor Minipass ever fail to buffer written data, but
|
||
|
node-core streams do a lot of unnecessary buffering and pausing.
|
||
|
|
||
|
As always, the faster implementation is the one that does less stuff and
|
||
|
waits less time to do it.
|
||
|
|
||
|
### Immediately emit `end` for empty streams (when not paused)
|
||
|
|
||
|
If a stream is not paused, and `end()` is called before writing any data
|
||
|
into it, then it will emit `end` immediately.
|
||
|
|
||
|
If you have logic that occurs on the `end` event which you don't want to
|
||
|
potentially happen immediately (for example, closing file descriptors,
|
||
|
moving on to the next entry in an archive parse stream, etc.) then be sure
|
||
|
to call `stream.pause()` on creation, and then `stream.resume()` once you
|
||
|
are ready to respond to the `end` event.
|
||
|
|
||
|
### Emit `end` When Asked
|
||
|
|
||
|
One hazard of immediately emitting `'end'` is that you may not yet have had
|
||
|
a chance to add a listener. In order to avoid this hazard, Minipass
|
||
|
streams safely re-emit the `'end'` event if a new listener is added after
|
||
|
`'end'` has been emitted.
|
||
|
|
||
|
Ie, if you do `stream.on('end', someFunction)`, and the stream has already
|
||
|
emitted `end`, then it will call the handler right away. (You can think of
|
||
|
this somewhat like attaching a new `.then(fn)` to a previously-resolved
|
||
|
Promise.)
|
||
|
|
||
|
To prevent calling handlers multiple times who would not expect multiple
|
||
|
ends to occur, all listeners are removed from the `'end'` event whenever it
|
||
|
is emitted.
|
||
|
|
||
|
### Impact of "immediate flow" on Tee-streams
|
||
|
|
||
|
A "tee stream" is a stream piping to multiple destinations:
|
||
|
|
||
|
```js
|
||
|
const tee = new Minipass()
|
||
|
t.pipe(dest1)
|
||
|
t.pipe(dest2)
|
||
|
t.write('foo') // goes to both destinations
|
||
|
```
|
||
|
|
||
|
Since Minipass streams _immediately_ process any pending data through the
|
||
|
pipeline when a new pipe destination is added, this can have surprising
|
||
|
effects, especially when a stream comes in from some other function and may
|
||
|
or may not have data in its buffer.
|
||
|
|
||
|
```js
|
||
|
// WARNING! WILL LOSE DATA!
|
||
|
const src = new Minipass()
|
||
|
src.write('foo')
|
||
|
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
|
||
|
src.pipe(dest2) // gets nothing!
|
||
|
```
|
||
|
|
||
|
The solution is to create a dedicated tee-stream junction that pipes to
|
||
|
both locations, and then pipe to _that_ instead.
|
||
|
|
||
|
```js
|
||
|
// Safe example: tee to both places
|
||
|
const src = new Minipass()
|
||
|
src.write('foo')
|
||
|
const tee = new Minipass()
|
||
|
tee.pipe(dest1)
|
||
|
tee.pipe(dest2)
|
||
|
src.pipe(tee) // tee gets 'foo', pipes to both locations
|
||
|
```
|
||
|
|
||
|
The same caveat applies to `on('data')` event listeners. The first one
|
||
|
added will _immediately_ receive all of the data, leaving nothing for the
|
||
|
second:
|
||
|
|
||
|
```js
|
||
|
// WARNING! WILL LOSE DATA!
|
||
|
const src = new Minipass()
|
||
|
src.write('foo')
|
||
|
src.on('data', handler1) // receives 'foo' right away
|
||
|
src.on('data', handler2) // nothing to see here!
|
||
|
```
|
||
|
|
||
|
Using a dedicated tee-stream can be used in this case as well:
|
||
|
|
||
|
```js
|
||
|
// Safe example: tee to both data handlers
|
||
|
const src = new Minipass()
|
||
|
src.write('foo')
|
||
|
const tee = new Minipass()
|
||
|
tee.on('data', handler1)
|
||
|
tee.on('data', handler2)
|
||
|
src.pipe(tee)
|
||
|
```
|
||
|
|
||
|
## USAGE
|
||
|
|
||
|
It's a stream! Use it like a stream and it'll most likely do what you
|
||
|
want.
|
||
|
|
||
|
```js
|
||
|
const Minipass = require('minipass')
|
||
|
const mp = new Minipass(options) // optional: { encoding, objectMode }
|
||
|
mp.write('foo')
|
||
|
mp.pipe(someOtherStream)
|
||
|
mp.end('bar')
|
||
|
```
|
||
|
|
||
|
### OPTIONS
|
||
|
|
||
|
* `encoding` How would you like the data coming _out_ of the stream to be
|
||
|
encoded? Accepts any values that can be passed to `Buffer.toString()`.
|
||
|
* `objectMode` Emit data exactly as it comes in. This will be flipped on
|
||
|
by default if you write() something other than a string or Buffer at any
|
||
|
point. Setting `objectMode: true` will prevent setting any encoding
|
||
|
value.
|
||
|
|
||
|
### API
|
||
|
|
||
|
Implements the user-facing portions of Node.js's `Readable` and `Writable`
|
||
|
streams.
|
||
|
|
||
|
### Methods
|
||
|
|
||
|
* `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the
|
||
|
base Minipass class, the same data will come out.) Returns `false` if
|
||
|
the stream will buffer the next write, or true if it's still in "flowing"
|
||
|
mode.
|
||
|
* `end([chunk, [encoding]], [callback])` - Signal that you have no more
|
||
|
data to write. This will queue an `end` event to be fired when all the
|
||
|
data has been consumed.
|
||
|
* `setEncoding(encoding)` - Set the encoding for data coming of the stream.
|
||
|
This can only be done once.
|
||
|
* `pause()` - No more data for a while, please. This also prevents `end`
|
||
|
from being emitted for empty streams until the stream is resumed.
|
||
|
* `resume()` - Resume the stream. If there's data in the buffer, it is all
|
||
|
discarded. Any buffered events are immediately emitted.
|
||
|
* `pipe(dest)` - Send all output to the stream provided. There is no way
|
||
|
to unpipe. When data is emitted, it is immediately written to any and
|
||
|
all pipe destinations.
|
||
|
* `on(ev, fn)`, `emit(ev, fn)` - Minipass streams are EventEmitters. Some
|
||
|
events are given special treatment, however. (See below under "events".)
|
||
|
* `promise()` - Returns a Promise that resolves when the stream emits
|
||
|
`end`, or rejects if the stream emits `error`.
|
||
|
* `collect()` - Return a Promise that resolves on `end` with an array
|
||
|
containing each chunk of data that was emitted, or rejects if the stream
|
||
|
emits `error`. Note that this consumes the stream data.
|
||
|
* `concat()` - Same as `collect()`, but concatenates the data into a single
|
||
|
Buffer object. Will reject the returned promise if the stream is in
|
||
|
objectMode, or if it goes into objectMode by the end of the data.
|
||
|
* `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not
|
||
|
provided, then consume all of it. If `n` bytes are not available, then
|
||
|
it returns null. **Note** consuming streams in this way is less
|
||
|
efficient, and can lead to unnecessary Buffer copying.
|
||
|
* `destroy([er])` - Destroy the stream. If an error is provided, then an
|
||
|
`'error'` event is emitted. If the stream has a `close()` method, and
|
||
|
has not emitted a `'close'` event yet, then `stream.close()` will be
|
||
|
called. Any Promises returned by `.promise()`, `.collect()` or
|
||
|
`.concat()` will be rejected. After being destroyed, writing to the
|
||
|
stream will emit an error. No more data will be emitted if the stream is
|
||
|
destroyed, even if it was previously buffered.
|
||
|
|
||
|
### Properties
|
||
|
|
||
|
* `bufferLength` Read-only. Total number of bytes buffered, or in the case
|
||
|
of objectMode, the total number of objects.
|
||
|
* `encoding` The encoding that has been set. (Setting this is equivalent
|
||
|
to calling `setEncoding(enc)` and has the same prohibition against
|
||
|
setting multiple times.)
|
||
|
* `flowing` Read-only. Boolean indicating whether a chunk written to the
|
||
|
stream will be immediately emitted.
|
||
|
* `emittedEnd` Read-only. Boolean indicating whether the end-ish events
|
||
|
(ie, `end`, `prefinish`, `finish`) have been emitted. Note that
|
||
|
listening on any end-ish event will immediateyl re-emit it if it has
|
||
|
already been emitted.
|
||
|
* `writable` Whether the stream is writable. Default `true`. Set to
|
||
|
`false` when `end()`
|
||
|
* `readable` Whether the stream is readable. Default `true`.
|
||
|
* `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written
|
||
|
to the stream that have not yet been emitted. (It's probably a bad idea
|
||
|
to mess with this.)
|
||
|
* `pipes` A [yallist](http://npm.im/yallist) linked list of streams that
|
||
|
this stream is piping into. (It's probably a bad idea to mess with
|
||
|
this.)
|
||
|
* `destroyed` A getter that indicates whether the stream was destroyed.
|
||
|
* `paused` True if the stream has been explicitly paused, otherwise false.
|
||
|
* `objectMode` Indicates whether the stream is in `objectMode`. Once set
|
||
|
to `true`, it cannot be set to `false`.
|
||
|
|
||
|
### Events
|
||
|
|
||
|
* `data` Emitted when there's data to read. Argument is the data to read.
|
||
|
This is never emitted while not flowing. If a listener is attached, that
|
||
|
will resume the stream.
|
||
|
* `end` Emitted when there's no more data to read. This will be emitted
|
||
|
immediately for empty streams when `end()` is called. If a listener is
|
||
|
attached, and `end` was already emitted, then it will be emitted again.
|
||
|
All listeners are removed when `end` is emitted.
|
||
|
* `prefinish` An end-ish event that follows the same logic as `end` and is
|
||
|
emitted in the same conditions where `end` is emitted. Emitted after
|
||
|
`'end'`.
|
||
|
* `finish` An end-ish event that follows the same logic as `end` and is
|
||
|
emitted in the same conditions where `end` is emitted. Emitted after
|
||
|
`'prefinish'`.
|
||
|
* `close` An indication that an underlying resource has been released.
|
||
|
Minipass does not emit this event, but will defer it until after `end`
|
||
|
has been emitted, since it throws off some stream libraries otherwise.
|
||
|
* `drain` Emitted when the internal buffer empties, and it is again
|
||
|
suitable to `write()` into the stream.
|
||
|
* `readable` Emitted when data is buffered and ready to be read by a
|
||
|
consumer.
|
||
|
* `resume` Emitted when stream changes state from buffering to flowing
|
||
|
mode. (Ie, when `resume` is called, `pipe` is called, or a `data` event
|
||
|
listener is added.)
|
||
|
|
||
|
### Static Methods
|
||
|
|
||
|
* `Minipass.isStream(stream)` Returns `true` if the argument is a stream,
|
||
|
and false otherwise. To be considered a stream, the object must be
|
||
|
either an instance of Minipass, or an EventEmitter that has either a
|
||
|
`pipe()` method, or both `write()` and `end()` methods. (Pretty much any
|
||
|
stream in node-land will return `true` for this.)
|
||
|
|
||
|
## EXAMPLES
|
||
|
|
||
|
Here are some examples of things you can do with Minipass streams.
|
||
|
|
||
|
### simple "are you done yet" promise
|
||
|
|
||
|
```js
|
||
|
mp.promise().then(() => {
|
||
|
// stream is finished
|
||
|
}, er => {
|
||
|
// stream emitted an error
|
||
|
})
|
||
|
```
|
||
|
|
||
|
### collecting
|
||
|
|
||
|
```js
|
||
|
mp.collect().then(all => {
|
||
|
// all is an array of all the data emitted
|
||
|
// encoding is supported in this case, so
|
||
|
// so the result will be a collection of strings if
|
||
|
// an encoding is specified, or buffers/objects if not.
|
||
|
//
|
||
|
// In an async function, you may do
|
||
|
// const data = await stream.collect()
|
||
|
})
|
||
|
```
|
||
|
|
||
|
### collecting into a single blob
|
||
|
|
||
|
This is a bit slower because it concatenates the data into one chunk for
|
||
|
you, but if you're going to do it yourself anyway, it's convenient this
|
||
|
way:
|
||
|
|
||
|
```js
|
||
|
mp.concat().then(onebigchunk => {
|
||
|
// onebigchunk is a string if the stream
|
||
|
// had an encoding set, or a buffer otherwise.
|
||
|
})
|
||
|
```
|
||
|
|
||
|
### iteration
|
||
|
|
||
|
You can iterate over streams synchronously or asynchronously in platforms
|
||
|
that support it.
|
||
|
|
||
|
Synchronous iteration will end when the currently available data is
|
||
|
consumed, even if the `end` event has not been reached. In string and
|
||
|
buffer mode, the data is concatenated, so unless multiple writes are
|
||
|
occurring in the same tick as the `read()`, sync iteration loops will
|
||
|
generally only have a single iteration.
|
||
|
|
||
|
To consume chunks in this way exactly as they have been written, with no
|
||
|
flattening, create the stream with the `{ objectMode: true }` option.
|
||
|
|
||
|
```js
|
||
|
const mp = new Minipass({ objectMode: true })
|
||
|
mp.write('a')
|
||
|
mp.write('b')
|
||
|
for (let letter of mp) {
|
||
|
console.log(letter) // a, b
|
||
|
}
|
||
|
mp.write('c')
|
||
|
mp.write('d')
|
||
|
for (let letter of mp) {
|
||
|
console.log(letter) // c, d
|
||
|
}
|
||
|
mp.write('e')
|
||
|
mp.end()
|
||
|
for (let letter of mp) {
|
||
|
console.log(letter) // e
|
||
|
}
|
||
|
for (let letter of mp) {
|
||
|
console.log(letter) // nothing
|
||
|
}
|
||
|
```
|
||
|
|
||
|
Asynchronous iteration will continue until the end event is reached,
|
||
|
consuming all of the data.
|
||
|
|
||
|
```js
|
||
|
const mp = new Minipass({ encoding: 'utf8' })
|
||
|
|
||
|
// some source of some data
|
||
|
let i = 5
|
||
|
const inter = setInterval(() => {
|
||
|
if (i-- > 0)
|
||
|
mp.write(Buffer.from('foo\n', 'utf8'))
|
||
|
else {
|
||
|
mp.end()
|
||
|
clearInterval(inter)
|
||
|
}
|
||
|
}, 100)
|
||
|
|
||
|
// consume the data with asynchronous iteration
|
||
|
async function consume () {
|
||
|
for await (let chunk of mp) {
|
||
|
console.log(chunk)
|
||
|
}
|
||
|
return 'ok'
|
||
|
}
|
||
|
|
||
|
consume().then(res => console.log(res))
|
||
|
// logs `foo\n` 5 times, and then `ok`
|
||
|
```
|
||
|
|
||
|
### subclass that `console.log()`s everything written into it
|
||
|
|
||
|
```js
|
||
|
class Logger extends Minipass {
|
||
|
write (chunk, encoding, callback) {
|
||
|
console.log('WRITE', chunk, encoding)
|
||
|
return super.write(chunk, encoding, callback)
|
||
|
}
|
||
|
end (chunk, encoding, callback) {
|
||
|
console.log('END', chunk, encoding)
|
||
|
return super.end(chunk, encoding, callback)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
someSource.pipe(new Logger()).pipe(someDest)
|
||
|
```
|
||
|
|
||
|
### same thing, but using an inline anonymous class
|
||
|
|
||
|
```js
|
||
|
// js classes are fun
|
||
|
someSource
|
||
|
.pipe(new (class extends Minipass {
|
||
|
emit (ev, ...data) {
|
||
|
// let's also log events, because debugging some weird thing
|
||
|
console.log('EMIT', ev)
|
||
|
return super.emit(ev, ...data)
|
||
|
}
|
||
|
write (chunk, encoding, callback) {
|
||
|
console.log('WRITE', chunk, encoding)
|
||
|
return super.write(chunk, encoding, callback)
|
||
|
}
|
||
|
end (chunk, encoding, callback) {
|
||
|
console.log('END', chunk, encoding)
|
||
|
return super.end(chunk, encoding, callback)
|
||
|
}
|
||
|
}))
|
||
|
.pipe(someDest)
|
||
|
```
|
||
|
|
||
|
### subclass that defers 'end' for some reason
|
||
|
|
||
|
```js
|
||
|
class SlowEnd extends Minipass {
|
||
|
emit (ev, ...args) {
|
||
|
if (ev === 'end') {
|
||
|
console.log('going to end, hold on a sec')
|
||
|
setTimeout(() => {
|
||
|
console.log('ok, ready to end now')
|
||
|
super.emit('end', ...args)
|
||
|
}, 100)
|
||
|
} else {
|
||
|
return super.emit(ev, ...args)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
```
|
||
|
|
||
|
### transform that creates newline-delimited JSON
|
||
|
|
||
|
```js
|
||
|
class NDJSONEncode extends Minipass {
|
||
|
write (obj, cb) {
|
||
|
try {
|
||
|
// JSON.stringify can throw, emit an error on that
|
||
|
return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
|
||
|
} catch (er) {
|
||
|
this.emit('error', er)
|
||
|
}
|
||
|
}
|
||
|
end (obj, cb) {
|
||
|
if (typeof obj === 'function') {
|
||
|
cb = obj
|
||
|
obj = undefined
|
||
|
}
|
||
|
if (obj !== undefined) {
|
||
|
this.write(obj)
|
||
|
}
|
||
|
return super.end(cb)
|
||
|
}
|
||
|
}
|
||
|
```
|
||
|
|
||
|
### transform that parses newline-delimited JSON
|
||
|
|
||
|
```js
|
||
|
class NDJSONDecode extends Minipass {
|
||
|
constructor (options) {
|
||
|
// always be in object mode, as far as Minipass is concerned
|
||
|
super({ objectMode: true })
|
||
|
this._jsonBuffer = ''
|
||
|
}
|
||
|
write (chunk, encoding, cb) {
|
||
|
if (typeof chunk === 'string' &&
|
||
|
typeof encoding === 'string' &&
|
||
|
encoding !== 'utf8') {
|
||
|
chunk = Buffer.from(chunk, encoding).toString()
|
||
|
} else if (Buffer.isBuffer(chunk))
|
||
|
chunk = chunk.toString()
|
||
|
}
|
||
|
if (typeof encoding === 'function') {
|
||
|
cb = encoding
|
||
|
}
|
||
|
const jsonData = (this._jsonBuffer + chunk).split('\n')
|
||
|
this._jsonBuffer = jsonData.pop()
|
||
|
for (let i = 0; i < jsonData.length; i++) {
|
||
|
try {
|
||
|
// JSON.parse can throw, emit an error on that
|
||
|
super.write(JSON.parse(jsonData[i]))
|
||
|
} catch (er) {
|
||
|
this.emit('error', er)
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
if (cb)
|
||
|
cb()
|
||
|
}
|
||
|
}
|
||
|
```
|