Skip to content

Commit

Permalink
Fix TLS stability issues with V2 protocol that caused data corruption (
Browse files Browse the repository at this point in the history
…#4404)

* Fix TLS stability issues with V2 protocol that caused data corruption
- add the TLS handler after the FlushConsolidationHandler
  - This makes TLS connections from Pulsar Broker to Bookkeeper stable
    when bookkeeperUseV2WireProtocol=true is used
- Fix test TestTLS for V2
- Fix inconsistency in client configuration in BookKeeperClusterTestCase

(cherry picked from commit 5f73147)
  • Loading branch information
lhotari authored and hezhangjian committed May 30, 2024
1 parent cc6614b commit 39a0e5a
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
class BookieNettyServer {

private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final int maxFrameSize;
final ServerConfiguration conf;
Expand Down Expand Up @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));

pipeline.addLast("bytebufList", ByteBufList.ENCODER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
public class BookieRequestProcessor implements RequestProcessor {

private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
public static final String TLS_HANDLER_NAME = "tls";

/**
* The server configuration. We use this for getting the number of add and read
Expand Down Expand Up @@ -580,9 +581,15 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
writeAndFlush(c, response.build());
} else {
LOG.info("Starting TLS handshake with client on channel {}", c);
// there is no need to execute in a different thread as this operation is light
SslHandler sslHandler = shFactory.newTLSHandler();
c.pipeline().addFirst("tls", sslHandler);
if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) {
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler);
} else {
// local transport doesn't contain FlushConsolidationHandler
c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
}

response.setStatus(BookkeeperProtocol.StatusCode.EOK);
BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final BookieId bookieId;
final BookieAddressResolver bookieAddressResolver;
Expand Down Expand Up @@ -595,7 +596,7 @@ protected ChannelFuture connect() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
Expand Down Expand Up @@ -1573,9 +1574,16 @@ void initTLSHandshake() {
} else {
throw new RuntimeException("Unexpected socket address type");
}
SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort());
SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
String sslHandlerName = parentObj.shFactory.getHandlerName();
if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) {
channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler);
} else {
// local transport doesn't contain FlushConsolidationHandler
channel.pipeline().addFirst(sslHandlerName, sslHandler);
}
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
int rc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception {
}

protected ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
return new ClientConfiguration(baseClientConf);
}

protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception {
*/
@Test
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
// skip test
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setDisableServerSocketBind(true);
c.setEnableLocalTransport(true);
Expand Down Expand Up @@ -621,10 +616,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio
*/
@Test
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception {
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setBookieAuthProviderFactoryClass(
AllowOnlyClientsWithX509Certificates.class.getName());
Expand Down Expand Up @@ -755,6 +746,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
testClient(clientConf, numBookies);
fail("Shouldn't be able to connect");
} catch (BKException.BKUnauthorizedAccessException authFailed) {
} catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) {
if (!useV2Protocol) {
fail("Unexpected exception occurred.");
}
}

assertFalse(secureBookieSideChannel);
Expand Down

0 comments on commit 39a0e5a

Please sign in to comment.