Skip to content

Commit

Permalink
OAP-202 oap-logstream finisher: sort by type priority
Browse files Browse the repository at this point in the history
  • Loading branch information
galaxina committed Aug 29, 2024
1 parent c2cb6c5 commit acf0d99
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package oap.logstream.disk;

import oap.logstream.Timestamp;
import oap.testng.Fixtures;
import oap.testng.TestDirectoryFixture;
import oap.util.Dates;
import org.joda.time.DateTime;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.List;
import java.util.Map;

import static oap.util.Pair.__;
import static org.assertj.core.api.Assertions.assertThat;
import static org.joda.time.DateTimeZone.UTC;

public class AbstractFinisherTest extends Fixtures {
private final TestDirectoryFixture testDirectoryFixture;

public AbstractFinisherTest() {
testDirectoryFixture = fixture( new TestDirectoryFixture() );
}

@Test
public void testSort() throws IOException {
int safeInterval = 10;
Timestamp timestamp = Timestamp.BPH_6;

Path logs = testDirectoryFixture.testPath( "logs" );
Files.createDirectory( logs );
MockFinisher finisher = new MockFinisher( logs, safeInterval, List.of( "*.txt" ), timestamp );
finisher.priorityByType.put( "type2", 10 );

Path file11 = Files.createFile( logs.resolve( "file1-type1.txt" ) );
Path file12 = Files.createFile( logs.resolve( "file1-type2.txt" ) );
Path file21 = Files.createFile( logs.resolve( "file2-type1.txt" ) );
Path file22 = Files.createFile( logs.resolve( "file2-type2.txt" ) );

LogMetadata type1 = new LogMetadata( "", "type1", "", Map.of(), new String[] {}, new byte[][] {} );
LogMetadata type2 = new LogMetadata( "", "type2", "", Map.of(), new String[] {}, new byte[][] {} );

type1.writeFor( file11 );
type2.writeFor( file12 );
type1.writeFor( file21 );
type2.writeFor( file22 );

Files.setLastModifiedTime( file11, FileTime.fromMillis( 123453 ) );
Files.setLastModifiedTime( file12, FileTime.fromMillis( 123454 ) );
Files.setLastModifiedTime( file21, FileTime.fromMillis( 123455 ) );
Files.setLastModifiedTime( file22, FileTime.fromMillis( 123456 ) );

Dates.setTimeFixed( 123456 + Dates.m( 60 / timestamp.bucketsPerHour ) + safeInterval + 1 );


finisher.run();

assertThat( finisher.files )
.hasSize( 4 );

assertThat( finisher.files.subList( 0, 2 ) ).containsAnyOf(
__( file12, new DateTime( 123454, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) ),
__( file22, new DateTime( 123456, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) )
);

assertThat( finisher.files.subList( 2, 4 ) ).containsAnyOf(
__( file11, new DateTime( 123453, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) ),
__( file21, new DateTime( 123455, UTC ).withMillisOfSecond( 0 ).withSecondOfMinute( 0 ) )
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package oap.logstream.disk;

import oap.logstream.Timestamp;
import oap.util.Pair;
import org.joda.time.DateTime;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import static oap.util.Pair.__;

public class MockFinisher extends AbstractFinisher {
public final ArrayList<Pair<Path, DateTime>> files = new ArrayList<>();

protected MockFinisher( Path sourceDirectory, long safeInterval, List<String> mask, Timestamp timestamp ) {
super( sourceDirectory, safeInterval, mask, timestamp );
}

@Override
protected void cleanup() {
}

@Override
protected void process( Path path, DateTime bucketTime ) {
files.add( __( path, bucketTime ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void init( ST st, DateTime time, Timestamp timestamp, int version ) {
st.add( "DAY", print2Chars( time.getDayOfMonth() ) );
st.add( "HOUR", print2Chars( time.getHourOfDay() ) );
st.add( "MINUTE", print2Chars( time.getMinuteOfHour() ) );
st.add( "SECOND", print2Chars( time.getSecondOfMinute() ) );
st.add( "INTERVAL", print2Chars( timestamp.currentBucket( time ) ) );
st.add( "LOG_TIME_INTERVAL", String.valueOf( 60 / timestamp.bucketsPerHour ) );
st.add( "REGION", System.getenv( "REGION" ) );
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package oap.logstream.disk;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import oap.concurrent.Executors;
import oap.concurrent.ThreadPoolExecutor;
import oap.io.Files;
import oap.logstream.Timestamp;
import oap.util.Dates;
import oap.util.Lists;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.joda.time.DateTimeZone.UTC;
Expand All @@ -25,6 +33,7 @@ public abstract class AbstractFinisher implements Runnable {
public final Path corruptedDirectory;
private final Timestamp timestamp;
public int threads = Runtime.getRuntime().availableProcessors();
public LinkedHashMap<String, Integer> priorityByType = new LinkedHashMap<>();
protected int bufferSize = 1024 * 256 * 4 * 4;


Expand All @@ -47,30 +56,76 @@ public void run() {
run( false );
}

@SuppressWarnings( "checkstyle:ModifiedControlVariable" )
@SneakyThrows
public void run( boolean forceSync ) {
log.debug( "force {} let's start packing of {} in {}", forceSync, mask, sourceDirectory );

log.debug( "current timestamp is {}", timestamp.toStartOfBucket( DateTime.now( UTC ) ) );
var bucketStartTime = timestamp.currentBucketStartMillis();
var elapsed = DateTimeUtils.currentTimeMillis() - bucketStartTime;
long bucketStartTime = timestamp.currentBucketStartMillis();
long elapsed = DateTimeUtils.currentTimeMillis() - bucketStartTime;
if( elapsed < safeInterval ) {
log.debug( "not safe to process yet ({}ms), some of the files could still be open, waiting...", elapsed );
cleanup();
log.debug( "packing is skipped" );
return;
}
var pool = Executors.newFixedBlockingThreadPool( threads, new ThreadFactoryBuilder().setNameFormat( "finisher-%d" ).build() );
for( Path path : Files.wildcard( sourceDirectory, mask ) ) {
if( path.startsWith( corruptedDirectory ) ) continue;
if( LogMetadata.isMetadata( path ) ) continue;
ThreadPoolExecutor pool = Executors.newFixedBlockingThreadPool( threads, new ThreadFactoryBuilder().setNameFormat( "finisher-%d" ).build() );


List<Path> logs = Files.wildcard( sourceDirectory, mask );
logs = Lists.filter( logs, path -> {
if( path.startsWith( corruptedDirectory ) ) return false;
if( LogMetadata.isMetadata( path ) ) return false;

DateTime lastModifiedTime = timestamp.toStartOfBucket( new DateTime( Files.getLastModifiedTime( path ), UTC ) );
if( forceSync || lastModifiedTime.isBefore( bucketStartTime ) ) {
pool.execute( () -> process( path, lastModifiedTime ) );
} else log.debug( "skipping (current timestamp) {}", path );
if( !forceSync && !lastModifiedTime.isBefore( bucketStartTime ) ) {
log.debug( "skipping (current timestamp) {}", path );
return false;
}

return true;
} );

List<LogInfo> logInfos = Lists.map( logs, path -> {
LogMetadata logMetadata = LogMetadata.readFor( path );
long lastModifiedTime = Files.getLastModifiedTime( path );

return new LogInfo( path, lastModifiedTime, logMetadata.type, priorityByType.getOrDefault( logMetadata.type, 0 ) );
} );


logInfos.sort( ( li1, li2 ) -> {
Comparator<LogInfo> comparator = Comparator
.<LogInfo>comparingInt( logInfo -> logInfo.priority ).reversed()
.thenComparingLong( logInfo -> logInfo.lastModifiedTime );

return comparator.compare( li1, li2 );
} );


int priority = 0;
ArrayList<CompletableFuture<?>> futures = new ArrayList<>();

for( int i = 0; i < logInfos.size(); i++ ) {
LogInfo logInfo = logInfos.get( i );
if( priority == logInfo.priority ) {
DateTime lastModifiedTime = timestamp.toStartOfBucket( new DateTime( logInfo.lastModifiedTime, UTC ) );
futures.add( CompletableFuture.runAsync( () -> process( logInfo.path, lastModifiedTime ), pool ) );
} else {
CompletableFuture<Void> allOf = CompletableFuture.allOf( futures.toArray( new CompletableFuture<?>[0] ) );
allOf.get( 60 / timestamp.bucketsPerHour, TimeUnit.MINUTES );
futures.clear();

priority = logInfo.priority;
i--;
}
}
CompletableFuture<Void> allOf = CompletableFuture.allOf( futures.toArray( new CompletableFuture<?>[0] ) );
allOf.get( 60 / timestamp.bucketsPerHour, TimeUnit.MINUTES );

pool.shutdown();

long fullTimeout = DateTime.now().getMillis() + TimeUnit.MINUTES.toMillis( 20 );
while( !pool.awaitTermination( 1, TimeUnit.MINUTES ) ) {
if( DateTime.now().getMillis() <= fullTimeout ) {
Expand All @@ -86,4 +141,13 @@ public void run( boolean forceSync ) {
protected abstract void cleanup();

protected abstract void process( Path path, DateTime bucketTime );

@ToString
@AllArgsConstructor
private static class LogInfo {
public final Path path;
public final long lastModifiedTime;
public final String type;
public final int priority;
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
</distributionManagement>

<properties>
<oap.project.version>22.4.12</oap.project.version>
<oap.project.version>22.4.13</oap.project.version>

<oap.deps.config.version>21.0.0</oap.deps.config.version>
<oap.deps.oap-teamcity.version>21.0.1</oap.deps.oap-teamcity.version>
Expand Down

0 comments on commit acf0d99

Please sign in to comment.