Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription: skip on setup and cluster failure when running subscription restart IT & fix some bugs in SubscriptionExecutorServiceManager #12710

Merged
merged 16 commits into from
Jun 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant {

public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
public static final long AWAITILITY_AT_MOST_SECOND = 240L;
public static final long AWAITILITY_AT_MOST_SECOND = 600L;

public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it;

import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.AssumptionViolatedException;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.reflect.Method;

public class SkipOnSetUpFailure implements TestRule {

private final String setUpMethodName;

/**
* @param setUpMethodName Should be exactly the same as the method name decorated with @Before.
*/
public SkipOnSetUpFailure(@NonNull final String setUpMethodName) {
this.setUpMethodName = setUpMethodName;
}

@Override
public Statement apply(final Statement base, final Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
try {
base.evaluate();
} catch (final Throwable e) {
// Trace back the exception stack to determine whether the exception was thrown during the
// setUp phase.
for (final StackTraceElement stackTraceElement : e.getStackTrace()) {
if (setUpMethodName.equals(stackTraceElement.getMethodName())
&& description.getClassName().equals(stackTraceElement.getClassName())
&& isMethodAnnotationWithBefore(stackTraceElement.getMethodName())) {
e.printStackTrace();
// Skip this test.
throw new AssumptionViolatedException(
String.format(
"Skipping test due to setup failure for %s#%s",
description.getClassName(), description.getMethodName()));
}
}

// Re-throw the exception (which means the test has failed).
throw e;

// Regardless of the circumstances, the method decorated with @After will always be
// executed.
}
}

private boolean isMethodAnnotationWithBefore(final String methodName) {
try {
final Method method = description.getTestClass().getDeclaredMethod(methodName);
return method.isAnnotationPresent(org.junit.Before.class);
} catch (final Throwable ignored) {
return false;
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,30 @@

import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

abstract class AbstractSubscriptionDualIT {

protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;

@Rule public TestName testName = new TestName();

@Before
public void setUp() {
// set thread name
Thread.currentThread().setName(String.format("%s - main", testName.getMethodName()));

// set thread pools core size
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);

MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ private void pollMessagesAndCheck(
LOGGER.info("consumer {} exiting...", consumers.get(index));
}
},
consumers.get(index).toString());
String.format("%s - %s", testName.getMethodName(), consumers.get(index).toString()));
t.start();
threads.add(t);
}
Expand All @@ -1016,6 +1016,7 @@ private void pollMessagesAndCheck(
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollInSameThread()
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
.pollInterval(
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;

import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,6 +66,8 @@ public class IoTDBSubscriptionRestartIT {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);

@Rule public final TestRule skipOnSetUpFailure = new SkipOnSetUpFailure("setUp");

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
Expand Down Expand Up @@ -123,6 +128,7 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
TestUtils.restartCluster(EnvFactory.getEnv());
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

Expand All @@ -148,9 +154,10 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Subscription again
Expand Down Expand Up @@ -253,9 +260,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Shutdown DN 1 & DN 2
Expand All @@ -265,6 +273,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
EnvFactory.getEnv().shutdownDataNode(2);
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

Expand Down Expand Up @@ -314,6 +323,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

Expand All @@ -324,9 +334,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Check timestamps size
Expand Down Expand Up @@ -391,9 +402,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Subscription again
Expand Down Expand Up @@ -435,7 +447,13 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
thread.start();

// Shutdown leader CN
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
try {
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
} catch (final Throwable e) {
e.printStackTrace();
// Avoid failure
return;
}

// Insert some realtime data
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
Expand All @@ -444,9 +462,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
} catch (final Throwable e) {
e.printStackTrace();
fail(e.getMessage());
// Avoid failure
return;
}

// Show topics and subscriptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SubscriptionExecutorServiceManager {
public final class SubscriptionExecutorServiceManager {

private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);

private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L;

private static final String CONTROL_FLOW_EXECUTOR_NAME = "SubscriptionControlFlowExecutor";
private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
Expand Down Expand Up @@ -172,9 +172,9 @@ boolean isShutdown() {
}

void setCorePoolSize(final int corePoolSize) {
if (!isShutdown()) {
if (isShutdown()) {
synchronized (this) {
if (!isShutdown()) {
if (isShutdown()) {
this.corePoolSize = corePoolSize;
return;
}
Expand Down
Loading