Writing correct streaming programs is difficult. A single stream missing an error handler in a pipeline can crash your entire program, often with a cryptic error message. Even when an error listener is added, the stream system has already unpiped by the time client code is called, which makes it difficult to generically gather pipeline information on error events. Finally, error events are not necessarily fatal to the pipeline, but the user has no way to communicate this intent back to the stream subsystem: at present, any error emitted will destroy whatever pipeline it is emitted in. To reiterate: error handling in pipelines requires users to remember to add error listeners to every constituent stream, does not allow users to prevent undesired default behavior, and, in the event of an error, does not give users enough information to debug the situation.
The goals and constraints of this proposal are as follows:
- Make correct error handling easier to implement for end users.
- The solution must be polyfillable for older versions of node.
- The solution must work in browser (for browserify).
- Work with extant stream solutions (through, through2).
- Provide public methods for traversing up and down pipelines, instead of requiring users to poke at internal state.
- Implement the solution in terms of public methods, insofar as possible.
To that end, this proposal adds six methods (in three pairs) to the Stream class.
stream.next() and stream.prev()
: Get an array of the set of immediate destination or source streams.

stream.nextAll() and stream.prevAll()
: Get an array of the set of all destination or source streams, with no duplicates or cycles.

stream.addPipelineErrorHandler(fn) and stream.removePipelineErrorHandler(fn)
: Add (or remove) a pipeline error handler on a stream.
var A = fs.createReadStream('input');
var B = new stream.PassThrough();
var C = fs.createWriteStream('output');
A.name = 'A';
B.name = 'B';
C.name = 'C';
C.addPipelineErrorHandler(function(ev) {
console.log(ev.error.message, ev.stream.name);
ev.handleError();
});
A.pipe(B).pipe(C);
A.emit('error', new Error('hello!')); // logs "hello!".
For example, in the above program, someone has maliciously injected an error event into our stream pipeline. The error originates on A, but is handled at C. The error handler indicates that the user wishes to manually handle this error event and prevent the unpiping behavior with ev.handleError(). The original error and the originating stream are available at ev.error and ev.stream, respectively.
The error event may only be marked "handled" on the same event loop turn as it was emitted. Multiple pipeline error handlers may intercept an error: it propagates downstream, hitting handlers "A", then "B", then "C". Once an error event is "handled", it stops propagating entirely. For complex pipelines, where there may be cycles or multiple branches, it propagates down the "oldest" pipelines first, stopping when it hits duplicates. A few examples:
A - B - A    error at A, propagates to A, then B.
*

A - B - A    error at B, propagates to B, then A.
    *

A - B - C    error at A, propagates to A, B, then C.
*

A - B - C    error at B, propagates to B, then C.
    *

      C - D
     /
A - B        error at A, propagates to A, B, C, D, X, then Y.
*    \       NB: handlers on *either* branch have the opportunity
      X - Y  to "handle" the error and thus stop propagation.
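The propagation order described above can be modeled with plain objects. The following is an illustrative sketch of the rule (visit the erroring stream, then downstream streams in pipe order, deduplicated, stopping once handled), not the actual patch; `next` and `handlers` are stand-ins for the proposal's internal bookkeeping:

```javascript
// Toy model of pipeline error propagation -- not the real implementation.
function propagateError(origin, error) {
  var seen = new Set();
  var stack = [origin];
  var visited = [];
  while (stack.length) {
    var current = stack.pop();
    if (seen.has(current)) continue; // no duplicates, no infinite cycles
    seen.add(current);
    visited.push(current.name);
    var handled = false;
    (current.handlers || []).forEach(function (fn) {
      fn({ error: error, stream: origin, handleError: function () { handled = true; } });
    });
    if (handled) break; // a handled error stops propagating entirely
    var next = current.next || [];
    // push in reverse so the "oldest" pipeline is traversed first
    for (var i = next.length - 1; i >= 0; i--) stack.push(next[i]);
  }
  return visited;
}

// The cycle example: A - B - A, error at B, propagates to B, then A.
var A = { name: 'A' };
var B = { name: 'B' };
A.next = [B];
B.next = [A];
console.log(propagateError(B, new Error('hello!'))); // [ 'B', 'A' ]
```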
A user may remove a handler by calling stream.removePipelineErrorHandler(fn) with the handler function they originally provided.
The sequence of events during an error is as follows:
1. A.emit("error").
2. The stream .pipe "error" handler (or _defaultErrorHandler).
3. The internal "_preError" event.
4. pipelineErrorHandlers for A and all downstream streams (gathered with nextAll), until the error is handled or the streams are exhausted.
5. If the error is handled, skip to step 8.
6. unpipe().
7. Re-emit the error if no other error handlers exist.
8. The rest of the "error" handlers on A.
In order to implement this sequence of events, the following modifications and additions were made:
The stream constructor grew two new arrays: _nextStreams and _prevStreams. These arrays track the next and previous streams, and are managed internally by the .pipe method (both Readable#pipe and Stream#pipe may affect these arrays). Since "correct" subclassing isn't guaranteed (see crypto's LazyTransform), the attributes are lazily added by *#pipe if they are not present. These attributes are there to support the {next,prev}{All,}() methods. _nextStreams is a slight duplication of information in the Readable case, but it is only ever represented by an Array, while _readableState.pipes may be null, a single Stream, or an array of Streams.
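A minimal sketch of that bookkeeping, assuming a hypothetical trackPipe helper standing in for what *#pipe would do internally (the helper name is invented for illustration):

```javascript
// Hypothetical sketch; trackPipe is not a real function in the patch,
// just an illustration of the lazy array management described above.
function trackPipe(src, dest) {
  // Lazily add the arrays, since "correct" subclassing isn't guaranteed.
  if (!src._nextStreams) src._nextStreams = [];
  if (!dest._prevStreams) dest._prevStreams = [];
  src._nextStreams.push(dest);
  dest._prevStreams.push(src);
  return dest; // .pipe returns the destination, so calls can chain
}
```

A later unpipe would splice the corresponding entries back out of both arrays.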
To give users better tools to debug errors within streams, the next(), prev(), nextAll(), and prevAll() methods were added to publicly expose what was previously private or implicit state. nextAll and prevAll are implemented in terms of iterateStream(stream, attributeName) (here and here), which does a (non-recursive) depth-first traversal of the _nextStreams (or _prevStreams) property. One potentially controversial bit is the use of a WeakSet to ensure no duplicates are added to the output; this could be replaced with an array of "seen" streams in environments that don't support WeakSet.
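A sketch of what that traversal might look like; only the signature, the depth-first order, and the WeakSet deduplication come from the description above, so the internals here are an approximation:

```javascript
// Non-recursive depth-first traversal over _nextStreams or _prevStreams.
// The WeakSet guards against both duplicates and cycles.
function iterateStream(stream, attributeName) {
  var seen = new WeakSet();
  var out = [];
  var stack = (stream[attributeName] || []).slice().reverse();
  while (stack.length) {
    var current = stack.pop();
    if (seen.has(current)) continue;
    seen.add(current);
    out.push(current);
    var next = current[attributeName] || [];
    // push in reverse so the oldest pipe is visited first
    for (var i = next.length - 1; i >= 0; i--) stack.push(next[i]);
  }
  return out;
}
```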
This is the primary change:
function onerror(er) {
var handled = {handled: false};
this.emit('_preError', er, handled);
if (handled.handled) {
return;
}
// ...
This has been added for old streams as well as new streams. The goal was to keep all of the pipeline error event handling out of the pipe functions. Earlier attempts, where the pipelineError event was emitted as a "normal" EE-style event and the handling logic was embedded in .pipe, became complicated very quickly. There's bookkeeping that needs to be done with regard to default error handlers, deciding which error handler needs to propagate the PipelineErrorEvent, and so on, and I wanted to keep that as separate from .pipe as possible. To do this, I added an internal event, "_preError", that emits the error along with an object that listeners of _preError can modify to indicate whether the error is handled.
One benefit of this approach is that the feature can be added piecemeal: adding the navigational components can be one piece, adding _preError can be another, and we can iterate on what the public API for _preError should be (if the current approach is deemed unsatisfactory).
This is pretty straightforward. Like _{next,prev}Streams, it's an attribute that's lazily added if not present; otherwise it contains a list of {handler, uninstall} objects. {add,remove}PipelineErrorHandler manipulates this attribute.
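Sketched with names matching the description above; the actual patch may differ in detail, and these are written as free functions rather than Stream methods purely for illustration:

```javascript
// Sketch of the {handler, uninstall} registry described above.
function addPipelineErrorHandler(stream, handler, uninstall) {
  if (!stream._pipelineErrorHandlers) stream._pipelineErrorHandlers = [];
  stream._pipelineErrorHandlers.push({ handler: handler, uninstall: uninstall });
}

function removePipelineErrorHandler(stream, handler) {
  var list = stream._pipelineErrorHandlers || [];
  for (var i = 0; i < list.length; i++) {
    if (list[i].handler === handler) {
      list[i].uninstall(); // tear down the listeners this entry installed
      list.splice(i, 1);
      return;
    }
  }
}
```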
This is the real meat of the change. It installs listeners onto the current stream as well as all streams returned by prevAll(). Installation adds listeners for unpipe, pipe, _preError, and, if there are no error handlers present, error as well.
The error event handler is called _defaultPipelineErrorHandler (here). Because streams2+ do not add an error event handler to Readable streams on .pipe, this ensures consistent pipeline error behavior; namely, that errors emitted on Readables are forwarded down the pipeline.
Because upstream may change over time (after the initial addPipelineErrorHandler call), a listener is added for pipe and unpipe. Once piped to, a pipeline-error-handling stream will remove the _defaultPipelineErrorHandler, and install the 3 to 4 listeners on the new source and all of its ancestors. If unpiped, to keep things simple, the handler simply uninstalls and reinstalls itself, re-propagating the error handlers out appropriately.
Uninstallation involves removing all installed listeners, and splicing the errorHandler out of the list of error handlers if the stream is the source stream. Since the installed streams are tracked by the installedOnStreams array (deduplicated with the aid of a WeakSet), uninstallation is as simple as calling uninstall on each of those streams.
Wow, this is scarily complex. I think the thing with streams is that their use is incredibly broad, so if you discuss them in the abstract, you are discussing many, many possible uses.
I think we need to focus any discussion like this on concrete examples. Every error is different, so we need to consider errors in terms of their context. What sort of situations do you want to be handleable?
Handleable Errors: I support the idea of error forwarding. I use error forwarding in pull-streams, and it works great. Say I have a pipeline from a network connection, through several transforms, then written to a file: if the network stream errors, that error is forwarded through the transforms, and then the file is closed, and a partial file is handled in whichever way is correct.
I don't think there are many cases where you want to handle an error and keep on going. One example could be parsing lines out of a long file: maybe lines that don't parse should just be ignored. This case does not warrant error forwarding. Can you think of any other cases?
I think the cases where there can be an error and things keep going usually involve multiple pipelines. For example, in dynamo/riak/cassandra, a PUT is written to N (usually 3) servers; if the stream to one of those errors, the rest keep on going. Likewise, a read is read from N servers, and it waits for at least 2 responses. If one of these errors, that can be handled in a broader context, but the stream that errored is dead. "Error" is in many respects just like "end".
I think you are correct to propose two-way propagation: the sink needs to know when the source has failed, and if the pipeline fails in the middle (example: invalid zip headers), then both ends need to be notified. The source should be closed, and the error should be passed to the sink.