diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e54b76514..ee5c20b0a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -73,7 +73,7 @@ jobs: run: | export RELEASE_VERSION="$(mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec)" echo $RELEASE_VERSION - mvn clean org.jacoco:jacoco-maven-plugin:prepare-agent package -Dtest=!*/integration/* -Dorg.bytedeco.javacpp.logger.debug=false org.jacoco:jacoco-maven-plugin:report sonar:sonar -Dmaven.javadoc.skip=true --quiet + mvn clean org.jacoco:jacoco-maven-plugin:prepare-agent package -Dtest=!*/integration/* -Dorg.bytedeco.javacpp.logger.debug=false org.jacoco:jacoco-maven-plugin:report sonar:sonar -Dmaven.javadoc.skip=true - name: Show MongoDB Log, Crash Log and Servis Status on failure if: failure() diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index c1d357432..fd3d203dc 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -29,6 +29,8 @@ import java.util.Set; import java.util.concurrent.*; +import javax.annotation.Nonnull; + import io.antmedia.filter.JWTFilter; import io.antmedia.filter.TokenFilterManager; import io.antmedia.statistic.*; @@ -106,10 +108,10 @@ import io.antmedia.webrtc.api.IWebRTCClient; import io.antmedia.websocket.WebSocketConstants; import io.vertx.core.Vertx; +import io.vertx.core.impl.ConcurrentHashSet; import io.vertx.core.json.JsonObject; import io.vertx.ext.dropwizard.MetricsService; import jakarta.validation.constraints.NotNull; -import org.springframework.web.context.WebApplicationContext; public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter implements IAntMediaStreamHandler, IShutdownListener { @@ -227,6 +229,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter private Random random = new Random(); private IStatsCollector statsCollector; + + private Set settingsUpdateListenerSet = new ConcurrentHashSet(); @Override public boolean appStart(IScope app) { @@ -698,7 +702,9 @@ public void closeBroadcast(String streamId) { } for (IStreamListener listener : streamListeners) { + //keep backward compatibility listener.streamFinished(broadcast.getStreamId()); + listener.streamFinished(broadcast); } logger.info("Leaving closeBroadcast for streamId:{}", streamId); } @@ -874,6 +880,7 @@ public void startPublish(String streamId, long absoluteStartTimeMs, String publi for (IStreamListener listener : streamListeners) { listener.streamStarted(broadcast.getStreamId()); + listener.streamStarted(broadcast); } long videoHeight = 0; @@ -908,7 +915,7 @@ public void startPublish(String streamId, long absoluteStartTimeMs, String publi } return null; - }); + }, false); if (absoluteStartTimeMs == 0) @@ -1014,47 +1021,65 @@ public static Broadcast saveUndefinedBroadcast(String streamId, String streamNam } @Override - public void muxingFinished(final String streamId, File file, long startTime, long duration, int resolution, String previewFilePath, String vodId) { - String vodName = file.getName(); - String filePath = file.getPath(); - long fileSize = file.length(); - long systemTime = System.currentTimeMillis(); + @Deprecated + public void muxingFinished(String streamId, File File, long startTime, long duration, int resolution, + String previewFilePath, String vodId) + { + muxingFinished(getDataStore().get(streamId), streamId, File, startTime, duration, resolution, previewFilePath, vodId); + } - String relativePath = getRelativePath(filePath); + @Override + public void muxingFinished(Broadcast broadcast, String streamId, File file, long startTime, long duration, int resolution, String previewFilePath, String vodId) { + String listenerHookURL = null; String streamName = file.getName(); + String description = null; + String metadata = null; + String longitude = null; + String latitude = null; + String altitude = null; - Broadcast broadcast = getDataStore().get(streamId); - - if(broadcast != null){ + if (broadcast != null) { listenerHookURL = broadcast.getListenerHookURL(); - if(broadcast.getName() != null){ + if(StringUtils.isNotBlank(broadcast.getName())){ streamName = resolution != 0 ? broadcast.getName() + " (" + resolution + "p)" : broadcast.getName(); } + description = broadcast.getDescription(); + metadata = broadcast.getMetaData(); + longitude = broadcast.getLongitude(); + latitude = broadcast.getLatitude(); + altitude = broadcast.getAltitude(); + } + else { + logger.error("Broadcast is null for muxingFinished for stream: {} it's not supposed to happen", streamId); } + + String vodName = file.getName(); + String filePath = file.getPath(); + long fileSize = file.length(); + long systemTime = System.currentTimeMillis(); + + String relativePath = getRelativePath(filePath); + + logger.info("muxing finished for stream: {} with file: {} and duration:{}", streamId, file, duration); //We need to get the webhook url explicitly because broadcast may be deleted here - if (listenerHookURL == null || listenerHookURL.isEmpty()) { + if (StringUtils.isBlank(listenerHookURL)) { // if hook URL is not defined for stream specific, then try to get common one from app listenerHookURL = appSettings.getListenerHookURL(); } - String vodIdFinal; - if (vodId != null) { - vodIdFinal = vodId; - } - else { - vodIdFinal = RandomStringUtils.randomAlphanumeric(24); + if (StringUtils.isBlank(vodId)) { + vodId = RandomStringUtils.randomAlphanumeric(24); } - VoD newVod = new VoD(streamName, streamId, relativePath, vodName, systemTime, startTime, duration, fileSize, VoD.STREAM_VOD, vodIdFinal, previewFilePath); - if(broadcast != null){ - newVod.setDescription(broadcast.getDescription()); - newVod.setMetadata(broadcast.getMetaData()); - newVod.setLongitude(broadcast.getLongitude()); - newVod.setLatitude(broadcast.getLatitude()); - newVod.setAltitude(broadcast.getAltitude()); - } + VoD newVod = new VoD(streamName, streamId, relativePath, vodName, systemTime, startTime, duration, fileSize, VoD.STREAM_VOD, vodId, previewFilePath); + newVod.setDescription(description); + newVod.setMetadata(metadata); + newVod.setLongitude(longitude); + newVod.setLatitude(latitude); + newVod.setAltitude(altitude); + if (getDataStore().addVod(newVod) == null) { @@ -1069,9 +1094,8 @@ public void muxingFinished(final String streamId, File file, long startTime, lon || ((index = vodName.lastIndexOf(".webm")) != -1) ) { final String baseName = vodName.substring(0, index); - final String metaData = (broadcast != null) ? broadcast.getMetaData() : null; logger.info("Setting timer for calling vod ready hook for stream:{}", streamId); - notifyHook(listenerHookURL, streamId, null, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, metaData, null); + notifyHook(listenerHookURL, streamId, null, HOOK_ACTION_VOD_READY, null, null, baseName, vodId, metadata, null); } String muxerFinishScript = appSettings.getMuxerFinishScript(); @@ -1307,7 +1331,7 @@ public void trySendClusterPostWithDelay(String url, String clusterCommunicationT } return null; - }); + }, false); }); @@ -2109,9 +2133,22 @@ public synchronized boolean updateSettings(AppSettings newSettings, boolean noti else { logger.warn("Settings cannot be saved for {}", getScope().getName()); } + + notifySettingsUpdateListeners(appSettings); return result; } + + public void notifySettingsUpdateListeners(AppSettings appSettings) { + for (IAppSettingsUpdateListener listener : settingsUpdateListenerSet) { + listener.settingsUpdated(appSettings); + } + } + + @Override + public void addSettingsUpdateListener(IAppSettingsUpdateListener listener) { + settingsUpdateListenerSet.add(listener); + } private boolean isEncoderSettingsValid(List encoderSettingsList) { if (encoderSettingsList != null) { @@ -2251,6 +2288,7 @@ public void setStorageclientSettings(AppSettings settings) { storageClient.setPermission(settings.getS3Permission()); storageClient.setStorageClass(settings.getS3StorageClass()); storageClient.setCacheControl(settings.getS3CacheControl()); + storageClient.setTransferBufferSize(settings.getS3TransferBufferSizeInBytes()); storageClient.reset(); } @@ -2447,7 +2485,7 @@ public void stopPublish(String streamId) { vertx.executeBlocking(() -> { closeBroadcast(streamId); return null; - }); + }, false); } public boolean isClusterMode() { diff --git a/src/main/java/io/antmedia/AppSettings.java b/src/main/java/io/antmedia/AppSettings.java index f419d2d46..26403eed8 100644 --- a/src/main/java/io/antmedia/AppSettings.java +++ b/src/main/java/io/antmedia/AppSettings.java @@ -1806,6 +1806,7 @@ public class AppSettings implements Serializable{ public static final String APPLICATION_STATUS_INSTALLATION_FAILED = "installationFailed"; + /** * Describes the application installation status. Possible values: * @@ -2013,6 +2014,17 @@ public class AppSettings implements Serializable{ */ @Value("${s3Permission:${"+SETTINGS_S3_PERMISSION+":public-read}}") private String s3Permission = "public-read"; + + + /** + * S3 Transfer Buffer Size + * This describes to buffer size to keep transferring data. It should be + * bigger than ts segment file size for HLS continuous upload. + * Otherwise chunk update may cannot be retried in case of any network break. + */ + @Value("${s3TransferBufferSizeInBytes:10000000}") + private int s3TransferBufferSizeInBytes = 10000000; + /** * HLS Encryption key info file full path. @@ -4159,4 +4171,12 @@ public long getAppInstallationTime() { public void setAppInstallationTime(long appInstallationTime) { this.appInstallationTime = appInstallationTime; } + + public int getS3TransferBufferSizeInBytes() { + return s3TransferBufferSizeInBytes; + } + + public void setS3TransferBufferSizeInBytes(int s3TransferBufferSizeInBytes) { + this.s3TransferBufferSizeInBytes = s3TransferBufferSizeInBytes; + } } diff --git a/src/main/java/io/antmedia/console/AdminApplication.java b/src/main/java/io/antmedia/console/AdminApplication.java index 41cf4fa61..f4a46f4a5 100644 --- a/src/main/java/io/antmedia/console/AdminApplication.java +++ b/src/main/java/io/antmedia/console/AdminApplication.java @@ -372,7 +372,7 @@ public boolean createApplication(String appName, String warFileFullPath) { currentApplicationCreationProcesses.remove(appName); } return null; - }); + }, false); return success; diff --git a/src/main/java/io/antmedia/muxer/IAntMediaStreamHandler.java b/src/main/java/io/antmedia/muxer/IAntMediaStreamHandler.java index 5d1d4697f..b9c274517 100644 --- a/src/main/java/io/antmedia/muxer/IAntMediaStreamHandler.java +++ b/src/main/java/io/antmedia/muxer/IAntMediaStreamHandler.java @@ -6,6 +6,7 @@ import org.red5.server.api.scope.IScope; import io.antmedia.AppSettings; +import io.antmedia.IAppSettingsUpdateListener; import io.antmedia.datastore.db.DataStore; import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.plugin.api.IFrameListener; @@ -44,10 +45,28 @@ public interface IAntMediaStreamHandler { * @param file video file that muxed is finished * @param duration of the video in milliseconds * @param resolution height of the video + * + * @Deprecated use {@link #muxingFinished(Broadcast, File, long, long, int, String, String)} because Broadcast object may be deleted when this method is called */ + @Deprecated public void muxingFinished(String id, File file, long startTime, long duration , int resolution, String path, String vodId); + /** + * Called by some muxer like MP4Muxer + * + * id actually is the name of the file however in some cases file name and the id may be different + * in some cases like there is already a file with that name + * + * @param broadcast object that muxed is finished + * @param streamId is the id of the stream + * @param file video file that muxed is finished + * @param duration of the video in milliseconds + * @param resolution height of the video + * + */ + public void muxingFinished(Broadcast broadcast, String streamId, File file, long startTime, long duration , int resolution, String path, String vodId); + /** * Update stream quality, speed and number of pending packet size and update time * in datastore @@ -229,4 +248,10 @@ public interface IAntMediaStreamHandler { */ public void notifyWebhookForStreamStatus(Broadcast broadcast, int width, int height, long totalByteReceived, int inputQueueSize, double speed); + + /** + * Add listener that is notified when the settings are updated + * @param listener + */ + public void addSettingsUpdateListener(IAppSettingsUpdateListener listener); } diff --git a/src/main/java/io/antmedia/muxer/Muxer.java b/src/main/java/io/antmedia/muxer/Muxer.java index a513e66ba..e1b6b799e 100644 --- a/src/main/java/io/antmedia/muxer/Muxer.java +++ b/src/main/java/io/antmedia/muxer/Muxer.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import io.antmedia.FFmpegUtilities; +import io.antmedia.datastore.db.DataStore; import io.antmedia.rest.RestServiceBase; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -58,6 +59,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.core.io.Resource; +import io.antmedia.AntMediaApplicationAdapter; import io.antmedia.AppSettings; import io.vertx.core.Vertx; import io.vertx.core.impl.ConcurrentHashSet; @@ -279,6 +281,8 @@ public long getOriginalFrameTimeMs() { private int videoCodecId; + private IAntMediaStreamHandler appInstance; + protected Muxer(Vertx vertx) { this.vertx = vertx; @@ -703,6 +707,15 @@ public AppSettings getAppSettings() { ApplicationContext appCtx = context.getApplicationContext(); return (AppSettings) appCtx.getBean(AppSettings.BEAN_NAME); } + + + public AntMediaApplicationAdapter getAppAdaptor() { + IContext context = scope.getContext(); + ApplicationContext appCtx = context.getApplicationContext(); + AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME); + return adaptor; + } + public String getExtendedName(String name, int resolution, int bitrate, String fileNameFormat) { StringBuilder result = new StringBuilder(name); diff --git a/src/main/java/io/antmedia/muxer/RecordMuxer.java b/src/main/java/io/antmedia/muxer/RecordMuxer.java index 872f2a95a..2fa6b7c80 100644 --- a/src/main/java/io/antmedia/muxer/RecordMuxer.java +++ b/src/main/java/io/antmedia/muxer/RecordMuxer.java @@ -9,6 +9,7 @@ import java.nio.file.Files; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.bytedeco.ffmpeg.avformat.AVFormatContext; import org.bytedeco.ffmpeg.avformat.AVStream; import org.red5.server.api.IContext; @@ -17,6 +18,7 @@ import io.antmedia.AntMediaApplicationAdapter; import io.antmedia.AppSettings; +import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.storage.StorageClient; import io.vertx.core.Vertx; @@ -25,7 +27,7 @@ public abstract class RecordMuxer extends Muxer { protected File fileTmp; protected StorageClient storageClient = null; protected int resolution; - + protected boolean uploadMP4ToS3 = true; protected String previewPath; @@ -39,7 +41,7 @@ public abstract class RecordMuxer extends Muxer { * It will be define when record muxer is called by anywhere */ private long startTime = 0; - + private String vodId; @@ -50,7 +52,7 @@ protected RecordMuxer(StorageClient storageClient, Vertx vertx, String s3FolderP firstAudioDts = -1; firstVideoDts = -1; firstKeyFrameReceived = false; - + } protected int[] SUPPORTED_CODECS; @@ -106,13 +108,13 @@ protected boolean prepareAudioOutStream(AVStream inStream, AVStream outStream) { } return true; } - - + + @Override public String getOutputURL() { return fileTmp.getAbsolutePath(); } - + public void setPreviewPath(String path){ this.previewPath = path; } @@ -125,6 +127,13 @@ public synchronized void writeTrailer() { super.writeTrailer(); + if (fileTmp == null || !fileTmp.exists()) { + + logger.error("MP4 temp file does not exist. Streaming is likely not started for streamId:{}", streamId); + return; + } + + Broadcast broadcast = getAppAdaptor().getDataStore().get(streamId); vertx.executeBlocking(()->{ try { @@ -137,7 +146,7 @@ public synchronized void writeTrailer() { finalizeRecordFile(f); - adaptor.muxingFinished(streamId, f, startTime, getDurationInMs(f,streamId), resolution, previewPath, vodId); + adaptor.muxingFinished(broadcast, streamId, f, startTime, getDurationInMs(f,streamId), resolution, previewPath, vodId); logger.info("File: {} exist: {}", fileTmp.getAbsolutePath(), fileTmp.exists()); @@ -151,22 +160,18 @@ public synchronized void writeTrailer() { saveToStorage(s3FolderPath + File.separator + (subFolder != null ? subFolder + File.separator : "" ), f, f.getName(), storageClient); } + } catch (Exception e) { - logger.error(e.getMessage()); + logger.error(ExceptionUtils.getStackTrace(e)); } return null; - }); + }, false); } - public AntMediaApplicationAdapter getAppAdaptor() { - IContext context = RecordMuxer.this.scope.getContext(); - ApplicationContext appCtx = context.getApplicationContext(); - AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME); - return adaptor; - } - + + public static String getS3Prefix(String s3FolderPath, String subFolder) { return replaceDoubleSlashesWithSingleSlash(s3FolderPath + File.separator + (subFolder != null ? subFolder : "" ) + File.separator); } @@ -177,7 +182,7 @@ public File getFinalFileName(boolean isS3Enabled) String origFileName = absolutePath.replace(TEMP_EXTENSION, ""); String prefix = getS3Prefix(s3FolderPath, subFolder); - + String fileName = getFile().getName(); File f = new File(origFileName); @@ -216,7 +221,7 @@ protected void finalizeRecordFile(final File file) throws IOException { } - + public boolean isUploadingToS3(){return uploadMP4ToS3;} diff --git a/src/main/java/io/antmedia/muxer/RtmpMuxer.java b/src/main/java/io/antmedia/muxer/RtmpMuxer.java index 94966cd9f..dd537e0e8 100644 --- a/src/main/java/io/antmedia/muxer/RtmpMuxer.java +++ b/src/main/java/io/antmedia/muxer/RtmpMuxer.java @@ -145,15 +145,14 @@ public synchronized boolean prepareIO() //if there is a stream in the output format context, try to push if (getOutputFormatContext().nb_streams() > 0) { - this.vertx.executeBlocking(b -> - { + this.vertx.executeBlocking(() -> { if (openIO()) { if (bsfFilterContextList.isEmpty()) { writeHeader(); - return; + return null; } isRunning.set(true); setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING); @@ -164,7 +163,11 @@ public synchronized boolean prepareIO() setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_FAILED); logger.error("Cannot initializeOutputFormatContextIO for rtmp endpoint:{}", url); } - }, null); + + return null; + + + }, false); result = true; } diff --git a/src/main/java/io/antmedia/plugin/api/IStreamListener.java b/src/main/java/io/antmedia/plugin/api/IStreamListener.java index 5953e29c9..4fb3bd463 100644 --- a/src/main/java/io/antmedia/plugin/api/IStreamListener.java +++ b/src/main/java/io/antmedia/plugin/api/IStreamListener.java @@ -1,29 +1,55 @@ package io.antmedia.plugin.api; +import io.antmedia.datastore.db.types.Broadcast; + /* * Interface class to inform the plugins with stream start/start event. */ public interface IStreamListener { - /* + /** * AMS inform the plugins when a stream is started with this method. * @param streamId is the id of the stream + * + * @Deprecated use {@link #streamStarted(Broadcast)} because Broadcast object may be deleted when this method is called */ + @Deprecated (since="3.0", forRemoval = true) public void streamStarted(String streamId); - /* + /** + * AMS inform the plugins when a stream is started with this method. + * @param broadcast is the the broadcast object of the stream + * + */ + public default void streamStarted(Broadcast broadcast) { + //do nothing + } + + /** * AMS inform the plugins when a stream is finished with this method. * @param streamId is the id of the stream + * + * @Deprecated use {@link #streamFinished(Broadcast)} because Broadcast object may be deleted when this method is called */ + @Deprecated (since="3.0", forRemoval = true) public void streamFinished(String streamId); - /* + /** + * AMS inform the plugins when a stream is finished with this method. + * @param broadcast is the broadcast object of the stream + * + * The default implementation does nothing in order to avoid breaking the existing plugins. + */ + public default void streamFinished(Broadcast broadcast) { + //do nothing + } + /** * AMS inform the plugins when a new participant joins to the conference room * @param roomId is the id of the conference room * @param streamId is the id of new stream */ public void joinedTheRoom(String roomId, String streamId); - /* + /** * AMS inform the plugins when a participant leaves from the conference room * @param rroomId is the id of the conference room * @param streamId is the id of new stream diff --git a/src/main/java/io/antmedia/servlet/UploadHLSChunk.java b/src/main/java/io/antmedia/servlet/UploadHLSChunk.java index 75d821cfa..eec032e86 100644 --- a/src/main/java/io/antmedia/servlet/UploadHLSChunk.java +++ b/src/main/java/io/antmedia/servlet/UploadHLSChunk.java @@ -119,6 +119,7 @@ public void uploadHLSChunk(StorageClient storageClient, ConfigurableWebApplicati String s3FileKey = getS3Key(req, appSettings); + //TODO: we overwrite progressListener for the ongoing upload here. This may make logs misleading. storageClient.setProgressListener(event -> { if (event.getEventType() == ProgressEventType.TRANSFER_FAILED_EVENT) { diff --git a/src/main/java/io/antmedia/statistic/StatsCollector.java b/src/main/java/io/antmedia/statistic/StatsCollector.java index 0b034b25b..e63198deb 100644 --- a/src/main/java/io/antmedia/statistic/StatsCollector.java +++ b/src/main/java/io/antmedia/statistic/StatsCollector.java @@ -440,11 +440,11 @@ public int getWebRTCVertxWorkerQueueSize() { private void sendWebRTCClientStats() { getVertx().executeBlocking( - b -> { + () -> { collectAndSendWebRTCClientsStats(); - b.complete(); + return null; }, - null); + false); } diff --git a/src/main/java/io/antmedia/storage/AmazonS3StorageClient.java b/src/main/java/io/antmedia/storage/AmazonS3StorageClient.java index 3a6f38933..da5ff29c6 100644 --- a/src/main/java/io/antmedia/storage/AmazonS3StorageClient.java +++ b/src/main/java/io/antmedia/storage/AmazonS3StorageClient.java @@ -154,7 +154,9 @@ public void save(String key, File file, InputStream inputStream, boolean deleteL { putRequest = new PutObjectRequest(getStorageName(), key, inputStream, metadata); } - + + putRequest.getRequestClientOptions().setReadLimit(getTransferBufferSize()); + putRequest.setCannedAcl(getCannedAcl()); if(checkStorageClass(getStorageClass())){ @@ -187,7 +189,7 @@ public void save(String key, File file, InputStream inputStream, boolean deleteL } } - private void listenUploadProgress(String key, File file, boolean deleteLocalFile, Upload upload) { + public void listenUploadProgress(String key, File file, boolean deleteLocalFile, Upload upload) { upload.addProgressListener((ProgressListener)event -> { if (event.getEventType() == ProgressEventType.TRANSFER_FAILED_EVENT) diff --git a/src/main/java/io/antmedia/storage/StorageClient.java b/src/main/java/io/antmedia/storage/StorageClient.java index b46d80f44..66abe356d 100644 --- a/src/main/java/io/antmedia/storage/StorageClient.java +++ b/src/main/java/io/antmedia/storage/StorageClient.java @@ -55,7 +55,8 @@ public abstract class StorageClient { protected ProgressListener progressListener; - + private int transferBufferSize; + /** * Delete file from storage * @@ -180,4 +181,13 @@ public String getCacheControl() { public void setCacheControl(String cacheControl) { this.cacheControl = cacheControl; } + + public int getTransferBufferSize() { + return transferBufferSize; + } + + + public void setTransferBufferSize(int transferBufferSize) { + this.transferBufferSize = transferBufferSize; + } } diff --git a/src/main/java/io/antmedia/streamsource/StreamFetcher.java b/src/main/java/io/antmedia/streamsource/StreamFetcher.java index 14f6e2d5a..eea1cf4a8 100644 --- a/src/main/java/io/antmedia/streamsource/StreamFetcher.java +++ b/src/main/java/io/antmedia/streamsource/StreamFetcher.java @@ -564,6 +564,8 @@ public void packetRead(AVPacket pkt) av_packet_ref(packet, pkt); bufferQueue.add(packet); + logger.debug("packet/ref packet dts: {}/{} pts:{}/{} streamId:{}", pkt.dts(), packet.dts(), pkt.pts(), + packet.pts(), streamId); } else { diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 7b386d302..7495b9682 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -73,6 +73,7 @@ import io.antmedia.AntMediaApplicationAdapter; import io.antmedia.AppSettings; +import io.antmedia.IAppSettingsUpdateListener; import io.antmedia.cluster.ClusterNode; import io.antmedia.cluster.IClusterNotifier; import io.antmedia.cluster.IClusterStore; @@ -415,10 +416,16 @@ public void testAppSettings() verify(clusterNotifier, times(1)).getClusterStore(); verify(clusterStore, times(1)).saveSettings(settings); + IAppSettingsUpdateListener settingsListener = mock(IAppSettingsUpdateListener.class); + spyAdapter.addSettingsUpdateListener(settingsListener); + spyAdapter.updateSettings(newSettings, false, false); //it should not change times(1) because we don't want it to update the datastore verify(clusterNotifier, times(1)).getClusterStore(); verify(clusterStore, times(1)).saveSettings(settings); + + //make sure settingsUpdated is called + verify(settingsListener, times(1)).settingsUpdated(settings); settings.setUpdateTime(900); newSettings.setUpdateTime(900); @@ -641,7 +648,7 @@ public void testSynchUserVoD() { } @Test - public void testMuxingFinishedWithPreview(){ + public void testMuxingFinishedWithPreview() throws Exception{ AppSettings appSettings = new AppSettings(); appSettings.setGeneratePreview(true); appSettings.setMuxerFinishScript("src/test/resources/echo.sh"); @@ -653,6 +660,9 @@ public void testMuxingFinishedWithPreview(){ DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); Mockito.when(dsf.getDataStore()).thenReturn(dataStore); adapter.setDataStoreFactory(dsf); + Broadcast broadcast = new Broadcast(); + broadcast.setStreamId("streamId"); + dataStore.save(broadcast); adapter.setVertx(vertx); @@ -662,7 +672,7 @@ public void testMuxingFinishedWithPreview(){ assertFalse(f.exists()); - adapter.muxingFinished("streamId", anyFile, 0, 100, 480, "src/test/resources/preview.png", null); + adapter.muxingFinished(broadcast, "streamId", anyFile, 0, 100, 480, "src/test/resources/preview.png", null); await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); @@ -676,7 +686,7 @@ public void testMuxingFinishedWithPreview(){ } @Test - public void testMuxingFinished() { + public void testMuxingFinished() throws Exception { AppSettings appSettings = new AppSettings(); appSettings.setMuxerFinishScript("src/test/resources/echo.sh"); @@ -688,16 +698,19 @@ public void testMuxingFinished() { DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); Mockito.when(dsf.getDataStore()).thenReturn(dataStore); adapter.setDataStoreFactory(dsf); + + Broadcast broadcast = new Broadcast(); + broadcast.setStreamId("streamId"); + dataStore.save(broadcast); adapter.setVertx(vertx); File anyFile = new File("src/test/resources/sample_MP4_480.mp4"); { - assertFalse(f.exists()); - adapter.muxingFinished("streamId", anyFile, 0, 100, 480, null, null); + adapter.muxingFinished(broadcast, broadcast.getStreamId(), anyFile, 0, 100, 480, null, null); await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); @@ -715,7 +728,7 @@ public void testMuxingFinished() { assertFalse(f.exists()); - adapter.muxingFinished("streamId", anyFile, 0, 100, 480, "", null); + adapter.muxingFinished(broadcast, broadcast.getStreamId(), anyFile, 0, 100, 480, "", null); await().pollDelay(3, TimeUnit.SECONDS).atMost(4, TimeUnit.SECONDS).until(()-> !f.exists()); } @@ -1121,7 +1134,6 @@ public void testNotifyHookFromMuxingFinished() { * So, no hook is posted */ - ArgumentCaptor captureUrl = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureId = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureMainTrackId = ArgumentCaptor.forClass(String.class); @@ -1137,7 +1149,7 @@ public void testNotifyHookFromMuxingFinished() { //call muxingFinished function - spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); + spyAdaptor.muxingFinished(broadcast, broadcast.getStreamId(), anyFile, 0, 100, 480, null, null); //verify that notifyHook is never called verify(spyAdaptor, never()).notifyHook(captureUrl.capture(), captureId.capture(), captureMainTrackId.capture(), captureAction.capture(), @@ -1162,7 +1174,7 @@ public void testNotifyHookFromMuxingFinished() { dataStore.updateBroadcastFields(streamId, update); //call muxingFinished function - spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); + spyAdaptor.muxingFinished(broadcast, broadcast.getStreamId(), anyFile, 0, 100, 480, null, null); await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; @@ -1196,7 +1208,7 @@ public void testNotifyHookFromMuxingFinished() { dataStore.delete(streamId); //call muxingFinished function - spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); + spyAdaptor.muxingFinished("streamId", anyFile, 0, 100, 480, null, null); await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; @@ -1225,7 +1237,7 @@ public void testNotifyHookFromMuxingFinished() { appSettings.setListenerHookURL("listenerHookURL"); //call muxingFinished function - spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); + spyAdaptor.muxingFinished(broadcast, broadcast.getStreamId(), anyFile, 0, 100, 480, null, null); await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; diff --git a/src/test/java/io/antmedia/test/AppSettingsUnitTest.java b/src/test/java/io/antmedia/test/AppSettingsUnitTest.java index bf183a1c7..1f7dd9a0c 100644 --- a/src/test/java/io/antmedia/test/AppSettingsUnitTest.java +++ b/src/test/java/io/antmedia/test/AppSettingsUnitTest.java @@ -36,6 +36,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; +import com.amazonaws.RequestClientOptions; import com.google.gson.Gson; import io.antmedia.AntMediaApplicationAdapter; @@ -656,13 +657,18 @@ public void testUnsetAppSettings(AppSettings appSettings) { appSettings.setAppInstallationTime(100); assertEquals(100, appSettings.getAppInstallationTime()); + + + assertEquals(10000000, appSettings.getS3TransferBufferSizeInBytes()); + appSettings.setS3TransferBufferSizeInBytes(50000); + assertEquals(50000, appSettings.getS3TransferBufferSizeInBytes()); //if we add a new field, we just need to check its default value in this test //When a new field is added or removed please update the number of fields and make this test pass //by also checking its default value. assertEquals("New field is added to settings. PAY ATTENTION: Please CHECK ITS DEFAULT VALUE and fix the number of fields.", - 197, numberOfFields); + 198, numberOfFields); } diff --git a/src/test/java/io/antmedia/test/Application.java b/src/test/java/io/antmedia/test/Application.java index 0b31db9e5..f95f3f702 100644 --- a/src/test/java/io/antmedia/test/Application.java +++ b/src/test/java/io/antmedia/test/Application.java @@ -5,6 +5,7 @@ import java.util.List; import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.muxer.IAntMediaStreamHandler; public class Application extends AntMediaApplicationAdapter implements IAntMediaStreamHandler { @@ -26,14 +27,16 @@ public class Application extends AntMediaApplicationAdapter implements IAntMedia + @Override - public void muxingFinished(String id, File file, long startTime, long duration, int resolution, String previewPath, String vodId) { - super.muxingFinished(id, file, startTime, duration, resolution, previewPath, vodId); - Application.id.add(id); + public void muxingFinished(Broadcast broadcast, String streamId, File file, long startTime, long duration, int resolution, String previewPath, String vodId) { + super.muxingFinished(broadcast, streamId, file, startTime, duration, resolution, previewPath, vodId); + Application.id.add(broadcast.getStreamId()); Application.file.add(file); Application.duration.add(duration); Application.startTime.add(startTime); } + public static void resetFields() { Application.id.clear(); diff --git a/src/test/java/io/antmedia/test/MuxerUnitTest.java b/src/test/java/io/antmedia/test/MuxerUnitTest.java index 01241377c..002553f8e 100644 --- a/src/test/java/io/antmedia/test/MuxerUnitTest.java +++ b/src/test/java/io/antmedia/test/MuxerUnitTest.java @@ -138,6 +138,7 @@ import io.antmedia.datastore.db.InMemoryDataStore; import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.datastore.db.types.Endpoint; +import io.antmedia.datastore.db.types.VoD; import io.antmedia.eRTMP.HEVCDecoderConfigurationParser.HEVCSPSParser; import io.antmedia.eRTMP.HEVCVideoEnhancedRTMP; import io.antmedia.integration.AppFunctionalV2Test; @@ -2653,11 +2654,9 @@ public void testPlusplus() //assertEquals(0xFF00, unsigned << 8); - - } - public File testMp4Muxing(String name, boolean shortVersion, boolean checkDuration) { + public File testMp4Muxing(String streamId, boolean shortVersion, boolean checkDuration) { logger.info("running testMp4Muxing"); @@ -2674,14 +2673,35 @@ public File testMp4Muxing(String name, boolean shortVersion, boolean checkDurati MuxAdaptor muxAdaptor = MuxAdaptor.initializeMuxAdaptor(clientBroadcastStream, null, false, appScope); - if (getDataStore().get(name) == null) { + String streamName = "broadcastName"; + String description = "broadadcastDescription"; + String metadata = "metadata"; + String lat = "1L"; + String longitude = "2L"; + String altitude = "3L"; + + boolean checkValuesVoDFields = false; + + if (getDataStore().get(streamId) == null) { + + checkValuesVoDFields = true; Broadcast broadcast = new Broadcast(); try { - broadcast.setStreamId(name); + broadcast.setStreamId(streamId); } catch (Exception e1) { e1.printStackTrace(); } + + broadcast.setName(streamName); + broadcast.setDescription(description); + broadcast.setLatitude(lat); + broadcast.setLongitude(longitude); + broadcast.setAltitude(altitude); + broadcast.setMetaData(metadata); + //set this zombi to trigger delete operation at the end + broadcast.setZombi(true); getDataStore().save(broadcast); + } getAppSettings().setMp4MuxingEnabled(true); getAppSettings().setHlsMuxingEnabled(false); @@ -2704,7 +2724,7 @@ public File testMp4Muxing(String name, boolean shortVersion, boolean checkDurati logger.debug("f path:" + file.getAbsolutePath()); assertTrue(file.exists()); - boolean result = muxAdaptor.init(appScope, name, false); + boolean result = muxAdaptor.init(appScope, streamId, false); assertTrue(result); @@ -2733,6 +2753,31 @@ public File testMp4Muxing(String name, boolean shortVersion, boolean checkDurati Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> MuxingTest.testFile(muxAdaptor.getMuxerList().get(0).getFile().getAbsolutePath(), finalDuration)); } + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + List tmpList = getDataStore().getVodList(0, 10, "date", "desc", streamId, null); + + return tmpList.size() > 0; + }); + + List vodList = getDataStore().getVodList(0, 10, "date", "desc", streamId, null); + + assertTrue(1 <= vodList.size()); + + if (checkValuesVoDFields) { + assertEquals(streamName, vodList.get(0).getStreamName()); + assertEquals(description, vodList.get(0).getDescription()); + + assertEquals(lat, vodList.get(0).getLatitude()); + + assertEquals(longitude, vodList.get(0).getLongitude()); + + assertEquals(altitude, vodList.get(0).getAltitude()); + assertEquals(metadata, vodList.get(0).getMetadata()); + } + + + return muxAdaptor.getMuxerList().get(0).getFile(); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/io/antmedia/test/storage/AmazonS3StorageClientTest.java b/src/test/java/io/antmedia/test/storage/AmazonS3StorageClientTest.java index 11e292b48..e0c303f60 100644 --- a/src/test/java/io/antmedia/test/storage/AmazonS3StorageClientTest.java +++ b/src/test/java/io/antmedia/test/storage/AmazonS3StorageClientTest.java @@ -4,12 +4,18 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -27,6 +33,7 @@ import com.amazonaws.event.ProgressListener; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.Upload; @@ -276,5 +283,28 @@ public void testCannedAcl() { assertEquals(CannedAccessControlList.AwsExecRead, storage.getCannedAcl()); + } + + @Test + public void testS3TransferBufferSize() { + AmazonS3StorageClient storage = spy(new AmazonS3StorageClient()); + int s3TransferBufferSize = 5000; + + storage.setTransferBufferSize(s3TransferBufferSize); + + TransferManager tm = Mockito.mock(TransferManager.class); + Mockito.doReturn(tm).when(storage).getTransferManager(); + Mockito.doReturn(true).when(storage).isEnabled(); + Mockito.doNothing().when(storage).listenUploadProgress(anyString(), nullable(File.class), anyBoolean(), any()); + + + storage.save("key", null, mock(InputStream.class), false, false); + + ArgumentCaptor putObjectRequestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(tm).upload(putObjectRequestCaptor.capture()); + assertEquals(s3TransferBufferSize, putObjectRequestCaptor.getValue().getRequestClientOptions().getReadLimit()); + + + } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 34f280435..2452e5b02 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -28,5 +28,6 @@ +