Skip to content
Snippets Groups Projects
connection_client.hpp 6.9 KiB
Newer Older
/*
 * connection_client.hpp
 *
 *  Created on: 03.01.2021
 *      Author: doralitze
 */

#pragma once

#include <functional>
#include <memory>
#include <string>

#include "net/async_fd.hpp"
#include "net/ioqueue.hpp"
#include "net/socketaddress.hpp"

#if EAGAIN != EWOULDBLOCK
    #define EAGAIN_WRAPPER ((errno != EAGAIN) && (errno != EWOULDBLOCK))
#else
    #define EAGAIN_WRAPPER (errno != EAGAIN)
#endif

namespace rmrf::net {

enum class exit_status_t : uint16_t {
    NO_ERROR = 0,
    TIMEOUT = 1
};

class connection_client {
public:
    /**
      * This function type accepts a reference to the incomming data string which it may not alter
      * and may not return any data.
      */
    typedef std::function<void(const iorecord&)> incomming_data_cb;
    
    /**
      * A callback for the destructor must match this definition.
      */
    typedef std::function<void(exit_status_t)> destructor_cb_type;
    
    static const unsigned int no_rate_limit = 0;

protected:
    auto_fd net_socket;
private:
    ::ev::io io;
    ::ev::async async;
    ioqueue<iorecord> write_queue;
    const destructor_cb_type destructor_cb;
    bool server_active = false;
    bool partial_write_allowed = true;
    bool data_write_active = false;
    
    unsigned int rate_limit = no_rate_limit;
    unsigned int current_burst_count = 0;
protected:
Benny Baumann's avatar
Benny Baumann committed
    incomming_data_cb in_data_cb;
    socketaddr own_address, peer_address;
public:
     * This constructor takes ownership over the provided file descriptor and registeres it
     * with libev if it is valid. If an invalid file descriptor or null_fd is passed, libev will not be invoked.
     * If this is intentional, an implementing sub class might overwrite write_data in such a case.
     * @brief Construct a new connection client.
     * @param socket_fd The file descriptor to register with libev
     * @param peer_address_ The remote address the client is connected to
     */
    connection_client(auto_fd&& socket_fd, const socketaddr& peer_address_, const destructor_cb_type destructor_cb_) :
        net_socket(std::forward<auto_fd>(socket_fd)), io{}, async{}, write_queue{}, destructor_cb{destructor_cb_},
        in_data_cb{}, own_address{}, peer_address{peer_address_}
    {
        if(this->net_socket) {
            async.set<connection_client, &connection_client::cb_async>(this);
            async.start();
            io.set<connection_client, &connection_client::cb_ev>(this);
            io.start(this->net_socket.get(), 0);
            this->server_active = true;
        }
    }
    
    virtual ~connection_client();

    /**
     * Use this method to send data to the other endpoint. Overwrite it to modify the data beforehand,
     * but don't forget to call this super method, if you still intend to write the data using libev.
     * @param data The data to send
     */
    inline virtual void write_data(const iorecord& data) {
        // Create NICBuffer from data
        this->write_queue.push_back(data);
        this->async.send();
    }
    
    /**
     * Use this method to send data to the other endpoint. In contrast to write_data this method is not virtual.
     * This data passes the raw buffer to the push_back_emplace method of the ioqueue.
     * @brief Push data to write queue with explicit single copy
     * @param buf The buffer of the data to be send
     * @param size The length of the buffer
     * @param sa The socket address associated with the data
     */
    inline void write_data(iorecord&& data) {
        this->write_queue.push_back(std::forward<iorecord>(data));
        this->async.send();
   /**
     * This method sends data over the socket. Keep in mind that this method only enqueues the
     * data and requests it's transmission but does not send the data directly. While this ensures
     * asynchronous execution you need to check if your data has been send using the is_write_queue_empty
     * method.
     * @brief Send data.
     * @param data The data to send
Benny Baumann's avatar
Benny Baumann committed
     */
    inline void write_data(const std::string& data) {
        this->write_data(iorecord{data.c_str(), data.size()});
    }
Benny Baumann's avatar
Benny Baumann committed

    /**
     * Use this method in order to register your callback function that should be
     * called when the client got data to process.
     * @param cb The callback function to register [void(iorecord& data)]
Benny Baumann's avatar
Benny Baumann committed
     */
    virtual void set_incomming_data_callback(const incomming_data_cb& cb);
    
    /**
     * Get the connected sockets address. Keep in mind that the lone existance of this address
     * does not guaruntee the integrity of the connection. Furthermore, in case of a one-to-many
     * connection setting, it is not garunteed which address might be reported back. Check the
     * documentation of the child class in such a case.
     * @brief Get the connected sockets address
     * @return The peers address.
     */
    inline const socketaddr get_peer_address() const {
        return this->peer_address;
    }

    /**
     * Use this method in order to check if all pending write requests have been successfully transmitted.
     * This method returns false if the write queue is not empty or the socket is currently transmitting data.
     * Note: If the socket connection crashes with an netio exception it might be possible that this
     * method returns true even if the last data send has been corrupted.
     * @brief Check if all pending data has been send.
     * @return true if all pending data has been send.
     */
    inline bool is_write_queue_empty() {
        return this->write_queue.empty() && !this->data_write_active;
    }
    
    /**
     * @brief Set a maximum number of packets to transmit per burst
     * @param new_limit The limit to set
     */
    void set_rate_limit(unsigned int new_limit);

    inline unsigned int get_rate_limit() {
        return this->rate_limit;
    }

protected:
    /**
     * @brief Implement the read operation of the implemented service
     */
    virtual void read_from_socket(::ev::io& w) = 0;

    /**
     * This method pushes the front of the write_queue to the socket. It will automatically advance
     * the internal buffers and discard empty ones.
     * @param w The io handle to use.
     * @param buffer The data to be transmitted
     */
    virtual ssize_t push_write_queue(::ev::io& w, iorecord& buffer) = 0;

    /**
     * @brief This method closes the socket and stops the server.
     */
    void stop_server();

private:
    /**
     * This method implements the io queue callback. It is responsible for the actual data transactions
     * @param w The io handle to use
     * @param events The event flag container
     */
    void cb_ev(::ev::io& w, int events);

    void cb_async(::ev::async& w, int events);
    
    inline void set_new_flags() {
        auto new_flags = 0;
        if(this->in_data_cb) {
            new_flags |= ::ev::READ;
        }
        if (!this->write_queue.empty()) {
            new_flags |= ::ev::WRITE;
        }
        this->io.set(new_flags);
    }