Skip to content

Commit

Permalink
add support for virtual threads for KinesisPut (still works with java…
Browse files Browse the repository at this point in the history
… < 19)
  • Loading branch information
michaeloffner committed Aug 13, 2024
1 parent 643d7c3 commit 764befc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.lucee</groupId>
<artifactId>kinesis-extension</artifactId>
<version>1.0.1.2-SNAPSHOT</version>
<version>1.0.1.3-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Kinesis Extension</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.lucee.extension.aws.kinesis.AmazonKinesisClient;
import org.lucee.extension.aws.kinesis.util.CommonUtil;
Expand Down Expand Up @@ -48,7 +47,7 @@ public static Struct call(PageContext pc, Collection collData, String partitionK

if (executor == null) {
int maxThreads = CFMLEngineFactory.getInstance().getCastUtil().toIntValue(CommonUtil.getSystemPropOrEnvVar("lucee.kinesis.maxThreads", null), 10);
executor = Executors.newFixedThreadPool(maxThreads > 0 ? (int) maxThreads : 10);
executor = CommonUtil.createExecutorService(maxThreads > 0 ? (int) maxThreads : 10, pc.getConfig().getLog("application"));
}
executor.execute(new Executable(CFMLEngineFactory.getInstance(), pc, listener, pc.getConfig().getLog("application"), collData, partitionKey, streamName, accessKeyId,
secretAccessKey, host, location, timeout));
Expand Down Expand Up @@ -253,4 +252,5 @@ public Struct call(boolean returnData) throws PageException {
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package org.lucee.extension.aws.kinesis.util;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lucee.commons.io.log.Log;
import lucee.loader.engine.CFMLEngine;
import lucee.loader.engine.CFMLEngineFactory;
import lucee.loader.util.Util;
Expand All @@ -12,6 +19,7 @@

public class CommonUtil {
private static BIF bif;
private static Integer javaMajorNumber;

public static String getSystemPropOrEnvVar(String name, String defaultValue) {
// env
Expand Down Expand Up @@ -114,4 +122,44 @@ public static PageException toPageException(Exception e) {

return CFMLEngineFactory.getInstance().getCastUtil().toPageException(e);
}

public static ExecutorService createExecutorService(int maxThreads, Log log) {
// virtual threads
if (javaMajorNumber() >= 19) {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodType methodType = MethodType.methodType(ExecutorService.class);
MethodHandle methodHandle = lookup.findStatic(Executors.class, "newVirtualThreadPerTaskExecutor", methodType);
ExecutorService es = (ExecutorService) methodHandle.invoke();
if (log != null) log.log(Log.LEVEL_INFO, "Kinesis", "use virtual threads for threading");
return es;
}
catch (Throwable t) {
if (log != null) log.log(Log.LEVEL_ERROR, "Kinesis", t);
// in case of an exception, we simply ignore it and fall back to regular threads
if (t instanceof ThreadDeath) throw (ThreadDeath) t;
}

}
// regulat threads
ExecutorService es = Executors.newFixedThreadPool(maxThreads);
if (log != null) log.log(Log.LEVEL_INFO, "Kinesis", "use regular threads for threading");
return es;
}

public static int javaMajorNumber() {
if (javaMajorNumber == null) {
String version = System.getProperty("java.version");
int index = version.indexOf('.');
if (index == -1) return javaMajorNumber = 0;
version = version.substring(0, index);
try {
return javaMajorNumber = Integer.parseInt(version);
}
catch (NumberFormatException nfe) {
return javaMajorNumber = 0;
}
}
return javaMajorNumber;
}
}

0 comments on commit 764befc

Please sign in to comment.