Skip to content

Commit

Permalink
Align with 9.0.x onwards - address BZ 65408 and BZ 65755
Browse files Browse the repository at this point in the history
Things reached the point where the remaining patches to back-port were
so intertwined I was having difficulty unpicking them. I also needed to
back-port a few additional changes I had been trying not to back-port
because of backwards compatibility issues for users extending the
Endpoint classes.

If users extending (or possibly using) Endpoint classes experience
compatibility issues, then my plan is to retrofit fixes for any specific
issues observed.

The changes covered by this commit include:
- Refactoring to introduce a common Acceptor class.
- Pulling up the Socket/SocketWrapper map to the AbstractEndpoint
- Refactoring socket close so it is controlled by the SocketWrapper
- Completing the introduction of using dummy channels, buffers etc once
  the socket has been closed

The tests currently all pass - hence this commit - but there is still a
little more work to do for i18n, reviewing diffs and back-porting a few
additional features like portOffset and graceful close.

https://bz.apache.org/bugzilla/show_bug.cgi?id=65408
https://bz.apache.org/bugzilla/show_bug.cgi?id=65755
  • Loading branch information
markt-asf committed Feb 18, 2022
1 parent 62ff43a commit 01f2cf2
Show file tree
Hide file tree
Showing 15 changed files with 1,088 additions and 1,899 deletions.
3 changes: 0 additions & 3 deletions java/org/apache/catalina/security/SecurityClassLoad.java
Expand Up @@ -183,9 +183,6 @@ private static final void loadTomcatPackage(ClassLoader loader) throws Exception
// net
loader.loadClass(basePackage + "util.net.Constants");
loader.loadClass(basePackage + "util.net.DispatchType");
loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableAdd");
loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableCancel");
loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableRemove");
loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$AprOperationState");
loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$NioOperationState");
loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$Nio2OperationState");
Expand Down
38 changes: 24 additions & 14 deletions java/org/apache/coyote/AbstractProtocol.java
Expand Up @@ -19,7 +19,7 @@
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -717,7 +717,6 @@ protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<
private final AbstractProtocol<S> proto;
private final RequestGroupInfo global = new RequestGroupInfo();
private final AtomicLong registerCount = new AtomicLong(0);
private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);

public ConnectionHandler(AbstractProtocol<S> proto) {
Expand Down Expand Up @@ -756,7 +755,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

S socket = wrapper.getSocket();

Processor processor = connections.get(socket);
Processor processor = (Processor) wrapper.getCurrentProcessor();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
Expand Down Expand Up @@ -840,7 +839,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
wrapper.getSslSupport(getProtocol().getClientCertProvider()));

// Associate the processor with the connection
connections.put(socket, processor);
wrapper.setCurrentProcessor(processor);

SocketState state = SocketState.CLOSED;
do {
Expand All @@ -862,7 +861,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// Create the upgrade processor
processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
// Associate the processor with the connection
connections.put(socket, processor);
wrapper.setCurrentProcessor(processor);
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
Expand All @@ -883,7 +882,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
processor, wrapper));
}
// Associate the processor with the connection
connections.put(socket, processor);
wrapper.setCurrentProcessor(processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
Expand Down Expand Up @@ -915,7 +914,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
wrapper.setCurrentProcessor(null);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
Expand All @@ -941,7 +940,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// Connection closed. OK to recycle the processor.
// Processors handling upgrades require additional clean-up
// before release.
connections.remove(socket);
wrapper.setCurrentProcessor(null);
if (processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
Expand Down Expand Up @@ -999,7 +998,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
wrapper.setCurrentProcessor(null);
release(processor);
return SocketState.CLOSED;
}
Expand All @@ -1019,7 +1018,15 @@ protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {

@Override
public Set<S> getOpenSockets() {
return connections.keySet();
Set<SocketWrapperBase<S>> set = proto.getEndpoint().getConnections();
Set<S> result = new HashSet<>();
for (SocketWrapperBase<S> socketWrapper : set) {
S socket = socketWrapper.getSocket();
if (socket != null) {
result.add(socket);
}
}
return result;
}


Expand Down Expand Up @@ -1063,8 +1070,8 @@ private void release(Processor processor) {
*/
@Override
public void release(SocketWrapperBase<S> socketWrapper) {
S socket = socketWrapper.getSocket();
Processor processor = connections.remove(socket);
Processor processor = (Processor) socketWrapper.getCurrentProcessor();
socketWrapper.setCurrentProcessor(null);
release(processor);
}

Expand Down Expand Up @@ -1132,8 +1139,11 @@ public final void pause() {
* Note that even if the endpoint is resumed, there is (currently)
* no API to inform the Processors of this.
*/
for (Processor processor : connections.values()) {
processor.pause();
for (SocketWrapperBase<S> wrapper : proto.getEndpoint().getConnections()) {
Processor processor = (Processor) wrapper.getCurrentProcessor();
if (processor != null) {
processor.pause();
}
}
}
}
Expand Down

0 comments on commit 01f2cf2

Please sign in to comment.