/*
 * connection_client.hpp
 *
 * Created on: 03.01.2021
 * Author: doralitze
 */
#pragma once
#include <cerrno>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include "net/async_fd.hpp"
#include "net/ioqueue.hpp"
#include "net/socketaddress.hpp"
// EAGAIN_WRAPPER evaluates to true when errno is NOT a "would block" code,
// i.e. when errno is neither EAGAIN nor EWOULDBLOCK. Despite its name it
// therefore signals the non-EAGAIN case.
// The two codes are only compared separately when the platform gives them
// different values; otherwise a single comparison against EAGAIN suffices.
#if EAGAIN != EWOULDBLOCK
#define EAGAIN_WRAPPER ((errno != EAGAIN) && (errno != EWOULDBLOCK))
#else
#define EAGAIN_WRAPPER (errno != EAGAIN)
#endif
/**
 * Status value handed to callbacks of type destructor_cb_type.
 * The underlying type is a 16 bit unsigned integer.
 */
enum class exit_status_t : uint16_t {
    /** Numeric value 0. */
    NO_ERROR = 0,
    /** Numeric value 1. */
    TIMEOUT = 1,
};
class connection_client {
/**
 * Signature of the handler that receives incoming data. The record is passed
 * by const reference, so the handler cannot alter it, and the handler does not
 * return anything.
 */
using incomming_data_cb = std::function<void(const iorecord&)>;
/**
 * Signature every destructor callback has to match. The callback receives an
 * exit_status_t and returns nothing.
 */
using destructor_cb_type = std::function<void(exit_status_t)>;
/**
 * Default value of the rate limit.
 * NOTE(review): the name suggests that 0 means "unlimited" - confirm in the implementation.
 */
static const unsigned int no_rate_limit = 0;
protected:
// Socket descriptor handed over to the constructor; protected, so accessible to sub classes.
auto_fd net_socket;
private:
// libev io watcher; the constructor binds it to cb_ev and starts it on net_socket.
::ev::io io;
// Records queued by the write_data overloads.
ioqueue<iorecord> write_queue;
// Callback supplied to the constructor.
// NOTE(review): presumably invoked by the destructor - confirm in the implementation.
const destructor_cb_type destructor_cb;
// Set to true by the constructor once the watchers have been started on a valid socket.
bool server_active = false;
// NOTE(review): purpose is not visible in this header - presumably controls whether a record may be sent in several chunks.
bool partial_write_allowed = true;
// Read by is_write_queue_empty(): while true, the queue is not reported as fully sent.
bool data_write_active = false;
// Value reported by get_rate_limit(); defaults to no_rate_limit.
unsigned int rate_limit = no_rate_limit;
// NOTE(review): presumably counts the packets sent in the current burst - confirm in the implementation.
unsigned int current_burst_count = 0;
// peer_address is initialised from the constructor argument and returned by get_peer_address();
// own_address is default constructed by the constructor.
socketaddr own_address, peer_address;
* This constructor takes ownership over the provided file descriptor and registeres it
* with libev if it is valid. If an invalid file descriptor or null_fd is passed, libev will not be invoked.
* If this is intentional, an implementing sub class might overwrite write_data in such a case.
* @brief Construct a new connection client.
* @param socket_fd The file descriptor to register with libev
* @param peer_address_ The remote address the client is connected to
*/
connection_client(auto_fd&& socket_fd, const socketaddr& peer_address_, const destructor_cb_type destructor_cb_) :
net_socket(std::forward<auto_fd>(socket_fd)), io{}, async{}, write_queue{}, destructor_cb{destructor_cb_},
in_data_cb{}, own_address{}, peer_address{peer_address_}
{
if(this->net_socket) {
async.set<connection_client, &connection_client::cb_async>(this);
async.start();
io.set<connection_client, &connection_client::cb_ev>(this);
io.start(this->net_socket.get(), 0);
this->server_active = true;
}
}
/**
 * Virtual destructor, so that sub classes are destroyed correctly through a pointer to this base class.
 * Only declared here.
 * NOTE(review): presumably reports an exit_status_t to the stored destructor callback - confirm in the implementation.
 */
virtual ~connection_client();
/**
* Use this method to send data to the other endpoint. Overwrite it to modify the data beforehand,
* but don't forget to call this super method, if you still intend to write the data using libev.
* @param data The data to send
*/
inline virtual void write_data(const iorecord& data) {
// Create NICBuffer from data
this->write_queue.push_back(data);
}
/**
 * Use this method to send data to the other endpoint. In contrast to the overload taking a
 * const reference this method is not virtual. The record is handed to the write queue as an rvalue.
 * @brief Push an rvalue record to the write queue
 * @param data The record to enqueue
 */
inline void write_data(iorecord&& data) {
    // data is a plain rvalue reference (not a forwarding reference), so std::move is the matching cast.
    this->write_queue.push_back(std::move(data));
}
/**
* This method sends data over the socket. Keep in mind that this method only enqueues the
* data and requests it's transmission but does not send the data directly. While this ensures
* asynchronous execution you need to check if your data has been send using the is_write_queue_empty
* method.
* @brief Send data.
* @param data The data to send
inline void write_data(const std::string& data) {
this->write_data(iorecord{data.c_str(), data.size()});
}
/**
* Use this method in order to register your callback function that should be
* called when the client got data to process.
* @param cb The callback function to register [void(iorecord& data)]
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
virtual void set_incomming_data_callback(const incomming_data_cb& cb);
/**
 * Report the address of the connected peer. The mere existence of this address does not
 * guarantee the integrity of the connection. Furthermore, in a one-to-many connection
 * setting it is not guaranteed which address is reported back; check the documentation
 * of the child class in such a case.
 * @brief Get the connected socket's address
 * @return A copy of the peer's address.
 */
inline const socketaddr get_peer_address() const {
    return peer_address;
}
/**
* Use this method in order to check if all pending write requests have been successfully transmitted.
* This method returns false if the write queue is not empty or the socket is currently transmitting data.
* Note: If the socket connection crashes with an netio exception it might be possible that this
* method returns true even if the last data send has been corrupted.
* @brief Check if all pending data has been send.
* @return true if all pending data has been send.
*/
inline bool is_write_queue_empty() {
return this->write_queue.empty() && !this->data_write_active;
}
/**
* @brief Set a maximum number of packets to transmit per burst
* @param new_limit The limit to set.
*        NOTE(review): presumably no_rate_limit (0) disables the limit - confirm in the implementation.
*/
void set_rate_limit(unsigned int new_limit);
inline unsigned int get_rate_limit() {
return this->rate_limit;
}
protected:
/**
* Pure virtual: every concrete sub class has to provide this method.
* @brief Implement the read operation of the implemented service
* @param w The io handle to use
*/
virtual void read_from_socket(::ev::io& w) = 0;
/**
* This method pushes the front of the write_queue to the socket. It will automatically advance
* the internal buffers and discard empty ones.
* Pure virtual: every concrete sub class has to provide this method.
* @param w The io handle to use.
* @param buffer The data to be transmitted
* @return NOTE(review): presumably the number of bytes written, or a negative value on
*         error (ssize_t) - confirm against the implementations.
*/
virtual ssize_t push_write_queue(::ev::io& w, iorecord& buffer) = 0;
/**
* Only declared here.
* @brief This method closes the socket and stops the server.
*/
void stop_server();
private:
/**
* This method implements the io queue callback. It is responsible for the actual data transactions.
* The constructor binds it to the io watcher.
* @param w The io handle to use
* @param events The event flag container
*/
void cb_ev(::ev::io& w, int events);
/**
* Callback of the async watcher; the constructor binds it to that watcher.
* @param w The async handle that triggered the callback
* @param events The event flag container
*/
void cb_async(::ev::async& w, int events);
/**
 * Recompute the event mask of the io watcher from the current state:
 * READ is requested while an incoming data callback is set and WRITE is
 * requested while the write queue holds records.
 */
inline void set_new_flags() {
    const int read_bits = this->in_data_cb ? static_cast<int>(::ev::READ) : 0;
    const int write_bits = this->write_queue.empty() ? 0 : static_cast<int>(::ev::WRITE);
    this->io.set(read_bits | write_bits);
}