Skip to content

Commit

Permalink
Store topology neighbors to DB (#8)
Browse files Browse the repository at this point in the history
Store all neighbors of the network topology to the database.

New queries:
- all neighbors in the network topology
- neighbors of a node
- neighbors in a stream part topology

Also some stricter types for GraphQL arguments.
  • Loading branch information
teogeb authored Feb 29, 2024
1 parent a53d1aa commit 9aa4e9b
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 30 deletions.
10 changes: 10 additions & 0 deletions initialize-database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ CREATE TABLE IF NOT EXISTS nodes (
id CHAR(40) NOT NULL PRIMARY KEY,
ipAddress VARCHAR(15)
);

CREATE TABLE IF NOT EXISTS neighbors (
streamPartId VARCHAR(500) NOT NULL,
nodeId1 CHAR(40) NOT NULL,
nodeId2 CHAR(40) NOT NULL,
PRIMARY KEY (streamPartId, nodeId1, nodeId2),
FOREIGN KEY (nodeId1) REFERENCES nodes(id),
FOREIGN KEY (nodeId2) REFERENCES nodes(id),
INDEX neighbors_streamPartId (streamPartId)
);
29 changes: 25 additions & 4 deletions src/api/NodeResolver.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { StreamPartIDUtils } from '@streamr/protocol'
import { DhtAddress } from '@streamr/sdk'
import { DeepOmit } from 'ts-essentials'
import { Arg, FieldResolver, Int, Query, Resolver, Root } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { Location, Node, Nodes } from '../entities/Node'
import { NodeRepository } from '../repository/NodeRepository'
import { Location, Neighbors, Node, Nodes } from '../entities/Node'
import { getLocationFromIpAddress } from '../location'
import { NodeRepository } from '../repository/NodeRepository'

@Resolver(() => Node)
@Service()
Expand All @@ -21,9 +23,28 @@ export class NodeResolver {
async nodes(
@Arg("ids", () => [String], { nullable: true }) ids?: string[],
@Arg("pageSize", () => Int, { nullable: true }) pageSize?: number,
@Arg("cursor", { nullable: true }) cursor?: string,
@Arg("cursor", { nullable: true }) cursor?: string
): Promise<DeepOmit<Nodes, { items: { location: never }[] }>> {
return this.repository.getNodes(ids, pageSize, cursor)
return this.repository.getNodes(
(ids !== undefined) ? ids as DhtAddress[] : undefined,
pageSize,
cursor
)
}

@Query(() => Neighbors)
async neighbors(
@Arg("node", { nullable: true }) nodeId?: DhtAddress,
@Arg("streamPart", { nullable: true }) streamPart?: string,
@Arg("pageSize", () => Int, { nullable: true }) pageSize?: number,
@Arg("cursor", { nullable: true }) cursor?: string
): Promise<Neighbors> {
return this.repository.getNeighbors(
(nodeId !== undefined) ? nodeId as DhtAddress : undefined,
(streamPart !== undefined) ? StreamPartIDUtils.parse(streamPart) : undefined,
pageSize,
cursor
)
}

// eslint-disable-next-line class-methods-use-this
Expand Down
12 changes: 11 additions & 1 deletion src/api/StreamResolver.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { StreamID } from '@streamr/sdk'
import { toEthereumAddress } from '@streamr/utils'
import { Arg, Int, Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { OrderDirection } from '../entities/OrderDirection'
Expand Down Expand Up @@ -26,6 +28,14 @@ export class StreamResolver {
@Arg("pageSize", () => Int, { nullable: true }) pageSize?: number,
@Arg("cursor", { nullable: true }) cursor?: string,
): Promise<Streams> {
return this.repository.getStreams(ids, searchTerm, owner, orderBy, orderDirection, pageSize, cursor)
return this.repository.getStreams(
(ids !== undefined) ? ids.map((id) => id as StreamID) : undefined,
searchTerm,
(owner !== undefined) ? toEthereumAddress(owner) : undefined,
orderBy,
orderDirection,
pageSize,
cursor
)
}
}
20 changes: 20 additions & 0 deletions src/entities/Node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,23 @@ export class Nodes {
@Field(() => String, { nullable: true })
cursor!: string | null
}

/* eslint-disable indent */
@ObjectType()
export class Neighbor {
@Field()
streamPartId!: string
@Field()
nodeId1!: string
@Field()
nodeId2!: string
}

/* eslint-disable indent */
@ObjectType()
export class Neighbors {
@Field(() => [Neighbor])
items!: Neighbor[]
@Field(() => String, { nullable: true })
cursor!: string | null
}
56 changes: 55 additions & 1 deletion src/repository/NodeRepository.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DhtAddress } from '@streamr/dht'
import { StreamPartID } from '@streamr/sdk'
import { Logger } from '@streamr/utils'
import { RowDataPacket } from 'mysql2'
import { Inject, Service } from 'typedi'
Expand All @@ -10,6 +12,12 @@ export interface NodeRow extends RowDataPacket {
ipAddress: string | null
}

interface NeighborRow extends RowDataPacket {
streamPartId: string
nodeId1: string
nodeId2: string
}

const logger = new Logger(module)

@Service()
Expand All @@ -24,7 +32,7 @@ export class NodeRepository {
}

async getNodes(
ids?: string[],
ids?: DhtAddress[],
pageSize?: number,
cursor?: string
): Promise<PaginatedListFragment<NodeRow[]>> {
Expand All @@ -42,15 +50,61 @@ export class NodeRepository {
return this.connectionPool.queryPaginated<NodeRow[]>(sql, params)
}

async getNeighbors(
nodeId?: DhtAddress,
streamPartId?: StreamPartID,
pageSize?: number,
cursor?: string
): Promise<PaginatedListFragment<NeighborRow[]>> {
logger.info('Query: getNeighbors', { nodeId, streamPartId })
const whereClauses = []
const params = []
if (nodeId !== undefined) {
whereClauses.push('nodeId1 = ? OR nodeId2 = ?')
params.push(nodeId, nodeId)
}
if (streamPartId !== undefined) {
whereClauses.push('streamPartId = ?')
params.push(streamPartId)
}
const sql = createSqlQuery(
'SELECT streamPartId, nodeId1, nodeId2 FROM neighbors',
whereClauses
)
return this.connectionPool.queryPaginated<NeighborRow[]>(
sql,
params,
pageSize,
cursor
)
}

async replaceNetworkTopology(topology: Topology): Promise<void> {
const nodes = topology.getNodes().map((node) => {
return [node.id, node.ipAddress]
})
const neighbors: [StreamPartID, DhtAddress, DhtAddress][] = []
for (const node of topology.getNodes()) {
for (const streamPartId of node.streamPartNeighbors.keys()) {
const streamPartNeighbors = node.streamPartNeighbors.get(streamPartId)!
for (const neighbor of streamPartNeighbors) {
// If node A and B are neighbors, we assume that there are two associations in the topology:
// A->B and B-A. We don't need to store both associations to the DB. The following comparison
// filters out the duplication. Note that if there is only one side of the association
// in the topology, that association is maybe not stored at all.
if (node.id < neighbor) {
neighbors.push([streamPartId, node.id, neighbor])
}
}
}
}
const connection = await this.connectionPool.getConnection()
try {
await connection.beginTransaction()
await connection.query('DELETE FROM neighbors')
await connection.query('DELETE FROM nodes')
await connection.query('INSERT INTO nodes (id, ipAddress) VALUES ?', [nodes])
await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2) VALUES ?', [neighbors])
await connection.commit()
} catch (e) {
connection.rollback()
Expand Down
5 changes: 3 additions & 2 deletions src/repository/StreamRepository.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { StreamID } from '@streamr/sdk'
import { Logger } from '@streamr/utils'
import { RowDataPacket } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { StreamrClientFacade } from '../StreamrClientFacade'
import { OrderDirection } from '../entities/OrderDirection'
import { StreamOrderBy, Stream, Streams } from '../entities/Stream'
import { Stream, StreamOrderBy, Streams } from '../entities/Stream'
import { collect, createSqlQuery } from '../utils'
import { ConnectionPool } from './ConnectionPool'

Expand Down Expand Up @@ -38,7 +39,7 @@ export class StreamRepository {
}

async getStreams(
ids?: string[],
ids?: StreamID[],
searchTerm?: string,
owner?: string,
orderBy?: StreamOrderBy,
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export const queryAPI = async (query: string, port: number): Promise<any> => {
export const createSqlQuery = (select: string, whereClauses: string[], orderByExpression?: string): string => {
let sql = select
if (whereClauses.length > 0) {
sql += ` WHERE ${whereClauses.join(' AND ')}`
sql += ` WHERE ${whereClauses.map((c) => `(${c})`).join(' AND ')}`
}
if (orderByExpression !== undefined) {
sql += ` ORDER BY ${orderByExpression}`
Expand Down
110 changes: 91 additions & 19 deletions test/APIServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,33 @@ import { dropTestDatabaseIfExists, TEST_DATABASE_NAME } from './utils'
import { NodeRepository } from '../src/repository/NodeRepository'
import { DhtAddress, createRandomDhtAddress } from '@streamr/dht'
import { Multimap } from '@streamr/utils'
import { StreamPartIDUtils } from '@streamr/protocol'

const TOPOLOGY_STREAM_PART_ID = StreamPartIDUtils.parse('stream#0')
import { StreamPartID, StreamPartIDUtils } from '@streamr/protocol'

const storeTestTopology = async (
node1: DhtAddress,
node2: DhtAddress
streamParts: {
id: StreamPartID
node1: DhtAddress
node2: DhtAddress
}[]
) => {
const nodeRepository = Container.get(NodeRepository)
const streamPartNeighbors1 = new Multimap()
streamPartNeighbors1.add(TOPOLOGY_STREAM_PART_ID, node2)
const streamPartNeighbors2 = new Multimap()
streamPartNeighbors2.add(TOPOLOGY_STREAM_PART_ID, node1)
await nodeRepository.replaceNetworkTopology({
getNodes: () => [{
id: node1,
const nodes: any[] = []
for (const streamPart of streamParts) {
const streamPartNeighbors1 = new Multimap()
streamPartNeighbors1.add(streamPart.id, streamPart.node2)
const streamPartNeighbors2 = new Multimap()
streamPartNeighbors2.add(streamPart.id, streamPart.node1)
nodes.push({
id: streamPart.node1,
streamPartNeighbors: streamPartNeighbors1,
ipAddress: '123.1.2.3'
}, {
id: node2,
id: streamPart.node2,
streamPartNeighbors: streamPartNeighbors2,
ipAddress: '123.1.2.3'
}]
} as any)
})
}
await nodeRepository.replaceNetworkTopology({ getNodes: () => nodes } as any)
}

describe('APIServer', () => {
Expand Down Expand Up @@ -269,11 +272,19 @@ describe('APIServer', () => {

describe('nodes', () => {

it('location', async () => {
await storeTestTopology(createRandomDhtAddress(), createRandomDhtAddress())
const node1 = createRandomDhtAddress()
const node2 = createRandomDhtAddress()

beforeEach(async () => {
await storeTestTopology([{ id: StreamPartIDUtils.parse('stream#0'), node1, node2 }])
})

it('ids', async () => {
const response = await queryAPI(`{
nodes {
nodes(ids: ["${node1}"]) {
items {
id
ipAddress
location {
latitude
longitude
Expand All @@ -285,6 +296,8 @@ describe('APIServer', () => {
}`, apiPort)
const node = response['items'][0]
expect(node).toEqual({
id: node1,
ipAddress: '123.1.2.3',
location: {
city: 'Nagoya',
country: 'JP',
Expand All @@ -293,6 +306,65 @@ describe('APIServer', () => {
}
})
})
})

describe('neighbors', () => {

const node1 = createRandomDhtAddress()
const node2 = createRandomDhtAddress()
const node3 = createRandomDhtAddress()
const node4 = createRandomDhtAddress()

beforeEach(async () => {
await storeTestTopology([
{ id: StreamPartIDUtils.parse('stream#0'), node1, node2 },
{ id: StreamPartIDUtils.parse('stream#1'), node1: node3, node2: node4 }
])
})

it('all', async () => {
const response = await queryAPI(`{
neighbors {
items {
streamPartId
nodeId1
nodeId2
}
}
}`, apiPort)
const neighbors = response['items']
const actualNodes = [neighbors[0].nodeId1, neighbors[0].nodeId2, neighbors[1].nodeId1, neighbors[1].nodeId2]
expect(actualNodes).toIncludeSameMembers([node1, node2, node3, node4])
})

it('filter by node', async () => {
const response1 = await queryAPI(`{
neighbors(node: "${node1}") {
items {
streamPartId
nodeId1
nodeId2
}
}
}`, apiPort)
const neighbors = response1['items']
const actualNodes = [neighbors[0].nodeId1, neighbors[0].nodeId2]
expect(actualNodes).toIncludeSameMembers([node1, node2])
})

it('filter by stream part', async () => {
const response = await queryAPI(`{
neighbors(streamPart: "stream#0") {
items {
nodeId1
nodeId2
}
}
}`, apiPort)
const neighbors = response['items']
const actualNodes = [neighbors[0].nodeId1, neighbors[0].nodeId2]
expect(actualNodes).toIncludeSameMembers([node1, node2])
})
})

it('summary', async () => {
Expand All @@ -313,7 +385,7 @@ describe('APIServer', () => {
publisherCount: null,
subscriberCount: null
})
await storeTestTopology(createRandomDhtAddress(), createRandomDhtAddress())
await storeTestTopology([{ id: StreamPartIDUtils.parse('stream#0'), node1: createRandomDhtAddress(), node2: createRandomDhtAddress() }])
const summary = await queryAPI(`{
summary {
streamCount
Expand Down
Loading

0 comments on commit 9aa4e9b

Please sign in to comment.