Skip to content

Commit

Permalink
compress codecs -> aircompressor
Browse files (browse the repository at this point in the history)
  • Loading branch information
galaxina committed Nov 2, 2023
1 parent bbe8e66 commit 224579e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 785 deletions.
17 changes: 4 additions & 13 deletions oap-stdlib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@
<artifactId>commons-text</artifactId>
<version>${oap.deps.apache.commons.text.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${oap.deps.apache.compress.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down Expand Up @@ -177,10 +172,11 @@
<artifactId>assertj-core</artifactId>
<version>${oap.deps.assertj.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.airlift/aircompressor -->
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>${oap.deps.lz4.version}</version>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>${oap.deps.aircompressor.version}</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
Expand Down Expand Up @@ -447,11 +443,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${oap.deps.zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
Expand Down
49 changes: 24 additions & 25 deletions oap-stdlib/src/main/java/oap/io/IoStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@
package oap.io;

import com.google.common.io.ByteStreams;
import io.airlift.compress.bzip2.BZip2HadoopStreams;
import io.airlift.compress.gzip.JdkGzipHadoopStreams;
import io.airlift.compress.lz4.Lz4HadoopStreams;
import io.airlift.compress.zstd.ZstdHadoopStreams;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;
import oap.compression.Compression;
import oap.io.ProgressInputStream.Progress;
import oap.util.Stream;
import oap.util.Strings;
import oap.util.function.Try;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -62,8 +59,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB;
import static oap.compression.Compression.DEFAULT_BUFFER_SIZE;
import static oap.io.ProgressInputStream.progress;
import static oap.util.function.Functions.empty.consume;

Expand Down Expand Up @@ -208,20 +203,25 @@ public static OutputStream out( Path path, Encoding encoding, int bufferSize, bo
OutputStream outputStream = safe
? new SafeFileOutputStream( path, append, encoding )
: new FileOutputStream( path.toFile(), append );
OutputStream fos = bufferSize > 0 && encoding != Encoding.GZIP ? new BufferedOutputStream( outputStream, bufferSize ) : outputStream;
OutputStream fos =
bufferSize > 0 && encoding != Encoding.GZIP ? new BufferedOutputStream( outputStream, bufferSize )
: outputStream;
return switch( encoding ) {
case GZIP -> {
OutputStream gzout = Compression.gzip( fos, bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE );
OutputStream gzout = new JdkGzipHadoopStreams().createOutputStream( fos );
yield bufferSize > 0 ? new BufferedOutputStream( gzout, bufferSize ) : gzout;
}
case BZIP2 -> {
OutputStream gzout = new BZip2HadoopStreams().createOutputStream( fos );
yield bufferSize > 0 ? new BufferedOutputStream( gzout, bufferSize ) : gzout;
}
case ZIP -> {
var zip = new ZipOutputStream( fos );
zip.putNextEntry( new ZipEntry( path.getFileName().toString() ) );
yield zip;
}
case LZ4_BLOCK -> new LZ4BlockOutputStream( fos );
case LZ4 -> new LZ4FrameOutputStream( fos, SIZE_64KB );
case ZSTD -> new ZstdCompressorOutputStream( fos );
case LZ4 -> new Lz4HadoopStreams().createOutputStream( fos );
case ZSTD -> new ZstdHadoopStreams().createOutputStream( fos );
case PLAIN, ORC, PARQUET, AVRO -> fos;
};
}
Expand Down Expand Up @@ -285,7 +285,13 @@ private static InputStream decoded( InputStream stream, Encoding encoding ) {
switch( encoding ) {
case GZIP:
try {
return Compression.ungzip( stream );
return new JdkGzipHadoopStreams().createInputStream( stream );
} catch( Exception e ) {
stream.close();
}
case BZIP2:
try {
return new BZip2HadoopStreams().createInputStream( stream );
} catch( Exception e ) {
stream.close();
}
Expand All @@ -300,23 +306,16 @@ private static InputStream decoded( InputStream stream, Encoding encoding ) {
}
case PLAIN, ORC, PARQUET, AVRO:
return stream;
case LZ4_BLOCK:
try {
return new LZ4BlockInputStream( stream );
} catch( Exception e ) {
stream.close();
throw e;
}
case LZ4:
try {
return new LZ4FrameInputStream( stream );
return new Lz4HadoopStreams().createInputStream( stream );
} catch( Exception e ) {
stream.close();
throw e;
}
case ZSTD:
try {
return new ZstdCompressorInputStream( stream );
return new ZstdHadoopStreams().createInputStream( stream );
} catch( Exception e ) {
stream.close();
throw e;
Expand All @@ -330,9 +329,9 @@ public enum Encoding {
PLAIN( "", false, true, true ),
ZIP( ".zip", true, false, true ),
GZIP( ".gz", true, true, true ),
BZIP2( ".bz2", true, false, true ),
ZSTD( ".zst", true, true, true ),
LZ4( ".lz4", true, true, true ),
LZ4_BLOCK( ".lz4b", true, false, true ),
ORC( ".orc", true, false, false ),
PARQUET( ".parquet", true, false, false ),
AVRO( ".avsc", true, false, false );
Expand Down
Loading

0 comments on commit 224579e

Please sign in to comment.