Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: KafkaJS Store #234

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,34 @@ jobs:
node-version: [18.x, 20.x]
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
services:
kafka-zookeeper:
image: wurstmeister/zookeeper
ports:
- 2181:2181
env:
ALLOW_ANONYMOUS_LOGIN: yes
options: >-
--health-cmd "echo mntr | nc -w 2 -q 2 localhost 2181"
--health-interval 10s
--health-timeout 5s
--health-retries 5
kafka:
image: wurstmeister/kafka
ports:
- 9092:9092
options: >-
--health-cmd "kafka-broker-api-versions.sh --version"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: "kafka-zookeeper:2181"
KAFKA_ADVERTISED_PORT: 9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0
- uses: actions/setup-node@5e21ff4d9bc1a8cf6de233a3057d20ec6b3fb69d # v3.8.1
Expand Down
31 changes: 31 additions & 0 deletions .pnp.cjs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
60 changes: 60 additions & 0 deletions packages/milliejs-store-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# MillieJS: Kafka Store

## Getting Started

### Installation

```
npm add milliejs @milliejs/store-kafka
```

### Usage

```js
import MillieJS from "milliejs"
import MillieCRUDStore from "@milliejs/store-*"
import KafkaStore from "@milliejs/store-kafka"
import MillieMemoryStore from "@milliejs/store-*"

const millie = new MillieJS()

const replicaStore = new MillieMemoryStore({})

const muUpstreamCRUDInterface = new MillieCRUDStore({})
const myUpstreamKafkaInterface = new KafkaStore()

const personResource = { id: "person" }
millie.registerResource(personResource, replicaStore, {
sourcePublisher: muUpstreamCRUDInterface,
sourceSubscriber: myUpstreamKafkaInterface,
})
```

#### Configuration

##### Kafka Client

The first argument of the Kafka Store follows the shape of the KafkaJS SDK's
client configuration. More information can be found in the
[KafkaJS documentation](https://kafka.js.org/docs/configuration).

## Contributing

The MillieJS library is currently in its early development stages and is
welcoming contributions, feature requests, and feedback.

## Disclaimer

The MillieJS library is currently in its early development stages and is not
ready for use in production applications. There may be bugs and performance
issues that have not yet been addressed. Use the library at your own risk and
be sure to thoroughly test any implementation before deploying it to a live
environment.

Please note that the API and functionality may change as the library evolves
and stabilizes. If you choose to use MillieJS in your project, we recommend
regularly checking for updates and making any necessary modifications to your
code to ensure compatibility.

**In summary, do not use the MillieJS library in a production environment until
further notice.**
42 changes: 42 additions & 0 deletions packages/milliejs-store-kafka/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"name": "@milliejs/store-kafka",
"version": "0.3.0-alpha.0",
"description": "A Kafka adapter for the MillieJS incremental store utility",
"license": "MIT",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"repository": {
"type": "git",
"url": "https://github.com/sbonami/milliejs.git",
"directory": "packages/milliejs-store-kafka"
},
"bugs": {
"url": "https://github.com/sbonami/milliejs/issues"
},
"files": [
"dist"
],
"scripts": {
"build": "tsc --build",
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"@milliejs/store-base": "workspace:^",
"kafkajs": "^2.2.4"
},
"devDependencies": {
"@milliejs/jest-utils": "workspace:^",
"@milliejs/store-memory": "workspace:^",
"@types/node": "^20.5.0",
"milliejs": "workspace:^",
"tiny-invariant": "^1.3.1",
"typescript": "^4.9.5"
},
"engines": {
"node": ">=18.0.0 || >=20.0.0"
},
"publishConfig": {
"access": "public",
"provenance": true
}
}
77 changes: 77 additions & 0 deletions packages/milliejs-store-kafka/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import EventEmitter from "node:events"
import {
Consumer,
ConsumerConfig,
ConsumerSubscribeTopics,
EachMessagePayload,
Kafka,
KafkaConfig,
} from "kafkajs"
import type {
Entity,
Resource,
SubscriberActionEventKeys,
SubscriberActionInterface,
} from "@milliejs/store-base"

type ParsedMessage<R extends Resource> = {
eventName: SubscriberActionEventKeys<R>
entity: Entity<R>
}

export type MessageParser<R extends Resource> = (
message: EachMessagePayload,
) => ParsedMessage<R>

export type KafkaStoreConfig = KafkaConfig & { consumer: ConsumerConfig }

export class KafkaSubscriber<R extends Resource = Resource<unknown>>
extends EventEmitter
implements SubscriberActionInterface
{
private client: Kafka
private consumer: Consumer

constructor(
clientOptions: KafkaStoreConfig,
private readonly topics: ConsumerSubscribeTopics[],
readonly parser: MessageParser<R>,
) {
const { consumer: consumerConfig, ..._config } = clientOptions
super()
this.client = new Kafka(_config)

this.consumer = this.client.consumer(consumerConfig)
}

async connect() {
await this.consumer.connect()
await Promise.all(
this.topics.map((topic) => this.consumer.subscribe(topic)),
)
await this.consumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
const { topic, partition, message } = messagePayload
const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
console.log(`- ${prefix} ${message.key}#${message.value}`)

this.messageListener(messagePayload)
},
})
}

async disconnect() {
await this.consumer.disconnect()
}

private messageListener(message: EachMessagePayload) {
try {
const { eventName, entity } = this.parser(message)
this.emit(eventName, entity)
} catch (error) {
this.emit("error", error)
}
}
}

export default KafkaSubscriber
Loading
Loading