From e925aec13f8b9279dbc1205c148e00bd67ad9999 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 24 Apr 2024 12:40:01 +0300 Subject: [PATCH 1/3] OAP-125 IoStreams#out: add parameter throwError if file exists --- .../test/java/oap/logstream/LoggerTest.java | 2 +- .../src/test/java/oap/io/IoStreamsTest.java | 13 + .../test/java/oap/util/ThrowablesTest.java | 3 +- .../main/java/oap/io/FileExistsException.java | 13 + .../java/oap/io/FileNotFoundException.java | 11 + oap-stdlib/src/main/java/oap/io/Files.java | 23 +- .../src/main/java/oap/io/IOException.java | 18 + .../src/main/java/oap/io/IoStreams.java | 334 +++++++++++------- .../java/oap/io/MalformedURLException.java | 7 + oap-stdlib/src/main/java/oap/io/Paths.java | 12 +- .../src/main/java/oap/io/Resources.java | 8 +- .../java/oap/io/SafeFileOutputStream.java | 21 +- .../src/main/java/oap/util/Throwables.java | 16 +- .../sso/JwtTokenGeneratorExtractorTest.java | 5 +- pom.xml | 2 +- 15 files changed, 323 insertions(+), 165 deletions(-) create mode 100644 oap-stdlib/src/main/java/oap/io/FileExistsException.java create mode 100644 oap-stdlib/src/main/java/oap/io/FileNotFoundException.java create mode 100644 oap-stdlib/src/main/java/oap/io/IOException.java create mode 100644 oap-stdlib/src/main/java/oap/io/MalformedURLException.java diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java index 777f6366bd..fc3fe8b4b3 100644 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java @@ -29,8 +29,8 @@ import oap.logstream.disk.DiskLoggerBackend; import oap.logstream.net.client.SocketLoggerBackend; import oap.logstream.net.server.SocketLoggerServer; -import oap.message.server.MessageHttpHandler; import oap.message.client.MessageSender; +import oap.message.server.MessageHttpHandler; import oap.template.BinaryUtils; import oap.template.Types; import oap.testng.Fixtures; diff --git a/oap-stdlib-test/src/test/java/oap/io/IoStreamsTest.java b/oap-stdlib-test/src/test/java/oap/io/IoStreamsTest.java index e3f582bb17..0cd48bd3b3 100644 --- a/oap-stdlib-test/src/test/java/oap/io/IoStreamsTest.java +++ b/oap-stdlib-test/src/test/java/oap/io/IoStreamsTest.java @@ -53,6 +53,7 @@ import static oap.testng.Asserts.assertFile; import static oap.testng.Asserts.pathOfTestResource; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class IoStreamsTest extends Fixtures { private final TestDirectoryFixture testDirectoryFixture; @@ -151,4 +152,16 @@ public void compressionLevel() { System.out.println( encoding + ":\t" + content.length() + " -> " + path.toFile().length() ); } } + + @Test + public void testThrowIfFileExists() throws IOException { + Path testFile = testDirectoryFixture.testPath( "testFile" ); + testFile.toFile().createNewFile(); + + assertThatThrownBy( () -> { + try( var _ = IoStreams.out( testFile, new IoStreams.OutOptions().withThrowIfFileExists( true ) ) ) { + System.out.println( "!" ); + } + } ).isInstanceOf( oap.io.FileExistsException.class ); + } } diff --git a/oap-stdlib-test/src/test/java/oap/util/ThrowablesTest.java b/oap-stdlib-test/src/test/java/oap/util/ThrowablesTest.java index 49ea74d9a8..8fe249031e 100644 --- a/oap-stdlib-test/src/test/java/oap/util/ThrowablesTest.java +++ b/oap-stdlib-test/src/test/java/oap/util/ThrowablesTest.java @@ -27,10 +27,9 @@ import org.testng.annotations.Test; import java.io.IOException; -import java.io.UncheckedIOException; public class ThrowablesTest { - @Test( expectedExceptions = UncheckedIOException.class, expectedExceptionsMessageRegExp = "java.io.IOException: test" ) + @Test( expectedExceptions = oap.io.IOException.class, expectedExceptionsMessageRegExp = "java.io.IOException: test" ) public void propagateIOException() { throw Throwables.propagate( new IOException( "test" ) ); } diff --git a/oap-stdlib/src/main/java/oap/io/FileExistsException.java b/oap-stdlib/src/main/java/oap/io/FileExistsException.java new file mode 100644 index 0000000000..672ce45594 --- /dev/null +++ b/oap-stdlib/src/main/java/oap/io/FileExistsException.java @@ -0,0 +1,13 @@ +package oap.io; + +import java.io.File; + +public class FileExistsException extends IOException { + public FileExistsException( File file ) { + super( "File " + file + " exists" ); + } + + public FileExistsException( IOException cause ) { + super( cause ); + } +} diff --git a/oap-stdlib/src/main/java/oap/io/FileNotFoundException.java b/oap-stdlib/src/main/java/oap/io/FileNotFoundException.java new file mode 100644 index 0000000000..7e291eb57c --- /dev/null +++ b/oap-stdlib/src/main/java/oap/io/FileNotFoundException.java @@ -0,0 +1,11 @@ +package oap.io; + +public class FileNotFoundException extends IOException { + public FileNotFoundException( String message ) { + super( message ); + } + + public FileNotFoundException( java.io.FileNotFoundException cause ) { + super( cause ); + } +} diff --git a/oap-stdlib/src/main/java/oap/io/Files.java b/oap-stdlib/src/main/java/oap/io/Files.java index 89b72a732a..19b5092a7c 100644 --- a/oap-stdlib/src/main/java/oap/io/Files.java +++ b/oap-stdlib/src/main/java/oap/io/Files.java @@ -57,7 +57,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.UncheckedIOException; import java.net.MalformedURLException; import java.net.URL; import java.nio.file.CopyOption; @@ -509,7 +508,7 @@ public static void rename( Path sourcePath, Path destPath ) { ensureFile( destPath ); java.nio.file.Files.move( sourcePath, destPath, ATOMIC_MOVE, REPLACE_EXISTING ); } catch( IOException e ) { - throw new UncheckedIOException( "cannot rename " + sourcePath + " to " + destPath, e ); + throw new oap.io.IOException( "cannot rename " + sourcePath + " to " + destPath, e ); } } @@ -526,7 +525,7 @@ public static Path ensureDirectory( Path path ) { } } - public static void move( Path source, Path target, CopyOption... options ) { + public static void move( Path source, Path target, CopyOption... options ) throws oap.io.IOException { try { ensureFile( target ); java.nio.file.Files.move( source, target, options ); @@ -535,7 +534,7 @@ public static void move( Path source, Path target, CopyOption... options ) { } } - public static void setPosixPermissions( Path path, Set permissions ) { + public static void setPosixPermissions( Path path, Set permissions ) throws oap.io.IOException { try { java.nio.file.Files.setPosixFilePermissions( path, permissions ); } catch( IOException e ) { @@ -543,11 +542,11 @@ public static void setPosixPermissions( Path path, Set perm } } - public static void setPosixPermissions( Path path, PosixFilePermission... permissions ) { + public static void setPosixPermissions( Path path, PosixFilePermission... permissions ) throws oap.io.IOException { setPosixPermissions( path, Sets.of( permissions ) ); } - public static Set getPosixPermissions( Path path ) { + public static Set getPosixPermissions( Path path ) throws oap.io.IOException { try { return java.nio.file.Files.getPosixFilePermissions( path, LinkOption.NOFOLLOW_LINKS ); } catch( IOException e ) { @@ -555,7 +554,7 @@ public static Set getPosixPermissions( Path path ) { } } - public static boolean isDirectoryEmpty( Path directory ) { + public static boolean isDirectoryEmpty( Path directory ) throws oap.io.IOException { try( DirectoryStream dirStream = java.nio.file.Files.newDirectoryStream( directory ) ) { return !dirStream.iterator().hasNext(); } catch( IOException e ) { @@ -563,11 +562,11 @@ public static boolean isDirectoryEmpty( Path directory ) { } } - public static void setLastModifiedTime( Path path, DateTime dateTime ) { + public static void setLastModifiedTime( Path path, DateTime dateTime ) throws oap.io.IOException { setLastModifiedTime( path, dateTime.getMillis() ); } - public static void setLastModifiedTime( Path path, long ms ) { + public static void setLastModifiedTime( Path path, long ms ) throws oap.io.IOException { try { java.nio.file.Files.setLastModifiedTime( path, FileTime.fromMillis( ms ) ); } catch( IOException e ) { @@ -654,7 +653,7 @@ public static boolean fileNotEmpty( Path path ) { return isFileNotEmpty( path ); } - public static boolean isFileNotEmpty( final Path path ) { + public static boolean isFileNotEmpty( final Path path ) throws oap.io.IOException { try( InputStream is = IoStreams.in( path ); InputStreamReader isr = new InputStreamReader( is ); BufferedReader reader = new BufferedReader( isr ) ) { @@ -670,7 +669,7 @@ public static boolean exists( Path path ) { return java.nio.file.Files.exists( path ); } - public static long getLastModifiedTime( Path path ) { + public static long getLastModifiedTime( Path path ) throws oap.io.IOException { try { return java.nio.file.Files.getLastModifiedTime( path ).toMillis(); } catch( IOException e ) { @@ -678,7 +677,7 @@ public static long getLastModifiedTime( Path path ) { } } - public static boolean createFile( Path file ) { + public static boolean createFile( Path file ) throws oap.io.IOException { try { java.nio.file.Files.createFile( file ); return true; diff --git a/oap-stdlib/src/main/java/oap/io/IOException.java b/oap-stdlib/src/main/java/oap/io/IOException.java new file mode 100644 index 0000000000..1a1dd5f4cf --- /dev/null +++ b/oap-stdlib/src/main/java/oap/io/IOException.java @@ -0,0 +1,18 @@ +package oap.io; + +public class IOException extends RuntimeException { + public IOException() { + } + + public IOException( String message ) { + super( message ); + } + + public IOException( String message, Throwable cause ) { + super( message, cause ); + } + + public IOException( Throwable cause ) { + super( cause ); + } +} diff --git a/oap-stdlib/src/main/java/oap/io/IoStreams.java b/oap-stdlib/src/main/java/oap/io/IoStreams.java index 4345c0e33c..adb1bf29db 100644 --- a/oap-stdlib/src/main/java/oap/io/IoStreams.java +++ b/oap-stdlib/src/main/java/oap/io/IoStreams.java @@ -23,16 +23,18 @@ */ package oap.io; +import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; import io.airlift.compress.bzip2.BZip2HadoopStreams; import io.airlift.compress.gzip.JdkGzipHadoopStreams; import io.airlift.compress.lz4.Lz4HadoopStreams; import io.airlift.compress.zstd.ZstdHadoopStreams; -import lombok.SneakyThrows; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import oap.io.ProgressInputStream.Progress; import oap.util.Stream; import oap.util.Strings; +import oap.util.Throwables; import oap.util.function.Try; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -47,7 +49,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.UncheckedIOException; import java.net.URL; import java.net.URLConnection; import java.nio.file.Path; @@ -57,7 +58,6 @@ import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; -import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; import static oap.io.ProgressInputStream.progress; import static oap.util.function.Functions.empty.consume; @@ -70,42 +70,44 @@ public class IoStreams { public static final ZstdHadoopStreams ZSTD_HADOOP_STREAMS = new ZstdHadoopStreams(); public static final JdkGzipHadoopStreams GZIP_HADOOP_STREAMS = new JdkGzipHadoopStreams(); - public static Stream lines( URL url ) { + public static Stream lines( URL url ) throws oap.io.IOException { return lines( url, Encoding.from( url ), consume() ); } - public static Stream lines( URL url, Consumer progress ) { + public static Stream lines( URL url, Consumer progress ) throws oap.io.IOException { return lines( url, Encoding.from( url ), progress ); } - @SneakyThrows - public static Stream lines( URL url, Encoding encoding, Consumer progress ) { - log.trace( "loading {}...", url ); - URLConnection connection = url.openConnection(); - InputStream stream = connection.getInputStream(); - return lines( stream, encoding, progress( connection.getContentLengthLong(), progress ) ) - .onClose( Try.run( stream::close ) ); + public static Stream lines( URL url, Encoding encoding, Consumer progress ) throws oap.io.IOException { + try { + log.trace( "loading {}...", url ); + URLConnection connection = url.openConnection(); + InputStream stream = connection.getInputStream(); + return lines( stream, encoding, progress( connection.getContentLengthLong(), progress ) ).onClose( Try.run( stream::close ) ); + } catch( IOException e ) { + throw Throwables.propagate( e ); + } } - public static Stream lines( Path path ) { + public static Stream lines( Path path ) throws oap.io.IOException { return lines( path, Encoding.from( path ), consume() ); } - public static Stream lines( Path path, Encoding encoding ) { - return lines( path, encoding, p -> {} ); + public static Stream lines( Path path, Encoding encoding ) throws oap.io.IOException { + return lines( path, encoding, _ -> {} ); } - public static Stream lines( Path path, Encoding encoding, Consumer progress ) { + public static Stream lines( Path path, Encoding encoding, Consumer progress ) throws oap.io.IOException { InputStream stream = in( path, Encoding.PLAIN ); return lines( stream, encoding, progress( path.toFile().length(), progress ) ) .onClose( Try.run( stream::close ) ); } - private static Stream lines( InputStream stream, Encoding encoding, Progress progress ) { + private static Stream lines( InputStream stream, Encoding encoding, Progress progress ) throws oap.io.IOException { return lines( in( new ProgressInputStream( stream, progress ), encoding ) ); } - public static Stream lines( InputStream stream ) { + public static Stream lines( InputStream stream ) throws oap.io.IOException { return lines( stream, false ); } @@ -119,7 +121,7 @@ public static Stream lines( InputStream stream, boolean autoClose ) { return Stream.of( ustream ); } - public static void write( Path path, Encoding encoding, java.util.stream.Stream lines ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, java.util.stream.Stream lines ) throws oap.io.IOException { Files.ensureFile( path ); try( OutputStream out = out( path, encoding, DEFAULT_BUFFER, false, false ) ) { @@ -128,204 +130,233 @@ public static void write( Path path, Encoding encoding, java.util.stream.Stream< out.write( line.getBytes() ); out.write( '\n' ); } catch( IOException e ) { - throw new UncheckedIOException( e ); + throw Throwables.propagate( e ); } } ); } catch( IOException e ) { - throw new UncheckedIOException( e ); + throw Throwables.propagate( e ); } } - public static void write( Path path, Encoding encoding, String value ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, String value ) throws oap.io.IOException { write( path, encoding, value, false ); } - public static void write( Path path, Encoding encoding, InputStream in ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, InputStream in ) throws oap.io.IOException { write( path, encoding, in, Progress.EMPTY ); } - public static void write( Path path, Encoding encoding, InputStream in, Progress progress ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, InputStream in, Progress progress ) throws oap.io.IOException { write( path, encoding, in, false, false, progress ); } - public static void write( Path path, Encoding encoding, InputStream in, Progress progress, boolean append ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, InputStream in, Progress progress, boolean append ) throws oap.io.IOException { write( path, encoding, in, append, false, progress ); } - public static void write( Path path, Encoding encoding, String value, boolean append ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, String value, boolean append ) throws oap.io.IOException { write( path, encoding, new ByteArrayInputStream( Strings.toByteArray( value ) ), append, false, Progress.EMPTY ); - } - public static void write( Path path, Encoding encoding, InputStream in, boolean append, boolean safe, Progress progress ) throws UncheckedIOException { + public static void write( Path path, Encoding encoding, InputStream in, boolean append, boolean safe, Progress progress ) throws oap.io.IOException { Files.ensureFile( path ); try( OutputStream out = out( path, encoding, DEFAULT_BUFFER, append, safe ) ) { ByteStreams.copy( new ProgressInputStream( in, progress ), out ); } catch( IOException e ) { - throw new UncheckedIOException( e ); + throw Throwables.propagate( e ); } } - public static void write( Path path, Encoding encoding, InputStream in, boolean append, boolean safe ) { + public static void write( Path path, Encoding encoding, InputStream in, boolean append, boolean safe ) throws oap.io.IOException { Files.ensureFile( path ); try( OutputStream out = out( path, encoding, DEFAULT_BUFFER, append, safe ) ) { ByteStreams.copy( in, out ); } catch( IOException e ) { - throw new UncheckedIOException( e ); + throw Throwables.propagate( e ); } } - public static FixedLengthArrayOutputStream out( byte[] bytes ) { + public static FixedLengthArrayOutputStream out( byte[] bytes ) throws oap.io.IOException { return new FixedLengthArrayOutputStream( bytes ); } - public static OutputStream out( Path path ) { + public static OutputStream out( Path path ) throws oap.io.IOException { return out( path, Encoding.from( path ), DEFAULT_BUFFER ); } - public static OutputStream out( Path path, Encoding encoding ) { + public static OutputStream out( Path path, Encoding encoding ) throws oap.io.IOException { return out( path, encoding, DEFAULT_BUFFER ); } - public static OutputStream out( Path path, Encoding encoding, int bufferSize ) { + public static OutputStream out( Path path, Encoding encoding, int bufferSize ) throws oap.io.IOException { return out( path, encoding, bufferSize, false ); } - public static OutputStream out( Path path, Encoding encoding, boolean append ) { + public static OutputStream out( Path path, Encoding encoding, boolean append ) throws oap.io.IOException { return out( path, encoding, DEFAULT_BUFFER, append ); } - public static OutputStream out( Path path, Encoding encoding, int bufferSize, boolean append ) { + public static OutputStream out( Path path, Encoding encoding, int bufferSize, boolean append ) throws oap.io.IOException { return out( path, encoding, bufferSize, append, false ); } - @SneakyThrows - public static OutputStream out( Path path, Encoding encoding, int bufferSize, boolean append, boolean safe ) { - checkArgument( !append || encoding.appendable, encoding + " is not appendable" ); - Files.ensureFile( path ); - if( append ) Files.ensureFileEncodingValid( path ); - OutputStream outputStream = safe - ? new SafeFileOutputStream( path, append, encoding ) - : new FileOutputStream( path.toFile(), append ); - OutputStream fos = - bufferSize > 0 && encoding != Encoding.GZIP ? new BufferedOutputStream( outputStream, bufferSize ) - : outputStream; - return switch( encoding ) { - case GZIP -> GZIP_HADOOP_STREAMS.createOutputStream( fos ); - case BZIP2 -> { - OutputStream os = BZIP2_HADOOP_STREAMS.createOutputStream( fos ); - yield bufferSize > 0 ? new BufferedOutputStream( os, bufferSize ) : os; - } - case ZIP -> { - var zip = new ZipOutputStream( fos ); - zip.putNextEntry( new ZipEntry( path.getFileName().toString() ) ); - yield zip; + public static OutputStream out( Path path, Encoding encoding, int bufferSize, boolean append, boolean safe ) throws oap.io.IOException { + return out( path, new OutOptions() + .withBufferSize( bufferSize ).withEncoding( encoding ).withAppend( append ) + .withSafe( safe ) ); + } + + public static OutputStream out( Path path, OutOptions options ) throws oap.io.IOException { + try { + if( options.encoding == null ) { + options.withEncodingFrom( path ); } - case LZ4 -> LZ4_HADOOP_STREAMS.createOutputStream( fos ); - case ZSTD -> { - OutputStream os = ZSTD_HADOOP_STREAMS.createOutputStream( fos ); - yield bufferSize > 0 ? new BufferedOutputStream( os, bufferSize ) : os; + + Preconditions.checkArgument( !options.throwIfFileExists || !options.append ); + Preconditions.checkArgument( !options.append || options.encoding.appendable, options.encoding + " is not appendable" ); + + Files.ensureFile( path ); + if( options.append ) Files.ensureFileEncodingValid( path ); + + if( options.throwIfFileExists ) { + Path filePath = options.safe ? SafeFileOutputStream.getUnsafePath( path ) : path; + + if( !filePath.toFile().createNewFile() ) { + throw new FileExistsException( path.toFile() ); + } } - case PLAIN, ORC, PARQUET, AVRO -> fos; - }; + + OutputStream outputStream = options.safe + ? new SafeFileOutputStream( path, options.append, options.encoding ) + : new FileOutputStream( path.toFile(), options.append ); + + OutputStream fos = + options.bufferSize > 0 && options.encoding != Encoding.GZIP + ? new BufferedOutputStream( outputStream, options.bufferSize ) + : outputStream; + return switch( options.encoding ) { + case GZIP -> GZIP_HADOOP_STREAMS.createOutputStream( fos ); + case BZIP2 -> { + OutputStream os = BZIP2_HADOOP_STREAMS.createOutputStream( fos ); + yield options.bufferSize > 0 ? new BufferedOutputStream( os, options.bufferSize ) : os; + } + case ZIP -> { + ZipOutputStream zip = new ZipOutputStream( fos ); + zip.putNextEntry( new ZipEntry( path.getFileName().toString() ) ); + yield zip; + } + case LZ4 -> LZ4_HADOOP_STREAMS.createOutputStream( fos ); + case ZSTD -> { + OutputStream os = ZSTD_HADOOP_STREAMS.createOutputStream( fos ); + yield options.bufferSize > 0 ? new BufferedOutputStream( os, options.bufferSize ) : os; + } + case PLAIN, ORC, PARQUET, AVRO -> fos; + }; + } catch( IOException e ) { + throw Throwables.propagate( e ); + } } - public static InputStream in( Path path, Encoding encoding ) { + public static InputStream in( Path path, Encoding encoding ) throws oap.io.IOException { return in( path, encoding, DEFAULT_BUFFER ); } - public static InputStream in( URL url, Encoding encoding ) { + public static InputStream in( URL url, Encoding encoding ) throws oap.io.IOException { return in( url, encoding, DEFAULT_BUFFER ); } - public static InputStream in( Path path ) { + public static InputStream in( Path path ) throws oap.io.IOException { return in( path, DEFAULT_BUFFER ); } - public static InputStream in( URL url ) { + public static InputStream in( URL url ) throws oap.io.IOException { return in( url, DEFAULT_BUFFER ); } - public static InputStream in( Path path, int bufferSIze ) { + public static InputStream in( Path path, int bufferSIze ) throws oap.io.IOException { return in( path, Encoding.from( path ), bufferSIze ); } - public static InputStream in( URL url, int bufferSIze ) { + public static InputStream in( URL url, int bufferSIze ) throws oap.io.IOException { return in( url, Encoding.from( url ), bufferSIze ); } - public static InputStream in( Path path, Encoding encoding, int bufferSize ) { + public static InputStream in( Path path, Encoding encoding, int bufferSize ) throws oap.io.IOException { try { FileInputStream fileInputStream = new FileInputStream( path.toFile() ); - return decoded( - bufferSize > 0 ? new BufferedInputStream( fileInputStream, bufferSize ) : fileInputStream, encoding ); + return decoded( bufferSize > 0 ? new BufferedInputStream( fileInputStream, bufferSize ) : fileInputStream, encoding ); } catch( IOException e ) { - throw new UncheckedIOException( "couldn't open file " + path, e ); + throw new oap.io.IOException( "couldn't open file " + path, e ); } } - public static InputStream in( URL url, Encoding encoding, int bufferSize ) { + public static InputStream in( URL url, Encoding encoding, int bufferSize ) throws oap.io.IOException { try { - var inputStream = url.openStream(); - return decoded( - bufferSize > 0 ? new BufferedInputStream( inputStream, bufferSize ) : inputStream, encoding ); + InputStream inputStream = url.openStream(); + return decoded( bufferSize > 0 ? new BufferedInputStream( inputStream, bufferSize ) : inputStream, encoding ); } catch( IOException e ) { - throw new UncheckedIOException( "couldn't open file " + url, e ); + throw new oap.io.IOException( "couldn't open file " + url, e ); } } - @SneakyThrows - public static InputStream in( InputStream stream, Encoding encoding ) { + public static InputStream in( InputStream stream, Encoding encoding ) throws oap.io.IOException { return decoded( stream, encoding ); } - @SneakyThrows - public static String asString( InputStream stream, Encoding encoding ) { - return IOUtils.toString( decoded( stream, encoding ), UTF_8 ); + public static String asString( InputStream stream, Encoding encoding ) throws oap.io.IOException { + try { + return IOUtils.toString( decoded( stream, encoding ), UTF_8 ); + } catch( IOException e ) { + throw Throwables.propagate( e ); + } } - @SneakyThrows - private static InputStream decoded( InputStream stream, Encoding encoding ) { - switch( encoding ) { - case GZIP: - try { - return GZIP_HADOOP_STREAMS.createInputStream( stream ); - } catch( Exception e ) { - stream.close(); - } - case BZIP2: - try { - return BZIP2_HADOOP_STREAMS.createInputStream( stream ); - } catch( Exception e ) { - stream.close(); - } - case ZIP: - try { - ZipInputStream zip = new ZipInputStream( stream ); - if( zip.getNextEntry() == null ) - throw new IllegalArgumentException( "zip stream contains no entries" ); - return zip; - } catch( Exception e ) { - stream.close(); - } - case PLAIN, ORC, PARQUET, AVRO: - return stream; - case LZ4: - try { - return LZ4_HADOOP_STREAMS.createInputStream( stream ); - } catch( Exception e ) { - stream.close(); - throw e; - } - case ZSTD: - try { - return ZSTD_HADOOP_STREAMS.createInputStream( stream ); - } catch( Exception e ) { - stream.close(); - throw e; - } - default: - throw new IllegalArgumentException( "Unknown encoding " + encoding ); + private static InputStream decoded( InputStream stream, Encoding encoding ) throws oap.io.IOException { + try { + switch( encoding ) { + case GZIP: + try { + return GZIP_HADOOP_STREAMS.createInputStream( stream ); + } catch( Exception e ) { + stream.close(); + } + case BZIP2: + try { + return BZIP2_HADOOP_STREAMS.createInputStream( stream ); + } catch( Exception e ) { + stream.close(); + } + case ZIP: + try { + ZipInputStream zip = new ZipInputStream( stream ); + if( zip.getNextEntry() == null ) { + throw new IllegalArgumentException( "zip stream contains no entries" ); + } + return zip; + } catch( Exception e ) { + stream.close(); + } + case PLAIN, ORC, PARQUET, AVRO: + return stream; + case LZ4: + try { + return LZ4_HADOOP_STREAMS.createInputStream( stream ); + } catch( Exception e ) { + stream.close(); + throw e; + } + case ZSTD: + try { + return ZSTD_HADOOP_STREAMS.createInputStream( stream ); + } catch( Exception e ) { + stream.close(); + throw e; + } + default: + throw new IllegalArgumentException( "Unknown encoding " + encoding ); + } + } catch( IOException e ) { + throw Throwables.propagate( e ); } } @@ -361,7 +392,11 @@ public static Encoding from( URL url ) { } public static Encoding from( String name ) { - for( var e : values() ) if( e.compressed && StringUtils.endsWithIgnoreCase( name, e.extension ) ) return e; + for( Encoding e : values() ) { + if( e.compressed && StringUtils.endsWithIgnoreCase( name, e.extension ) ) { + return e; + } + } return PLAIN; } @@ -370,4 +405,49 @@ public Path resolve( Path path ) { return Paths.get( s.substring( 0, s.length() - from( path ).extension.length() ) + extension ); } } + + @ToString + public static class OutOptions { + protected int bufferSize = DEFAULT_BUFFER; + protected Encoding encoding; + protected boolean append = false; + protected boolean safe = false; + protected boolean throwIfFileExists = false; + + public OutOptions withBufferSize( int bufferSize ) { + this.bufferSize = bufferSize; + + return this; + } + + public OutOptions withEncoding( Encoding encoding ) { + this.encoding = encoding; + + return this; + } + + public OutOptions withEncodingFrom( Path path ) { + this.encoding = Encoding.from( path ); + + return this; + } + + public OutOptions withAppend( boolean append ) { + this.append = append; + + return this; + } + + public OutOptions withSafe( boolean safe ) { + this.safe = safe; + + return this; + } + + public OutOptions withThrowIfFileExists( boolean throwIfFileExists ) { + this.throwIfFileExists = throwIfFileExists; + + return this; + } + } } diff --git a/oap-stdlib/src/main/java/oap/io/MalformedURLException.java b/oap-stdlib/src/main/java/oap/io/MalformedURLException.java new file mode 100644 index 0000000000..c79269e89b --- /dev/null +++ b/oap-stdlib/src/main/java/oap/io/MalformedURLException.java @@ -0,0 +1,7 @@ +package oap.io; + +public class MalformedURLException extends IOException { + public MalformedURLException( java.net.MalformedURLException cause ) { + super( cause ); + } +} diff --git a/oap-stdlib/src/main/java/oap/io/Paths.java b/oap-stdlib/src/main/java/oap/io/Paths.java index ae155ae87f..be4a71d1ff 100644 --- a/oap-stdlib/src/main/java/oap/io/Paths.java +++ b/oap-stdlib/src/main/java/oap/io/Paths.java @@ -24,14 +24,18 @@ package oap.io; -import lombok.SneakyThrows; +import oap.util.Throwables; +import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Path; public class Paths { - @SneakyThrows - public static URL toUrl( Path path ) { - return path.toUri().toURL(); + public static URL toUrl( Path path ) throws oap.io.MalformedURLException { + try { + return path.toUri().toURL(); + } catch( MalformedURLException e ) { + throw Throwables.propagate( e ); + } } } diff --git a/oap-stdlib/src/main/java/oap/io/Resources.java b/oap-stdlib/src/main/java/oap/io/Resources.java index 350d79462d..fc0192becb 100644 --- a/oap-stdlib/src/main/java/oap/io/Resources.java +++ b/oap-stdlib/src/main/java/oap/io/Resources.java @@ -28,11 +28,11 @@ import oap.io.content.ContentReader; import oap.util.Lists; import oap.util.Stream; +import oap.util.Throwables; import oap.util.function.Try; import org.apache.commons.collections4.EnumerationUtils; import java.io.IOException; -import java.io.UncheckedIOException; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; @@ -110,7 +110,7 @@ public static List readResourcesAsString( Class contextClass, String ret.add( ContentReader.read( is, ContentReader.ofString() ) ); } } catch( IOException e ) { - throw new UncheckedIOException( e ); + throw Throwables.propagate( e ); } return ret; } @@ -195,8 +195,8 @@ public static List urls( String atPackage, String... ext ) { @SneakyThrows public static List urls( Predicate filter ) { return Stream.of( ClassPath.from( Thread.currentThread() - .getContextClassLoader() ) - .getResources() ) + .getContextClassLoader() ) + .getResources() ) .filter( ri -> filter.test( ri.getResourceName() ) ) .map( ClassPath.ResourceInfo::url ) .toList(); diff --git a/oap-stdlib/src/main/java/oap/io/SafeFileOutputStream.java b/oap-stdlib/src/main/java/oap/io/SafeFileOutputStream.java index b5e073357d..cdd22c8c96 100644 --- a/oap-stdlib/src/main/java/oap/io/SafeFileOutputStream.java +++ b/oap-stdlib/src/main/java/oap/io/SafeFileOutputStream.java @@ -23,17 +23,20 @@ */ package oap.io; +import lombok.extern.slf4j.Slf4j; + import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; import java.nio.file.Paths; +@Slf4j public class SafeFileOutputStream extends FileOutputStream { private final Path path; public SafeFileOutputStream( Path path, boolean append, IoStreams.Encoding encoding ) throws IOException { - super( path + ".unsafe", append ); + super( getUnsafePath( path ).toFile(), append ); this.path = path; if( append && java.nio.file.Files.exists( path ) ) { @@ -46,17 +49,23 @@ public SafeFileOutputStream( Path path, boolean append, IoStreams.Encoding encod } } + public static Path getUnsafePath( Path path ) { + return Paths.get( path.toString() + ".unsafe" ); + } + @Override public void close() throws IOException { - // https://stackoverflow.com/questions/25910173/fileoutputstream-close-is-not-always-writing-bytes-to-file-system - getFD().sync(); - super.close(); + try { + // https://stackoverflow.com/questions/25910173/fileoutputstream-close-is-not-always-writing-bytes-to-file-system + getFD().sync(); + super.close(); + } catch( IOException e ) { + log.error( e.getMessage(), e ); + } final Path unsafePath = Paths.get( this.path + ".unsafe" ); if( !java.nio.file.Files.exists( unsafePath ) || java.nio.file.Files.size( unsafePath ) == 0 ) Files.delete( unsafePath ); else Files.rename( unsafePath, this.path ); } - - } diff --git a/oap-stdlib/src/main/java/oap/util/Throwables.java b/oap-stdlib/src/main/java/oap/util/Throwables.java index 8897891c22..928ccfa923 100644 --- a/oap-stdlib/src/main/java/oap/util/Throwables.java +++ b/oap-stdlib/src/main/java/oap/util/Throwables.java @@ -26,18 +26,24 @@ import lombok.Lombok; +import java.io.FileNotFoundException; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.concurrent.ExecutionException; +import java.net.MalformedURLException; public final class Throwables { private Throwables() { } public static RuntimeException propagate( Throwable throwable ) { - if( throwable instanceof IOException ) throw new UncheckedIOException( ( IOException ) throwable ); - else if( throwable instanceof java.util.concurrent.ExecutionException ) throw new oap.concurrent.ExecutionException( ( ExecutionException ) throwable ); - else if( throwable instanceof java.util.concurrent.TimeoutException ) throw new oap.concurrent.TimeoutException( throwable ); + switch( throwable ) { + case FileNotFoundException fnf -> throw new oap.io.FileNotFoundException( fnf ); + case MalformedURLException murl -> throw new oap.io.MalformedURLException( murl ); + case IOException _ -> throw new oap.io.IOException( throwable ); + case java.util.concurrent.ExecutionException ee -> throw new oap.concurrent.ExecutionException( ee ); + case java.util.concurrent.TimeoutException _ -> throw new oap.concurrent.TimeoutException( throwable ); + case null, default -> { + } + } throw Lombok.sneakyThrow( throwable ); } diff --git a/oap-ws/oap-ws-sso-api/src/test/java/oap/ws/sso/JwtTokenGeneratorExtractorTest.java b/oap-ws/oap-ws-sso-api/src/test/java/oap/ws/sso/JwtTokenGeneratorExtractorTest.java index 818f2f380d..23c4919378 100644 --- a/oap-ws/oap-ws-sso-api/src/test/java/oap/ws/sso/JwtTokenGeneratorExtractorTest.java +++ b/oap-ws/oap-ws-sso-api/src/test/java/oap/ws/sso/JwtTokenGeneratorExtractorTest.java @@ -26,11 +26,11 @@ import oap.testng.Fixtures; import oap.testng.SystemTimerFixture; -import oap.util.Dates; import oap.util.Pair; import oap.ws.sso.AbstractUserTest.TestSecurityRolesProvider; import oap.ws.sso.AbstractUserTest.TestUser; import org.joda.time.DateTime; +import org.joda.time.DateTimeUtils; import org.testng.annotations.Test; import java.util.Date; @@ -52,8 +52,7 @@ public JwtTokenGeneratorExtractorTest() { @Test public void generateAndExtractToken() { - Dates.setTimeFixed( 2024, 4, 23, 14, 45, 56, 12 ); - + DateTimeUtils.setCurrentMillisFixed( DateTimeUtils.currentTimeMillis() ); Pair token = jwtTokenGenerator.generateAccessToken( new TestUser( "email@email.com", "password", Pair.of( "org1", "ADMIN" ) ) ); assertNotNull( token._1 ); diff --git a/pom.xml b/pom.xml index e7f8e92716..bd813b92fd 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ - 22.1.1 + 22.1.2 21.0.0 21.0.1 From 524b244cc84ad4fcee85e51cd6837770833ca743 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 24 Apr 2024 18:33:37 +0300 Subject: [PATCH 2/3] OAP-118 oap.message.MessageServerTest.ttl --- .../java/oap/message/MessageServerTest.java | 144 +++++++++--------- pom.xml | 2 +- 2 files changed, 76 insertions(+), 70 deletions(-) diff --git a/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java b/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java index bb67c71639..acfad262b8 100644 --- a/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java +++ b/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java @@ -54,6 +54,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.testng.Assert.assertNotNull; +@Test public class MessageServerTest extends Fixtures { private final TestDirectoryFixture testDirectoryFixture; @@ -89,7 +90,7 @@ public void rejectedException() throws IOException { int port = Ports.getFreePort( getClass() ); Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" ); - var listener1 = new MessageListenerJsonMock( MessageListenerMock.MESSAGE_TYPE ); + var listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); try( var server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); var messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), -1 ) ) { @@ -105,11 +106,12 @@ public void rejectedException() throws IOException { messageHttpHandler.preStart(); server.start(); - client1.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ).syncMemory(); - client2.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ); + client1.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException", ofString() ) + .syncMemory(); + client2.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException", ofString() ); assertEventually( 50, 100, () -> { - assertThat( listener1.messages ).containsOnly( new TestMessage( 1, "123" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "rejectedException" ) ); } ); } @@ -118,10 +120,10 @@ public void rejectedException() throws IOException { client.start(); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "1234", ofString() ).syncDisk().syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException 2", ofString() ).syncDisk().syncMemory(); assertEventually( 50, 100, () -> { - assertThat( listener1.messages ).containsOnly( new TestMessage( 1, "123" ), new TestMessage( 1, "1234" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "rejectedException" ), new TestMessage( 1, "rejectedException 2" ) ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); } ); @@ -147,16 +149,17 @@ public void sendAndReceive() throws IOException { server.start(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE2, ( short ) 1, "555", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceive 1", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceive 2", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceive 2", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceive 1", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE2, ( short ) 1, "sendAndReceive 3", ofString() ) .syncMemory(); assertEventually( 100, 50, () -> { - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "123" ), new TestMessage( 1, "124" ) ); - assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 1, "555" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "sendAndReceive 1" ), + new TestMessage( 1, "sendAndReceive 2" ) ); + assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 1, "sendAndReceive 3" ) ); } ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); @@ -182,16 +185,16 @@ public void sendAndReceiveJson() throws IOException { server.start(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofJson() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofJson() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofJson() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 1", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 2", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 2", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 1", ofJson() ) .syncMemory(); assertEventually( 100, 50, () -> assertThat( listener1.messages ).containsOnly( - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"123\"".getBytes( UTF_8 ) ) ), "123" ), - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"124\"".getBytes( UTF_8 ) ) ), "124" ) + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJson 1\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJson 1" ), + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJson 2\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJson 2" ) ) ); } @@ -215,16 +218,16 @@ public void sendAndReceiveJsonOneThread() throws IOException { server.start(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofJson() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofJson() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofJson() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 1", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 2", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 2", ofJson() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 1", ofJson() ) .syncMemory(); assertEventually( 50, 100, () -> { assertThat( listener1.messages ).containsOnly( - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"123\"".getBytes( UTF_8 ) ) ), "123" ), - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"124\"".getBytes( UTF_8 ) ) ), "124" ) + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJsonOneThread 1\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJsonOneThread 1" ), + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJsonOneThread 2\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJsonOneThread 2" ) ); } ); } @@ -247,7 +250,7 @@ public void unknownErrorNoRetry() throws IOException { server.start(); listener1.throwUnknownError( Integer.MAX_VALUE, true ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "unknownErrorNoRetry", ofString() ).syncMemory(); assertEventually( 100, 50, () -> { assertThat( client.getReadyMessages() ).isEqualTo( 0L ); @@ -277,12 +280,12 @@ public void unknownError() throws IOException { server.start(); listener1.throwUnknownError( 4, false ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "unknownError", ofString() ); assertEventually( 100, 50, () -> { client.syncMemory(); assertThat( listener1.throwUnknownError ).isLessThanOrEqualTo( 0 ); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "123" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "unknownError" ) ); } ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); @@ -295,11 +298,11 @@ public void statusError() throws IOException { int port = Ports.getFreePort( getClass() ); Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" ); - var listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); + MessageListenerMock listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); - try( var server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); - var messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), -1 ); - var client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { + try( NioHttpServer server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); + MessageHttpHandler messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), -1 ); + MessageSender client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { client.retryTimeout = 100; @@ -309,7 +312,7 @@ public void statusError() throws IOException { server.start(); listener1.setStatus( 567 ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "statusError", ofString() ).syncMemory(); assertEventually( 100, 50, () -> { assertThat( client.getRetryMessages() ).isEqualTo( 1 ); @@ -321,24 +324,24 @@ public void statusError() throws IOException { assertEventually( 10, 50, () -> { client.syncMemory(); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "123" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "statusError" ) ); } ); } } @Test public void ttl() throws IOException { - var hashTtl = 1000; + int hashTtl = 1000; int port = Ports.getFreePort( getClass() ); Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" ); DateTimeUtils.setCurrentMillisFixed( 100 ); - var listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); + MessageListenerMock listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); - try( var server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); - var messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), hashTtl ); - var client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { + try( NioHttpServer server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); + MessageHttpHandler messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), hashTtl ); + MessageSender client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { client.retryTimeout = 100; client.globalIoRetryTimeout = 100; @@ -349,32 +352,35 @@ public void ttl() throws IOException { server.start(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) .syncMemory(); MessageSenderUtils.waitSendAll( client, Dates.s( 10 ), 10 ); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "123" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "ttl" ) ); DateTimeUtils.setCurrentMillisFixed( DateTimeUtils.currentTimeMillis() + hashTtl + 1 ); messageHttpHandler.updateHash(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) .syncMemory(); MessageSenderUtils.waitSendAll( client, Dates.s( 10 ), 10 ); - assertThat( listener1.getMessages() ).containsExactly( - new TestMessage( 1, "123" ), - new TestMessage( 1, "123" ) - ); + + assertEventually( 100, 10, () -> { + assertThat( listener1.getMessages() ).containsExactly( + new TestMessage( 1, "ttl" ), + new TestMessage( 1, "ttl" ) + ); + } ); } } - @Test( enabled = false ) //flaky test + @Test public void persistence() throws IOException { int port = Ports.getFreePort( getClass() ); Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" ); @@ -396,16 +402,16 @@ public void persistence() throws IOException { server.start(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) .syncMemory(); assertEventually( 100, 50, () -> { assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "123" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "persistence" ) ); } ); } @@ -420,9 +426,9 @@ public void persistence() throws IOException { server.start(); client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) .syncMemory(); assertEventually( 100, 50, () -> { @@ -458,8 +464,8 @@ public void clientPersistence() throws IOException { client.retryTimeout = 100; client.start(); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 2, "123", ofString() ).syncMemory(); - client.send( MessageListenerMock.MESSAGE_TYPE2, ( short ) 2, "1234", ofString() ); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 2, "clientPersistence 1", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE2, ( short ) 2, "clientPersistence 2", ofString() ); assertEventually( 100, 50, () -> { assertThat( listener1.getMessages() ).isEmpty(); @@ -479,7 +485,7 @@ public void clientPersistence() throws IOException { var lockFile = persistenceDirectory .resolve( Long.toHexString( 1 ) ) .resolve( String.valueOf( Byte.toUnsignedInt( MessageListenerMock.MESSAGE_TYPE ) ) ) - .resolve( Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"123\"".getBytes( UTF_8 ) ) ) + "-2.lock" ); + .resolve( Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"clientPersistence 1\"".getBytes( UTF_8 ) ) ) + "-2.lock" ); Files.write( lockFile, "1", ofString() ); @@ -495,8 +501,8 @@ public void clientPersistence() throws IOException { assertThat( persistenceDirectory ).isEmptyDirectory(); assertEventually( 100, 50, () -> { - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 2, "123" ) ); - assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 2, "1234" ) ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 2, "clientPersistence 1" ) ); + assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 2, "clientPersistence 2" ) ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); } ); @@ -526,8 +532,8 @@ public void clientPersistenceLockExpiration() throws IOException { listener1.throwUnknownError = 2; client - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ) - .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "124", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "clientPersistenceLockExpiration 1", ofString() ) + .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "clientPersistenceLockExpiration 2", ofString() ) .syncMemory(); } @@ -554,7 +560,7 @@ public void clientPersistenceLockExpiration() throws IOException { .syncMemory(); assertEventually( 50, 100, () -> { - assertThat( listener1.getMessages() ).containsExactly( new TestMessage( 1, "124" ) ); + assertThat( listener1.getMessages() ).containsExactly( new TestMessage( 1, "clientPersistenceLockExpiration 2" ) ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); } ); @@ -581,7 +587,7 @@ public void availabilityReport() throws IOException { listener1.setStatus( 300 ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "123", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "availabilityReport", ofString() ).syncMemory(); assertEventually( 50, 100, () -> { assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.FAILED ); @@ -609,11 +615,11 @@ public void testKernel() { try { fixtures.fixBeforeMethod(); - kernelFixture.service( "oap-message-client", MessageSender.class ).send( ( byte ) 12, ( short ) 1, "123", ofString() ); + kernelFixture.service( "oap-message-client", MessageSender.class ).send( ( byte ) 12, ( short ) 1, "testKernel", ofString() ); assertEventually( 50, 100, () -> { assertThat( kernelFixture.service( "oap-message-test", MessageListenerMock.class ).getMessages() ) - .containsExactly( new TestMessage( 1, "123" ) ); + .containsExactly( new TestMessage( 1, "testKernel" ) ); } ); } finally { diff --git a/pom.xml b/pom.xml index bd813b92fd..28923cf2ca 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ - 22.1.2 + 22.1.3 21.0.0 21.0.1 From 2fbc081f39cf46f63d7efaf7be24c75f29122900 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Thu, 25 Apr 2024 08:34:04 +0300 Subject: [PATCH 3/3] OAP-127 oap.message.MessageServerTest.persistence --- .../oap/message/client/MessageSender.java | 14 ++ .../java/oap/message/MessageListenerMock.java | 11 +- .../java/oap/message/MessageServerTest.java | 206 ++++++++---------- .../src/test/resources/logback-test.xml | 14 ++ pom.xml | 2 +- 5 files changed, 127 insertions(+), 120 deletions(-) create mode 100644 oap-message/oap-message-test/src/test/resources/logback-test.xml diff --git a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java index 52919d0787..3b7045d77e 100644 --- a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java +++ b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java @@ -69,8 +69,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Slf4j @ToString @@ -376,6 +378,10 @@ private Messages.MessageInfo onOkRespone( Messages.MessageInfo messageInfo, Clie } public void syncMemory() { + syncMemory( -1 ); + } + + public void syncMemory( long timeoutMs ) { if( getReadyMessages() + getRetryMessages() + getInProgressMessages() > 0 ) log.trace( "[{}] sync ready {} retry {} inprogress {} ...", uniqueName, getReadyMessages(), getRetryMessages(), getInProgressMessages() ); @@ -401,6 +407,14 @@ public void syncMemory() { log.trace( "[{}] message {}... done", uniqueName, mi.message.md5 ); return null; } ); + + if( timeoutMs >= 0 ) { + try { + future.get( timeoutMs, TimeUnit.MILLISECONDS ); + } catch( InterruptedException | TimeoutException | ExecutionException e ) { + log.error( e.getMessage(), e ); + } + } } if( isGlobalIoRetryTimeout( now ) ) { diff --git a/oap-message/oap-message-test/src/main/java/oap/message/MessageListenerMock.java b/oap-message/oap-message-test/src/main/java/oap/message/MessageListenerMock.java index 2525ef8a0c..befdd5c2c4 100644 --- a/oap-message/oap-message-test/src/main/java/oap/message/MessageListenerMock.java +++ b/oap-message/oap-message-test/src/main/java/oap/message/MessageListenerMock.java @@ -75,15 +75,20 @@ public short run( int version, String hostName, int size, byte[] data, String md accessCount.incrementAndGet(); if( throwUnknownError > 0 ) { + log.debug( "throwUnknownError {} noRetry {}", throwUnknownError, noRetry ); throwUnknownError -= 1; - if( noRetry ) + if( noRetry ) { + log.debug( "RuntimeException -> unknown error" ); throw new RuntimeException( "unknown error" ); - else + } else { + log.debug( "MessageProtocol.STATUS_UNKNOWN_ERROR" ); return MessageProtocol.STATUS_UNKNOWN_ERROR; + } } - if( status == MessageProtocol.STATUS_OK ) + if( status == MessageProtocol.STATUS_OK ) { messages.add( new TestMessage( version, md5, new String( data, UTF_8 ) ) ); + } return status; } diff --git a/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java b/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java index acfad262b8..20abb1ec90 100644 --- a/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java +++ b/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java @@ -33,12 +33,12 @@ import oap.message.server.MessageHttpHandler; import oap.testng.Fixtures; import oap.testng.Ports; +import oap.testng.SystemTimerFixture; import oap.testng.TestDirectoryFixture; import oap.util.Dates; import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; import org.joda.time.DateTimeUtils; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.IOException; @@ -48,7 +48,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static oap.io.content.ContentWriter.ofJson; import static oap.io.content.ContentWriter.ofString; -import static oap.testng.Asserts.assertEventually; import static oap.testng.Asserts.urlOfTestResource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -60,11 +59,7 @@ public class MessageServerTest extends Fixtures { public MessageServerTest() { testDirectoryFixture = fixture( new TestDirectoryFixture() ); - } - - @BeforeMethod - public void beforeMethod() { - DateTimeUtils.setCurrentMillisSystem(); + fixture( new SystemTimerFixture() ); } @Test @@ -107,12 +102,11 @@ public void rejectedException() throws IOException { server.start(); client1.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException", ofString() ) - .syncMemory(); - client2.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException", ofString() ); + .syncMemory( Dates.s( 10 ) ); - assertEventually( 50, 100, () -> { - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "rejectedException" ) ); - } ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "rejectedException" ) ); + + client2.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException", ofString() ); } try( var client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { @@ -120,13 +114,12 @@ public void rejectedException() throws IOException { client.start(); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException 2", ofString() ).syncDisk().syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "rejectedException 2", ofString() ).syncDisk() + .syncMemory( Dates.s( 10 ) ); - assertEventually( 50, 100, () -> { - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "rejectedException" ), new TestMessage( 1, "rejectedException 2" ) ); - assertThat( client.getReadyMessages() ).isEqualTo( 0L ); - assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - } ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "rejectedException" ), new TestMessage( 1, "rejectedException 2" ) ); + assertThat( client.getReadyMessages() ).isEqualTo( 0L ); + assertThat( client.getRetryMessages() ).isEqualTo( 0L ); } } } @@ -154,13 +147,12 @@ public void sendAndReceive() throws IOException { .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceive 2", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceive 1", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE2, ( short ) 1, "sendAndReceive 3", ofString() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); + + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "sendAndReceive 1" ), + new TestMessage( 1, "sendAndReceive 2" ) ); + assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 1, "sendAndReceive 3" ) ); - assertEventually( 100, 50, () -> { - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "sendAndReceive 1" ), - new TestMessage( 1, "sendAndReceive 2" ) ); - assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 1, "sendAndReceive 3" ) ); - } ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); @@ -189,13 +181,11 @@ public void sendAndReceiveJson() throws IOException { .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 2", ofJson() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 2", ofJson() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJson 1", ofJson() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); - assertEventually( 100, 50, () -> - assertThat( listener1.messages ).containsOnly( - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJson 1\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJson 1" ), - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJson 2\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJson 2" ) - ) + assertThat( listener1.messages ).containsOnly( + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJson 1\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJson 1" ), + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJson 2\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJson 2" ) ); } } @@ -222,14 +212,12 @@ public void sendAndReceiveJsonOneThread() throws IOException { .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 2", ofJson() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 2", ofJson() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "sendAndReceiveJsonOneThread 1", ofJson() ) - .syncMemory(); - - assertEventually( 50, 100, () -> { - assertThat( listener1.messages ).containsOnly( - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJsonOneThread 1\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJsonOneThread 1" ), - new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJsonOneThread 2\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJsonOneThread 2" ) - ); - } ); + .syncMemory( Dates.s( 10 ) ); + + assertThat( listener1.messages ).containsOnly( + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJsonOneThread 1\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJsonOneThread 1" ), + new TestMessage( 1, Hex.encodeHexString( DigestUtils.getMd5Digest().digest( "\"sendAndReceiveJsonOneThread 2\"".getBytes( UTF_8 ) ) ), "sendAndReceiveJsonOneThread 2" ) + ); } } @@ -250,12 +238,11 @@ public void unknownErrorNoRetry() throws IOException { server.start(); listener1.throwUnknownError( Integer.MAX_VALUE, true ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "unknownErrorNoRetry", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "unknownErrorNoRetry", ofString() ) + .syncMemory( Dates.s( 10 ) ); - assertEventually( 100, 50, () -> { - assertThat( client.getReadyMessages() ).isEqualTo( 0L ); - assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - } ); + assertThat( client.getReadyMessages() ).isEqualTo( 0L ); + assertThat( client.getRetryMessages() ).isEqualTo( 0L ); assertThat( listener1.getMessages() ).isEmpty(); } @@ -268,6 +255,8 @@ public void unknownError() throws IOException { var listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); + DateTimeUtils.setCurrentMillisFixed( 100 ); + try( var server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); var messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), -1 ); var client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { @@ -282,11 +271,12 @@ public void unknownError() throws IOException { listener1.throwUnknownError( 4, false ); client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "unknownError", ofString() ); - assertEventually( 100, 50, () -> { - client.syncMemory(); - assertThat( listener1.throwUnknownError ).isLessThanOrEqualTo( 0 ); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "unknownError" ) ); - } ); + for( int i = 0; i < 5; i++ ) { + client.syncMemory( Dates.s( 10 ) ); + Dates.incFixed( 100 + 1 ); + } + assertThat( listener1.throwUnknownError ).isLessThanOrEqualTo( 0 ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "unknownError" ) ); assertThat( client.getReadyMessages() ).isEqualTo( 0L ); assertThat( client.getRetryMessages() ).isEqualTo( 0L ); @@ -300,6 +290,8 @@ public void statusError() throws IOException { MessageListenerMock listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); + DateTimeUtils.setCurrentMillisFixed( 100 ); + try( NioHttpServer server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); MessageHttpHandler messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), -1 ); MessageSender client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { @@ -312,20 +304,19 @@ public void statusError() throws IOException { server.start(); listener1.setStatus( 567 ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "statusError", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "statusError", ofString() ) + .syncMemory( Dates.s( 10 ) ); - assertEventually( 100, 50, () -> { - assertThat( client.getRetryMessages() ).isEqualTo( 1 ); - assertThat( listener1.getMessages() ).isEmpty(); - } ); + assertThat( client.getRetryMessages() ).isEqualTo( 1 ); + assertThat( listener1.getMessages() ).isEmpty(); listener1.setStatusOk(); - assertEventually( 10, 50, () -> { - client.syncMemory(); + Dates.incFixed( 100 + 1 ); + + client.syncMemory( Dates.s( 10 ) ); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "statusError" ) ); - } ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "statusError" ) ); } } @@ -355,28 +346,23 @@ public void ttl() throws IOException { .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); - MessageSenderUtils.waitSendAll( client, Dates.s( 10 ), 10 ); assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "ttl" ) ); - DateTimeUtils.setCurrentMillisFixed( DateTimeUtils.currentTimeMillis() + hashTtl + 1 ); + Dates.incFixed( hashTtl + 1 ); messageHttpHandler.updateHash(); client .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "ttl", ofString() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); - MessageSenderUtils.waitSendAll( client, Dates.s( 10 ), 10 ); - - assertEventually( 100, 10, () -> { - assertThat( listener1.getMessages() ).containsExactly( - new TestMessage( 1, "ttl" ), - new TestMessage( 1, "ttl" ) - ); - } ); + assertThat( listener1.getMessages() ).containsExactly( + new TestMessage( 1, "ttl" ), + new TestMessage( 1, "ttl" ) + ); } } @@ -405,14 +391,12 @@ public void persistence() throws IOException { .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); - assertEventually( 100, 50, () -> { - assertThat( client.getReadyMessages() ).isEqualTo( 0L ); - assertThat( client.getRetryMessages() ).isEqualTo( 0L ); + assertThat( client.getReadyMessages() ).isEqualTo( 0L ); + assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "persistence" ) ); - } ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 1, "persistence" ) ); } @@ -429,14 +413,12 @@ public void persistence() throws IOException { .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "persistence", ofString() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); - assertEventually( 100, 50, () -> { - assertThat( client.getReadyMessages() ).isEqualTo( 0L ); - assertThat( client.getRetryMessages() ).isEqualTo( 0L ); + assertThat( client.getReadyMessages() ).isEqualTo( 0L ); + assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - assertThat( listener1.getMessages() ).isEmpty(); - } ); + assertThat( listener1.getMessages() ).isEmpty(); } } } @@ -464,14 +446,13 @@ public void clientPersistence() throws IOException { client.retryTimeout = 100; client.start(); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 2, "clientPersistence 1", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 2, "clientPersistence 1", ofString() ) + .syncMemory( Dates.s( 10 ) ); client.send( MessageListenerMock.MESSAGE_TYPE2, ( short ) 2, "clientPersistence 2", ofString() ); - assertEventually( 100, 50, () -> { - assertThat( listener1.getMessages() ).isEmpty(); - assertThat( client.getReadyMessages() ).isEqualTo( 1L ); - assertThat( client.getRetryMessages() ).isEqualTo( 1L ); - } ); + assertThat( listener1.getMessages() ).isEmpty(); + assertThat( client.getReadyMessages() ).isEqualTo( 1L ); + assertThat( client.getRetryMessages() ).isEqualTo( 1L ); } assertThat( persistenceDirectory ).isNotEmptyDirectory(); @@ -496,16 +477,14 @@ public void clientPersistence() throws IOException { assertThat( listener1.getMessages() ).isEmpty(); client.syncDisk(); - client.syncMemory(); + client.syncMemory( Dates.s( 10 ) ); assertThat( persistenceDirectory ).isEmptyDirectory(); - assertEventually( 100, 50, () -> { - assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 2, "clientPersistence 1" ) ); - assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 2, "clientPersistence 2" ) ); - assertThat( client.getReadyMessages() ).isEqualTo( 0L ); - assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - } ); + assertThat( listener1.getMessages() ).containsOnly( new TestMessage( 2, "clientPersistence 1" ) ); + assertThat( listener2.getMessages() ).containsOnly( new TestMessage( 2, "clientPersistence 2" ) ); + assertThat( client.getReadyMessages() ).isEqualTo( 0L ); + assertThat( client.getRetryMessages() ).isEqualTo( 0L ); } } } @@ -534,7 +513,7 @@ public void clientPersistenceLockExpiration() throws IOException { client .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "clientPersistenceLockExpiration 1", ofString() ) .send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "clientPersistenceLockExpiration 2", ofString() ) - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); } assertThat( Files.wildcard( msgDirectory, "**/*.bin" ) ).hasSize( 2 ); @@ -557,13 +536,11 @@ public void clientPersistenceLockExpiration() throws IOException { client .syncDisk() - .syncMemory(); + .syncMemory( Dates.s( 10 ) ); - assertEventually( 50, 100, () -> { - assertThat( listener1.getMessages() ).containsExactly( new TestMessage( 1, "clientPersistenceLockExpiration 2" ) ); - assertThat( client.getReadyMessages() ).isEqualTo( 0L ); - assertThat( client.getRetryMessages() ).isEqualTo( 0L ); - } ); + assertThat( listener1.getMessages() ).containsExactly( new TestMessage( 1, "clientPersistenceLockExpiration 2" ) ); + assertThat( client.getReadyMessages() ).isEqualTo( 0L ); + assertThat( client.getRetryMessages() ).isEqualTo( 0L ); } } } @@ -587,21 +564,18 @@ public void availabilityReport() throws IOException { listener1.setStatus( 300 ); - client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "availabilityReport", ofString() ).syncMemory(); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "availabilityReport", ofString() ) + .syncMemory( Dates.s( 10 ) ); - assertEventually( 50, 100, () -> { - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.FAILED ); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( State.OPERATIONAL ); - } ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.FAILED ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( State.OPERATIONAL ); listener1.setStatus( MessageProtocol.STATUS_OK ); - assertEventually( 50, 100, () -> { - client.syncMemory(); + client.syncMemory( Dates.s( 10 ) ); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.OPERATIONAL ); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( State.OPERATIONAL ); - } ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.OPERATIONAL ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( State.OPERATIONAL ); } } @@ -615,12 +589,12 @@ public void testKernel() { try { fixtures.fixBeforeMethod(); - kernelFixture.service( "oap-message-client", MessageSender.class ).send( ( byte ) 12, ( short ) 1, "testKernel", ofString() ); + kernelFixture.service( "oap-message-client", MessageSender.class ) + .send( ( byte ) 12, ( short ) 1, "testKernel", ofString() ) + .syncMemory( Dates.s( 10 ) ); - assertEventually( 50, 100, () -> { - assertThat( kernelFixture.service( "oap-message-test", MessageListenerMock.class ).getMessages() ) - .containsExactly( new TestMessage( 1, "testKernel" ) ); - } ); + assertThat( kernelFixture.service( "oap-message-test", MessageListenerMock.class ).getMessages() ) + .containsExactly( new TestMessage( 1, "testKernel" ) ); } finally { fixtures.fixAfterMethod(); diff --git a/oap-message/oap-message-test/src/test/resources/logback-test.xml b/oap-message/oap-message-test/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..be114813d7 --- /dev/null +++ b/oap-message/oap-message-test/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 28923cf2ca..b0c1b34b2c 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ - 22.1.3 + 22.1.4 21.0.0 21.0.1