Skip to content

Commit

Permalink
chore: writeQueue more resilient
Browse files Browse the repository at this point in the history
  • Loading branch information
mabels committed Dec 23, 2024
1 parent a65855b commit cbf933f
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions src/write-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,26 @@ class WriteQueueImpl<T extends DocTypes> implements WriteQueue<T> {
return;
}
this.isProcessing = true;
this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
const promises = updates.map(async (update, index) => {
try {
const result = await this.worker(update);
tasksToProcess[index].resolve(result);
} catch (error) {
tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
}
});
await Promise.allSettled(promises);
this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
this.isProcessing = false;
setTimeout(() => this.process(), 0);
try {
this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
const promises = updates.map(async (update, index) => {
try {
const result = await this.worker(update);
tasksToProcess[index].resolve(result);
} catch (error) {
tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
}
});
await Promise.allSettled(promises);
this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
} catch (error) {
this.logger.Error().Err(error).Msg("Error processing tasks");
} finally {
this.isProcessing = false;
setTimeout(() => this.process(), 0);
}
}

bulk(tasks: DocUpdate<T>[]): Promise<MetaType> {
Expand Down

0 comments on commit cbf933f

Please sign in to comment.