Home Reference Source Repository

src/log-index-file.js

'use strict';

import assert from 'assert-plus';
import RandomAccessFile from 'ranfile';

/******************************************************************************
The log index file is a side-file that tracks the byte offsets to each distinct
log entry. Using the side-file indexing scheme, we are able to efficiently
store and retrieve each distinct log entry without having to care about the
entry's byte content. In this scheme, log-serial-numbers (LSNs) are the natural
integer index at which the log entry was written, as if being written to an
array.

Layout:

marker   -- 4 marker bytes identifying the file's type: IDX$
base     -- 4 bytes integer (big-endian); an index offset, usually 0 (zero).
            Offsets are only meaningful when indexes are split across many
            files so are a placeholder for future work.
head     -- 4 bytes integer (big-endian); the write head (next unused entry)
commit   -- 4 bytes integer (big-endian); the commit head (may trail head)

In this layout there are 16 header bytes (hlen), therefore each subsequent 4
bytes contains a pointer to a log entry's starting byte in the indexed write-
ahead-log (WAL).

hlen + ((index - base) * sizeof(int))


Example (octets)...

            Base      Head        Commit      Index 0     Index 1
            v           v           v           V           v
49 44 58 24 00 00 00 00 00 00 00 02 00 00 00 01 00 00 00 00 00 00 00 58 \

Index 2
v
00 00 02 1f

Log Entry 0 begins at byte 0
Log Entry 1 begins at byte 88
Log Entry 2 begins at byte 543



******************************************************************************/

const MARKER = 'IDX$';
const OFS_MARKER = 0;
const OFS_BASEINDEX = 4;
const OFS_HEAD = 8;
const OFS_COMMIT = 12;
const HLEN = 16;
const SIZEOF_INT = 4;
const DEF_COMMIT = -1;

const $file = Symbol('file');
const $header = Symbol('header');

function allocFill(len) {
  return (Buffer.alloc) ?
    Buffer.alloc(len) :
    new Buffer(len); // deprecated in 5.x+
}

class LogIndexFile {

  constructor(file) {
    assert.object(file, 'file');
    assert.func(file.read, 'file.read');
    assert.func(file.write, 'file.write');
    assert.func(file.close, 'file.close');
    if (file.size < HLEN) {
      throw new Error('File too small to be an index file.');
    }
    this[$file] = file;
  }

  get name() { return this[$file].name; }

  get writable() {
    return this[$file].writable;
  }

  get marker() {
    assert.ok(this[$header], 'index must be open');
    return this[$header].toString('ascii', OFS_MARKER, OFS_BASEINDEX);
  }

  get baseIndex() {
    assert.ok(this[$header], 'index must be open');
    return this[$header].readInt32BE(OFS_BASEINDEX, true);
  }

  get head() {
    assert.ok(this[$header], 'index must be open');
    return this[$header].readInt32BE(OFS_HEAD, true);
  }

  get commitHead() {
    assert.ok(this[$header], 'index must be open');
    return this[$header].readInt32BE(OFS_COMMIT, true);
  }

  isCommitted(index) {
    assert.ok(this[$header], 'index must be open');
    return index < this.commitHead;
  }

  commit(index) {
    assert.ok(this[$header], 'index must be open');
    assert.number(index, 'index');
    let expected = this.commitHead + 1;
    if (index < expected) {
      return Promise.resolve(index);
    }
    if (index !== expected) {
      return Promise.reject(new Error(`Out of order commit; expected ${expected} but received ${index}.`));
    }
    this[$header].writeInt32BE(index, OFS_COMMIT, true);
    return this[$file].write(OFS_COMMIT, this[$header], OFS_COMMIT, SIZEOF_INT)
      .then(() => index);
  }

  localizeIndex(index) {
    assert.number(index, 'index');
    index = index - this.baseIndex;
    return index;
  }

  globalizeIndex(index) {
    assert.number(index, 'index');
    index = index + this.baseIndex;
    return index;
  }

  offset(index) {
    assert.ok(index <= this.head, 'index out of range');
    assert.ok(this[$header], 'index must be open');
    let ofs = HLEN + (this.localizeIndex(index) * SIZEOF_INT);
    return this[$file].read(ofs, SIZEOF_INT)
      .then(data => {
        let offset = data.readInt32BE(0, SIZEOF_INT);
        return offset;
      });
  }

  get(index) {
    assert.number(index, 'index');
    assert.ok(index < this.head, 'index out of range');
    assert.ok(this[$header], 'index must be open');
    let ofs = HLEN + (this.localizeIndex(index) * SIZEOF_INT);
    return this[$file].read(ofs, 2 * SIZEOF_INT)
      .then(data => {
        let offset = data.readInt32BE(0, SIZEOF_INT);
        return {
          offset,
          length: data.readInt32BE(SIZEOF_INT, SIZEOF_INT) - offset
        };
      });
  }

  getRange(index, count) {
    assert.number(index, 'index');
    assert.ok(index < this.head, 'index out of range');
    assert.number(count, 'count');
    assert.ok(this[$header], 'index must be open');
    // head points at the next entry
    let limit = (this.head - index);
    assert.ok(count <= limit, 'count puts index out of range');
    let ofs = HLEN + (this.localizeIndex(index) * SIZEOF_INT);
    return this[$file].read(ofs, (count + 1) * SIZEOF_INT)
      .then(data => {
        let i = -1;
        let result = [];
        while (++i < count) {
          let offset = data.readInt32BE(i * SIZEOF_INT, SIZEOF_INT);
          result.push({
            index: index + i,
            offset,
            length: data.readInt32BE((i + 1) * SIZEOF_INT, SIZEOF_INT) - offset
          });
        }
        return result;
      });
  }

  truncate(from) {
    assert.number(from, 'from');
    assert.ok(from > this.commitHead, 'cannot truncate a committed log entry');
    assert.ok(from < this.head, 'index out of range');
    let header = this[$header];
    header.writeInt32BE(from, OFS_HEAD, SIZEOF_INT);
    return this[$file].write(OFS_HEAD, header, OFS_HEAD, SIZEOF_INT)
      .then(() => {
        if (this.head === this.baseIndex) {
          return this[$file].read(HLEN, SIZEOF_INT)
            .then(data => {
              return {
                offset: data.readInt32BE(0, SIZEOF_INT),
                length: 0
              };
            });
        }
        return this.get(this.head - 1);
      })
      .then(rec => rec.offset + rec.length);
  }

  increment(offset) {
    assert.ok(this[$header], 'index must be open');
    // write the offset into the next index location (typically at EOF),
    // then update and write the header reference, which completes the
    // increment.
    let header = this[$header];
    let current = this.head;
    let next = current + 1;
    let ofs = HLEN + (this.localizeIndex(next) * SIZEOF_INT);
    let data = allocFill(SIZEOF_INT);
    data.writeInt32BE(offset, 0, SIZEOF_INT);
    return this[$file].write(ofs, data)
      .then(() => {
        header.writeInt32BE(next, OFS_HEAD, SIZEOF_INT);
        return this[$file].write(OFS_HEAD, header, OFS_HEAD, SIZEOF_INT);
      })
      .then(() => {
        return current;
      });
  }

  open() {
    assert.ok(!this[$header], 'index already open');
    let file = this[$file];
    return file.read(OFS_MARKER, HLEN)
      .then(data => {
        let marker = data.toString('ascii', OFS_MARKER, OFS_BASEINDEX);
        if (marker !== MARKER) {
          throw new Error(`Invalid index file; expected file marker to be ${MARKER} but found ${marker}.`);
        }
        this[$header] = data;
        return this;
      });
  }

  close() {
    return this[$file].close()
      .then(() => {
        this[$header] = null;
      });
  }

}

function open(path, writable) {
  assert.string(path, 'path');
  assert.optionalBool(writable, 'writable');
  writable = writable === true;
  return RandomAccessFile.open(path, writable)
    .then(file => new LogIndexFile(file).open());
}

function create(path, baseIndex, byteOffset) {
  assert.string(path, 'path');
  assert.optionalNumber(baseIndex, 'baseIndex');
  assert.optionalNumber(byteOffset, 'byteOffset');
  if (baseIndex === undefined) {
    baseIndex = 0;
  }
  if (byteOffset === undefined) {
    byteOffset = 0;
  }
  let data = allocFill(HLEN + SIZEOF_INT);
  data.write(MARKER, 'ascii');
  data.writeInt32BE(baseIndex, OFS_BASEINDEX, true);
  data.writeInt32BE(baseIndex, OFS_HEAD, true);
  data.writeInt32BE(DEF_COMMIT, OFS_COMMIT, true);
  data.writeInt32BE(byteOffset, HLEN, true);
  return RandomAccessFile.create(path)
    .then(file => file.write(OFS_MARKER, data)
      .then(() => new LogIndexFile(file).open()));
}

LogIndexFile.open = open;
LogIndexFile.create = create;

export default LogIndexFile;