Skip to content

Commit

Permalink
fix(delayed): trim events when moving jobs to delayed (python) (#2211)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 4, 2023
1 parent 158b850 commit eca8c2d
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 43 deletions.
6 changes: 5 additions & 1 deletion src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ if rcall("EXISTS", jobKey) == 1 then
return -3
end

local maxEvents = rcall("HGET", KEYS[8], "opts.maxLenEvents") or 10000

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)
rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed",
--rcall("XADD", KEYS[6], "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- Check if we need to push a marker job to wake up sleeping workers.
local target = getTargetQueueList(KEYS[8], KEYS[1], KEYS[7])
Expand Down
199 changes: 157 additions & 42 deletions tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,62 +486,177 @@ describe('events', function () {
await worker.close();
});

it('should trim events automatically', async () => {
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 0,
describe('when maxLen is 0', function () {
it('should trim events automatically', async () => {
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 0,
},
},
},
});
});

const worker = new Worker(
queueName,
async () => {
await delay(100);
},
{ connection },
);
const worker = new Worker(
queueName,
async () => {
await delay(100);
},
{ connection },
);

await trimmedQueue.waitUntilReady();
await worker.waitUntilReady();
await trimmedQueue.waitUntilReady();
await worker.waitUntilReady();

const client = await trimmedQueue.client;
const client = await trimmedQueue.client;

const waitCompletedEvent = new Promise<void>(resolve => {
queueEvents.on(
'completed',
after(3, async () => {
resolve();
}),
const waitCompletedEvent = new Promise<void>(resolve => {
queueEvents.on(
'completed',
after(3, async () => {
resolve();
}),
);
});

await trimmedQueue.addBulk([
{ name: 'test', data: { foo: 'bar' } },
{ name: 'test', data: { foo: 'baz' } },
{ name: 'test', data: { foo: 'bar' } },
]);

await waitCompletedEvent;

const [[id, [_, drained]], [, [, completed]]] = await client.xrevrange(
trimmedQueue.keys.events,
'+',
'-',
);

expect(drained).to.be.equal('drained');
expect(completed).to.be.equal('completed');

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(2);

await worker.close();
await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});

await trimmedQueue.addBulk([
{ name: 'test', data: { foo: 'bar' } },
{ name: 'test', data: { foo: 'baz' } },
{ name: 'test', data: { foo: 'bar' } },
]);
describe('when maxLen is greater than 0', function () {
it('should trim events so its length is at least the threshold', async () => {
const numJobs = 80;
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 20,
},
},
});

await waitCompletedEvent;
const worker = new Worker(
queueName,
async () => {
await delay(50);
},
{ connection },
);

const [[id, [_, drained]], [, [, completed]]] = await client.xrevrange(
trimmedQueue.keys.events,
'+',
'-',
);
await trimmedQueue.waitUntilReady();
await worker.waitUntilReady();

expect(drained).to.be.equal('drained');
expect(completed).to.be.equal('completed');
const client = await trimmedQueue.client;

const eventsLength = await client.xlen(trimmedQueue.keys.events);
const waitCompletedEvent = new Promise<void>(resolve => {
queueEvents.on(
'completed',
after(numJobs, async () => {
resolve();
}),
);
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'test',
data: { foo: 'bar' },
}));

expect(eventsLength).to.be.lte(2);
await trimmedQueue.addBulk(jobs);

await worker.close();
await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
await waitCompletedEvent;

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(35);
expect(eventsLength).to.be.gte(20);

await worker.close();
await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});

describe('when jobs are moved to delayed', function () {
it('should trim events so its length is at least the threshold', async () => {
const numJobs = 80;
const trimmedQueue = new Queue(queueName, {
connection,
streams: {
events: {
maxLen: 20,
},
},
});

const i = 0;
const worker = new Worker(
queueName,
async () => {
await delay(50);
throw new Error('error');
},
{ connection },
);

await trimmedQueue.waitUntilReady();
await worker.waitUntilReady();

const client = await trimmedQueue.client;

const waitDelayedEvent = new Promise<void>(resolve => {
queueEvents.on(
'delayed',
after(numJobs, async () => {
resolve();
}),
);
});

const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'test',
data: { foo: 'bar' },
opts: {
attempts: 2,
backoff: 5000,
},
}));
await trimmedQueue.addBulk(jobs);

await waitDelayedEvent;

const eventsLength = await client.xlen(trimmedQueue.keys.events);

expect(eventsLength).to.be.lte(35);
expect(eventsLength).to.be.gte(20);

await worker.close();
await trimmedQueue.close();
await removeAllQueueData(new IORedis(), queueName);
});
});
});

it('should trim events manually', async () => {
Expand Down

0 comments on commit eca8c2d

Please sign in to comment.