Skip to content

Commit

Permalink
Add new chunking algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
lxsmnsyc committed Jan 14, 2024
1 parent a670ffa commit b759638
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
12 changes: 8 additions & 4 deletions packages/start/config/server-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import {
} from "vinxi/server";
import { getFetchEvent } from "../server/middleware";

function createChunk(data) {
const bytes = data.length;
const baseHex = bytes.toString(16);
const totalHex = '00000000'.substring(0, 8 - baseHex.length) + baseHex; // 32-bit
return new TextEncoder().encode(`;0x${totalHex};${data}`);
}

function serializeToStream(id, value) {
return new ReadableStream({
start(controller) {
Expand All @@ -40,15 +47,12 @@ function serializeToStream(id, value) {
URLPlugin
],
onSerialize(data, initial) {
const result = initial ? `(${getCrossReferenceHeader(id)},${data})` : data;
controller.enqueue(new TextEncoder().encode(`${result};\n`));
controller.enqueue(createChunk(initial ? `(${getCrossReferenceHeader(id)},${data})` : data));
},
onDone() {
// controller.enqueue(`delete $R["${id}"];\n`);
controller.close();
},
onError(error) {
// controller.enqueue(`delete $R["${id}"];\n`);
controller.error(error);
}
});
Expand Down
56 changes: 25 additions & 31 deletions packages/start/config/server-runtime.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from 'seroval-plugins/web';
import { createIslandReference } from "../server/islands";

class ChunkReader {
class SerovalChunkReader {
constructor(stream) {
this.reader = stream.getReader();
this.buffer = '';
Expand All @@ -31,10 +31,10 @@ class ChunkReader {
}
}

async nextValue() {
async next() {
// Check if the buffer is empty
if (this.buffer === '') {
// if we are already one...
// if we are already done...
if (this.done) {
return {
done: true,
Expand All @@ -43,38 +43,32 @@ class ChunkReader {
}
// Otherwise, read a new chunk
await this.readChunk();
return await this.next();
}
// Get the first valid seroval chunk
const [first, ...rest] = this.buffer.split('\n');
// Deserialize the seroval chunk
const result = {
done: false,
value: deserialize(first),
};
// if it succeeds, remove the first valid chunk
// from the buffer
this.buffer = rest.join('\n');
return result;
}

async next() {
try {
// Attempt to read a valid seroval chunk
return await this.nextValue();
} catch (error) {
// If it happens that there's an error again
// and we are done reading the buffer
// then the whole stream is invalid.
// Read the "byte header"
// The byte header tells us how big the expected data is
// so we know how much data we should wait before we
// deserialize the data
const bytes = Number.parseInt(this.buffer.substring(1, 11), 16); // ;0x00000000;
// Check if the buffer has enough bytes to be parsed
while (bytes > this.buffer.length - 12) {
// If it's not enough, and the reader is done
// then the chunk is invalid.
if (this.done) {
throw new Error('Malformed server function stream.');
throw new Error('Malformed server function stream.')
}
// Since it's invalid (some syntax-related issue)
// we read a new chunk, and hope there's a valid
// seroval chunk there
// Otherwise, we read more chunks
await this.readChunk();
// Retry again
return await this.next();
}
// Extract the exact chunk as defined by the byte header
const partial = this.buffer.substring(12, 12 + bytes);
// The rest goes to the buffer
this.buffer = this.buffer.substring(12 + bytes);
// Deserialize the chunk
return {
done: false,
value: deserialize(partial),
};
}

async drain() {
Expand All @@ -91,7 +85,7 @@ async function deserializeStream(id, response) {
if (!response.body) {
throw new Error("missing body");
}
const reader = new ChunkReader(response.body);
const reader = new SerovalChunkReader(response.body);

const result = await reader.next();

Expand Down

0 comments on commit b759638

Please sign in to comment.