Skip to content

Commit

Permalink
feat: support bulk message RPC (#2313)
Browse files Browse the repository at this point in the history
## Why is this change needed?

Reconciliation of messages historically from off-hub sources is achieved
via `submitMessage`, which incurs a performance penalty when many of
these calls are made in rapid succession. This change introduces a new
`submitBulkMessages` RPC which allows many of the to-be reconciled
messages to be submitted at once and handle via the more efficient rust
`mergeMany` underlying call.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [x] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR adds a new gRPC method `SubmitBulkMessages` for submitting
multiple messages at once and updates related proto files and service
implementations.

### Detailed summary
- Added `SubmitBulkMessages` gRPC method
- Defined new message types: `SubmitBulkMessagesRequest`,
`MessageError`, `BulkMessageResponse`, `SubmitBulkMessagesResponse`
- Updated service implementations and proto files
- Implemented server-side logic for `SubmitBulkMessages`
- Added encoding and decoding functions for new message types

> The following files were skipped due to too many changes:
`packages/hub-web/src/generated/request_response.ts`,
`packages/hub-nodejs/src/generated/request_response.ts`,
`packages/core/src/protobufs/generated/request_response.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
CassOnMars authored Sep 17, 2024
1 parent 0a655b9 commit e5a8611
Show file tree
Hide file tree
Showing 10 changed files with 1,105 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .changeset/pink-drinks-brush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": patch
"@farcaster/hub-web": patch
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: support bulk message writing rpcs
71 changes: 70 additions & 1 deletion apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ import {
StreamError,
OnChainEventRequest,
FidRequest,
getFarcasterTime,
MessageBundle,
SubmitBulkMessagesResponse,
BulkMessageResponse,
MessageError,
} from "@farcaster/hub-nodejs";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { APP_NICKNAME, APP_VERSION, HubInterface } from "../hubble.js";
Expand All @@ -70,7 +75,7 @@ import {
STREAM_MESSAGE_BUFFER_SIZE,
SLOW_CLIENT_GRACE_PERIOD_MS,
} from "./bufferedStreamWriter.js";
import { sleep } from "../utils/crypto.js";
import { blake3Truncate160, sleep } from "../utils/crypto.js";
import { jumpConsistentHash } from "../utils/jumpConsistentHash.js";
import { SUBMIT_MESSAGE_RATE_LIMIT, rateLimitByIp } from "../utils/rateLimits.js";
import { statsd } from "../utils/statsd.js";
Expand Down Expand Up @@ -927,6 +932,70 @@ export default class Server {
},
);
},
submitBulkMessages: async (call, callback) => {
// Identify peer that is calling, if available. This is used for rate limiting.
const peer = Result.fromThrowable(
() => call.getPeer(),
(e) => e,
)().unwrapOr("unavailable");

// Check for rate limits
const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter);
if (rateLimitResult.isErr()) {
logger.warn({ peer }, "submitBulkMessages rate limited");
callback(toServiceError(new HubError("unavailable", "API rate limit exceeded")));
return;
}

// Authentication
const authResult = authenticateUser(call.metadata, this.rpcUsers);
if (authResult.isErr()) {
logger.warn({ errMsg: authResult.error.message }, "gRPC submitBulkMessages failed");
callback(
toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)),
);
return;
}

const submissionTime = getFarcasterTime();
if (submissionTime.isErr()) {
callback(toServiceError(submissionTime.error));
return;
}

const { messages } = call.request;
const allHashes = Buffer.concat(messages.map((message) => message.hash ?? new Uint8Array()));
const bundleHash = blake3Truncate160(allHashes);

const messageBundle = MessageBundle.create({
messages: messages,
hash: bundleHash,
});
const result = await this.hub?.submitMessageBundle(submissionTime.value, messageBundle, "rpc");
callback(
null,
SubmitBulkMessagesResponse.create({
messages: result?.map((m, i) =>
m.match(
() => {
return BulkMessageResponse.create({
message: messages[i],
});
},
(err: HubError) => {
return BulkMessageResponse.create({
messageError: MessageError.create({
hash: messages[i]?.hash ?? new Uint8Array([]),
errCode: err.errCode,
message: err.message,
}),
});
},
),
),
}),
);
},
validateMessage: async (call, callback) => {
const message = call.request;
const result = await this.hub?.validateMessage(message);
Expand Down
28 changes: 28 additions & 0 deletions apps/hubble/www/docs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,34 @@ Used to subscribe to real-time event updates from the Farcaster Hub
| Method Name | Request Type | Response Type | Description |
| ------------- | ------------ | ------------- | ---------------------------- |
| SubmitMessage | Message | Message | Submits a Message to the Hub |
| SubmitBulkMessages | [SubmitBulkMessagesRequest](#SubmitBulkMessagesRequest) | [SubmitBulkMessagesResponse](#SubmitBulkMessagesResponse) | Submits several Messages to the Hub |

### SubmitBulkMessagesRequest

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| messages | [Message](#Message) | repeated | |

### MessageError

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| hash | [bytes](#bytes) | | |
| errCode | [string](#string) | | |
| message | [string](#string) | | |

### BulkMessageResponse

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| message | [Message](#Message) | | |
| message_error | [MessageError](#MessageError) | | |

### SubmitBulkMessagesResponse

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| messages | [BulkMessageResponse](#BulkMessageResponse) | repeated | |

## 10. Username Proofs Service

Expand Down
Loading

0 comments on commit e5a8611

Please sign in to comment.