Opening Change Streams on 30 specified collections vs. watching a database with 100 collections #3179
-
Sorry to open the discussion here, the mongodb GitHub driver repo doesn't have discussions enabled. I've been scavenging the internet and I don't see any helpful guides. What I do know is that watching 30 collections means there are 30 connections, while watching an entire database means 1 connection. Hopefully the MongoDB PHP team could help enlighten me on what the better approach is, thanks.
-
These questions are usually handled in the Developer Community Forums, especially as the question may also apply to other drivers.
Before going into details, it's important to know a few details about the PHP driver:
Let's take the following code from the Change Stream example:

```php
$changeStream = $collection->watch();

$documents = [];

for ($i = 0; $i < 10; $i++) {
    $documents[] = ['x' => $i];
}

$collection->insertMany($documents);

$changeStream->rewind();

$startTime = time();

while (true) {
    if ($changeStream->valid()) {
        $event = $changeStream->current();
        assert(is_object($event));

        // toJSON() is a small helper from the documentation example that
        // converts the event document to extended JSON for printing.
        printf("%s\n", toJSON($event));
    }

    $changeStream->next();

    if (time() - $startTime > 3) {
        echo "Aborting after 3 seconds...\n";
        break;
    }
}
```

The return value of `watch()` is a `MongoDB\ChangeStream` object, which implements PHP's `Iterator` interface. While the example above immediately calls `rewind()` and then iterates via `valid()`, `current()`, and `next()`, keep in mind that advancing the iterator blocks while the server waits for new events (up to `maxAwaitTimeMS`), so a single PHP process can only actively iterate one change stream at a time unless it interleaves those calls itself.

This brings us to connection counts: yes, if you want to have change streams open for 30 collections and iterate them in a truly parallel fashion, due to the single-threaded nature of PHP you'd have to have 30 PHP processes running, which in turn means that there are at least 30 connections open to your deployment (note that when connecting to a replica set, the driver opens a connection to each replica set member it has discovered). This isn't entirely necessary though - your script could also open the 30 change streams in the same process. Here's an example with two change streams, just quickly hacked together without testing:

```php
$changeStreams = [
    $client->db->coll1->watch(),
    $client->db->coll2->watch(),
];

while (true) {
    if ($changeStreams[0]->valid()) {
        // Handle change stream 1
    }

    if ($changeStreams[1]->valid()) {
        // Handle change stream 2
    }

    // Advance both change streams
    $changeStreams[0]->next();
    $changeStreams[1]->next();

    // TODO: may want to stop at some point
}
```

The downside here is that if one of the change streams is very busy (i.e. generates lots of documents) and the other isn't, the less busy change stream will invoke a blocking call to the server when you advance it: `next()` waits up to `maxAwaitTimeMS` for new events before returning, which delays handling of events from the busier stream.
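One way to soften that blocking, if you do multiplex several streams in one process, is to lower the server-side wait with the `maxAwaitTimeMS` option that `watch()` accepts. Below is a minimal, untested sketch of that idea; the connection string and collection names (`db.coll1`, `db.coll2`) are placeholders.

```php
<?php
// Sketch only: multiplex two change streams in one process and cap how long
// each blocking next() call waits for new events on the server.
require 'vendor/autoload.php';

$client = new MongoDB\Client('mongodb://127.0.0.1/');

// Ask the server to return from each getMore after ~100 ms even if no events arrived.
$options = ['maxAwaitTimeMS' => 100];

$changeStreams = [
    $client->db->coll1->watch([], $options),
    $client->db->coll2->watch([], $options),
];

foreach ($changeStreams as $changeStream) {
    $changeStream->rewind();
}

while (true) {
    foreach ($changeStreams as $i => $changeStream) {
        if ($changeStream->valid()) {
            $event = $changeStream->current();
            printf("Stream %d: %s\n", $i, $event->operationType);
        }

        // Blocks for at most ~100 ms per stream thanks to maxAwaitTimeMS.
        $changeStream->next();
    }

    // TODO: add a stop condition for a real worker.
}
```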
-
Thanks for explaining to me @alcaeus. To give more context, I have a Laravel command being run indefinitely by Supervisor. For each entry in the collection map there are 4 different events, and my plan is to add more collections to monitor and convert those events into Laravel events. Each collection will have its own pcntl_fork, with monitoring to restart the child if there is a problem and a resume token to resume in case of downtime (a resume-token sketch follows the code below). Now if I add more entries to the collection map, based on my code it will open a new MongoDB client. I was thinking that instead of watching each collection, it might be better from a performance perspective to watch the whole database?

```php
protected $collections = [
    'users' => [
        'insert' => \App\Events\UserInsertedEvent::class,
        // 'update' => \App\Events\UpdateEvent::class,
        // 'delete' => \App\Events\DeleteEvent::class,
        // 'replace' => \App\Events\ReplaceEvent::class,
    ],
    'hobbies' => [
        'insert' => HobbyInsertedEvent::class,
        // 'update' => \App\Events\UpdateEvent::class,
        // 'delete' => \App\Events\DeleteEvent::class,
        // 'replace' => \App\Events\ReplaceEvent::class,
    ],
    // Add more collections and their operation event classes here
];

/**
 * Execute the console command.
 */
public function handle()
{
    $client = $this->createMongoClient();
    $databaseName = config('database.connections.mongodb.database');

    foreach ($this->collections as $collectionName => $eventClasses) {
        // Fork process for each collection
        $this->forkProcess($client, $databaseName, $collectionName, $eventClasses);
    }

    // Monitor and restart processes
    $this->monitorProcesses();
}

/**
 * Create and configure a MongoDB client instance.
 */
private function createMongoClient(): Client
{
    return new Client(config('database.connections.mongodb.dsn'));
}

/**
 * Fork a process to handle change streams for a specific collection.
 */
private function forkProcess(Client $client, string $databaseName, string $collectionName, array $eventClasses)
{
    $pid = pcntl_fork();

    if ($pid == -1) {
        $this->error('Could not fork process.');

        return;
    } elseif ($pid == 0) {
        // Child process
        try {
            $this->processChangeStream($client, $databaseName, $collectionName, $eventClasses);
        } catch (\Exception $e) {
            $this->error('Error in process: '.$e->getMessage());
        }

        exit;
    } else {
        // Parent process
        $this->processes[$collectionName] = $pid;
    }
}
```
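Since the plan mentions resume tokens, here is a minimal, untested sketch of what the child's `processChangeStream()` (referenced above but not shown) could look like: it persists the token after each event so the stream can resume after downtime. The use of Laravel's cache and the key name are assumptions for illustration; any durable store works.

```php
/**
 * Sketch only: dispatch mapped Laravel events and persist the resume token
 * after each event so the stream can be resumed after a restart.
 */
private function processChangeStream(Client $client, string $databaseName, string $collectionName, array $eventClasses)
{
    $collection = $client->selectDatabase($databaseName)->selectCollection($collectionName);

    $options = [];
    $storedToken = \Illuminate\Support\Facades\Cache::get("change-stream-token:{$collectionName}");

    if ($storedToken !== null) {
        // Resume where the previous run left off.
        $options['resumeAfter'] = $storedToken;
    }

    $changeStream = $collection->watch([], $options);
    $changeStream->rewind();

    while (true) {
        if ($changeStream->valid()) {
            $event = $changeStream->current();
            $operationType = $event->operationType;

            if (isset($eventClasses[$operationType])) {
                // Dispatch the Laravel event mapped to this operation type.
                event(new $eventClasses[$operationType]($event));
            }

            // getResumeToken() returns the token of the most recently iterated event.
            \Illuminate\Support\Facades\Cache::forever(
                "change-stream-token:{$collectionName}",
                $changeStream->getResumeToken()
            );
        }

        $changeStream->next();
    }
}
```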
There isn't any clear guidance here, except for "it depends". If you find yourself constrained by the number of connections, it might be more beneficial to open a single change stream and filter for the events and collections you're looking for. On the other hand, if you find you're getting so many events across all collections combined that a single stream starts to bottleneck, you'll want to create multiple change streams to handle them in parallel, using multiple connections to the server.
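To make the single-stream option concrete, here is a minimal, untested sketch that watches the whole database and filters server-side with a `$match` stage, then dispatches to the event classes from the map in the question. It assumes it runs inside the same command class (so `$this->collections` and `$this->createMongoClient()` are available); everything else is illustrative.

```php
// Sketch only: one database-level change stream, filtered server-side so that
// only the watched collections and operation types are returned.
$client = $this->createMongoClient();
$database = $client->selectDatabase(config('database.connections.mongodb.database'));

$pipeline = [
    [
        '$match' => [
            'ns.coll' => ['$in' => array_keys($this->collections)],
            'operationType' => ['$in' => ['insert', 'update', 'delete', 'replace']],
        ],
    ],
];

$changeStream = $database->watch($pipeline);
$changeStream->rewind();

while (true) {
    if ($changeStream->valid()) {
        $event = $changeStream->current();
        $eventClass = $this->collections[$event->ns->coll][$event->operationType] ?? null;

        if ($eventClass !== null) {
            // Dispatch the corresponding Laravel event.
            event(new $eventClass($event));
        }
    }

    $changeStream->next();
}
```

This keeps a single connection (per replica set member) for any number of collections, at the cost of all events funneling through one sequential loop.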