flow control

This commit is contained in:
nora 2025-08-16 17:06:25 +02:00
parent 4c3079ce5e
commit c752478c73

60
h2.mjs
View file

@ -869,7 +869,6 @@ const handleConnection =
const hpackDecode = new HPackCtx(); const hpackDecode = new HPackCtx();
const hpackEncode = new HPackCtx(); const hpackEncode = new HPackCtx();
let connectionFlowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW_SIZE;
const peerSettings = new Map(); const peerSettings = new Map();
const streams = new Map(); const streams = new Map();
@ -968,10 +967,12 @@ const handleConnection =
const request = buildRequest(rawH2Request); const request = buildRequest(rawH2Request);
const writeDataFrame = (body, flags) => { const writeDataFrame = (body, flags, cb) => {
const payload = Buffer.from(body); const payload = Buffer.from(body);
streams.get(frame.streamIdentifier).flowControlWindowSize -= const h2stream = streams.get(frame.streamIdentifier);
payload.length;
const doWrite = () => {
h2stream.flowControlWindowSize -= payload.length;
socket.write( socket.write(
encodeFrame({ encodeFrame({
@ -981,6 +982,15 @@ const handleConnection =
streamIdentifier: frame.streamIdentifier, streamIdentifier: frame.streamIdentifier,
}) })
); );
cb();
};
if (h2stream.flowControlWindowSize < payload.length) {
h2stream.waitingForFlowControlWindowSize = payload.length;
h2stream.waitingForFlowControlWindowCb = doWrite;
} else {
setImmediate(doWrite);
}
}; };
const bodyStream = new stream.Writable({ const bodyStream = new stream.Writable({
@ -993,18 +1003,22 @@ const handleConnection =
throw new Error("only buffers are supported"); throw new Error("only buffers are supported");
} }
while (chunk.length > 0) { const writePart = () => {
const chunkchunk = chunk.subarray(0, maxPayloadSize); const chunkchunk = chunk.subarray(0, maxPayloadSize);
writeDataFrame(chunkchunk, 0);
chunk = chunk.subarray(maxPayloadSize); chunk = chunk.subarray(maxPayloadSize);
}
writeDataFrame(chunkchunk, 0, () => {
if (chunk.length > 0) {
writePart();
} else {
next(); next();
}
});
};
writePart();
}, },
final: (cb) => { final: (cb) => {
writeDataFrame(Buffer.from([]), DATA_FLAG.END_STREAM); writeDataFrame(Buffer.from([]), DATA_FLAG.END_STREAM, () => {});
cb(null); cb(null);
}, },
}); });
@ -1106,7 +1120,27 @@ const handleConnection =
case FRAME_TYPE.WINDOW_UPDATE: { case FRAME_TYPE.WINDOW_UPDATE: {
// whatever // whatever
const increment = frame.payload.readUint32BE(); const increment = frame.payload.readUint32BE();
console.log(increment, frame.streamIdentifier);
if (frame.streamIdentifier) {
const h2stream = streams.get(frame.streamIdentifier);
if (h2stream) {
h2stream.flowControlWindowSize += increment;
if (
typeof h2stream.waitingForFlowControlWindowSize === "number" &&
h2stream.flowControlWindowSize >=
h2stream.waitingForFlowControlWindowSize
) {
h2stream.waitingForFlowControlWindowSize = undefined;
const cb = h2stream.waitingForFlowControlWindowCb;
h2stream.waitingForFlowControlWindowCb = undefined;
cb();
}
} else {
socket.destroy();
}
}
break; break;
} }
case FRAME_TYPE.GOAWAY: { case FRAME_TYPE.GOAWAY: {
@ -1254,7 +1288,9 @@ server.on(
headers: [["content-type", contentType]], headers: [["content-type", contentType]],
}); });
const fileStream = fs.createReadStream(filepath); const fileStream = fs.createReadStream(filepath, {
highWaterMark: 1_048_576,
});
fileStream.pipe(res.body); fileStream.pipe(res.body);
}); });