Skip to content
Snippets Groups Projects
Commit b14eccce authored by Leon Dietrich's avatar Leon Dietrich
Browse files

add: TCP socket implementation

parent 7f977128
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
* Created on: 02.01.2021
* Author: doralitze
*/
#include "async_server.hpp"
#include "net/async_server.hpp"
#include <ev++.h>
......@@ -39,6 +39,7 @@ void async_server_socket::cb_ev(::ev::io &w, int events) {
if (events & ::ev::READ) {
// Handle incoming clients
auto ah = this->get_accept_handler();
ah(this->shared_from_this(), auto_fd(w.fd));
}
if (events & ::ev::WRITE) {
......
......@@ -13,7 +13,7 @@ class async_server_socket : public std::enable_shared_from_this<async_server_soc
public:
typedef std::shared_ptr<async_server_socket> self_ptr_type;
typedef std::function<void(self_ptr_type &, const auto_fd &)> accept_handler_type;
typedef std::function<void(self_ptr_type, const auto_fd &)> accept_handler_type;
typedef std::function<void(self_ptr_type &)> error_handler_type;
private:
......
/*
* connection_client.cpp
*
* Created on: 03.01.2021
* Author: doralitze
*/
#include "net/connection_client.hpp"
namespace rmrf::net {
connection_client::connection_client() : in_data_cb{} {
}
inline void connection_client::set_incomming_data_callback(const incomming_data_cb &cb) {
this->in_data_cb = cb;
}
}
/*
* connection_client.hpp
*
* Created on: 03.01.2021
* Author: doralitze
*/
#pragma once
#include <functional>
#include <memory>
#include <string>
namespace rmrf::net {
class connection_client : public std::enable_shared_from_this<connection_client> {
public:
typedef std::function<void(std::string)> incomming_data_cb;
protected:
incomming_data_cb in_data_cb;
public:
connection_client();
/**
* Use this method to send data to the other endpoint.
*/
virtual void write_data(std::string data) = 0;
/**
* Use this method in order to register your callback function that should be
* called when the client got data to process.
* @param cb The callback function to register [void(std::string data)]
*/
void set_incomming_data_callback(const incomming_data_cb &cb);
};
}
/*
* tcp_client.cpp
*
* Created on: 03.01.2021
* Author: doralitze
*/
#include "net/tcp_client.hpp"
#include <ev++.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <utility>
#include "net/netio_exception.hpp"
namespace rmrf::net {
tcp_client::tcp_client(const destructor_cb_type destructor_cb_, auto_fd&& socket_fd, std::string peer_address_, uint16_t port_) :
connection_client{},
destructor_cb(destructor_cb_),
socket(std::forward<auto_fd>(socket_fd)),
peer_address(peer_address_), port(port_),
io{}, write_queue{} {
io.set<tcp_client, &tcp_client::cb_ev>(this);
io.start(this->socket.get(), ::ev::READ);
// TODO log created client
}
tcp_client::~tcp_client() {
destructor_cb(EXIT_STATUS_NO_ERROR);
io.stop();
}
namespace impl {
NICBuffer::NICBuffer(const char* bytes, ssize_t nbytes) : data(nullptr), len(nbytes), pos(0) {
data = new char[nbytes];
memcpy(data, bytes, nbytes);
}
NICBuffer::~NICBuffer() {
delete [] data;
}
char* NICBuffer::dpos() {
return data + pos;
}
ssize_t NICBuffer::nbytes() {
return len - pos;
}
}
void tcp_client::write_data(std::string data) {
// Create NICBuffer from data
this->write_queue.push_back(std::make_shared<impl::NICBuffer>(data.c_str(), data.size()));
}
std::string buffer_to_string(char* buffer, ssize_t bufflen)
{
// For some wired reaseon the compiler refuses to find the correct constructor of string
// without this extra method.
std::string ret(buffer, (int) bufflen);
return ret;
}
void tcp_client::cb_ev(::ev::io &w, int events) {
if (events & ::ev::ERROR) {
// Handle errors
// Log and throw?
return;
}
if (events & ::ev::READ) {
// notify incomming_data_cb
char buffer[1024];
ssize_t n_read_bytes = recv(w.fd, buffer, sizeof(buffer), 0);
if(n_read_bytes < 0) {
throw netio_exception("Failed to read from network socket.");
}
if(n_read_bytes == 0) {
// TODO find a way to properly announce the closed connection
delete this;
} else {
this->in_data_cb(buffer_to_string(buffer, n_read_bytes));
}
}
if (events & ::ev::WRITE) {
// Handle sending data
push_write_queue(w);
}
if (write_queue.empty()) {
io.set(::ev::READ);
} else {
io.set(::ev::READ | ::ev::WRITE);
}
}
void tcp_client::push_write_queue(::ev::io &w) {
if (this->is_write_queue_empty()) {
io.set(::ev::READ);
return;
}
std::shared_ptr<impl::NICBuffer> buffer = this->write_queue.front();
ssize_t written = write(w.fd, buffer->dpos(), buffer->nbytes());
if (written < 0) {
throw netio_exception("Failed to write latest buffer content.");
}
buffer->pos += written;
if (buffer->nbytes() == 0) {
this->write_queue.pop_front();
}
}
inline std::string tcp_client::get_peer_address() {
return this->peer_address;
}
inline uint16_t tcp_client::get_port() {
return this->port;
}
inline bool tcp_client::is_write_queue_empty() {
return this->write_queue.empty();
}
}
/*
* tcp_client.hpp
*
* Created on: 03.01.2021
* Author: doralitze
*/
#pragma once
#include <ev++.h>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <unistd.h>
#include "net/connection_client.hpp"
#include "net/async_fd.hpp"
#define EXIT_STATUS_NO_ERROR (exit_status) 0
#define EXIT_STATUS_TIMEOUT (exit_status) 1
namespace rmrf::net {
namespace impl {
struct NICBuffer {
char* data;
ssize_t len;
ssize_t pos;
NICBuffer(const char* bytes, ssize_t nbytes);
virtual ~NICBuffer();
char *dpos();
ssize_t nbytes();
NICBuffer operator=(const rmrf::net::impl::NICBuffer&) = delete;
NICBuffer(const NICBuffer &) = delete;
};
}
class tcp_client : public connection_client, std::enable_shared_from_this<tcp_client> {
public:
typedef unsigned short exit_status;
typedef std::function<void(exit_status)> destructor_cb_type;
private:
const destructor_cb_type destructor_cb;
const auto_fd socket;
const std::string peer_address;
const uint16_t port;
::ev::io io;
std::list<std::shared_ptr<impl::NICBuffer>> write_queue;
public:
tcp_client(const destructor_cb_type destructor_cb_, auto_fd&& socket_fd, std::string peer_address_, uint16_t port_);
virtual ~tcp_client();
virtual void write_data(std::string data);
std::string get_peer_address();
uint16_t get_port();
bool is_write_queue_empty();
private:
void cb_ev(::ev::io &w, int events);
void push_write_queue(::ev::io &w);
};
}
......@@ -4,8 +4,13 @@
* Created on: 02.01.2021
* Author: doralitze
*/
#include "net/tcp_server_socket.hpp"
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <functional>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
......@@ -13,11 +18,10 @@
#include <sys/stat.h>
#include <netinet/in.h>
#include "tcp_server_socket.hpp"
namespace rmrf::net {
tcp_server_socket::tcp_server_socket(uint16_t port) : ss{nullptr} {
tcp_server_socket::tcp_server_socket(uint16_t port, incoming_client_listener_type client_listener_) :
ss{nullptr}, client_listener(client_listener_), number_of_connected_clients(0) {
auto raw_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if(raw_socket_fd < 0) {
// TODO implement propper error handling
......@@ -29,16 +33,59 @@ tcp_server_socket::tcp_server_socket(uint16_t port) : ss{nullptr} {
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY; // TODO FIXME
if (bind(raw_socket_fd, (struct sockaddr *) &addr, sizeof(addr)) != 0) {
throw netio_exception("Failed to bind to all addresses (FIXME)");
std::string msg = "Failed to bind to all addresses (FIXME)";
if (port < 1024) {
msg += "\tYou tried to bind to a port smaller than 1024. Are you root?";
}
throw netio_exception(msg);
}
// Append the non blocking flag to the file state of the socket fd.
// This might be linux only. We should check that
fcntl(raw_socket_fd, F_SETFL, fcntl(raw_socket_fd, F_GETFL, 0) | O_NONBLOCK);
if (listen(raw_socket_fd, 5) == -1) {
throw netio_exception("Failed to enable listenting mode for raw socket");
throw netio_exception("Failed to enable listening mode for raw socket");
}
this->ss = std::make_shared<async_server_socket>(auto_fd(raw_socket_fd));
//TODO set accept handler
using namespace std::placeholders;
this->ss->set_accept_handler(std::bind(&tcp_server_socket::await_raw_socket_incomming, this, _1, _2));
}
// As we're not depending on the actual async server object we need to suppress the warning that we're not using it.
#ifdef __GNUC__
#define UNUSED __attribute__ ((unused))
#else
#define UNUSED
#endif
void tcp_server_socket::await_raw_socket_incomming(async_server_socket::self_ptr_type ass UNUSED, const auto_fd& socket) {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd_raw = accept(socket.get(), (struct sockaddr *)&client_addr, &client_len);
if(client_fd_raw < 0) {
throw netio_exception("Unable to bind incoming client");
}
fcntl(client_fd_raw, F_SETFL, fcntl(client_fd_raw, F_GETFL, 0) | O_NONBLOCK);
const std::string address = inet_ntoa(client_addr.sin_addr);
const uint16_t port = ntohs(client_addr.sin_port);
// Generate client object from fd and announce it
this->number_of_connected_clients++;
using namespace std::placeholders;
this->client_listener(tcp_client(std::bind(&tcp_server_socket::client_destructed_cb, this, _1), auto_fd(client_fd_raw), address, port));
}
int tcp_server_socket::get_number_of_connected_clients() {
return this->number_of_connected_clients;
}
void tcp_server_socket::client_destructed_cb(tcp_client::exit_status exit_status UNUSED) {
this->number_of_connected_clients--;
}
}
......@@ -7,20 +7,28 @@
#pragma once
#include <cstdio>
#include <memory>
#include "async_server.hpp"
#include "netio_exception.hpp"
#include "net/async_server.hpp"
#include "net/netio_exception.hpp"
#include "net/tcp_client.hpp"
namespace rmrf::net {
class tcp_server_socket {
class tcp_server_socket : public std::enable_shared_from_this<tcp_server_socket>{
public:
typedef std::function<void(tcp_client)> incoming_client_listener_type;
private:
std::shared_ptr<async_server_socket> ss;
incoming_client_listener_type client_listener;
int number_of_connected_clients;
public:
tcp_server_socket(uint16_t port);
tcp_server_socket(uint16_t port, incoming_client_listener_type client_listener_);
int get_number_of_connected_clients();
private:
void await_raw_socket_incomming(async_server_socket::self_ptr_type ass, const auto_fd& socket);
void client_destructed_cb(tcp_client::exit_status exit_status);
};
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment