From 40fa47ce987240fbefb3022e0e447e9df0a989fd Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Tue, 23 Jul 2024 09:21:43 +0900 Subject: [PATCH] Fix promise leak when using streaming with long-running actors --- .../main/scala/com/devsisters/shardcake/Sharding.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala index 4467a0c..f08ea37 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala @@ -214,8 +214,12 @@ class Sharding private ( }) private[shardcake] def initReply(id: String, replyChannel: ReplyChannel[Nothing]): UIO[Unit] = - replyChannels.update(_.updated(id, replyChannel)) <* - replyChannel.await.ensuring(replyChannels.update(_ - id)).forkDaemon + replyChannels + .getAndUpdate(_.updated(id, replyChannel)) + .flatMap(beforeReplyChannels => + replyChannel.await.ensuring(replyChannels.update(_ - id)).forkDaemon.unless(beforeReplyChannels.contains(id)) + ) + .unit def reply[Reply](reply: Reply, replier: Replier[Reply]): UIO[Unit] = replyChannels