diff --git a/h2.mjs b/h2.mjs index cfb63a1..ed66178 100644 --- a/h2.mjs +++ b/h2.mjs @@ -869,7 +869,6 @@ const handleConnection = const hpackDecode = new HPackCtx(); const hpackEncode = new HPackCtx(); - let connectionFlowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW_SIZE; const peerSettings = new Map(); const streams = new Map(); @@ -968,19 +967,30 @@ const handleConnection = const request = buildRequest(rawH2Request); - const writeDataFrame = (body, flags) => { + const writeDataFrame = (body, flags, cb) => { const payload = Buffer.from(body); - streams.get(frame.streamIdentifier).flowControlWindowSize -= - payload.length; + const h2stream = streams.get(frame.streamIdentifier); - socket.write( - encodeFrame({ - type: FRAME_TYPE.DATA, - flags: flags, - payload, - streamIdentifier: frame.streamIdentifier, - }) - ); + const doWrite = () => { + h2stream.flowControlWindowSize -= payload.length; + + socket.write( + encodeFrame({ + type: FRAME_TYPE.DATA, + flags: flags, + payload, + streamIdentifier: frame.streamIdentifier, + }) + ); + cb(); + }; + + if (h2stream.flowControlWindowSize < payload.length) { + h2stream.waitingForFlowControlWindowSize = payload.length; + h2stream.waitingForFlowControlWindowCb = doWrite; + } else { + setImmediate(doWrite); + } }; const bodyStream = new stream.Writable({ @@ -993,18 +1003,22 @@ const handleConnection = throw new Error("only buffers are supported"); } - while (chunk.length > 0) { + const writePart = () => { const chunkchunk = chunk.subarray(0, maxPayloadSize); - - writeDataFrame(chunkchunk, 0); - chunk = chunk.subarray(maxPayloadSize); - } - next(); + writeDataFrame(chunkchunk, 0, () => { + if (chunk.length > 0) { + writePart(); + } else { + next(); + } + }); + }; + writePart(); }, final: (cb) => { - writeDataFrame(Buffer.from([]), DATA_FLAG.END_STREAM); + writeDataFrame(Buffer.from([]), DATA_FLAG.END_STREAM, () => {}); cb(null); }, }); @@ -1106,7 +1120,27 @@ const handleConnection = case FRAME_TYPE.WINDOW_UPDATE: { // whatever const increment = frame.payload.readUint32BE(); - console.log(increment, frame.streamIdentifier); + + if (frame.streamIdentifier) { + const h2stream = streams.get(frame.streamIdentifier); + if (h2stream) { + h2stream.flowControlWindowSize += increment; + + if ( + typeof h2stream.waitingForFlowControlWindowSize === "number" && + h2stream.flowControlWindowSize >= + h2stream.waitingForFlowControlWindowSize + ) { + h2stream.waitingForFlowControlWindowSize = undefined; + const cb = h2stream.waitingForFlowControlWindowCb; + h2stream.waitingForFlowControlWindowCb = undefined; + cb(); + } + } else { + socket.destroy(); + } + } + break; } case FRAME_TYPE.GOAWAY: { @@ -1254,7 +1288,9 @@ server.on( headers: [["content-type", contentType]], }); - const fileStream = fs.createReadStream(filepath); + const fileStream = fs.createReadStream(filepath, { + highWaterMark: 1_048_576, + }); fileStream.pipe(res.body); });