diff --git a/src/main/java/org/jitsi/jigasi/transcription/AbstractTranscriptPublisher.java b/src/main/java/org/jitsi/jigasi/transcription/AbstractTranscriptPublisher.java index b3bcfad8..34efd36a 100644 --- a/src/main/java/org/jitsi/jigasi/transcription/AbstractTranscriptPublisher.java +++ b/src/main/java/org/jitsi/jigasi/transcription/AbstractTranscriptPublisher.java @@ -21,11 +21,13 @@ import net.java.sip.communicator.service.protocol.*; import net.java.sip.communicator.service.protocol.Message; import org.jitsi.jigasi.*; +import org.jitsi.jigasi.util.Util; import org.jitsi.service.libjitsi.*; import org.jitsi.service.neomedia.*; import org.jitsi.service.neomedia.device.*; import org.jitsi.service.neomedia.recording.*; import org.jitsi.utils.logging.*; +import org.jitsi.utils.queue.*; import java.io.*; import java.nio.charset.*; @@ -157,14 +159,29 @@ public abstract class AbstractTranscriptPublisher = Logger.getLogger(AbstractTranscriptPublisher.class); /** - * Aspect for successful upload of transcript + * A queue used to offload xmpp message sending in a new thread to avoid blocking. */ - private static final String DD_ASPECT_SUCCESS = "upload_success"; + private static final PacketQueue xmppInvokeQueue = new PacketQueue<>( + Integer.MAX_VALUE, + false, + "xmpp-transcript-publisher", + r -> { + // do process and try + try + { + r.run(); - /** - * Aspect for failed upload of transcript - */ - private static final String DD_ASPECT_FAIL = "upload_fail"; + return true; + } + catch (Throwable e) + { + logger.error("Error processing xmpp queue item", e); + + return false; + } + }, + Util.createNewThreadPool("transcript-publisher-executor-pool") + ); /** * Get a string which contains a time stamp and a random UUID, with an @@ -194,6 +211,11 @@ protected static String generateHardToGuessTimeString(String prefix, * @param message the message to send */ protected void sendMessage(ChatRoom chatRoom, T message) + { + xmppInvokeQueue.add(() -> sendMessageInternal(chatRoom, message)); + } + + private void sendMessageInternal(ChatRoom chatRoom, T message) { if (chatRoom == null) { @@ -201,6 +223,15 @@ protected void sendMessage(ChatRoom chatRoom, T message) return; } + if (!chatRoom.isJoined()) + { + if (logger.isDebugEnabled()) + { + logger.debug("Skip sending message to room which we left!"); + } + return; + } + String messageString = message.toString(); Message chatRoomMessage = chatRoom.createMessage(messageString); try @@ -222,6 +253,11 @@ protected void sendMessage(ChatRoom chatRoom, T message) * @param jsonMessage the json message to send */ protected void sendJsonMessage(ChatRoom chatRoom, T jsonMessage) + { + xmppInvokeQueue.add(() -> sendJsonMessageInternal(chatRoom, jsonMessage)); + } + + private void sendJsonMessageInternal(ChatRoom chatRoom, T jsonMessage) { if (chatRoom == null) {