@chrisdickinson
Last active August 29, 2015 14:05

pipeline errors

Writing correct streaming programs is difficult. A single stream missing an error handler in a pipeline can crash your entire program, often with a cryptic error message. Even when an error listener is added, the stream system has already unpiped by the time client code is called, which makes it difficult to generically gather pipeline information on error events. Finally, error events are not necessarily fatal to the pipeline, but the user has no way to communicate this intent back to the stream subsystem -- at present, any error emitted will destroy whatever pipeline it is emitted in.

To reiterate: error handling in pipelines requires users to remember to add error listeners to every constituent stream, does not allow users to prevent undesired default behavior, and in the event of an error, does not give users enough information to debug the situation.

The goals and constraints of this proposal are as follows:

  • Make correct error handling easier to implement for end users.
  • The solution must be polyfillable for older versions of node.
  • The solution must work in browser (for browserify).
  • Work with extant stream solutions (through, through2).
  • Provide public methods into internal state for being able to traverse up and down pipelines.
  • Implement the solution in terms of public methods, insofar as possible.

To that end, this proposal adds six methods (in three pairs) to the Stream class.

  • stream.next() and stream.prev(): Get an array of the set of immediate destination or source streams.
  • stream.nextAll() and stream.prevAll(): Get an array of the set of all destination or source streams, with no duplicates or cycles.
  • stream.addPipelineErrorHandler(fn) and stream.removePipelineErrorHandler(fn): Add or remove a pipeline error handler on a stream.

User-facing API

var A = fs.createReadStream('input');
var B = new stream.PassThrough();
var C = fs.createWriteStream('output');

A.name = 'A';
B.name = 'B';
C.name = 'C';

C.addPipelineErrorHandler(function(ev) {
  console.log(ev.error.message, ev.stream.name);
  ev.handleError();
});

A.pipe(B).pipe(C);
A.emit('error', new Error('hello!')); // logs "hello! A".

In the above program, someone has maliciously injected an error event into our stream pipeline. The error originates on A, but is handled at C. The error handler indicates that the user wishes to manually handle this error event and prevent the unpiping behavior by calling ev.handleError(). The original error and the originating stream are available as ev.error and ev.stream, respectively.

The error event may only be marked "handled" on the same event loop turn as it was emitted. Multiple pipeline error handlers may intercept an error -- it propagates downstream, hitting handlers on "A", then "B", then "C". Once an error event is "handled", it stops propagating entirely. For complex pipelines -- where there may be cycles or multiple branches -- it propagates down the "oldest" pipelines first, stopping when it hits duplicates. A few examples:

A - B - A     error at A, propagates to A, then B.
*

A - B - A     error at B, propagates to B, then A.
    *

A - B - C     error at A, propagates to A, B, then C.
*

A - B - C     error at B, propagates to B, then C.
    *

       C - D  error at A, propagates to A, B, C, D, X, then Y.
      /       NB: handlers on *either* branch have the opportunity
A - B         to "handle" the error and thus stop propagation.
*     \
       X - Y
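The propagation rule above can be sketched as a standalone traversal: depth-first, oldest pipe first, visiting each stream at most once, and stopping as soon as a handler marks the error handled. The `pipelineErrorHandlers` property name and the event-object shape below are illustrative, not the patch's internals:

```javascript
// Sketch: depth-first, oldest-pipe-first propagation with deduplication.
// `pipelineErrorHandlers` and the event-object shape are illustrative.
function propagatePipelineError(origin, error) {
  var ev = {
    error: error,
    stream: origin,
    handled: false,
    handleError: function () { this.handled = true; }
  };
  var seen = new Set();
  var stack = [origin];
  while (stack.length && !ev.handled) {
    var stream = stack.pop();
    if (seen.has(stream)) continue;   // stop at duplicates (handles cycles)
    seen.add(stream);
    var handlers = stream.pipelineErrorHandlers || [];
    for (var i = 0; i < handlers.length && !ev.handled; i++) {
      handlers[i](ev);
    }
    // push destinations in reverse so the oldest pipe is visited first
    var next = stream._nextStreams || [];
    for (var j = next.length - 1; j >= 0; j--) stack.push(next[j]);
  }
  return ev.handled;
}
```

Run over the branched example above, this visits A, B, C, D, X, then Y, and stops early if any handler calls ev.handleError().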

A user may remove a handler by calling stream.removePipelineErrorHandler(fn) with the handler function they originally provided.

Implementation

The sequence of events during an error is as follows:

  1. A.emit("error")
  2. The "error" handler installed by .pipe (or _defaultErrorHandler).
  3. Internal "_preError" event.
  4. pipelineErrorHandlers for A, and all downstream streams (gathered with nextAll) until error is handled or streams exhausted.
  5. if the error is handled, skip to step 8.
  6. unpipe().
  7. re-emit error if no other error handlers exist.
  8. The rest of the "error" handlers on A.

In order to implement this sequence of events, the following modifications and additions were made:

Added _nextStreams and _prevStreams attributes to base Stream class

The stream constructor grew two new arrays -- _nextStreams and _prevStreams. These arrays track the next and previous streams, and are managed internally by the .pipe method (both Readable#pipe and Stream#pipe may affect them). Since "correct" subclassing isn't guaranteed (see crypto's LazyTransform), the attributes are lazily added by *#pipe if they are not present. These attributes exist to support the {next,prev}{All,}() methods. _nextStreams slightly duplicates information in the Readable case, but it is only ever represented by an Array, while _readableState.pipes may be null, a single Stream, or an array of Streams.
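The lazy bookkeeping can be sketched with a pair of helpers. The real changes live inside Readable#pipe and Stream#pipe; trackPipe and untrackPipe here are illustrative names:

```javascript
// Sketch: the topology bookkeeping .pipe would perform.
function trackPipe(src, dest) {
  if (!src._nextStreams) src._nextStreams = [];   // lazily added, since
  if (!dest._prevStreams) dest._prevStreams = []; // subclasses may skip the constructor
  src._nextStreams.push(dest);
  dest._prevStreams.push(src);
}

// The inverse runs on unpipe.
function untrackPipe(src, dest) {
  var i = src._nextStreams ? src._nextStreams.indexOf(dest) : -1;
  if (i !== -1) src._nextStreams.splice(i, 1);
  var j = dest._prevStreams ? dest._prevStreams.indexOf(src) : -1;
  if (j !== -1) dest._prevStreams.splice(j, 1);
}
```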

Added {next,prev}{All,}() methods

To give users better tools to debug errors within streams, the next(), prev(), nextAll(), and prevAll() methods were added to publicly expose what was previously private or implicit state. nextAll and prevAll are implemented in terms of iterateStream(stream, attributeName) (here and here), which does a non-recursive depth-first traversal of the _nextStreams (or _prevStreams) property. One potentially controversial bit is the use of a WeakSet to ensure no duplicates are added to the output -- this could be replaced with an array of "seen" streams in environments that don't support WeakSet.
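A sketch of iterateStream under those constraints, including the array-of-seen-streams fallback for environments without WeakSet (the real helper lives in the patch; this version is illustrative):

```javascript
// Sketch: non-recursive depth-first traversal over _nextStreams (or
// _prevStreams), deduplicating with a WeakSet where available and an
// array of "seen" streams otherwise.
function iterateStream(stream, attributeName) {
  var seen = typeof WeakSet === 'function' ? new WeakSet() : null;
  var seenList = seen ? null : [];
  function alreadySeen(s) {
    if (seen) {
      if (seen.has(s)) return true;
      seen.add(s);
      return false;
    }
    if (seenList.indexOf(s) !== -1) return true;
    seenList.push(s);
    return false;
  }

  alreadySeen(stream); // never revisit the starting stream (handles cycles)
  var output = [];
  var stack = (stream[attributeName] || []).slice().reverse();
  while (stack.length) {
    var current = stack.pop();
    if (alreadySeen(current)) continue;
    output.push(current);
    var next = current[attributeName] || [];
    for (var i = next.length - 1; i >= 0; i--) stack.push(next[i]);
  }
  return output;
}

// nextAll()/prevAll() would then be thin wrappers, e.g.:
// Stream.prototype.nextAll = function () { return iterateStream(this, '_nextStreams'); };
```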

Modified onerror in {Readable,Stream}#pipe

This is the primary change:

function onerror(er) {
  var handled = {handled: false};
  this.emit('_preError', er, handled);

  if (handled.handled) {
    return;
  }

  // ... (the original unpipe / re-emit logic follows here)

This has been added for old streams as well as new streams. The goal was to keep all of the pipeline error event handling out of the pipe functions. Earlier attempts -- where the pipelineError event was emitted as a "normal" EE-style event and the handling logic was embedded in .pipe -- became complicated very quickly. There's bookkeeping to be done around default error handlers, deciding which error handler needs to propagate the PipelineErrorEvent, and so on, and I wanted to keep that as separate from .pipe as possible. To do this, I added an internal event -- "_preError" -- that emits the error along with an object that listeners may modify to indicate whether the error is handled or not.

One benefit of this approach is that the feature can be added piecemeal -- adding the navigational components can be one piece, adding _preError can be another, and we can iterate on what the public API for _preError should be (if the current approach is deemed unsatisfactory).

Added _errorHandlers attribute to base Stream class

This is pretty straightforward. Like _{next,prev}Streams, it's an attribute that's lazily added if not present; it contains a list of {handler, uninstall} objects. {add,remove}PipelineErrorHandler manipulates this attribute.
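A sketch of that bookkeeping: the {handler, uninstall} entry shape follows the text, but the function bodies are illustrative and omit the upstream listener installation described in the next section:

```javascript
// Sketch of the _errorHandlers bookkeeping (illustrative, not the patch).
function addPipelineErrorHandler(stream, fn) {
  if (!stream._errorHandlers) stream._errorHandlers = []; // lazy, like _{next,prev}Streams
  var entry = {
    handler: fn,
    uninstall: function () {
      var idx = stream._errorHandlers.indexOf(entry);
      if (idx !== -1) stream._errorHandlers.splice(idx, 1);
    }
  };
  stream._errorHandlers.push(entry);
}

function removePipelineErrorHandler(stream, fn) {
  var list = stream._errorHandlers || [];
  for (var i = 0; i < list.length; i++) {
    if (list[i].handler === fn) return list[i].uninstall();
  }
}
```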

Added addPipelineErrorHandler

This is the real meat of the change. It installs listeners onto the current stream as well as all streams returned by prevAll(). Installation adds listeners for unpipe, pipe, _preError, and, if there are no error handlers present, error as well.

The error event handler is called _defaultPipelineErrorHandler (here). Because streams2+ do not add an error event handler to Readable streams on .pipe, this ensures consistent pipeline error behavior -- namely, that errors emitted on Readables are forwarded down the pipeline.

Because upstream may change over time (after the initial addPipelineErrorHandler call), a listener is added for pipe and unpipe. Once piped to, a pipeline-error-handling stream will remove the _defaultPipelineErrorHandler, and install the 3 to 4 listeners on the new source and all of its ancestors. If unpiped, to keep things simple, the handler simply uninstalls and reinstalls itself, re-propagating the error handlers out appropriately.

Uninstallation involves removing all installed listeners, and splicing out the errorHandler from the list of error handlers if the stream is the source stream. Since the streams installed are tracked by the installedOnStreams array (deduplicated with the aid of a WeakSet), uninstallation is as simple as calling uninstall on each of the streams.

@dominictarr

Wow, this is scarily complex. I think the thing with streams is that their use is incredibly broad,
and so, if you discuss them in the abstract, you are discussing many, many possible uses.

I think we need to focus any discussion like this in terms of concrete examples. Every error is different, so we need to consider them in terms of their context. What sorts of situations do you want to be handleable?

Handleable Errors: I support the idea of error forwarding. I use error forwarding in pull-streams,
and it works great. Say I have a pipeline from a network connection, through several transforms, then written to a file: if the network stream errors, that error is forwarded through the transforms, and then the file is closed - and a partial file is handled in whichever way is correct.

I don't think there are many cases where you want to handle an error and keep on going.
One example could be parsing lines out of a long file - maybe lines that don't parse should just be ignored. This case does not warrant error forwarding - can you think of any other cases?

I think the cases where there can be an error and it keeps going usually involve multiple pipelines. For example, in dynamo/riak/cassandra a PUT is written to N (usually 3) servers;
if the stream to one of those errors, the rest keep on going. Likewise, a read is read from N servers, and it waits for at least 2 responses. If one of these errors, that can be handled in a broader context, but the stream that errored is dead. "Error" is in many respects just like "end".

I think you are correct to propose two way propagation - the sink needs to know when the source has failed, and if the pipeline fails in the middle (example: invalid zip headers) then both ends need to be notified - the source should be closed, and the error should be passed to the sink.

@hayes

hayes commented Aug 29, 2014

@dominictarr at Urban Airship we use object mode streams to create long running pipelines that often pass through several error prone sections in the browser. For us, being able to combine error forwarding with the ability to prevent the pipeline from being destroyed would allow for a more modular approach and/or less duplication of code.

an example might be for a paginated table:
click-pagination-button --> prevent-default --> generate-url-from-attr --> request-new-page --> json-parse --> format-data --> rerender-table

if a request fails, or there is an error parsing the json, we probably just want to throw up some kind of error or warning message, but if there is another click on the pagination, we still want to try to request the new set of data without having to rebuild the pipeline.

There are a lot of similar cases where you have long running pipeline like a series of build steps that recompile some static assets any time any file changes.

@yocontra

@dominictarr In gulp we use object mode streams to create pipelines of file transformations.

For example: You have 100 files you want to compile and lint. There is the possibility that a file may trigger an error (as it fails to compile or lint) while passing through the pipeline. This crashes the process - so we wrap tasks in a try/catch to handle errors (soon to be a domain). The only way to prevent this is expecting users to attach error handlers to every single stream in their pipeline - which for a long pipeline becomes insanely annoying.

Now that the process isn't crashing, we have the issue of unpipe on error. If I have 100 files and 1 of them errors, it causes the pipeline to become completely destroyed and no other work can be done. There is no concept of a "partial build" because of this. Currently people have modules to monkeypatch this out of the Stream prototype, but I don't think that's a viable long term solution.

We have to make a decision for gulp 4 about what to do for error management on pipelines, and if it isn't something like this then the only other option is really unpipe monkeypatching.

Simply put our needs are:

  • one location where errors bubble in stream pipelines, to prevent process from exploding
  • object mode streams that don't unpipe on error

@phated

phated commented Aug 29, 2014

This is really well written; it addresses the concerns and explains why you chose each approach, @chrisdickinson.

@contra is right about people monkey-patching streams down the pipeline to solve all the problems they are encountering in gulp. Node streams are really primitive and we need a way to have more control over errors and unpiping.
