Skip to content
Snippets Groups Projects
Commit b6cbc27e authored by Leon Dietrich's avatar Leon Dietrich
Browse files

Merge branch 'async_post_support' into 'master'

fix: async support for connection clients

See merge request !6
parents 32dbd80d 6f8fb7bb
No related branches found
No related tags found
1 merge request!6fix: async support for connection clients
Pipeline #7696 failed
......@@ -11,8 +11,12 @@
namespace rmrf::net {
connection_client::~connection_client() {
if(this->server_active)
async.stop();
if (this->server_active) {
io.stop();
}
if (destructor_cb) {
destructor_cb(exit_status_t::NO_ERROR);
}
......@@ -59,15 +63,22 @@ namespace rmrf::net {
this->server_active = false;
io.stop();
}
void connection_client::set_incomming_data_callback(const incomming_data_cb& cb) {
void connection_client::set_incomming_data_callback(const incomming_data_cb &cb) {
this->in_data_cb = cb;
set_new_flags();
this->async.send();
}
void connection_client::set_rate_limit(unsigned int new_limit) {
this->rate_limit = new_limit;
// TODO start timer if new limit is not no_rate_limit
// TODO stop timer if timer exists and new limit is no_rate_limit
}
void connection_client::cb_async(::ev::async &w, int events) {
MARK_UNUSED(w);
MARK_UNUSED(events);
set_new_flags();
}
}
......@@ -49,6 +49,7 @@ protected:
auto_fd net_socket;
private:
::ev::io io;
::ev::async async;
ioqueue<iorecord> write_queue;
const destructor_cb_type destructor_cb;
bool server_active = false;
......@@ -73,10 +74,12 @@ public:
* @param peer_address_ The remote address the client is connected to
*/
connection_client(auto_fd&& socket_fd, const socketaddr& peer_address_, const destructor_cb_type destructor_cb_) :
net_socket(std::forward<auto_fd>(socket_fd)), io{}, write_queue{}, destructor_cb{destructor_cb_},
net_socket(std::forward<auto_fd>(socket_fd)), io{}, async{}, write_queue{}, destructor_cb{destructor_cb_},
in_data_cb{}, own_address{}, peer_address{peer_address_}
{
if(this->net_socket) {
async.set<connection_client, &connection_client::cb_async>(this);
async.start();
io.set<connection_client, &connection_client::cb_ev>(this);
io.start(this->net_socket.get(), 0);
this->server_active = true;
......@@ -93,7 +96,7 @@ public:
inline virtual void write_data(const iorecord& data) {
// Create NICBuffer from data
this->write_queue.push_back(data);
this->io.set(::ev::READ | ::ev::WRITE);
this->async.send();
}
/**
......@@ -106,9 +109,9 @@ public:
*/
inline void write_data(iorecord&& data) {
this->write_queue.push_back(std::forward<iorecord>(data));
this->io.set(::ev::READ | ::ev::WRITE);
this->async.send();
}
/**
* This method sends data over the socket. Keep in mind that this method only enqueues the
* data and requests it's transmission but does not send the data directly. While this ensures
......@@ -188,6 +191,8 @@ private:
* @param events The event flag container
*/
void cb_ev(::ev::io& w, int events);
void cb_async(::ev::async& w, int events);
inline void set_new_flags() {
auto new_flags = 0;
......@@ -202,4 +207,3 @@ private:
};
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment