Spaces:
Runtime error
Runtime error
| const { | |
| BalancedPoolMissingUpstreamError, | |
| InvalidArgumentError | |
| } = require('./core/errors') | |
| const { | |
| PoolBase, | |
| kClients, | |
| kNeedDrain, | |
| kAddClient, | |
| kRemoveClient, | |
| kGetDispatcher | |
| } = require('./pool-base') | |
| const Pool = require('./pool') | |
| const { kUrl, kInterceptors } = require('./core/symbols') | |
| const { parseOrigin } = require('./core/util') | |
| const kFactory = Symbol('factory') | |
| const kOptions = Symbol('options') | |
| const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor') | |
| const kCurrentWeight = Symbol('kCurrentWeight') | |
| const kIndex = Symbol('kIndex') | |
| const kWeight = Symbol('kWeight') | |
| const kMaxWeightPerServer = Symbol('kMaxWeightPerServer') | |
| const kErrorPenalty = Symbol('kErrorPenalty') | |
| function getGreatestCommonDivisor (a, b) { | |
| if (b === 0) return a | |
| return getGreatestCommonDivisor(b, a % b) | |
| } | |
| function defaultFactory (origin, opts) { | |
| return new Pool(origin, opts) | |
| } | |
| class BalancedPool extends PoolBase { | |
| constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) { | |
| super() | |
| this[kOptions] = opts | |
| this[kIndex] = -1 | |
| this[kCurrentWeight] = 0 | |
| this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100 | |
| this[kErrorPenalty] = this[kOptions].errorPenalty || 15 | |
| if (!Array.isArray(upstreams)) { | |
| upstreams = [upstreams] | |
| } | |
| if (typeof factory !== 'function') { | |
| throw new InvalidArgumentError('factory must be a function.') | |
| } | |
| this[kInterceptors] = opts.interceptors && opts.interceptors.BalancedPool && Array.isArray(opts.interceptors.BalancedPool) | |
| ? opts.interceptors.BalancedPool | |
| : [] | |
| this[kFactory] = factory | |
| for (const upstream of upstreams) { | |
| this.addUpstream(upstream) | |
| } | |
| this._updateBalancedPoolStats() | |
| } | |
| addUpstream (upstream) { | |
| const upstreamOrigin = parseOrigin(upstream).origin | |
| if (this[kClients].find((pool) => ( | |
| pool[kUrl].origin === upstreamOrigin && | |
| pool.closed !== true && | |
| pool.destroyed !== true | |
| ))) { | |
| return this | |
| } | |
| const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions])) | |
| this[kAddClient](pool) | |
| pool.on('connect', () => { | |
| pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty]) | |
| }) | |
| pool.on('connectionError', () => { | |
| pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty]) | |
| this._updateBalancedPoolStats() | |
| }) | |
| pool.on('disconnect', (...args) => { | |
| const err = args[2] | |
| if (err && err.code === 'UND_ERR_SOCKET') { | |
| // decrease the weight of the pool. | |
| pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty]) | |
| this._updateBalancedPoolStats() | |
| } | |
| }) | |
| for (const client of this[kClients]) { | |
| client[kWeight] = this[kMaxWeightPerServer] | |
| } | |
| this._updateBalancedPoolStats() | |
| return this | |
| } | |
| _updateBalancedPoolStats () { | |
| this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0) | |
| } | |
| removeUpstream (upstream) { | |
| const upstreamOrigin = parseOrigin(upstream).origin | |
| const pool = this[kClients].find((pool) => ( | |
| pool[kUrl].origin === upstreamOrigin && | |
| pool.closed !== true && | |
| pool.destroyed !== true | |
| )) | |
| if (pool) { | |
| this[kRemoveClient](pool) | |
| } | |
| return this | |
| } | |
| get upstreams () { | |
| return this[kClients] | |
| .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true) | |
| .map((p) => p[kUrl].origin) | |
| } | |
| [kGetDispatcher] () { | |
| // We validate that pools is greater than 0, | |
| // otherwise we would have to wait until an upstream | |
| // is added, which might never happen. | |
| if (this[kClients].length === 0) { | |
| throw new BalancedPoolMissingUpstreamError() | |
| } | |
| const dispatcher = this[kClients].find(dispatcher => ( | |
| !dispatcher[kNeedDrain] && | |
| dispatcher.closed !== true && | |
| dispatcher.destroyed !== true | |
| )) | |
| if (!dispatcher) { | |
| return | |
| } | |
| const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true) | |
| if (allClientsBusy) { | |
| return | |
| } | |
| let counter = 0 | |
| let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain]) | |
| while (counter++ < this[kClients].length) { | |
| this[kIndex] = (this[kIndex] + 1) % this[kClients].length | |
| const pool = this[kClients][this[kIndex]] | |
| // find pool index with the largest weight | |
| if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) { | |
| maxWeightIndex = this[kIndex] | |
| } | |
| // decrease the current weight every `this[kClients].length`. | |
| if (this[kIndex] === 0) { | |
| // Set the current weight to the next lower weight. | |
| this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor] | |
| if (this[kCurrentWeight] <= 0) { | |
| this[kCurrentWeight] = this[kMaxWeightPerServer] | |
| } | |
| } | |
| if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { | |
| return pool | |
| } | |
| } | |
| this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] | |
| this[kIndex] = maxWeightIndex | |
| return this[kClients][maxWeightIndex] | |
| } | |
| } | |
| module.exports = BalancedPool | |