Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TLS stability issues with V2 protocol that caused data corruption #4404

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
class BookieNettyServer {

private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final int maxFrameSize;
final ServerConfiguration conf;
Expand Down Expand Up @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));

pipeline.addLast("bytebufList", ByteBufList.ENCODER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,10 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
writeAndFlush(c, response.build());
} else {
LOG.info("Starting TLS handshake with client on channel {}", c);
// there is no need to execute in a different thread as this operation is light
SslHandler sslHandler = shFactory.newTLSHandler();
c.pipeline().addFirst("tls", sslHandler);
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, "tls", sslHandler);

response.setStatus(BookkeeperProtocol.StatusCode.EOK);
BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final BookieId bookieId;
final BookieAddressResolver bookieAddressResolver;
Expand Down Expand Up @@ -595,7 +596,7 @@ protected ChannelFuture connect() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
Expand Down Expand Up @@ -1573,8 +1574,10 @@ void initTLSHandshake() {
} else {
throw new RuntimeException("Unexpected socket address type");
}
LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort());
SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
channel.pipeline()
.addAfter(CONSOLIDATION_HANDLER_NAME, parentObj.shFactory.getHandlerName(), handler);
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception {
}

protected ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
return new ClientConfiguration(baseClientConf);
}

protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception {
*/
@Test
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
// skip test
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setDisableServerSocketBind(true);
c.setEnableLocalTransport(true);
Expand Down Expand Up @@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio
*/
@Test
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception {
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setBookieAuthProviderFactoryClass(
AllowOnlyClientsWithX509Certificates.class.getName());
Expand Down Expand Up @@ -756,6 +747,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
testClient(clientConf, numBookies);
fail("Shouldn't be able to connect");
} catch (BKException.BKUnauthorizedAccessException authFailed) {
} catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) {
if (!useV2Protocol) {
fail("Unexpected exception occurred.");
}
}

assertFalse(secureBookieSideChannel);
Expand Down
Loading