Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAP-219 oap-application: detecting timeouts when a service stops #340

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import oap.util.Maps;
import org.slf4j.Logger;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URL;
Expand Down Expand Up @@ -68,8 +69,16 @@ public KernelTest() {
testDirectoryFixture = fixture( new TestDirectoryFixture() );
}


/**
 * Runs before every test method and sets APPLICATION_STOP_DETECT_TIMEOUT to "1".
 * Supervisor.ShutdownConfiguration parses this variable as a millisecond value,
 * so the stop-timeout detection path is reached almost immediately in tests.
 */
@BeforeMethod
public void beforeMethod() {
    Env.set( "APPLICATION_STOP_DETECT_TIMEOUT", "1" );
}

@AfterMethod
public void afterMethod() {
Env.set( "APPLICATION_STOP_DETECT_TIMEOUT", null );

new ArrayList<>( System.getenv().keySet() )
.stream()
.filter( k -> k.startsWith( "CONFIG." ) )
Expand Down Expand Up @@ -531,6 +540,7 @@ public void preStop() {
}

/**
 * Test-service stop hook: pauses via Threads.sleepSafely( 2 ) and then appends "/stop" to str.
 * NOTE(review): the pause is presumably 2 ms, chosen to exceed the
 * APPLICATION_STOP_DETECT_TIMEOUT of 1 set in beforeMethod() - confirm the unit of sleepSafely.
 */
public void stop() {
Threads.sleepSafely( 2 );
str.append( "/stop" );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@

@Slf4j
@ToString( of = "name" )
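/**
 * Environment variables read when services are stopped:
 * APPLICATION_STOP_DETECT_TIMEOUT - per-service stop timeout in milliseconds, default 5s; a value <= 0 disables timeout detection
 * APPLICATION_FORCE_ASYNC_AFTER_TIMEOUT - default false; when enabled (e.g. "true", "1", "on", "yes") a service that exceeded the timeout is no longer waited for
 */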
public class Kernel implements Closeable, AutoCloseable {
public static final String DEFAULT = Strings.DEFAULT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@
*/
package oap.application.supervision;

import io.micrometer.core.instrument.util.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;
import oap.application.KernelHelper;
import oap.concurrent.Executors;
import oap.concurrent.ThreadPoolExecutor;
import oap.util.BiStream;
import oap.util.Dates;
import oap.util.Throwables;
import org.joda.time.DateTimeUtils;

import java.io.Closeable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
public class Supervisor {
Expand All @@ -38,20 +49,52 @@ public class Supervisor {

private boolean stopped = false;

private static void runAndDetectTimeout( String name, ShutdownConfiguration shutdownConfiguration, Runnable func ) {
if( shutdownConfiguration.timeoutMs > 0 ) {
long start = DateTimeUtils.currentTimeMillis();
Future<?> future = shutdownConfiguration.submit( func );
try {
future.get( shutdownConfiguration.timeoutMs, TimeUnit.MILLISECONDS );
} catch( InterruptedException e ) {
log.trace( e.getMessage() );
} catch( ExecutionException e ) {
throw Throwables.propagate( e );
} catch( TimeoutException e ) {
log.warn( "APP_TIMEOUT_START service {} after {}", name, Dates.durationToString( shutdownConfiguration.timeoutMs ) );

try {
if( !shutdownConfiguration.forceAsyncAfterTimeout ) {
nofateg marked this conversation as resolved.
Show resolved Hide resolved
future.get();

log.warn( "APP_TIMEOUT_END service {} done in {}", name, Dates.durationToString( DateTimeUtils.currentTimeMillis() - start ) );
} else {
log.warn( "APP_TIMEOUT_IGNORE service {}", name );
}
} catch( InterruptedException ex ) {
log.trace( e.getMessage() );
} catch( ExecutionException ex ) {
throw Throwables.propagate( e );
}
}
} else {
func.run();
}
}

/**
 * Registers {@code service} under {@code name} in the supervised-services map, wrapped in a
 * {@link StartableService} that carries the lifecycle method names to call.
 *
 * @param name         key the service is stored under
 * @param service      the service instance
 * @param preStartWith method names passed to the wrapper for the pre-start phase
 * @param startWith    method names passed to the wrapper for the start phase
 * @param preStopWith  method names passed to the wrapper for the pre-stop phase
 * @param stopWith     method names passed to the wrapper for the stop phase
 */
public synchronized void startSupervised( String name, Object service,
                                          List<String> preStartWith, List<String> startWith,
                                          List<String> preStopWith, List<String> stopWith ) {
    StartableService startable = new StartableService( service, preStartWith, startWith, preStopWith, stopWith );
    this.supervised.put( name, startable );
}

/**
 * Registers {@code instance} under {@code name} as a {@link ThreadService} wrapper.
 *
 * @param name     key the wrapper is stored under, also passed to the wrapper
 * @param instance must be a {@link Runnable}; the cast fails with ClassCastException otherwise
 */
public synchronized void startThread( String name, Object instance ) {
    Runnable runnable = ( Runnable ) instance;
    this.wrappers.put( name, new ThreadService( name, runnable, this ) );
}

// public synchronized void startScheduledThread( String name, Object instance, long delay, TimeUnit milliseconds ) {
// this.wrappers.put( name, new ThreadService( name, ( Runnable ) instance, this ) );
// }

/**
 * Registers {@code instance} under {@code name} as a {@link ThreadService} wrapper.
 *
 * @param name     key the wrapper is stored under, also passed to the wrapper
 * @param instance must be a {@link Runnable}; the cast fails with ClassCastException otherwise
 */
public synchronized void startThread( String name, Object instance ) {
    Runnable runnable = ( Runnable ) instance;
    this.wrappers.put( name, new ThreadService( name, runnable, this ) );
}

/**
 * Registers {@code service} under {@code name} as a {@link DelayScheduledService} wrapper
 * configured with the given delay.
 *
 * @param name    key the wrapper is stored under
 * @param service the task to wrap
 * @param delay   delay value handed to the wrapper
 * @param unit    time unit of {@code delay}
 */
public synchronized void scheduleWithFixedDelay( String name, Runnable service, long delay, TimeUnit unit ) {
    DelayScheduledService scheduled = new DelayScheduledService( service, delay, unit );
    this.wrappers.put( name, scheduled );
}
Expand Down Expand Up @@ -151,36 +194,47 @@ public synchronized void preStop() {

/**
 * Stops every registered service once: does nothing when the supervisor is already stopped,
 * otherwise sets the stopped flag, stops all wrappers and then all supervised services
 * (each map iterated in reversed order), and clears both maps.
 * <p>
 * In the variant that uses ShutdownConfiguration, each stop is wrapped in a Runnable and passed
 * to runAndDetectTimeout, and the configuration is closed by the try-with-resources statement.
 * <p>
 * NOTE(review): this span comes from a diff view without +/- markers, so the previous
 * direct-call loops and the new runAndDetectTimeout loops appear interleaved below.
 */
public synchronized void stop() {
if( !stopped ) {
log.debug( "stopping..." );
this.stopped = true;

BiStream.of( this.wrappers )
.reversed()
.forEach( ( name, service ) -> {
log.debug( "[{}] stopping {}...", service.type(), name );
KernelHelper.setThreadNameSuffix( name );
try {
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "[{}] stopping {}... Done.", service.type(), name );
} );
this.wrappers.clear();
try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration() ) {
log.debug( "stopping..." );
this.stopped = true;

BiStream.of( this.supervised )
.reversed()
.forEach( ( name, service ) -> {
log.debug( "stopping {}...", name );
KernelHelper.setThreadNameSuffix( name );
try {
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "stopping {}... Done.", name );
} );
this.supervised.clear();
BiStream.of( this.wrappers )
.reversed()
.forEach( ( name, service ) -> {
Runnable func = () -> {
log.debug( "[{}] stopping {}...", service.type(), name );
KernelHelper.setThreadNameSuffix( name );
try {
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "[{}] stopping {}... Done.", service.type(), name );
};

runAndDetectTimeout( name, shutdownConfiguration, func );
} );
this.wrappers.clear();

BiStream.of( this.supervised )
.reversed()
.forEach( ( name, service ) -> {
Runnable func = () -> {
log.debug( "stopping {}...", name );
KernelHelper.setThreadNameSuffix( name );
try {
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "stopping {}... Done.", name );
};

runAndDetectTimeout( name, shutdownConfiguration, func );
} );
this.supervised.clear();
}
}
}

Expand All @@ -189,34 +243,68 @@ public synchronized void stop( String serviceName ) {
log.debug( "stopping..." );
this.stopped = true;

BiStream.of( this.wrappers )
.filter( ( name, service ) -> name.equals( serviceName ) )
.forEach( ( name, service ) -> {
log.debug( "[{}] stopping {}...", service.type(), name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "[{}] stopping {}... Done.", service.type(), name );
} );
this.wrappers.clear();
try( ShutdownConfiguration shutdownConfiguration = new ShutdownConfiguration() ) {
BiStream.of( this.wrappers )
.filter( ( name, _ ) -> name.equals( serviceName ) )
.forEach( ( name, service ) -> {
Runnable func = () -> {
log.debug( "[{}] stopping {}...", service.type(), name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "[{}] stopping {}... Done.", service.type(), name );
};

BiStream.of( this.supervised )
.filter( ( name, service ) -> name.equals( serviceName ) )
.forEach( ( name, service ) -> {
log.debug( "stopping {}...", name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "stopping {}... Done.", name );
} );
runAndDetectTimeout( name, shutdownConfiguration, func );
} );
this.wrappers.clear();

BiStream.of( this.supervised )
.filter( ( name, service ) -> name.equals( serviceName ) )
.forEach( ( name, service ) -> {
Runnable func = () -> {
log.debug( "stopping {}...", name );
KernelHelper.setThreadNameSuffix( name );
try {
service.preStop();
service.stop();
} finally {
KernelHelper.restoreThreadName();
}
log.debug( "stopping {}... Done.", name );
};

runAndDetectTimeout( name, shutdownConfiguration, func );
} );
}
}
}

/**
 * Settings used while stopping services, read from environment variables, together with the
 * single-thread executor on which stop actions are run so they can be awaited with a timeout.
 * <ul>
 * <li>{@code APPLICATION_STOP_DETECT_TIMEOUT} - timeout in milliseconds; when unset, blank or not
 * a valid number the default {@code Dates.s( 5 )} is used.</li>
 * <li>{@code APPLICATION_FORCE_ASYNC_AFTER_TIMEOUT} - enabled by on/1/true/yes
 * (case-insensitive, surrounding whitespace ignored); disabled otherwise.</li>
 * </ul>
 * Instances own an executor and must be closed, typically with try-with-resources.
 */
public static class ShutdownConfiguration implements Closeable {
    private static final Set<String> on = Set.of( "on", "1", "true", "ON", "TRUE", "yes", "YES" );
    public final long timeoutMs;
    public final boolean forceAsyncAfterTimeout;
    public final ThreadPoolExecutor threadPoolExecutor = Executors.newFixedBlockingThreadPool( 1, new NamedThreadFactory( "stop" ) );

    public ShutdownConfiguration() {
        this.timeoutMs = parseTimeoutMs( System.getenv( "APPLICATION_STOP_DETECT_TIMEOUT" ) );
        this.forceAsyncAfterTimeout = isOn( System.getenv( "APPLICATION_FORCE_ASYNC_AFTER_TIMEOUT" ) );
    }

    /**
     * Parses the timeout value. A malformed value must not abort shutdown, so it is reported
     * and replaced by the default instead of letting NumberFormatException escape.
     */
    private static long parseTimeoutMs( String value ) {
        long defaultTimeoutMs = Dates.s( 5 );
        if( value == null || value.isBlank() ) {
            return defaultTimeoutMs;
        }

        try {
            return Long.parseLong( value.trim() );
        } catch( NumberFormatException e ) {
            log.warn( "invalid APPLICATION_STOP_DETECT_TIMEOUT '{}', using default {}", value, Dates.durationToString( defaultTimeoutMs ) );
            return defaultTimeoutMs;
        }
    }

    /** Returns true when {@code value} is one of the accepted "enabled" spellings, ignoring case. */
    private static boolean isOn( String value ) {
        if( value == null ) {
            return false;
        }

        String normalized = value.trim();
        return on.stream().anyMatch( normalized::equalsIgnoreCase );
    }

    /** Initiates shutdown of the executor; already submitted stop actions are not cancelled here. */
    @Override
    public void close() {
        threadPoolExecutor.shutdown();
    }

    /**
     * Submits a stop action to the executor.
     *
     * @param func the action to run
     * @return a future that completes when the action has finished
     */
    public Future<?> submit( Runnable func ) {
        return threadPoolExecutor.submit( func );
    }
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
</distributionManagement>

<properties>
<oap.project.version>22.5.2</oap.project.version>
<oap.project.version>22.5.3</oap.project.version>

<oap.deps.config.version>21.0.0</oap.deps.config.version>
<oap.deps.oap-teamcity.version>21.0.1</oap.deps.oap-teamcity.version>
Expand Down
Loading