
Unaligned writes to fs.ReadStream's internal Buffer pool lead to misaligned reads #24817

Closed
@trxcllnt

Description

  • Version: v11.3.0
  • Platform: Linux 4.15.0-42-generic Ubuntu x86_64
  • Subsystem: fs

The internal Buffer pool inside fs.ReadStream doesn't align its pooled writes to any byte boundary. This causes problems for consumers that require aligned chunks, such as code that creates typed array views over the underlying ArrayBuffer (a Float64Array view, for example, must start at a byteOffset that is a multiple of 8).
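The alignment rule itself can be seen without any streams. The sketch below (plain Node.js, using only `Buffer.from(arrayBuffer, byteOffset, length)` and the `Float64Array` constructor; the `tryFloat64View` helper is hypothetical) takes two Buffer slices of one ArrayBuffer, one at an 8-byte-aligned offset and one not, and tries to view each as Float64s without copying. It also shows one consumer-side workaround, which trades the zero-copy view for a copy.

```javascript
// A typed array view over an existing ArrayBuffer must start at a byteOffset
// that is a multiple of its element size (8 bytes for Float64Array).
const backing = new ArrayBuffer(64);
const aligned = Buffer.from(backing, 8, 16);    // byteOffset 8
const misaligned = Buffer.from(backing, 6, 16); // byteOffset 6

// Hypothetical helper: reinterpret a Buffer's bytes as Float64s without copying.
function tryFloat64View(buf) {
  try {
    return new Float64Array(buf.buffer, buf.byteOffset, buf.byteLength / 8);
  } catch (err) {
    return err;
  }
}

const ok = tryFloat64View(aligned);
const bad = tryFloat64View(misaligned);
console.log(ok.length);                 // 2
console.log(bad instanceof RangeError); // true

// Consumer-side workaround at the cost of a copy: new Uint8Array(buf) copies
// the bytes into a fresh ArrayBuffer whose byteOffset is 0.
const copied = new Float64Array(new Uint8Array(misaligned).buffer);
console.log(copied.length);             // 2
```

The copy works, but it defeats the purpose of viewing pooled chunks in place, which is why alignment at the pool level matters.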

In the simplest case, picture two fs.ReadStreams that are consumed concurrently. One is consumed with no alignment requirement, but the second must be consumed on 8-byte boundaries (for example, as chunks of Float64s). Because both streams draw from the same pool, reads from the unaligned stream advance the pool.used offset by amounts that aren't multiples of 8, so the next read from the aligned stream is written at an offset into the pool that isn't a multiple of 8.
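The offset drift can be modeled in a few lines. This is a hypothetical, simplified model (the `SharedPool` class and its `take` method are illustrative names, not Node's actual fs.ReadStream source): every read is carved out of one shared Buffer at the current `used` offset, and `used` then advances by exactly the number of bytes requested.

```javascript
// Hypothetical, simplified model of a pool shared by every stream (not
// Node's actual fs.ReadStream source). `used` advances by exactly the
// number of bytes requested, with no rounding.
class SharedPool {
  constructor(size) {
    this.buf = Buffer.alloc(size); // fresh ArrayBuffer, byteOffset 0
    this.used = 0;
  }

  take(n) {
    const chunk = this.buf.subarray(this.used, this.used + n);
    this.used += n;
    return chunk;
  }
}

const pool = new SharedPool(1024);

// Stream A has no alignment needs and reads 6 bytes (three Uint16s).
const chunkA = pool.take(6);
// Stream B wants Float64s, but its chunk starts where A's chunk ended.
const chunkB = pool.take(16);

console.log(chunkA.byteOffset); // 0
console.log(chunkB.byteOffset); // 6
// Non-zero remainder: a Float64Array view over chunkB would throw a RangeError.
console.log(chunkB.byteOffset % Float64Array.BYTES_PER_ELEMENT); // 6
```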

Attached is a repro script demonstrating the issue. The byte lengths of the test files have been carefully selected to reliably reproduce the problem.

One possible fix is to add an alignment member variable to fs.ReadStream, so that each stream instance's writes into the pool are aligned as that stream requires. An alternative is to always align writes to a fixed boundary (128 bytes is common), but this uses slightly more memory and is less flexible than the first approach.
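A minimal sketch of both proposals, assuming the same simplified shared-pool model rather than Node's real implementation: `AlignedPool`, `take(n, alignment)` and `alignUp` are hypothetical names, and the `alignment` argument stands in for the proposed per-stream member. Before carving out a chunk, the start offset is rounded up to the requested multiple, skipping a few padding bytes if necessary.

```javascript
// Round n up to the next multiple of `alignment`.
function alignUp(n, alignment) {
  return Math.ceil(n / alignment) * alignment;
}

// Hypothetical shared-pool model where each request carries the alignment
// its consumer needs (the proposed per-stream member).
class AlignedPool {
  constructor(size) {
    this.buf = Buffer.alloc(size); // fresh ArrayBuffer, byteOffset 0
    this.used = 0;
  }

  take(n, alignment = 1) {
    const start = alignUp(this.used, alignment);
    if (start + n > this.buf.length) {
      return null; // a real pool would allocate a new backing Buffer here
    }
    this.used = start + n;
    return this.buf.subarray(start, start + n);
  }
}

const pool = new AlignedPool(1024);
const a = pool.take(6);     // unaligned consumer: byteOffset 0, used -> 6
const b = pool.take(16, 8); // skips 2 padding bytes: byteOffset 8, used -> 24
const view = new Float64Array(b.buffer, b.byteOffset, b.byteLength / 8);
console.log(b.byteOffset, view.length); // 8 2

// The fixed-boundary alternative: every request uses the same alignment.
const c = pool.take(10, 128); // byteOffset 128, used -> 138
console.log(c.byteOffset);    // 128
```

The per-instance variant pads only as much as each consumer needs, while the fixed 128-byte boundary needs no per-stream configuration but can waste up to 127 bytes of pool space per read, which is the memory trade-off mentioned above.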

$ node createReadStreamTest.js 
advanceUnaligned done
RangeError: start offset of Float64Array should be a multiple of 8. Received 51050
    at asTypedArray (createReadStreamTest.js:63:15)
    at readN (createReadStreamTest.js:36:5)
    at process.internalTickCallback (internal/process/next_tick.js:77:7)


// createReadStreamTest.js
const fs = require('fs');

(async () => {

    await makeTestFile(`${__dirname}/bytesA`, 116586);
    await makeTestFile(`${__dirname}/bytesB`, 3961808);

    const streamA = fs.createReadStream(`${__dirname}/bytesA`);
    const streamB = fs.createReadStream(`${__dirname}/bytesB`);

    await advanceUnaligned(streamA);
    await advanceAligned8s(streamB);

    [streamA, streamB].forEach((s) => s.close());
})()
.then(() => 0, (err) => console.error(err) || 1)
.then((code) => process.exit(code));

async function makeTestFile(p, nBytes) {
    const exists = async (p) => {
        try { return !!(await fs.promises.stat(p)); }
        catch (e) { return false; }
    };
    if (!(await exists(p))) {
        const buffer = Uint8Array.from({ length: nBytes }, (_, i) => i);
        await fs.promises.writeFile(p, Buffer.from(buffer.buffer, 0, buffer.byteLength));
    }
}

async function readN(stream, Arr, n) {
    let buf = stream.read(n);
    if (!buf) {
        await onEvent(stream, 'readable');
        buf = stream.read(n);
    }
    asTypedArray(Arr, buf);
}

async function advanceUnaligned(stream) {
    const end = onEvent(stream, 'end');
    while (1) {
        if ('end' === (await Promise.race([end, readN(stream, Uint16Array, (Math.random() * 100 | 0) * 6)]))) { break; }
        if ('end' === (await Promise.race([end, readN(stream, Uint16Array, (Math.random() * 100 | 0) * 4)]))) { break; }
        if ('end' === (await Promise.race([end, readN(stream, Uint16Array, (Math.random() * 100 | 0) * 2)]))) { break; }
    }
    console.log(`advanceUnaligned done`);
}

async function advanceAligned8s(stream) {
    const end = onEvent(stream, 'end');
    while (1) {
        if ('end' === (await Promise.race([end, readN(stream, Float64Array, (Math.random() * 100 | 0) * 8)]))) { break; }
        if ('end' === (await Promise.race([end, readN(stream, Float64Array, (Math.random() * 100 | 0) * 8)]))) { break; }
        if ('end' === (await Promise.race([end, readN(stream, Float64Array, (Math.random() * 100 | 0) * 8)]))) { break; }
    }
    console.log(`advanceAligned8s done`);
}

function onEvent(stream, event) { return new Promise((r) => stream.once(event, () => r(event))); }
function asTypedArray(ArrayCtor, buf) {
    const { buffer, byteOffset, byteLength } = buf || new Uint8Array(ArrayCtor.BYTES_PER_ELEMENT);
    if (byteOffset % ArrayCtor.BYTES_PER_ELEMENT !== 0) {
        throw new RangeError(`start offset of ${ArrayCtor.name} should be a multiple of ${ArrayCtor.BYTES_PER_ELEMENT}. Received ${byteOffset}`);
    }
    return new ArrayCtor(buffer, byteOffset, byteLength / ArrayCtor.BYTES_PER_ELEMENT);
}

Labels: buffer, fs