Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileLongAtomic (for parallel testing) #232

Merged
merged 4 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.typesafe.config.impl.ConfigImpl;
import lombok.extern.slf4j.Slf4j;
import oap.concurrent.Threads;
import oap.concurrent.atomic.FileAtomicLong;
import oap.io.Sockets;
import oap.util.Strings;

Expand All @@ -36,14 +37,13 @@
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import static oap.testng.Asserts.locationOfTestResource;

@Slf4j
public abstract class AbstractEnvFixture<Self extends AbstractEnvFixture<Self>> extends AbstractScopeFixture<Self> {
public static final String NO_PREFIX = "";
public static final AtomicInteger LAST_PORT = new AtomicInteger( 20000 );
public static final FileAtomicLong LAST_PORT = new FileAtomicLong( "/tmp/port.lock", 1, 10000 );
protected final String prefix;
private final ConcurrentMap<String, Integer> ports = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Object> properties = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -108,7 +108,7 @@ public int portFor( String key ) throws UncheckedIOException {
return ports.computeIfAbsent( prefix + key, k -> {
int port;
do {
port = LAST_PORT.incrementAndGet();
port = ( int ) LAST_PORT.updateAndGet( previousPort -> previousPort > 30000 ? 10000 : previousPort + 1 );
} while( !Sockets.isTcpPortAvailable( port ) );

log.debug( "{} finding port for key={}... port={}", this.getClass().getSimpleName(), k, port );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package oap.concurrent.atomic;

import oap.io.Closeables;
import oap.testng.Fixtures;
import oap.testng.TestDirectoryFixture;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Path;

import static org.assertj.core.api.Assertions.assertThat;

public class FileAtomicLongTest extends Fixtures {

private FileAtomicLong fileAtomicLong1;
private FileAtomicLong fileAtomicLong2;

public FileAtomicLongTest() {
fixture( TestDirectoryFixture.FIXTURE );
}

@BeforeMethod
public void beforeMethod() {
Path file = TestDirectoryFixture.testPath( "al" );

fileAtomicLong1 = new FileAtomicLong( file, 1, 0 );
fileAtomicLong2 = new FileAtomicLong( file, 1, 0 );
}

@AfterMethod
public void afterMethod() {
Closeables.close( fileAtomicLong2 );
Closeables.close( fileAtomicLong1 );
}

@Test
public void testGetSet() throws IOException {
assertThat( fileAtomicLong1.get() ).isEqualTo( 0L );
assertThat( fileAtomicLong2.get() ).isEqualTo( 0L );

fileAtomicLong1.set( 16 );

assertThat( fileAtomicLong1.get() ).isEqualTo( 16L );
assertThat( fileAtomicLong2.get() ).isEqualTo( 16L );

fileAtomicLong2.set( 17 );

assertThat( fileAtomicLong1.get() ).isEqualTo( 17L );
assertThat( fileAtomicLong2.get() ).isEqualTo( 17L );
}

@Test
public void testGetAndSet() throws IOException {
assertThat( fileAtomicLong1.getAndSet( -12L ) ).isEqualTo( 0L );
assertThat( fileAtomicLong2.getAndSet( 1L ) ).isEqualTo( -12L );
assertThat( fileAtomicLong1.getAndSet( 6L ) ).isEqualTo( 1L );
}

@Test
public void testCompareAndSet() throws IOException {
assertThat( fileAtomicLong1.compareAndSet( -1, -12L ) ).isFalse();
assertThat( fileAtomicLong2.get() ).isEqualTo( 0L );
assertThat( fileAtomicLong1.compareAndSet( 0, -12L ) ).isTrue();
assertThat( fileAtomicLong2.get() ).isEqualTo( -12L );
assertThat( fileAtomicLong1.compareAndSet( -1, 6L ) ).isFalse();
}

@Test
public void testGetAndAnd() throws IOException {
assertThat( fileAtomicLong1.getAndAdd( -12 ) ).isEqualTo( 0L );
assertThat( fileAtomicLong2.getAndAdd( 1 ) ).isEqualTo( -12L );
assertThat( fileAtomicLong1.getAndAdd( 5L ) ).isEqualTo( -11L );
assertThat( fileAtomicLong1.getAndAdd( 5L ) ).isEqualTo( -6L );
}

@Test
public void testAddAndGet() throws IOException {
assertThat( fileAtomicLong1.addAndGet( -12 ) ).isEqualTo( -12L );
assertThat( fileAtomicLong2.addAndGet( 1 ) ).isEqualTo( -11L );
assertThat( fileAtomicLong1.addAndGet( 5L ) ).isEqualTo( -6L );
assertThat( fileAtomicLong1.addAndGet( 5L ) ).isEqualTo( -1L );
}

@Test
public void testGetAndUpdate() throws IOException {
assertThat( fileAtomicLong1.getAndUpdate( old -> 12L ) ).isEqualTo( 0L );
assertThat( fileAtomicLong2.getAndUpdate( old -> old * 2 ) ).isEqualTo( 12L );
assertThat( fileAtomicLong1.getAndUpdate( old -> old * 3 ) ).isEqualTo( 24L );
}

@Test
public void testUpdateAndGet() throws IOException {
assertThat( fileAtomicLong1.updateAndGet( old -> 12L ) ).isEqualTo( 12L );
assertThat( fileAtomicLong2.updateAndGet( old -> old * 2 ) ).isEqualTo( 24L );
assertThat( fileAtomicLong1.updateAndGet( old -> old * 3 ) ).isEqualTo( 72L );
}
}
241 changes: 241 additions & 0 deletions oap-stdlib/src/main/java/oap/concurrent/atomic/FileAtomicLong.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package oap.concurrent.atomic;

import oap.concurrent.Threads;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableLong;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;

/**
* FileAtomicLong
*/
public class FileAtomicLong implements Closeable {
protected final byte[] buffer = new byte[8];
private final File sharedLockFile;
private final long retryTimeMs;
private final long initValue;
private FileChannel channel;
private FileLock lock;

public FileAtomicLong( String sharedLockFile, long retryTimeMs, long initValue ) {
this( new File( sharedLockFile ), retryTimeMs, initValue );
}

public FileAtomicLong( File sharedLockFile, long retryTimeMs, long initValue ) {
this.sharedLockFile = sharedLockFile;
this.retryTimeMs = retryTimeMs;
this.initValue = initValue;
}

public FileAtomicLong( Path sharedLockFile, long retryTimeMs, long initValue ) {
this( sharedLockFile.toFile(), retryTimeMs, initValue );
}

public final long get() throws UncheckedIOException {
MutableLong ret = new MutableLong();
editFile( fileValue -> {
ret.setValue( fileValue );
return null;
} );
return ret.getValue();
}

public final void set( long newValue ) throws UncheckedIOException {
editFile( fileValue -> newValue );
}

public final long getAndSet( long newValue ) throws UncheckedIOException {
MutableLong ret = new MutableLong();
editFile( fileValue -> {
ret.setValue( fileValue );

return newValue;
} );

return ret.getValue();
}

public final boolean compareAndSet( long expectedValue, long newValue ) throws UncheckedIOException {
MutableBoolean ret = new MutableBoolean();

editFile( fileValue -> {
if( fileValue != expectedValue ) {
ret.setFalse();
return null;
} else {
ret.setTrue();
return newValue;
}
} );

return ret.booleanValue();
}

public final long getAndIncrement() throws UncheckedIOException {
return getAndAdd( 1 );
}

public final long getAndDecrement() throws UncheckedIOException {
return getAndAdd( -1 );
}

public final long getAndAdd( long delta ) throws UncheckedIOException {
MutableLong ret = new MutableLong();
editFile( fileValue -> {
ret.setValue( fileValue );

return fileValue + delta;
} );

return ret.getValue();
}

public final long incrementAndGet() throws UncheckedIOException {
return addAndGet( 1 );
}

public final long decrementAndGet() throws UncheckedIOException {
return addAndGet( -1 );
}

public final long addAndGet( long delta ) throws UncheckedIOException {
MutableLong ret = new MutableLong();
editFile( fileValue -> {
long value = fileValue + delta;
ret.setValue( value );

return value;
} );

return ret.getValue();
}

public final long getAndUpdate( LongUnaryOperator updateFunction ) throws UncheckedIOException {
MutableLong ret = new MutableLong();
editFile( fileValue -> {
ret.setValue( fileValue );

return updateFunction.applyAsLong( fileValue );
} );

return ret.getValue();
}

public final long updateAndGet( LongUnaryOperator updateFunction ) throws UncheckedIOException {
MutableLong ret = new MutableLong();
editFile( fileValue -> {
long value = updateFunction.applyAsLong( fileValue );
ret.setValue( value );

return value;
} );

return ret.getValue();
}

private boolean lockFile() throws UncheckedIOException {
try {
// Try to get the lock
channel = new RandomAccessFile( sharedLockFile, "rw" ).getChannel();
lock = channel.tryLock();
if( lock == null ) {
// File is locked by other application
channel.close();
return false;
}
return true;
} catch( IOException e ) {
throw new UncheckedIOException( e );
}
}

public void unlockFile() throws UncheckedIOException {
try {
if( lock != null ) {
lock.release();
channel.close();

lock = null;
channel = null;
}
} catch( IOException e ) {
throw new UncheckedIOException( e );
}
}

private void editFile( LongFunction<Long> func ) throws UncheckedIOException {
try {
while( !lockFile() ) {
Threads.sleepSafely( retryTimeMs );
}

channel.position( 0 );
long value = readLong( Channels.newInputStream( channel ), initValue );
Long result = func.apply( value );
if( result != null ) {
channel.position( 0 );
writeLong( Channels.newOutputStream( channel ), result );
}
} catch( IOException e ) {
throw new UncheckedIOException( e );
} finally {
unlockFile();
}
}

@Override
public void close() throws UncheckedIOException {
unlockFile();
}

protected void writeLong( OutputStream os, long v ) throws UncheckedIOException {
try {
buffer[0] = ( byte ) ( v >>> 56 );
buffer[1] = ( byte ) ( v >>> 48 );
buffer[2] = ( byte ) ( v >>> 40 );
buffer[3] = ( byte ) ( v >>> 32 );
buffer[4] = ( byte ) ( v >>> 24 );
buffer[5] = ( byte ) ( v >>> 16 );
buffer[6] = ( byte ) ( v >>> 8 );
buffer[7] = ( byte ) ( v >>> 0 );

os.write( buffer );
os.flush();
} catch( IOException e ) {
throw new UncheckedIOException( e );
}
}

protected long readLong( InputStream is, long initValue ) throws UncheckedIOException {
try {
IOUtils.readFully( is, buffer, 0, 8 );
} catch( EOFException ignored ) {
return initValue;
} catch( IOException e ) {
throw new UncheckedIOException( e );
}

return ( ( long ) buffer[0] << 56 )
+ ( ( long ) ( buffer[1] & 255 ) << 48 )
+ ( ( long ) ( buffer[2] & 255 ) << 40 )
+ ( ( long ) ( buffer[3] & 255 ) << 32 )
+ ( ( long ) ( buffer[4] & 255 ) << 24 )
+ ( ( buffer[5] & 255 ) << 16 )
+ ( ( buffer[6] & 255 ) << 8 )
+ ( ( buffer[7] & 255 ) << 0 );
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
</repositories>

<properties>
<oap.project.version>21.3.0</oap.project.version>
<oap.project.version>21.3.1</oap.project.version>

<oap.deps.config.version>21.0.0</oap.deps.config.version>

Expand Down
Loading