You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
518 lines
14 KiB
518 lines
14 KiB
'use strict'; |
|
|
|
const zlib = require('zlib'); |
|
|
|
const bufferUtil = require('./buffer-util'); |
|
const Limiter = require('./limiter'); |
|
const { kStatusCode, NOOP } = require('./constants'); |
|
|
|
//
// Four bytes that `PerMessageDeflate#_decompress()` writes to the inflate
// stream after the last fragment of a message. `PerMessageDeflate#_compress()`
// drops the same number of bytes from the end of the last compressed fragment.
//
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);

//
// Symbols used as property keys on the zlib stream instances.
//
// - `kPerMessageDeflate`: the `PerMessageDeflate` instance owning the stream
// - `kTotalLength`: number of output bytes seen since the last flush
// - `kCallback`: callback of the operation currently in progress
// - `kBuffers`: output chunks collected since the last flush
// - `kError`: error recorded while collecting inflated chunks
//
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
const kError = Symbol('error');

//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
// Created lazily by the first `PerMessageDeflate` instance.
//
let zlibLimiter;
|
|
|
/**
 * permessage-deflate implementation.
 *
 * Handles the negotiation of the extension parameters (`offer()` and
 * `accept()`) and the compression/decompression of message payloads through
 * raw deflate/inflate zlib streams that are created on first use and reused
 * by subsequent calls.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * @param {Object} [options] Configuration options
   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
   *     disabling of server context takeover
   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
   *     acknowledge disabling of client context takeover
   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
   *     use of a custom server window size
   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
   *     for, or request, a custom client window size
   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
   *     deflate
   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
   *     inflate
   * @param {Number} [options.threshold=1024] Size (in bytes) below which
   *     messages should not be compressed
   * @param {Number} [options.concurrencyLimit=10] The number of concurrent
   *     calls to zlib
   * @param {Boolean} [isServer=false] Create the instance in either server or
   *     client mode
   * @param {Number} [maxPayload=0] The maximum allowed message length
   */
  constructor(options, isServer, maxPayload) {
    // `| 0` turns a missing `maxPayload` into the integer 0.
    this._maxPayload = maxPayload | 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    // Negotiated parameters; set by `accept()`.
    this.params = null;

    //
    // The limiter is shared by all instances, so `concurrencyLimit` is only
    // honoured for the first instance that is created.
    //
    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      //
      // The option was not provided (`null` or `undefined`): offer the
      // parameter without a value. An explicit `false` omits it entirely.
      //
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response.
   *
   * The accepted configuration is also stored in `this.params`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @throws {Error} If the parameters are invalid or cannot be accepted
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      // Non-null only while a `_compress()` call is waiting for its flush.
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer.
   *
   * The first acceptable offer is selected, updated in place with the server
   * options and returned.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers can be accepted
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;
    const accepted = offers.find((params) => {
      //
      // An offer is rejected when:
      //
      // - it asks for `server_no_context_takeover` but the option is
      //   explicitly `false`;
      // - it asks for `server_max_window_bits` but the option is `false` or a
      //   number greater than the offered value;
      // - the `clientMaxWindowBits` option is a number but the offer does not
      //   include `client_max_window_bits`.
      //
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      //
      // Do not include the parameter in the response if the offer carried it
      // without a value or if the option is explicitly disabled.
      //
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response.
   *
   * Only the first element of `response` is considered. It is updated in
   * place and returned.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response includes an unexpected parameter
   * @private
   */
  acceptAsClient(response) {
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      // The response did not set a value: fall back to the configured one.
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters.
   *
   * Every parameter value is read as a list (`value.length`, `value[0]`) and
   * is replaced, in place, by its single validated element.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter has more than one value or is unknown
   * @throws {TypeError} If a parameter has an invalid value
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            // A value-less parameter is only accepted in server mode.
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Signal the limiter first, then hand the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {Buffer} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Signal the limiter first, then hand the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the other endpoint.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      // The negotiated `windowBits` overrides any user supplied value.
      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    this._inflate[kCallback] = callback;

    this._inflate.write(data);
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      // Set by the `'data'` listener.
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const data = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (this._inflate._readableState.endEmitted) {
        // The stream has ended and cannot be reused.
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          this._inflate.reset();
        }
      }

      callback(null, data);
    });
  }

  /**
   * Compress data.
   *
   * @param {Buffer} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed with this endpoint's parameters.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      // The negotiated `windowBits` overrides any user supplied value.
      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      //
      // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
      // `zlib.DeflateRaw` instance is closed while data is being processed.
      // This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
      // time due to an abnormal WebSocket closure.
      //
      this._deflate.on('error', NOOP);
      this._deflate.on('data', deflateOnData);
    }

    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let data = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      //
      // Drop the last four bytes of the final fragment; `_decompress()`
      // writes `TRAILER` back on the receiving side. `subarray()` is used
      // instead of the deprecated `Buffer#slice()`; both return a view over
      // the same memory.
      //
      if (fin) data = data.subarray(0, data.length - 4);

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        this._deflate.reset();
      }

      callback(null, data);
    });
  }
}
|
|
|
module.exports = PerMessageDeflate;
|
|
|
/**
 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
 *
 * Collects the chunk on the stream (`this`) and updates the running byte
 * count, so that the flush callback can concatenate the output.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  this[kBuffers].push(chunk);
  this[kTotalLength] += chunk.length;
}
|
|
|
/**
 * The listener of the `zlib.InflateRaw` stream `'data'` event.
 *
 * Collects the chunk on the stream (`this`) as long as the total number of
 * inflated bytes does not exceed the `_maxPayload` of the owning
 * `PerMessageDeflate` instance. A `_maxPayload` lower than 1 disables the
 * check.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  this[kTotalLength] += chunk.length;

  if (
    this[kPerMessageDeflate]._maxPayload < 1 ||
    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  ) {
    this[kBuffers].push(chunk);
    return;
  }

  //
  // The limit was exceeded. Record the error on the stream so that it can be
  // picked up once the pending flush completes, stop collecting chunks and
  // reset the stream.
  //
  this[kError] = new RangeError('Max payload size exceeded');
  this[kError].code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
  this[kError][kStatusCode] = 1009;
  this.removeListener('data', inflateOnData);
  this.reset();
}
|
|
|
/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * Detaches the stream (`this`) from the owning `PerMessageDeflate` instance
 * and reports the failure through the callback of the pending operation.
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;

  //
  // If the `'data'` listener already recorded an error (payload limit
  // exceeded), report that one. The zlib error is only a consequence of the
  // stream having been reset while data was being processed and must not
  // replace the original error and its status code.
  //
  if (this[kError]) {
    this[kCallback](this[kError]);
    return;
  }

  err[kStatusCode] = 1007;
  this[kCallback](err);
}
|
|
|