Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ReadMailReallyFast/code
1 result
Show changes
Commits on Source (13)
......@@ -13,34 +13,10 @@
namespace rmrf::net {
[[nodiscard]] socketaddr get_own_address_after_connect(const auto_fd& socket) {
socketaddr own_address;
socklen_t sa_local_len = sizeof(sockaddr_storage);
if (getsockname(socket.get(), own_address.ptr(), &sa_local_len) == 0) {
// Update length field after the internal structure was modified
// TODO: Maybe make this an internal method in socketaddr to update the size
own_address = own_address.ptr();
} else {
switch(errno) {
case EBADF:
case ENOTSOCK:
throw netio_exception("Invalid file descriptor provided to obtain own address. ERRNO: " + std::to_string(errno));
case EFAULT:
case EINVAL:
throw netio_exception("Invlid data structure for information retrival of own socket address provided. ERRNO: " + std::to_string(errno));
case ENOBUFS:
throw netio_exception("Kernel temporarily out of buffer space to store own address informatio.n ERRNO:." + std::to_string(errno));
default:
throw netio_exception("Unexpected error while requesting own socket address. ERRNO: " + std::to_string(errno));
}
}
return own_address;
}
[[nodiscard]] std::unique_ptr<udp_client> client_factory_construct_udp_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb) {
const auto family = socket_identifier.family();
// TODO maybe implement also unix sockets with SOCK_SEQPACKET if we need them
if (!(family == AF_INET || family == AF_INET6)) {
std::stringstream ss;
ss << "Invalid IP address family. (" << family << ")";
......@@ -62,13 +38,29 @@ namespace rmrf::net {
return c;
}
[[nodiscard]] std::unique_ptr<tcp_client> client_factory_construct_tcp_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb) {
[[nodiscard]] std::unique_ptr<connection_client> client_factory_construct_tcp_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb) {
if (auto f = socket_identifier.family(); f != AF_INET6 && f != AF_INET) {
throw netio_exception("Expected TCP socket address over IP");
}
return client_factory_construct_stream_client(socket_identifier, cb);
}
[[nodiscard]] std::unique_ptr<connection_client> client_factory_construct_unix_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb) {
if (socket_identifier.family() != AF_UNIX) {
throw netio_exception("Expected UNIX domain socket address as target");
}
return client_factory_construct_stream_client(socket_identifier, cb);
}
[[nodiscard]] std::unique_ptr<connection_client> client_factory_construct_stream_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb) {
auto_fd socket_candidate{socket(socket_identifier.family(), SOCK_STREAM, 0)};
if (socket_candidate.valid()) {
if (connect(socket_candidate.get(), socket_identifier.ptr(), socket_identifier.size()) == 0) {
make_socket_nonblocking(socket_candidate);
const auto own_address = get_own_address_after_connect(socket_candidate);
const auto own_address = socket_identifier.family() != AF_UNIX ? get_own_address_after_connect(socket_candidate) : socketaddr{};
// TODO create client object using socket_candidate, own_address and socket_identifier as remote address
auto c = std::make_unique<tcp_client>(nullptr, std::move(socket_candidate), own_address, socket_identifier);
......@@ -85,11 +77,10 @@ namespace rmrf::net {
switch(type) {
case socket_t::TCP:
return client_factory_construct_tcp_client(address);
case socket_t::UNIX:
return client_factory_construct_unix_client(address);
case socket_t::UDP:
return client_factory_construct_udp_client(address);
case socket_t::UNIX:
// TODO implement
return nullptr;
default:
return nullptr;
}
......
......@@ -10,16 +10,9 @@
namespace rmrf::net {
[[nodiscard]] std::unique_ptr<udp_client> client_factory_construct_udp_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb = nullptr);
[[nodiscard]] std::unique_ptr<tcp_client> client_factory_construct_tcp_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb = nullptr);
/**
* This method queries the remote address of a connected client. Please note that only TCP sockets are supported
* at the moment. This operation will fail if an unsupported socket type is provided.
* @brief Get the address of the connected remote client
* @param socket The socket representing the client
* @return The remote socketaddr pair
*/
[[nodiscard]] socketaddr get_own_address_after_connect(const auto_fd& socket);
[[nodiscard]] std::unique_ptr<connection_client> client_factory_construct_tcp_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb = nullptr);
[[nodiscard]] std::unique_ptr<connection_client> client_factory_construct_unix_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb = nullptr);
[[nodiscard]] std::unique_ptr<connection_client> client_factory_construct_stream_client(const socketaddr& socket_identifier, connection_client::incomming_data_cb cb = nullptr);
/**
* This method directly connects to the given address and returns an invalid pointer if it fails.
......
......@@ -15,6 +15,7 @@ namespace rmrf::net {
if (this->server_active) {
io.stop();
this->net_socket.reset();
}
if (destructor_cb) {
......@@ -31,7 +32,7 @@ namespace rmrf::net {
}
if (events & ::ev::READ) {
if(this->in_data_cb != nullptr)
if (this->in_data_cb != nullptr)
this->read_from_socket(w);
}
......@@ -52,19 +53,27 @@ namespace rmrf::net {
if (partial_write_allowed)
this->write_queue.push_front(buffer);
this->data_write_active = false;
}
}
set_new_flags();
}
void connection_client::stop_server() {
this->server_active = false;
this->net_socket.reset();
this->write_queue.clear();
io.stop();
async.stop();
}
void connection_client::set_incomming_data_callback(const incomming_data_cb &cb) {
if (!this->server_active) {
return;
}
this->in_data_cb = cb;
this->async.send();
}
......
......@@ -165,6 +165,10 @@ public:
return this->rate_limit;
}
[[nodiscard]] inline bool is_client_alive() {
return this->server_active;
}
protected:
/**
* @brief Implement the read operation of the implemented service
......@@ -195,8 +199,12 @@ private:
void cb_async(::ev::async& w, int events);
inline void set_new_flags() {
if (!this->server_active) {
return;
}
auto new_flags = 0;
if(this->in_data_cb) {
if (this->in_data_cb) {
new_flags |= ::ev::READ;
}
if (!this->write_queue.empty()) {
......
......@@ -79,6 +79,10 @@ namespace rmrf::net {
[[nodiscard]] inline std::deque<iorecord_type>::const_iterator end() const {
return this->queue.end();
}
void clear() {
this->queue.clear();
}
};
}
......@@ -162,6 +162,17 @@ namespace rmrf::net {
}
std::list<socketaddr> get_socketaddr_list(const std::string &interface_description, const std::string &service_or_port, const socket_t socket_type) {
if (socket_type == socket_t::UNIX) {
sockaddr_un storage;
strncpy(storage.sun_path, interface_description.c_str(), sizeof(storage.sun_path));
// Required as the automatic initialization of sockaddr_un is broken on linux.
// This will be optimized out on platforms where it is not.
((sockaddr*) &storage)->sa_family = AF_UNIX;
const socketaddr sa{storage};
std::list<socketaddr> l = {sa};
return l;
}
int port = -1;
try {
......@@ -177,9 +188,9 @@ namespace rmrf::net {
return l;
}
// Attempt DNS lookup
struct addrinfo hints = {};
struct addrinfo* addrs;
struct addrinfo* addrs = nullptr;
hints.ai_family = AF_INET6;
hints.ai_socktype = get_socket_type_hint(socket_type);
......@@ -190,8 +201,10 @@ namespace rmrf::net {
dns_error = getaddrinfo(interface_description.c_str(), NULL, &hints, &addrs);
if (dns_error != 0) {
freeaddrinfo(addrs);
throw std::invalid_argument("Something went wrong with the DNS lookup. Error code: " + format_network_error(dns_error));
if (addrs != nullptr) {
freeaddrinfo(addrs);
}
throw std::invalid_argument("Something went wrong with the DNS lookup. Error: " + format_network_error(dns_error));
}
}
......
......@@ -11,4 +11,29 @@ namespace rmrf::net {
throw netio_exception("Failed to set socket mode. fcntl resulted in error:" + std::to_string(errno));
}
}
[[nodiscard]] socketaddr get_own_address_after_connect(const auto_fd& socket) {
socketaddr own_address;
socklen_t sa_local_len = sizeof(sockaddr_storage);
if (getsockname(socket.get(), own_address.ptr(), &sa_local_len) == 0) {
// Update length field after the internal structure was modified
// TODO: Maybe make this an internal method in socketaddr to update the size
own_address = own_address.ptr();
} else {
switch(errno) {
case EBADF:
case ENOTSOCK:
throw netio_exception("Invalid file descriptor provided to obtain own address. ERRNO: " + std::to_string(errno));
case EFAULT:
case EINVAL:
throw netio_exception("Invlid data structure for information retrival of own socket address provided. ERRNO: " + std::to_string(errno));
case ENOBUFS:
throw netio_exception("Kernel temporarily out of buffer space to store own address informatio.n ERRNO:." + std::to_string(errno));
default:
throw netio_exception("Unexpected error while requesting own socket address. ERRNO: " + std::to_string(errno));
}
}
return own_address;
}
}
#pragma once
#include "net/async_fd.hpp"
#include "net/socketaddress.hpp"
namespace rmrf::net {
/**
......@@ -8,4 +9,13 @@ namespace rmrf::net {
* @param socket The socket to modify
*/
void make_socket_nonblocking(auto_fd& socket);
/**
* This method queries the remote address of a connected client. Please note that only TCP sockets are supported
* at the moment. This operation will fail if an unsupported socket type is provided.
* @brief Get the address of the connected remote client
* @param socket The socket representing the client
* @return The remote socketaddr pair
*/
[[nodiscard]] socketaddr get_own_address_after_connect(const auto_fd& socket);
}
......@@ -70,7 +70,7 @@ public:
template <typename T, typename std::enable_if<has_field<T>::value, T>::type * = nullptr>
explicit socketaddr(T *other) : addr{}, len{} {
if (other->*(family_map<T>::sa_family_field) != family_map<T>::sa_family) {
throw netio_exception("Address family mismatch in sockaddr structure.");
throw netio_exception("Unable to construct socketaddr object. Address family mismatch in sockaddr structure.");
}
memcpy(&addr, other, sizeof(T));
......@@ -88,7 +88,7 @@ public:
template <typename T, typename std::enable_if<has_field<T>::value, T>::type * = nullptr>
explicit socketaddr(const T& other) : addr{}, len{} {
if (other.*(family_map<T>::sa_family_field) != family_map<T>::sa_family) {
throw netio_exception("Address family mismatch in sockaddr structure.");
throw netio_exception("Unable to construct socketaddr object from reference. Address family mismatch in sockaddr structure.");
}
memcpy(&addr, &other, sizeof(T));
......@@ -98,7 +98,7 @@ public:
template <typename T>
socketaddr& operator=(const T *rhs) {
if (rhs->*(family_map<T>::sa_family_field) != family_map<T>::sa_family) {
throw netio_exception("Address family mismatch in sockaddr structure.");
throw netio_exception("Unable to construct socketaddr object from rhs. Address family mismatch in sockaddr structure.");
}
memcpy(&addr, rhs, sizeof(T));
......
......@@ -35,89 +35,6 @@ tcp_client::tcp_client(
this->own_address = own_address_;
}
/*
// TODO move to client_factory
auto_fd tcp_client::get_connection(const std::string& peer_address_, const std::string& service_or_port, int ip_addr_family) {
std::list<socketaddr> connection_candidates = get_socketaddr_list(peer_address_, service_or_port, socket_t::TCP);
if (ip_addr_family == AF_UNSPEC) {
ip_addr_family = connection_candidates.front().family();
}
if (!(ip_addr_family == AF_INET || ip_addr_family == AF_INET6)) {
throw netio_exception("Invalid IP address family.");
}
// TODO build another nice HL structure wrapper for outbound connections
int status = 1;
do {
if (connection_candidates.empty()) {
throw netio_exception("Unable to find suitable connection candidate.");
}
socketaddr socket_identifier = connection_candidates.front();
connection_candidates.pop_front();
auto_fd socket_candidate{socket(socket_identifier.family(), SOCK_STREAM, 0)};
if (socket_candidate.valid()) {
if (connect(socket_candidate.get(), socket_identifier.ptr(), socket_identifier.size()) == 0) {
status = 0;
if (
const auto existing_fd_flags = fcntl(socket_candidate.get(), F_GETFL, 0);
existing_fd_flags == -1 || fcntl(socket_candidate.get(), F_SETFL, existing_fd_flags | O_NONBLOCK) == -1
) {
throw netio_exception("Failed to set socket mode. fcntl resulted in error:" + std::to_string(errno));
}
// Hier bin ich mir nicht sicher, wie ich das am besten mache. Auch mit socketaddr und type cast ist das irgendwie doof.
// Das Problem besteht darin, dass erst nach erfolgreichem connect der Port auf dieser Seite bekannt ist.
socketaddr sa_local;
socklen_t sa_local_len = sizeof(sockaddr_storage);
if (getsockname(socket_candidate.get(), sa_local.ptr(), &sa_local_len)) {
// Update length field after the internal structure was modified
// TODO: Maybe make this an internal method in socketaddr to update the size
sa_local = sa_local.ptr();
this->own_address = sa_local;
}
// Store remote connection endpoint (that is prior to redirect)
this->peer_address = socket_identifier;
return std::forward<auto_fd>(socket_candidate);
}
}
// We don't need to worry about closing broken fd as auto_fd handles this for us
} while (status == 1);
return null_fd{};
}
// TODO remove and replace with raw socketaddr constructor from factory and implement connect construcor
tcp_client::tcp_client(
const std::string& peer_address_,
const std::string& service_or_port,
int ip_addr_family
) :
connection_client{get_connection(peer_address_, service_or_port, ip_addr_family), this->get_peer_address()},
destructor_cb(nullptr)
{}
tcp_client::tcp_client(
const std::string &peer_address_,
const std::string &service_or_port
) :
tcp_client(peer_address_, service_or_port, AF_UNSPEC)
{}
tcp_client::tcp_client(
const std::string &peer_address_,
const uint16_t port_
) :
tcp_client(peer_address_, std::to_string(port_))
{}*/
tcp_client::~tcp_client() {}
ssize_t tcp_client::push_write_queue(::ev::io& w, iorecord& buffer) {
......@@ -131,6 +48,31 @@ inline uint16_t tcp_client::get_port() {
return optional_port.value();
}
inline std::string get_recv_error() {
switch (errno) {
case EBADF:
return "The argument sockfd is an invalid file descriptor.";
case ECONNREFUSED:
return "A remote host refused to allow the network connection.";
case EFAULT:
return "The receive buffer pointer(s) point outside the process's address space.";
case EINTR:
return "The receive was interrupted by delivery of a signal before any data was available.";
case EINVAL:
return "Invalid argument passed.";
case ENOMEM:
return "Could not allocate memory for recvmsg().";
case ENOTCONN:
return "The socket is associated with a connection-oriented protocol and has not been connected.";
case ENOTSOCK:
return "The file descriptor sockfd does not refer to a socket.";
case 104:
return "Connection Reset by peer (Linux bug)";
default:
return "Error Code not captured: " + std::to_string(errno);
}
}
void tcp_client::read_from_socket(::ev::io& w) {
// notify incomming_data_cb
uint8_t buffer[1024];
......@@ -138,7 +80,12 @@ void tcp_client::read_from_socket(::ev::io& w) {
ssize_t n_read_bytes = recv(w.fd, buffer, sizeof(buffer), 0);
if (n_read_bytes < 0) {
throw netio_exception("Failed to read from network socket in TCP client.");
if (errno == ECONNRESET) {
this->stop_server();
return;
} else if (errno != EAGAIN) {
throw netio_exception("Failed to read from network socket in TCP client. " + get_recv_error());
}
}
if (n_read_bytes == 0) {
......
......@@ -31,7 +31,11 @@
namespace rmrf::net {
auto_fd create_tcp_server_socket(const socketaddr& socket_identifier) {
static auto_fd create_tcp_server_socket(const socketaddr& socket_identifier) {
if (auto f = socket_identifier.family(); f != AF_INET6 && f != AF_INET) {
throw netio_exception("For now, TCP is only supported over IP(v6).");
}
auto_fd socket_fd{socket(socket_identifier.family(), SOCK_STREAM, 0)};
if (!socket_fd.valid()) {
......@@ -63,7 +67,7 @@ auto_fd create_tcp_server_socket(const socketaddr& socket_identifier) {
make_socket_nonblocking(socket_fd);
if (listen(socket_fd.get(), 5) == -1) {
if (listen(socket_fd.get(), 128) == -1) {
throw netio_exception("Failed to enable listening mode for raw socket");
}
......
#include "net/unix_socket_server.hpp"
#include <filesystem>
#include <sstream>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include "net/async_fd.hpp"
#include "net/netio_exception.hpp"
#include "net/socket_utils.hpp"
#include "net/tcp_client.hpp"
namespace rmrf::net {
static std::string get_unix_socket_path(const socketaddr& addr) {
if (addr.family() != AF_UNIX) {
return {};
}
return std::string(((sockaddr_un*) addr.ptr())->sun_path);
}
static auto_fd construct_server_fd(const socketaddr& addr) {
if (addr.family() != AF_UNIX) {
throw netio_exception("Expected a UNIX socket file path.");
}
// man 7 unix suggests the ussage of SOCK_SEQPACKET, but we'd loose the ability to distinguish multiple clients if we do so
auto_fd socket_fd{socket(addr.family(), SOCK_STREAM, 0)};
if (!socket_fd.valid()) {
throw netio_exception("Failed to create UNIX socket. Do you have the permissions to do this?");
}
if (auto error = bind(socket_fd.get(), addr.ptr(), addr.size()); error != 0) {
std::stringstream ss;
ss << "Failed to bind to socket " + addr.str() << ".";
const auto p = std::filesystem::path(get_unix_socket_path(addr));
if (std::filesystem::exists(p)) {
ss << " Reason: The file already exists.";
if (std::filesystem::is_socket(p)) {
ss << " It is also a socket. Is the application already running?";
}
} else {
ss << " Is there buffer space avaiable and do you have the permission to create a socket there?";
}
throw netio_exception(ss.str());
}
make_socket_nonblocking(socket_fd);
if (listen(socket_fd.get(), 5) == -1) {
// We already created the socket and close won't remove the file thus we need to unlink it.
unlink(get_unix_socket_path(addr).c_str());
throw netio_exception("Failed to enable listening mode for raw socket");
}
return socket_fd;
}
unix_socket_server::unix_socket_server(
const socketaddr& socket_identifier,
async_server_socket::accept_handler_type client_listener_
) : async_server_socket{construct_server_fd(socket_identifier)}, socket_path{get_unix_socket_path(socket_identifier)} {
this->set_accept_handler(client_listener_);
}
unix_socket_server::~unix_socket_server() {
if (this->socket_path.length() > 0) {
// We're ignoring potential errors as there's nothing we can do about it anyway.
unlink(this->socket_path.c_str());
}
}
std::shared_ptr<connection_client> unix_socket_server::await_raw_socket_incomming(const auto_fd& server_socket) {
auto client_socket = auto_fd{accept(server_socket.get(), nullptr, nullptr)};
if (!client_socket.valid()) {
throw netio_exception("Failed to accept incomming client to unix socket.");
}
make_socket_nonblocking(client_socket);
const socketaddr own_address{}, peer_address{};
return std::make_shared<tcp_client>(
this->get_locked_destructor_callback(),
std::move(client_socket),
own_address,
peer_address);
}
}
#pragma once
#include <string>
#include "net/async_fd.hpp"
#include "net/async_server.hpp"
#include "net/socketaddress.hpp"
namespace rmrf::net {
/**
* @class unix_socket_server
* @author doralitze
* @date 12/03/23
* @file unix_socket_server.hpp
* @brief A unix file socket server
*/
class unix_socket_server : public async_server_socket
{
private:
std::string socket_path;
public:
unix_socket_server(const socketaddr& socket_identifier, async_server_socket::accept_handler_type client_listener_);
virtual ~unix_socket_server();
public:
virtual std::shared_ptr<connection_client> await_raw_socket_incomming(const auto_fd& socket);
};
}
......@@ -46,6 +46,11 @@ BOOST_AUTO_TEST_CASE(Socketaddr_IPv6_Construction_Test) {
BOOST_CHECK_EQUAL(sa.str(), "SocketAddress: IPv6 [::]:80");
}
BOOST_AUTO_TEST_CASE(Socketaddr_IPv6_Any_Interface_Construction_Test) {
const auto sa = get_first_general_socketaddr("[::]", "80");
BOOST_CHECK_EQUAL(sa.str(), "SocketAddress: IPv6 [::]:80");
}
BOOST_AUTO_TEST_CASE(Socketaddr_IPv6_Localhost_Construction_Test) {
sockaddr_in6 addr_ip6;
addr_ip6.sin6_family = AF_INET6;
......@@ -62,3 +67,8 @@ BOOST_AUTO_TEST_CASE(Socketaddr_comparison) {
BOOST_CHECK_NE(get_first_general_socketaddr("[::1]", "443"), get_first_general_socketaddr("::1", "80"));
BOOST_CHECK_EQUAL(get_first_general_socketaddr("[::1]", "443"), get_first_general_socketaddr("::1", "443"));
}
BOOST_AUTO_TEST_CASE(Unix_socket_construction_test) {
const auto sa = get_first_general_socketaddr("/tmp/9Lq7BNBnBycd6nxy.socket", "", socket_t::UNIX);
BOOST_CHECK_EQUAL(sa.str(), "SocketAddress: FileSocket /tmp/9Lq7BNBnBycd6nxy.socket");
}
......@@ -15,6 +15,7 @@
#include "net/socketaddress.hpp"
#include "net/sock_address_factory.hpp"
#include "net/tcp_server_socket.hpp"
#include "net/unix_socket_server.hpp"
#include "lib/ev/ev.hpp"
#include "mumta/evloop.hpp"
......@@ -27,6 +28,7 @@ const std::string udp_test_string = "TEST UDP PACKET";
volatile bool tcp_called = false;
volatile bool udp_called = false;
volatile bool unix_called = false;
int udp_source_family;
......@@ -118,6 +120,38 @@ void run_tcp_test(const socketaddr& interface_addr) {
server.reset();
}
void run_unix_test() {
using namespace std::chrono_literals;
const socketaddr socket_name = get_first_general_socketaddr("/tmp/9Lq7BNBnBycd6nxy.socket", "", socket_t::UNIX);
auto server = std::make_shared<unix_socket_server>(socket_name,
[](std::shared_ptr<async_server_socket> s, std::shared_ptr<connection_client> c) {
BOOST_REQUIRE(s);
BOOST_REQUIRE(c);
c->set_incomming_data_callback(
[c](const iorecord& data) {
c->write_data(data);
});
});
auto client = connect(socket_name);
client->set_incomming_data_callback(
[](const iorecord& data) {
BOOST_CHECK_EQUAL(data.str(), "Moin, von UNIX");
unix_called = true;
});
const std::string moin_string("Moin, von UNIX");
client->write_data(iorecord(moin_string.c_str(), moin_string.length()));
std::this_thread::yield();
std::this_thread::sleep_for(100ms);
client.reset();
server.reset();
}
BOOST_AUTO_TEST_CASE(Netio_Socket_TCP) {
using namespace std::chrono_literals;
......@@ -149,3 +183,18 @@ BOOST_AUTO_TEST_CASE(Netio_Socket_UDP) {
BOOST_CHECK(udp_called);
}
BOOST_AUTO_TEST_CASE(Netio_Socket_UNIX) {
using namespace std::chrono_literals;
std::thread ev_thread(ev_thread_callable);
BOOST_CHECK_NO_THROW(run_unix_test()); // TODO put in own test while keeping same ev loop setup
// Sleep one second without using ev timer
std::this_thread::sleep_for(500ms);
ev_thread.join();
BOOST_CHECK(unix_called);
}