Skip to content

Commit

Permalink
fix: tweaks and cleanup based on review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
FUDCo committed May 27, 2021
1 parent e67c4ef commit ba95e34
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 81 deletions.
63 changes: 35 additions & 28 deletions packages/swing-store-lmdb/src/lmdbSwingStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const encoder = new util.TextEncoder();

/**
* @typedef { import('@agoric/swing-store-simple').KVStore } KVStore
* @typedef { import('@agoric/swing-store-simple').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store-simple').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store-simple').StreamWriter } StreamWriter
* @typedef { import('@agoric/swing-store-simple').SwingStore } SwingStore
Expand Down Expand Up @@ -183,6 +184,13 @@ function makeSwingStore(dirPath, forceReset = false) {
);
}

function insistStreamPosition(position) {
assert.typeof(position.itemCount, 'number');
assert(position.itemCount >= 0);
assert.typeof(position.offset, 'number');
assert(position.offset >= 0);
}

function closefd(fd) {
try {
fs.closeSync(fd);
Expand All @@ -206,33 +214,35 @@ function makeSwingStore(dirPath, forceReset = false) {
closefd(fd);
streamFds.delete(streamName);
activeStreamFds.delete(fd);
streamStatus.set(streamName, 'unused');
streamStatus.delete(streamName);
}
}

/**
* Generator function that returns an iterator over the items in a stream.
*
* @param {string} streamName The stream to read
* @param {Object} startPosition The position to start reading from
* @param {Object} endPosition The position of the end of the stream
* @param {Object?} startPosition Optional position to start reading from
*
* @returns {Iterable<string>} an iterator for the items in the named stream
*/
function openReadStream(streamName, endPosition, startPosition) {
function openReadStream(streamName, startPosition, endPosition) {
insistStreamName(streamName);
const status = streamStatus.get(streamName);
assert(
status === 'unused' || !status,
!status,
X`can't read stream ${q(streamName)} because it's already in use`,
);
insistStreamPosition(startPosition);
insistStreamPosition(endPosition);
assert(startPosition.itemCount <= endPosition.itemCount);

let itemCount = endPosition.itemCount;
if (endPosition.offset === 0) {
assert(itemCount === 0);
return [];
} else {
assert(itemCount > 0);
assert(endPosition.offset > 0);
const filePath = `${dirPath}/${streamName}`;
fs.truncateSync(filePath, endPosition.offset);
const fd = fs.openSync(filePath, 'r');
Expand All @@ -243,13 +253,11 @@ function makeSwingStore(dirPath, forceReset = false) {
statusCounter += 1;
streamStatus.set(streamName, readStatus);
// let startOffset = 0;
let skipCount = 0;
if (startPosition) {
// itemCount -= startPosition.itemCount;
// startOffset = startPosition.offset;
skipCount = startPosition.itemCount;
assert(skipCount <= endPosition.itemCount);
}
let skipCount = startPosition.itemCount;

// itemCount -= startPosition.itemCount;
// startOffset = startPosition.offset;

// We would like to be able to seek Readlines to a particular position
// in the file before it starts reading. Unfortunately, it is hardcoded
// to reset to 0 at the start and then manually walk itself through the
Expand Down Expand Up @@ -288,6 +296,8 @@ function makeSwingStore(dirPath, forceReset = false) {
break;
}
}
} catch (e) {
console.log(e);
} finally {
const statusEnd = streamStatus.get(streamName);
assert(
Expand All @@ -313,7 +323,7 @@ function makeSwingStore(dirPath, forceReset = false) {
insistStreamName(streamName);
const status = streamStatus.get(streamName);
assert(
status === 'unused' || !status,
!status,
X`can't write stream ${q(streamName)} because it's already in use`,
);
streamStatus.set(streamName, 'write');
Expand All @@ -334,7 +344,7 @@ function makeSwingStore(dirPath, forceReset = false) {
* Write to a stream.
*
* @param {string} item The item to write
* @param {Object|null} position The position to write the item
* @param {Object} position The position to write the item
*
* @returns {Object} the new position after writing
*/
Expand All @@ -344,39 +354,36 @@ function makeSwingStore(dirPath, forceReset = false) {
streamFds.get(streamName) === fd,
X`can't write to closed stream ${q(streamName)}`,
);
if (!position) {
position = STREAM_START;
}
insistStreamPosition(position);
const buf = encoder.encode(`${item}\n`);
fs.writeSync(fd, buf, 0, buf.length, position.offset);
fs.fsyncSync(fd);
return {
return harden({
offset: position.offset + buf.length,
itemCount: position.itemCount + 1,
};
});
}
return write;
}

const streamStore = {
const streamStore = harden({
openReadStream,
openWriteStream,
closeStream,
STREAM_START,
};
});

/**
* Commit unsaved changes.
*/
function commit() {
if (txn) {
for (const fd of activeStreamFds) {
closefd(fd);
}
activeStreamFds.clear();
txn.commit();
txn = null;
}
for (const fd of activeStreamFds) {
fs.fsyncSync(fd);
}
activeStreamFds.clear();
}

/**
Expand All @@ -400,7 +407,7 @@ function makeSwingStore(dirPath, forceReset = false) {
activeStreamFds.clear();
}

return { kvStore, streamStore, commit, close, diskUsage };
return harden({ kvStore, streamStore, commit, close, diskUsage });
}

/**
Expand Down
51 changes: 36 additions & 15 deletions packages/swing-store-lmdb/test/test-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ test('streamStore read/write', t => {
t.is(isSwingStore(dbDir), false);
const { streamStore, commit, close } = initSwingStore(dbDir);

let s1pos;
let s1pos = streamStore.STREAM_START;
const writer1 = streamStore.openWriteStream('st1');
s1pos = writer1('first', s1pos);
s1pos = writer1('second', s1pos);
const s1posAlt = { ...s1pos };
const writer2 = streamStore.openWriteStream('st2');
s1pos = writer1('third', s1pos);
let s2pos = { offset: 0, itemCount: 0 };
let s2pos = streamStore.STREAM_START;
s2pos = writer2('oneth', s2pos);
s1pos = writer1('fourth', s1pos);
s2pos = writer2('twoth', s2pos);
Expand All @@ -87,7 +87,11 @@ test('streamStore read/write', t => {
s2pos = writer2('fourst', s2pos);
streamStore.closeStream('st1');
streamStore.closeStream('st2');
const reader1 = streamStore.openReadStream('st1', s1pos);
const reader1 = streamStore.openReadStream(
'st1',
streamStore.STREAM_START,
s1pos,
);
const reads1 = [];
for (const item of reader1) {
reads1.push(item);
Expand All @@ -96,22 +100,26 @@ test('streamStore read/write', t => {
const writer2alt = streamStore.openWriteStream('st2');
s2pos = writer2alt('re3', s2posAlt);
streamStore.closeStream('st2');
const reader2 = streamStore.openReadStream('st2', s2pos);
const reader2 = streamStore.openReadStream(
'st2',
streamStore.STREAM_START,
s2pos,
);
const reads2 = [];
for (const item of reader2) {
reads2.push(item);
}
t.deepEqual(reads2, ['oneth', 'twoth', 're3']);

const reader1alt = streamStore.openReadStream('st1', s1pos, s1posAlt);
const reader1alt = streamStore.openReadStream('st1', s1posAlt, s1pos);
const reads1alt = [];
for (const item of reader1alt) {
reads1alt.push(item);
}
t.deepEqual(reads1alt, ['third', 'fourth']);

const writerEmpty = streamStore.openWriteStream('empty');
const emptyPos = writerEmpty('filler');
const emptyPos = writerEmpty('filler', streamStore.STREAM_START);
streamStore.closeStream('empty');
const readerEmpty = streamStore.openReadStream('empty', emptyPos, emptyPos);
const readsEmpty = [];
Expand All @@ -122,6 +130,7 @@ test('streamStore read/write', t => {
const readerEmpty2 = streamStore.openReadStream(
'empty',
streamStore.STREAM_START,
streamStore.STREAM_START,
);
const readsEmpty2 = [];
for (const item of readerEmpty2) {
Expand All @@ -141,22 +150,32 @@ test('streamStore mode interlock', t => {
const { streamStore, commit, close } = initSwingStore(dbDir);

const writer = streamStore.openWriteStream('st1');
const s1pos = writer('first');
t.throws(() => streamStore.openReadStream('st1', s1pos), {
message: `can't read stream "st1" because it's already in use`,
});
const s1pos = writer('first', streamStore.STREAM_START);
t.throws(
() => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos),
{
message: `can't read stream "st1" because it's already in use`,
},
);
t.throws(() => streamStore.openWriteStream('st1', s1pos), {
message: `can't write stream "st1" because it's already in use`,
});
streamStore.closeStream('st1');
t.throws(() => writer('second'), {
t.throws(() => writer('second', streamStore.STREAM_START), {
message: `can't write to closed stream "st1"`,
});

const reader = streamStore.openReadStream('st1', s1pos);
t.throws(() => streamStore.openReadStream('st1', s1pos), {
message: `can't read stream "st1" because it's already in use`,
});
const reader = streamStore.openReadStream(
'st1',
streamStore.STREAM_START,
s1pos,
);
t.throws(
() => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos),
{
message: `can't read stream "st1" because it's already in use`,
},
);
t.throws(() => streamStore.openWriteStream('st1'), {
message: `can't write stream "st1" because it's already in use`,
});
Expand All @@ -165,6 +184,8 @@ test('streamStore mode interlock', t => {
message: `can't read stream "st1", it's been closed`,
});

streamStore.closeStream('nonexistent');

commit();
close();
});
Expand Down
Loading

0 comments on commit ba95e34

Please sign in to comment.