improve allocations needed in serialization

This commit is contained in:
Tobias Koppers 2021-08-10 17:30:00 +02:00
parent a402d2002a
commit db9b2df6cd
1 changed files with 297 additions and 269 deletions

View File

@ -133,17 +133,29 @@ class BinaryMiddleware extends SerializerMiddleware {
/**
* @param {DeserializedType} data data
* @param {Object} context context object
* @param {{ leftOverBuffer: Buffer | null, allocationSize: number, increaseCounter: number }} allocationScope allocation scope
* @returns {SerializedType} serialized data
*/
_serialize(data, context) {
/** @type {Buffer} */
let currentBuffer = null;
_serialize(
data,
context,
allocationScope = {
allocationSize: 1024,
increaseCounter: 0,
leftOverBuffer: null
}
) {
/** @type {Buffer} */
let leftOverBuffer = null;
let currentPosition = 0;
/** @type {BufferSerializableType[]} */
let buffers = [];
let buffersTotalLength = 0;
/** @type {Buffer} */
let currentBuffer = allocationScope ? allocationScope.leftOverBuffer : null;
allocationScope.leftOverBuffer = null;
let currentPosition = 0;
if (currentBuffer === null) {
currentBuffer = Buffer.allocUnsafe(allocationScope.allocationSize);
}
const allocate = bytesNeeded => {
if (currentBuffer !== null) {
if (currentBuffer.length - currentPosition >= bytesNeeded) return;
@ -154,22 +166,28 @@ class BinaryMiddleware extends SerializerMiddleware {
leftOverBuffer = null;
} else {
currentBuffer = Buffer.allocUnsafe(
Math.max(
bytesNeeded,
Math.min(Math.max(buffersTotalLength, 1024), 16384)
)
Math.max(bytesNeeded, allocationScope.allocationSize)
);
if (
!(allocationScope.increaseCounter =
(allocationScope.increaseCounter + 1) % 4) &&
allocationScope.allocationSize < 16777216
) {
allocationScope.allocationSize = allocationScope.allocationSize << 1;
}
}
};
const flush = () => {
if (currentBuffer !== null) {
buffers.push(
Buffer.from(
currentBuffer.buffer,
currentBuffer.byteOffset,
currentPosition
)
);
if (currentPosition > 0) {
buffers.push(
Buffer.from(
currentBuffer.buffer,
currentBuffer.byteOffset,
currentPosition
)
);
}
if (
!leftOverBuffer ||
leftOverBuffer.length < currentBuffer.length - currentPosition
@ -180,8 +198,8 @@ class BinaryMiddleware extends SerializerMiddleware {
currentBuffer.byteLength - currentPosition
);
}
currentBuffer = null;
buffersTotalLength += currentPosition;
currentPosition = 0;
}
};
@ -205,232 +223,237 @@ class BinaryMiddleware extends SerializerMiddleware {
}
return size;
};
const serializeData = data => {
for (let i = 0; i < data.length; i++) {
const thing = data[i];
switch (typeof thing) {
case "function": {
if (!SerializerMiddleware.isLazy(thing))
throw new Error("Unexpected function " + thing);
/** @type {SerializedType | (() => SerializedType)} */
let serializedData =
SerializerMiddleware.getLazySerializedValue(thing);
if (serializedData === undefined) {
if (SerializerMiddleware.isLazy(thing, this)) {
const data = this._serialize(thing(), context);
SerializerMiddleware.setLazySerializedValue(thing, data);
serializedData = data;
} else {
serializedData = this._serializeLazy(thing, context);
}
for (let i = 0; i < data.length; i++) {
const thing = data[i];
switch (typeof thing) {
case "function": {
if (!SerializerMiddleware.isLazy(thing))
throw new Error("Unexpected function " + thing);
/** @type {SerializedType | (() => SerializedType)} */
let serializedData =
SerializerMiddleware.getLazySerializedValue(thing);
if (serializedData === undefined) {
if (SerializerMiddleware.isLazy(thing, this)) {
flush();
allocationScope.leftOverBuffer = leftOverBuffer;
const result =
/** @type {(Exclude<PrimitiveSerializableType, Promise<PrimitiveSerializableType>>)[]} */ (
thing()
);
const data = this._serialize(result, context, allocationScope);
leftOverBuffer = allocationScope.leftOverBuffer;
allocationScope.leftOverBuffer = null;
SerializerMiddleware.setLazySerializedValue(thing, data);
serializedData = data;
} else {
serializedData = this._serializeLazy(thing, context);
flush();
buffers.push(serializedData);
break;
}
} else {
if (typeof serializedData === "function") {
flush();
buffers.push(serializedData);
} else {
const lengths = [];
for (const item of serializedData) {
let last;
if (typeof item === "function") {
lengths.push(0);
} else if (item.length === 0) {
// ignore
} else if (
lengths.length > 0 &&
(last = lengths[lengths.length - 1]) !== 0
) {
const remaining = 0xffffffff - last;
if (remaining >= item.length) {
lengths[lengths.length - 1] += item.length;
} else {
lengths.push(item.length - remaining);
lengths[lengths.length - 2] = 0xffffffff;
}
} else {
lengths.push(item.length);
}
}
allocate(5 + lengths.length * 4);
writeU8(LAZY_HEADER);
writeU32(lengths.length);
for (const l of lengths) {
writeU32(l);
}
flush();
for (const item of serializedData) {
buffers.push(item);
}
}
break;
}
case "string": {
const len = Buffer.byteLength(thing);
if (len >= 128 || len !== thing.length) {
allocate(len + HEADER_SIZE + I32_SIZE);
writeU8(STRING_HEADER);
writeU32(len);
currentBuffer.write(thing, currentPosition);
} else {
allocate(len + HEADER_SIZE);
writeU8(SHORT_STRING_HEADER | len);
currentBuffer.write(thing, currentPosition, "latin1");
}
currentPosition += len;
break;
}
case "number": {
const type = identifyNumber(thing);
if (type === 0 && thing >= 0 && thing <= 10) {
// shortcut for very small numbers
allocate(I8_SIZE);
writeU8(thing);
break;
}
/**
* amount of numbers to write
* @type {number}
*/
let n = 1;
for (; n < 32 && i + n < data.length; n++) {
const item = data[i + n];
if (typeof item !== "number") break;
if (identifyNumber(item) !== type) break;
}
switch (type) {
case 0:
allocate(HEADER_SIZE + I8_SIZE * n);
writeU8(I8_HEADER | (n - 1));
while (n > 0) {
currentBuffer.writeInt8(
/** @type {number} */ (data[i]),
currentPosition
);
currentPosition += I8_SIZE;
n--;
i++;
}
break;
case 1:
allocate(HEADER_SIZE + I32_SIZE * n);
writeU8(I32_HEADER | (n - 1));
while (n > 0) {
currentBuffer.writeInt32LE(
/** @type {number} */ (data[i]),
currentPosition
);
currentPosition += I32_SIZE;
n--;
i++;
}
break;
case 2:
allocate(HEADER_SIZE + F64_SIZE * n);
writeU8(F64_HEADER | (n - 1));
while (n > 0) {
currentBuffer.writeDoubleLE(
/** @type {number} */ (data[i]),
currentPosition
);
currentPosition += F64_SIZE;
n--;
i++;
}
break;
}
i--;
break;
}
case "boolean": {
let lastByte = thing === true ? 1 : 0;
const bytes = [];
let count = 1;
let n;
for (n = 1; n < 0xffffffff && i + n < data.length; n++) {
const item = data[i + n];
if (typeof item !== "boolean") break;
const pos = count & 0x7;
if (pos === 0) {
bytes.push(lastByte);
lastByte = item === true ? 1 : 0;
} else if (item === true) {
lastByte |= 1 << pos;
const lengths = [];
for (const item of serializedData) {
let last;
if (typeof item === "function") {
lengths.push(0);
} else if (item.length === 0) {
// ignore
} else if (
lengths.length > 0 &&
(last = lengths[lengths.length - 1]) !== 0
) {
const remaining = 0xffffffff - last;
if (remaining >= item.length) {
lengths[lengths.length - 1] += item.length;
} else {
lengths.push(item.length - remaining);
lengths[lengths.length - 2] = 0xffffffff;
}
count++;
}
i += count - 1;
if (count === 1) {
allocate(HEADER_SIZE);
writeU8(lastByte === 1 ? TRUE_HEADER : FALSE_HEADER);
} else if (count === 2) {
allocate(HEADER_SIZE * 2);
writeU8(lastByte & 1 ? TRUE_HEADER : FALSE_HEADER);
writeU8(lastByte & 2 ? TRUE_HEADER : FALSE_HEADER);
} else if (count <= 6) {
allocate(HEADER_SIZE + I8_SIZE);
writeU8(BOOLEANS_HEADER);
writeU8((1 << count) | lastByte);
} else if (count <= 133) {
allocate(
HEADER_SIZE + I8_SIZE + I8_SIZE * bytes.length + I8_SIZE
);
writeU8(BOOLEANS_HEADER);
writeU8(0x80 | (count - 7));
for (const byte of bytes) writeU8(byte);
writeU8(lastByte);
} else {
allocate(
HEADER_SIZE +
I8_SIZE +
I32_SIZE +
I8_SIZE * bytes.length +
I8_SIZE
);
writeU8(BOOLEANS_HEADER);
writeU8(0xff);
writeU32(count);
for (const byte of bytes) writeU8(byte);
writeU8(lastByte);
lengths.push(item.length);
}
}
allocate(5 + lengths.length * 4);
writeU8(LAZY_HEADER);
writeU32(lengths.length);
for (const l of lengths) {
writeU32(l);
}
flush();
for (const item of serializedData) {
buffers.push(item);
}
break;
}
case "string": {
const len = Buffer.byteLength(thing);
if (len >= 128 || len !== thing.length) {
allocate(len + HEADER_SIZE + I32_SIZE);
writeU8(STRING_HEADER);
writeU32(len);
currentBuffer.write(thing, currentPosition);
} else {
allocate(len + HEADER_SIZE);
writeU8(SHORT_STRING_HEADER | len);
currentBuffer.write(thing, currentPosition, "latin1");
}
currentPosition += len;
break;
}
case "number": {
const type = identifyNumber(thing);
if (type === 0 && thing >= 0 && thing <= 10) {
// shortcut for very small numbers
allocate(I8_SIZE);
writeU8(thing);
break;
}
case "object": {
if (thing === null) {
let n;
for (n = 1; n < 0x100000104 && i + n < data.length; n++) {
const item = data[i + n];
if (item !== null) break;
/**
* amount of numbers to write
* @type {number}
*/
let n = 1;
for (; n < 32 && i + n < data.length; n++) {
const item = data[i + n];
if (typeof item !== "number") break;
if (identifyNumber(item) !== type) break;
}
switch (type) {
case 0:
allocate(HEADER_SIZE + I8_SIZE * n);
writeU8(I8_HEADER | (n - 1));
while (n > 0) {
currentBuffer.writeInt8(
/** @type {number} */ (data[i]),
currentPosition
);
currentPosition += I8_SIZE;
n--;
i++;
}
i += n - 1;
if (n === 1) {
if (i + 1 < data.length) {
const next = data[i + 1];
if (next === true) {
allocate(HEADER_SIZE);
writeU8(NULL_AND_TRUE_HEADER);
break;
case 1:
allocate(HEADER_SIZE + I32_SIZE * n);
writeU8(I32_HEADER | (n - 1));
while (n > 0) {
currentBuffer.writeInt32LE(
/** @type {number} */ (data[i]),
currentPosition
);
currentPosition += I32_SIZE;
n--;
i++;
}
break;
case 2:
allocate(HEADER_SIZE + F64_SIZE * n);
writeU8(F64_HEADER | (n - 1));
while (n > 0) {
currentBuffer.writeDoubleLE(
/** @type {number} */ (data[i]),
currentPosition
);
currentPosition += F64_SIZE;
n--;
i++;
}
break;
}
i--;
break;
}
case "boolean": {
let lastByte = thing === true ? 1 : 0;
const bytes = [];
let count = 1;
let n;
for (n = 1; n < 0xffffffff && i + n < data.length; n++) {
const item = data[i + n];
if (typeof item !== "boolean") break;
const pos = count & 0x7;
if (pos === 0) {
bytes.push(lastByte);
lastByte = item === true ? 1 : 0;
} else if (item === true) {
lastByte |= 1 << pos;
}
count++;
}
i += count - 1;
if (count === 1) {
allocate(HEADER_SIZE);
writeU8(lastByte === 1 ? TRUE_HEADER : FALSE_HEADER);
} else if (count === 2) {
allocate(HEADER_SIZE * 2);
writeU8(lastByte & 1 ? TRUE_HEADER : FALSE_HEADER);
writeU8(lastByte & 2 ? TRUE_HEADER : FALSE_HEADER);
} else if (count <= 6) {
allocate(HEADER_SIZE + I8_SIZE);
writeU8(BOOLEANS_HEADER);
writeU8((1 << count) | lastByte);
} else if (count <= 133) {
allocate(HEADER_SIZE + I8_SIZE + I8_SIZE * bytes.length + I8_SIZE);
writeU8(BOOLEANS_HEADER);
writeU8(0x80 | (count - 7));
for (const byte of bytes) writeU8(byte);
writeU8(lastByte);
} else {
allocate(
HEADER_SIZE +
I8_SIZE +
I32_SIZE +
I8_SIZE * bytes.length +
I8_SIZE
);
writeU8(BOOLEANS_HEADER);
writeU8(0xff);
writeU32(count);
for (const byte of bytes) writeU8(byte);
writeU8(lastByte);
}
break;
}
case "object": {
if (thing === null) {
let n;
for (n = 1; n < 0x100000104 && i + n < data.length; n++) {
const item = data[i + n];
if (item !== null) break;
}
i += n - 1;
if (n === 1) {
if (i + 1 < data.length) {
const next = data[i + 1];
if (next === true) {
allocate(HEADER_SIZE);
writeU8(NULL_AND_TRUE_HEADER);
i++;
} else if (next === false) {
allocate(HEADER_SIZE);
writeU8(NULL_AND_FALSE_HEADER);
i++;
} else if (typeof next === "number") {
const type = identifyNumber(next);
if (type === 0) {
allocate(HEADER_SIZE + I8_SIZE);
writeU8(NULL_AND_I8_HEADER);
currentBuffer.writeInt8(next, currentPosition);
currentPosition += I8_SIZE;
i++;
} else if (next === false) {
allocate(HEADER_SIZE);
writeU8(NULL_AND_FALSE_HEADER);
} else if (type === 1) {
allocate(HEADER_SIZE + I32_SIZE);
writeU8(NULL_AND_I32_HEADER);
currentBuffer.writeInt32LE(next, currentPosition);
currentPosition += I32_SIZE;
i++;
} else if (typeof next === "number") {
const type = identifyNumber(next);
if (type === 0) {
allocate(HEADER_SIZE + I8_SIZE);
writeU8(NULL_AND_I8_HEADER);
currentBuffer.writeInt8(next, currentPosition);
currentPosition += I8_SIZE;
i++;
} else if (type === 1) {
allocate(HEADER_SIZE + I32_SIZE);
writeU8(NULL_AND_I32_HEADER);
currentBuffer.writeInt32LE(next, currentPosition);
currentPosition += I32_SIZE;
i++;
} else {
allocate(HEADER_SIZE);
writeU8(NULL_HEADER);
}
} else {
allocate(HEADER_SIZE);
writeU8(NULL_HEADER);
@ -439,59 +462,64 @@ class BinaryMiddleware extends SerializerMiddleware {
allocate(HEADER_SIZE);
writeU8(NULL_HEADER);
}
} else if (n === 2) {
allocate(HEADER_SIZE);
writeU8(NULL2_HEADER);
} else if (n === 3) {
allocate(HEADER_SIZE);
writeU8(NULL3_HEADER);
} else if (n < 260) {
allocate(HEADER_SIZE + I8_SIZE);
writeU8(NULLS8_HEADER);
writeU8(n - 4);
} else {
allocate(HEADER_SIZE + I32_SIZE);
writeU8(NULLS32_HEADER);
writeU32(n - 260);
allocate(HEADER_SIZE);
writeU8(NULL_HEADER);
}
} else if (Buffer.isBuffer(thing)) {
if (thing.length < 8192) {
allocate(HEADER_SIZE + I32_SIZE + thing.length);
writeU8(BUFFER_HEADER);
writeU32(thing.length);
thing.copy(currentBuffer, currentPosition);
currentPosition += thing.length;
} else {
allocate(HEADER_SIZE + I32_SIZE);
writeU8(BUFFER_HEADER);
writeU32(thing.length);
flush();
buffers.push(thing);
}
}
break;
}
case "symbol": {
if (thing === MEASURE_START_OPERATION) {
measureStart();
} else if (thing === MEASURE_END_OPERATION) {
const size = measureEnd();
} else if (n === 2) {
allocate(HEADER_SIZE);
writeU8(NULL2_HEADER);
} else if (n === 3) {
allocate(HEADER_SIZE);
writeU8(NULL3_HEADER);
} else if (n < 260) {
allocate(HEADER_SIZE + I8_SIZE);
writeU8(NULLS8_HEADER);
writeU8(n - 4);
} else {
allocate(HEADER_SIZE + I32_SIZE);
writeU8(I32_HEADER);
currentBuffer.writeInt32LE(size, currentPosition);
currentPosition += I32_SIZE;
writeU8(NULLS32_HEADER);
writeU32(n - 260);
}
} else if (Buffer.isBuffer(thing)) {
if (thing.length < 8192) {
allocate(HEADER_SIZE + I32_SIZE + thing.length);
writeU8(BUFFER_HEADER);
writeU32(thing.length);
thing.copy(currentBuffer, currentPosition);
currentPosition += thing.length;
} else {
allocate(HEADER_SIZE + I32_SIZE);
writeU8(BUFFER_HEADER);
writeU32(thing.length);
flush();
buffers.push(thing);
}
break;
}
break;
}
case "symbol": {
if (thing === MEASURE_START_OPERATION) {
measureStart();
} else if (thing === MEASURE_END_OPERATION) {
const size = measureEnd();
allocate(HEADER_SIZE + I32_SIZE);
writeU8(I32_HEADER);
currentBuffer.writeInt32LE(size, currentPosition);
currentPosition += I32_SIZE;
}
break;
}
}
};
serializeData(data);
}
flush();
allocationScope.leftOverBuffer = leftOverBuffer;
// avoid leaking memory
currentBuffer = null;
leftOverBuffer = null;
allocationScope = undefined;
const _buffers = buffers;
buffers = undefined;
return _buffers;