Skip to content

Commit

Permalink
OAP-209 Optional ability to change processor affinity for worker thre…
Browse files Browse the repository at this point in the history
…ads in the undertow
  • Loading branch information
nofateg authored Feb 5, 2024
1 parent 8edfff4 commit a318874
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 125 deletions.
35 changes: 35 additions & 0 deletions oap-highload/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor of the oap-highload module, which hosts oap.highload.Affinity. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Inherits from the root oap project; the version comes from the oap.project.version property. -->
<parent>
<groupId>oap</groupId>
<artifactId>oap</artifactId>
<version>${oap.project.version}</version>
</parent>

<artifactId>oap-highload</artifactId>

<dependencies>
<!-- OpenHFT thread-affinity library: provides net.openhft.affinity.Affinity.setAffinity used by oap.highload.Affinity. -->
<dependency>
<groupId>net.openhft</groupId>
<artifactId>affinity</artifactId>
<version>${oap.deps.affinity.version}</version>
</dependency>

<!-- NOTE(review): no <scope> is given, so Maven's default (compile) scope applies;
     confirm this test-support artifact is really meant to be on the main classpath. -->
<dependency>
<groupId>oap</groupId>
<artifactId>oap-stdlib-test</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Lombok is an annotation processor (@Getter, @Slf4j), needed at compile time only, hence "provided". -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${oap.deps.lombok.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
72 changes: 72 additions & 0 deletions oap-highload/src/main/java/oap/highload/Affinity.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package oap.highload;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Parsed CPU-affinity specification that hands out the listed CPUs in round-robin order.
 *
 * <p>Supported syntax (comma-separated items, surrounding whitespace is ignored):
 * <ul>
 *     <li>{@code "*"} - affinity is disabled, {@link #set()} does nothing;</li>
 *     <li>{@code "N"} - a single CPU number;</li>
 *     <li>{@code "N-M"} - an inclusive range of CPU numbers;</li>
 *     <li>{@code "N+"} - every CPU from {@code N} up to (excluding)
 *     {@code Runtime.getRuntime().availableProcessors()}.</li>
 * </ul>
 *
 * <p>A specification that resolves to no CPUs at all (for example {@code "N+"} with {@code N}
 * not smaller than the processor count) behaves like {@code "*"}.
 */
@Slf4j
public class Affinity {
    @Getter
    private final int[] cpus;
    /** Index in {@link #cpus} of the CPU that the next {@link #set()} call will use. */
    private final AtomicInteger position = new AtomicInteger();

    /**
     * Parses the specification.
     *
     * @param cpu specification string, see the class description for the syntax
     * @throws NumberFormatException if an item is not a valid number, range or {@code N+} expression
     */
    public Affinity( String cpu ) {
        log.info( "cpu {}", cpu );

        if( cpu.trim().equals( "*" ) ) {
            cpus = new int[0];
        } else {
            // Loop-invariant, so it is read once instead of on every iteration of the "N+" loop.
            int availableProcessors = Runtime.getRuntime().availableProcessors();

            ArrayList<Integer> parsed = new ArrayList<>();

            for( String item : cpu.split( "," ) ) {
                String token = item.trim();
                if( token.endsWith( "+" ) ) {
                    int first = Integer.parseInt( token.substring( 0, token.length() - 1 ).trim() );
                    for( int i = first; i < availableProcessors; i++ ) {
                        parsed.add( i );
                    }
                } else {
                    String[] range = token.split( "-" );

                    if( range.length > 1 ) {
                        int start = Integer.parseInt( range[0].trim() );
                        int end = Integer.parseInt( range[1].trim() );

                        for( int i = start; i <= end; i++ ) {
                            parsed.add( i );
                        }
                    } else {
                        parsed.add( Integer.parseInt( token ) );
                    }
                }
            }

            this.cpus = parsed.stream().mapToInt( i -> i ).toArray();
        }
    }

    /**
     * @return an instance with affinity disabled (equivalent to {@code new Affinity( "*" )})
     */
    public static Affinity any() {
        return new Affinity( "*" );
    }

    /**
     * Binds the calling thread to the next CPU of the list, cycling back to the first CPU after
     * the last one. Does nothing when affinity is disabled.
     */
    public void set() {
        if( isEnabled() ) {
            // The stored index must always stay inside [0, cpus.length): getAndUpdate returns the
            // PREVIOUS value, so letting the counter reach cpus.length (as the former
            // "index >= cpus.length ? 0 : index + 1" did) made the following call read
            // cpus[cpus.length] and fail with ArrayIndexOutOfBoundsException.
            int cpuIndex = position.getAndUpdate( index -> ( index + 1 ) % cpus.length );
            int cpu = cpus[cpuIndex];
            log.trace( "affinity -> {}", cpu );
            net.openhft.affinity.Affinity.setAffinity( cpu );
        }
    }

    /**
     * @return {@code true} when at least one CPU is configured
     */
    public boolean isEnabled() {
        return cpus.length > 0;
    }

    /**
     * @return number of configured CPUs, {@code 0} when affinity is disabled
     */
    public int size() {
        return cpus.length;
    }
}
20 changes: 20 additions & 0 deletions oap-highload/src/test/java/oap/highload/AffinityTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package oap.highload;

import org.testng.annotations.Test;

import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Verifies how {@link Affinity} turns a CPU specification string into the list of CPU numbers.
 */
public class AffinityTest {
    @Test
    public void testCpuParse() {
        // single CPU
        assertThat( new Affinity( "1" ).getCpus() ).containsExactly( 1 );

        // open-ended "N+" form: from N up to the number of available processors (exclusive)
        int processors = Runtime.getRuntime().availableProcessors();
        int[] fromThree = IntStream.range( 3, processors ).toArray();
        assertThat( new Affinity( "3+" ).getCpus() ).containsExactly( fromThree );

        // comma-separated list, whitespace around the items is ignored
        assertThat( new Affinity( "1, 3 ,7 " ).getCpus() ).containsExactly( 1, 3, 7 );

        // inclusive range mixed with a single CPU
        assertThat( new Affinity( "1-3, 8" ).getCpus() ).containsExactly( 1, 2, 3, 8 );
        assertThat( new Affinity( "1-3, 8" ).isEnabled() ).isTrue();

        // "*" disables affinity: no CPUs are listed
        assertThat( new Affinity( "*" ).getCpus() ).isEmpty();
        assertThat( new Affinity( "*" ).isEnabled() ).isFalse();
    }
}
7 changes: 6 additions & 1 deletion oap-pnio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
<dependency>
<groupId>oap</groupId>
<artifactId>oap-stdlib</artifactId>
<version>${project.parent.version}</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>oap</groupId>
<artifactId>oap-highload</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
Expand Down
89 changes: 58 additions & 31 deletions oap-pnio/src/main/java/oap/http/pnio/PnioHttpHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import oap.LogConsolidated;
import oap.highload.Affinity;
import oap.http.Http;
import oap.http.server.nio.HttpServerExchange;
import oap.util.Dates;
import org.slf4j.event.Level;
import org.xnio.XnioExecutor;
import org.xnio.XnioWorker;

import java.io.Closeable;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
Expand All @@ -36,25 +42,14 @@ public class PnioHttpHandler<WorkflowState> implements Closeable, AutoCloseable
public final int requestSize;
public final int responseSize;
public final int threads;
private RequestWorkflow<WorkflowState> workflow;
public final double queueTimeoutPercent;
public final Affinity cpuAffinity;
public final Affinity ioAffinity;
private final ErrorResponse<WorkflowState> errorResponse;
private final ThreadPoolExecutor pool;
private final SynchronousQueue<PnioExchange<WorkflowState>> queue;

public final double queueTimeoutPercent;
public final int cpuAffinityFirstCpu;

private final List<RequestTaskComputeRunner<WorkflowState>> tasks = new ArrayList<>();

@Builder
public static class PnioHttpSettings {
int requestSize;
int responseSize;
double queueTimeoutPercent;
int cpuThreads;
boolean cpuQueueFair;
int cpuAffinityFirstCpu;
}
private RequestWorkflow<WorkflowState> workflow;

@Deprecated
// use builder for settings
Expand All @@ -63,29 +58,39 @@ public PnioHttpHandler( int requestSize,
double queueTimeoutPercent,
int cpuThreads,
boolean cpuQueueFair,
int cpuAffinityFirstCpu,
Affinity cpuAffinity,
Affinity ioAffinity,
RequestWorkflow<WorkflowState> workflow,
ErrorResponse<WorkflowState> errorResponse ) {
this( PnioHttpSettings.builder()
.requestSize( requestSize )
.responseSize( responseSize )
.queueTimeoutPercent( queueTimeoutPercent )
.cpuThreads( cpuThreads )
.cpuQueueFair( cpuQueueFair )
.cpuAffinityFirstCpu( cpuAffinityFirstCpu )
.build(),
workflow,
errorResponse );
this( PnioHttpSettings.builder()
.requestSize( requestSize )
.responseSize( responseSize )
.queueTimeoutPercent( queueTimeoutPercent )
.cpuThreads( cpuThreads )
.cpuQueueFair( cpuQueueFair )
.cpuAffinity( cpuAffinity )
.ioAffinity( ioAffinity )
.build(),
workflow,
errorResponse );
}

public PnioHttpHandler( PnioHttpSettings settings,
RequestWorkflow<WorkflowState> workflow,
ErrorResponse<WorkflowState> errorResponse ) {
this.requestSize = settings.requestSize;
this.responseSize = settings.responseSize;
this.queueTimeoutPercent = settings.queueTimeoutPercent;

this.threads = settings.cpuThreads > 0 ? settings.cpuThreads : Runtime.getRuntime().availableProcessors();
this.cpuAffinityFirstCpu = settings.cpuAffinityFirstCpu;
if( settings.cpuThreads > 0 ) {
this.threads = settings.cpuThreads;
} else if( settings.cpuAffinity.isEnabled() ) {
this.threads = settings.cpuAffinity.size();
} else {
this.threads = Runtime.getRuntime().availableProcessors();
}
this.cpuAffinity = settings.cpuAffinity;
this.ioAffinity = settings.ioAffinity;
this.workflow = workflow;
this.errorResponse = errorResponse;

Expand All @@ -98,8 +103,7 @@ public PnioHttpHandler( PnioHttpSettings settings,
new oap.concurrent.ThreadPoolExecutor.BlockingPolicy() );

for( var i = 0; i < settings.cpuThreads; i++ ) {
RequestTaskComputeRunner<WorkflowState> requestTaskComputeRunner = new RequestTaskComputeRunner<>( queue,
cpuAffinityFirstCpu >= 0 ? cpuAffinityFirstCpu + i : -1 );
RequestTaskComputeRunner<WorkflowState> requestTaskComputeRunner = new RequestTaskComputeRunner<>( queue, cpuAffinity );
pool.submit( requestTaskComputeRunner );
tasks.add( requestTaskComputeRunner );
}
Expand All @@ -117,6 +121,18 @@ public long getPoolCompletedTaskCount() {
return pool.getCompletedTaskCount();
}

/**
 * Schedules {@code ioAffinity.set()} on every executor found in the worker's {@code workerThreads} array.
 *
 * <p>The array is read reflectively from the field named {@code workerThreads} declared on the
 * runtime class of {@code xnioWorker}; checked reflection exceptions are rethrown via
 * {@link SneakyThrows}.
 * NOTE(review): presumably each element is one XNIO I/O thread, so every I/O thread ends up
 * pinned by its own {@code set()} call - confirm against the XNIO implementation in use.
 *
 * @param xnioWorker worker whose threads should get the I/O affinity applied
 */
@SneakyThrows
public void init( XnioWorker xnioWorker ) {
    Field threadsField = xnioWorker.getClass().getDeclaredField( "workerThreads" );
    threadsField.setAccessible( true );
    Object ioThreadArray = threadsField.get( xnioWorker );

    // The field's element type is not accessible here, so the array is walked through java.lang.reflect.Array.
    for( int idx = 0, count = Array.getLength( ioThreadArray ); idx < count; idx++ ) {
        XnioExecutor executor = ( XnioExecutor ) Array.get( ioThreadArray, idx );
        executor.execute( ioAffinity::set );
    }
}

public void handleRequest( HttpServerExchange exchange, long startTimeNano, long timeout, WorkflowState workflowState ) {
var requestState = new PnioExchange<>( requestSize, responseSize, workflow, workflowState, exchange, startTimeNano, timeout );

Expand Down Expand Up @@ -145,7 +161,7 @@ private void response( PnioExchange<WorkflowState> pnioExchange, WorkflowState w

errorResponse.handle( pnioExchange, workflowState );
} catch( Throwable e ) {
if ( e instanceof OutOfMemoryError ) {
if( e instanceof OutOfMemoryError ) {
log.error( "OOM error, need restarting!", e );
}
LogConsolidated.log( log, Level.ERROR, Dates.s( 5 ), e.getMessage(), e );
Expand Down Expand Up @@ -190,4 +206,15 @@ public void close() {
/**
 * Callback supplied to the {@code PnioHttpHandler} constructor and invoked by the handler as
 * {@code errorResponse.handle( pnioExchange, workflowState )}.
 * NOTE(review): the exact conditions under which the handler calls it are not visible in this
 * fragment - confirm before relying on them.
 */
public interface ErrorResponse<WorkflowState> {
/**
 * @param pnioExchange  exchange the response is produced for
 * @param workflowState caller-defined state that accompanies the exchange
 */
void handle( PnioExchange<WorkflowState> pnioExchange, WorkflowState workflowState );
}

/**
 * Settings read by the {@code PnioHttpHandler( PnioHttpSettings, ... )} constructor.
 * Instances are created through the Lombok-generated builder ({@code PnioHttpSettings.builder()}).
 * No builder defaults are declared, so unset numeric fields are {@code 0} and unset
 * {@code Affinity} fields are {@code null}.
 */
@Builder
public static class PnioHttpSettings {
// copied to PnioHttpHandler.requestSize
int requestSize;
// copied to PnioHttpHandler.responseSize
int responseSize;
// copied to PnioHttpHandler.queueTimeoutPercent
double queueTimeoutPercent;
// when > 0 it becomes PnioHttpHandler.threads; otherwise the constructor falls back to
// cpuAffinity.size() (if that affinity is enabled) or to Runtime.availableProcessors()
int cpuThreads;
// NOTE(review): presumably the fairness flag of the request queue - its use is not visible here, confirm
boolean cpuQueueFair;
// handed to every RequestTaskComputeRunner; the constructor dereferences it when cpuThreads <= 0,
// so it must not be null in that case
Affinity cpuAffinity;
// applied to the XNIO worker's threads by PnioHttpHandler.init( XnioWorker )
Affinity ioAffinity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
package oap.http.pnio;

import lombok.extern.slf4j.Slf4j;
import net.openhft.affinity.Affinity;
import oap.highload.Affinity;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
Expand All @@ -20,20 +20,19 @@
@Slf4j
class RequestTaskComputeRunner<WorkflowState> implements Runnable {
private final BlockingQueue<PnioExchange<WorkflowState>> queue;
private final int cpu;
private final Affinity affinity;
volatile boolean done = false;
private Thread thread;

RequestTaskComputeRunner( BlockingQueue<PnioExchange<WorkflowState>> queue, int cpu ) {
RequestTaskComputeRunner( BlockingQueue<PnioExchange<WorkflowState>> queue, Affinity affinity ) {
this.queue = Objects.requireNonNull( queue );
this.cpu = cpu;
this.affinity = affinity;
}

@Override
public void run() {
this.thread = Thread.currentThread();
if( cpu >= 0 )
Affinity.setAffinity( cpu );
affinity.set();
Thread.currentThread().setPriority( Thread.MAX_PRIORITY );

while( !done ) {
Expand Down
31 changes: 23 additions & 8 deletions oap-pnio/src/test/java/oap/http/pnio/PnioHttpHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@

package oap.http.pnio;

import oap.highload.Affinity;
import oap.http.Http;
import oap.http.server.nio.HttpHandler;
import oap.http.server.nio.HttpServerExchange;
import oap.http.server.nio.NioHttpServer;
import oap.testng.EnvFixture;
import oap.testng.Fixtures;
import oap.util.Dates;
import org.testng.annotations.Test;
import org.xnio.XnioWorker;

import java.io.IOException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -168,20 +172,31 @@ private void runWithWorkflow( RequestWorkflow<TestState> workflow, Consumer<Inte
private void runWithWorkflow( int requestSize, int responseSize, int ioThreads, int cpuThreads, long timeout, RequestWorkflow<TestState> workflow, Consumer<Integer> cons ) throws IOException {
int port = envFixture.portFor( "pnio" );
var settings = PnioHttpHandler.PnioHttpSettings.builder()
.requestSize( requestSize )
.responseSize( responseSize )
.queueTimeoutPercent( 0.99 )
.cpuThreads( cpuThreads )
.cpuQueueFair( true )
.cpuAffinityFirstCpu( -1 )
.build();
.requestSize( requestSize )
.responseSize( responseSize )
.queueTimeoutPercent( 0.99 )
.cpuThreads( cpuThreads )
.cpuQueueFair( true )
.cpuAffinity( new Affinity( "0" ) )
.ioAffinity( new Affinity( "1+" ) )
.build();
try( PnioHttpHandler<TestState> httpHandler = new PnioHttpHandler<>( settings, workflow, this::errorResponse );
NioHttpServer httpServer = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ) ) {
httpServer.ioThreads = ioThreads;
httpServer.start();

httpServer.bind( "/test",
exchange -> httpHandler.handleRequest( exchange, System.nanoTime(), timeout, new TestState() ) );
new HttpHandler() {
@Override
public void init( XnioWorker xnioWorker ) {
httpHandler.init( xnioWorker );
}

@Override
public void handleRequest( HttpServerExchange exchange ) throws Exception {
httpHandler.handleRequest( exchange, System.nanoTime(), timeout, new TestState() );
}
} );

cons.accept( port );
}
Expand Down
Loading

0 comments on commit a318874

Please sign in to comment.