diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/AbstractFinisherTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/AbstractFinisherTest.java new file mode 100644 index 0000000000..01c3cc152c --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/AbstractFinisherTest.java @@ -0,0 +1,74 @@ +package oap.logstream.disk; + +import oap.logstream.Timestamp; +import oap.testng.Fixtures; +import oap.testng.TestDirectoryFixture; +import oap.util.Dates; +import org.joda.time.DateTime; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.util.List; +import java.util.Map; + +import static oap.util.Pair.__; +import static org.assertj.core.api.Assertions.assertThat; +import static org.joda.time.DateTimeZone.UTC; + +public class AbstractFinisherTest extends Fixtures { + private final TestDirectoryFixture testDirectoryFixture; + + public AbstractFinisherTest() { + testDirectoryFixture = fixture( new TestDirectoryFixture() ); + } + + @Test + public void testSort() throws IOException { + int safeInterval = 10; + Timestamp timestamp = Timestamp.BPH_6; + + Path logs = testDirectoryFixture.testPath( "logs" ); + Files.createDirectory( logs ); + MockFinisher finisher = new MockFinisher( logs, safeInterval, List.of( "*.txt" ), timestamp ); + finisher.priorityByType.put( "type2", 10 ); + + Path file11 = Files.createFile( logs.resolve( "file1-type1.txt" ) ); + Path file12 = Files.createFile( logs.resolve( "file1-type2.txt" ) ); + Path file21 = Files.createFile( logs.resolve( "file2-type1.txt" ) ); + Path file22 = Files.createFile( logs.resolve( "file2-type2.txt" ) ); + + LogMetadata type1 = new LogMetadata( "", "type1", "", Map.of(), new String[] {}, new byte[][] {} ); + LogMetadata type2 = new LogMetadata( "", "type2", "", Map.of(), new String[] {}, new byte[][] {} ); + + type1.writeFor( file11 ); + type2.writeFor( file12 ); + type1.writeFor( file21 ); + type2.writeFor( file22 ); + + Files.setLastModifiedTime( file11, FileTime.fromMillis( 123453 ) ); + Files.setLastModifiedTime( file12, FileTime.fromMillis( 123454 ) ); + Files.setLastModifiedTime( file21, FileTime.fromMillis( 123455 ) ); + Files.setLastModifiedTime( file22, FileTime.fromMillis( 123456 ) ); + + Dates.setTimeFixed( 123456 + Dates.m( 60 / timestamp.bucketsPerHour ) + safeInterval + 1 ); + + + finisher.run(); + + assertThat( finisher.files ) + .hasSize( 4 ); + + assertThat( finisher.files.subList( 0, 2 ) ).containsAnyOf( + __( file12, new DateTime( 123454, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) ), + __( file22, new DateTime( 123456, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) ) + ); + + assertThat( finisher.files.subList( 2, 4 ) ).containsAnyOf( + __( file11, new DateTime( 123453, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) ), + __( file21, new DateTime( 123455, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) ) + ); + } +} diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/MockFinisher.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/MockFinisher.java new file mode 100644 index 0000000000..1b75c23970 --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/MockFinisher.java @@ -0,0 +1,28 @@ +package oap.logstream.disk; + +import oap.logstream.Timestamp; +import oap.util.Pair; +import org.joda.time.DateTime; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static oap.util.Pair.__; + +public class MockFinisher extends AbstractFinisher { + public final ArrayList> files = new ArrayList<>(); + + protected MockFinisher( Path sourceDirectory, long safeInterval, List mask, Timestamp timestamp ) { + super( sourceDirectory, safeInterval, mask, timestamp ); + } + + @Override + protected void cleanup() { + } + + @Override + protected void process( Path path, DateTime bucketTime ) { + files.add( __( path, bucketTime ) ); + } +} diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java index d0d54b8551..9f787964b7 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java @@ -92,6 +92,7 @@ public void init( ST st, DateTime time, Timestamp timestamp, int version ) { st.add( "DAY", print2Chars( time.getDayOfMonth() ) ); st.add( "HOUR", print2Chars( time.getHourOfDay() ) ); st.add( "MINUTE", print2Chars( time.getMinuteOfHour() ) ); + st.add( "SECOND", print2Chars( time.getSecondOfMinute() ) ); st.add( "INTERVAL", print2Chars( timestamp.currentBucket( time ) ) ); st.add( "LOG_TIME_INTERVAL", String.valueOf( 60 / timestamp.bucketsPerHour ) ); st.add( "REGION", System.getenv( "REGION" ) ); diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java index bcc0eb76c2..8c09338dc2 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java @@ -1,17 +1,25 @@ package oap.logstream.disk; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.AllArgsConstructor; import lombok.SneakyThrows; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import oap.concurrent.Executors; +import oap.concurrent.ThreadPoolExecutor; import oap.io.Files; import oap.logstream.Timestamp; import oap.util.Dates; +import oap.util.Lists; import org.joda.time.DateTime; import org.joda.time.DateTimeUtils; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.joda.time.DateTimeZone.UTC; @@ -25,6 +33,7 @@ public abstract class AbstractFinisher implements Runnable { public final Path corruptedDirectory; private final Timestamp timestamp; public int threads = Runtime.getRuntime().availableProcessors(); + public LinkedHashMap priorityByType = new LinkedHashMap<>(); protected int bufferSize = 1024 * 256 * 4 * 4; @@ -47,30 +56,76 @@ public void run() { run( false ); } + @SuppressWarnings( "checkstyle:ModifiedControlVariable" ) @SneakyThrows public void run( boolean forceSync ) { log.debug( "force {} let's start packing of {} in {}", forceSync, mask, sourceDirectory ); log.debug( "current timestamp is {}", timestamp.toStartOfBucket( DateTime.now( UTC ) ) ); - var bucketStartTime = timestamp.currentBucketStartMillis(); - var elapsed = DateTimeUtils.currentTimeMillis() - bucketStartTime; + long bucketStartTime = timestamp.currentBucketStartMillis(); + long elapsed = DateTimeUtils.currentTimeMillis() - bucketStartTime; if( elapsed < safeInterval ) { log.debug( "not safe to process yet ({}ms), some of the files could still be open, waiting...", elapsed ); cleanup(); log.debug( "packing is skipped" ); return; } - var pool = Executors.newFixedBlockingThreadPool( threads, new ThreadFactoryBuilder().setNameFormat( "finisher-%d" ).build() ); - for( Path path : Files.wildcard( sourceDirectory, mask ) ) { - if( path.startsWith( corruptedDirectory ) ) continue; - if( LogMetadata.isMetadata( path ) ) continue; + ThreadPoolExecutor pool = Executors.newFixedBlockingThreadPool( threads, new ThreadFactoryBuilder().setNameFormat( "finisher-%d" ).build() ); + + + List logs = Files.wildcard( sourceDirectory, mask ); + logs = Lists.filter( logs, path -> { + if( path.startsWith( corruptedDirectory ) ) return false; + if( LogMetadata.isMetadata( path ) ) return false; DateTime lastModifiedTime = timestamp.toStartOfBucket( new DateTime( Files.getLastModifiedTime( path ), UTC ) ); - if( forceSync || lastModifiedTime.isBefore( bucketStartTime ) ) { - pool.execute( () -> process( path, lastModifiedTime ) ); - } else log.debug( "skipping (current timestamp) {}", path ); + if( !forceSync && !lastModifiedTime.isBefore( bucketStartTime ) ) { + log.debug( "skipping (current timestamp) {}", path ); + return false; + } + + return true; + } ); + + List logInfos = Lists.map( logs, path -> { + LogMetadata logMetadata = LogMetadata.readFor( path ); + long lastModifiedTime = Files.getLastModifiedTime( path ); + + return new LogInfo( path, lastModifiedTime, logMetadata.type, priorityByType.getOrDefault( logMetadata.type, 0 ) ); + } ); + + + logInfos.sort( ( li1, li2 ) -> { + Comparator comparator = Comparator + .comparingInt( logInfo -> logInfo.priority ).reversed() + .thenComparingLong( logInfo -> logInfo.lastModifiedTime ); + + return comparator.compare( li1, li2 ); + } ); + + + int priority = 0; + ArrayList> futures = new ArrayList<>(); + + for( int i = 0; i < logInfos.size(); i++ ) { + LogInfo logInfo = logInfos.get( i ); + if( priority == logInfo.priority ) { + DateTime lastModifiedTime = timestamp.toStartOfBucket( new DateTime( logInfo.lastModifiedTime, UTC ) ); + futures.add( CompletableFuture.runAsync( () -> process( logInfo.path, lastModifiedTime ), pool ) ); + } else { + CompletableFuture allOf = CompletableFuture.allOf( futures.toArray( new CompletableFuture[0] ) ); + allOf.get( 60 / timestamp.bucketsPerHour, TimeUnit.MINUTES ); + futures.clear(); + + priority = logInfo.priority; + i--; + } } + CompletableFuture allOf = CompletableFuture.allOf( futures.toArray( new CompletableFuture[0] ) ); + allOf.get( 60 / timestamp.bucketsPerHour, TimeUnit.MINUTES ); + pool.shutdown(); + long fullTimeout = DateTime.now().getMillis() + TimeUnit.MINUTES.toMillis( 20 ); while( !pool.awaitTermination( 1, TimeUnit.MINUTES ) ) { if( DateTime.now().getMillis() <= fullTimeout ) { @@ -86,4 +141,13 @@ public void run( boolean forceSync ) { protected abstract void cleanup(); protected abstract void process( Path path, DateTime bucketTime ); + + @ToString + @AllArgsConstructor + private static class LogInfo { + public final Path path; + public final long lastModifiedTime; + public final String type; + public final int priority; + } } diff --git a/pom.xml b/pom.xml index 7970ee39fc..0b0410aa07 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ - 22.4.12 + 22.4.13 21.0.0 21.0.1