Skip to content

Commit

Permalink
feat: [NET-1290]: Store sample message (#17)
Browse files Browse the repository at this point in the history
Added new `sampleMessage` endpoint. It returns a sample message from a specified stream. 

- In each crawl iteration, we store one random message for each stream, replacing any previous sample from an earlier iteration
- We collect samples from public streams only

The sample message can be in JSON or binary format. JSON is returned as a plain string and binary as a base64-encoded string.
  • Loading branch information
teogeb authored Apr 12, 2024
1 parent 5dfa1b8 commit dda1e8d
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 28 deletions.
6 changes: 6 additions & 0 deletions initialize-database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ CREATE TABLE IF NOT EXISTS streams (
INDEX streams_subscriberCount (subscriberCount)
);

-- Sample messages collected from streams. streamId is the primary key,
-- so the table holds at most one sample row per stream.
CREATE TABLE IF NOT EXISTS sample_messages (
streamId VARCHAR(500) NOT NULL PRIMARY KEY,
-- Raw message content bytes
content MEDIUMBLOB NOT NULL,
-- Label describing the format of content (the application writes 'JSON' or 'BINARY')
contentType VARCHAR(50) NOT NULL
);

CREATE TABLE IF NOT EXISTS nodes (
id CHAR(40) NOT NULL PRIMARY KEY,
ipAddress VARCHAR(15)
Expand Down
3 changes: 2 additions & 1 deletion src/api/APIServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Config, CONFIG_TOKEN } from '../Config'
import { StreamResolver } from './StreamResolver'
import { SummaryResolver } from './SummaryResolver'
import { NodeResolver } from './NodeResolver'
import { MessageResolver } from './MessageResolver'

const logger = new Logger(module)

Expand All @@ -30,7 +31,7 @@ export class APIServer {

async start(): Promise<void> {
const schema = await buildSchema({
resolvers: [StreamResolver, NodeResolver, SummaryResolver],
resolvers: [StreamResolver, MessageResolver, NodeResolver, SummaryResolver],
container: Container,
validate: false
})
Expand Down
36 changes: 36 additions & 0 deletions src/api/MessageResolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Arg, Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { ContentType, Message } from '../entities/Message'
import { MessageRepository } from '../repository/MessageRepository'
import { toStreamID } from '@streamr/protocol'
import { binaryToUtf8 } from '@streamr/utils'

/**
 * GraphQL resolver that serves the sample message stored for a stream.
 */
@Resolver()
@Service()
export class MessageResolver {

    private repository: MessageRepository

    constructor(
        @Inject() repository: MessageRepository
    ) {
        this.repository = repository
    }

    /**
     * Looks up the sample message stored for the given stream.
     *
     * @param streamId stream identifier as given in the `stream` query argument
     * @returns the sample with its content rendered as a string (UTF-8 text for JSON content,
     * base64 for any other content type), or `null` when the repository has no sample for the stream
     */
    @Query(() => Message, { nullable: true })
    async sampleMessage(
        @Arg("stream", { nullable: false }) streamId: string
    ): Promise<Message | null> {
        const row = await this.repository.getSampleMessage(toStreamID(streamId))
        if (row === null) {
            return null
        }
        const { content: payload, contentType } = row
        // JSON payloads are stored as UTF-8 bytes and can be decoded to text; anything else is base64-encoded
        const content = (contentType === ContentType.JSON)
            ? binaryToUtf8(payload)
            : Buffer.from(payload).toString('base64')
        return { content, contentType }
    }
}
18 changes: 16 additions & 2 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { NetworkNodeFacade } from './NetworkNodeFacade'
import { MAX_SUBSCRIPTION_COUNT, SubscribeGate } from './SubscribeGate'
import { Topology } from './Topology'
import { getMessageRate } from './messageRate'
import { MessageRepository, convertStreamMessageToMessageRow } from '../repository/MessageRepository'

const logger = new Logger(module)

Expand Down Expand Up @@ -97,10 +98,15 @@ export const crawlTopology = async (
return new Topology([...nodeInfos.values()])
}

/**
 * Tells whether a stream is treated as public, judged solely by its subscriber count:
 * a `null` count marks the stream as public, any numeric count (including 0) does not.
 * NOTE(review): presumably the client reports `null` when anyone may subscribe — confirm.
 */
const isPublicStream = (subscriberCount: number | null): boolean => subscriberCount === null

@Service()
export class Crawler {

private readonly streamRepository: StreamRepository
private readonly messageRepository: MessageRepository
private readonly nodeRepository: NodeRepository
private readonly client: StreamrClientFacade
private readonly config: Config
Expand All @@ -109,11 +115,13 @@ export class Crawler {

constructor(
@Inject() streamRepository: StreamRepository,
@Inject() messageRepository: MessageRepository,
@Inject() nodeRepository: NodeRepository,
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
) {
this.streamRepository = streamRepository
this.messageRepository = messageRepository
this.nodeRepository = nodeRepository
this.client = client
this.config = config
Expand Down Expand Up @@ -190,18 +198,20 @@ export class Crawler {
peersByPartition.set(partition, topology.getPeers(toStreamPartID(id, partition)))
}
try {
const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH)
const subscriberCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.SUBSCRIBE)
const peerIds = new Set(...peersByPartition.values())
const messageRate = (peerIds.size > 0)
? await getMessageRate(
id,
[...peersByPartition.keys()],
isPublicStream(subscriberCount),
await this.client.getNetworkNodeFacade(),
subscribeGate,
this.config
)
: { messagesPerSecond: 0, bytesPerSecond: 0 }
const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH)
const subscriberCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.SUBSCRIBE)

logger.info(`Replace ${id}`)
await this.streamRepository.replaceStream({
id,
Expand All @@ -212,6 +222,10 @@ export class Crawler {
publisherCount,
subscriberCount
})
const sampleMessage = (messageRate.sampleMessage !== undefined)
? convertStreamMessageToMessageRow(messageRate.sampleMessage)
: null
await this.messageRepository.replaceSampleMessage(sampleMessage, id)
} catch (e: any) {
logger.error(`Failed to analyze ${id}`, e)
}
Expand Down
10 changes: 9 additions & 1 deletion src/crawler/messageRate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,32 @@ const logger = new Logger(module)
// If there are many partitions, we approximate the message rate of a stream by analyzing only some of the partitions.
// We assume that the traffic levels of the partitions are quite similar.
export const MAX_PARTITION_COUNT = 10
// Largest content length (1048576 = 1024 * 1024) a message may have to be picked as the sample message.
const MAX_MESSAGE_SIZE = 1048576

/** Result of a message rate measurement for one stream. */
export interface MessageRate {
/** Rate derived from the number of messages observed for the stream */
messagesPerSecond: number
/** Rate derived from the summed content lengths of the observed messages */
bytesPerSecond: number
/**
 * The first observed message that qualified as a sample: the stream must be public and the
 * content length must not exceed MAX_MESSAGE_SIZE. Left undefined when no message qualified.
 */
sampleMessage?: StreamMessage
}

export const getMessageRate = async (
streamId: StreamID,
activePartitions: number[],
isPublicStream: boolean,
node: NetworkNodeFacade,
subscibeGate: Gate,
config: Config
): Promise<MessageRate> => {
let messageCount = 0
let bytesSum = 0
let sampleMessage: StreamMessage | undefined = undefined
const messageListener = (msg: StreamMessage) => {
if (msg.getStreamId() === streamId) {
messageCount++
bytesSum += msg.content.length
if ((sampleMessage === undefined) && isPublicStream && (msg.content.length <= MAX_MESSAGE_SIZE)) {
sampleMessage = msg
}
}
}
node.addMessageListener(messageListener)
Expand All @@ -53,7 +60,8 @@ export const getMessageRate = async (
}
const rate = {
messagesPerSecond: calculateRate(messageCount),
bytesPerSecond: calculateRate(bytesSum)
bytesPerSecond: calculateRate(bytesSum),
sampleMessage
}
// eslint-disable-next-line max-len
logger.info(`Message rate ${streamId}: messagesPerSecond=${rate.messagesPerSecond}, bytesPerSecond=${rate.bytesPerSecond}`, { messageCount, samplePartitions, activePartitions })
Expand Down
15 changes: 15 additions & 0 deletions src/entities/Message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Field, ObjectType } from 'type-graphql'

/** Content formats of a sample message as exposed by the API. */
export enum ContentType {
// Content is JSON; the API returns it as a plain JSON string
JSON = 'JSON',
// Content is arbitrary bytes; the API returns it base64-encoded
BINARY = 'BINARY'
}

/* eslint-disable indent */
/**
 * GraphQL object type describing a sample message: its content rendered as a string,
 * and the content type that tells how that string is to be interpreted.
 */
@ObjectType()
export class Message {
@Field(() => String, { description: 'JSON string if contentType is JSON, otherwise base64-encoded binary content' })
content!: string
// Format of `content`, see ContentType
@Field()
contentType!: ContentType
}
67 changes: 67 additions & 0 deletions src/repository/MessageRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Inject, Service } from 'typedi'
import { ConnectionPool } from './ConnectionPool'
import { StreamID } from '@streamr/protocol'
import { ContentType } from '../entities/Message'
import { StreamMessage, ContentType as StreamMessageContentType } from '@streamr/protocol'

/** A sample message in the form in which it is written to and read from the `sample_messages` table. */
export interface MessageRow {
/** Raw content bytes of the message */
content: Uint8Array
/** Format of the content bytes */
contentType: ContentType
}

/**
 * Builds the database row representation of a stream message.
 *
 * The protocol-level content type is mapped to the entity-level `ContentType`;
 * the content bytes are passed through unchanged.
 *
 * @param msg the stream message to convert
 * @returns a row holding the message content and its mapped content type
 * @throws Error if the message has a content type other than JSON or BINARY
 */
export const convertStreamMessageToMessageRow = (msg: StreamMessage): MessageRow => {
    switch (msg.contentType) {
        case StreamMessageContentType.JSON:
            return { content: msg.content, contentType: ContentType.JSON }
        case StreamMessageContentType.BINARY:
            return { content: msg.content, contentType: ContentType.BINARY }
        default:
            throw new Error(`Assertion failed: unknown content type ${msg.contentType}`)
    }
}

/**
 * Database access for the per-stream sample messages kept in the `sample_messages` table.
 */
@Service()
export class MessageRepository {

    private readonly connectionPool: ConnectionPool

    constructor(
        @Inject() connectionPool: ConnectionPool
    ) {
        this.connectionPool = connectionPool
    }

    /**
     * Reads the sample message stored for a stream.
     *
     * @param streamId the stream whose sample is requested
     * @returns the stored row, or `null` when the query does not yield exactly one row
     */
    async getSampleMessage(streamId: StreamID): Promise<MessageRow | null> {
        const matches = await this.connectionPool.queryOrExecute<MessageRow>(
            'SELECT content, contentType FROM sample_messages WHERE streamId=? LIMIT 1',
            [streamId]
        )
        return (matches.length === 1) ? matches[0] : null
    }

    /**
     * Stores the given message as the sample of a stream, replacing any earlier sample.
     * Passing `null` removes the stream's sample instead.
     *
     * @param message the new sample, or `null` to delete the existing one
     * @param streamId the stream the sample belongs to
     */
    async replaceSampleMessage(message: MessageRow | null, streamId: StreamID): Promise<void> {
        if (message === null) {
            // No sample available: drop whatever was stored earlier for this stream
            await this.connectionPool.queryOrExecute(
                'DELETE FROM sample_messages WHERE streamId=?',
                [streamId]
            )
            return
        }
        await this.connectionPool.queryOrExecute(
            'REPLACE INTO sample_messages (streamId, content, contentType) VALUES (?, ?, ?)',
            [
                streamId,
                Buffer.from(message.content),
                message.contentType
            ]
        )
    }
}
58 changes: 57 additions & 1 deletion test/APIServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import { createDatabase, queryAPI } from '../src/utils'
import { dropTestDatabaseIfExists, TEST_DATABASE_NAME } from './utils'
import { NodeRepository } from '../src/repository/NodeRepository'
import { DhtAddress, createRandomDhtAddress } from '@streamr/dht'
import { Multimap } from '@streamr/utils'
import { Multimap, utf8ToBinary } from '@streamr/utils'
import { StreamPartID, StreamPartIDUtils } from '@streamr/protocol'
import { MessageRepository } from '../src/repository/MessageRepository'
import { ContentType } from '../src/entities/Message'
import { StreamID } from '@streamr/protocol'

const storeTestTopology = async (
streamParts: {
Expand Down Expand Up @@ -282,6 +285,59 @@ describe('APIServer', () => {
}])
})

// Tests for the `sampleMessage` API query. Each case optionally stores a sample through
// MessageRepository and then reads it back through the API.
// NOTE(review): the stream IDs are derived from Date.now(), so two cases that run within the same
// millisecond would share an ID — confirm that this cannot make the 'not found' case flaky.
describe('sampleMessage', () => {

// JSON content is expected to come back as the original JSON string
it('JSON', async () => {
const streamId = `stream-${Date.now()}` as StreamID
const content = { foo: 'bar' }
const repository = Container.get(MessageRepository)
await repository.replaceSampleMessage({
content: utf8ToBinary(JSON.stringify(content)),
contentType: ContentType.JSON
}, streamId)
const sample = await queryAPI(`{
sampleMessage(stream: "${streamId}") {
content
contentType
}
}`, apiPort)
expect(sample).toEqual({
content: JSON.stringify(content),
contentType: 'JSON'
})
})

// Binary content is expected to come back base64-encoded
it('binary', async () => {
const streamId = `stream-${Date.now()}` as StreamID
const repository = Container.get(MessageRepository)
await repository.replaceSampleMessage({
content: new Uint8Array([1, 2, 3, 4]),
contentType: ContentType.BINARY
}, streamId)
const sample = await queryAPI(`{
sampleMessage(stream: "${streamId}") {
content
contentType
}
}`, apiPort)
expect(sample).toEqual({
// base64 encoding of the bytes 1, 2, 3, 4
content: 'AQIDBA==',
contentType: 'BINARY'
})
})

// Nothing is stored for this stream ID, so the query is expected to return null
it('not found', async () => {
const streamId = `stream-${Date.now()}` as StreamID
const sample = await queryAPI(`{
sampleMessage(stream: "${streamId}") {
content
contentType
}
}`, apiPort)
expect(sample).toBeNull()
})
})

describe('nodes', () => {

const node1 = createRandomDhtAddress()
Expand Down
Loading

0 comments on commit dda1e8d

Please sign in to comment.