Flows: add parent to queue before children start? #1986
Replies: 3 comments
-
A great application for this could be monitoring the parent job's progress by the completion of its children. For example, if each child job takes 2 seconds and there are 100 of them, then each completed child would advance the parent's progress by 1%, and when completed, the parent would show that it took 3 minutes and 20 seconds for the parent job. |
Beta Was this translation helpful? Give feedback.
-
I'm starting to think I should layer some kind of enclosing queue over this to accomplish what I want. It might look like this:
This approach would seem to have some redundancy (because every item in Grandparent Queue would also be an item in Parent Queue), but wrapping the Parent➜Child flow with another job would enclose the whole thing. That could even give a simple start/stop timer for each item, which seems impossible to do with flows. |
Beta Was this translation helpful? Give feedback.
-
I implemented this approach and it seems to work well. My "grandparent" worker handles each item in the "Items" queue: const flowProducer = new FlowProducer({ connection });
const itemWorker = new Worker(
'Items',
async (job: Job) => {
const start = Date.now();
const queueData: FlowJob = prepareChildrenForQueue({
item,
priority,
});
const itemFlow = await flowProducer.add(queueData);
const parentId = itemFlow.job.id;
await itemFlow.job.waitUntilFinished(saveDataQueueEvents);
const end = Date.now();
const runTime = new Date(end - start).toISOString().slice(14, -1);
return { runTime };
},
{
connection,
concurrency: 3,
}
); And then the flow's "parent" work focuses only on handling the DB saving (for now), while the "child" worker handles all the ~200 tasks that pass their data to the parent worker for saving to the DB. saveDataWorker: const saveDataWorker = new Worker(
'Save Data',
async (job: Job) => {
const childrenValues = await job.getChildrenValues();
let results = {
foods: [],
};
_.forEach(childrenValues, ({ foods }) => {
results.foods.push(foods);
});
await Promise.all([
saveFoods(results.foods),
]);
return;
},
{
connection,
concurrency: 3,
}
); childWorker: const childWorker = new Worker(
'Foods',
async (job: Job) => {
const results = {
foods: {},
};
// Do the work
return results;
},
{
connection,
concurrency: 200,
}
); |
Beta Was this translation helpful? Give feedback.
-
As I understand it, Flows process all the children before even adding the parent job to the queue.
But is there a way to add the parent to the parent queue first, then the children process, and then the parent finishes?
I'd love for the parent queue logs to reflect how long each parent took including all the children, not only the parent itself.
Beta Was this translation helpful? Give feedback.
All reactions