typedef int (*funcptr)();

An engineers technical notebook

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

ZeroMQ - Edge Triggered Notification

ZeroMQ is an absolutely fantastic communications library that has allowed me to very easily create networks/connections between multiple different applications to communicate in an extremely fast yet directed manner and allow scaling of the connected entities with almost no second thought.

I have used ZeroMQ within Python and C++, and it has been fantastic leaving that part of the communication layer to a library rather than writing something similar myself. However, embedding ZeroMQ into an another event loop has proven to have some quirks. They are documented in the API, but not so much by other users/examples found around the web, and caught me by surprise.

This post mainly goes over the issue, the difference between edge and level triggered and how ZeroMQ works. I am hoping to spend some time documenting how to embed ZeroMQ correctly inside libev at a later date.

The quirk

ZeroMQ allows you to get an underlying file descriptor (ZMQ_FD) for a ZeroMQ socket using getsockopt(), this is not the actual ZeroMQ file descriptor but a stand-in:

int fd = 0;
size_t fd_len = sizeof(fd);
zmq_socket.getsockopt(ZMQ_FD, &fd, &fd_len);

This file descriptor can then be passed on to poll(), select(), kqueue() or whatever your event notification library of choice may be, the idea being that if the file descriptor is notified to be available for reading then you check the sockets ZMQ_EVENTS and find out if you are allowed to read or write to the ZeroMQ socket, or even possibly neither (false positive). The same file descriptor is also used internally within the ZeroMQ library for notification and thus we may receive an event notification that we can't do anything with.

As stated in the ZeroMQ API guide for getsockopt() under ZMQ_FD:

the ØMQ library shall signal any pending events on the socket in an edge-triggered fashion by making the file descriptor become ready for reading.

and further it states:

The ability to read from the returned file descriptor does not necessarily indicate that messages are available to be read from, or can be written to, the underlying socket; applications must retrieve the actual event state with a subsequent retrieval of the ZMQ_EVENTS option.

So far so good, so after we return from our event notification on the file descriptor we simply ask ZeroMQ whether or not we can read or write (or possibly both):

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

if (zevents & ZMQ_POLLIN) {
    // We can read from the ZeroMQ socket
}

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

So now you are happily reading from ZeroMQ and using the data as you wish, except you start to notice that sometimes it can take a little while for data to show up, or that if the end-point you are connected to on ZeroMQ sends a lot of data it is not delivered in a timely fashion. This is where the documentation for edge triggered comes into play, and that is something that is different about ZeroMQ compared to for example BSD sockets.

Edge triggered versus level triggered

Edge and level triggered refer to what kind of signal a device will output in an attempt to gain the attention of another device. Edge and level triggered have their history in electronics and hardware, where they play a very crucial role.

For both level and edge triggered assume the data is represented by this simple string:

00000000000111111111111111111111100000000011111111111000000000

The zeroes (0) are where no data is available, and the ones (1) are where data is available for processing.

Level triggered

Level triggered devices when they require attention, and until they no longer require attention will signal with a high voltage (going back to early electronics), and when they no longer require attention signal with a low voltage.

Data:  00000000001111111111111111111111000000000011111111111000000000
Block:      A               B              C         D
+ 5v              ____________________           _________
  -    __________|                    |_________|         |__________
  0v
State: 0          1                    0          1        0

At this point, once the signal turns to 1 in block B we can start processing data, and until we are done processing data the signal will stay 1. Once we are done processing the device will signal that we are done by setting the state to 0 in block C, and the process can repeat itself with block D.

Edge triggered

Edge triggered devices when they require attention will simply turn on the signal for a short period of time and then turn it back off until all of the requireding processing is complete at which point they will once again signal that they need processing.

Data:  00000000001111111111111111111111100000000011111111111000000000
Block:      A     B              C                 D
+ 5v              __                              __
  -    __________|  |____________________________|  |________________
  0v
State: 0          1              0                1        0

In this case when the trigger arrives you have to process all of the data available. When block B arrives and we get 1 as our signal, we have to process all of the data, we won't know when we are done other than some other notification scheme by the device. (Such as a special message that says we are done). Then when we are done we can go back to listening to waiting for a trigger.

If however in the time that we are processing the data from the trigger at B receive more data to process we won't get notified because we weren't officially done processing data from the device, and thus the device will not notify us of the new data again until we have processed all of the data.

There are some devices that as soon as you start processing their data, you re-enable the trigger, so if you receive new data while still processing the old you will get a new notification.

ZeroMQ uses edge triggered

When multiple message arrive for the ZeroMQ socket, one notification is sent. You check to see if the ZMQ_EVENTS actually contains ZMQ_POLLIN and then read a single message from ZeroMQ. Content that you have processed the data ZeroMQ has for you, you add the file descriptor back to the event notification library and wait. However, you know you sent two messages, but all you got was one. The next time you send data to the socket, it returns the second message, and you have now send three but the third doesn't seem to arrive.

This is where the edge triggered comes in. ZeroMQ only notifies you once, and after it has notified you it won't notify you again until it receives new messages, EVEN if you have messages waiting for you. In BSD sockets when you recv() data from the socket, if it is not all of the data it will simply tell you, and until all of the data has been read that the socket has to offer it is going to tell you (level triggered).

This means that when you get a notification from ZeroMQ you have to use a loop to process all of the messages that are coming your way:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

The same is true if you need to send multiple messages, you will need to check ZMQ_EVENTS for ZMQ_POLLOUT and send messages so long as that holds true, as soon as it is no longer true you will need to wait until you get triggered again by using the file descriptor in your event notification library.

Embedding or co-existing with another event loop

Embedding ZeroMQ into a different event loop becomes more difficult when you realise that it is edge triggered versus level triggered. If you want to allow fair processing of data between events it becomes harder to accomplish with ZeroMQ because you are required to process all of the data before going back to your event loop or requires the setting up of different types of events depending on the state that the ZeroMQ socket is in to make sure you process all of the data and don't forget any.

Edge triggered for the ZeroMQ file descriptor was a design decision which unfortunately made the use of ZeroMQ sockets more difficult and different from BSD sockets which makes it harder for application programmers to do the right thing. There are plenty of questions on the internet regarding the proper usage of ZMQ_FD with various different event notification schemes/implementations.

IPv6 -- getaddrinfo() and bind() ordering with V6ONLY

Recently I ran into an issue that took me a while to sort out, and it is regarding inconsistent behaviour on various OS's with regards to IPv6 sockets (AF_INET61) and calling bind(2) after getting the results back from getaddrinfo(3).

A call to getaddrinfo() with the hints set to AF_UNSPEC in ai_family and AI_PASSIVE in ai_flags will return to us 1 or more results that we can bind() to. Sample code for that looks like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct addrinfo hints, *addrlist;

memset(&hints, 0, sizeof(hints));

// Ask for TCP
hints.ai_socktype = SOCK_STREAM;

// Any family works for us ...
hints.ai_family = AF_UNSPEC;

// Set some hints
hints.ai_flags = 
            AI_PASSIVE    | // We want to use this with bind
            AI_ADDRCONFIG;  // Only return IPv4 or IPv6 if they are configured

int rv;

if ((rv = getaddrinfo(0, "7020", &hints, &addrlist)) != 0) {
    fprintf(stderr, "getaddrinfo: %s", gai_strerror(rv));
    return 1;
}

// Use the list in *addrlist
for (addr = addrlist; addr != 0; addr = addr->ai_next) {
    // use *addr as appropriate
}

// Clean up the memory from getaddrinfo()
freeaddrinfo(addrlist);

On Linux there are two entries returned when the host it is run on has both IPv4 and IPv6 enabled. An AF_INET which was followed by an AF_INET6. Now, it is not said that you are required to use all of the results that are returned, but if you want to listen on all address families it is off course suggested.

Following the steps below for each of the returned results should result in having 1 or more different sockets that are bound to a single port.

  1. Create the socket()
  2. Set any socket options you want (SO_REUSEADDR for example)
  3. Then bind() the socket
  4. After that call listen() (followed off course by accept() on the socket)

Only for some unknown reason (and errno is no help) bind() fails when you get to the AF_INET6, which was returned second. Searching online as to why the bind would fail doesn't give you any good results and the thing that is even worse is that if you run the same code on another platform such as FreeBSD, OpenIndiana or Mac OS X no such failure exists. However I started suspecting something was up when I started looking at the output from netstat -lan | grep 7020 on Mac OS X. Where 7020 is the port I passed into getaddrinfo().

tcp46      0      0  *.7020                 *.*  LISTEN     
tcp4       0      0  *.7020                 *.*  LISTEN

Wait a minute ... one of the sockets is on both IPv4 and on IPv6. Some more time spent searching the internet I came across RFC 3493 section 5.3, which is titled "IPV6_V6ONLY option for AF_INET6 Sockets".

As stated in section <3.7 Compatibility with IPv4 Nodes>, AF_INET6 sockets may be used for both IPv4 and IPv6 communications. Some applications may want to restrict their use of an AF_INET6 socket to IPv6 communications only.

This was going down the right route, so I changed my code so that in the steps listed above in number 2 I added the following code if the socket type is AF_INET6:

1
2
3
4
5
if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(int)) == -1) {
    close(sockfd);
    fprintf(stderr, "setsockopt: %s IPV6_V6ONLY\n", strerror(errno));
    continue;
}

The RFC 3493 section 5.3 also states that this option should be turned off by default, which means that all IPv6 sockets can also communicate over IPv4. Thus technically setting the option manually in code the best way to fix the issue. FreeBSD has had this feature turned on (as in IPv6 sockets can only communicate with IPv6 and NOT IPv4) since 5.x.

The biggest issue is that the remaining operating systems (OS X and OpenIndiana) don't have the same behaviour as Linux which makes troubleshooting this issue more difficult than it should be. The issue is that the RFC doesn't specify what exactly the operating should do when it encounters a request to bind to the same port on IPv4 and IPv6. The only place where I have found this documented is in "IPv6 Network Programming" under "Tips in IPv6 Programming" chapter 4, section 4, appropriately titled "bind(2) Ordering and Conflicts".


If you get a bind() error when attempting to bind to an AF_INET6 socket please make sure that you set the socket option IPV6_V6ONLY on the AF_INET6 socket. The default as required by RFC 3493 is to have that option be off. The default is wrong, and the RFC should have been more specific regarding what the right behaviour is when attempting to bind on an AF_INET6 socket when already bound on an AF_INET while IPV6_V6ONLY is set to false.

The full code that I used for testing, along with a little bit more information is available as a gist on github.


  1. The old BSD style socket() called for defines starting with PF_ such as PF_INET and PF_INET6 with the PF standing for protocol family. POSIX starts them with AF_, and calls them an address family. On almost every operating system PF_INET is the same as AF_INET. If the define doesn't exist you can always create it. 

OpenSSL as a Filter (or non-blocking OpenSSL)

OpenSSL as a library at first glance is complicated, and then you realise that a lot of the documentation seems to be incomplete or missing. Generally the individual man pages for the various functions are not bad, and they will give you relevant information to help you along, but it can be hard to find.

The idea is to write a filter in such a way that OpenSSL can easily be disabled or removed. We don't want to rely on OpenSSL specific functionality in our code, for instance in the future we may want to change to using Botan or another SSL/TLS library without having to change a lot of core functionality.

Possible methods

There are various ways of doing non-blocking OpenSSL, the main one is to simply set the underlying socket to non-blocking and pass it into OpenSSL, at that point the functions SSL_write() and SSL_read() will enter various error states that can be read by SSL_get_error(), more specifically the function will return either SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.

The other method, the one used below, was described by Marc Lehmann in a post to the libev mailling list and uses memory BIO objects. Specifically he linked to some Perl sample code; the OpenSSL setup, and the function called for new incoming data/data to be written: dotls function.

The code is very new and might still be buggy, but it outlines the principles: use a memory stream, which will avoid all issues with blocking in openssl

It shows the idea pretty clearly but unless you know Perl it can be really confusing as to what is going on.

The SSL Filter implementation

The SSL_CTX that is passed in contains all of the initialisation that is done for OpenSSL in general, which is not shown here. The SSLFilter is created for each time you want to have a socket start doing SSL.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// openssl_filter.h

class SSLFilter {
    public:
        SSLFilter(SSL_CTX* ctxt,
                  std::string* nread,
                  std::string* nwrite,
                  std::string* aread,
                  std::string* awrite);
        virtual ~SSLFilter();

        void update();

    private:
        bool continue_ssl_(int function_return);

        SSL * ssl;
        BIO * rbio;
        BIO * wbio;

        std::string* nread;
        std::string* nwrite;
        std::string* aread;
        std::string* awrite;
};

The class contains mainly various different bits of state, when you create the filter you pass in a pointer to four different strings, they are used as follows:

  • nread: This contains data to be processed by the filter, from the network
  • nwrite: This contains data that has been processed by the filter, and has to be sent to the network
  • aread: This contains data that has been processed by the filter, and is ready to be processed by the application
  • awrite: This contains data that the application wants to send out to the network and is ready for processing by the filter.

The other private variables contain various different pieces of state and are created by the filters constructors.

  • ssl: Contains the state for the current SSL connection.
  • rbio: Contains the data that the SSL functions will read from, data put in here is copied from nread.
  • wbio: Contains the data that the SSL functions will write to, data from this BIO is copied to nwrite.

The implementation is the really interesting part.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
// openssl_filter.cc

#include <stdexcept>

#include "openssl_filter.h"

SSLFilter::SSLFilter(SSL_CTX* ctxt, 
                     std::string* nread, 
                     std::string* nwrite,
                     std::string* aread,
                     std::string* awrite)
                      :
                     nread(nread), 
                     nwrite(nwrite), 
                     aread(aread), 
                     awrite(awrite)

    rbio = BIO_new(BIO_s_mem());
    wbio = BIO_new(BIO_s_mem());

    ssl = SSL_new(ctxt);

    SSL_set_accept_state(ssl);
    SSL_set_bio(ssl, rbio, wbio);
}

SSLFilter::~SSLFilter() {
    SSL_free(_ssl);
}

void SSLFilter::update(Filter::FilterDirection) {
    // If we have data from the network to process, put it the memory BIO for OpenSSL
    if (!nread->empty()) {
        int written = BIO_write(rbio, nread->c_str(), nread->length());
        if (written > 0) nread->erase(0, written);
    }

    // If the application wants to write data out to the network, process it with SSL_write
    if (!awrite->empty()) {
        int written = SSL_write(ssl, awrite->c_str(), awrite->length());

        if (!continue_ssl_()) {
            throw std::runtime_error("An SSL error occured.");
        }

        if (written > 0) awrite->erase(0, written);
    }

    // Read data for the application from the encrypted connection and place it in the string for the app to read
    while (1) {
        char *readto = new char[1024];
        int read = SSL_read(ssl, readto, 1024);

        if (!continue_ssl_()) {
            delete readto;
            throw std::runtime_error("An SSL error occured.");
        }

        if (read > 0) {
            size_t cur_size = aread->length();
            aread->resize(cur_size + read);
            std::copy(readto, readto + read, aread->begin() + cur_size);
        }

        delete readto;

        if (static_cast<size_t>(read) != 1024 || written == 0) break;
    }

    // Read any data to be written to the network from the memory BIO and copy it to nwrite
    while (1) {
        char *readto = new char[1024];
        int read = BIO_read(wbio, readto, 1024);

        if (read > 0) {
            size_t cur_size = nwrite->length();
            nwrite->resize(cur_size + read);
            std::copy(readto, readto + read, nwrite->begin() + cur_size);
        }

        delete readto;

        if (static_cast<size_t>(read) != 1024 || read == 0) break;
    }
}

bool SSLFilter::continue_ssl_(int function_return) {
    int err = SSL_get_error(ssl, function_return);

    if (err == SSL_ERROR_NONE || err == SSL_ERROR_WANT_READ) {
        return true;
    }

    if (err == SSL_ERROR_SYSCALL) {
        ERR_print_errors_fp(stderr);
        perror("syscall error: ");
        return false;
    }

    if (err == SSL_ERROR_SSL) {
        ERR_print_errors_fp(stderr);
        return false;
    }
    return true;
}

Explanation of the SSL Filter class

Line 7 - 25, we create the two memory BIO's and then call SSL_new() using the passed in SSL_CTX, set the accept set for SSL and then we hook up the two BIO objects to the ssl object.

Line 27 - 29, All we do in the destructor is SSL_free() the SSL state, which automatically takes care of freeing the two memory BIOs that were given to it.

Line 33 - 36, if nread is not empty we copy the data into rbio. rbio is used by the SSL functions to read from (as if it were reading from a socket). We read into the memory BIO as much as possible, technically this should be everything, but it is possible it won't read everything due to memory constraints, next time SSLFilter::update() is called it will get emptied as much as possible again.

Line 39 - 47, we see if there is anything that the app wants to write out to the remote client, if there is we call SSL_write(). After the call to SSL_write() we call continue_ssl_() to see if we can safely continue using SSL.

Line 50 - 68, this is where try to read as much data from SSL_read() as we can and place it in aread. This is data that has been decrypted by OpenSSL and can be used by the application. After the call to SSL_read() we call continue_ssl_() to see if we can safely continue using SSL.

Line 71 - 84, if OpenSSL has to write something to the network, either directly due to something the app did (by filling awrite) or because of something the remote client sent, it will place it in its memory buffer (wbio), what we do here is drain that memory buffer and fill nwrite.

Line 87 - 105, the continue_ssl_() function gets the SSL errors using SSL_get_error and checks that either it is SSL_ERROR_NONE or SSL_ERROR_WANT_READ both of which are acceptable errors, any other errors and we check to see if it is an error due to a system call or due to an SSL error, we print out the errors to standard error and return false. At that point the calling code is free to go about its business as it pleases (most likely throwing an error).

Notes regarding this sample code ...

The only thing missing in this example code is checking to see if the SSL connection has gone into an SSL_shutdown() mode. This should be added so that you cleanly shut down an SSL connection, but it is not absolutely required, especially since it seems to be shaky as to whether or not it will work, especially if you are using SSLv3 or SSLv2.

How to use the SSL Filter

Here are the steps to using this filter:

  1. At program startup set up the SSL_CTX structure as required, so that it be used in the SSLFilter
  2. Set up socket/event loop and accept() new socket.
  3. Create four new std::string's and create a new SSLFilter, pass in the SSL_CTX you created, and the four buffers.
  4. Add the new socket to the event loop to wait for reading.

Now upon receiving a read ready status from the event loop, the following should be done:

  1. Read data into nread buffer.
  2. call SSLFilter::update()
  3. See if aread is not empty, process the data as you please
  4. Write data to awrite as needed, if data is written to awrite, call SSLFilter::update(WRITE) to process the data
  5. See if nwrite is not empty, if so add socket to event loop for write readiness.

Once you receive a write ready status from the event loop, you should do the following:

  1. Write data in nwrite to the socket.
  2. If nwrite is empty, remove the socket from the event loop for writing (so that you don't have the event loop notifying you of the ability to write, even-though you have nothing to write).

Now it becomes simple to add OpenSSL to any open socket, in a way that is easy to refactor later and or disable depending on parameters passed in as arguments to the program. If you set nread to the same string as aread and nwrite to awrite and don't create an SSLFilter you have bypassed the filter...