Skip to content

Commit

Permalink
Changed Mutex to be file-based
Browse files Browse the repository at this point in the history
  • Loading branch information
james-pre committed Sep 24, 2023
1 parent 6e791b3 commit cc1e648
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 76 deletions.
120 changes: 60 additions & 60 deletions src/generic/locked_fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,267 +35,267 @@ export default class LockedFS<T extends FileSystem> implements FileSystem {
}

public async rename(oldPath: string, newPath: string, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(oldPath);
await this._fs.rename(oldPath, newPath, cred);
this._mu.unlock();
this._mu.unlock(oldPath);
}

public renameSync(oldPath: string, newPath: string, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(oldPath)) {
throw new Error('invalid sync call');
}
return this._fs.renameSync(oldPath, newPath, cred);
}

public async stat(p: string, cred: Cred): Promise<Stats> {
await this._mu.lock();
await this._mu.lock(p);
const stats = await this._fs.stat(p, cred);
this._mu.unlock();
this._mu.unlock(p);
return stats;
}

public statSync(p: string, cred: Cred): Stats {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.statSync(p, cred);
}

public async access(p: string, mode: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.access(p, mode, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public accessSync(p: string, mode: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.accessSync(p, mode, cred);
}

public async open(p: string, flag: FileFlag, mode: number, cred: Cred): Promise<File> {
await this._mu.lock();
await this._mu.lock(p);
const fd = await this._fs.open(p, flag, mode, cred);
this._mu.unlock();
this._mu.unlock(p);
return fd;
}

public openSync(p: string, flag: FileFlag, mode: number, cred: Cred): File {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.openSync(p, flag, mode, cred);
}

public async unlink(p: string, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.unlink(p, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public unlinkSync(p: string, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.unlinkSync(p, cred);
}

public async rmdir(p: string, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.rmdir(p, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public rmdirSync(p: string, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.rmdirSync(p, cred);
}

public async mkdir(p: string, mode: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.mkdir(p, mode, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public mkdirSync(p: string, mode: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.mkdirSync(p, mode, cred);
}

public async readdir(p: string, cred: Cred): Promise<string[]> {
await this._mu.lock();
await this._mu.lock(p);
const files = await this._fs.readdir(p, cred);
this._mu.unlock();
this._mu.unlock(p);
return files;
}

public readdirSync(p: string, cred: Cred): string[] {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.readdirSync(p, cred);
}

public async exists(p: string, cred: Cred): Promise<boolean> {
await this._mu.lock();
await this._mu.lock(p);
const exists = await this._fs.exists(p, cred);
this._mu.unlock();
this._mu.unlock(p);
return exists;
}

public existsSync(p: string, cred: Cred): boolean {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.existsSync(p, cred);
}

public async realpath(p: string, cred: Cred): Promise<string> {
await this._mu.lock();
await this._mu.lock(p);
const resolvedPath = await this._fs.realpath(p, cred);
this._mu.unlock();
this._mu.unlock(p);
return resolvedPath;
}

public realpathSync(p: string, cred: Cred): string {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.realpathSync(p, cred);
}

public async truncate(p: string, len: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.truncate(p, len, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public truncateSync(p: string, len: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.truncateSync(p, len, cred);
}

public async readFile(fname: string, encoding: BufferEncoding, flag: FileFlag, cred: Cred): Promise<FileContents> {
await this._mu.lock();
await this._mu.lock(fname);
const data = await this._fs.readFile(fname, encoding, flag, cred);
this._mu.unlock();
this._mu.unlock(fname);
return data;
}

public readFileSync(fname: string, encoding: BufferEncoding, flag: FileFlag, cred: Cred): FileContents {
if (this._mu.isLocked()) {
if (this._mu.isLocked(fname)) {
throw new Error('invalid sync call');
}
return this._fs.readFileSync(fname, encoding, flag, cred);
}

public async writeFile(fname: string, data: FileContents, encoding: BufferEncoding, flag: FileFlag, mode: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(fname);
await this._fs.writeFile(fname, data, encoding, flag, mode, cred);
this._mu.unlock();
this._mu.unlock(fname);
}

public writeFileSync(fname: string, data: FileContents, encoding: BufferEncoding, flag: FileFlag, mode: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(fname)) {
throw new Error('invalid sync call');
}
return this._fs.writeFileSync(fname, data, encoding, flag, mode, cred);
}

public async appendFile(fname: string, data: FileContents, encoding: BufferEncoding, flag: FileFlag, mode: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(fname);
await this._fs.appendFile(fname, data, encoding, flag, mode, cred);
this._mu.unlock();
this._mu.unlock(fname);
}

public appendFileSync(fname: string, data: FileContents, encoding: BufferEncoding, flag: FileFlag, mode: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(fname)) {
throw new Error('invalid sync call');
}
return this._fs.appendFileSync(fname, data, encoding, flag, mode, cred);
}

public async chmod(p: string, mode: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.chmod(p, mode, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public chmodSync(p: string, mode: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.chmodSync(p, mode, cred);
}

public async chown(p: string, new_uid: number, new_gid: number, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.chown(p, new_uid, new_gid, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public chownSync(p: string, new_uid: number, new_gid: number, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.chownSync(p, new_uid, new_gid, cred);
}

public async utimes(p: string, atime: Date, mtime: Date, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(p);
await this._fs.utimes(p, atime, mtime, cred);
this._mu.unlock();
this._mu.unlock(p);
}

public utimesSync(p: string, atime: Date, mtime: Date, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.utimesSync(p, atime, mtime, cred);
}

public async link(srcpath: string, dstpath: string, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(srcpath);
await this._fs.link(srcpath, dstpath, cred);
this._mu.unlock();
this._mu.unlock(srcpath);
}

public linkSync(srcpath: string, dstpath: string, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(srcpath)) {
throw new Error('invalid sync call');
}
return this._fs.linkSync(srcpath, dstpath, cred);
}

public async symlink(srcpath: string, dstpath: string, type: string, cred: Cred): Promise<void> {
await this._mu.lock();
await this._mu.lock(srcpath);
await this._fs.symlink(srcpath, dstpath, type, cred);
this._mu.unlock();
this._mu.unlock(srcpath);
}

public symlinkSync(srcpath: string, dstpath: string, type: string, cred: Cred): void {
if (this._mu.isLocked()) {
if (this._mu.isLocked(srcpath)) {
throw new Error('invalid sync call');
}
return this._fs.symlinkSync(srcpath, dstpath, type, cred);
}

public async readlink(p: string, cred: Cred): Promise<string> {
await this._mu.lock();
await this._mu.lock(p);
const linkString = await this._fs.readlink(p, cred);
this._mu.unlock();
this._mu.unlock(p);
return linkString;
}

public readlinkSync(p: string, cred: Cred): string {
if (this._mu.isLocked()) {
if (this._mu.isLocked(p)) {
throw new Error('invalid sync call');
}
return this._fs.readlinkSync(p, cred);
Expand Down
32 changes: 16 additions & 16 deletions src/generic/mutex.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
export type MutexCallback = () => void;

/**
* Non-recursive mutex
* @hidden
*/
export default class Mutex {
private _locked: boolean = false;
private _waiters: MutexCallback[] = [];
private _locks: Map<string, MutexCallback[]> = new Map();

public lock(): Promise<void> {
public lock(path: string): Promise<void> {
return new Promise(resolve => {
if (this._locked) {
this._waiters.push(resolve);
if (this._locks.has(path)) {
this._locks.get(path).push(resolve);
} else {
this._locks.set(path, []);
}

this._locked = true;
});
}

public unlock(): void {
if (!this._locked) {
public unlock(path: string): void {
if (!this._locks.has(path)) {
throw new Error('unlock of a non-locked mutex');
}

const next = this._waiters.shift();
const next = this._locks.get(path).shift();
/*
don't unlock - we want to queue up next for the
end of the current task execution, but we don't
Expand All @@ -36,19 +36,19 @@ export default class Mutex {
return;
}

this._locked = false;
this._locks.delete(path);
}

public tryLock(): boolean {
if (this._locked) {
public tryLock(path: string): boolean {
if (this._locks.has(path)) {
return false;
}

this._locked = true;
this._locks.set(path, []);
return true;
}

public isLocked(): boolean {
return this._locked;
public isLocked(path: string): boolean {
return this._locks.has(path);
}
}

0 comments on commit cc1e648

Please sign in to comment.