Skip to content
Snippets Groups Projects
Verified Commit 7a1428dc authored by Benny Baumann's avatar Benny Baumann
Browse files

chg: Use ioqueue for writing

Also includes several const correctnet fixes and type simplifications
parent a0531533
No related branches found
No related tags found
No related merge requests found
......@@ -14,7 +14,7 @@ public:
typedef std::shared_ptr<async_server_socket> self_ptr_type;
typedef std::function<void(self_ptr_type, const auto_fd &)> accept_handler_type;
typedef std::function<void(self_ptr_type &)> error_handler_type;
typedef std::function<void(self_ptr_type)> error_handler_type;
private:
auto_fd socket;
......
......@@ -18,5 +18,3 @@ inline void connection_client::set_incomming_data_callback(const incomming_data_
}
}
......@@ -15,7 +15,7 @@ namespace rmrf::net {
class connection_client : public std::enable_shared_from_this<connection_client> {
public:
typedef std::function<void(std::shared_ptr<std::string>)> incomming_data_cb;
typedef std::function<void(const std::string&)> incomming_data_cb;
protected:
incomming_data_cb in_data_cb;
public:
......@@ -24,7 +24,7 @@ public:
/**
* Use this method to send data to the other endpoint.
*/
virtual void write_data(std::string data) = 0;
virtual void write_data(const std::string& data) = 0;
/**
* Use this method in order to register your callback function that should be
......
......@@ -32,7 +32,7 @@ tcp_client::tcp_client(const destructor_cb_type destructor_cb_, auto_fd&& socket
// TODO log created client
}
tcp_client::tcp_client(const std::string peer_address_, std::string service_or_port, int ip_addr_family) :
tcp_client::tcp_client(const std::string& peer_address_, const std::string& service_or_port, int ip_addr_family) :
connection_client{},
destructor_cb(nullptr),
peer_address(peer_address_),
......@@ -127,48 +127,27 @@ tcp_client::tcp_client(const std::string peer_address_, std::string service_or_p
//TODO log connected client
}
tcp_client::tcp_client(const std::string peer_address_, std::string service_or_port) :
tcp_client::tcp_client(const std::string& peer_address_, const std::string& service_or_port) :
tcp_client(peer_address_, service_or_port, AF_UNSPEC) { }
tcp_client::tcp_client(const std::string peer_address_, const uint16_t port_) :
tcp_client::tcp_client(const std::string& peer_address_, const uint16_t port_) :
tcp_client(peer_address_, std::to_string(port_)) {}
tcp_client::~tcp_client() {
if(destructor_cb != nullptr)
destructor_cb(EXIT_STATUS_NO_ERROR);
destructor_cb(exit_status_t::NO_ERROR);
io.stop();
}
namespace impl {
NICBuffer::NICBuffer(const char* bytes, ssize_t nbytes) : data(nullptr), len(nbytes), pos(0) {
data = new char[nbytes];
memcpy(data, bytes, nbytes);
}
NICBuffer::~NICBuffer() {
delete [] data;
}
char* NICBuffer::dpos() {
return data + pos;
}
ssize_t NICBuffer::nbytes() {
return len - pos;
}
}
void tcp_client::write_data(std::string data) {
void tcp_client::write_data(const std::string& data) {
// Create NICBuffer from data
this->write_queue.push_back(std::make_shared<impl::NICBuffer>(data.c_str(), data.size()));
this->write_queue.push_back(iorecord{data.c_str(), data.size()});
this->io.set(::ev::READ | ::ev::WRITE);
}
inline std::shared_ptr<std::string> buffer_to_string(char* buffer, ssize_t bufflen)
inline std::string buffer_to_string(char* buffer, ssize_t bufflen)
{
// For some wired reaseon the compiler refuses to find the correct constructor of string
// without this extra method.
std::shared_ptr<std::string> ret = std::make_shared<std::string>(buffer, (int) bufflen);
return ret;
return std::string(buffer, (size_t)bufflen);
}
void tcp_client::cb_ev(::ev::io &w, int events) {
......@@ -208,22 +187,20 @@ void tcp_client::cb_ev(::ev::io &w, int events) {
}
void tcp_client::push_write_queue(::ev::io &w) {
if (this->is_write_queue_empty()) {
if (this->write_queue.empty()) {
io.set(::ev::READ);
return;
}
std::shared_ptr<impl::NICBuffer> buffer = this->write_queue.front();
ssize_t written = write(w.fd, buffer->dpos(), buffer->nbytes());
iorecord buffer = this->write_queue.pop_front();
ssize_t written = write(w.fd, buffer.ptr(), buffer.size());
if (written < 0) {
throw netio_exception("Failed to write latest buffer content.");
}
buffer->pos += written;
if (buffer->nbytes() == 0) {
this->write_queue.pop_front();
}
buffer.advance((size_t)written);
this->write_queue.push_front(buffer);
}
inline std::string tcp_client::get_peer_address() {
......@@ -234,10 +211,6 @@ inline uint16_t tcp_client::get_port() {
return this->port;
}
inline bool tcp_client::is_write_queue_empty() {
return this->write_queue.empty();
}
}
......@@ -15,34 +15,20 @@
#include <string>
#include <unistd.h>
#include "net/connection_client.hpp"
#include "net/async_fd.hpp"
#define EXIT_STATUS_NO_ERROR (exit_status) 0
#define EXIT_STATUS_TIMEOUT (exit_status) 1
#include "net/connection_client.hpp"
#include "net/ioqueue.hpp"
namespace rmrf::net {
namespace impl {
struct NICBuffer {
char* data;
ssize_t len;
ssize_t pos;
NICBuffer(const char* bytes, ssize_t nbytes);
virtual ~NICBuffer();
char *dpos();
ssize_t nbytes();
NICBuffer operator=(const rmrf::net::impl::NICBuffer&) = delete;
NICBuffer(const NICBuffer &) = delete;
enum class exit_status_t : uint16_t {
NO_ERROR = 0,
TIMEOUT = 1
};
}
class tcp_client : public connection_client, std::enable_shared_from_this<tcp_client> {
public:
typedef unsigned short exit_status;
typedef std::function<void(exit_status)> destructor_cb_type;
typedef std::function<void(exit_status_t)> destructor_cb_type;
private:
const destructor_cb_type destructor_cb;
const std::string peer_address;
......@@ -50,17 +36,16 @@ private:
uint16_t port;
auto_fd net_socket;
::ev::io io;
std::list<std::shared_ptr<impl::NICBuffer>> write_queue;
ioqueue write_queue;
public:
tcp_client(const destructor_cb_type destructor_cb_, auto_fd&& socket_fd, std::string peer_address_, uint16_t port_);
tcp_client(const std::string peer_address_, const uint16_t port_);
tcp_client(const std::string peer_address_, std::string service_or_port);
tcp_client(const std::string peer_address_, std::string service_or_port, int ip_addr_family);
tcp_client(const std::string& peer_address_, const uint16_t port_);
tcp_client(const std::string& peer_address_, const std::string& service_or_port);
tcp_client(const std::string& peer_address_, const std::string& service_or_port, int ip_addr_family);
virtual ~tcp_client();
virtual void write_data(std::string data);
virtual void write_data(const std::string& data);
std::string get_peer_address();
uint16_t get_port();
bool is_write_queue_empty();
private:
void cb_ev(::ev::io &w, int events);
void push_write_queue(::ev::io &w);
......
......@@ -27,7 +27,7 @@
namespace rmrf::net {
tcp_server_socket::tcp_server_socket(socketaddr socket_identifier, incoming_client_listener_type client_listener_) :
tcp_server_socket::tcp_server_socket(const socketaddr& socket_identifier, incoming_client_listener_type client_listener_) :
ss{nullptr}, client_listener(client_listener_), number_of_connected_clients(0) {
auto_fd socket_fd{socket(socket_identifier.family(), SOCK_STREAM, 0)};
if(!socket_fd.valid()) {
......@@ -102,7 +102,7 @@ int tcp_server_socket::get_number_of_connected_clients() const {
return this->number_of_connected_clients;
}
void tcp_server_socket::client_destructed_cb(tcp_client::exit_status exit_status) {
void tcp_server_socket::client_destructed_cb(exit_status_t exit_status) {
MARK_UNUSED(exit_status);
this->number_of_connected_clients--;
......
......@@ -27,11 +27,11 @@ private:
std::atomic_uint32_t number_of_connected_clients;
public:
tcp_server_socket(const uint16_t port, incoming_client_listener_type client_listener_);
tcp_server_socket(socketaddr socket_identifier, incoming_client_listener_type client_listener_);
tcp_server_socket(const socketaddr& socket_identifier, incoming_client_listener_type client_listener_);
int get_number_of_connected_clients() const;
private:
void await_raw_socket_incomming(async_server_socket::self_ptr_type ass, const auto_fd& socket);
void client_destructed_cb(tcp_client::exit_status exit_status);
void client_destructed_cb(exit_status_t exit_status);
};
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment