diff --git a/oap-stdlib-test/src/main/java/oap/testng/AbstractEnvFixture.java b/oap-stdlib-test/src/main/java/oap/testng/AbstractEnvFixture.java index c97f0030bc..8f4657d713 100644 --- a/oap-stdlib-test/src/main/java/oap/testng/AbstractEnvFixture.java +++ b/oap-stdlib-test/src/main/java/oap/testng/AbstractEnvFixture.java @@ -28,6 +28,7 @@ import com.typesafe.config.impl.ConfigImpl; import lombok.extern.slf4j.Slf4j; import oap.concurrent.Threads; +import oap.concurrent.atomic.FileAtomicLong; import oap.io.Sockets; import oap.util.Strings; @@ -36,14 +37,13 @@ import java.nio.file.Path; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import static oap.testng.Asserts.locationOfTestResource; @Slf4j public abstract class AbstractEnvFixture> extends AbstractScopeFixture { public static final String NO_PREFIX = ""; - public static final AtomicInteger LAST_PORT = new AtomicInteger( 20000 ); + public static final FileAtomicLong LAST_PORT = new FileAtomicLong( "/tmp/port.lock", 1, 10000 ); protected final String prefix; private final ConcurrentMap ports = new ConcurrentHashMap<>(); private final ConcurrentMap properties = new ConcurrentHashMap<>(); @@ -108,7 +108,7 @@ public int portFor( String key ) throws UncheckedIOException { return ports.computeIfAbsent( prefix + key, k -> { int port; do { - port = LAST_PORT.incrementAndGet(); + port = ( int ) LAST_PORT.updateAndGet( previousPort -> previousPort > 30000 ? 10000 : previousPort + 1 ); } while( !Sockets.isTcpPortAvailable( port ) ); log.debug( "{} finding port for key={}... port={}", this.getClass().getSimpleName(), k, port ); diff --git a/oap-stdlib-test/src/test/java/oap/concurrent/atomic/FileAtomicLongTest.java b/oap-stdlib-test/src/test/java/oap/concurrent/atomic/FileAtomicLongTest.java new file mode 100644 index 0000000000..b469724172 --- /dev/null +++ b/oap-stdlib-test/src/test/java/oap/concurrent/atomic/FileAtomicLongTest.java @@ -0,0 +1,99 @@ +package oap.concurrent.atomic; + +import oap.io.Closeables; +import oap.testng.Fixtures; +import oap.testng.TestDirectoryFixture; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; + +public class FileAtomicLongTest extends Fixtures { + + private FileAtomicLong fileAtomicLong1; + private FileAtomicLong fileAtomicLong2; + + public FileAtomicLongTest() { + fixture( TestDirectoryFixture.FIXTURE ); + } + + @BeforeMethod + public void beforeMethod() { + Path file = TestDirectoryFixture.testPath( "al" ); + + fileAtomicLong1 = new FileAtomicLong( file, 1, 0 ); + fileAtomicLong2 = new FileAtomicLong( file, 1, 0 ); + } + + @AfterMethod + public void afterMethod() { + Closeables.close( fileAtomicLong2 ); + Closeables.close( fileAtomicLong1 ); + } + + @Test + public void testGetSet() throws IOException { + assertThat( fileAtomicLong1.get() ).isEqualTo( 0L ); + assertThat( fileAtomicLong2.get() ).isEqualTo( 0L ); + + fileAtomicLong1.set( 16 ); + + assertThat( fileAtomicLong1.get() ).isEqualTo( 16L ); + assertThat( fileAtomicLong2.get() ).isEqualTo( 16L ); + + fileAtomicLong2.set( 17 ); + + assertThat( fileAtomicLong1.get() ).isEqualTo( 17L ); + assertThat( fileAtomicLong2.get() ).isEqualTo( 17L ); + } + + @Test + public void testGetAndSet() throws IOException { + assertThat( fileAtomicLong1.getAndSet( -12L ) ).isEqualTo( 0L ); + assertThat( fileAtomicLong2.getAndSet( 1L ) ).isEqualTo( -12L ); + assertThat( fileAtomicLong1.getAndSet( 6L ) ).isEqualTo( 1L ); + } + + @Test + public void testCompareAndSet() throws IOException { + assertThat( fileAtomicLong1.compareAndSet( -1, -12L ) ).isFalse(); + assertThat( fileAtomicLong2.get() ).isEqualTo( 0L ); + assertThat( fileAtomicLong1.compareAndSet( 0, -12L ) ).isTrue(); + assertThat( fileAtomicLong2.get() ).isEqualTo( -12L ); + assertThat( fileAtomicLong1.compareAndSet( -1, 6L ) ).isFalse(); + } + + @Test + public void testGetAndAnd() throws IOException { + assertThat( fileAtomicLong1.getAndAdd( -12 ) ).isEqualTo( 0L ); + assertThat( fileAtomicLong2.getAndAdd( 1 ) ).isEqualTo( -12L ); + assertThat( fileAtomicLong1.getAndAdd( 5L ) ).isEqualTo( -11L ); + assertThat( fileAtomicLong1.getAndAdd( 5L ) ).isEqualTo( -6L ); + } + + @Test + public void testAddAndGet() throws IOException { + assertThat( fileAtomicLong1.addAndGet( -12 ) ).isEqualTo( -12L ); + assertThat( fileAtomicLong2.addAndGet( 1 ) ).isEqualTo( -11L ); + assertThat( fileAtomicLong1.addAndGet( 5L ) ).isEqualTo( -6L ); + assertThat( fileAtomicLong1.addAndGet( 5L ) ).isEqualTo( -1L ); + } + + @Test + public void testGetAndUpdate() throws IOException { + assertThat( fileAtomicLong1.getAndUpdate( old -> 12L ) ).isEqualTo( 0L ); + assertThat( fileAtomicLong2.getAndUpdate( old -> old * 2 ) ).isEqualTo( 12L ); + assertThat( fileAtomicLong1.getAndUpdate( old -> old * 3 ) ).isEqualTo( 24L ); + } + + @Test + public void testUpdateAndGet() throws IOException { + assertThat( fileAtomicLong1.updateAndGet( old -> 12L ) ).isEqualTo( 12L ); + assertThat( fileAtomicLong2.updateAndGet( old -> old * 2 ) ).isEqualTo( 24L ); + assertThat( fileAtomicLong1.updateAndGet( old -> old * 3 ) ).isEqualTo( 72L ); + } +} diff --git a/oap-stdlib/src/main/java/oap/concurrent/atomic/FileAtomicLong.java b/oap-stdlib/src/main/java/oap/concurrent/atomic/FileAtomicLong.java new file mode 100644 index 0000000000..ccbe529045 --- /dev/null +++ b/oap-stdlib/src/main/java/oap/concurrent/atomic/FileAtomicLong.java @@ -0,0 +1,241 @@ +package oap.concurrent.atomic; + +import oap.concurrent.Threads; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableLong; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.io.UncheckedIOException; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Path; +import java.util.function.LongFunction; +import java.util.function.LongUnaryOperator; + +/** + * FileAtomicLong + */ +public class FileAtomicLong implements Closeable { + protected final byte[] buffer = new byte[8]; + private final File sharedLockFile; + private final long retryTimeMs; + private final long initValue; + private FileChannel channel; + private FileLock lock; + + public FileAtomicLong( String sharedLockFile, long retryTimeMs, long initValue ) { + this( new File( sharedLockFile ), retryTimeMs, initValue ); + } + + public FileAtomicLong( File sharedLockFile, long retryTimeMs, long initValue ) { + this.sharedLockFile = sharedLockFile; + this.retryTimeMs = retryTimeMs; + this.initValue = initValue; + } + + public FileAtomicLong( Path sharedLockFile, long retryTimeMs, long initValue ) { + this( sharedLockFile.toFile(), retryTimeMs, initValue ); + } + + public final long get() throws UncheckedIOException { + MutableLong ret = new MutableLong(); + editFile( fileValue -> { + ret.setValue( fileValue ); + return null; + } ); + return ret.getValue(); + } + + public final void set( long newValue ) throws UncheckedIOException { + editFile( fileValue -> newValue ); + } + + public final long getAndSet( long newValue ) throws UncheckedIOException { + MutableLong ret = new MutableLong(); + editFile( fileValue -> { + ret.setValue( fileValue ); + + return newValue; + } ); + + return ret.getValue(); + } + + public final boolean compareAndSet( long expectedValue, long newValue ) throws UncheckedIOException { + MutableBoolean ret = new MutableBoolean(); + + editFile( fileValue -> { + if( fileValue != expectedValue ) { + ret.setFalse(); + return null; + } else { + ret.setTrue(); + return newValue; + } + } ); + + return ret.booleanValue(); + } + + public final long getAndIncrement() throws UncheckedIOException { + return getAndAdd( 1 ); + } + + public final long getAndDecrement() throws UncheckedIOException { + return getAndAdd( -1 ); + } + + public final long getAndAdd( long delta ) throws UncheckedIOException { + MutableLong ret = new MutableLong(); + editFile( fileValue -> { + ret.setValue( fileValue ); + + return fileValue + delta; + } ); + + return ret.getValue(); + } + + public final long incrementAndGet() throws UncheckedIOException { + return addAndGet( 1 ); + } + + public final long decrementAndGet() throws UncheckedIOException { + return addAndGet( -1 ); + } + + public final long addAndGet( long delta ) throws UncheckedIOException { + MutableLong ret = new MutableLong(); + editFile( fileValue -> { + long value = fileValue + delta; + ret.setValue( value ); + + return value; + } ); + + return ret.getValue(); + } + + public final long getAndUpdate( LongUnaryOperator updateFunction ) throws UncheckedIOException { + MutableLong ret = new MutableLong(); + editFile( fileValue -> { + ret.setValue( fileValue ); + + return updateFunction.applyAsLong( fileValue ); + } ); + + return ret.getValue(); + } + + public final long updateAndGet( LongUnaryOperator updateFunction ) throws UncheckedIOException { + MutableLong ret = new MutableLong(); + editFile( fileValue -> { + long value = updateFunction.applyAsLong( fileValue ); + ret.setValue( value ); + + return value; + } ); + + return ret.getValue(); + } + + private boolean lockFile() throws UncheckedIOException { + try { + // Try to get the lock + channel = new RandomAccessFile( sharedLockFile, "rw" ).getChannel(); + lock = channel.tryLock(); + if( lock == null ) { + // File is locked by other application + channel.close(); + return false; + } + return true; + } catch( IOException e ) { + throw new UncheckedIOException( e ); + } + } + + public void unlockFile() throws UncheckedIOException { + try { + if( lock != null ) { + lock.release(); + channel.close(); + + lock = null; + channel = null; + } + } catch( IOException e ) { + throw new UncheckedIOException( e ); + } + } + + private void editFile( LongFunction func ) throws UncheckedIOException { + try { + while( !lockFile() ) { + Threads.sleepSafely( retryTimeMs ); + } + + channel.position( 0 ); + long value = readLong( Channels.newInputStream( channel ), initValue ); + Long result = func.apply( value ); + if( result != null ) { + channel.position( 0 ); + writeLong( Channels.newOutputStream( channel ), result ); + } + } catch( IOException e ) { + throw new UncheckedIOException( e ); + } finally { + unlockFile(); + } + } + + @Override + public void close() throws UncheckedIOException { + unlockFile(); + } + + protected void writeLong( OutputStream os, long v ) throws UncheckedIOException { + try { + buffer[0] = ( byte ) ( v >>> 56 ); + buffer[1] = ( byte ) ( v >>> 48 ); + buffer[2] = ( byte ) ( v >>> 40 ); + buffer[3] = ( byte ) ( v >>> 32 ); + buffer[4] = ( byte ) ( v >>> 24 ); + buffer[5] = ( byte ) ( v >>> 16 ); + buffer[6] = ( byte ) ( v >>> 8 ); + buffer[7] = ( byte ) ( v >>> 0 ); + + os.write( buffer ); + os.flush(); + } catch( IOException e ) { + throw new UncheckedIOException( e ); + } + } + + protected long readLong( InputStream is, long initValue ) throws UncheckedIOException { + try { + IOUtils.readFully( is, buffer, 0, 8 ); + } catch( EOFException ignored ) { + return initValue; + } catch( IOException e ) { + throw new UncheckedIOException( e ); + } + + return ( ( long ) buffer[0] << 56 ) + + ( ( long ) ( buffer[1] & 255 ) << 48 ) + + ( ( long ) ( buffer[2] & 255 ) << 40 ) + + ( ( long ) ( buffer[3] & 255 ) << 32 ) + + ( ( long ) ( buffer[4] & 255 ) << 24 ) + + ( ( buffer[5] & 255 ) << 16 ) + + ( ( buffer[6] & 255 ) << 8 ) + + ( ( buffer[7] & 255 ) << 0 ); + } +} diff --git a/pom.xml b/pom.xml index 357c6517a9..7c42dfa135 100644 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,7 @@ - 21.3.0 + 21.3.1 21.0.0