Spaces:
				
			
			
	
			
			
		Configuration error
		
	
	
	
			
			
	
	
	
	
		
		
		Configuration error
		
	var util = require('util');
var Stream = require('stream').Stream;
// Third-party dependency; append() wraps plain sources with DelayedStream.create().
var DelayedStream = require('delayed-stream');

// CombinedStream is a function declaration, so it is hoisted and can be
// exported before its definition appears in the file.
module.exports = CombinedStream;
/**
 * A readable stream that emits the content of a queue of sources one after
 * another. Sources are queued with append() and consumed once the stream is
 * released by resume() (pipe() calls resume()).
 *
 * Public fields:
 *   writable     - false until the first resume(), and again after a reset.
 *   readable     - always initialised to true.
 *   dataSize     - last byte total computed by _updateDataSize().
 *   maxDataSize  - limit checked by _checkDataSize(); defaults to 2 MiB.
 *   pauseStreams - when true, appended streams are paused and pause()/resume()
 *                  are forwarded to the current source.
 *
 * @constructor
 */
function CombinedStream() {
  this.writable = false;
  this.readable = true;
  this.dataSize = 0;
  this.maxDataSize = 2 * 1024 * 1024;
  this.pauseStreams = true;

  // Internal state.
  this._released = false;      // set by the first resume()
  this._streams = [];          // queued sources (streams, values or functions)
  this._currentStream = null;  // source currently being emitted
  this._insideLoop = false;    // re-entrancy guard used by _getNext()
  this._pendingNext = false;   // a _getNext() call was deferred by the guard
}
util.inherits(CombinedStream, Stream);
/**
 * Factory: builds a new instance and copies every enumerable property of
 * `options` onto it, overriding the constructor defaults.
 *
 * Uses `new this()`, so it must be invoked as a method of the constructor
 * (e.g. `CombinedStream.create(...)`).
 *
 * @param {Object} [options] Properties to assign to the new instance.
 * @returns {CombinedStream} The configured instance.
 */
CombinedStream.create = function(options) {
  var combinedStream = new this();

  options = options || {};
  for (var option in options) {
    combinedStream[option] = options[option];
  }

  return combinedStream;
};
/**
 * Classifies a queued item by exclusion: anything that is not a function,
 * string, boolean, number or Buffer is treated as a stream.
 *
 * Note that this means plain objects, `null` and `undefined` are also
 * reported as stream-like; no check for stream methods is performed.
 *
 * @param {*} stream Item to classify.
 * @returns {boolean} true when the item is treated as a stream.
 */
CombinedStream.isStreamLike = function(stream) {
  return (typeof stream !== 'function')
    && (typeof stream !== 'string')
    && (typeof stream !== 'boolean')
    && (typeof stream !== 'number')
    && (!Buffer.isBuffer(stream));
};
/**
 * Queues a source to be emitted after everything already queued.
 *
 * Stream-like items that are not already DelayedStream instances are wrapped
 * with DelayedStream.create(); the original stream gets a 'data' listener
 * that runs _checkDataSize(). Errors of the queued stream are forwarded via
 * _handleErrors(), and the stream is paused when `pauseStreams` is set.
 * Everything else (functions, strings, numbers, booleans, Buffers) is queued
 * unchanged.
 *
 * @param {*} stream Stream, value, or function to queue.
 * @returns {CombinedStream} this, for chaining.
 */
CombinedStream.prototype.append = function(stream) {
  var isStreamLike = CombinedStream.isStreamLike(stream);

  if (isStreamLike) {
    if (!(stream instanceof DelayedStream)) {
      // Create the wrapper before attaching the 'data' listener below;
      // this order is kept exactly as in the original code.
      var newStream = DelayedStream.create(stream, {
        maxDataSize: Infinity,
        pauseStream: this.pauseStreams,
      });
      stream.on('data', this._checkDataSize.bind(this));
      stream = newStream;
    }

    this._handleErrors(stream);

    if (this.pauseStreams) {
      stream.pause();
    }
  }

  this._streams.push(stream);
  return this;
};
/**
 * Pipes this stream into `dest` using Stream.prototype.pipe, then calls
 * resume() so the queued sources start being emitted.
 *
 * @param {Object} dest Destination stream.
 * @param {Object} [options] Passed through to Stream.prototype.pipe.
 * @returns {Object} `dest`, so pipe calls can be chained.
 */
CombinedStream.prototype.pipe = function(dest, options) {
  Stream.prototype.pipe.call(this, dest, options);
  this.resume();
  return dest;
};
/**
 * Advances to the next queued source.
 *
 * Clears `_currentStream`, then runs _realGetNext(). If _getNext() is called
 * again while that is still running (re-entrantly), the call is recorded in
 * `_pendingNext` and replayed by the loop instead of recursing.
 */
CombinedStream.prototype._getNext = function() {
  this._currentStream = null;

  if (this._insideLoop) {
    this._pendingNext = true;
    return; // defer call
  }

  this._insideLoop = true;
  try {
    do {
      this._pendingNext = false;
      this._realGetNext();
    } while (this._pendingNext);
  } finally {
    // Always release the guard, even if _realGetNext() throws.
    this._insideLoop = false;
  }
};
/**
 * Takes the next item off the queue and starts emitting it.
 *
 * - Queue empty (shift() yields undefined): calls end().
 * - Item is not a function: hands it to _pipeNext().
 * - Item is a function: calls it with a callback; whatever the callback
 *   receives is handed to _pipeNext(). A stream-like result first gets the
 *   'data' size check and error forwarding attached.
 */
CombinedStream.prototype._realGetNext = function() {
  var stream = this._streams.shift();

  if (typeof stream === 'undefined') {
    this.end();
    return;
  }

  if (typeof stream !== 'function') {
    this._pipeNext(stream);
    return;
  }

  var getStream = stream;
  getStream(function(stream) {
    var isStreamLike = CombinedStream.isStreamLike(stream);
    if (isStreamLike) {
      stream.on('data', this._checkDataSize.bind(this));
      this._handleErrors(stream);
    }

    this._pipeNext(stream);
  }.bind(this));
};
/**
 * Makes `stream` the current source and emits its content.
 *
 * A stream-like source is piped into this stream and, on its 'end' event,
 * _getNext() moves on to the following source. Any other value is written
 * immediately and _getNext() is called right away.
 *
 * @param {*} stream Stream or plain value to emit.
 */
CombinedStream.prototype._pipeNext = function(stream) {
  this._currentStream = stream;

  var isStreamLike = CombinedStream.isStreamLike(stream);
  if (isStreamLike) {
    stream.on('end', this._getNext.bind(this));
    // end: false is passed so that pipe() does not call end() on this
    // combined stream when a single source finishes.
    stream.pipe(this, {end: false});
    return;
  }

  var value = stream;
  this.write(value);
  this._getNext();
};
/**
 * Forwards 'error' events of a source stream to this stream via _emitError().
 *
 * @param {Object} stream Source exposing `on(event, listener)`.
 */
CombinedStream.prototype._handleErrors = function(stream) {
  var self = this;
  stream.on('error', function(err) {
    self._emitError(err);
  });
};
/**
 * Re-emits a chunk received from the current source as a 'data' event.
 * Returns nothing.
 *
 * @param {*} data Chunk to emit.
 */
CombinedStream.prototype.write = function(data) {
  this.emit('data', data);
};
/**
 * Pauses the current source (when it has a pause() method) and emits 'pause'.
 *
 * Does nothing at all, including not emitting 'pause', when `pauseStreams`
 * is falsy.
 */
CombinedStream.prototype.pause = function() {
  if (!this.pauseStreams) {
    return;
  }

  // pauseStreams is known to be truthy past the guard above, so only the
  // current source needs checking.
  if (this._currentStream && typeof this._currentStream.pause === 'function') {
    this._currentStream.pause();
  }
  this.emit('pause');
};
/**
 * Starts or resumes emission.
 *
 * The first call marks the stream as released and writable and kicks off the
 * queue with _getNext(). Every call then resumes the current source (only
 * when `pauseStreams` is set and the source has a resume() method) and emits
 * 'resume'.
 */
CombinedStream.prototype.resume = function() {
  if (!this._released) {
    this._released = true;
    this.writable = true;
    this._getNext();
  }

  // Read _currentStream only after _getNext(), which may have replaced it.
  if (this.pauseStreams && this._currentStream && typeof this._currentStream.resume === 'function') {
    this._currentStream.resume();
  }
  this.emit('resume');
};
/**
 * Finishes the stream: clears internal state with _reset(), then emits 'end'.
 */
CombinedStream.prototype.end = function() {
  this._reset();
  this.emit('end');
};
/**
 * Tears the stream down: clears internal state with _reset(), then emits
 * 'close'.
 */
CombinedStream.prototype.destroy = function() {
  this._reset();
  this.emit('close');
};
/**
 * Clears internal state: marks the stream as not writable, drops all queued
 * sources (by assigning a fresh array) and forgets the current source.
 */
CombinedStream.prototype._reset = function() {
  this.writable = false;
  this._streams = [];
  this._currentStream = null;
};
/**
 * Recomputes `dataSize` and reports an error through _emitError() when it is
 * strictly greater than `maxDataSize`.
 *
 * The error message text is kept exactly as it was (it names
 * "DelayedStream#maxDataSize" although the limit checked is this stream's
 * `maxDataSize`), in case callers match on it.
 */
CombinedStream.prototype._checkDataSize = function() {
  this._updateDataSize();
  if (this.dataSize <= this.maxDataSize) {
    return;
  }

  var message =
    'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.';
  this._emitError(new Error(message));
};
/**
 * Recomputes `dataSize` from scratch as the sum of the `dataSize` properties
 * of all queued sources plus that of the current source. Items without a
 * truthy `dataSize` contribute nothing.
 */
CombinedStream.prototype._updateDataSize = function() {
  this.dataSize = 0;

  var self = this;
  this._streams.forEach(function(stream) {
    if (!stream.dataSize) {
      return;
    }

    self.dataSize += stream.dataSize;
  });

  if (this._currentStream && this._currentStream.dataSize) {
    this.dataSize += this._currentStream.dataSize;
  }
};
/**
 * Clears internal state with _reset() (dropping all queued sources), then
 * emits `err` as an 'error' event.
 *
 * @param {Error} err Error to emit.
 */
CombinedStream.prototype._emitError = function(err) {
  this._reset();
  this.emit('error', err);
};

