Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable #23634

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
c18aa62
[fix][broker]If there is a deadlock in the service, the probe should …
yyj8 Nov 23, 2024
90ff720
[fix][broker]If there is a deadlock in the service, the probe should …
yyj8 Nov 25, 2024
2b18156
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Nov 26, 2024
70325c0
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Nov 26, 2024
c230aa5
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Nov 26, 2024
2c8819b
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Nov 27, 2024
f17da50
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Nov 27, 2024
dc53040
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 4, 2024
532e69f
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 4, 2024
e1a1dd5
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 4, 2024
69aba3e
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 8, 2024
5970dcc
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 22, 2024
830c01a
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 22, 2024
52ab050
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Dec 24, 2024
5b0c2ec
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Jan 3, 2025
b1eaedd
[improvement][broker] If there is a deadlock in the service, the prob…
yyj8 Jan 3, 2025
1a2f6aa
Fix checkstyle
lhotari Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@
package org.apache.pulsar.common.configuration;

import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.time.Clock;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.ThreadDumpUtil;

/**
* Web resource used by the VIP service to check the availability of the service instance.
Expand All @@ -38,25 +46,84 @@ public class VipStatus {
// Servlet-context attribute name under which checkStatus() looks up the status file path (a String).
public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
// Servlet-context attribute name under which checkStatus() looks up the optional Supplier<Boolean> readiness probe.
public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";

// Default minimum interval (10 minutes, in milliseconds) between two full thread dumps logged
// after a deadlock has been detected by the status check; prevents excessive logging.
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
// Rate limit status checks to every 500ms to prevent DoS; within this window checkStatus()
// answers from lastCheckStatusResult instead of re-running the checks.
private static final long CHECK_STATUS_INTERVAL = 500L;

// The three fields below are static, i.e. shared by every VipStatus instance; checkStatus()
// reads and writes them while holding the VipStatus.class monitor.
// clock.millis() value recorded when the most recent non-cached check started.
private static volatile long lastCheckStatusTimestamp;
// clock.millis() value recorded right after the most recent full thread dump was logged.
private static volatile long lastPrintThreadDumpTimestamp;
// Outcome of the most recent non-cached check: true only when it returned "OK".
private static volatile boolean lastCheckStatusResult;

// Minimum number of milliseconds between two full thread dumps for this instance; set by the constructors.
private long printThreadDumpIntervalMs;
// Time source for all interval computations; both constructors assign Clock.systemUTC().
private Clock clock;

// Source of the two attributes above. NOTE(review): presumably injected by the JAX-RS runtime
// through @Context; the two-argument constructor assigns it directly.
@Context
protected ServletContext servletContext;

/**
 * Creates the resource with the default thread-dump logging interval
 * ({@code LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED}) and the system UTC clock.
 * The servlet context is not assigned here.
 */
public VipStatus() {
    this.printThreadDumpIntervalMs = LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED;
    this.clock = Clock.systemUTC();
}

/**
 * Test-only constructor that supplies the servlet context directly and lets the caller
 * choose how often a full thread dump may be logged.
 *
 * @param servletContext            context from which the status file path and readiness probe are read
 * @param printThreadDumpIntervalMs minimum number of milliseconds between two full thread dumps
 */
@VisibleForTesting
public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) {
    this.clock = Clock.systemUTC();
    this.printThreadDumpIntervalMs = printThreadDumpIntervalMs;
    this.servletContext = servletContext;
}

@GET
public String checkStatus() {
String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
@SuppressWarnings("unchecked")
Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
// Locking classes to avoid deadlock detection in multi-thread concurrent requests.
synchronized (VipStatus.class) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
if (clock.millis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) {
if (lastCheckStatusResult) {
return "OK";
} else {
throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
}
}
lastCheckStatusTimestamp = clock.millis();

boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
@SuppressWarnings("unchecked")
Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;

if (statusFilePath != null) {
File statusFile = new File(statusFilePath);
if (isReady && statusFile.exists() && statusFile.isFile()) {
return "OK";
if (statusFilePath != null) {
File statusFile = new File(statusFilePath);
if (isReady && statusFile.exists() && statusFile.isFile()) {
// check deadlock
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadBean.findDeadlockedThreads();
if (threadIds != null && threadIds.length > 0) {
ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false,
false);
String threadNames = Arrays.stream(threadInfos)
.map(threadInfo -> threadInfo.getThreadName()
+ "(tid=" + threadInfo.getThreadId() + ")")
.collect(Collectors.joining(", "));
if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) {
String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString();
log.error("Deadlock detected, service may be unavailable, "
+ "thread stack details are as follows: {}.", diagnosticResult);
lastPrintThreadDumpTimestamp = clock.millis();
} else {
log.error("Deadlocked threads detected. {}", threadNames);
}
lastCheckStatusResult = false;
throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
} else {
lastCheckStatusResult = true;
return "OK";
}
}
}
lastCheckStatusResult = false;
log.warn("Failed to access \"status.html\". The service is not ready");
throw new WebApplicationException(Status.NOT_FOUND);
}
log.warn("Failed to access \"status.html\". The service is not ready");
throw new WebApplicationException(Status.NOT_FOUND);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.configuration;

import static org.testng.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.mockito.Mockito;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

@Slf4j
public class VipStatusTest {

// Attribute names stubbed on the mocked ServletContext in setup(); the values are the same
// string literals as the identically named constants declared in VipStatus.
public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
// Thread-dump logging interval (milliseconds) passed to the VipStatus test constructor.
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 10000L;
// How long MockDeadlock.startDeadlock() sleeps after submitting its two tasks. The value equals
// the 500ms status-check rate limit declared in VipStatus.
private static final long CHECK_STATUS_INTERVAL = 500L;

// Mocked servlet context handed to the VipStatus under test.
private ServletContext mockServletContext;
// Instance under test, created in setup().
private VipStatus vipStatus;

@BeforeTest
public void setup() throws IOException {
String statusFilePath = "/tmp/status.html";
File file = new File(statusFilePath);
file.createNewFile();
Comment on lines +53 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use something like org.assertj.core.util.Files#newTemporaryFile for creating the temporary file.

Supplier<Boolean> isReadyProbe = () -> true;

mockServletContext = Mockito.mock(ServletContext.class);
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(statusFilePath);
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe);

vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED);
}

/**
 * Runs the healthy scenario and then the deadlock scenario, in that order, against the same
 * {@code vipStatus} instance.
 */
@Test
public void testVipStatusCheckStatus() {
// No deadlocks: the status check is expected to return "OK".
testVipStatusCheckStatusWithoutDeadlock();
// There is a deadlock: the status check is expected to fail.
testVipStatusCheckStatusWithDeadlock();
}

@AfterTest
public void release() throws IOException {
String statusFilePath = "/tmp/status.html";
File file = new File(statusFilePath);
file.deleteOnExit();
Comment on lines +75 to +77
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't make sense

}

/**
 * Verifies that the status check answers "OK" when no deadlock has been provoked.
 */
public void testVipStatusCheckStatusWithoutDeadlock() {
    String status = vipStatus.checkStatus();
    assertEquals(status, "OK");
}

public void testVipStatusCheckStatusWithDeadlock() {
MockDeadlock.startDeadlock();
yyj8 marked this conversation as resolved.
Show resolved Hide resolved
boolean asExpected = true;
try {
vipStatus.checkStatus();
asExpected = false;
System.out.println("Simulated deadlock, no deadlock detected, not as expected.");
} catch (Exception wae) {
System.out.println("Simulated deadlock and detected it, as expected.");
} finally {
MockDeadlock.executorService.shutdownNow();
}

if (!asExpected) {
throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
}
}

/**
 * Test helper that provokes a lock-ordering deadlock: one worker takes {@code lockA} and then
 * waits for {@code lockB}, while the other takes {@code lockB} and then waits for {@code lockA}.
 * The workers keep retrying until they are interrupted, e.g. by
 * {@code executorService.shutdownNow()}.
 */
public class MockDeadlock {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    private static final ReentrantLock lockA = new ReentrantLock();
    private static final ReentrantLock lockB = new ReentrantLock();

    /**
     * Submits both workers and then sleeps for {@code CHECK_STATUS_INTERVAL} milliseconds so
     * that each worker has taken its first lock and is waiting for the other one on return.
     */
    @SneakyThrows
    public static void startDeadlock() {
        executorService.execute(new ThreadOne());
        executorService.execute(new ThreadTwo());
        Thread.sleep(CHECK_STATUS_INTERVAL);
    }

    /** Worker that holds lockA and then keeps trying to take lockB. */
    private static class ThreadOne implements Runnable {
        @Override
        public void run() {
            // Acquire before entering try so that finally never unlocks a lock we do not hold.
            lockA.lock();
            try {
                System.out.println("ThreadOne acquired lockA");
                Thread.sleep(100);
                while (!lockB.tryLock(1, TimeUnit.SECONDS)) {
                    System.out.println("ThreadOne is still waiting for lockB");
                }
                // Only reached if lockB was actually obtained; do not leak it.
                lockB.unlock();
            } catch (InterruptedException e) {
                // Expected when the test shuts the executor down; keep the interrupt status.
                Thread.currentThread().interrupt();
            } finally {
                lockA.unlock();
            }
        }
    }

    /** Worker that holds lockB and then keeps trying to take lockA. */
    private static class ThreadTwo implements Runnable {
        @Override
        public void run() {
            // Acquire before entering try so that finally never unlocks a lock we do not hold.
            lockB.lock();
            try {
                System.out.println("ThreadTwo acquired lockB");
                Thread.sleep(100);
                while (!lockA.tryLock(1, TimeUnit.SECONDS)) {
                    System.out.println("ThreadTwo is still waiting for lockA");
                }
                // Only reached if lockA was actually obtained; do not leak it.
                lockA.unlock();
            } catch (InterruptedException e) {
                // Expected when the test shuts the executor down; keep the interrupt status.
                Thread.currentThread().interrupt();
            } finally {
                lockB.unlock();
            }
        }
    }
}
}
Loading