Skip to content
Snippets Groups Projects
Commit 0ead1e34 authored by Benny Baumann's avatar Benny Baumann
Browse files

add: Basic UDP client implementation

parent 2d1f0d2d
No related branches found
No related tags found
1 merge request!1First unit tests
#pragma once
#include <algorithm>
#include <array>
#include <cstddef>
#include <ev++.h>
#include <memory>
#include <string>
#include <sstream>
#include <stdexcept>
#include <ostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include "net/async_fd.hpp"
#include "net/connection_client.hpp"
#include "net/ioqueue.hpp"
#include "net/socketaddress.hpp"
#include "net/sock_address_factory.hpp"
#include <iostream>
namespace rmrf::net {
template<std::size_t pkg_size = 1024>
class udp_packet {
typedef const uint8_t& const_reference;
typedef uint8_t* iterator;
private:
std::array<uint8_t, pkg_size> arr;
size_t actual_length;
public:
udp_packet() : arr{}, actual_length(0) { }
inline constexpr size_t max_size() const {
return arr.max_size();
}
inline size_t size() const {
return actual_length;
}
inline size_t length() const {
return actual_length;
}
inline constexpr uint8_t at(size_t pos) const {
return arr.at(pos);
}
inline constexpr const_reference operator[](size_t pos) const noexcept {
return arr.operator[](pos);
}
inline uint8_t& operator[](size_t pos) noexcept {
return arr.operator[](pos);
}
inline bool advance(size_t i) {
if (this->actual_length + i > this->max_size()) {
return false;
}
this->actual_length += i;
return true;
}
inline size_t append(const iterator begin, const iterator end) {
const auto remaining_length = pkg_size - this->length();
size_t i = 0;
for (; i < remaining_length && begin + i < end; i++) {
this->arr[this->length() + i] = begin[i];
}
this->advance(i);
return i;
}
constexpr const uint8_t* raw() const noexcept {
return this->arr.data();
}
constexpr uint8_t* raw() noexcept {
return this->arr.data();
}
constexpr std::array<uint8_t, pkg_size>::iterator begin() noexcept {
return this->arr.begin();
}
constexpr std::array<uint8_t, pkg_size>::iterator end() noexcept {
return this->arr.end();
}
constexpr std::array<uint8_t, pkg_size>::const_iterator cbegin() const noexcept {
return this->arr.cbegin();
}
constexpr std::array<uint8_t, pkg_size>::const_iterator cend() const noexcept {
return this->arr.cend();
}
udp_packet<pkg_size>& operator<<(const std::string& obj) {
const auto max_space = pkg_size - this->length();
const auto str_length = obj.length();
strncpy((char*) (this->raw() + this->length()), obj.c_str(), max_space);
this->advance(std::min(str_length, max_space));
return *this;
}
template<class T>
udp_packet<pkg_size>& operator<<(const T& obj) {
const auto size_to_copy = obj.end() - obj.begin();
if (auto error = memcpy_s(this->raw() + this->length(), pkg_size - this->length(), obj.begin(), size_to_copy); error != 0) {
std::stringstream ss;
ss << "Unable to concat object. Errorcode: " << error << ". Is the memory structure too large for this packet?";
throw std::invalid_argument(ss.str());
}
this->advance(size_to_copy);
return *this;
}
};
class address_augmented_iorecord : public iorecord {
private:
const socketaddr addr;
public:
address_augmented_iorecord(const void* buf, size_t size, socketaddr _addr) : iorecord{buf, size}, addr(_addr) {}
address_augmented_iorecord(const iorecord& ior, socketaddr _addr) : iorecord{ior}, addr(_addr) {}
address_augmented_iorecord() : iorecord{}, addr{} {};
address_augmented_iorecord(const address_augmented_iorecord& _other) = default;
const socketaddr& get_address() const {
return this->addr;
}
};
/**
* This class provides a UDP server / client wrapper. Please note that it is packet based and thus not based
* on connection client on purpose.
* @class udp_client
* @author leondietrich
* @date 04/12/21
* @file udp_client.hpp
* @brief A UDP socket wrapper class
*/
template<std::size_t pkg_size = 1024>
class udp_client : public std::enable_shared_from_this<udp_client<pkg_size>> {
public:
static const unsigned int no_rate_limit = 0;
typedef std::function<void(const udp_packet<pkg_size>& data, socketaddr& source)> in_packet_cb;
private:
auto_fd net_socket;
::ev::io io;
ioqueue<address_augmented_iorecord> write_queue;
const in_packet_cb _in_cb;
unsigned int rate_limit = no_rate_limit;
unsigned int current_burst_count = 0;
bool data_write_active = false;
int send_flags = MSG_DONTWAIT;
public:
udp_client(
socketaddr _addr,
in_packet_cb _incomming_cb = nullptr
) :
net_socket(null_fd()),
io{},
write_queue{},
_in_cb(_incomming_cb)
{
static_assert(pkg_size < 65536);
const auto family = _addr.family();
if (!(family == AF_INET || family == AF_INET6)) {
std::stringstream ss;
ss << "Invalid IP address family. (" << family << ")";
throw netio_exception(ss.str());
}
this->net_socket = auto_fd(socket(family, SOCK_DGRAM, 0));
if (!this->net_socket.valid()) {
// TODO implement proper error handling
throw netio_exception("Failed to request socket fd from kernel.");
}
if (_incomming_cb != nullptr) {
if (auto error = bind(this->net_socket.get(), _addr.ptr(), _addr.size()); error != 0) {
throw netio_exception("Failed to bind UDP socket: " + std::to_string(error));
}
}
if (
const auto existing_fd_flags = fcntl(this->net_socket.get(), F_GETFL, 0);
existing_fd_flags == -1 || fcntl(this->net_socket.get(), F_SETFL, existing_fd_flags | O_NONBLOCK) == -1
) {
throw netio_exception("Failed to set socket mode. fcntl resulted in error:" + std::to_string(errno));
}
io.set<udp_client, &udp_client::cb_ev>(this);
const auto initial_state = _incomming_cb != nullptr ? ::ev::READ : 0;
io.start(this->net_socket.get(), initial_state);
//std::cout << "UDP socket created: state=" << initial_state << std::endl;
}
udp_client(
const std::string& interface_to_bind,
uint16_t port,
in_packet_cb _incomming_cb = nullptr
) :
udp_client{get_first_general_socketaddr(interface_to_bind, port, socket_t::UDP), _incomming_cb}
{}
~udp_client() {
io.stop();
}
/**
* @brief Set a maximum number of packets to transmit per burst
* @param new_limit The limit to set
*/
inline void set_rate_limit(unsigned int new_limit) {
this->rate_limit = new_limit;
}
inline unsigned int get_rate_limit() {
return this->rate_limit;
}
void send_packet(const socketaddr& destination, const udp_packet<pkg_size>& data) {
this->write_queue.push_back(address_augmented_iorecord{data.raw(), data.size(), destination});
this->io.set(::ev::READ | ::ev::WRITE);
}
/**
* Enable or disable UDP confirm mode. Disabled by default. If enabled every
* device handling the send packet shall report if the path was viable or not. This
* is useful for debugging but may negatively impact performance.
* @brief Enable or disable UDP confirm mode
* @param enabled If set to true the UDP confirm mode will be activated.
*/
void enable_confirm_mode(bool enabled) {
if (enabled) {
this->send_flags |= MSG_CONFIRM;
} else {
this->send_flags &= ~MSG_CONFIRM;
}
};
private:
inline int get_io_read_state() const {
return this->_in_cb != nullptr ? ::ev::READ : 0;
}
/**
* This method implements the io queue callback. It is responsible for the actual data transactions
* @param w The io handle to use
* @param events The event flag container
*/
void cb_ev(::ev::io& w, int events) {
if (events & ::ev::ERROR) {
throw netio_exception("Error while decoding events from libev: state=" + std::to_string(events) + ", current_send_flags=" + std::to_string(this->send_flags));
}
if (events & ::ev::READ && this->_in_cb != nullptr) {
read_from_buffer(w);
}
if (events & ::ev::WRITE && (this->current_burst_count < this->rate_limit || this->rate_limit == this->no_rate_limit)) {
send_write_buffer(w);
}
const auto state = this->get_io_read_state();
if (write_queue.empty()) {
io.set(state);
} else {
io.set(state | ::ev::WRITE);
}
}
void send_write_buffer(::ev::io& w) {
// read packet buffer and dispatch it
// TODO fix destination address
// TODO register a timer to decrease rate limit if enabled.
if (this->write_queue.empty()) {
io.set(get_io_read_state());
return;
}
this->data_write_active = true;
auto buffer = this->write_queue.pop_front();
const auto dest_addr = buffer.get_address();
ssize_t written = sendto(w.fd, buffer.ptr(), buffer.size(), this->send_flags, dest_addr.ptr(), dest_addr.size());
if (written >= 0) {
buffer.advance((size_t)written);
} else if (EAGAIN_WRAPPER) {
throw netio_exception("Failed to write latest buffer content.");
}
this->write_queue.push_front(buffer);
this->data_write_active = false;
this->current_burst_count++;
}
void read_from_buffer(::ev::io& w) {
sockaddr_storage source_addr_raw{};
socklen_t srcaddr_len = sizeof(source_addr_raw);
udp_packet<pkg_size> p;
auto read_bytes_or_error = recvfrom(w.fd, p.raw(), p.max_size(), 0, (sockaddr*)&source_addr_raw, &srcaddr_len);
if (read_bytes_or_error < 0) {
throw netio_exception("Failed to read UDP packet. err:" + std::to_string(read_bytes_or_error));
}
p.advance(read_bytes_or_error);
socketaddr source_address{source_addr_raw};
this->_in_cb(p, source_address);
}
};
}
std::ostream& operator<<(std::ostream& os, const rmrf::net::udp_packet<>& obj) {
return os << "[UDP-Packet with size " << obj.size() << ']';
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment