diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java index 36676b283a02..6680d016f599 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.client; import java.util.Map; +import java.util.Objects; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -28,6 +29,7 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp private HttpClient client; private ConnectionPool.Factory factory; + private InvocationType invocationType = InvocationType.BLOCKING; protected HttpClient getHttpClient() { @@ -60,4 +62,16 @@ protected void connectFailed(Map context, Throwable failure) Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY); promise.failed(failure); } + + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + + @Override + public void setInvocationType(InvocationType invocationType) + { + this.invocationType = Objects.requireNonNull(invocationType); + } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 79302aa1b59c..aeb72b8bd13e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -136,13 +136,6 @@ public void onComplete(Result result) { HttpRequest request = (HttpRequest)result.getRequest(); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()); - if (result.getResponseFailure() != null) - { - if (LOG.isDebugEnabled()) - LOG.debug("Authentication challenge failed", result.getFailure()); - forwardFailureComplete(request, result.getRequestFailure(), response, result.getResponseFailure()); - return; - } String authenticationAttribute = getAuthenticationAttribute(); HttpConversation conversation = request.getConversation(); diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java index eae36440f068..218ffcac534e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java @@ -18,22 +18,23 @@ import org.eclipse.jetty.client.transport.HttpDestination; import org.eclipse.jetty.io.ClientConnectionFactory; +import org.eclipse.jetty.util.thread.Invocable; /** * {@link HttpClientTransport} represents what transport implementations should provide - * in order to plug-in a different transport for {@link HttpClient}. + * in order to plug in a different transport for {@link HttpClient}. *

* While the {@link HttpClient} APIs define the HTTP semantic (request, response, headers, etc.) * how an HTTP exchange is carried over the network depends on implementations of this class. *

* The default implementation uses the HTTP protocol to carry over the network the HTTP exchange, * but the HTTP exchange may also be carried using the FCGI protocol, the HTTP/2 protocol or, - * in future, other protocols. + * in the future, other protocols. */ -public interface HttpClientTransport extends ClientConnectionFactory, HttpClient.Aware +public interface HttpClientTransport extends ClientConnectionFactory, HttpClient.Aware, Invocable { - public static final String HTTP_DESTINATION_CONTEXT_KEY = "org.eclipse.jetty.client.destination"; - public static final String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.connection.promise"; + String HTTP_DESTINATION_CONTEXT_KEY = "org.eclipse.jetty.client.destination"; + String HTTP_CONNECTION_PROMISE_CONTEXT_KEY = "org.eclipse.jetty.client.connection.promise"; /** * Sets the {@link HttpClient} instance on this transport. @@ -45,7 +46,7 @@ public interface HttpClientTransport extends ClientConnectionFactory, HttpClient * @param client the {@link HttpClient} that uses this transport. */ @Override - public void setHttpClient(HttpClient client); + void setHttpClient(HttpClient client); /** * Creates a new Origin with the given request. @@ -53,7 +54,7 @@ public interface HttpClientTransport extends ClientConnectionFactory, HttpClient * @param request the request that triggers the creation of the Origin * @return an Origin that identifies a destination */ - public Origin newOrigin(Request request); + Origin newOrigin(Request request); /** * Creates a new, transport-specific, {@link HttpDestination} object. @@ -64,7 +65,7 @@ public interface HttpClientTransport extends ClientConnectionFactory, HttpClient * @param origin the destination origin * @return a new, transport-specific, {@link HttpDestination} object */ - public Destination newDestination(Origin origin); + Destination newDestination(Origin origin); /** * Establishes a physical connection to the given {@code address}. @@ -72,16 +73,30 @@ public interface HttpClientTransport extends ClientConnectionFactory, HttpClient * @param address the address to connect to * @param context the context information to establish the connection */ - public void connect(SocketAddress address, Map context); + void connect(SocketAddress address, Map context); /** * @return the factory for ConnectionPool instances */ - public ConnectionPool.Factory getConnectionPoolFactory(); + ConnectionPool.Factory getConnectionPoolFactory(); /** * Set the factory for ConnectionPool instances. * @param factory the factory for ConnectionPool instances */ - public void setConnectionPoolFactory(ConnectionPool.Factory factory); + void setConnectionPoolFactory(ConnectionPool.Factory factory); + + /** + * @return the {@link InvocationType} associated with this {@code HttpClientTransport}. + */ + @Override + default InvocationType getInvocationType() + { + return Invocable.super.getInvocationType(); + } + + /** + * @param invocationType the {@link InvocationType} associated with this {@code HttpClientTransport}. + */ + void setInvocationType(InvocationType invocationType); } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java index 56692b7a2003..7479c1e76fa2 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java @@ -70,9 +70,6 @@ public void onComplete(Result result) { Request request = result.getRequest(); Response response = result.getResponse(); - if (result.getResponseFailure() == null) - redirector.redirect(request, response, null); - else - redirector.fail(request, response, result.getFailure()); + redirector.redirect(request, response, null); } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java index 8e658e5d50cb..a7055e9b686e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java @@ -188,7 +188,7 @@ interface AsyncContentListener extends ContentSourceListener @Override default void onContentSource(Response response, Content.Source contentSource) { - Runnable demandCallback = Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> onContentSource(response, contentSource)); + Runnable demandCallback = Invocable.from(Invocable.getInvocationType(contentSource), () -> onContentSource(response, contentSource)); Content.Chunk chunk = contentSource.read(); if (chunk == null) { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientConnectionFactory.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientConnectionFactory.java index e33503fe3f49..113719426368 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientConnectionFactory.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientConnectionFactory.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Map; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.EndPoint; @@ -49,8 +50,10 @@ public void setInitializeConnections(boolean initialize) @Override public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) { + HttpClient httpClient = (HttpClient)context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, context); connection.setInitialize(isInitializeConnections()); + connection.setInvocationType(httpClient.getHttpClientTransport().getInvocationType()); return customize(connection, context); } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientTransportOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientTransportOverHTTP.java index 61add8e5791b..8d0c81d7ffaf 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientTransportOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpClientTransportOverHTTP.java @@ -26,11 +26,12 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @ManagedObject("The HTTP/1.1 client transport") -public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTransport +public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTransport implements Invocable { public static final Origin.Protocol HTTP11 = new Origin.Protocol(List.of("http/1.1"), false); private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransportOverHTTP.class); diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 751b6d44b567..5641b1aaa2c4 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -63,7 +63,7 @@ * * @see HttpSender */ -public abstract class HttpReceiver +public abstract class HttpReceiver implements Invocable { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class); @@ -157,7 +157,7 @@ protected void responseBegin(HttpExchange exchange) responseState = ResponseState.BEGIN; HttpResponse response = exchange.getResponse(); HttpConversation conversation = exchange.getConversation(); - // Probe the protocol handlers + // Probe the protocol handlers. HttpClient client = getHttpDestination().getHttpClient(); ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response); Response.Listener handlerListener = null; @@ -170,7 +170,7 @@ protected void responseBegin(HttpExchange exchange) conversation.updateResponseListeners(handlerListener); if (LOG.isDebugEnabled()) - LOG.debug("Response begin {}", response); + LOG.debug("Notifying response begin for {} on {}", exchange, this); conversation.getResponseListeners().notifyBegin(response); }); } @@ -189,12 +189,12 @@ protected void responseBegin(HttpExchange exchange) protected void responseHeader(HttpExchange exchange, HttpField field) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseHeader for {} on {}", field, this); + LOG.debug("Invoking responseHeader {} for {} on {}", field, exchange, this); invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseHeader on {}", this); + LOG.debug("Executing responseHeader for {} on {}", exchange, this); if (exchange.isResponseCompleteOrTerminated()) return; @@ -202,10 +202,10 @@ protected void responseHeader(HttpExchange exchange, HttpField field) responseState = ResponseState.HEADER; HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Notifying header {}", field); + LOG.debug("Notifying response header {} for {} on {}", field, exchange, this); boolean process = exchange.getConversation().getResponseListeners().notifyHeader(response, field); if (LOG.isDebugEnabled()) - LOG.debug("Header {} notified, {}processing needed", field, (process ? "" : "no ")); + LOG.debug("Notified response header {}, processing {}", field, (process ? "needed" : "skipped")); if (process) { response.addHeader(field); @@ -241,12 +241,12 @@ protected void storeCookie(URI uri, HttpField field) protected void responseHeaders(HttpExchange exchange) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseHeaders on {}", this); + LOG.debug("Invoking responseHeaders for {} on {}", exchange, this); invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseHeaders on {}", this); + LOG.debug("Executing responseHeaders for {} on {}", exchange, this); if (exchange.isResponseCompleteOrTerminated()) return; @@ -259,6 +259,8 @@ protected void responseHeaders(HttpExchange exchange) // HEAD responses may have Content-Encoding // and Content-Length, but have no content. + // This step may modify the response headers, + // so must be done before notifying the headers. ContentDecoder.Factory decoderFactory = null; if (!HttpMethod.HEAD.is(exchange.getRequest().getMethod())) { @@ -286,6 +288,8 @@ protected void responseHeaders(HttpExchange exchange) } } + if (LOG.isDebugEnabled()) + LOG.debug("Notifying response headers for {} on {}", exchange, this); ResponseListeners responseListeners = exchange.getConversation().getResponseListeners(); responseListeners.notifyHeaders(response); @@ -296,6 +300,7 @@ protected void responseHeaders(HttpExchange exchange) { if (LOG.isDebugEnabled()) LOG.debug("Interim response status {}, succeeding", response.getStatus()); + // TODO: explain it's queued. responseSuccess(exchange, this::onInterim); return; } @@ -309,12 +314,12 @@ protected void responseHeaders(HttpExchange exchange) if (decoderFactory != null) { if (LOG.isDebugEnabled()) - LOG.debug("Decoding {} response content", decoderFactory.getEncoding()); + LOG.debug("Decoding {} response content for {} on {}", decoderFactory.getEncoding(), exchange, this); contentSource = new DecodedContentSource(decoderFactory.newDecoderContentSource(rawContentSource), response); } if (LOG.isDebugEnabled()) - LOG.debug("Response content {} {}", response, contentSource); + LOG.debug("Notifying response content {} for {} on {}", contentSource, exchange, this); responseListeners.notifyContentSource(response, contentSource); }); } @@ -325,21 +330,22 @@ protected void responseHeaders(HttpExchange exchange) * This method takes care of ensuring the {@link Content.Source} passed to * {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} * calls the demand callback. - * The call to the demand callback is serialized with other events. */ protected void responseContentAvailable(HttpExchange exchange) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseContentAvailable on {}", this); + LOG.debug("Invoking responseContentAvailable for {} on {}", exchange, this); invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseContentAvailable on {}", this); + LOG.debug("Executing responseContentAvailable for {} on {}", exchange, this); if (exchange.isResponseCompleteOrTerminated()) return; + if (LOG.isDebugEnabled()) + LOG.debug("Notifying data available for {} on {}", exchange, this); rawContentSource.onDataAvailable(); }); } @@ -356,7 +362,7 @@ protected void responseContentAvailable(HttpExchange exchange) protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseSuccess on {}", this); + LOG.debug("Invoking responseSuccess for {} on {}", exchange, this); // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. @@ -366,7 +372,7 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) Runnable successTask = () -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing responseSuccess on {}", this); + LOG.debug("Executing responseSuccess for {} on {}", exchange, this); responseState = ResponseState.IDLE; @@ -374,7 +380,7 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Response success {}", response); + LOG.debug("Notifying response success for {} on {}", exchange, this); exchange.getConversation().getResponseListeners().notifySuccess(response); // Interim responses do not terminate the exchange. @@ -401,11 +407,11 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) */ protected void responseFailure(Throwable failure, Promise promise) { - if (LOG.isDebugEnabled()) - LOG.debug("Failing with {} on {}", failure, this); - HttpExchange exchange = getHttpExchange(); + if (LOG.isDebugEnabled()) + LOG.debug("Response failure {} for {} on {}", failure, exchange, this); + // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. if (exchange != null && exchange.responseComplete(failure)) @@ -447,6 +453,12 @@ private void terminateResponse(HttpExchange exchange, Result result) } } + @Override + public InvocationType getInvocationType() + { + return Invocable.getInvocationType(contentSource); + } + /** * Resets the state of this HttpReceiver. *

@@ -484,7 +496,7 @@ private void cleanup() public void abort(HttpExchange exchange, Throwable failure, Promise promise) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking abort with {} on {}", failure, this); + LOG.debug("Invoking abort for {} on {}", exchange, this, failure); if (!exchange.isResponseCompleteOrTerminated()) throw new IllegalStateException(); @@ -502,13 +514,15 @@ public void abort(HttpExchange exchange, Throwable failure, Promise pro responseState = ResponseState.FAILURE; this.failure = failure; + if (contentSource != null) contentSource.fail(failure); + dispose(); HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Response abort {} {} on {}", response, exchange, getHttpChannel(), failure); + LOG.debug("Notifying response failure {} for {} on {}", failure, exchange, this); exchange.getConversation().getResponseListeners().notifyFailure(response, failure); // Mark atomically the response as terminated, with @@ -602,7 +616,7 @@ private enum ResponseState FAILURE } - private class DecodedContentSource implements Content.Source + private class DecodedContentSource implements Content.Source, Invocable { private static final Logger LOG = LoggerFactory.getLogger(DecodedContentSource.class); @@ -623,6 +637,12 @@ public long getLength() return source.getLength(); } + @Override + public InvocationType getInvocationType() + { + return Invocable.getInvocationType(source); + } + @Override public Content.Chunk read() { @@ -686,11 +706,7 @@ public boolean rewind() } } - /** - * This Content.Source implementation guarantees that all {@link #read(boolean)} calls - * happening from a {@link #demand(Runnable)} callback must be serialized. - */ - private class ContentSource implements Content.Source + private class ContentSource implements Content.Source, Invocable { private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class); @@ -743,6 +759,15 @@ private void onDataAvailable() invokeDemandCallback(false); } + @Override + public InvocationType getInvocationType() + { + Runnable demandCallback = demandCallbackRef.get(); + if (demandCallback != null) + return Invocable.getInvocationType(demandCallback); + return Invocable.getInvocationType(getHttpChannel().getConnection()); + } + @Override public void demand(Runnable demandCallback) { @@ -752,8 +777,6 @@ public void demand(Runnable demandCallback) throw new IllegalArgumentException(); if (!demandCallbackRef.compareAndSet(null, demandCallback)) throw new IllegalStateException(); - // The processDemand method may call HttpReceiver.read(boolean) - // so it must be called by the invoker. invoker.run(processDemand); } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java index 4c6030e59716..5e2b0acb5f97 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java @@ -565,7 +565,7 @@ private void registerDemand(ContentSource contentSource) LOG.debug("Registered demand on {}; {}", contentSource, counters); } - private class ContentSource implements Content.Source + private class ContentSource implements Content.Source, Invocable { private static final Content.Chunk ALREADY_READ_CHUNK = new Content.Chunk.Empty() { @@ -653,9 +653,11 @@ private void onDemandCallback() } } - private Invocable.InvocationType getInvocationType() + @Override + public InvocationType getInvocationType() { - return Invocable.getInvocationType(demandCallbackRef.get()); + Runnable demandCallback = demandCallbackRef.get(); + return demandCallback == null ? InvocationType.NON_BLOCKING : Invocable.getInvocationType(demandCallback); } @Override diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java index 018fe81df0cc..fa5805423a63 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java @@ -46,15 +46,18 @@ import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Attachable; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Sweeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpConnectionOverHTTP extends AbstractConnection implements IConnection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable +public class HttpConnectionOverHTTP extends AbstractConnection implements IConnection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable, Invocable { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class); + private final Callback fillableCallback = new FillableCallback(); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Promise promise; @@ -64,6 +67,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne private final LongAdder bytesOut = new LongAdder(); private long idleTimeout; private boolean initialize; + private InvocationType invocationType = InvocationType.BLOCKING; public HttpConnectionOverHTTP(EndPoint endPoint, Map context) { @@ -184,6 +188,23 @@ public void setInitialize(boolean initialize) this.initialize = initialize; } + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + + public void setInvocationType(InvocationType invocationType) + { + this.invocationType = invocationType; + } + + @Override + public void fillInterested() + { + fillInterested(fillableCallback); + } + @Override public void onOpen() { @@ -432,4 +453,25 @@ public String toString() return HttpConnectionOverHTTP.this.toString(); } } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return HttpConnectionOverHTTP.this.getInvocationType(); + } + } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java index 29ea530759e8..575a1d2d1f45 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class); + private final Callback demandContentCallback = new DemandContentCallback(); private final Runnable receiveNext = this::receiveNext; private final LongAdder inMessages = new LongAdder(); private final HttpParser parser; @@ -72,18 +74,18 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel) void receive() { - if (!hasContent()) - { - boolean setFillInterest = parseAndFill(true); - if (!hasContent() && setFillInterest) - fillInterested(); - } - else + if (hasContent()) { HttpExchange exchange = getHttpExchange(); if (exchange != null) responseContentAvailable(exchange); } + else + { + boolean setFillInterest = parseAndFill(true); + if (!hasContent() && setFillInterest) + getHttpConnection().fillInterested(); + } } @Override @@ -359,12 +361,13 @@ private boolean parse(boolean notifyContentAvailable) if (LOG.isDebugEnabled()) LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this); BufferUtil.clear(byteBuffer); - return false; + return true; } } - // Continue to read from the network. - return false; + // Reading the next response will + // be performed by receivedNext(). + return true; } default -> throw new IllegalStateException("Invalid state " + state); } @@ -386,7 +389,7 @@ protected void fillInterested() { if (LOG.isDebugEnabled()) LOG.debug("Registering as fill interested in {}", this); - getHttpConnection().fillInterested(); + getHttpConnection().fillInterested(demandContentCallback); } private void shutdown() @@ -527,7 +530,7 @@ private void receiveNext() LOG.debug("Receiving next response in {}", this); boolean setFillInterest = parseAndFill(true); if (!hasContent() && setFillInterest) - fillInterested(); + getHttpConnection().fillInterested(); } @Override @@ -572,4 +575,25 @@ private enum State { STATUS, HEADERS, CONTENT, COMPLETE } + + private class DemandContentCallback implements Callback + { + @Override + public void succeeded() + { + getHttpConnection().onFillable(); + } + + @Override + public void failed(Throwable failure) + { + getHttpConnection().onFillInterestedFailed(failure); + } + + @Override + public InvocationType getInvocationType() + { + return HttpReceiverOverHTTP.this.getInvocationType(); + } + } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/ProtocolHttpUpgrader.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/ProtocolHttpUpgrader.java index 220f649bc036..70a747986735 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/ProtocolHttpUpgrader.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/ProtocolHttpUpgrader.java @@ -13,9 +13,9 @@ package org.eclipse.jetty.client.transport.internal; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.eclipse.jetty.client.Destination; import org.eclipse.jetty.client.HttpClient; @@ -29,6 +29,7 @@ import org.eclipse.jetty.client.transport.HttpDestination; import org.eclipse.jetty.client.transport.HttpResponse; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; @@ -66,15 +67,14 @@ public void upgrade(Response response, EndPoint endPoint, Callback callback) { HttpClient httpClient = destination.getHttpClient(); HttpClientTransport transport = httpClient.getHttpClientTransport(); - if (transport instanceof HttpClientTransportDynamic) + if (transport instanceof HttpClientTransportDynamic dynamic) { - HttpClientTransportDynamic dynamicTransport = (HttpClientTransportDynamic)transport; - Origin origin = destination.getOrigin(); Origin newOrigin = new Origin(origin.getScheme(), origin.getAddress(), origin.getTag(), new Origin.Protocol(List.of(protocol), false)); Destination newDestination = httpClient.resolveDestination(newOrigin); - Map context = new HashMap<>(); + Map context = new ConcurrentHashMap<>(); + context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, httpClient); context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, newDestination); context.put(HttpResponse.class.getName(), response); context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, Promise.from(y -> callback.succeeded(), callback::failed)); @@ -82,7 +82,7 @@ public void upgrade(Response response, EndPoint endPoint, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("Upgrading {} on {}", response.getRequest(), endPoint); - dynamicTransport.upgrade(endPoint, context); + dynamic.upgrade(endPoint, context); } else { diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index 527e76443cd9..cf6bff3d416c 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -1401,9 +1401,12 @@ public void onClosed(Connection connection) assertTrue(latch.await(5, TimeUnit.SECONDS)); + // Client sent and server received (and viceversa) bytes may not be equal + // because upon closing the connection the TLS alert may not be read due + // to the fact that the EndPoint is closed. assertThat(clientStats.getSentBytes(), Matchers.greaterThan(0L)); - assertEquals(clientStats.getSentBytes(), serverStats.getReceivedBytes()); + assertThat(serverStats.getReceivedBytes(), Matchers.greaterThan(0L)); assertThat(clientStats.getReceivedBytes(), Matchers.greaterThan(0L)); - assertEquals(clientStats.getReceivedBytes(), serverStats.getSentBytes()); + assertThat(serverStats.getSentBytes(), Matchers.greaterThan(0L)); } } diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 785115587631..2756be646191 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -180,14 +180,17 @@ public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jett return true; } }); - client.setConnectBlocking(true); - ContentResponse response = client.GET(scenario.getScheme() + "://localhost:" + connector.getLocalPort()); - assertNotNull(response); - assertEquals(200, response.getStatus()); - byte[] content = response.getContent(); - assertArrayEquals(data, content); + for (int i = 0; i < 2; ++i) + { + ContentResponse response = client.GET(scenario.getScheme() + "://localhost:" + connector.getLocalPort()); + + assertNotNull(response); + assertEquals(200, response.getStatus()); + byte[] content = response.getContent(); + assertArrayEquals(data, content); + } } @ParameterizedTest diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java index 209ed24563a4..bdb023225718 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.client.transport.HttpDestination; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.logging.StacklessLogging; import org.eclipse.jetty.server.Handler; @@ -498,20 +499,19 @@ public boolean handle(org.eclipse.jetty.server.Request request, Response respons @Override public void onComplete(Result result) { - // Fake the fact that the redirect failed. + // Fake the fact that the redirect failed, + // but the redirect should still be followed. Result newResult = new Result(result, cause); super.onComplete(newResult); } }); - ExecutionException e = assertThrows(ExecutionException.class, () -> - { - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scenario.getScheme()) - .path("/redirect") - .timeout(5, TimeUnit.SECONDS) - .send(); - }); - assertSame(cause, e.getCause()); + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .path("/redirect") + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertEquals(HttpStatus.OK_200, response.getStatus()); } } diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java index ffa73f332c75..5f5c012f3b5e 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java @@ -258,7 +258,7 @@ protected void fillInterested() // Verify that the buffer has been released // before fillInterested() is called. assertNull(getResponseBuffer()); - // Fill the endpoint so receive is called again. + // Fill the endpoint so receive() is called again. endPoint.addInput("X"); } super.fillInterested(); diff --git a/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionConfig.java b/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionConfig.java index d8ca58d62f78..157013b5e39d 100644 --- a/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionConfig.java +++ b/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionConfig.java @@ -245,7 +245,10 @@ String getCompressionEncoding(Set supportedEncodings, Request request, L // The only option left is identity, if acceptable. if (identity != null && !identity.isAcceptable()) - throw new HttpException.RuntimeException(HttpStatus.UNSUPPORTED_MEDIA_TYPE_415); + { + List accepted = supportedEncodings.stream().filter(compressEncodings).toList(); + throw new HttpException.RuntimeException(HttpStatus.UNSUPPORTED_MEDIA_TYPE_415, String.join(", ", accepted)); + } // Identity is acceptable. return null; diff --git a/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionHandler.java b/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionHandler.java index 51daad0f5f1b..8264f9b60e41 100644 --- a/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionHandler.java +++ b/jetty-core/jetty-compression/jetty-compression-server/src/main/java/org/eclipse/jetty/compression/server/CompressionHandler.java @@ -22,9 +22,11 @@ import org.eclipse.jetty.compression.server.internal.CompressionResponse; import org.eclipse.jetty.compression.server.internal.DecompressionRequest; import org.eclipse.jetty.http.EtagUtils; +import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.http.QuotedQualityCSV; import org.eclipse.jetty.http.pathmap.MatchedResource; @@ -34,6 +36,7 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,7 +198,7 @@ protected void doStart() throws Exception } @Override - public boolean handle(final Request request, final Response response, final Callback callback) throws Exception + public boolean handle(Request request, Response response, Callback callback) throws Exception { if (LOG.isDebugEnabled()) LOG.debug("handling {} {} {}", request, response, this); @@ -262,7 +265,28 @@ public boolean handle(final Request request, final Response response, final Call requestAcceptEncoding = qualityCSV.getQualityValues(); String decompressEncoding = config.getDecompressionEncoding(supportedEncodings.keySet(), request, requestContentEncoding, pathInContext); - String compressEncoding = config.getCompressionEncoding(supportedEncodings.keySet(), request, requestAcceptEncoding, pathInContext); + + String compressEncoding; + try + { + compressEncoding = config.getCompressionEncoding(supportedEncodings.keySet(), request, requestAcceptEncoding, pathInContext); + } + catch (Throwable x) + { + if (x instanceof HttpException http) + { + int statusCode = http.getCode(); + if (statusCode == HttpStatus.UNSUPPORTED_MEDIA_TYPE_415) + { + String accepted = http.getReason(); + if (StringUtil.isNotBlank(accepted)) + response.getHeaders().put(HttpHeader.ACCEPT_ENCODING, accepted); + Response.writeError(request, response, callback, http.getCode(), null, x); + return true; + } + } + throw x; + } if (LOG.isDebugEnabled()) { diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/HttpClientTransportOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/HttpClientTransportOverFCGI.java index 422f736260db..6c6461ebe378 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/HttpClientTransportOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/HttpClientTransportOverFCGI.java @@ -98,7 +98,9 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map promise) { - return new HttpConnectionOverFCGI(endPoint, destination, promise); + HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination, promise); + connection.setInvocationType(getInvocationType()); + return connection; } public void customize(Request request, HttpFields.Mutable fastCGIHeaders) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java index 4bdc222b125e..1a2d6e8f2930 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java @@ -48,14 +48,17 @@ import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpConnectionOverFCGI extends AbstractConnection implements IConnection, Attachable +public class HttpConnectionOverFCGI extends AbstractConnection implements IConnection, Attachable, Invocable { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class); + private final Callback fillableCallback = new FillableCallback(); private final ByteBufferPool networkByteBufferPool; private final AtomicInteger requests = new AtomicInteger(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -70,6 +73,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne private State state = State.STATUS; private long idleTimeout; private boolean shutdown; + private InvocationType invocationType = InvocationType.BLOCKING; public HttpConnectionOverFCGI(EndPoint endPoint, Destination destination, Promise promise) { @@ -89,6 +93,17 @@ public HttpDestination getHttpDestination() return destination; } + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + + public void setInvocationType(InvocationType invocationType) + { + this.invocationType = invocationType; + } + @Override public SocketAddress getLocalSocketAddress() { @@ -132,6 +147,12 @@ public void onOpen() promise.succeeded(this); } + @Override + public void fillInterested() + { + fillInterested(fillableCallback); + } + @Override public void onFillable() { @@ -218,6 +239,12 @@ private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable) { boolean handle = parser.parse(buffer); + if (LOG.isDebugEnabled()) + LOG.debug("Parse state={} result={} {} {} on {}", state, handle, BufferUtil.toDetailString(buffer), parser, this); + + if (!handle) + return false; + switch (state) { case STATUS -> @@ -230,19 +257,11 @@ private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable) if (notifyContentAvailable) channel.responseContentAvailable(); } - case COMPLETE -> - { - // For the complete event, handle==false, and cannot - // differentiate between a complete event and a parse() - // with zero or not enough bytes, so the state is reset - // here to avoid calling responseSuccess() again. - state = State.STATUS; - channel.responseSuccess(); - } + case COMPLETE -> channel.responseSuccess(); default -> throw new IllegalStateException("Invalid state " + state); } - return handle; + return true; } private void shutdown() @@ -471,12 +490,13 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) } @Override - public void onEnd(int request) + public boolean onEnd(int request) { if (LOG.isDebugEnabled()) LOG.debug("onEnd r={}", request); channel.end(); state = State.COMPLETE; + return true; } @Override @@ -492,4 +512,25 @@ private enum State { STATUS, HEADERS, CONTENT, COMPLETE } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable failure) + { + onFillInterestedFailed(failure); + } + + @Override + public InvocationType getInvocationType() + { + return HttpConnectionOverFCGI.this.getInvocationType(); + } + } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java index 805cff268acc..9c42fdfc96f1 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java @@ -18,10 +18,12 @@ import org.eclipse.jetty.client.transport.HttpReceiver; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; public class HttpReceiverOverFCGI extends HttpReceiver { + private final Callback demandContentCallback = new DemandContentCallback(); private Content.Chunk chunk; public HttpReceiverOverFCGI(HttpChannel channel) @@ -31,19 +33,19 @@ public HttpReceiverOverFCGI(HttpChannel channel) void receive() { - if (!hasContent()) - { - HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean setFillInterest = httpConnection.parseAndFill(true); - if (!hasContent() && setFillInterest) - httpConnection.fillInterested(); - } - else + if (hasContent()) { HttpExchange exchange = getHttpExchange(); if (exchange != null) responseContentAvailable(exchange); } + else + { + HttpConnectionOverFCGI httpConnection = getHttpConnection(); + boolean setFillInterest = httpConnection.parseAndFill(true); + if (!hasContent() && setFillInterest) + httpConnection.fillInterested(); + } } @Override @@ -80,16 +82,21 @@ public Content.Chunk read(boolean fillInterestIfNeeded) Content.Chunk chunk = consumeChunk(); if (chunk != null) return chunk; - HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); + HttpConnectionOverFCGI httpConnection = getHttpConnection(); boolean needFillInterest = httpConnection.parseAndFill(false); chunk = consumeChunk(); if (chunk != null) return chunk; if (needFillInterest && fillInterestIfNeeded) - httpConnection.fillInterested(); + fillInterested(); return null; } + private void fillInterested() + { + getHttpConnection().fillInterested(demandContentCallback); + } + private Content.Chunk consumeChunk() { Content.Chunk chunk = this.chunk; @@ -103,8 +110,8 @@ public void failAndClose(Throwable failure) responseFailure(failure, Promise.from(failed -> { if (failed) - getHttpChannel().getHttpConnection().close(failure); - }, x -> getHttpChannel().getHttpConnection().close(failure))); + getHttpConnection().close(failure); + }, x -> getHttpConnection().close(failure))); } void content(Content.Chunk chunk) @@ -135,7 +142,7 @@ private void receiveNext() if (chunk != null) throw new IllegalStateException(); - HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); + HttpConnectionOverFCGI httpConnection = getHttpConnection(); boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) httpConnection.fillInterested(); @@ -147,6 +154,11 @@ protected HttpChannelOverFCGI getHttpChannel() return (HttpChannelOverFCGI)super.getHttpChannel(); } + private HttpConnectionOverFCGI getHttpConnection() + { + return getHttpChannel().getHttpConnection(); + } + @Override protected void responseBegin(HttpExchange exchange) { @@ -176,4 +188,25 @@ protected void responseFailure(Throwable failure, Promise promise) { super.responseFailure(failure, promise); } + + private class DemandContentCallback implements Callback + { + @Override + public void succeeded() + { + getHttpConnection().onFillable(); + } + + @Override + public void failed(Throwable failure) + { + getHttpConnection().onFillInterestedFailed(failure); + } + + @Override + public InvocationType getInvocationType() + { + return HttpReceiverOverFCGI.this.getInvocationType(); + } + } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpSenderOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpSenderOverFCGI.java index 17c2e18a8a29..18e5f5c4e56c 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpSenderOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpSenderOverFCGI.java @@ -95,7 +95,8 @@ protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, bool } // Give a chance to the transport implementation to customize the FastCGI headers - HttpClientTransportOverFCGI transport = (HttpClientTransportOverFCGI)getHttpChannel().getHttpDestination().getHttpClient().getHttpClientTransport(); + HttpClient httpClient = getHttpChannel().getHttpDestination().getHttpClient(); + HttpClientTransportOverFCGI transport = (HttpClientTransportOverFCGI)httpClient.getHttpClientTransport(); transport.customize(request, fcgiHeaders); ByteBufferPool.Accumulator accumulator = new ByteBufferPool.Accumulator(); diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java index 7f63bfd2eace..aab71ec1a2c0 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java @@ -76,13 +76,14 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) } @Override - public void onEnd(int request) + public boolean onEnd(int request) { - listener.onEnd(request); + boolean result = listener.onEnd(request); for (StreamContentParser streamParser : streamParsers) { streamParser.end(request); } + return result; } @Override diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java index a45b17a7deb1..cb328073606b 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java @@ -130,6 +130,8 @@ public boolean parse(ByteBuffer buffer) { buffer.position(buffer.position() + padding); reset(); + if (!buffer.hasRemaining()) + return true; } else { @@ -195,8 +197,9 @@ public default boolean onContent(int request, FCGI.StreamType stream, ByteBuffer return false; } - public default void onEnd(int request) + public default boolean onEnd(int request) { + return false; } public default void onFailure(int request, Throwable failure) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java index 4414b5ab532d..f022b376114e 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java @@ -87,14 +87,14 @@ public boolean noContent() { try { - listener.onEnd(getRequest()); + return listener.onEnd(getRequest()); } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("Exception while invoking listener {}", listener, x); + return false; } - return false; } protected boolean onContent(ByteBuffer buffer) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java index 5eab762f05b2..90359c95d815 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/generator/ClientGeneratorTest.java @@ -175,10 +175,11 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) } @Override - public void onEnd(int request) + public boolean onEnd(int request) { assertEquals(id, request); assertEquals(contentLength, totalLength.get()); + return false; } }); diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java index 281778a37e1d..e61ecff85337 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java @@ -130,10 +130,11 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) } @Override - public void onEnd(int request) + public boolean onEnd(int request) { assertEquals(id, request); verifier.addAndGet(3); + return false; } }); @@ -181,10 +182,11 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) } @Override - public void onEnd(int request) + public boolean onEnd(int request) { assertEquals(id, request); verifier.addAndGet(3); + return false; } }); @@ -232,11 +234,12 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) } @Override - public void onEnd(int request) + public boolean onEnd(int request) { assertEquals(id, request); assertEquals(contentLength, totalLength.get()); verifier.set(true); + return false; } }); diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index f991af3d9ab9..9e1719c60cb9 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -380,7 +380,7 @@ public boolean onContent(int request, FCGI.StreamType streamType, ByteBuffer buf } @Override - public void onEnd(int request) + public boolean onEnd(int request) { if (LOG.isDebugEnabled()) LOG.debug("Request {} end on {}", request, stream); @@ -390,7 +390,9 @@ public void onEnd(int request) // Nulling out the stream signals that the // request is complete, see also parseAndFill(). stream = null; + return true; } + return false; } @Override @@ -399,9 +401,7 @@ public void onFailure(int request, Throwable failure) if (LOG.isDebugEnabled()) LOG.debug("Request {} failure on {}", request, stream, failure); if (stream != null) - { ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure))); - } stream = null; } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/ClientConnectionFactoryOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/ClientConnectionFactoryOverHTTP2.java index 7c285a78ce07..3f65fa964564 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/ClientConnectionFactoryOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/ClientConnectionFactoryOverHTTP2.java @@ -75,7 +75,12 @@ public static class HTTP2 extends Info public HTTP2(HTTP2Client http2Client) { - super(new ClientConnectionFactoryOverHTTP2(http2Client)); + this(new ClientConnectionFactoryOverHTTP2(http2Client)); + } + + public HTTP2(ClientConnectionFactoryOverHTTP2 connectionFactory) + { + super(connectionFactory); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java index 72b2a4367cf9..12836692b06f 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java @@ -42,9 +42,10 @@ import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.thread.Invocable; @ManagedObject("The HTTP/2 client transport") -public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport +public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport implements Invocable { private final ClientConnectionFactory connectionFactory = new HTTP2ClientConnectionFactory(); private final HTTP2Client http2Client; @@ -155,7 +156,9 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map promise) promise.succeeded(true); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + return () -> { boolean expire = connection.onIdleExpired(timeout); if (expire) @@ -72,17 +81,17 @@ public Runnable onTimeout(TimeoutException timeout, Promise promise) close(timeout); } promise.succeeded(expire); - }); + }; } @Override public Runnable onFailure(Throwable failure, Callback callback) { - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + return () -> { processFailure(failure); close(failure); callback.failed(failure); - }); + }; } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java index 2d9e50d6a02f..0cf30a923384 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java @@ -22,12 +22,14 @@ import org.eclipse.jetty.client.Connection; import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.Destination; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; @@ -93,7 +95,10 @@ private void onServerPreface(Session session) protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection) { - return new HttpConnectionOverHTTP2(destination, session, connection); + HttpClient httpClient = (HttpClient)context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY); + HttpConnectionOverHTTP2 result = new HttpConnectionOverHTTP2(destination, session, connection); + result.setInvocationType(httpClient.getHttpClientTransport().getInvocationType()); + return result; } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java index 7e1991b9b777..f3be4a4c6723 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,9 +183,10 @@ public void onNewStream(Stream stream) } @Override - public void onHeaders(Stream stream, HeadersFrame frame) + public void onHeaders(Stream stream, HeadersFrame frame, Callback callback) { - receiver.onHeaders(stream, frame); + HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); + connection.offerTask(channel.onHeaders(stream, frame, callback), false); } @Override @@ -204,21 +206,24 @@ public void onDataAvailable(Stream stream) public void onReset(Stream stream, ResetFrame frame, Callback callback) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onReset(frame, callback), false); + Runnable task = channel.onReset(frame, callback); + ThreadPool.executeImmediately(connection.getHttpClient().getExecutor(), task); } @Override public void onIdleTimeout(Stream stream, TimeoutException x, Promise promise) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onTimeout(x, promise), false); + Runnable task = channel.onTimeout(x, promise); + ThreadPool.executeImmediately(connection.getHttpClient().getExecutor(), task); } @Override public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onFailure(failure, callback), false); + Runnable task = channel.onFailure(failure, callback); + ThreadPool.executeImmediately(connection.getHttpClient().getExecutor(), task); } } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java index 15e77d308b9d..fb6a38c569df 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java @@ -46,11 +46,12 @@ import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Sweeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.MaxMultiplexable, ConnectionPool.MaxUsable +public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.MaxMultiplexable, ConnectionPool.MaxUsable, Invocable { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP2.class); @@ -61,6 +62,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private final Session session; private final HTTP2Connection connection; private boolean recycleHttpChannels = true; + private InvocationType invocationType = InvocationType.BLOCKING; public HttpConnectionOverHTTP2(Destination destination, Session session, HTTP2Connection connection) { @@ -119,6 +121,17 @@ void setMaxUsage(int maxUsage) ((HTTP2Session)session).setMaxTotalLocalStreams(maxUsage); } + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + + public void setInvocationType(InvocationType invocationType) + { + this.invocationType = invocationType; + } + @Override protected Iterator getHttpChannels() { diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index ba6835cfaf43..021ff55db828 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -15,6 +15,7 @@ import java.io.EOFException; import java.io.IOException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; @@ -43,6 +44,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.SerializedInvoker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +52,13 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class); + private final SerializedInvoker invoker; + public HttpReceiverOverHTTP2(HttpChannel channel) { super(channel); + Executor executor = channel.getHttpDestination().getHttpClient().getExecutor(); + invoker = new SerializedInvoker(HttpReceiverOverHTTP2.class.getName(), executor); } @Override @@ -116,61 +122,77 @@ protected HttpChannelOverHTTP2 getHttpChannel() return (HttpChannelOverHTTP2)super.getHttpChannel(); } - void onHeaders(Stream stream, HeadersFrame frame) + private HttpConnectionOverHTTP2 getHttpConnection() + { + return getHttpChannel().getHttpConnection(); + } + + @Override + public Runnable onHeaders(Stream stream, HeadersFrame frame, Callback callback) { MetaData metaData = frame.getMetaData(); if (metaData.isResponse()) - onResponse(stream, frame); + return onResponse(stream, frame, callback); else - onTrailer(frame); + return onTrailer(frame, callback); } - private void onResponse(Stream stream, HeadersFrame frame) + private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; - - MetaData.Response response = (MetaData.Response)frame.getMetaData(); - HttpResponse httpResponse = exchange.getResponse(); - httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); - - responseBegin(exchange); - - HttpFields headers = response.getHttpFields(); - for (HttpField header : headers) { - responseHeader(exchange, header); + callback.succeeded(); + return null; } - HttpRequest httpRequest = exchange.getRequest(); - if (MetaData.isTunnel(httpRequest.getMethod(), httpResponse.getStatus())) + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () -> { - ClientHTTP2StreamEndPoint endPoint = new ClientHTTP2StreamEndPoint((HTTP2Stream)stream); - long idleTimeout = httpRequest.getIdleTimeout(); - if (idleTimeout > 0) - endPoint.setIdleTimeout(idleTimeout); - if (LOG.isDebugEnabled()) - LOG.debug("Successful HTTP2 tunnel on {} via {} in {}", stream, endPoint, this); - ((HTTP2Stream)stream).setAttachment(endPoint); - HttpConversation conversation = httpRequest.getConversation(); - conversation.setAttribute(EndPoint.class.getName(), endPoint); - HttpUpgrader upgrader = (HttpUpgrader)conversation.getAttribute(HttpUpgrader.class.getName()); - if (upgrader != null) - upgrade(upgrader, httpResponse, endPoint); - } + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + HttpResponse httpResponse = exchange.getResponse(); + httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); - responseHeaders(exchange); + responseBegin(exchange); + + HttpFields headers = response.getHttpFields(); + for (HttpField header : headers) + { + responseHeader(exchange, header); + } + + HttpRequest httpRequest = exchange.getRequest(); + if (MetaData.isTunnel(httpRequest.getMethod(), httpResponse.getStatus())) + { + ClientHTTP2StreamEndPoint endPoint = new ClientHTTP2StreamEndPoint((HTTP2Stream)stream); + long idleTimeout = httpRequest.getIdleTimeout(); + if (idleTimeout > 0) + endPoint.setIdleTimeout(idleTimeout); + if (LOG.isDebugEnabled()) + LOG.debug("Successful HTTP2 tunnel on {} via {} in {}", stream, endPoint, this); + ((HTTP2Stream)stream).setAttachment(endPoint); + HttpConversation conversation = httpRequest.getConversation(); + conversation.setAttribute(EndPoint.class.getName(), endPoint); + HttpUpgrader upgrader = (HttpUpgrader)conversation.getAttribute(HttpUpgrader.class.getName()); + if (upgrader != null) + upgrade(upgrader, httpResponse, endPoint); + } + + responseHeaders(exchange); + + callback.succeeded(); + })); } - private void onTrailer(HeadersFrame frame) + private Runnable onTrailer(HeadersFrame frame, Callback callback) { HttpExchange exchange = getHttpExchange(); - if (exchange == null) - return; - - HttpFields trailers = frame.getMetaData().getHttpFields(); - trailers.forEach(exchange.getResponse()::trailer); + if (exchange != null) + { + HttpFields trailers = frame.getMetaData().getHttpFields(); + trailers.forEach(exchange.getResponse()::trailer); + } + callback.succeeded(); + return null; } private void upgrade(HttpUpgrader upgrader, HttpResponse response, EndPoint endPoint) @@ -202,7 +224,7 @@ Stream.Listener onPush(Stream stream, PushPromiseFrame frame) Response.CompleteListener listener = pushHandler.apply(request, pushRequest); if (listener != null) { - HttpChannelOverHTTP2 pushChannel = getHttpChannel().getHttpConnection().acquireHttpChannel(); + HttpChannelOverHTTP2 pushChannel = getHttpConnection().acquireHttpChannel(); pushRequest.getResponseListeners().addCompleteListener(listener, true); HttpExchange pushExchange = new HttpExchange(getHttpDestination(), pushRequest); pushChannel.associate(pushExchange); @@ -224,7 +246,7 @@ public Runnable onDataAvailable() HttpExchange exchange = getHttpExchange(); if (exchange == null) return null; - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseContentAvailable(exchange)); + return invoker.offer(new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange))); } @Override @@ -236,12 +258,11 @@ public Runnable onReset(ResetFrame frame, Callback callback) callback.succeeded(); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> - { - int error = frame.getError(); - IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error)); - callback.completeWith(exchange.getRequest().abort(failure)); - }); + + int error = frame.getError(); + IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error)); + Runnable task = () -> callback.completeWith(exchange.getRequest().abort(failure)); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), task)); } @Override @@ -253,15 +274,14 @@ public Runnable onTimeout(TimeoutException failure, Promise promise) promise.succeeded(false); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> - promise.completeWith(exchange.getRequest().abort(failure)) - ); + Runnable task = () -> promise.completeWith(exchange.getRequest().abort(failure)); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), task)); } @Override public Runnable onFailure(Throwable failure, Callback callback) { - Promise promise = Promise.from(failed -> callback.succeeded(), callback::failed); - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseFailure(failure, promise)); + Runnable task = () -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed)); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), task)); } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java index ff4ef40c12e2..d572769728e4 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.util.Callback; @@ -34,6 +35,8 @@ public interface HTTP2Channel */ public interface Client { + public Runnable onHeaders(Stream stream, HeadersFrame frame, Callback callback); + public Runnable onDataAvailable(); public Runnable onReset(ResetFrame frame, Callback callback); diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index c421acac6deb..b5438057e7fc 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -397,7 +397,7 @@ public Runnable produce() int filled = fill(getEndPoint(), networkBuffer.getByteBuffer(), compact); if (LOG.isDebugEnabled()) - LOG.debug("Filled {} bytes compacted {} in {}", filled, compact, networkBuffer); + LOG.debug("Filled {} bytes compacted {} {} in {}", filled, compact, networkBuffer, HTTP2Connection.this); if (filled > 0) { @@ -439,10 +439,11 @@ else if (filled == 0) private RetainableByteBuffer.Mutable acquireBuffer() { RetainableByteBuffer.Mutable buffer = heldBuffer.getAndSet(null); + RetainableByteBuffer.Mutable held = buffer; if (buffer == null) buffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable(); if (LOG.isDebugEnabled()) - LOG.debug("Acquired {}", buffer); + LOG.debug("Acquired {} {} in {}", held == null ? "new" : "held", buffer, HTTP2Connection.this); return buffer; } @@ -451,7 +452,7 @@ private void holdBuffer(RetainableByteBuffer.Mutable buffer) if (heldBuffer.compareAndSet(null, buffer)) { if (LOG.isDebugEnabled()) - LOG.debug("Held {}", buffer); + LOG.debug("Held {} in {}", buffer, HTTP2Connection.this); } else { diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index d1db60de34ae..cc488a5d92b1 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -1202,7 +1202,7 @@ protected void notifyGoAway(Session session, GoAwayFrame frame) } catch (Throwable x) { - LOG.info("Failure while notifying listener " + listener, x); + LOG.info("Failure while notifying listener {}", listener, x); } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index b261a81cec16..0e057809c4f8 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -317,7 +317,7 @@ protected void onIdleTimeout(TimeoutException timeout) LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this); // Notify the application. - notifyIdleTimeout(this, timeout, Promise.from(timedOut -> + notifyIdleTimeout(timeout, Promise.from(timedOut -> { if (timedOut) reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); @@ -376,13 +376,12 @@ public void process(Data data) private void onNewStream(Callback callback) { - notifyNewStream(this); + notifyNewStream(); callback.succeeded(); } private void onHeaders(HeadersFrame frame, Callback callback) { - boolean offered = false; MetaData metaData = frame.getMetaData(); boolean isTrailer = !metaData.isRequest() && !metaData.isResponse(); if (isTrailer) @@ -390,11 +389,14 @@ private void onHeaders(HeadersFrame frame, Callback callback) // In case of trailers, notify first and then offer EOF to // avoid race conditions due to concurrent calls to readData(). boolean closed = updateClose(true, CloseState.Event.RECEIVED); - notifyHeaders(this, frame); - if (closed) - getSession().removeStream(this); - // Offer EOF in case the application calls readData() or demand(). - offered = offer(Data.eof(getId())); + notifyHeaders(frame, Callback.from(() -> + { + // Offer EOF in case the application calls readData() or demand(). + if (offer(Data.eof(getId()))) + processData(); + if (closed) + getSession().removeStream(this); + }, callback)); } else { @@ -404,27 +406,27 @@ private void onHeaders(HeadersFrame frame, Callback callback) length = fields.getLongField(HttpHeader.CONTENT_LENGTH); dataLength = length; - if (frame.isEndStream()) - { - // Offer EOF for either the request or the response in - // case the application calls readData() or demand(). - offered = offer(Data.eof(getId())); - } + // Offer EOF for either the request or the response in + // case the application calls readData() or demand(). + boolean eof = frame.isEndStream() && offer(Data.eof(getId())); // Requests are notified to a Session.Listener, here only notify responses. - if (metaData.isResponse()) + if (metaData.isRequest()) + { + callback.succeeded(); + } + else { boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); - notifyHeaders(this, frame); - if (closed) - getSession().removeStream(this); + notifyHeaders(frame, Callback.from(() -> + { + if (eof) + processData(); + if (closed) + getSession().removeStream(this); + }, callback)); } } - - if (offered) - processData(); - - callback.succeeded(); } private void onData(Data data) @@ -441,15 +443,6 @@ private void onData(Data data) return; } - if (isReset()) - { - // Just drop the frame. - if (LOG.isDebugEnabled()) - LOG.debug("Data {} for already reset {}", data, this); - session.dataConsumed(this, data.frame().flowControlLength()); - return; - } - if (dataLength >= 0) { dataLength -= frame.remaining(); @@ -469,14 +462,29 @@ private void onData(Data data) private boolean offer(Data data) { - // Retain the data because it is stored for later use. - data.retain(); - boolean process; + boolean reset; + boolean process = false; try (AutoLock ignored = lock.lock()) { - process = dataQueue.isEmpty() && dataDemand; - dataQueue.offer(data); + reset = isReset(); + if (!reset) + { + process = dataQueue.isEmpty() && dataDemand; + // Retain the data because it is stored for later use. + data.retain(); + dataQueue.offer(data); + } } + + if (reset) + { + // Drop the frame. + if (LOG.isDebugEnabled()) + LOG.debug("Data {} for already reset {}", data, this); + session.dataConsumed(this, data.frame().flowControlLength()); + return false; + } + if (LOG.isDebugEnabled()) LOG.debug("Data {} notifying onDataAvailable() {} for {}", data, process, this); return process; @@ -488,26 +496,30 @@ public Data readData() Data data; try (AutoLock ignored = lock.lock()) { - if (dataQueue.isEmpty()) - return null; data = dataQueue.poll(); + if (data == null) + return null; if (data.frame().isEndStream()) dataQueue.offer(Data.eof(getId())); } - if (updateClose(data.frame().isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); + // Update the stream close state, so that the flow control + // update may be skipped if the stream is remotely closed. + boolean closed = updateClose(data.frame().isEndStream(), CloseState.Event.RECEIVED); if (LOG.isDebugEnabled()) LOG.debug("Reading {} for {}", data, this); - notIdle(); - // Enlarge the flow control window now, since the application // may want to retain the Data objects, accumulating them in // memory beyond the flow control window, without copying them. session.dataConsumed(this, data.frame().flowControlLength()); + if (closed) + session.removeStream(this); + else + notIdle(); + return data; } @@ -546,7 +558,7 @@ public void processData() dataDemand = false; dataStalled = false; } - notifyDataAvailable(this); + notifyDataAvailable(); } } @@ -589,7 +601,7 @@ private void onReset(ResetFrame frame, Callback callback) boolean removed = session.removeStream(this); session.dataConsumed(this, flowControlLength); if (removed) - notifyReset(this, frame, callback); + notifyReset(frame, callback); else callback.succeeded(); } @@ -618,7 +630,7 @@ private void onFailure(FailureFrame frame, Callback callback) session.dataConsumed(this, flowControlLength); close(); - notifyFailure(this, frame, new Nested(callback) + notifyFailure(frame, new Nested(callback) { @Override public void completed() @@ -789,7 +801,7 @@ public void close() public void onClose() { - notifyClosed(this); + notifyClosed(); } private void updateStreamCount(int deltaStream, int deltaClosing) @@ -832,43 +844,43 @@ private Callback endWrite() } } - private void notifyNewStream(Stream stream) + private void notifyNewStream() { - Listener listener = this.listener; - if (listener != null) + Listener listener = getListener(); + try { - try - { - listener.onNewStream(stream); - } - catch (Throwable x) - { - LOG.info("Failure while notifying listener {}", listener, x); - } + if (listener != null) + listener.onNewStream(this); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener {}", listener, x); } } - protected void notifyHeaders(Stream stream, HeadersFrame frame) + private void notifyHeaders(HeadersFrame frame, Callback callback) { - Stream.Listener listener = stream.getListener(); - if (listener == null) - return; + Stream.Listener listener = getListener(); try { - listener.onHeaders(stream, frame); + if (listener != null) + listener.onHeaders(this, frame, callback); + else + callback.succeeded(); } catch (Throwable x) { LOG.info("Failure while notifying listener {}", listener, x); + callback.failed(x); } } - private void notifyDataAvailable(Stream stream) + private void notifyDataAvailable() { - Listener listener = Objects.requireNonNullElse(this.listener, Listener.AUTO_DISCARD); + Listener listener = Objects.requireNonNullElse(getListener(), Listener.AUTO_DISCARD); try { - listener.onDataAvailable(stream); + listener.onDataAvailable(this); } catch (Throwable x) { @@ -876,77 +888,64 @@ private void notifyDataAvailable(Stream stream) } } - private void notifyReset(Stream stream, ResetFrame frame, Callback callback) + private void notifyReset(ResetFrame frame, Callback callback) { - Listener listener = this.listener; - if (listener != null) + Listener listener = getListener(); + try { - try - { - listener.onReset(stream, frame, callback); - } - catch (Throwable x) - { - LOG.info("Failure while notifying listener {}", listener, x); - callback.failed(x); - } + if (listener != null) + listener.onReset(this, frame, callback); + else + callback.succeeded(); } - else + catch (Throwable x) { - callback.succeeded(); + LOG.info("Failure while notifying listener {}", listener, x); + callback.failed(x); } } - private void notifyIdleTimeout(Stream stream, TimeoutException failure, Promise promise) + private void notifyIdleTimeout(TimeoutException failure, Promise promise) { - Listener listener = this.listener; - if (listener != null) + Listener listener = getListener(); + try { - try - { - listener.onIdleTimeout(stream, failure, promise); - } - catch (Throwable x) - { - LOG.info("Failure while notifying listener {}", listener, x); - promise.failed(x); - } + if (listener != null) + listener.onIdleTimeout(this, failure, promise); + else + promise.succeeded(true); } - else + catch (Throwable x) { - promise.succeeded(true); + LOG.info("Failure while notifying listener {}", listener, x); + promise.failed(x); } } - private void notifyFailure(Stream stream, FailureFrame frame, Callback callback) + private void notifyFailure(FailureFrame frame, Callback callback) { - Listener listener = this.listener; - if (listener != null) + Listener listener = getListener(); + try { - try - { - listener.onFailure(stream, frame.getError(), frame.getReason(), frame.getFailure(), callback); - } - catch (Throwable x) - { - LOG.info("Failure while notifying listener {}", listener, x); - callback.failed(x); - } + if (listener != null) + listener.onFailure(this, frame.getError(), frame.getReason(), frame.getFailure(), callback); + else + callback.succeeded(); } - else + catch (Throwable x) { - callback.succeeded(); + LOG.info("Failure while notifying listener {}", listener, x); + callback.failed(x); } } - private void notifyClosed(Stream stream) + private void notifyClosed() { - Listener listener = this.listener; - if (listener == null) - return; + Listener listener = getListener(); try { - listener.onClosed(stream); + if (listener != null) + listener.onClosed(this); } catch (Throwable x) { diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java index 705492611dad..98c5e63b54d0 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class HTTP2StreamEndPoint implements EndPoint +public abstract class HTTP2StreamEndPoint implements EndPoint, Invocable { private static final Logger LOG = LoggerFactory.getLogger(HTTP2StreamEndPoint.class); @@ -478,10 +478,11 @@ private void process() callback.succeeded(); } - protected Invocable.InvocationType getInvocationType() + @Override + public InvocationType getInvocationType() { Callback callback = readCallback.get(); - return callback == null ? Invocable.InvocationType.NON_BLOCKING : callback.getInvocationType(); + return callback == null ? InvocationType.NON_BLOCKING : callback.getInvocationType(); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index ccc5892f49a2..82b06ba75125 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -284,6 +284,7 @@ public default void onNewStream(Stream stream) /** *

Callback method invoked when a HEADERS frame representing the HTTP response has been received.

+ *

This overload version should be used for simple synchronous implementations.

* * @param stream the stream * @param frame the HEADERS frame received @@ -294,6 +295,20 @@ public default void onHeaders(Stream stream, HeadersFrame frame) stream.demand(); } + /** + *

Callback method invoked when a HEADERS frame representing the HTTP response has been received.

+ *

This overload version allows for asynchronous implementations.

+ * + * @param stream the stream + * @param frame the HEADERS frame received + * @param callback the callback to notify when the processing is complete + */ + public default void onHeaders(Stream stream, HeadersFrame frame, Callback callback) + { + onHeaders(stream, frame); + callback.succeeded(); + } + /** *

Callback method invoked when a PUSH_PROMISE frame has been received.

*

Applications that override this method are typically interested in diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index c4eef21ec424..eca1caa9e514 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -466,12 +466,18 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s[window_queue=%d,frame_queue=%d,processed/pending=%d/%d]", - super.toString(), - getWindowQueueSize(), - getFrameQueueSize(), - processedEntries.size(), - pendingEntries.size()); + try (AutoLock ignored = lock.tryLock()) + { + String held = lock.isHeldByCurrentThread() ? "" : "?"; + return String.format("%s[%s:windowQueue=%d,frameQueue=%d,processed/pending=%d/%d]", + super.toString(), + held, + windows.size(), + entries.size(), + processedEntries.size(), + pendingEntries.size() + ); + } } private class WindowEntry diff --git a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java index fd28ee93656d..10fc0827b57f 100644 --- a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java @@ -184,6 +184,7 @@ public Content.Chunk read() trailer = _trailer; if (trailer != null) { + data.release(); _chunk = Content.Chunk.next(trailer); return trailer; } diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/ClientConnectionFactoryOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/ClientConnectionFactoryOverHTTP3.java index 6ffa5fab4078..841515951ff2 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/ClientConnectionFactoryOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/ClientConnectionFactoryOverHTTP3.java @@ -62,17 +62,14 @@ public static class HTTP3 extends Info implements ProtocolSession.Factory { private static final List protocols = List.of("h3"); - private final HTTP3Client http3Client; - public HTTP3(HTTP3Client client) { - super(new ClientConnectionFactoryOverHTTP3(client)); - http3Client = client; + this(new ClientConnectionFactoryOverHTTP3(client)); } - public HTTP3Client getHTTP3Client() + public HTTP3(ClientConnectionFactoryOverHTTP3 connectionFactory) { - return http3Client; + super(connectionFactory); } @Override @@ -84,14 +81,15 @@ public List getProtocols(boolean secure) @Override public Transport newTransport() { - return new QuicTransport(getHTTP3Client().getQuicConfiguration()); + ClientConnectionFactoryOverHTTP3 http3 = (ClientConnectionFactoryOverHTTP3)getClientConnectionFactory(); + return new QuicTransport(http3.http3Client.getQuicConfiguration()); } @Override public ProtocolSession newProtocolSession(QuicSession quicSession, Map context) { ClientConnectionFactoryOverHTTP3 http3 = (ClientConnectionFactoryOverHTTP3)getClientConnectionFactory(); - context.put(HTTP3Client.CLIENT_CONTEXT_KEY, http3Client); + context.put(HTTP3Client.CLIENT_CONTEXT_KEY, http3.http3Client); SessionClientListener listener = new SessionClientListener(context); context.put(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY, listener); return http3.factory.newProtocolSession(quicSession, context); diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/HttpClientTransportOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/HttpClientTransportOverHTTP3.java index 58eee23d0016..c1d91132c308 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/HttpClientTransportOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/HttpClientTransportOverHTTP3.java @@ -39,8 +39,9 @@ import org.eclipse.jetty.quic.client.QuicTransport; import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.quic.common.QuicSession; +import org.eclipse.jetty.util.thread.Invocable; -public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport implements ProtocolSession.Factory +public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport implements ProtocolSession.Factory, Invocable { private final HTTP3ClientConnectionFactory factory = new HTTP3ClientConnectionFactory(); private final HTTP3Client http3Client; @@ -127,7 +128,9 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map promise) + { + Runnable task = receiver.onIdleTimeout(failure, promise); + ThreadPool.executeImmediately(session.getProtocolSession().getQuicSession().getExecutor(), task); + } + + @Override + public void onFailure(Stream.Client stream, long error, Throwable failure) + { + Runnable task = receiver.onFailure(failure); + ThreadPool.executeImmediately(session.getProtocolSession().getQuicSession().getExecutor(), task); + } + } } diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java index 4f9e92910d74..5e45c5671954 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java @@ -32,16 +32,18 @@ import org.eclipse.jetty.http3.client.HTTP3SessionClient; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.quic.common.QuicSession; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpConnectionOverHTTP3 extends HttpConnection implements ConnectionPool.MaxMultiplexable, ConnectionPool.MaxUsable +public class HttpConnectionOverHTTP3 extends HttpConnection implements ConnectionPool.MaxMultiplexable, ConnectionPool.MaxUsable, Invocable { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP3.class); private final Set activeChannels = ConcurrentHashMap.newKeySet(); private final AtomicBoolean closed = new AtomicBoolean(); private final HTTP3SessionClient session; + private InvocationType invocationType = InvocationType.BLOCKING; public HttpConnectionOverHTTP3(Destination destination, HTTP3SessionClient session) { @@ -87,6 +89,17 @@ public int getMaxUsage() return session.getMaxLocalStreams(); } + @Override + public InvocationType getInvocationType() + { + return invocationType; + } + + public void setInvocationType(InvocationType invocationType) + { + this.invocationType = invocationType; + } + @Override protected Iterator getHttpChannels() { @@ -163,4 +176,10 @@ public boolean onIdleTimeout(long idleTimeout, Throwable failure) close(failure); return false; } + + void offerTask(Runnable task) + { + if (task != null) + session.getProtocolSession().offer(task, false); + } } diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java index 660fad9b9bee..1c1800579b83 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java @@ -15,6 +15,7 @@ import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.concurrent.Executor; import org.eclipse.jetty.client.transport.HttpExchange; import org.eclipse.jetty.client.transport.HttpReceiver; @@ -27,16 +28,22 @@ import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.SerializedInvoker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener +public class HttpReceiverOverHTTP3 extends HttpReceiver { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class); + private final SerializedInvoker invoker; + protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel) { super(channel); + Executor executor = channel.getHttpDestination().getHttpClient().getExecutor(); + invoker = new SerializedInvoker(getClass().getSimpleName(), executor); } @Override @@ -87,75 +94,76 @@ protected HttpChannelOverHTTP3 getHttpChannel() return (HttpChannelOverHTTP3)super.getHttpChannel(); } - @Override - public void onNewStream(Stream.Client stream) + private HttpConnectionOverHTTP3 getHttpConnection() { - getHttpChannel().setStream(stream); + return getHttpChannel().getHttpConnection(); } - @Override - public void onResponse(Stream.Client stream, HeadersFrame frame) + Runnable onResponse(Stream.Client stream, HeadersFrame frame) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; + return null; - HttpResponse httpResponse = exchange.getResponse(); - MetaData.Response response = (MetaData.Response)frame.getMetaData(); - httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () -> + { + HttpResponse httpResponse = exchange.getResponse(); + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); - responseBegin(exchange); + responseBegin(exchange); - HttpFields headers = response.getHttpFields(); - for (HttpField header : headers) - { - responseHeader(exchange, header); - } + HttpFields headers = response.getHttpFields(); + for (HttpField header : headers) + { + responseHeader(exchange, header); + } - // TODO: add support for HttpMethod.CONNECT. + // TODO: add support for HttpMethod.CONNECT. - responseHeaders(exchange); + responseHeaders(exchange); + })); } - @Override - public void onDataAvailable(Stream.Client stream) + Runnable onDataAvailable() { if (LOG.isDebugEnabled()) LOG.debug("Data available notification in {}", this); HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; + return null; - responseContentAvailable(exchange); + return invoker.offer(new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange))); } - @Override - public void onTrailer(Stream.Client stream, HeadersFrame frame) + Runnable onTrailer(HeadersFrame frame) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; + return null; HttpFields trailers = frame.getMetaData().getHttpFields(); trailers.forEach(exchange.getResponse()::trailer); - responseSuccess(exchange, null); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () -> responseSuccess(exchange, null))); } - @Override - public void onIdleTimeout(Stream.Client stream, Throwable failure, Promise promise) + Runnable onIdleTimeout(Throwable failure, Promise promise) { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - exchange.abort(failure, Promise.from(aborted -> promise.succeeded(!aborted), promise::failed)); - else + if (exchange == null) + { promise.succeeded(false); + return null; + } + Runnable task = () -> promise.completeWith(exchange.getRequest().abort(failure)); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), task)); } - @Override - public void onFailure(Stream.Client stream, long error, Throwable failure) + Runnable onFailure(Throwable failure) { - responseFailure(failure, Promise.noop()); + Runnable task = () -> responseFailure(failure, Promise.noop()); + return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), task)); } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index adbb86cc54e3..6017ee046153 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -31,7 +31,7 @@ * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} * as appropriate.

*/ -public abstract class AbstractConnection implements Connection, Invocable +public abstract class AbstractConnection implements Connection { private static final Logger LOG = LoggerFactory.getLogger(AbstractConnection.class); @@ -280,7 +280,7 @@ public String toConnectionString() public abstract static class NonBlocking extends AbstractConnection { - private final Callback _nonBlockingReadCallback = new NonBlockingFillableCallback(); + private final Callback _nonBlockingFillableCallback = new NonBlockingFillableCallback(); public NonBlocking(EndPoint endPoint, Executor executor) { @@ -297,7 +297,7 @@ public NonBlocking(EndPoint endPoint, Executor executor) @Override public void fillInterested() { - fillInterested(_nonBlockingReadCallback); + fillInterested(_nonBlockingFillableCallback); } private class NonBlockingFillableCallback extends FillableCallback diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index f280630ca1eb..a2ff06eb82b2 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -601,15 +601,16 @@ public String toString() { try (AutoLock lock = _lock.tryLock()) { - boolean held = lock.isHeldByCurrentThread(); - return String.format("%s@%x{handling=%s, handled=%s, send=%s, completed=%s, request=%s}", + String held = lock.isHeldByCurrentThread() ? "" : "?"; + return String.format("%s@%x[%s:handling=%s,handled=%s,send=%s,completed=%s,request=%s]", this.getClass().getSimpleName(), hashCode(), - held ? _handling : "?", - held ? _handled : "?", - held ? _streamSendState : "?", - held ? _callbackCompleted : "?", - held ? _request : "?" + held, + _handling, + _handled, + _streamSendState, + _callbackCompleted, + _request ); } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HTTPDynamicTransportTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HTTPDynamicTransportTest.java index 5d8c5d342d03..da1e1fd0ff49 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HTTPDynamicTransportTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HTTPDynamicTransportTest.java @@ -77,6 +77,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeFalse; public class HTTPDynamicTransportTest extends AbstractTransportTest { @@ -151,6 +152,8 @@ public void testExplicitHTTPVersionWithSameHttpClientForAllHTTPVersions() throws @Test public void testNonExplicitHTTPVersionH3H2H1() throws Exception { + assumeFalse("ci".equals(System.getProperty("env"))); + int port = freePort(); ConnectionFactory h1 = new HttpConnectionFactory(); ConnectionFactory h2c = new HTTP2CServerConnectionFactory(); diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientLoadTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientLoadTest.java index d167d0081fe6..24053257e25c 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientLoadTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientLoadTest.java @@ -25,7 +25,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; +import org.awaitility.Awaitility; import org.eclipse.jetty.client.BytesRequestContent; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.Response; import org.eclipse.jetty.client.Result; @@ -40,7 +42,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.thread.Scheduler; -import org.hamcrest.Matchers; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; public class HttpClientLoadTest extends AbstractTest @@ -68,24 +70,26 @@ public void testIterative(TransportType transportType) throws Exception client.setMaxConnectionsPerDestination(32768); client.setMaxRequestsQueuedPerDestination(1024 * 1024); client.setIdleTimeout(120000); - client.start(); - - // At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity) - int runs = 1; - int iterations = 100; - for (int i = 0; i < runs; ++i) + try (HttpClient httpClient = client) { - run(transportType, iterations); - } + httpClient.start(); - // Re-run after warmup - iterations = 250; - for (int i = 0; i < runs; ++i) - { - run(transportType, iterations); - } + // At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity) + int runs = 1; + int iterations = 100; + for (int i = 0; i < runs; ++i) + { + run(transportType, iterations); + } - assertThat("Leaks: " + byteBufferPool.dumpLeaks(), byteBufferPool.getLeaks().size(), Matchers.is(0)); + // Re-run after warmup + iterations = 250; + for (int i = 0; i < runs; ++i) + { + run(transportType, iterations); + } + } + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Leaks: " + byteBufferPool.dumpLeaks(), byteBufferPool.getLeaks().size(), is(0))); } @ParameterizedTest @@ -101,15 +105,17 @@ public void testConcurrent(TransportType transportType) throws Exception client.setByteBufferPool(byteBufferPool); client.setMaxConnectionsPerDestination(32768); client.setMaxRequestsQueuedPerDestination(1024 * 1024); - client.start(); + try (HttpClient httpClient = client) + { + httpClient.start(); - int runs = 1; - int iterations = 128; - IntStream.range(0, 16).parallel().forEach(i -> + int runs = 1; + int iterations = 128; + IntStream.range(0, 16).parallel().forEach(i -> IntStream.range(0, runs).forEach(j -> - run(transportType, iterations))); - - assertThat("Connection Leaks: " + byteBufferPool.getLeaks(), byteBufferPool.getLeaks().size(), Matchers.is(0)); + run(transportType, iterations))); + } + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Leaks: " + byteBufferPool.dumpLeaks(), byteBufferPool.getLeaks().size(), is(0))); } private void run(TransportType transportType, int iterations) diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java index 479d22f28d08..4a9ab3f8a248 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ThreadStarvationTest.java @@ -166,11 +166,16 @@ public boolean handle(Request request, Response response, Callback callback) thr .timeout(2 * idleTimeout, TimeUnit.MILLISECONDS) .send(result -> { - // The response should arrive correctly, - // it is the request that failed. + // The response frames arrive for all protocols (on the network). + // It is the request that is failed because the request content was not sent. + // For HTTP/2 a RST_STREAM arrives to fail the request, but it likely fails + // the response too, by draining the queued DATA frames before they are read + // by the content listener. + // For the other protocols the request is failed by the total timeout. assertTrue(result.isFailed()); - assertNull(result.getResponseFailure()); assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, result.getResponse().getStatus()); + if (transportType != TransportType.H2C && transportType != TransportType.H2) + assertNull(result.getResponseFailure()); responseLatch.countDown(); }); diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java index dc3f07e88ceb..b2d1c5f4c972 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java @@ -13,35 +13,41 @@ package org.eclipse.jetty.test.client.transport; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.eclipse.jetty.client.ContentResponse; +import org.eclipse.jetty.client.Result; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.VirtualThreads; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.ThreadPool; -import org.junit.jupiter.api.Assumptions; +import org.eclipse.jetty.util.thread.VirtualThreadPool; import org.junit.jupiter.api.condition.DisabledForJreRange; import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledForJreRange(max = JRE.JAVA_18) public class VirtualThreadsTest extends AbstractTest { @ParameterizedTest - @MethodSource("transports") + @MethodSource("transportsNoFCGI") public void testHandlerInvokedOnVirtualThread(TransportType transportType) throws Exception { - // No virtual thread support in FCGI server-side. - Assumptions.assumeTrue(transportType != TransportType.FCGI); - String virtualThreadsName = "green-"; prepareServer(transportType, new Handler.Abstract() { @@ -71,4 +77,71 @@ public boolean handle(Request request, Response response, Callback callback) assertEquals(HttpStatus.OK_200, response.getStatus(), " for transport " + transportType); } + + @ParameterizedTest + @MethodSource("transports") + public void testBlockingClientListenersInvokedOnVirtualThread(TransportType transportType) throws Exception + { + testClientListeners(transportType, true); + } + + @ParameterizedTest + @MethodSource("transports") + public void testNonBlockingClientListenersInvokedOnPlatformThread(TransportType transportType) throws Exception + { + testClientListeners(transportType, false); + } + + private void testClientListeners(TransportType transportType, boolean blocking) throws Exception + { + startServer(transportType, new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception + { + // Send only the headers. + response.write(false, null, Callback.NOOP); + // Wait to force the client to invoke the content + // callback separately from the headers callback. + Thread.sleep(500); + // Send the content. + Content.Sink.write(response, true, "hello", callback); + return true; + } + }); + + prepareClient(transportType); + VirtualThreads.Configurable executor = (VirtualThreads.Configurable)client.getExecutor(); + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.setName("green-"); + executor.setVirtualThreadsExecutor(vtp); + Invocable.InvocationType invocationType = blocking ? Invocable.InvocationType.BLOCKING : Invocable.InvocationType.NON_BLOCKING; + client.getHttpClientTransport().setInvocationType(invocationType); + client.start(); + + for (int i = 0; i < 2; ++i) + { + AtomicReference resultRef = new AtomicReference<>(); + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Consumer verify = name -> queue.offer((VirtualThreads.isVirtualThread() ? "virtual" : "platform") + "-" + name); + client.newRequest(newURI(transportType)) + .onResponseBegin(r -> verify.accept("begin")) + .onResponseHeaders(r -> verify.accept("headers")) + .onResponseContent((r, b) -> verify.accept("content")) + .onResponseSuccess(r -> verify.accept("success")) + .onComplete(r -> verify.accept("complete")) + .timeout(5, TimeUnit.SECONDS) + .send(r -> + { + verify.accept("send"); + resultRef.set(r); + }); + + Result result = await().atMost(5, TimeUnit.SECONDS).until(resultRef::get, notNullValue()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + String expected = blocking ? "virtual" : "platform"; + queue.forEach(event -> assertTrue(event.startsWith(expected), event)); + } + } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 10910bc2c7c9..5396131687ce 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -918,9 +918,10 @@ public boolean reset() @Override public String toString() { - try (AutoLock ignored = _lock.lock()) + try (AutoLock ignored = _lock.tryLock()) { - return String.format("%s@%x[%s, %b, %s]", getClass().getSimpleName(), hashCode(), _state, _aborted, _failure); + String held = _lock.isHeldByCurrentThread() ? "" : "?"; + return String.format("%s@%x[%s:%s,aborted=%b,failure=%s]", getClass().getSimpleName(), hashCode(), held, _state, _aborted, _failure); } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 26ebfd861d7e..a843f86b3a6b 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -267,8 +267,8 @@ static InvocationType combineTypes(InvocationType... it) */ static InvocationType getInvocationType(Object o) { - if (o instanceof Invocable) - return ((Invocable)o).getInvocationType(); + if (o instanceof Invocable i) + return i.getInvocationType(); return InvocationType.BLOCKING; } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java index 7ea7048c7dc4..20240cd3d3e3 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java @@ -110,7 +110,7 @@ public Runnable offer(Runnable task) if (task == null) { if (LOG.isDebugEnabled()) - LOG.debug("Offering task null, skipping it in {}", this); + LOG.debug("Offering task null, skipping it on {}", this); return null; } // The NamedRunnable logger is checked to make it possible to enable the nice task names in a debugger @@ -125,12 +125,17 @@ public Runnable offer(Runnable task) task = new NamedRunnable(task); } } + Link link = new Link(task); - if (LOG.isDebugEnabled()) - LOG.debug("Offering link {} of {}", link, this); Link penultimate = _tail.getAndSet(link); + boolean queued = penultimate != null; + + if (LOG.isDebugEnabled()) + LOG.debug("{} {} on {}", queued ? "Queued" : "Offered", link, this); + if (penultimate == null) return link; + penultimate._next.lazySet(link); return null; } @@ -247,7 +252,7 @@ public void run() while (link != null) { if (LOG.isDebugEnabled()) - LOG.debug("Running link {} of {}", link, SerializedInvoker.this); + LOG.debug("Running {} of {}", link, SerializedInvoker.this); Runnable task = link.getTask(); InvocationType currentInvocationType = link.getInvocationType(); @@ -270,7 +275,7 @@ public void run() catch (Throwable t) { if (LOG.isDebugEnabled()) - LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t); + LOG.debug("Failed while running {} of {}", link, SerializedInvoker.this, t); onError(task, t); } finally diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java index 4a4bc1e2140a..51656947f20e 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java @@ -493,18 +493,18 @@ protected void onCompleteFailure(Throwable cause) icb.iterate(); - assertThat(icb.toString(), containsString("[PENDING, false,")); + assertThat(icb.toString(), containsString("PENDING,aborted=false,")); Throwable cause = new Throwable("test abort"); new Thread(() -> icb.abort(cause)).start(); - Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("[PENDING, true,")); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("PENDING,aborted=true,")); Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> aborted.get() != null); icb.succeeded(); // We are now complete, but callbacks have not yet been done - assertThat(icb.toString(), containsString("[COMPLETE, true,")); + assertThat(icb.toString(), containsString("COMPLETE,aborted=true,")); assertThat(failure.get(), nullValue()); assertFalse(completed.isMarked()); @@ -591,7 +591,7 @@ protected void onCompleteFailure(Throwable cause) new Thread(icb::iterate).start(); - Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("[PROCESSING, true,")); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("PROCESSING,aborted=true,")); // we have aborted, but onAborted not yet called assertThat(aborted.get(), nullValue()); @@ -831,7 +831,7 @@ protected void onCompleteFailure(Throwable cause) } Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> callback.toString().contains(state)); - assertThat(callback.toString(), containsString("[" + state + ",")); + assertThat(callback.toString(), containsString("[:" + state + ",")); onAbort.set(null); if (success == Boolean.FALSE && (state.equals("COMPLETE") || state.equals("CLOSED"))) diff --git a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/WriteAfterRedirectTest.java b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/WriteAfterRedirectTest.java index fe205ce9e450..de9b5f47cdfe 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/WriteAfterRedirectTest.java +++ b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/WriteAfterRedirectTest.java @@ -17,6 +17,7 @@ import java.io.OutputStream; import java.io.PrintWriter; import java.net.URI; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import jakarta.servlet.http.HttpServlet; @@ -31,11 +32,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class WriteAfterRedirectTest { @@ -107,13 +109,8 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IO assertThat(response.getContent().length, is(0)); assertThat(response.getHeaders().get(HttpHeader.LOCATION), is("/hello")); - // Following the redirect gives the hello page. - _client.setFollowRedirects(true); - response = _client.GET(_uri.resolve("redirect")); - assertThat(response.getStatus(), is(HttpServletResponse.SC_OK)); - assertThat(response.getContentAsString(), equalTo("hello world")); - // The write() in the servlet actually threw because the HttpOutput was closed. + await().atMost(1, TimeUnit.SECONDS).until(errorReference::get, notNullValue()); assertThat(errorReference.get(), instanceOf(IOException.class)); assertThat(errorReference.get().getMessage(), containsString("Closed")); } diff --git a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java index dcc6e359626f..d4ebe705f8f3 100644 --- a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java +++ b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java @@ -285,7 +285,7 @@ public void onClosed(Connection connection) client.newRequest(newURI(transportType)) .method(HttpMethod.POST) .body(content) - .onResponseSuccess(r -> responseLatch.countDown()) + .onResponseHeaders(r -> responseLatch.countDown()) .timeout(5, TimeUnit.SECONDS) .send(result -> { @@ -1377,12 +1377,11 @@ public long getLength() client.newRequest(newURI(transportType)) .method(HttpMethod.POST) .body(content) - .onResponseSuccess(response -> + .send(result -> { - Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, response.getStatus()); + Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, result.getResponse().getStatus()); latch.countDown(); - }) - .send(null); + }); // Wait for the server to idle timeout. Thread.sleep(2 * idleTimeout); diff --git a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/ServerTimeoutsTest.java b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/ServerTimeoutsTest.java index bc13f35a6759..5e43248dbfd3 100644 --- a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/ServerTimeoutsTest.java +++ b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/ServerTimeoutsTest.java @@ -158,17 +158,21 @@ protected void service(HttpServletRequest request, HttpServletResponse response) long idleTimeout = 1000; setStreamIdleTimeout(idleTimeout); - CountDownLatch resultLatch = new CountDownLatch(2); + CountDownLatch resultLatch = new CountDownLatch(1); AsyncRequestContent content = new AsyncRequestContent(); client.POST(newURI(transportType)) .body(content) - .onResponseSuccess(response -> + .onResponseHeaders(r -> { - if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) - resultLatch.countDown(); + // For the cases where the response is not failed, + // complete the request to complete the exchange. content.close(); }) - .send(result -> resultLatch.countDown()); + .send(result -> + { + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, result.getResponse().getStatus()); + resultLatch.countDown(); + }); // The client did not send the content, the request was // dispatched, the server should have idle timed it out. @@ -339,7 +343,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) CountDownLatch resultLatch = new CountDownLatch(1); client.newRequest(newURI(transportType)) .body(content) - .onResponseSuccess(response -> + .onResponseHeaders(response -> { responseRef.set(response); responseLatch.countDown(); diff --git a/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/WriteAfterRedirectTest.java b/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/WriteAfterRedirectTest.java index 726e57486d0c..eebb80c2aa53 100644 --- a/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/WriteAfterRedirectTest.java +++ b/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/WriteAfterRedirectTest.java @@ -17,6 +17,7 @@ import java.io.OutputStream; import java.io.PrintWriter; import java.net.URI; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import jakarta.servlet.http.HttpServlet; @@ -31,11 +32,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class WriteAfterRedirectTest { @@ -107,13 +109,8 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IO assertThat(response.getContent().length, is(0)); assertThat(response.getHeaders().get(HttpHeader.LOCATION), is("/hello")); - // Following the redirect gives the hello page. - _client.setFollowRedirects(true); - response = _client.GET(_uri.resolve("redirect")); - assertThat(response.getStatus(), is(HttpServletResponse.SC_OK)); - assertThat(response.getContentAsString(), equalTo("hello world")); - // The write() in the servlet actually threw because the HttpOutput was closed. + await().atMost(1, TimeUnit.SECONDS).until(errorReference::get, notNullValue()); assertThat(errorReference.get(), instanceOf(IOException.class)); assertThat(errorReference.get().getMessage(), containsString("Closed")); } diff --git a/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/AsyncIOServletTest.java b/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/AsyncIOServletTest.java index 976aa090f6f4..f1f1d7f6b0a3 100644 --- a/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/AsyncIOServletTest.java +++ b/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/AsyncIOServletTest.java @@ -285,7 +285,7 @@ public void onClosed(Connection connection) client.newRequest(newURI(transportType)) .method(HttpMethod.POST) .body(content) - .onResponseSuccess(r -> responseLatch.countDown()) + .onResponseHeaders(r -> responseLatch.countDown()) .timeout(5, TimeUnit.SECONDS) .send(result -> { @@ -1377,12 +1377,11 @@ public long getLength() client.newRequest(newURI(transportType)) .method(HttpMethod.POST) .body(content) - .onResponseSuccess(response -> + .send(result -> { - Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, response.getStatus()); + Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, result.getResponse().getStatus()); latch.countDown(); - }) - .send(null); + }); // Wait for the server to idle timeout. Thread.sleep(2 * idleTimeout); diff --git a/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/ServerTimeoutsTest.java b/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/ServerTimeoutsTest.java index 0e909e9c3db5..7502bea57a9d 100644 --- a/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/ServerTimeoutsTest.java +++ b/jetty-ee11/jetty-ee11-tests/jetty-ee11-test-client-transports/src/test/java/org/eclipse/jetty/ee11/test/client/transport/ServerTimeoutsTest.java @@ -158,17 +158,21 @@ protected void service(HttpServletRequest request, HttpServletResponse response) long idleTimeout = 1000; setStreamIdleTimeout(idleTimeout); - CountDownLatch resultLatch = new CountDownLatch(2); + CountDownLatch resultLatch = new CountDownLatch(1); AsyncRequestContent content = new AsyncRequestContent(); client.POST(newURI(transportType)) .body(content) - .onResponseSuccess(response -> + .onResponseHeaders(r -> { - if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) - resultLatch.countDown(); + // For the cases where the response is not failed, + // complete the request to complete the exchange. content.close(); }) - .send(result -> resultLatch.countDown()); + .send(result -> + { + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, result.getResponse().getStatus()); + resultLatch.countDown(); + }); // The client did not send the content, the request was // dispatched, the server should have idle timed it out. @@ -339,7 +343,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) CountDownLatch resultLatch = new CountDownLatch(1); client.newRequest(newURI(transportType)) .body(content) - .onResponseSuccess(response -> + .onResponseHeaders(response -> { responseRef.set(response); responseLatch.countDown(); diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java index 1023f3d134f4..6d1c4462aa90 100644 --- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java +++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java @@ -287,7 +287,7 @@ public void onClosed(Connection connection) client.newRequest(newURI(transportType)) .method(HttpMethod.POST) .body(content) - .onResponseSuccess(r -> responseLatch.countDown()) + .onResponseHeaders(r -> responseLatch.countDown()) .timeout(5, TimeUnit.SECONDS) .send(result -> { @@ -1782,12 +1782,11 @@ public long getLength() client.newRequest(newURI(transportType)) .method(HttpMethod.POST) .body(content) - .onResponseSuccess(response -> + .send(result -> { - Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, response.getStatus()); + Assertions.assertEquals(HttpStatus.REQUEST_TIMEOUT_408, result.getResponse().getStatus()); latch.countDown(); - }) - .send(null); + }); // Wait for the server to idle timeout. Thread.sleep(2 * idleTimeout); diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/ServerTimeoutsTest.java b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/ServerTimeoutsTest.java index 02a74a1968ca..43e70dd6366d 100644 --- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/ServerTimeoutsTest.java +++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/ServerTimeoutsTest.java @@ -161,17 +161,21 @@ protected void service(HttpServletRequest request, HttpServletResponse response) long idleTimeout = 1000; setStreamIdleTimeout(idleTimeout); - CountDownLatch resultLatch = new CountDownLatch(2); + CountDownLatch resultLatch = new CountDownLatch(1); AsyncRequestContent content = new AsyncRequestContent(); client.POST(newURI(transportType)) .body(content) - .onResponseSuccess(response -> + .onResponseHeaders(r -> { - if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) - resultLatch.countDown(); + // For the cases where the response is not failed, + // complete the request to complete the exchange. content.close(); }) - .send(result -> resultLatch.countDown()); + .send(result -> + { + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, result.getResponse().getStatus()); + resultLatch.countDown(); + }); // The client did not send the content, the request was // dispatched, the server should have idle timed it out. @@ -340,7 +344,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) CountDownLatch resultLatch = new CountDownLatch(1); client.newRequest(newURI(transportType)) .body(content) - .onResponseSuccess(response -> + .onResponseHeaders(response -> { responseRef.set(response); responseLatch.countDown(); diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/resources/jetty-logging.properties b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/resources/jetty-logging.properties index 4fdadbafe69a..7ae8c2aa5d4a 100644 --- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/resources/jetty-logging.properties +++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/resources/jetty-logging.properties @@ -1,4 +1,4 @@ -#org.eclipse.jetty.LEVEL=DEBUG +org.eclipse.jetty.LEVEL=DEBUG org.eclipse.jetty.jmx.LEVEL=INFO #org.eclipse.jetty.client.LEVEL=DEBUG #org.eclipse.jetty.fcgi.LEVEL=DEBUG