Skip to content

Commit

Permalink
Merge pull request #847 from MatrixAI/feature-multiple-vault-resource
Browse files Browse the repository at this point in the history
Allow vault `efs` resource acquisition to operate on multiple vaults in parallel
  • Loading branch information
aryanjassal authored Dec 10, 2024
2 parents 9276357 + 7491144 commit 83642fa
Show file tree
Hide file tree
Showing 17 changed files with 551 additions and 160 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"@matrixai/mdns": "^1.2.6",
"@matrixai/quic": "^1.3.1",
"@matrixai/resources": "^1.1.5",
"@matrixai/rpc": "^0.6.0",
"@matrixai/rpc": "^0.6.2",
"@matrixai/timer": "^1.1.3",
"@matrixai/workers": "^1.3.7",
"@matrixai/ws": "^1.1.7",
Expand Down
12 changes: 12 additions & 0 deletions src/client/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ class ErrorClientAuthDenied<T> extends ErrorClient<T> {
exitCode = sysexits.NOPERM;
}

class ErrorClientInvalidHeader<T> extends ErrorClient<T> {
static description = 'The header message does not match the expected type';
exitCode = sysexits.USAGE;
}

class ErrorClientProtocolError<T> extends ErrorClient<T> {
static description = 'Data does not match the protocol requirements';
exitCode = sysexits.USAGE;
}

class ErrorClientService<T> extends ErrorClient<T> {}

class ErrorClientServiceRunning<T> extends ErrorClientService<T> {
Expand Down Expand Up @@ -45,6 +55,8 @@ export {
ErrorClientAuthMissing,
ErrorClientAuthFormat,
ErrorClientAuthDenied,
ErrorClientInvalidHeader,
ErrorClientProtocolError,
ErrorClientService,
ErrorClientServiceRunning,
ErrorClientServiceNotRunning,
Expand Down
4 changes: 3 additions & 1 deletion src/client/handlers/VaultsSecretsCat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class VaultsSecretsCat extends DuplexHandler<
ClientRPCResponseResult<ContentOrErrorMessage>
> {
public handle = async function* (
input: AsyncIterable<ClientRPCRequestParams<SecretIdentifierMessage>>,
input: AsyncIterableIterator<
ClientRPCRequestParams<SecretIdentifierMessage>
>,
): AsyncGenerator<ClientRPCResponseResult<ContentOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
Expand Down
4 changes: 2 additions & 2 deletions src/client/handlers/VaultsSecretsEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { DuplexHandler } from '@matrixai/rpc';
import * as vaultsUtils from '../../vaults/utils';
import * as vaultsErrors from '../../vaults/errors';

class VaultsSecretsList extends DuplexHandler<
class VaultsSecretsEnv extends DuplexHandler<
{
db: DB;
vaultManager: VaultManager;
Expand Down Expand Up @@ -86,4 +86,4 @@ class VaultsSecretsList extends DuplexHandler<
};
}

export default VaultsSecretsList;
export default VaultsSecretsEnv;
2 changes: 1 addition & 1 deletion src/client/handlers/VaultsSecretsMkdir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class VaultsSecretsMkdir extends DuplexHandler<
ClientRPCResponseResult<SuccessOrErrorMessage>
> {
public handle = async function* (
input: AsyncIterable<ClientRPCRequestParams<SecretDirMessage>>,
input: AsyncIterableIterator<ClientRPCRequestParams<SecretDirMessage>>,
): AsyncGenerator<ClientRPCResponseResult<SuccessOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
Expand Down
170 changes: 100 additions & 70 deletions src/client/handlers/VaultsSecretsRemove.ts
Original file line number Diff line number Diff line change
@@ -1,99 +1,129 @@
import type { DB } from '@matrixai/db';
import type { ResourceAcquire } from '@matrixai/resources';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
SecretIdentifierMessage,
SecretsRemoveHeaderMessage,
SecretIdentifierMessageTagged,
SuccessOrErrorMessage,
} from '../types';
import type VaultManager from '../../vaults/VaultManager';
import type { FileSystemWritable } from '../../vaults/types';
import { withG } from '@matrixai/resources';
import { DuplexHandler } from '@matrixai/rpc';
import * as vaultsUtils from '../../vaults/utils';
import * as vaultsErrors from '../../vaults/errors';
import * as clientErrors from '../errors';

class VaultsSecretsRemove extends DuplexHandler<
{
db: DB;
vaultManager: VaultManager;
},
ClientRPCRequestParams<SecretIdentifierMessage>,
ClientRPCRequestParams<
SecretsRemoveHeaderMessage | SecretIdentifierMessageTagged
>,
ClientRPCResponseResult<SuccessOrErrorMessage>
> {
public handle = async function* (
input: AsyncIterable<ClientRPCRequestParams<SecretIdentifierMessage>>,
input: AsyncIterableIterator<
ClientRPCRequestParams<
SecretsRemoveHeaderMessage | SecretIdentifierMessageTagged
>
>,
): AsyncGenerator<ClientRPCResponseResult<SuccessOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
// Create a record of secrets to be removed, grouped by vault names
const vaultGroups: Record<string, Array<string>> = {};
const secretNames: Array<[string, string]> = [];
let metadata: any = undefined;
for await (const secretRemoveMessage of input) {
if (metadata == null) metadata = secretRemoveMessage.metadata ?? {};
secretNames.push([
secretRemoveMessage.nameOrId,
secretRemoveMessage.secretName,
]);
// Extract the header message from the iterator
const headerMessagePair = await input.next();
const headerMessage:
| SecretsRemoveHeaderMessage
| SecretIdentifierMessageTagged = headerMessagePair.value;
// Testing if the header is of the expected format
if (
headerMessagePair.done ||
headerMessage.type !== 'VaultNamesHeaderMessage'
) {
throw new clientErrors.ErrorClientInvalidHeader();
}
secretNames.forEach(([vaultName, secretName]) => {
if (vaultGroups[vaultName] == null) {
vaultGroups[vaultName] = [];
// Create an array of write acquires
const vaultAcquires = await db.withTransactionF(async (tran) => {
const vaultAcquires: Array<ResourceAcquire<FileSystemWritable>> = [];
for (const vaultName of headerMessage.vaultNames) {
const vaultIdFromName = await vaultManager.getVaultId(vaultName, tran);
const vaultId = vaultIdFromName ?? vaultsUtils.decodeVaultId(vaultName);
if (vaultId == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined(
`Vault ${vaultName} does not exist`,
);
}
const acquire = await vaultManager.withVaults(
[vaultId],
async (vault) => vault.acquireWrite(),
);
vaultAcquires.push(acquire);
}
vaultGroups[vaultName].push(secretName);
return vaultAcquires;
});
// Now, all the paths will be removed for a vault within a single commit
yield* db.withTransactionG(
async function* (tran): AsyncGenerator<SuccessOrErrorMessage> {
for (const [vaultName, secretNames] of Object.entries(vaultGroups)) {
const vaultIdFromName = await vaultManager.getVaultId(
vaultName,
tran,
);
const vaultId =
vaultIdFromName ?? vaultsUtils.decodeVaultId(vaultName);
if (vaultId == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined();
// Acquire all locks in parallel and perform all operations at once
yield* withG(
vaultAcquires,
async function* (efses): AsyncGenerator<SuccessOrErrorMessage> {
// Creating the vault name to efs map for easy access
const vaultMap = new Map<string, FileSystemWritable>();
for (let i = 0; i < efses.length; i++) {
vaultMap.set(headerMessage!.vaultNames[i], efses[i]);
}
let loopRan = false;
for await (const message of input) {
loopRan = true;
// Header messages should not be seen anymore
if (message.type === 'VaultNamesHeaderMessage') {
throw new clientErrors.ErrorClientProtocolError(
'The header message cannot be sent multiple times',
);
}
yield* vaultManager.withVaultsG(
[vaultId],
async function* (vault): AsyncGenerator<SuccessOrErrorMessage> {
yield* vault.writeG(
async function* (efs): AsyncGenerator<SuccessOrErrorMessage> {
for (const secretName of secretNames) {
try {
const stat = await efs.stat(secretName);
if (stat.isDirectory()) {
await efs.rmdir(secretName, {
recursive: metadata?.options?.recursive,
});
} else {
await efs.unlink(secretName);
}
yield {
type: 'success',
success: true,
};
} catch (e) {
if (
e.code === 'ENOENT' ||
e.code === 'ENOTEMPTY' ||
e.code === 'EINVAL'
) {
// INVAL can be triggered if removing the root of the
// vault is attempted.
yield {
type: 'error',
code: e.code,
reason: secretName,
};
} else {
throw e;
}
}
}
},
);
},
tran,
const efs = vaultMap.get(message.nameOrId);
if (efs == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined(
`Vault ${message.nameOrId} was not present in the header message`,
);
}
try {
const stat = await efs.stat(message.secretName);
if (stat.isDirectory()) {
await efs.rmdir(message.secretName, {
recursive: headerMessage.recursive,
});
} else {
await efs.unlink(message.secretName);
}
yield {
type: 'success',
success: true,
};
} catch (e) {
if (
e.code === 'ENOENT' ||
e.code === 'ENOTEMPTY' ||
e.code === 'EINVAL'
) {
// EINVAL can be triggered if removing the root of the
// vault is attempted.
yield {
type: 'error',
code: e.code,
reason: message.secretName,
};
} else {
throw e;
}
}
}
// Content messages must follow header messages
if (!loopRan) {
throw new clientErrors.ErrorClientProtocolError(
'No content messages followed header message',
);
}
},
Expand Down
16 changes: 16 additions & 0 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,19 @@ type SecretStatMessage = {
};
};

type SecretIdentifierMessageTagged = SecretIdentifierMessage & {
type: 'SecretIdentifierMessage';
};

type VaultNamesHeaderMessage = {
type: 'VaultNamesHeaderMessage';
vaultNames: Array<string>;
};

type SecretsRemoveHeaderMessage = VaultNamesHeaderMessage & {
recursive?: boolean;
};

// Type casting for tricky handlers

type OverrideRPClientType<T extends RPCClient<ClientManifest>> = Omit<
Expand Down Expand Up @@ -435,6 +448,9 @@ export type {
SecretRenameMessage,
SecretFilesMessage,
SecretStatMessage,
SecretIdentifierMessageTagged,
VaultNamesHeaderMessage,
SecretsRemoveHeaderMessage,
SignatureMessage,
OverrideRPClientType,
AuditMetricGetTypeOverride,
Expand Down
4 changes: 3 additions & 1 deletion src/git/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ async function listObjects({
}
return;
default:
utils.never();
utils.never(
`type must be one of "commit", "tree", "blob", or "tag", got "${type}"`,
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function getDefaultNodePath(): string | undefined {
return p;
}

function never(message?: string): never {
function never(message: string): never {
throw new utilsErrors.ErrorUtilsUndefinedBehaviour(message);
}

Expand Down
2 changes: 2 additions & 0 deletions src/vaults/Vault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ interface Vault {
writeG: VaultInternal['writeG'];
readF: VaultInternal['readF'];
readG: VaultInternal['readG'];
acquireRead: VaultInternal['acquireRead'];
acquireWrite: VaultInternal['acquireWrite'];
log: VaultInternal['log'];
version: VaultInternal['version'];
}
Expand Down
Loading

0 comments on commit 83642fa

Please sign in to comment.