-
Benny Baumann authoredBenny Baumann authored
tcp_client.cpp 8.18 KiB
/*
* tcp_client.cpp
*
* Created on: 03.01.2021
* Author: doralitze
*/
#include "net/tcp_client.hpp"
#include <ev++.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <utility>
#include <deque>
#include "net/netio_exception.hpp"
#include "net/socketaddress.hpp"
namespace rmrf::net {
tcp_client::tcp_client(
const destructor_cb_type destructor_cb_,
auto_fd &&socket_fd,
std::string peer_address_,
uint16_t port_
) :
connection_client{},
destructor_cb(destructor_cb_),
peer_address(peer_address_), port(port_),
net_socket(std::forward<auto_fd>(socket_fd)),
io{}, write_queue{}
{
io.set<tcp_client, &tcp_client::cb_ev>(this);
io.start(this->net_socket.get(), ::ev::READ);
// TODO log created client
}
tcp_client::tcp_client(
const destructor_cb_type destructor_cb_,
auto_fd &&socket_fd,
const socketaddr &peer_address_
) :
connection_client{},
destructor_cb(destructor_cb_),
peer_address{peer_address_.str()},
port{},
net_socket(std::forward<auto_fd>(socket_fd)),
io{},
write_queue{}
{
io.set<tcp_client, &tcp_client::cb_ev>(this);
io.start(this->net_socket.get(), ::ev::READ);
// TODO log created client
}
tcp_client::tcp_client(
const std::string &peer_address_,
const std::string &service_or_port,
int ip_addr_family
) :
connection_client{},
destructor_cb(nullptr),
peer_address(peer_address_),
port(0),
net_socket(nullfd),
io{},
write_queue{}
{
if (!(ip_addr_family == AF_INET || ip_addr_family == AF_INET6)) {
throw netio_exception("Invalid IP address family.");
}
this->net_socket = auto_fd(socket(ip_addr_family, SOCK_STREAM, 0));
if (!this->net_socket.valid()) {
// TODO implement proper error handling
throw netio_exception("Failed to request socket fd from kernel.");
}
// TODO Extract DNS/service resolution into separate library
// TODO build another nice HL structure wrapper for outbound connections
std::deque<socketaddr> connection_candidates;
int status;
{
//Reduce scope of locally declared variables
//May be candidate for extraction into own method
addrinfo hints;
addrinfo* servinfo = nullptr;
memset(&hints, 0, sizeof hints);
hints.ai_family = ip_addr_family;
hints.ai_socktype = SOCK_STREAM;
if ((status = getaddrinfo(peer_address_.c_str(), service_or_port.c_str(), &hints, &servinfo)) != 0) {
throw netio_exception("Failed to resolve address '" + peer_address_ + "' with service '" + service_or_port + "': " + gai_strerror(status));
}
// TODO: Prefer IPv6 over IPv4
for (auto p = servinfo; p != NULL; p = p->ai_next) {
if (p->ai_family == AF_INET) {
socketaddr sa{(sockaddr_in*)p->ai_addr};
connection_candidates.push_back(sa);
} else if (p->ai_family == AF_INET6) {
socketaddr sa{(sockaddr_in6*)p->ai_addr};
connection_candidates.push_front(sa);
}
}
freeaddrinfo(servinfo);
}
status = 1;
do {
if (connection_candidates.empty()) {
throw netio_exception("Unable to find suitable connection candidate.");
}
socketaddr socket_identifier = connection_candidates.front();
connection_candidates.pop_front();
auto_fd socket_candidate{socket(socket_identifier.family(), SOCK_STREAM, 0)};
if (socket_candidate.valid()) {
if (connect(socket_candidate.get(), socket_identifier.ptr(), socket_identifier.size()) == 0) {
status = 0;
this->net_socket = std::forward<auto_fd>(socket_candidate);
if (
const auto existing_fd_flags = fcntl(this->net_socket.get(), F_GETFL, 0);
existing_fd_flags == -1 || fcntl(this->net_socket.get(), F_SETFL, existing_fd_flags | O_NONBLOCK) == -1
) {
throw netio_exception("Failed to set socket mode. fcntl resulted in error:" + std::to_string(errno));
}
// Hier bin ich mir nicht sicher, wie ich das am besten mache. Auch mit socketaddr und type cast ist das irgendwie doof.
// Das Problem besteht darin, dass erst nach erfolgreichem connect der Port auf dieser Seite bekannt ist.
socketaddr sa_local;
socklen_t sa_local_len = sizeof(sockaddr_storage);
if (getsockname(this->net_socket.get(), sa_local.ptr(), &sa_local_len)) {
// Update length field after the internal structure was modified
// TODO: Maybe make this an internal method in socketaddr to update the size
sa_local = sa_local.ptr();
//The pointer casts are safe due to operator overloading in socketaddr ...
switch (sa_local.family()) {
case AF_INET:
this->port = ntohs(((sockaddr_in*)sa_local)->sin_port);
break;
case AF_INET6:
this->port = ntohs(((sockaddr_in6*)sa_local)->sin6_port);
break;
default:
throw netio_exception("Invalid/unexpected local socket address type");
}
}
}
}
// We don't need to worry about closing broken fd as auto_fd handles this for us
} while (status == 1);
io.set<tcp_client, &tcp_client::cb_ev>(this);
io.start(this->net_socket.get(), ::ev::READ);
//TODO log connected client
}
tcp_client::tcp_client(
const std::string &peer_address_,
const std::string &service_or_port
) :
tcp_client(peer_address_, service_or_port, AF_UNSPEC)
{}
tcp_client::tcp_client(
const std::string &peer_address_,
const uint16_t port_
) :
tcp_client(peer_address_, std::to_string(port_))
{}
tcp_client::~tcp_client() {
io.stop();
if (destructor_cb) {
destructor_cb(exit_status_t::NO_ERROR);
}
}
void tcp_client::write_data(const std::string &data) {
// Create NICBuffer from data
this->write_queue.push_back(iorecord{data.c_str(), data.size()});
this->io.set(::ev::READ | ::ev::WRITE);
}
inline std::string buffer_to_string(char* buffer, ssize_t bufflen) {
return std::string(buffer, (size_t)bufflen);
}
void tcp_client::cb_ev(::ev::io &w, int events) {
if (events & ::ev::ERROR) {
// Handle errors
// Log and throw?
io.stop();
throw netio_exception("TCP client error");
}
if (events & ::ev::READ) {
// notify incomming_data_cb
char buffer[1024];
ssize_t n_read_bytes = recv(w.fd, buffer, sizeof(buffer), 0);
if (n_read_bytes < 0) {
throw netio_exception("Failed to read from network socket in TCP client.");
}
if (n_read_bytes == 0) {
// TODO find a way to properly announce the closed connection
io.stop();
return;
} else {
this->in_data_cb(buffer_to_string(buffer, n_read_bytes));
}
}
if (events & ::ev::WRITE) {
// Handle sending data
push_write_queue(w);
}
if (write_queue.empty()) {
io.set(::ev::READ);
} else {
io.set(::ev::READ | ::ev::WRITE);
}
}
void tcp_client::push_write_queue(::ev::io &w) {
if (this->write_queue.empty()) {
io.set(::ev::READ);
return;
}
this->data_write_active = true;
iorecord buffer = this->write_queue.pop_front();
ssize_t written = write(w.fd, buffer.ptr(), buffer.size());
if (written >= 0) {
buffer.advance((size_t)written);
} else if (EAGAIN_WRAPPER) {
throw netio_exception("Failed to write latest buffer content.");
}
this->write_queue.push_front(buffer);
this->data_write_active = false;
}
inline std::string tcp_client::get_peer_address() {
return this->peer_address;
}
inline uint16_t tcp_client::get_port() {
return this->port;
}
inline bool tcp_client::is_write_queue_empty() {
return this->write_queue.empty() && !this->data_write_active;
}
}