update to stream

This commit is contained in:
nora 2025-08-16 16:46:56 +02:00
parent 9d8ebdac82
commit 4c3079ce5e

58
h2.mjs
View file

@ -2,6 +2,7 @@ import * as net from "node:net";
import { EventEmitter } from "node:events"; import { EventEmitter } from "node:events";
import * as fs from "node:fs"; import * as fs from "node:fs";
import path from "node:path"; import path from "node:path";
import * as stream from "node:stream";
const debug = process.env.DEBUG?.includes("h2.js"); const debug = process.env.DEBUG?.includes("h2.js");
@ -967,7 +968,7 @@ const handleConnection =
const request = buildRequest(rawH2Request); const request = buildRequest(rawH2Request);
const writeData = (body, flags) => { const writeDataFrame = (body, flags) => {
const payload = Buffer.from(body); const payload = Buffer.from(body);
streams.get(frame.streamIdentifier).flowControlWindowSize -= streams.get(frame.streamIdentifier).flowControlWindowSize -=
payload.length; payload.length;
@ -982,6 +983,32 @@ const handleConnection =
); );
}; };
const bodyStream = new stream.Writable({
write: (chunk, encoding, next) => {
const frameSize =
peerSettings.get(SETTING.SETTINGS_MAX_FRAME_SIZE) ?? 16_384;
const maxPayloadSize = frameSize - 24;
if (encoding !== "buffer") {
throw new Error("only buffers are supported");
}
while (chunk.length > 0) {
const chunkchunk = chunk.subarray(0, maxPayloadSize);
writeDataFrame(chunkchunk, 0);
chunk = chunk.subarray(maxPayloadSize);
}
next();
},
final: (cb) => {
writeDataFrame(Buffer.from([]), DATA_FLAG.END_STREAM);
cb(null);
},
});
/** /**
* @type {ResponseWriter} * @type {ResponseWriter}
*/ */
@ -1014,12 +1041,7 @@ const handleConnection =
}) })
); );
}, },
bodyPart: (body) => { body: bodyStream,
writeData(body, 0);
},
end: (body) => {
writeData(body, DATA_FLAG.END_STREAM);
},
}; };
// friends, we got a request! // friends, we got a request!
@ -1194,7 +1216,8 @@ server.on(
contentLength: html.length, contentLength: html.length,
headers: [["content-type", "text/html; charset=utf-8"]], headers: [["content-type", "text/html; charset=utf-8"]],
}); });
res.end(html); res.body.write(html);
res.body.end();
}); });
return; return;
} }
@ -1231,24 +1254,9 @@ server.on(
headers: [["content-type", contentType]], headers: [["content-type", contentType]],
}); });
let remainingSize = size; const fileStream = fs.createReadStream(filepath);
const readDataPart = () => fileStream.pipe(res.body);
fs.read(fd, (err, bytesRead, buffer) => {
if (err) {
console.error("error reading from file", err);
res.end("");
}
remainingSize -= bytesRead;
if (remainingSize === 0) {
res.end(buffer.subarray(0, bytesRead));
} else {
res.bodyPart(buffer.subarray(0, bytesRead));
readDataPart();
}
});
readDataPart();
}); });
return; return;
} }