Skip to content

Commit

Permalink
fix: direct peering data needs to be processed by the worker due to o…
Browse files Browse the repository at this point in the history
…bscure node behavior (#2303)

## Why is this change needed?

Peer id validation performs a check of the existence of an object key
that is a `symbol`. When we pass this over to the gossip node worker,
the structuredClone copy drops this key (and functions), which was
causing a crash when using direct peers. The loss of functions would
have been more pronounced in discovering this, but because the check was
on the symbol first, it appeared to be a libp2p bug, until it was
discovered the symbol-based key itself was being omitted per
specification.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.


<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on updating the direct peering data handling in the
`hubble` and `gossipNodeWorker` modules.

### Detailed summary
- Updated `directPeers` type to `string[]` in `hubble.ts` and
`gossipNode.ts`
- Modified direct peer address processing logic in `cli.ts` and
`gossipNodeWorker.ts`
- Added `parseAddress` function import in `gossipNodeWorker.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
CassOnMars authored Sep 10, 2024
1 parent 1943a02 commit ec80ff3
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/mean-tomatoes-beg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: direct peering data needs to be processed by the worker due to obscure node behavior
29 changes: 1 addition & 28 deletions apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { FarcasterNetwork, farcasterNetworkFromJSON } from "@farcaster/hub-nodejs";
import { peerIdFromString } from "@libp2p/peer-id";
import { Ed25519PeerId, PeerId, RSAPeerId, Secp256k1PeerId } from "@libp2p/interface";
import { createEd25519PeerId, createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory";
import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types";
import { Command } from "commander";
import fs, { existsSync } from "fs";
import { mkdir, readFile, writeFile } from "fs/promises";
Expand Down Expand Up @@ -481,32 +479,7 @@ app
);
}

const directPeers = ((cliOptions.directPeers ?? hubConfig.directPeers ?? []) as string[])
.map((a) => parseAddress(a))
.map((a) => {
if (a.isErr()) {
logger.warn(
{ errorCode: a.error.errCode, message: a.error.message },
"Couldn't parse direct peer address, ignoring",
);
} else if (a.value.getPeerId()) {
logger.warn(
{ errorCode: "unavailable", message: "peer id missing from direct peer" },
"Direct peer missing peer id, ignoring",
);
}

return a;
})
.filter((a) => a.isOk() && a.value.getPeerId())
.map((a) => a._unsafeUnwrap())
.map((a) => {
return {
id: peerIdFromString(a.getPeerId() ?? ""),
addrs: [a],
} as AddrInfo;
});

const directPeers = (cliOptions.directPeers ?? hubConfig.directPeers ?? []) as string[];
const rebuildSyncTrie = cliOptions.rebuildSyncTrie ?? hubConfig.rebuildSyncTrie ?? false;
const profileSync = cliOptions.profileSync ?? hubConfig.profileSync ?? false;

Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ export interface HubOptions {
pruneEventsJobCron?: string;

/** A list of addresses the node directly peers with, provided in MultiAddr format */
directPeers?: AddrInfo[];
directPeers?: string[];

/** If set, snapshot sync is disabled */
disableSnapshotSync?: boolean;
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export interface NodeOptions {
/** A list of peerIds that are not allowed to connect to this node */
deniedPeerIdStrs?: string[] | undefined;
/** A list of addresses the node directly peers with, provided in MultiAddr format */
directPeers?: AddrInfo[] | undefined;
directPeers?: string[] | undefined;
/** Override peer scoring. Useful for tests */
scoreThresholds?: Partial<PeerScoreThresholds>;
/** A list of PeerIds that will bypass application-specific peer scoring and return the cap. */
Expand Down
33 changes: 30 additions & 3 deletions apps/hubble/src/network/p2p/gossipNodeWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { parentPort, workerData } from "worker_threads";
import { peerIdFromBytes } from "@libp2p/peer-id";
import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id";
import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types";
import { autoNAT } from "@libp2p/autonat";
import { identify } from "@libp2p/identify";
import { ping } from "@libp2p/ping";
Expand Down Expand Up @@ -35,7 +36,7 @@ import {
Message,
toFarcasterTime,
} from "@farcaster/hub-nodejs";
import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo } from "../../utils/p2p.js";
import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo, parseAddress } from "../../utils/p2p.js";
import { createLibp2p, Libp2p } from "libp2p";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { GossipSub, gossipsub, GossipsubEvents } from "@chainsafe/libp2p-gossipsub";
Expand Down Expand Up @@ -159,11 +160,37 @@ export class LibP2PNode {
? parseInt(process.env["GOSSIPSUB_SOCKET_TIMEOUT"])
: 30000;

const directPeers = options.directPeers
?.map((a) => parseAddress(a))
.map((a) => {
if (a.isErr()) {
logger.warn(
{ errorCode: a.error.errCode, message: a.error.message },
"Couldn't parse direct peer address, ignoring",
);
} else if (!a.value.getPeerId()) {
logger.warn(
{ errorCode: "unavailable", message: "peer id missing from direct peer" },
"Direct peer missing peer id, ignoring",
);
}

return a;
})
.filter((a) => a.isOk() && a.value.getPeerId())
.map((a) => a._unsafeUnwrap())
.map((a) => {
return {
id: peerIdFromString(a.getPeerId() ?? ""),
addrs: [a],
} as AddrInfo;
});

const gossip = gossipsub({
allowPublishToZeroTopicPeers: true,
asyncValidation: true, // Do not forward messages until we've merged it (prevents forwarding known bad messages)
canRelayMessage: true,
directPeers: options.directPeers || [],
directPeers: directPeers || [],
emitSelf: false,
fallbackToFloodsub: fallbackToFloodsub,
floodPublish: floodPublish,
Expand Down

0 comments on commit ec80ff3

Please sign in to comment.