Skip to content

Commit

Permalink
deps: Bump @streamr network packages to v102.0.0-beta.0 (#22)
Browse files Browse the repository at this point in the history
## Changes

- The `NodeInfo` format has changed (streamr-dev/network#2738, streamr-dev/network#2790)
  - we normalize the info in `NetworkNodeFacade`
  - the type of `NormalizedNodeInfo` is quite complex, but it will become simpler once we export new helper types from `trackerless-network` (streamr-dev/network#2822, not yet released)
- Using renamed conversion functions, e.g. `getNodeIdFromPeerDescriptor` is now `toNodeId`

## Future improvements

- Use these types: streamr-dev/network#2822
  • Loading branch information
teogeb authored Oct 28, 2024
1 parent be66d55 commit fd05681
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 445 deletions.
487 changes: 112 additions & 375 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@
"author": "Streamr Network AG <[email protected]>",
"dependencies": {
"@streamr/config": "^5.3.7",
"@streamr/dht": "101.1.2",
"@streamr/sdk": "101.1.2",
"@streamr/trackerless-network": "101.1.2",
"@streamr/utils": "101.1.2",
"@streamr/dht": "102.0.0-beta.0",
"@streamr/sdk": "102.0.0-beta.0",
"@streamr/trackerless-network": "102.0.0-beta.0",
"@streamr/utils": "102.0.0-beta.0",
"@types/node-fetch": "^2.6.3",
"class-validator": "^0.14.1",
"cors": "^2.8.5",
"eventemitter3": "^5.0.1",
"express": "^4.18.2",
"graphql": "^16.8.1",
"geoip-lite": "^1.4.10",
"graphql": "^16.8.1",
"graphql-http": "^1.22.0",
"lodash": "^4.17.21",
"mysql2": "^3.9.2",
"node-fetch": "^2.6.7",
"p-limit": "3.1.0",
"reflect-metadata": "^0.2.1",
"semver": "^7.6.3",
"ts-essentials": "^9.4.1",
"type-graphql": "^2.0.0-beta.6",
"typedi": "^0.10.0"
Expand Down
4 changes: 2 additions & 2 deletions src/StreamrClientFacade.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DhtAddress, NodeType, getRawFromDhtAddress } from '@streamr/dht'
import { DhtAddress, NodeType, toDhtAddressRaw } from '@streamr/dht'
import {
NetworkNodeType,
NetworkPeerDescriptor,
Expand All @@ -20,7 +20,7 @@ export const peerDescriptorTranslator = (json: NetworkPeerDescriptor): PeerDescr
const type = json.type === NetworkNodeType.BROWSER ? NodeType.BROWSER : NodeType.NODEJS
return {
...json,
nodeId: getRawFromDhtAddress(json.nodeId as DhtAddress),
nodeId: toDhtAddressRaw(json.nodeId as DhtAddress),
type
}
}
Expand Down
29 changes: 14 additions & 15 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { PeerDescriptor, getNodeIdFromPeerDescriptor } from '@streamr/dht'
import { PeerDescriptor, toNodeId } from '@streamr/dht'
import { DhtAddress, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from '@streamr/sdk'
import { NodeInfo } from '@streamr/trackerless-network'
import { Logger, StreamID, StreamPartID, StreamPartIDUtils, binaryToHex, toStreamPartID, wait } from '@streamr/utils'
import { difference, range, sortBy } from 'lodash'
import pLimit from 'p-limit'
Expand All @@ -11,7 +10,7 @@ import { MessageRepository, convertStreamMessageToMessageRow } from '../reposito
import { NodeRepository } from '../repository/NodeRepository'
import { StreamRepository } from '../repository/StreamRepository'
import { collect, retry } from '../utils'
import { NetworkNodeFacade } from './NetworkNodeFacade'
import { NetworkNodeFacade, NormalizedNodeInfo } from './NetworkNodeFacade'
import { MAX_SUBSCRIPTION_COUNT, SubscribeGate } from './SubscribeGate'
import { Topology } from './Topology'
import { getMessageRate } from './messageRate'
Expand All @@ -35,7 +34,7 @@ const RECOVERY_DELAY = 5 * 60 * 1000 // TODO from config

const createPeerDescriptorLogOutput = (peerDescriptor: PeerDescriptor) => {
return {
nodeId: getNodeIdFromPeerDescriptor(peerDescriptor),
nodeId: toNodeId(peerDescriptor),
type: peerDescriptor.type,
udp: peerDescriptor.udp,
tcp: peerDescriptor.tcp,
Expand All @@ -47,17 +46,17 @@ const createPeerDescriptorLogOutput = (peerDescriptor: PeerDescriptor) => {
}
}

const createNodeInfoLogOutput = (nodeInfo: NodeInfo) => {
const createNodeInfoLogOutput = (nodeInfo: NormalizedNodeInfo) => {
return {
peerDescriptor: createPeerDescriptorLogOutput(nodeInfo.peerDescriptor),
controlLayer: {
neighbors: nodeInfo.controlLayer!.neighbors.map((n: PeerDescriptor) => getNodeIdFromPeerDescriptor(n)),
connections: nodeInfo.controlLayer!.connections.map((n: PeerDescriptor) => getNodeIdFromPeerDescriptor(n))
neighbors: nodeInfo.controlLayer!.neighbors.map(toNodeId),
connections: nodeInfo.controlLayer!.connections.map(toNodeId)
},
streamPartitions: nodeInfo.streamPartitions.map((sp: any) => ({
id: sp.id,
controlLayerNeighbors: sp.controlLayerNeighbors.map((n: PeerDescriptor) => getNodeIdFromPeerDescriptor(n)),
contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n: PeerDescriptor) => getNodeIdFromPeerDescriptor(n))
controlLayerNeighbors: sp.controlLayerNeighbors.map(toNodeId),
contentDeliveryLayerNeighbors: sp.contentDeliveryLayerNeighbors.map((n: any) => toNodeId(n.peerDescriptor)) // TODO better type
})),
version: nodeInfo.version
}
Expand All @@ -66,13 +65,13 @@ const createNodeInfoLogOutput = (nodeInfo: NodeInfo) => {
export const crawlTopology = async (
localNode: NetworkNodeFacade,
entryPoints: PeerDescriptor[],
getNeighbors: (nodeInfo: NodeInfo) => PeerDescriptor[],
getNeighbors: (nodeInfo: NormalizedNodeInfo) => PeerDescriptor[],
runId: string
): Promise<Topology> => {
const nodeInfos: Map<DhtAddress, NodeInfo> = new Map()
const nodeInfos: Map<DhtAddress, NormalizedNodeInfo> = new Map()
const errorNodes: Set<DhtAddress> = new Set()
const processNode = async (peerDescriptor: PeerDescriptor): Promise<void> => {
const nodeId = getNodeIdFromPeerDescriptor(peerDescriptor)
const nodeId = toNodeId(peerDescriptor)
const processed = nodeInfos.has(nodeId) || errorNodes.has(nodeId)
if (processed) {
return
Expand Down Expand Up @@ -141,7 +140,7 @@ export class Crawler {
const topology = await crawlTopology(
networkNodeFacade,
this.client.getEntryPoints(),
(nodeInfo: NodeInfo) => nodeInfo.controlLayer!.neighbors,
(nodeInfo: NormalizedNodeInfo) => nodeInfo.controlLayer!.neighbors,
`full-${Date.now()}`
)
await this.nodeRepository.replaceNetworkTopology(topology)
Expand Down Expand Up @@ -270,11 +269,11 @@ export class Crawler {
const entryPoints = (await Promise.all(range(payload.metadata.partitions)
.map((p) => toStreamPartID(payload.streamId, p))
.map((sp) => localNode.fetchStreamPartEntryPoints(sp)))).flat()
const topology = await crawlTopology(localNode, entryPoints, (nodeInfo: NodeInfo) => {
const topology = await crawlTopology(localNode, entryPoints, (nodeInfo: NormalizedNodeInfo) => {
const streamPartitions = nodeInfo.streamPartitions.filter(
(sp) => StreamPartIDUtils.getStreamID(sp.id as StreamPartID) === payload.streamId
)
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors)).flat()
return (streamPartitions.map((sp) => sp.contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor!))).flat()
}, `stream-${payload.streamId}-${Date.now()}`)
// TODO could add new nodes and neighbors to NodeRepository?
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
Expand Down
27 changes: 25 additions & 2 deletions src/crawler/NetworkNodeFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,35 @@ import { PeerDescriptor } from '@streamr/dht'
import { NetworkNode, NodeInfo, StreamMessage, streamPartIdToDataKey } from '@streamr/trackerless-network'
import { StreamPartID } from '@streamr/utils'
import EventEmitter3 from 'eventemitter3'
import semver from 'semver'
import { Config } from '../Config'

export interface Events {
subscribe: () => void
unsubscribe: () => void
}

type ArrayElement<ArrayType extends readonly unknown[]> =
ArrayType extends readonly (infer ElementType)[] ? ElementType : never

export type NormalizedNodeInfo = Omit<NodeInfo, 'streamPartitions'>
& { streamPartitions: Omit<ArrayElement<NodeInfo['streamPartitions']>, 'deprecatedContentDeliveryLayerNeighbors'>[] }

const toNormalizeNodeInfo = (info: NodeInfo): NormalizedNodeInfo => {
const isLegacyFormat = semver.satisfies(semver.coerce(info.version)!, '< 102.0.0')
return {
...info,
streamPartitions: info.streamPartitions.map((sp) => ({
...sp,
contentDeliveryLayerNeighbors: !isLegacyFormat
? sp.contentDeliveryLayerNeighbors
: sp.deprecatedContentDeliveryLayerNeighbors.map((n) => ({
peerDescriptor: n
}))
}))
}
}

export class NetworkNodeFacade {

private readonly node: NetworkNode
Expand Down Expand Up @@ -46,8 +68,9 @@ export class NetworkNodeFacade {
return Array.from(this.node.getStreamParts()).length
}

async fetchNodeInfo(peerDescriptor: PeerDescriptor): Promise<NodeInfo> {
return await this.node.fetchNodeInfo(peerDescriptor)
async fetchNodeInfo(peerDescriptor: PeerDescriptor): Promise<NormalizedNodeInfo> {
const info = await this.node.fetchNodeInfo(peerDescriptor)
return toNormalizeNodeInfo(info)
}

async fetchStreamPartEntryPoints(streamPartId: StreamPartID): Promise<PeerDescriptor[]> {
Expand Down
12 changes: 6 additions & 6 deletions src/crawler/Topology.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getNodeIdFromPeerDescriptor } from '@streamr/dht'
import { toNodeId } from '@streamr/dht'
import { DhtAddress, StreamPartID } from '@streamr/sdk'
import { NodeInfo } from '@streamr/trackerless-network'
import { Multimap, numberToIpv4, StreamPartIDUtils } from '@streamr/utils'
import { NormalizedNodeInfo } from './NetworkNodeFacade'

export interface Node {
id: DhtAddress
Expand All @@ -13,17 +13,17 @@ export class Topology {

private nodes: Map<DhtAddress, Node> = new Map()

constructor(infos: NodeInfo[]) {
const nodeIds = new Set(...[infos.map((info) => getNodeIdFromPeerDescriptor(info.peerDescriptor))])
constructor(infos: NormalizedNodeInfo[]) {
const nodeIds = new Set(...[infos.map((info) => toNodeId(info.peerDescriptor))])
for (const info of infos) {
const streamPartNeighbors: Multimap<StreamPartID, DhtAddress> = new Multimap()
for (const streamPartitionInfo of info.streamPartitions) {
const neighbors = streamPartitionInfo.contentDeliveryLayerNeighbors
.map((n) => getNodeIdFromPeerDescriptor(n))
.map((n) => toNodeId(n.peerDescriptor!))
.filter((id) => nodeIds.has(id))
streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors)
}
const nodeId = getNodeIdFromPeerDescriptor(info.peerDescriptor)
const nodeId = toNodeId(info.peerDescriptor)
this.nodes.set(nodeId, {
id: nodeId,
streamPartNeighbors,
Expand Down
22 changes: 11 additions & 11 deletions test/APIServer.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'reflect-metadata'

import { createRandomDhtAddress, DhtAddress } from '@streamr/dht'
import { randomDhtAddress, DhtAddress } from '@streamr/dht'
import { Multimap, StreamID, StreamPartID, StreamPartIDUtils, utf8ToBinary } from '@streamr/utils'
import { range, without } from 'lodash'
import Container from 'typedi'
Expand Down Expand Up @@ -338,15 +338,15 @@ describe('APIServer', () => {

describe('nodes', () => {

const node1 = createRandomDhtAddress()
const node2 = createRandomDhtAddress()
const node3 = createRandomDhtAddress()
const node1 = randomDhtAddress()
const node2 = randomDhtAddress()
const node3 = randomDhtAddress()

beforeEach(async () => {
await storeTestTopology([
{ id: StreamPartIDUtils.parse('stream1#0'), nodeIds: [node1, node2] },
{ id: StreamPartIDUtils.parse('stream1#1'), nodeIds: [node2, node3] },
{ id: StreamPartIDUtils.parse('stream2#0'), nodeIds: [createRandomDhtAddress(), createRandomDhtAddress()] }
{ id: StreamPartIDUtils.parse('stream2#0'), nodeIds: [randomDhtAddress(), randomDhtAddress()] }
])
})

Expand Down Expand Up @@ -393,11 +393,11 @@ describe('APIServer', () => {

describe('neighbors', () => {

const node1 = createRandomDhtAddress()
const node2 = createRandomDhtAddress()
const node3 = createRandomDhtAddress()
const node4 = createRandomDhtAddress()
const node5 = createRandomDhtAddress()
const node1 = randomDhtAddress()
const node2 = randomDhtAddress()
const node3 = randomDhtAddress()
const node4 = randomDhtAddress()
const node5 = randomDhtAddress()

beforeEach(async () => {
await storeTestTopology([
Expand Down Expand Up @@ -486,7 +486,7 @@ describe('APIServer', () => {
publisherCount: null,
subscriberCount: null
})
await storeTestTopology([{ id: StreamPartIDUtils.parse('stream#0'), nodeIds: [createRandomDhtAddress(), createRandomDhtAddress()] }])
await storeTestTopology([{ id: StreamPartIDUtils.parse('stream#0'), nodeIds: [randomDhtAddress(), randomDhtAddress()] }])
const summary = await queryAPI(`{
summary {
streamCount
Expand Down
32 changes: 17 additions & 15 deletions test/Crawler.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { DhtAddress, PeerDescriptor, getNodeIdFromPeerDescriptor } from '@streamr/dht'
import { NodeInfo } from '@streamr/trackerless-network'
import { DhtAddress, PeerDescriptor, toNodeId } from '@streamr/dht'
import { StreamPartIDUtils } from '@streamr/utils'
import { crawlTopology } from '../src/crawler/Crawler'
import { createTestPeerDescriptor } from './utils'
import { NormalizedNodeInfo } from '../src/crawler/NetworkNodeFacade'

const STREAM_PART_ID = StreamPartIDUtils.parse('stream#0')

Expand All @@ -11,19 +11,21 @@ describe('Crawler', () => {
let nodes: PeerDescriptor[]
let neighbors: Map<DhtAddress, PeerDescriptor[]>

const createMockNodeInfo = (peerDescriptor: PeerDescriptor): NodeInfo => {
const createMockNodeInfo = (peerDescriptor: PeerDescriptor): NormalizedNodeInfo => {
return {
peerDescriptor,
controlLayer: {
neighbors: [],
connections: []
},
streamPartitions: [{
streamPartitions: [{
id: STREAM_PART_ID,
controlLayerNeighbors: [],
contentDeliveryLayerNeighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? []
contentDeliveryLayerNeighbors: neighbors.get(toNodeId(peerDescriptor))!.map((n) => ({
peerDescriptor: n
})) ?? []
}],
version: ''
version: '102.0.0'
}
}

Expand All @@ -38,13 +40,13 @@ describe('Crawler', () => {
createTestPeerDescriptor()
]
neighbors = new Map()
neighbors.set(getNodeIdFromPeerDescriptor(nodes[0]), [nodes[1], nodes[2]])
neighbors.set(getNodeIdFromPeerDescriptor(nodes[1]), [nodes[4]])
neighbors.set(getNodeIdFromPeerDescriptor(nodes[2]), [nodes[3]])
neighbors.set(getNodeIdFromPeerDescriptor(nodes[3]), [])
neighbors.set(getNodeIdFromPeerDescriptor(nodes[4]), [nodes[1], nodes[2], nodes[5]])
neighbors.set(getNodeIdFromPeerDescriptor(nodes[5]), [nodes[6]])
neighbors.set(getNodeIdFromPeerDescriptor(nodes[6]), [])
neighbors.set(toNodeId(nodes[0]), [nodes[1], nodes[2]])
neighbors.set(toNodeId(nodes[1]), [nodes[4]])
neighbors.set(toNodeId(nodes[2]), [nodes[3]])
neighbors.set(toNodeId(nodes[3]), [])
neighbors.set(toNodeId(nodes[4]), [nodes[1], nodes[2], nodes[5]])
neighbors.set(toNodeId(nodes[5]), [nodes[6]])
neighbors.set(toNodeId(nodes[6]), [])
})

it('crawlTopology', async () => {
Expand All @@ -58,10 +60,10 @@ describe('Crawler', () => {
const topology = await crawlTopology(
localNode as any,
[nodes[0], nodes[5]],
(response: NodeInfo) => response.streamPartitions[0].contentDeliveryLayerNeighbors,
(response: NormalizedNodeInfo) => response.streamPartitions[0].contentDeliveryLayerNeighbors.map((n) => n.peerDescriptor!),
''
)
expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length)
expect([...topology.getPeers(STREAM_PART_ID)!]).toIncludeSameMembers(nodes.map((n) => getNodeIdFromPeerDescriptor(n)))
expect([...topology.getPeers(STREAM_PART_ID)!]).toIncludeSameMembers(nodes.map(toNodeId))
})
})
Loading

0 comments on commit fd05681

Please sign in to comment.