Add cancellation to RPC handlers #846
base: staging
Conversation
For each RPC handler, we need to provide a Should I make a commit into
Don't bother defining the type for it unless you're actually using it. So you can just name the
I just did this to mute the warnings for type hints, as TS assumes it is
Please make sure to review the actual situation where this can cause a big problem: MatrixAI/Polykey-CLI#264 (comment). If you're fixing it, a test must be created to prevent regression of that specific problem.
All this does is add support for cancelling RPC calls which are in the middle of something. This would ensure that an RPC won't keep the entire agent alive in a deadlock state. I'm not sure how this relates to adding support for streaming progress updates back. They can still be done just fine after this PR is implemented, no?
Force-pushed from a2baf5f to 7b14e96
Polykey/src/client/handlers/VaultsClone.ts Lines 28 to 42 in 19e5278
This whole section of parsing `const nodeId = ids.parseNodeId(input.nodeIdEncoded);` This single line is more concise and performant than the
isomorphic-git/isomorphic-git#1867
My idea is to use a deconstructed promise and reject the promise when the context sends an abort signal, but I'm not sure how this would affect other things, especially the underlying code in
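As a rough illustration of that idea, the sketch below races a promise against a plain `AbortSignal` standing in for the context's signal. The helper name `waitWithAbort` is hypothetical and not Polykey API; note that the underlying work keeps running after the rejection, which is exactly the concern about the underlying code.

```typescript
// Hypothetical helper: reject a deconstructed promise as soon as the
// AbortSignal fires, while still settling normally if the work finishes first.
function waitWithAbort<T>(work: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    signal.addEventListener('abort', () => reject(signal.reason), {
      once: true,
    });
    // Note: this does not stop `work`; it only rejects the wrapper promise
    work.then(resolve, reject);
  });
}
```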
This requires fast-check model checking to be applied. I don't trust it until sufficient variations of side effects are tested and the right defaults are found. Along with benchmarking.
We check for abortion in multiple ways in Polykey. In some places, it is Which one should we follow consistently throughout the repo?
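For illustration, the two common styles boil down to the following; both assume a standard `AbortSignal`, and neither snippet is Polykey's actual code:

```typescript
// Style 1: explicit synchronous check, throwing the abort reason manually.
function checkAbortExplicit(signal: AbortSignal): void {
  if (signal.aborted) {
    throw signal.reason;
  }
}

// Style 2: the built-in helper, equivalent but more concise (Node >= 17.3).
function checkAbortBuiltin(signal: AbortSignal): void {
  signal.throwIfAborted();
}
```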
```
await vaultManager.pullVault(
  {
    vaultId: vaultId,
    pullNodeId: nodeId,
    pullVaultNameOrId: pullVaultId,
    tran: tran,
  },
  ctx,
);
```

Here, the parameters are being sent as a JSON object for some reason. I want to know when a JSON object should be used for parameters and when they should be passed as-is. Before, the call was:

```
await vaultManager.pullVault({
  vaultId: vaultId,
  pullNodeId: nodeId,
  pullVaultNameOrId: pullVaultId,
  tran: tran,
});
```

The one benefit I see is that the parameters don't need to be ordered and can be set via key-value pairs, like in Python. However, I can't do the same with the context, as it needs a special decorator which doesn't work well with JSON objects.

```
@timedCancellable(true)
public async pullVault(
  {
    vaultId,
    pullNodeId,
    pullVaultNameOrId,
    tran,
  }: {
    vaultId: VaultId;
    pullNodeId?: NodeId;
    pullVaultNameOrId?: VaultId | VaultName;
    tran?: DBTransaction;
  },
  @context ctx: ContextTimed, // The decorator can't go in a JSON object
): Promise<void>;
```

How should I handle these cases, @tegefaulkes?
I went over a couple of things with Brian in a meeting today, and these are the key takeaways.
src/vaults/VaultInternal.ts
Outdated
```
-  protected async start_(
-    fresh: boolean,
-    tran: DBTransaction,
-    ctx?: Partial<ContextTimed>,
-    vaultName?: VaultName,
-  ) {
+  ): Promise<void>;
+  @timedCancellable(true)
+  protected async start_(
+    fresh: boolean,
+    tran: DBTransaction,
+    @context ctx: ContextTimed,
+    vaultName?: VaultName,
+  ): Promise<void> {
```
Here, as `tran` is expected to be present, and an overload also expects `ctx` to be present, `vaultName` is the only optional parameter. Thus, it has been pushed back to the end of the argument list. This does not follow how the rest of the project has been formatted, but there is little that can be done here.
However, I am curious as to the reason for keeping the vault name optional. Does this mean that vaults can exist without vault names? I guess it would make sense, as a vault name is merely an alias and the vault ID is the actual identifier. But I'd still want to know the actual reason.
As a workaround, I can pack other parameters inside a JSON object and pass the context and transaction identifier outside like regular parameters.
Yea vault names are optional. They can be reset, or they can be cleared.
The signature might be wrong.
I am kind of nitpicking here, but I noticed this minor point I want to bring up and get some clarification on. We rely on getting the class names for logger messages. This is done differently for static and instance methods of a class. In static methods, Can't we do something like a getter that returns the class name reliably in all contexts?

```
class VaultInternal {
  static get className(): string {
    return this.name;
  }
  get className(): string {
    // Cast needed: `this.constructor` is typed as `Function` in TS
    return (this.constructor as typeof VaultInternal).className;
  }
  // Now the class name can be obtained in any context using
  // this.className
}
```

Note that this code was generated by ChatGPT and has not been tested yet. This probably won't be the final implementation, but I am just asking if this idea can be implemented.
We are also inconsistently using I think we should be explicit about this. Thoughts, @tegefaulkes?
```
// This should really be an internal property
// get whether this is remote, and the remote address
// if it is, we consider this repo an "attached repo"
// this vault is a "mirrored" vault
if (
  (await tran.get([
    ...this.vaultMetadataDbPath,
    VaultInternal.remoteKey,
  ])) != null
) {
  // Mirrored vaults are immutable
  throw new vaultsErrors.ErrorVaultRemoteDefined();
}
```

This was a comment made in
It probably doesn't work; that's why it's different.
But is this a good thing to invest some time and effort in? This can probably be a quick fix as part of this PR itself, or I can make an issue which explores this more in depth and maybe finds a way to get this into other repos without defining it in every class.
Another option would be to repurpose the file to assist with other issues like #822. We shouldn't be keeping dead code unless there's a good reason.
Lines 54 to 68 in 72c7dfb
The type for
Did you review this: https://github.com/MatrixAI/js-rpc?tab=readme-ov-file#specifications?
```
   }> & {
     paths: Array<TopicSubPath>;
   },
-  _cancel,
-  _meta,
+  _cancel: (reason?: any) => void,
+  _meta: Record<string, JSONValue>,
   ctx: ContextTimed,
 ): AsyncGenerator<ClientRPCResponseResult<AuditEventSerialized>> {
   const { audit }: { audit: Audit } = this.container;
```
I'm a bit confused here since I didn't work directly on js-rpc. But if a handler receives a `cancel` function, what exactly is it cancelling? Is it cancelling itself? I find this API a bit strange.
From first principles, the idea is that a caller can "abort" its call by using the controller to abort: `controller.abort(reason);`. This is transmitted to the handler side, and tells the handler that it is aborted. You can check synchronously with `ctx.signal.aborted`, and then the handler essentially deals with this problem gracefully.
In a single-process context, if the caller proceeds to call `cancel`, the caller's promise is rejected with the cancellation reason immediately, but the handler side may continue to function. This example comes from `cancellable.test.ts` in js-contexts:

```
const fCancellable = cancellable(f);
// Signal is aborted afterwards
const pC1 = fCancellable();
pC1.cancel('cancel reason');
await expect(pC1).rejects.toBe('cancel reason');
```
Therefore something similar happens across a multi-process context, as in the case of RPC. Essentially the call site rejects immediately with a reason, but the handler is able, at its discretion, to deal with how it wants to cancel.
The problem right now I see in PK's RPC is that it does not seem that this abort signal is being properly handled in every case. The patterns used to handle the abort signal from the caller side should be studied in js-contexts, and js-rpc tests should be representing this correctly.
Now for the `cancel` function on the handler side, I don't quite understand this... why does this exist? If it is to "cancel" the handling, I would argue that you should just raise an exception, or return early; there's no need to have a `cancel` method. What exactly is it cancelling?
I think this concept generally makes sense under unary handler and server streaming handlers.
In the case of client streaming and duplex streaming, it can get a bit more complicated. Because now the client is also sending a stream, it's not just the case where the server side is handling a request and returning one response or multiple responses; the client side can send multiple requests, and in cases like duplex streaming, it can also be an interleaved order of request, response, response, request, request... etc, like a "conversation". That's what I'm referring to as an RPC conversation.
I remember an older issue in js-rpc where I pointed out that the implementation of js-rpc is missing client side transmission of errors: MatrixAI/js-rpc#18.
This could mean that one could also cancel from the server side handling. That would indicate the idea that the SERVER no longer cares about the client's next request. This is important because normally under unary and server streaming, cancelling means "as a client I do not care about your next response", whereas in the case of client streaming and duplex streaming, cancelling from the server side means "as the server I do not care about your next request". Swap request/response for just "message" and you get the idea.
However if you cancel, then you want to make sure that you're also getting the abort reason properly. This is not currently documented or demonstrated in the js-rpc README.
@aryanjassal I expect you to update the README to explain the new implementation and model checked tests in js-rpc to achieve this.
In the case of client streaming and duplex streaming:

```
abstract class ClientHandler<
  Container extends ContainerType = ContainerType,
  Input extends JSONRPCRequestParams = JSONRPCRequestParams,
  Output extends JSONRPCResponseResult = JSONRPCResponseResult,
> extends Handler<Container, Input, Output> {
  public async handle(
    /* eslint-disable */
    input: AsyncIterableIterator<Input>,
    cancel: (reason?: any) => void,
    meta: Record<string, JSONValue> | undefined,
    ctx: ContextTimed,
    /* eslint-enable */
  ): Promise<Output> {
    throw new ErrorRPCMethodNotImplemented();
  }
}

abstract class DuplexHandler<
  Container extends ContainerType = ContainerType,
  Input extends JSONRPCRequestParams = JSONRPCRequestParams,
  Output extends JSONRPCResponseResult = JSONRPCResponseResult,
> extends Handler<Container, Input, Output> {
  /**
   * Note that if the output has an error, the handler will not see this as an
   * error. If you need to handle any clean up it should be handled in a
   * `finally` block and check the abort signal for potential errors.
   */
  public async *handle(
    /* eslint-disable */
    input: AsyncIterableIterator<Input>,
    cancel: (reason?: any) => void,
    meta: Record<string, JSONValue> | undefined,
    ctx: ContextTimed,
    /* eslint-enable */
  ): AsyncIterableIterator<Output> {
    throw new ErrorRPCMethodNotImplemented('This method must be overwritten.');
  }
}
```
The `cancel` function here should mean sending a cancel signal to the client side. But @tegefaulkes said that this is being used to cancel the webstream on the server side. This is incorrect and should not be done this way.
The unary and server streaming handlers should not receive `cancel` as a parameter; it should never be used.
In the case of client streaming and duplex streaming, the `cancel` should not be cancelling the underlying stream; it should be about sending an abort signal to the client side.
This needs to be a new issue in js-rpc to refactor the `cancel` parameter being passed into all the high-level handlers. Remove it from unary and server streaming, while changing the implementation for client streaming and duplex streaming. You have to change it so that client streaming and duplex streaming on the call-site also give back a `ContextCancellable`.
This needs to be solved with: MatrixAI/js-rpc#18
I think it's not clear in the js-rpc documentation how you're supposed to properly "cancel" the server handler for all 4 call types. Even in the case of unary calls, you don't get back a `PromiseCancellable` where you can do:

```
const callP = rpcClient.methods.testMethod();
callP.cancel();
```
@aryanjassal this needs a new issue.

1. We want to use `PromiseCancellable` on the client callers.
2. This means it should be possible to do `callP.cancel(reason);`.
3. This should translate into a stream cancel reason using the `reasonToCode`.
4. On the server side where `codeToReason` occurs, that should try to recover an exception object or a generic exception object. (This will require indexing all exceptions by code numbers.)
5. The handler on the server side should then see it as part of the `ctx.signal.reason`.
6. High-level handlers on the server side should not have access to the `cancel` parameter.
7. @tegefaulkes mentioned that the `cancel` parameter is being used by isogit. However, this shouldn't be done this way; I believe it should be possible to bridge the `ctx.signal` into the cancellation of the isogit HTTP stream: Add cancellation to RPC handlers #846 (comment). It shouldn't be using the `cancel` parameter directly.
8. I don't know if the `cancel` parameter is needed at the handler level at all. It may be required in the case of client streaming/duplex streaming.
On point 7, it's not being used by isogit. It's just that isogit has no built-in cancellation, so we'd need to use the `cancel()` to propagate the cancellation to the stream layer to end the cloning and pulling operations earlier.
No, it is not equivalent. There are certain exceptions we expect from the validator and matcher.
Yes, so refer to my comment here: #846 (comment). Any progress update system should NOT be part of the regular RPC call; that overcomplicates our API and makes it harder to integrate. Remember, even a unary call can trigger a long-running operation on the agent side (and downstream effects magnify this due to "Call Amplification").

All progress updates depend on the measurement point. A client may measure between calls it makes by chunking up its calls. An example is making multiple RPC calls, or a client stream that streams chunks at a time expecting a synchronous response. This doesn't require the server side to have a separate protocol to give progress updates; it's implicit to the algorithm, which makes it elegant. So, if you want to "break up" a long-running async operation to get progress updates, rather than building a separate protocol for that, you could try to just do chunking of the request.

On the other hand, in some cases it really doesn't make sense to chunk the request, since the request is spatially/structurally small on the client side; it just takes a long time on the server side. You don't necessarily want to change your unary call into a server streaming call, because that changes the API and makes it needlessly complex for users who just want to do a regular call. So in those scenarios, you introduce out-of-band RPC calls to get this information. This is where #444 comes into play. We can make use of a generalized push outlet and stream information from there; that allows clients, at their discretion, to get extra progress information from a separate system. Another way is to introduce a streaming variant of the same call, which again enables clients to use a more complex API if they want the progress update version of the same call. This is common in functional programming where one creates
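The chunked-client-stream idea, where progress falls out of the chunking itself, can be sketched like this. Both `uploadWithProgress` and `sendChunk` are hypothetical names, not part of Polykey or js-rpc:

```typescript
// Each awaited round trip is an implicit progress tick: no separate
// progress protocol is needed, because the client measures between the
// chunked calls it makes.
async function uploadWithProgress(
  data: Uint8Array,
  chunkSize: number,
  sendChunk: (chunk: Uint8Array) => Promise<void>,
  onProgress: (fraction: number) => void,
): Promise<void> {
  for (let offset = 0; offset < data.byteLength; offset += chunkSize) {
    await sendChunk(data.subarray(offset, offset + chunkSize));
    onProgress(Math.min(1, (offset + chunkSize) / data.byteLength));
  }
}
```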
So isogit is a third-party library. We often come across third-party library problems; if it is structurally a problem, we fork them and implement it ourselves, which we could do by creating js-virtualgit. However, I remember that in the case of isogit, we actually hacked into the HTTP stream itself, where the HTTP stream is virtualised on top of our RPC stream protocol. That would mean you could implement a sort of abortion at the raw RPC stream level, by aborting it at the stream. This is because when we use isogit's features, we are actually wrapping its entire IO, so by being able to control the IO we can inject an abortion at the stream rather than relying on the internal mechanism of isogit to do so. This is absolutely possible, but you need to draw a block diagram of the wrapped structure to demonstrate.
However, beware that this only works synchronously. When dealing with asynchronous operations, you want to propagate that signal downwards, so that every asynchronous loop step includes a check of the signal property. Sometimes that is not available, so you have to integrate the check into the chunked loop step.
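The asynchronous case might look like the following sketch, assuming a plain `AbortSignal` and a hypothetical chunked loop (not actual Polykey code):

```typescript
// Propagate the signal into each step of a chunked async loop: abortion is
// checked between chunks, so cancellation takes effect at the next loop step
// rather than only after the whole operation completes.
async function processChunks(
  chunks: Array<Uint8Array>,
  signal: AbortSignal,
): Promise<number> {
  let total = 0;
  for (const chunk of chunks) {
    signal.throwIfAborted();
    total += chunk.byteLength;
    // Yield to the event loop so an abort can be observed between steps
    await new Promise<void>((resolve) => setImmediate(resolve));
  }
  return total;
}
```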
I don't understand this; js-rpc must take an input JSON object. There's no choice here. The
You should make it explicit. Sometimes people are lazy. Don't propagate laziness.
I think this PR is lacking consideration/discussion of the timeouts/timers. Beware of how we expect timeouts to work. You must read the README.md of js-rpc on the timeouts section and do some reflection there.
If MatrixAI/Polykey-CLI#264 (comment) is not fixed by this PR, then this PR is not sufficient. You must target the fixing of MatrixAI/Polykey-CLI#264 (comment). |
This specific PR lays down the groundwork to allow this. However, adding RPC progress updates is out of scope for this particular PR. I can work on that in a new PR after I close this one. |
src/vaults/VaultManager.ts
Outdated
```
@@ -340,7 +350,7 @@ class VaultManager {
     );
     const vaultIdString = vaultId.toString() as VaultIdString;
     return await this.vaultLocks.withF(
-      [vaultId.toString(), RWLockWriter, 'write'],
+      [vaultId.toString(), RWLockWriter, 'write', ctx],
```
I've done this in a couple of places, but I don't think this is the right way to do this. The type itself is also pretty complex and I can't understand it. This feels wrong to me, but no error is raised here.
I have noticed inside
Description
Without proper abortion or cancellation, an RPC will keep running until it concludes on its own. This ends up adding a significant amount of time when closing the agent, as all outstanding RPC calls must resolve beforehand.
Issues Fixed
Tasks
AgentLockAll (uncancellable unary command)
AgentStatus (uncancellable unary command)
AgentStop (can't cancel unary noop command)
AgentUnlock
AuditEventsGet
AuditMetricGet
GestaltsActionsGetByIdentity
GestaltsActionsGetByNode
GestaltsActionsSetByIdentity
GestaltsActionsSetByNode
GestaltsActionsUnsetByIdentity
GestaltsActionsUnsetByNode
GestaltsDiscoveryByIdentity
GestaltsDiscoveryByNode
GestaltsDiscoveryQueue
GestaltsGestaltsGetByIdentity
GestaltsGestaltsGetByNode
GestaltsGestaltsList
GestaltsGestaltsTrustByIdentity
GestaltsGestaltsTrustByNode
IdentitiesAuthenticate
IdentitiesAuthenticatedGet
IdentitiesClaim
IdentitiesInfoConnectedGet
IdentitiesInfoGet
IdentitiesInvite
IdentitiesProvidersList
IdentitiesTokenDelete
IdentitiesTokenGet
IdentitiesTokenPut
KeysCertsChainGet
KeysCertsGet
KeysDecrypt
KeysEncrypt
KeysKeyPair
KeysKeyPairRenew
KeysKeyPairReset
KeysPasswordChange
KeysPublicKey
KeysSign
KeysVerify
NodesAdd
NodesClaim
NodesFind
NodesGetAll
NodesListConnections
NodesPing
NotificationsInboxClear
NotificationsInboxRead
NotificationsInboxRemove
NotificationsOutboxClear
NotificationsOutboxRead
NotificationsOutboxRemove
NotificationsSend
VaultsClone
VaultsCreate
VaultsDelete
VaultsList
VaultsLog
VaultsPermissionGet
VaultsPermissionSet
VaultsPermissionUnset
VaultsPull
VaultsRename
VaultsScan
VaultsSecretsCat
VaultsSecretsEnv
VaultsSecretsGet
VaultsSecretsList
VaultsSecretsMkdir
VaultsSecretsNew
VaultsSecretsNewDir
VaultsSecretsRemove
VaultsSecretsRename
VaultsSecretsStat
VaultsSecretsWriteFile
VaultsVersion
NodesClaimNetworkSign
NodesClaimNetworkVerify
NodesClaimsGet
NodesClosestActiveConnectionsGet
NodesClosestLocalNodesGet
NodesConnectionSignalFinal
NodesConnectionSignalInitial
NodesCrossSignClaim
NotificationsSend
VaultsGitInfoGet
VaultsGitPackGet
VaultsScan
Final checklist