// Spaces:
// Runtime error
// Runtime error
// Project-local dependencies.
const DispatcherBase = require('./dispatcher-base')
const FixedQueue = require('./node/fixed-queue')
const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols')
const PoolStats = require('./pool-stats')

// Symbols keying state stored on the pool (kNeedDrain is also set on
// individual clients).
const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
const kQueue = Symbol('queue')
const kClosedResolve = Symbol('closed resolve')

// Symbols keying the event handlers the pool attaches to each client.
const kOnDrain = Symbol('onDrain')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')

// Symbols keying pool methods. kGetDispatcher is called by PoolBase but not
// defined by it.
const kGetDispatcher = Symbol('get dispatcher')
const kAddClient = Symbol('add client')
const kRemoveClient = Symbol('remove client')

const kStats = Symbol('stats')
/**
 * Base class for dispatchers that spread requests over a list of clients.
 *
 * Requests that cannot be handed to a client right away are kept in an
 * internal queue and replayed from the `drain` handler. The class calls
 * `this[kGetDispatcher]()` to pick a client but does not define it
 * (presumably subclasses provide it - confirm against the concrete pools).
 */
class PoolBase extends DispatcherBase {
  constructor () {
    super()

    this[kQueue] = new FixedQueue()
    this[kClients] = []
    this[kQueued] = 0

    const pool = this

    // Deliberately a regular function: `this` is the receiver of the call.
    // kAddClient registers it with `client.on('drain', ...)` and also invokes
    // it as `this[kOnDrain](...)`, i.e. with the pool itself as receiver.
    this[kOnDrain] = function onDrain (origin, targets) {
      const queue = pool[kQueue]

      let needDrain = false

      // Replay queued requests on the receiver until one of them reports
      // back-pressure (dispatch returns a falsy value) or the queue is empty.
      while (!needDrain) {
        const item = queue.shift()
        if (!item) {
          break
        }
        pool[kQueued]--
        needDrain = !this.dispatch(item.opts, item.handler)
      }

      this[kNeedDrain] = needDrain

      // Only forward 'drain' when the receiver has capacity again and the
      // pool had previously been flagged as needing drain.
      if (!this[kNeedDrain] && pool[kNeedDrain]) {
        pool[kNeedDrain] = false
        pool.emit('drain', origin, [pool, ...targets])
      }

      // A close requested while requests were still queued (see kClose) is
      // completed here once the queue has been emptied.
      if (pool[kClosedResolve] && queue.isEmpty()) {
        Promise
          .all(pool[kClients].map(c => c.close()))
          .then(pool[kClosedResolve])
      }
    }

    // Re-emit client events on the pool, prepending the pool to `targets`.
    this[kOnConnect] = (origin, targets) => {
      pool.emit('connect', origin, [pool, ...targets])
    }

    this[kOnDisconnect] = (origin, targets, err) => {
      pool.emit('disconnect', origin, [pool, ...targets], err)
    }

    this[kOnConnectionError] = (origin, targets, err) => {
      pool.emit('connectionError', origin, [pool, ...targets], err)
    }

    this[kStats] = new PoolStats(this)
  }

  /** Current value of the pool's need-drain flag. */
  get [kBusy] () {
    return this[kNeedDrain]
  }

  /** Number of clients whose `kConnected` value is truthy. */
  get [kConnected] () {
    return this[kClients].filter(client => client[kConnected]).length
  }

  /** Number of clients that are connected and not flagged as needing drain. */
  get [kFree] () {
    return this[kClients].filter(client => client[kConnected] && !client[kNeedDrain]).length
  }

  /** Requests queued on the pool plus the `kPending` count of every client. */
  get [kPending] () {
    let ret = this[kQueued]
    for (const { [kPending]: pending } of this[kClients]) {
      ret += pending
    }
    return ret
  }

  /** Sum of the `kRunning` count of every client. */
  get [kRunning] () {
    let ret = 0
    for (const { [kRunning]: running } of this[kClients]) {
      ret += running
    }
    return ret
  }

  /** Requests queued on the pool plus the `kSize` of every client. */
  get [kSize] () {
    let ret = this[kQueued]
    for (const { [kSize]: size } of this[kClients]) {
      ret += size
    }
    return ret
  }

  /** The `PoolStats` instance created for this pool in the constructor. */
  get stats () {
    return this[kStats]
  }

  /**
   * Closes every client. When requests are still queued, the returned promise
   * is instead left pending and resolved by the drain handler once the queue
   * is empty and the clients have been closed.
   *
   * @returns {Promise}
   */
  async [kClose] () {
    if (this[kQueue].isEmpty()) {
      return Promise.all(this[kClients].map(c => c.close()))
    } else {
      return new Promise((resolve) => {
        this[kClosedResolve] = resolve
      })
    }
  }

  /**
   * Fails every queued request with `err` through its handler's `onError`,
   * then destroys every client.
   *
   * @param {*} err Value passed to each queued handler and to each client's `destroy`.
   * @returns {Promise}
   */
  async [kDestroy] (err) {
    while (true) {
      const item = this[kQueue].shift()
      if (!item) {
        break
      }
      // Keep the counter in step with the queue, as the drain handler does;
      // otherwise kPending/kSize keep counting requests that were failed here.
      this[kQueued]--
      item.handler.onError(err)
    }

    return Promise.all(this[kClients].map(c => c.destroy(err)))
  }

  /**
   * Hands the request to the dispatcher returned by `kGetDispatcher`, or
   * queues it when none is returned.
   *
   * @param {object} opts Request options, forwarded to the dispatcher unchanged.
   * @param {object} handler Request handler, forwarded to the dispatcher unchanged.
   * @returns {boolean} `false` when the pool is flagged as needing drain afterwards.
   */
  [kDispatch] (opts, handler) {
    const dispatcher = this[kGetDispatcher]()

    if (!dispatcher) {
      this[kNeedDrain] = true
      this[kQueue].push({ opts, handler })
      this[kQueued]++
    } else if (!dispatcher.dispatch(opts, handler)) {
      // This dispatcher signalled back-pressure; the pool as a whole only
      // needs drain when no other dispatcher can be obtained.
      dispatcher[kNeedDrain] = true
      this[kNeedDrain] = !this[kGetDispatcher]()
    }

    return !this[kNeedDrain]
  }

  /**
   * Subscribes the pool's handlers to the client's events and appends the
   * client to the pool's client list.
   *
   * @param {object} client Client to add.
   * @returns {this}
   */
  [kAddClient] (client) {
    client
      .on('drain', this[kOnDrain])
      .on('connect', this[kOnConnect])
      .on('disconnect', this[kOnDisconnect])
      .on('connectionError', this[kOnConnectionError])

    this[kClients].push(client)

    if (this[kNeedDrain]) {
      // Re-check on the next tick, then run the drain handler with the pool
      // as receiver so queued requests are dispatched through the pool.
      process.nextTick(() => {
        if (this[kNeedDrain]) {
          this[kOnDrain](client[kUrl], [this, client])
        }
      })
    }

    return this
  }

  /**
   * Closes the client and removes it from the client list once the close
   * callback runs, then recomputes the pool's need-drain flag.
   *
   * @param {object} client Client to remove.
   */
  [kRemoveClient] (client) {
    client.close(() => {
      const idx = this[kClients].indexOf(client)
      if (idx !== -1) {
        this[kClients].splice(idx, 1)
      }
    })

    // NOTE(review): this sets the flag to true when some client is NOT
    // needing drain and is neither closed nor destroyed, which reads inverted
    // relative to kDispatch - confirm the intended semantics before changing.
    this[kNeedDrain] = this[kClients].some(dispatcher => (
      !dispatcher[kNeedDrain] &&
      dispatcher.closed !== true &&
      dispatcher.destroyed !== true
    ))
  }
}
module.exports = { | |
PoolBase, | |
kClients, | |
kNeedDrain, | |
kAddClient, | |
kRemoveClient, | |
kGetDispatcher | |
} | |