From b14eccce3e1143ba1188f85d87f1c3426df07c31 Mon Sep 17 00:00:00 2001 From: Doralitze <doralitze@chaotikum.org> Date: Sun, 3 Jan 2021 15:33:26 +0100 Subject: [PATCH] add: TCP socket implementation --- src/net/async_server.cpp | 3 +- src/net/async_server.hpp | 2 +- src/net/connection_client.cpp | 22 ++++++ src/net/connection_client.hpp | 37 +++++++++ src/net/tcp_client.cpp | 136 ++++++++++++++++++++++++++++++++++ src/net/tcp_client.hpp | 66 +++++++++++++++++ src/net/tcp_server_socket.cpp | 59 +++++++++++++-- src/net/tcp_server_socket.hpp | 18 +++-- 8 files changed, 330 insertions(+), 13 deletions(-) create mode 100644 src/net/connection_client.cpp create mode 100644 src/net/connection_client.hpp create mode 100644 src/net/tcp_client.cpp create mode 100644 src/net/tcp_client.hpp diff --git a/src/net/async_server.cpp b/src/net/async_server.cpp index 6b30b18..43b2ca6 100644 --- a/src/net/async_server.cpp +++ b/src/net/async_server.cpp @@ -4,7 +4,7 @@ * Created on: 02.01.2021 * Author: doralitze */ -#include "async_server.hpp" +#include "net/async_server.hpp" #include <ev++.h> @@ -39,6 +39,7 @@ void async_server_socket::cb_ev(::ev::io &w, int events) { if (events & ::ev::READ) { // Handle incoming clients auto ah = this->get_accept_handler(); + ah(this->shared_from_this(), auto_fd(w.fd)); } if (events & ::ev::WRITE) { diff --git a/src/net/async_server.hpp b/src/net/async_server.hpp index e497f52..a244de5 100644 --- a/src/net/async_server.hpp +++ b/src/net/async_server.hpp @@ -13,7 +13,7 @@ class async_server_socket : public std::enable_shared_from_this<async_server_soc public: typedef std::shared_ptr<async_server_socket> self_ptr_type; - typedef std::function<void(self_ptr_type &, const auto_fd &)> accept_handler_type; + typedef std::function<void(self_ptr_type, const auto_fd &)> accept_handler_type; typedef std::function<void(self_ptr_type &)> error_handler_type; private: diff --git a/src/net/connection_client.cpp b/src/net/connection_client.cpp new file mode 100644 index 0000000..11e1d1b --- /dev/null +++ b/src/net/connection_client.cpp @@ -0,0 +1,22 @@ +/* + * connection_client.cpp + * + * Created on: 03.01.2021 + * Author: doralitze + */ + +#include "net/connection_client.hpp" + +namespace rmrf::net { + +connection_client::connection_client() : in_data_cb{} { + +} + +inline void connection_client::set_incomming_data_callback(const incomming_data_cb &cb) { + this->in_data_cb = cb; +} + +} + + diff --git a/src/net/connection_client.hpp b/src/net/connection_client.hpp new file mode 100644 index 0000000..5cf6946 --- /dev/null +++ b/src/net/connection_client.hpp @@ -0,0 +1,37 @@ +/* + * connection_client.hpp + * + * Created on: 03.01.2021 + * Author: doralitze + */ + +#pragma once + +#include <functional> +#include <memory> +#include <string> + +namespace rmrf::net { + +class connection_client : public std::enable_shared_from_this<connection_client> { +public: + typedef std::function<void(std::string)> incomming_data_cb; +protected: + incomming_data_cb in_data_cb; +public: + connection_client(); + + /** + * Use this method to send data to the other endpoint. + */ + virtual void write_data(std::string data) = 0; + + /** + * Use this method in order to register your callback function that should be + * called when the client got data to process. + * @param cb The callback function to register [void(std::string data)] + */ + void set_incomming_data_callback(const incomming_data_cb &cb); +}; + +} diff --git a/src/net/tcp_client.cpp b/src/net/tcp_client.cpp new file mode 100644 index 0000000..11a704b --- /dev/null +++ b/src/net/tcp_client.cpp @@ -0,0 +1,136 @@ +/* + * tcp_client.cpp + * + * Created on: 03.01.2021 + * Author: doralitze + */ + +#include "net/tcp_client.hpp" + +#include <ev++.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <utility> + +#include "net/netio_exception.hpp" + +namespace rmrf::net { + +tcp_client::tcp_client(const destructor_cb_type destructor_cb_, auto_fd&& socket_fd, std::string peer_address_, uint16_t port_) : + connection_client{}, + destructor_cb(destructor_cb_), + socket(std::forward<auto_fd>(socket_fd)), + peer_address(peer_address_), port(port_), + io{}, write_queue{} { + io.set<tcp_client, &tcp_client::cb_ev>(this); + io.start(this->socket.get(), ::ev::READ); + // TODO log created client +} + +tcp_client::~tcp_client() { + destructor_cb(EXIT_STATUS_NO_ERROR); + io.stop(); +} + +namespace impl { +NICBuffer::NICBuffer(const char* bytes, ssize_t nbytes) : data(nullptr), len(nbytes), pos(0) { + data = new char[nbytes]; + memcpy(data, bytes, nbytes); +} + +NICBuffer::~NICBuffer() { + delete [] data; +} + +char* NICBuffer::dpos() { + return data + pos; +} + +ssize_t NICBuffer::nbytes() { + return len - pos; +} +} + +void tcp_client::write_data(std::string data) { + // Create NICBuffer from data + this->write_queue.push_back(std::make_shared<impl::NICBuffer>(data.c_str(), data.size())); +} + +std::string buffer_to_string(char* buffer, ssize_t bufflen) +{ + // For some wired reaseon the compiler refuses to find the correct constructor of string + // without this extra method. + std::string ret(buffer, (int) bufflen); + return ret; +} + +void tcp_client::cb_ev(::ev::io &w, int events) { + if (events & ::ev::ERROR) { + // Handle errors + // Log and throw? + return; + } + + if (events & ::ev::READ) { + // notify incomming_data_cb + char buffer[1024]; + + ssize_t n_read_bytes = recv(w.fd, buffer, sizeof(buffer), 0); + if(n_read_bytes < 0) { + throw netio_exception("Failed to read from network socket."); + } + + if(n_read_bytes == 0) { + // TODO find a way to properly announce the closed connection + delete this; + } else { + this->in_data_cb(buffer_to_string(buffer, n_read_bytes)); + } + } + + if (events & ::ev::WRITE) { + // Handle sending data + push_write_queue(w); + } + + if (write_queue.empty()) { + io.set(::ev::READ); + } else { + io.set(::ev::READ | ::ev::WRITE); + } +} + +void tcp_client::push_write_queue(::ev::io &w) { + if (this->is_write_queue_empty()) { + io.set(::ev::READ); + return; + } + + std::shared_ptr<impl::NICBuffer> buffer = this->write_queue.front(); + ssize_t written = write(w.fd, buffer->dpos(), buffer->nbytes()); + + if (written < 0) { + throw netio_exception("Failed to write latest buffer content."); + } + + buffer->pos += written; + if (buffer->nbytes() == 0) { + this->write_queue.pop_front(); + } +} + +inline std::string tcp_client::get_peer_address() { + return this->peer_address; +} + +inline uint16_t tcp_client::get_port() { + return this->port; +} + +inline bool tcp_client::is_write_queue_empty() { + return this->write_queue.empty(); +} + +} + + diff --git a/src/net/tcp_client.hpp b/src/net/tcp_client.hpp new file mode 100644 index 0000000..e0b7889 --- /dev/null +++ b/src/net/tcp_client.hpp @@ -0,0 +1,66 @@ +/* + * tcp_client.hpp + * + * Created on: 03.01.2021 + * Author: doralitze + */ + +#pragma once + +#include <ev++.h> + +#include <functional> +#include <list> +#include <memory> +#include <string> +#include <unistd.h> + +#include "net/connection_client.hpp" +#include "net/async_fd.hpp" + +#define EXIT_STATUS_NO_ERROR (exit_status) 0 +#define EXIT_STATUS_TIMEOUT (exit_status) 1 + +namespace rmrf::net { + +namespace impl { +struct NICBuffer { + char* data; + ssize_t len; + ssize_t pos; + + NICBuffer(const char* bytes, ssize_t nbytes); + virtual ~NICBuffer(); + char *dpos(); + ssize_t nbytes(); + + NICBuffer operator=(const rmrf::net::impl::NICBuffer&) = delete; + NICBuffer(const NICBuffer &) = delete; +}; +} + +class tcp_client : public connection_client, std::enable_shared_from_this<tcp_client> { +public: + typedef unsigned short exit_status; + typedef std::function<void(exit_status)> destructor_cb_type; +private: + const destructor_cb_type destructor_cb; + const auto_fd socket; + const std::string peer_address; + const uint16_t port; + + ::ev::io io; + std::list<std::shared_ptr<impl::NICBuffer>> write_queue; +public: + tcp_client(const destructor_cb_type destructor_cb_, auto_fd&& socket_fd, std::string peer_address_, uint16_t port_); + virtual ~tcp_client(); + virtual void write_data(std::string data); + std::string get_peer_address(); + uint16_t get_port(); + bool is_write_queue_empty(); +private: + void cb_ev(::ev::io &w, int events); + void push_write_queue(::ev::io &w); +}; + +} diff --git a/src/net/tcp_server_socket.cpp b/src/net/tcp_server_socket.cpp index d199168..0308d90 100644 --- a/src/net/tcp_server_socket.cpp +++ b/src/net/tcp_server_socket.cpp @@ -4,8 +4,13 @@ * Created on: 02.01.2021 * Author: doralitze */ + +#include "net/tcp_server_socket.hpp" + +#include <arpa/inet.h> #include <unistd.h> #include <fcntl.h> +#include <functional> #include <stdlib.h> #include <string.h> #include <sys/types.h> @@ -13,11 +18,10 @@ #include <sys/stat.h> #include <netinet/in.h> -#include "tcp_server_socket.hpp" - namespace rmrf::net { -tcp_server_socket::tcp_server_socket(uint16_t port) : ss{nullptr} { +tcp_server_socket::tcp_server_socket(uint16_t port, incoming_client_listener_type client_listener_) : + ss{nullptr}, client_listener(client_listener_), number_of_connected_clients(0) { auto raw_socket_fd = socket(AF_INET, SOCK_STREAM, 0); if(raw_socket_fd < 0) { // TODO implement propper error handling @@ -29,16 +33,59 @@ tcp_server_socket::tcp_server_socket(uint16_t port) : ss{nullptr} { addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; // TODO FIXME if (bind(raw_socket_fd, (struct sockaddr *) &addr, sizeof(addr)) != 0) { - throw netio_exception("Failed to bind to all addresses (FIXME)"); + std::string msg = "Failed to bind to all addresses (FIXME)"; + if (port < 1024) { + msg += "\tYou tried to bind to a port smaller than 1024. Are you root?"; + } + throw netio_exception(msg); } + // Append the non blocking flag to the file state of the socket fd. + // This might be linux only. We should check that fcntl(raw_socket_fd, F_SETFL, fcntl(raw_socket_fd, F_GETFL, 0) | O_NONBLOCK); if (listen(raw_socket_fd, 5) == -1) { - throw netio_exception("Failed to enable listenting mode for raw socket"); + throw netio_exception("Failed to enable listening mode for raw socket"); } this->ss = std::make_shared<async_server_socket>(auto_fd(raw_socket_fd)); - //TODO set accept handler + using namespace std::placeholders; + this->ss->set_accept_handler(std::bind(&tcp_server_socket::await_raw_socket_incomming, this, _1, _2)); +} + + +// As we're not depending on the actual async server object we need to suppress the warning that we're not using it. +#ifdef __GNUC__ + #define UNUSED __attribute__ ((unused)) +#else + #define UNUSED +#endif + +void tcp_server_socket::await_raw_socket_incomming(async_server_socket::self_ptr_type ass UNUSED, const auto_fd& socket) { + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + int client_fd_raw = accept(socket.get(), (struct sockaddr *)&client_addr, &client_len); + + if(client_fd_raw < 0) { + throw netio_exception("Unable to bind incoming client"); + } + + fcntl(client_fd_raw, F_SETFL, fcntl(client_fd_raw, F_GETFL, 0) | O_NONBLOCK); + + const std::string address = inet_ntoa(client_addr.sin_addr); + const uint16_t port = ntohs(client_addr.sin_port); + + // Generate client object from fd and announce it + this->number_of_connected_clients++; + using namespace std::placeholders; + this->client_listener(tcp_client(std::bind(&tcp_server_socket::client_destructed_cb, this, _1), auto_fd(client_fd_raw), address, port)); +} + +int tcp_server_socket::get_number_of_connected_clients() { + return this->number_of_connected_clients; +} + +void tcp_server_socket::client_destructed_cb(tcp_client::exit_status exit_status UNUSED) { + this->number_of_connected_clients--; } } diff --git a/src/net/tcp_server_socket.hpp b/src/net/tcp_server_socket.hpp index 4beca9a..1c28fa4 100644 --- a/src/net/tcp_server_socket.hpp +++ b/src/net/tcp_server_socket.hpp @@ -7,20 +7,28 @@ #pragma once -#include <cstdio> #include <memory> -#include "async_server.hpp" -#include "netio_exception.hpp" +#include "net/async_server.hpp" +#include "net/netio_exception.hpp" +#include "net/tcp_client.hpp" namespace rmrf::net { -class tcp_server_socket { +class tcp_server_socket : public std::enable_shared_from_this<tcp_server_socket>{ +public: + typedef std::function<void(tcp_client)> incoming_client_listener_type; private: std::shared_ptr<async_server_socket> ss; + incoming_client_listener_type client_listener; + int number_of_connected_clients; public: - tcp_server_socket(uint16_t port); + tcp_server_socket(uint16_t port, incoming_client_listener_type client_listener_); + int get_number_of_connected_clients(); +private: + void await_raw_socket_incomming(async_server_socket::self_ptr_type ass, const auto_fd& socket); + void client_destructed_cb(tcp_client::exit_status exit_status); }; } -- GitLab