diff --git a/commit.js b/commit.js
index 6e3bcf6d..998e1f99 100644
--- a/commit.js
+++ b/commit.js
@@ -16,6 +16,7 @@ class Commit {
         this._commit = path.join(journalist.directory, 'commit')
     }

+    // TODO Should be a hash of specific files to filter, not a regex.
     async write (commit) {
         const dir = await this._readdir()
         const unemplaced = dir.filter(file => ! /\d+\.\d+-\d+\.\d+\.[0-9a-f]/)
@@ -126,6 +127,7 @@ class Commit {
             const buffer = Buffer.from(JSON.stringify(entry.value.items))
             const hash = fnv(buffer)
             entry.heft = buffer.length
+            // TODO `this._path()`
             await fs.writeFile(path.join(this._commit, `${entry.value.id}-${hash}`), buffer)
             const from = path.join('commit', `${entry.value.id}-${hash}`)
             const to = path.join('pages', entry.value.id, hash)
@@ -136,6 +138,51 @@ class Commit {
         }
     }

+    async _vacuum (id, first, second, items) {
+        await fs.mkdir(this._commit, { recursive: true })
+        const filename = this._path(`${id}-${first}`)
+        const recorder = this._journalist._recorder
+        // Write out a new page slowly, a record at a time.
+        for (let index = 0, I = items.length; index < I; index++) {
+            const { key, body } = items[index]
+            await fs.appendFile(filename, recorder({ method: 'insert', index, key }, body))
+        }
+        await fs.appendFile(filename, recorder({
+            method: 'dependent', id: id, append: second
+        }))
+        return {
+            method: 'rename',
+            from: path.join('commit', `${id}-${first}`),
+            to: path.join('pages', id, first),
+            hash: hash
+        }
+    }
+
+    async vacuum (id, first, second, items, right) {
+        await fs.mkdir(this._commit, { recursive: true })
+        const filename = this._path(`${id}-${first}`)
+        const recorder = this._journalist._recorder
+        const buffers = []
+        buffers.push(recorder({ method: 'right', right }))
+        // Write out a new page slowly, a record at a time.
+        for (let index = 0, I = items.length; index < I; index++) {
+            const { key, value } = items[index]
+            buffers.push(recorder({ method: 'insert', index, key }, value))
+        }
+        buffers.push(recorder({
+            method: 'dependent', id: id, append: second
+        }))
+        const buffer = Buffer.concat(buffers)
+        const hash = fnv(buffer)
+        await fs.writeFile(filename, buffer)
+        return {
+            method: 'rename',
+            from: path.join('commit', `${id}-${first}`),
+            to: path.join('pages', id, first),
+            hash: hash
+        }
+    }
+
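A review note on the pair above: `_vacuum` streams records to disk with `appendFile` but returns a `hash` it never computes, so calling it would throw a `ReferenceError`; the buffered `vacuum` below it, which journalist.js actually invokes, concatenates first so it can hash. Below is a minimal, self-contained sketch of that buffered flow. The `recorder` and `fnv` functions here are simplified stand-ins for the project's `recorder.js` framing and `hash.fnv` digest, not their actual implementations.

```javascript
const crypto = require('crypto')

// Hypothetical stand-in for the project's checksummed record framing: a JSON
// header line, optionally followed by a JSON body line.
function recorder (header, body = null) {
    const buffers = [ Buffer.from(JSON.stringify(header) + '\n') ]
    if (body != null) {
        buffers.push(Buffer.from(JSON.stringify(body) + '\n'))
    }
    return Buffer.concat(buffers)
}

// Hypothetical stand-in for hash.fnv; any stable digest of a Buffer will do.
function fnv (buffer) {
    return crypto.createHash('sha256').update(buffer).digest('hex').slice(0, 8)
}

// Rewrite a leaf page the way `vacuum` does: a `right` sibling record, one
// `insert` per live item, then a `dependent` record naming the stub that now
// loads this rewritten page. The hash of the whole buffer names the rename.
function rewrite (id, first, second, items, right) {
    const buffers = [ recorder({ method: 'right', right }) ]
    items.forEach(({ key, value }, index) => {
        buffers.push(recorder({ method: 'insert', index, key }, value))
    })
    buffers.push(recorder({ method: 'dependent', id, append: second }))
    const buffer = Buffer.concat(buffers)
    return { buffer, hash: fnv(buffer) }
}

console.log(rewrite('0.1', '1.2', '1.3', [{ key: 'a', value: 1 }], null).hash)
```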
     // Okay. Now I see. I wanted the commit to be light and easy and minimal, so
     // that it could be written quickly and loaded quickly, but that is only
     // necessary for the leaf.
     // We really want a `Prepare` that will write files
@@ -186,6 +233,11 @@ class Commit {
                     await this._prepare([ 'rename', from, to, hash ])
                 }
                 break
+            case 'rename': {
+                    const { from, to, hash } = operation
+                    await this._prepare([ 'rename', from, to, hash ])
+                }
+                break
             case 'emplace': {
                     const { page, hash } = operation
                     const from = path.join('commit', `${page.id}-${hash}`)
diff --git a/journalist.js b/journalist.js
index bd81b5c0..6eae8039 100644
--- a/journalist.js
+++ b/journalist.js
@@ -13,6 +13,8 @@ const Future = require('prospective/future')
 const Commit = require('./commit')
 const fnv = require('./fnv')

+const Keyify = require('keyify')
+
 const Turnstile = require('turnstile')
 Turnstile.Queue = require('turnstile/queue')
 Turnstile.Set = require('turnstile/set')
@@ -34,6 +36,42 @@ function increment (value) {
     return value + 1 & 0xffffffff
 }

+function keyify ({ id, append }, override = null) {
+    return Keyify.stringify({ id, append: override || append })
+}
+
+function reference ({ id, append }, override = null) {
+    return { id, append: override || append }
+}
+
+class Page {
+    constructor (dependents) {
+        this._dependents = dependents
+    }
+
+    add ({ id, append }, override = null) {
+        this._dependents[`${id}/${override || append}`] = true
+    }
+}
+
+class Dependencies {
+    constructor (dependencies) {
+        this._dependencies = dependencies
+    }
+
+    page ({ id, append }, override = null) {
+        const key = `${id}/${override || append}`
+        if (!(key in this._dependencies)) {
+            this._dependencies[key] = {}
+        }
+        return new Page(this._dependencies[key])
+    }
+
+    merge (dependencies) {
+        this._dependencies = { ...this._dependencies, ...dependencies }
+    }
+}
+
 class Journalist {
     constructor (destructible, options) {
         const leaf = coalesece(options.leaf, {})
@@ -113,8 +151,20 @@ class Journalist {
     async _read (id, append) {
         const page = {
-            id, leaf: true, items: [], deleted: false, lock: null, right: null, ghosts: 0, append
+            id,
+            leaf: true,
+            items: [],
+            dependencies: {},
+            deletes: 0,
+            // TODO Rename merged.
+            deleted: false,
+            lock: null,
+            right: null,
+            ghosts: 0,
+            append
         }
+        const dependencies = new Dependencies(page.dependencies)
+        dependencies.page({ id, append })
         const player = new Player(function () { return '0' })
         const readable = fileSystem.createReadStream(this._path('pages', id, append))
         for await (let chunk of readable) {
@@ -125,9 +175,12 @@ class Journalist {
                 }
                 break
             case 'load': {
-                    const { page: loaded } = await this._read(entry.header.id, entry.header.append)
+                    const { id, append } = entry.header
+                    const { page: loaded } = await this._read(id, append)
                     page.items = loaded.items
                     page.right = loaded.right
+                    dependencies.merge(loaded.dependencies)
+                    // TODO Put an error here and there is no abend.
                 }
                 break
             case 'slice': {
@@ -140,6 +193,7 @@ class Journalist {
             case 'merge': {
                     const { page: right } = await this._read(entry.header.id, entry.header.append)
                     page.items.push.apply(page.items, right.items.slice(right.ghosts))
+                    dependencies.merge(right.dependencies)
                 }
                 break
             case 'insert': {
@@ -152,6 +206,12 @@ class Journalist {
                 break
             case 'delete': {
                     page.items.splice(entry.header.index, 1)
+                    page.deletes++
                 }
                 break
+            case 'dependent': {
+                    const { id, append } = entry.header
+                    dependencies.page({ id, append }).add(entry.header)
+                }
+                break
             }
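The `Dependencies` and `Page` helpers above maintain, per `id/append` page file, a set of the page files that must be gone before that file can be unlinked; the `_read` cases then rebuild that map while replaying a page's log. One caveat worth flagging in review: `merge` reassigns `this._dependencies` to a fresh object, so entries merged during a `load` or `merge` replay never reach the `page.dependencies` object the `Dependencies` instance was constructed around. A small usage sketch, copying the two classes verbatim and recording a hypothetical split of page `0.1`:

```javascript
// Copied verbatim from the diff above.
class Page {
    constructor (dependents) { this._dependents = dependents }

    add ({ id, append }, override = null) {
        this._dependents[`${id}/${override || append}`] = true
    }
}

class Dependencies {
    constructor (dependencies) { this._dependencies = dependencies }

    page ({ id, append }, override = null) {
        const key = `${id}/${override || append}`
        if (!(key in this._dependencies)) {
            this._dependencies[key] = {}
        }
        return new Page(this._dependencies[key])
    }

    merge (dependencies) {
        this._dependencies = { ...this._dependencies, ...dependencies }
    }
}

// After a hypothetical split of page 0.1, its old append file 1.0 is still
// loaded by the new left stub 0.1/1.1 and the right sibling 0.3/1.2, so both
// are recorded as dependents of 0.1/1.0; until those dependent files have
// been vacuumed away, 1.0 cannot be unlinked.
const page = { id: '0.1', append: '1.0', dependencies: {} }
const dependencies = new Dependencies(page.dependencies)
dependencies.page(page).add(page, '1.1')
dependencies.page(page).add({ id: '0.3', append: '1.2' })
console.log(page.dependencies)
// { '0.1/1.0': { '0.1/1.1': true, '0.3/1.2': true } }
```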
@@ -336,6 +396,7 @@ class Journalist {
         const entry = this._hold(id)
         const buffers = writes.map(write => {
             const buffer = recorder(write.header, write.body)
+            // TODO Where is heft removal?
             if (write.header.method == 'insert') {
                 entry.heft += (write.record.heft = buffer.length)
             }
@@ -454,6 +515,135 @@ class Journalist {
         return `${this.instance}.${this._id++}`
     }

+    async _vacuum (key) {
+        const entries = []
+        const leaf = await this.descend({ key })
+
+        const blockId = this._blockId = increment(this._blockId)
+        const block = this._block(blockId, leaf.entry.value.id)
+        await block.enter.promise
+
+        const items = leaf.entry.value.items.slice(0)
+
+        const first = this._filename()
+        const second = this._filename()
+
+        await (async () => {
+            // Flush any existing writes. We're still write blocked.
+            const writes = this._queue(leaf.entry.value.id).writes.splice(0)
+            await this._writeLeaf(leaf.entry.value.id, writes)
+
+            // Create our journaled tree alterations.
+            const prepare = []
+
+            // Create a stub that loads the existing page.
+            const previous = leaf.entry.value.append
+            prepare.push({
+                method: 'stub',
+                page: { id: leaf.entry.value.id, append: first },
+                records: [{
+                    method: 'load',
+                    id: leaf.entry.value.id,
+                    append: previous
+                }, {
+                    method: 'dependent',
+                    id: leaf.entry.value.id,
+                    append: second
+                }]
+            }, {
+                method: 'stub',
+                page: { id: leaf.entry.value.id, append: second },
+                records: [{
+                    method: 'load',
+                    id: leaf.entry.value.id,
+                    append: first
+                }]
+            })
+            leaf.entry.value.append = second
+
+            const commit = new Commit(this)
+            await commit.write(prepare)
+            await commit.prepare()
+            await commit.commit()
+            await commit.dispose()
+        }) ()
+
+        block.exit.resolve()
+
+        await (async () => {
+            const prepare = []
+            const commit = new Commit(this)
+
+            prepare.push({
+                method: 'unlink',
+                path: path.join('pages', leaf.entry.value.id, first)
+            })
+
+            prepare.push(await commit.vacuum(leaf.entry.value.id, first, second, items, leaf.entry.value.right))
+            // Merged pages themselves can just be deleted, but when we do, we
+            // need to... Seems like both split and merge can use the same
+            // mechanism, this dependent reference. So, every page we load has a
+            // list of dependents. We can eliminate any that we know we can
+            // delete.
+
+            // Delete previous versions. Oof. Split means we have multiple
+            // references.
+            const deleted = {}
+            const deletions = {}
+            const dependencies = JSON.parse(JSON.stringify(leaf.entry.value.dependencies))
+
+            // Could save some file operations by maybe doing the will-be-deleted
+            // removals first, but this logic is cleaner.
+            for (const page in dependencies) {
+                for (const dependent in dependencies[page]) {
+                    const [ id, append ] = dependent.split('/')
+                    try {
+                        await fs.stat(this._path('pages', id, append))
+                    } catch (error) {
+                        Strata.Error.assert(error.code == 'ENOENT', 'vacuum.not.enoent', error, { id, append })
+                        deleted[dependent] = true
+                    }
+                }
+            }
+
+            let loop = true
+            while (loop) {
+                loop = false
+                for (const page in dependencies) {
+                    if (Object.keys(dependencies[page]).length == 0) {
+                        loop = true
+                        deleted[page] = true
+                        deletions[page] = true
+                        delete dependencies[page]
+                    } else {
+                        for (const dependent in dependencies[page]) {
+                            if (deleted[dependent]) {
+                                loop = true
+                                delete dependencies[page][dependent]
+                            }
+                        }
+                    }
+                }
+            }
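The `while` loop above is a fixed-point computation over the copied dependency map: any page file whose dependent set has emptied is scheduled for unlink, every reference to a file already confirmed gone is struck, and the passes repeat until nothing changes. A standalone sketch of just that loop, with `deleted` seeded the way the `fs.stat` probe above seeds it:

```javascript
// Standalone sketch of the pruning loop in `_vacuum`. `dependencies` maps an
// `id/append` page file to the set of files that must be gone before it can
// be unlinked; `deleted` is seeded with files stat already said are missing.
function prune (dependencies, deleted = {}) {
    const deletions = {}
    let loop = true
    while (loop) {
        loop = false
        for (const page in dependencies) {
            if (Object.keys(dependencies[page]).length == 0) {
                // No dependents remain, so this file can be unlinked.
                loop = true
                deleted[page] = true
                deletions[page] = true
                delete dependencies[page]
            } else {
                // Strike dependents already known to be gone.
                for (const dependent in dependencies[page]) {
                    if (deleted[dependent]) {
                        loop = true
                        delete dependencies[page][dependent]
                    }
                }
            }
        }
    }
    return deletions
}

// '0.1/1.1' has no dependents, so an early pass schedules it; a later pass
// strikes it from '0.1/1.0', which then empties and is scheduled as well.
console.log(prune({
    '0.1/1.0': { '0.1/1.1': true },
    '0.1/1.1': {}
}))
// { '0.1/1.1': true, '0.1/1.0': true }
```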
+
+            // Delete all merged pages.
+            for (const deletion in deletions) {
+                const [ id, append ] = deletion.split('/')
+                prepare.push({
+                    method: 'unlink',
+                    path: path.join('pages', id, append)
+                })
+            }
+
+            await commit.write(prepare)
+            await commit.prepare()
+            await commit.commit()
+            await commit.dispose()
+        }) ()
+
+        leaf.entry.release()
+    }
+
     // Assume there is nothing to block or worry about with the branch pages.
     // Can't recall at the moment, though. Descents are all synchronous.
     //
@@ -605,6 +795,7 @@ class Journalist {
             id: this._nextId(true),
             leaf: true,
             items: items,
+            dependencies: JSON.parse(JSON.stringify(child.entry.value.dependencies)),
             right: child.entry.value.right,
             append: this._filename()
         })
@@ -635,9 +826,35 @@ class Journalist {
         // Write any queued writes, they would have been in memory, in the page
         // that was split above. Once we await, items can be inserted or removed
         // from the page in memory. Our synchronous operations are over.
+        const append = this._filename()
+        const references = {
+            source: reference(child.entry.value),
+            left: reference(child.entry.value, append),
+            right: reference(right.value)
+        }
         const writes = this._queue(child.entry.value.id).writes.splice(0)
+        writes.push({
+            header: { method: 'dependent', id: child.entry.value.id, append },
+            body: null
+        }, {
+            header: { method: 'dependent', id: right.value.id, append: right.value.append },
+            body: null
+        })
         await this._writeLeaf(child.entry.value.id, writes)

+        // TODO Maybe keep the merge, load records in the page and add them,
+        // then reconstruct dependencies by replaying them.
+        const dependencies = {
+            left: new Dependencies(child.entry.value.dependencies),
+            right: new Dependencies(right.value.dependencies),
+        }
+
+        dependencies.left.page(child.entry.value, append)
+        dependencies.left.page(child.entry.value).add(child.entry.value, append)
+        dependencies.left.page(child.entry.value).add(right.value)
+
+        dependencies.right.page(right.value)
+
         // Curious race condition here, though, where we've flushed the page to
         // split
@@ -665,7 +882,6 @@ class Journalist {

         // Record the split of the left page in a new stub, for which we create
         // a new append file.
-        const append = this._filename()
         prepare.push({
             method: 'stub',
             page: { id: child.entry.value.id, append },
@@ -702,6 +918,9 @@ class Journalist {
         // that will be changing the tree structure.
         entries.forEach(entry => entry.release())
         await this._possibleSplit(parent.entry.value, key, parent.level)
+
+        await this._vacuum(key)
+        await this._vacuum(right.value.items[0].key)
     }

     async _selectMerger (key, child, entries) {
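During the split above, two header-only `dependent` records are appended to the old page file so that a later replay knows which files the old append must outlive. The sketch below shows how such records can be replayed into a dependency map when a page is read, with file I/O replaced by an in-memory map of logs. Note the diff's own `_read` keys its `dependent` case by the header's `id/append`; this sketch keys it by the file containing the record, which appears to be the intent, so treat the keying as an assumption.

```javascript
// Page logs as plain arrays of header-only records: `load` recurses into an
// earlier append file, `dependent` registers a file that depends on this one.
const logs = {
    '0.1/1.0': [
        { method: 'dependent', id: '0.1', append: '1.2' }
    ],
    '0.1/1.1': [
        { method: 'load', id: '0.1', append: '1.0' },
        { method: 'dependent', id: '0.1', append: '1.2' }
    ]
}

function read (id, append, dependencies = {}) {
    const key = `${id}/${append}`
    if (!(key in dependencies)) {
        dependencies[key] = {}
    }
    for (const header of logs[key]) {
        switch (header.method) {
        case 'load':
            // Replaying a load pulls in the loaded file's dependents too.
            read(header.id, header.append, dependencies)
            break
        case 'dependent':
            dependencies[key][`${header.id}/${header.append}`] = true
            break
        }
    }
    return dependencies
}

console.log(read('0.1', '1.1'))
// { '0.1/1.1': { '0.1/1.2': true }, '0.1/1.0': { '0.1/1.2': true } }
```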
@@ -961,7 +1180,14 @@ class Journalist {
         // currently blocked. We start by flushing any cached writes.
         const writes = {
             left: this._queue(left.entry.value.id).writes.splice(0),
-            right: this._queue(right.entry.value.id).writes.splice(0)
+            right: this._queue(right.entry.value.id).writes.splice(0).concat({
+                header: {
+                    method: 'dependent',
+                    id: left.entry.value.id,
+                    append: left.entry.value.append
+                },
+                body: null
+            })
         }

         await this._writeLeaf(left.entry.value.id, writes.left)
diff --git a/package.json b/package.json
index f5ded204..1e8b67de 100644
--- a/package.json
+++ b/package.json
@@ -34,6 +34,7 @@
         "fracture": "0.3.0-alpha.7",
         "hash.fnv": "^1.0.5",
         "interrupt": "^9.0.0-alpha.4",
+        "keyify": "^2.0.3",
         "prospective": "^0.1.0-alpha.4",
         "rimraf": "^3.0.0",
         "turnstile": "6.0.0-alpha.2"
diff --git a/recorder.js b/recorder.js
index 45f25881..cb648cb8 100644
--- a/recorder.js
+++ b/recorder.js
@@ -1,6 +1,6 @@
 module.exports = function (checksum) {
     const EOL = Buffer.from('\n')
-    return function (header, body) {
+    return function (header, body = null) {
         const buffers = [], checksums = []
         let length = 0, json = false
         if (body != null) {
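The `recorder.js` change exists because the new `dependent` records are header-only: callers throughout this diff queue them as `{ header, body: null }` or invoke the recorder with no body at all, as `commit.vacuum` does. A usage sketch follows, assuming the constant-checksum convention that `_read`'s `Player` construction uses above; whether production code passes `fnv` as the checksum here is not shown in this diff.

```javascript
// Build a recorder the way this diff suggests, with a constant checksum
// mirroring `new Player(function () { return '0' })` in `_read` above.
const recorder = require('./recorder')(function () { return '0' })

// A header-only record, as written for `dependent` during split and merge;
// with the new default, no body argument is needed.
const dependent = recorder({ method: 'dependent', id: '0.1', append: '1.3' })

// A record with a body, as written for `insert`.
const insert = recorder({ method: 'insert', index: 0, key: 'a' }, { value: 1 })

// Both are Buffers ready to append to a page file.
console.log(dependent.length, insert.length)
```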