diff --git a/server/src/interfaces/event.interface.ts b/server/src/interfaces/event.interface.ts index 7ea48faf5380f5..40efaf150c4df4 100644 --- a/server/src/interfaces/event.interface.ts +++ b/server/src/interfaces/event.interface.ts @@ -22,7 +22,7 @@ type EventMap = { 'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }]; // album events - 'album.update': [{ id: string; updatedBy: string }]; + 'album.update': [{ id: string; recipientIds: string[] }]; 'album.invite': [{ id: string; userId: string }]; // asset events diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index aa3090675e3fe0..9d9ddb1fe3c2fe 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -120,6 +120,11 @@ export interface IBaseJob { force?: boolean; } +export interface IDelayedJob extends IBaseJob { + /** The minimum time to wait to execute this job, in milliseconds. */ + delay?: number; +} + export interface IEntityJob extends IBaseJob { id: string; source?: 'upload' | 'sidecar-write' | 'copy'; @@ -181,8 +186,8 @@ export interface INotifyAlbumInviteJob extends IEntityJob { recipientId: string; } -export interface INotifyAlbumUpdateJob extends IEntityJob { - senderId: string; +export interface INotifyAlbumUpdateJob extends IEntityJob, IDelayedJob { + recipientIds: string[]; } export interface JobCounts { @@ -293,6 +298,7 @@ export enum JobStatus { export type JobHandler = (data: T) => Promise; export type JobItemHandler = (item: JobItem) => Promise; +export type DataTransformer = (fromExistingJob: any, fromEnqueueingJob: any) => any; export const IJobRepository = 'IJobRepository'; @@ -310,4 +316,5 @@ export interface IJobRepository { getQueueStatus(name: QueueName): Promise; getJobCounts(name: QueueName): Promise; waitForQueueCompletion(...queues: QueueName[]): Promise; + removeJob(jobId: string, name: JobName): Promise; } diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 3f154ee01615a1..eb68f82668baf3 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -7,6 +7,7 @@ import { CronJob, CronTime } from 'cron'; import { setTimeout } from 'node:timers/promises'; import { bullConfig } from 'src/config'; import { + IEntityJob, IJobRepository, JobCounts, JobItem, @@ -249,24 +250,45 @@ export class JobRepository implements IJobRepository { } private getJobOptions(item: JobItem): JobsOptions | null { + let opts = {}; switch (item.name) { + case JobName.NOTIFY_ALBUM_UPDATE: { + opts = { jobId: item.data.id, delay: item.data?.delay }; + break; + } case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { - return { jobId: item.data.id }; + opts = { jobId: item.data.id }; + break; } case JobName.GENERATE_PERSON_THUMBNAIL: { - return { priority: 1 }; + opts = { priority: 1 }; + break; } case JobName.QUEUE_FACIAL_RECOGNITION: { - return { jobId: JobName.QUEUE_FACIAL_RECOGNITION }; - } - - default: { - return null; + opts = { jobId: JobName.QUEUE_FACIAL_RECOGNITION }; + break; } } + return opts; } private getQueue(queue: QueueName): Queue { return this.moduleReference.get(getQueueToken(queue), { strict: false }); } + + public async removeJob(jobId: string, name: JobName): Promise { + const existingJob = await this.getQueue(JOBS_TO_QUEUE[name]).getJob(jobId); + if (!existingJob) { + return undefined; + } + try { + await existingJob.remove(); + } catch (error: any) { + if (error.message?.includes('Missing key for job')) { + return undefined; + } + throw error; + } + return existingJob.data; + } } diff --git a/server/src/services/album.service.spec.ts b/server/src/services/album.service.spec.ts index 33c8f5dd7f624f..39d2271fe135ae 100644 --- a/server/src/services/album.service.spec.ts +++ b/server/src/services/album.service.spec.ts @@ -539,7 +539,7 @@ describe(AlbumService.name, () => { expect(albumMock.addAssetIds).toHaveBeenCalledWith('album-123', ['asset-1', 'asset-2', 'asset-3']); expect(eventMock.emit).toHaveBeenCalledWith('album.update', { id: 'album-123', - updatedBy: authStub.admin.user.id, + recipientIds: [], }); }); @@ -583,7 +583,7 @@ describe(AlbumService.name, () => { expect(albumMock.addAssetIds).toHaveBeenCalledWith('album-123', ['asset-1', 'asset-2', 'asset-3']); expect(eventMock.emit).toHaveBeenCalledWith('album.update', { id: 'album-123', - updatedBy: authStub.user1.user.id, + recipientIds: ['admin_id'], }); }); diff --git a/server/src/services/album.service.ts b/server/src/services/album.service.ts index e8acce9b6c878b..bbeefc65bc01eb 100644 --- a/server/src/services/album.service.ts +++ b/server/src/services/album.service.ts @@ -174,7 +174,13 @@ export class AlbumService extends BaseService { albumThumbnailAssetId: album.albumThumbnailAssetId ?? firstNewAssetId, }); - await this.eventRepository.emit('album.update', { id, updatedBy: auth.user.id }); + const allUsersExceptUs = [...album.albumUsers.map((au) => au.user.id), album.owner.id].filter( + (userId) => userId !== auth.user.id, + ); + + if (allUsersExceptUs.length > 0) { + await this.eventRepository.emit('album.update', { id, recipientIds: allUsersExceptUs }); + } } return results; diff --git a/server/src/services/notification.service.spec.ts b/server/src/services/notification.service.spec.ts index 028e512b3968dd..c5faec1cb87b6a 100644 --- a/server/src/services/notification.service.spec.ts +++ b/server/src/services/notification.service.spec.ts @@ -170,10 +170,10 @@ describe(NotificationService.name, () => { describe('onAlbumUpdateEvent', () => { it('should queue notify album update event', async () => { - await sut.onAlbumUpdate({ id: '', updatedBy: '42' }); + await sut.onAlbumUpdate({ id: 'album', recipientIds: ['42'] }); expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.NOTIFY_ALBUM_UPDATE, - data: { id: '', senderId: '42' }, + data: { id: 'album', recipientIds: ['42'], delay: 300_000 }, }); }); }); @@ -512,34 +512,17 @@ describe(NotificationService.name, () => { describe('handleAlbumUpdate', () => { it('should skip if album could not be found', async () => { - await expect(sut.handleAlbumUpdate({ id: '', senderId: '' })).resolves.toBe(JobStatus.SKIPPED); + await expect(sut.handleAlbumUpdate({ id: '', recipientIds: ['1'] })).resolves.toBe(JobStatus.SKIPPED); expect(userMock.get).not.toHaveBeenCalled(); }); it('should skip if owner could not be found', async () => { albumMock.getById.mockResolvedValue(albumStub.emptyWithValidThumbnail); - await expect(sut.handleAlbumUpdate({ id: '', senderId: '' })).resolves.toBe(JobStatus.SKIPPED); + await expect(sut.handleAlbumUpdate({ id: '', recipientIds: ['1'] })).resolves.toBe(JobStatus.SKIPPED); expect(systemMock.get).not.toHaveBeenCalled(); }); - it('should filter out the sender', async () => { - albumMock.getById.mockResolvedValue({ - ...albumStub.emptyWithValidThumbnail, - albumUsers: [ - { user: { id: userStub.user1.id } } as AlbumUserEntity, - { user: { id: userStub.user2.id } } as AlbumUserEntity, - ], - }); - userMock.get.mockResolvedValue(userStub.user1); - notificationMock.renderEmail.mockResolvedValue({ html: '', text: '' }); - - await sut.handleAlbumUpdate({ id: '', senderId: userStub.user1.id }); - expect(userMock.get).not.toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); - expect(userMock.get).toHaveBeenCalledWith(userStub.user2.id, { withDeleted: false }); - expect(notificationMock.renderEmail).toHaveBeenCalledOnce(); - }); - it('should skip recipient that could not be looked up', async () => { albumMock.getById.mockResolvedValue({ ...albumStub.emptyWithValidThumbnail, @@ -548,7 +531,7 @@ describe(NotificationService.name, () => { userMock.get.mockResolvedValueOnce(userStub.user1); notificationMock.renderEmail.mockResolvedValue({ html: '', text: '' }); - await sut.handleAlbumUpdate({ id: '', senderId: '' }); + await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); expect(userMock.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(notificationMock.renderEmail).not.toHaveBeenCalled(); }); @@ -571,7 +554,7 @@ describe(NotificationService.name, () => { }); notificationMock.renderEmail.mockResolvedValue({ html: '', text: '' }); - await sut.handleAlbumUpdate({ id: '', senderId: '' }); + await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); expect(userMock.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(notificationMock.renderEmail).not.toHaveBeenCalled(); }); @@ -594,7 +577,7 @@ describe(NotificationService.name, () => { }); notificationMock.renderEmail.mockResolvedValue({ html: '', text: '' }); - await sut.handleAlbumUpdate({ id: '', senderId: '' }); + await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); expect(userMock.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(notificationMock.renderEmail).not.toHaveBeenCalled(); }); @@ -607,11 +590,25 @@ describe(NotificationService.name, () => { userMock.get.mockResolvedValue(userStub.user1); notificationMock.renderEmail.mockResolvedValue({ html: '', text: '' }); - await sut.handleAlbumUpdate({ id: '', senderId: '' }); + await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); expect(userMock.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(notificationMock.renderEmail).toHaveBeenCalled(); expect(jobMock.queue).toHaveBeenCalled(); }); + + it('should add new recipients for new images if job is already queued', async () => { + // @ts-expect-error needed to force a INotifyAlbumUpdateJob + jobMock.removeJob.mockResolvedValue({ id: '1', recipientIds: ['2', '3', '4'] }); + await sut.onAlbumUpdate({ id: '1', recipientIds: ['1', '2', '3'] }); + expect(jobMock.queue).toHaveBeenCalledWith({ + name: JobName.NOTIFY_ALBUM_UPDATE, + data: { + id: '1', + delay: 300_000, + recipientIds: ['1', '2', '3', '4'], + }, + }); + }); }); describe('handleSendEmail', () => { diff --git a/server/src/services/notification.service.ts b/server/src/services/notification.service.ts index f6b338d79e716a..29291079390bb3 100644 --- a/server/src/services/notification.service.ts +++ b/server/src/services/notification.service.ts @@ -6,9 +6,11 @@ import { AlbumEntity } from 'src/entities/album.entity'; import { ArgOf } from 'src/interfaces/event.interface'; import { IEmailJob, + IEntityJob, INotifyAlbumInviteJob, INotifyAlbumUpdateJob, INotifySignupJob, + JobItem, JobName, JobStatus, } from 'src/interfaces/job.interface'; @@ -21,6 +23,8 @@ import { getPreferences } from 'src/utils/preferences'; @Injectable() export class NotificationService extends BaseService { + private static albumUpdateEmailDelayMs = 300_000; + @OnEvent({ name: 'config.update' }) onConfigUpdate({ oldConfig, newConfig }: ArgOf<'config.update'>) { this.eventRepository.clientBroadcast('on_config_update'); @@ -100,8 +104,30 @@ export class NotificationService extends BaseService { } @OnEvent({ name: 'album.update' }) - async onAlbumUpdate({ id, updatedBy }: ArgOf<'album.update'>) { - await this.jobRepository.queue({ name: JobName.NOTIFY_ALBUM_UPDATE, data: { id, senderId: updatedBy } }); + async onAlbumUpdate({ id, recipientIds }: ArgOf<'album.update'>) { + // if recipientIds is empty, album likely only has one user part of it, don't queue notification if so + if (recipientIds.length == 0) { + return; + } + + const job: JobItem = { + name: JobName.NOTIFY_ALBUM_UPDATE, + data: { id, recipientIds, delay: NotificationService.albumUpdateEmailDelayMs }, + }; + + const previousJobData = await this.jobRepository.removeJob(id, JobName.NOTIFY_ALBUM_UPDATE); + if (previousJobData && this.isAlbumUpdateJob(previousJobData)) { + for (const id of previousJobData.recipientIds) { + if (!recipientIds.includes(id)) { + recipientIds.push(id); + } + } + } + await this.jobRepository.queue(job); + } + + private isAlbumUpdateJob(job: IEntityJob): job is INotifyAlbumUpdateJob { + return 'recipientIds' in job; } @OnEvent({ name: 'album.invite' }) @@ -225,7 +251,7 @@ export class NotificationService extends BaseService { return JobStatus.SUCCESS; } - async handleAlbumUpdate({ id, senderId }: INotifyAlbumUpdateJob) { + async handleAlbumUpdate({ id, recipientIds }: INotifyAlbumUpdateJob) { const album = await this.albumRepository.getById(id, { withAssets: false }); if (!album) { @@ -237,7 +263,9 @@ export class NotificationService extends BaseService { return JobStatus.SKIPPED; } - const recipients = [...album.albumUsers.map((user) => user.user), owner].filter((user) => user.id !== senderId); + const recipients = [...album.albumUsers.map((user) => user.user), owner].filter((user) => + recipientIds.includes(user.id), + ); const attachment = await this.getAlbumThumbnailAttachment(album); const { server } = await this.getConfig({ withCache: false }); diff --git a/server/test/fixtures/user.stub.ts b/server/test/fixtures/user.stub.ts index b65cd6b3958223..9553b5344a492f 100644 --- a/server/test/fixtures/user.stub.ts +++ b/server/test/fixtures/user.stub.ts @@ -7,6 +7,7 @@ export const userStub = { ...authStub.admin.user, password: 'admin_password', name: 'admin_name', + id: 'admin_id', storageLabel: 'admin', oauthId: '', shouldChangePassword: false, diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index 871801830a81dd..cfa1826dd88096 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -16,5 +16,6 @@ export const newJobRepositoryMock = (): Mocked => { getJobCounts: vitest.fn(), clear: vitest.fn(), waitForQueueCompletion: vitest.fn(), + removeJob: vitest.fn(), }; };