Improved UDP functionality and reduced complexity of internal Communications.

This commit is contained in:
Arron David Nelson 2023-12-18 21:04:53 -08:00
parent 0a6f5533ee
commit 49f69372a9
8 changed files with 230 additions and 729 deletions

View File

@ -9,7 +9,7 @@ namespace ehs
class BaseUDP
{
protected:
AddrType addrType;
AddrType type;
Str_8 address;
UInt_16 port;
bool bound;
@ -19,7 +19,7 @@ namespace ehs
BaseUDP();
BaseUDP(const AddrType addrType);
BaseUDP(AddrType type);
BaseUDP(BaseUDP&& udp) noexcept;
@ -31,19 +31,19 @@ namespace ehs
virtual void Release() = 0;
virtual void Bind(const Str_8& address, const UInt_16 port) = 0;
virtual void Bind(AddrType type, const Str_8& address, UInt_16 port) = 0;
virtual UInt_64 Send(const Str_8& addr, const UInt_16 port, const Byte* const data, const UInt_64 size) = 0;
virtual UInt_64 Send(AddrType type, const Str_8& address, UInt_16 port, const Byte* data, UInt_64 size) = 0;
virtual UInt_64 Receive(Str_8* const addr, UInt_16* const port, Byte* const data, const UInt_64 size) = 0;
virtual UInt_64 Receive(AddrType* type, Str_8* address, UInt_16* port, Byte* data, UInt_64 size) = 0;
bool IsBound() const;
virtual void SetBlocking(const bool blocking) = 0;
virtual void SetBlocking(bool blocking) = 0;
virtual bool IsBlocking() const = 0;
AddrType GetAddressType() const;
AddrType GetLocalAddressType() const;
Str_8 GetLocalAddress() const;

View File

@ -21,7 +21,7 @@ namespace ehs
private:
friend class Endpoint;
static const Version ver;
static const Version version;
static const UInt_64 internalSys;
static const UInt_64 connectOp;
static const UInt_64 connectedOp;
@ -34,16 +34,12 @@ namespace ehs
static const UInt_64 latencyOp;
static const UInt_64 receivedOp;
Socket hdl;
AddrType type;
Str_8 address;
UInt_16 port;
bool bound;
UDP udp;
Version appVer;
EndDisp disposition;
bool dropPackets;
Str_8 id;
UInt_32 hashId;
Str_8 id;
Byte* buffer;
UInt_32 bufferSize;
Array<CommsSystem*> systems;
@ -62,7 +58,7 @@ namespace ehs
Comms();
Comms(const Version& ver, const EndDisp disposition, const Str_8& id, const UInt_64 maxEndpoints);
Comms(const Version& ver, EndDisp disposition, const Str_8& id, UInt_64 maxEndpoints);
Comms(const Comms& sock);
@ -72,43 +68,41 @@ namespace ehs
void UnInitialize();
void Bind(const Str_8& newAddress, const UInt_16 newPort);
void Bind(AddrType newType, const Str_8& newAddress, UInt_16 newPort);
void Connect(const Str_8& address, const UInt_16 port);
void Connect(AddrType rType, const Str_8& rAddress, UInt_16 rPort);
bool Disconnect(const EndDisp disposition, const UInt_64 hashId, const Str_8& msg);
bool Disconnect(EndDisp endDisp, UInt_64 endHashId, const Str_8& msg);
bool Disconnect(const EndDisp disposition, const Str_8& id, const Str_8& msg);
bool Disconnect(EndDisp endDisp, const Str_8& endId, const Str_8& msg);
void Broadcast(const EndDisp disposition, const Status status, const bool deltaLocked, const bool encrypted,
const bool ensure, const UInt_64 sysHashId, const UInt_64 opHashId,
void Broadcast(EndDisp endDisp, Status endStatus, bool deltaLocked, bool encrypted,
bool ensure, UInt_64 sysHashId, UInt_64 opHashId,
const Serializer<>& payload);
void Broadcast(const EndDisp disposition, const Status status, const bool deltaLocked, const bool encrypted,
const bool ensure, const Str_8& sysId, const Str_8& opId,
void Broadcast(EndDisp endDisp, Status endStatus, bool deltaLocked, bool encrypted,
bool ensure, const Str_8& sysId, const Str_8& opId,
const Serializer<>& payload);
void Poll();
bool IsInitialized() const;
void SetAddressType(const AddrType newType);
AddrType GetLocalAddressType() const;
AddrType GetAddressType() const;
Str_8 GetLocalAddress() const;
Str_8 GetAddress() const;
UInt_16 GetPort() const;
UInt_16 GetLocalPort() const;
bool IsBound() const;
Version GetVersion() const;
static Version GetVersion();
Version GetAppVersion() const;
EndDisp GetDisposition() const;
void EnableDropPackets(const bool enable);
void EnableDropPackets(bool enable);
bool IsDropPacketsEnabled() const;
@ -116,55 +110,55 @@ namespace ehs
UInt_64 GetHashId() const;
bool HasSystem(const UInt_64 hashId) const;
bool HasSystem(UInt_64 sysHashId) const;
bool HasSystem(const Str_8& id) const;
bool HasSystem(const Str_8& sysId) const;
bool AddSystem(CommsSystem* sys);
CommsSystem* GetSystem(const UInt_64 hashId);
CommsSystem* GetSystem(UInt_64 sysHashId);
CommsSystem* GetSystem(const Str_8& id);
CommsSystem* GetSystem(const Str_8& sysId);
bool HasEndpoint(const EndDisp disposition, const Status status, const UInt_64 hashId) const;
bool HasEndpoint(EndDisp endDisp, Status endStatus, UInt_64 endHashId) const;
bool HasEndpoint(const EndDisp disposition, const Status status, const Str_8& id) const;
bool HasEndpoint(EndDisp endDisp, Status endStatus, const Str_8& endId) const;
bool HasEndpoint(const EndDisp disposition, const UInt_64 hashId) const;
bool HasEndpoint(EndDisp endDisp, UInt_64 endHashId) const;
bool HasEndpoint(const EndDisp disposition, const Str_8& id) const;
bool HasEndpoint(EndDisp endDisp, const Str_8& endId) const;
bool HasEndpoint(const Str_8& address, const UInt_16 port) const;
bool HasEndpoint(const Str_8& rAddress, UInt_16 rPort) const;
Endpoint* GetEndpoint(const EndDisp disposition, const Status status, const UInt_64 hashId);
Endpoint* GetEndpoint(EndDisp endDisp, Status endStatus, UInt_64 endHashId);
Endpoint* GetEndpoint(const EndDisp disposition, const Status status, const Str_8& id);
Endpoint* GetEndpoint(EndDisp endDisp, Status endStatus, const Str_8& endId);
Endpoint* GetEndpoint(const EndDisp disposition, const UInt_64 hashId);
Endpoint* GetEndpoint(EndDisp endDisp, UInt_64 endHashId);
Endpoint* GetEndpoint(const EndDisp disposition, const Str_8& id);
Endpoint* GetEndpoint(EndDisp endDisp, const Str_8& endId);
Endpoint* GetEndpoint(const Str_8& address, const UInt_16 port);
Endpoint* GetEndpoint(const Str_8& rAddress, UInt_16 rPort);
Array<Endpoint*> GetEndpoints(const EndDisp disposition, const Status status);
Array<Endpoint*> GetEndpoints(EndDisp endDisp, Status endStatus);
Array<Endpoint*> GetEndpoints(const EndDisp disposition);
Array<Endpoint*> GetEndpoints(EndDisp endDisp);
UInt_64 GetEndpointsCount(const EndDisp disposition, const Status status);
UInt_64 GetEndpointsCount(EndDisp endDisp, Status endStatus);
UInt_64 GetEndpointsCount(const EndDisp disposition);
UInt_64 GetEndpointsCount(EndDisp endDisp);
UInt_64 GetMaxEndpoints() const;
void SetBlocking(const bool blocking);
void SetBlocking(bool blocking);
bool IsBlocking() const;
void SetMaxTimeout(const float seconds);
void SetMaxTimeout(float seconds);
float GetMaxTimeout() const;
void SetResendRate(const float seconds);
void SetResendRate(float seconds);
float GetResendRate() const;
@ -179,18 +173,12 @@ namespace ehs
void UpdateQueue();
bool RemoveEndpoint(const EndDisp disposition, const UInt_64 hashId);
bool RemoveEndpoint(EndDisp disposition, UInt_64 hashId);
bool RemoveEndpoint(const Str_8& address, const UInt_16 port);
bool RemoveEndpoint(const Str_8& address, UInt_16 port);
bool RemoveEndpoint(const Endpoint* const end);
bool RemoveEndpoint(const Endpoint* end);
void PollEndpoints(Vector<Endpoint*>& endpoints);
void Bind_v6(const Str_8& address, const UInt_16 port);
void Bind_v4(const Str_8& address, const UInt_16 port);
UInt_16 Receive(Str_8* address, UInt_16* port, Byte* const data, const UInt_16 size);
void PollEndpoints();
};
}

View File

@ -27,6 +27,7 @@ namespace ehs
Vector<Insurance> sent;
UInt_64 nextRecvId;
Vector<Fragments> received;
AddrType type;
Str_8 address;
UInt_16 port;
float deltaDuration;
@ -40,20 +41,20 @@ namespace ehs
public:
Endpoint();
Endpoint(Comms* owner, const EndDisp disposition, const Architecture arch, const Str_8& id,
const AddrType& type, const Str_8& address, const UInt_16 port);
Endpoint(Comms* owner, EndDisp disposition, Architecture arch, Str_8 id,
AddrType type, Str_8 address, UInt_16 port);
Endpoint(Comms* owner, const AddrType& type, const Str_8& address, const UInt_16 port);
Endpoint(Comms* owner, AddrType type, Str_8 address, UInt_16 port);
Endpoint(const Endpoint& end);
Endpoint& operator=(const Endpoint& end);
void Poll(const float delta);
void Poll(float delta);
EndDisp GetDisposition() const;
void SetStatus(const Status newStatus);
void SetStatus(Status newStatus);
Status GetStatus() const;
@ -72,8 +73,7 @@ namespace ehs
/// @param [in] sys The system hash id to execute an operation from.
/// @param [in] op The operation hash id in the system to execute.
/// @param [in] payload Additional parameters and data to send to the remote endpoint.
void Send(const bool deltaLocked, const bool encrypted, const bool ensure, const UInt_64 sys,
const UInt_64 op, const Serializer<>& payload);
void Send(bool deltaLocked, bool encrypted, bool ensure, UInt_64 sys, UInt_64 op, const Serializer<UInt_64>& payload);
/// Sends data to the remote endpoint.
/// @param [in] deltaLocked Whether or not to match the remote endpoint's delta time to prevent overloading the client. This will drop data if delta time does not match.
@ -82,10 +82,9 @@ namespace ehs
/// @param [in] sys The system string id to execute an operation from.
/// @param [in] op The operation string id in the system to execute.
/// @param [in] payload Additional parameters and data to send to the remote endpoint.
void Send(const bool deltaLocked, const bool encrypted, const bool ensure, const Str_8& sys,
const Str_8& op, const Serializer<>& payload);
void Send(bool deltaLocked, bool encrypted, bool ensure, const Str_8& sys, const Str_8& op, const Serializer<UInt_64>& payload);
void RemoveInsurance(const UInt_64 msgId, const UInt_64 fragment);
void RemoveInsurance(UInt_64 msgId, UInt_64 fragment);
UInt_64 GetNextRecvId() const;
@ -99,7 +98,7 @@ namespace ehs
UInt_16 GetPort() const;
void SetDeltaRate(const float newDeltaRate);
void SetDeltaRate(float newDeltaRate);
float GetDeltaRate() const;
@ -107,17 +106,17 @@ namespace ehs
float GetLastPing() const;
void Ping(const float delta);
void Ping(float delta);
void Pong(const float delta);
void Pong(float delta);
void SendLatency();
void SetLatency(const float newLatency);
void SetLatency(float newLatency);
float GetLatency() const;
void SetQueueSlot(const UInt_64 slot);
void SetQueueSlot(UInt_64 slot);
UInt_64 GetQueueSlot() const;
@ -129,9 +128,5 @@ namespace ehs
bool SortingNeeded() const;
void SortReceived();
UInt_16 Send_v6(const Serializer<>& payload);
UInt_16 Send_v4(const Serializer<>& payload);
};
}

View File

@ -19,7 +19,7 @@ namespace ehs
UDP();
/// Default members initialization.
UDP(const AddrType addrType);
UDP(AddrType type);
UDP(UDP&& udp) noexcept;
@ -41,7 +41,7 @@ namespace ehs
/// @param [in] address The local IPv4 or IPv6 address to bind to. Resolves domain names. The given address can be empty, "127.0.0.1", or "localhost" to automatically find the appropriate device.
/// @param [in] port The port to bind to.
/// @note Requires the port given to be forwarded if this is called.
void Bind(const Str_8& address, const UInt_16 port) override;
void Bind(AddrType type, const Str_8& address, UInt_16 port) override;
/// Sends data using a C-style byte array.
/// @param [in] addr The remote Ipv4 or Ipv6 address to send to. Resolves domain names. The given address can be empty, "127.0.0.1", or "localhost" to automatically find the appropriate device.
@ -49,7 +49,7 @@ namespace ehs
/// @param [in] data The C-style byte array to send.
/// @param [in] size The size of the C-style byte array.
/// @note The size of data to be sent cannot exceed "UDP::maxPayloadIpv4" or "UDP::maxPayloadIpv6".
UInt_64 Send(const Str_8& addr, const UInt_16 port, const Byte* const data, const UInt_64 size) override;
UInt_64 Send(AddrType type, const Str_8& address, UInt_16 port, const Byte* data, UInt_64 size) override;
/// Receives data using the packet helper class.
/// @param [out] addr The Ipv4 or Ipv6 address of the sender.
@ -58,11 +58,11 @@ namespace ehs
/// @param [in] size The size of the pre-allocated C-style byte array.
/// @returns The size of the data received.
/// @warning The provided C-style byte array must be freed when finished using.
UInt_64 Receive(Str_8* const addr, UInt_16* const port, Byte* const data, const UInt_64 size) override;
UInt_64 Receive(AddrType* type, Str_8* address, UInt_16* port, Byte* data, UInt_64 size) override;
/// Sets whether or not receiving data blocks the next task.
/// @param [in] blocking Whether or not to block.
void SetBlocking(const bool blocking) override;
void SetBlocking(bool blocking) override;
/// Retrieves whether or not this socket will block when receiving data.
/// @returns The result.
@ -71,12 +71,12 @@ namespace ehs
bool IsValid() const override;
private:
void Bind_v6(const Str_8& address, const UInt_16 port);
void Bind_v6(const Str_8& address, UInt_16 port) const;
void Bind_v4(const Str_8& address, const UInt_16 port);
void Bind_v4(const Str_8& address, UInt_16 port) const;
UInt_64 Send_v6(const Str_8& addr, const UInt_16 port, const Byte* const data, const UInt_64 size);
UInt_64 Send_v6(const Str_8& address, UInt_16 port, const Byte* data, UInt_64 size);
UInt_64 Send_v4(const Str_8& addr, const UInt_16 port, const Byte* const data, const UInt_64 size);
UInt_64 Send_v4(const Str_8& address, UInt_16 port, const Byte* data, UInt_64 size);
};
}

View File

@ -3,25 +3,25 @@
namespace ehs
{
BaseUDP::BaseUDP()
: addrType(AddrType::IPV6), port(0), bound(false)
: type(AddrType::IPV6), port(0), bound(false)
{
}
BaseUDP::BaseUDP(const AddrType addrType)
: addrType(addrType), port(0), bound(false)
BaseUDP::BaseUDP(const AddrType type)
: type(type), port(0), bound(false)
{
}
BaseUDP::BaseUDP(BaseUDP&& udp) noexcept
: addrType(udp.addrType), address(std::move(udp.address)), port(udp.port), bound(true)
: type(udp.type), address(std::move(udp.address)), port(udp.port), bound(true)
{
udp.addrType = AddrType::IPV6;
udp.type = AddrType::IPV6;
udp.port = 0;
udp.bound = false;
}
BaseUDP::BaseUDP(const BaseUDP& udp)
: addrType(udp.addrType), address(udp.address), port(udp.port), bound(false)
: type(udp.type), address(udp.address), port(udp.port), bound(false)
{
}
@ -30,12 +30,12 @@ namespace ehs
if (this == &udp)
return *this;
addrType = udp.addrType;
type = udp.type;
address = std::move(udp.address);
port = udp.port;
bound = udp.bound;
udp.addrType = AddrType::IPV6;
udp.type = AddrType::IPV6;
udp.port = 0;
udp.bound = false;
@ -47,7 +47,7 @@ namespace ehs
if (this == &udp)
return *this;
addrType = udp.addrType;
type = udp.type;
address = udp.address;
port = udp.port;
bound = false;
@ -60,9 +60,9 @@ namespace ehs
return bound;
}
AddrType BaseUDP::GetAddressType() const
AddrType BaseUDP::GetLocalAddressType() const
{
return addrType;
return type;
}
Str_8 BaseUDP::GetLocalAddress() const

View File

@ -3,23 +3,9 @@
#include "io/socket/Endpoint.h"
#include "Encryption.h"
#if defined(EHS_OS_WINDOWS)
#include <WinSock2.h>
#include <WS2tcpip.h>
#elif defined(EHS_OS_LINUX)
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#endif
namespace ehs
{
const Version Comms::ver(1, 0, 0);
const Version Comms::version(1, 0, 0);
const UInt_64 Comms::internalSys = Str_8::Hash_64("Internal");
const UInt_64 Comms::connectOp = Str_8::Hash_64("Connect");
const UInt_64 Comms::connectedOp = Str_8::Hash_64("Connected");
@ -38,7 +24,7 @@ namespace ehs
}
Comms::Comms()
: hdl(EHS_INVALID_SOCKET), type(AddrType::IPV4), port(0), bound(false), appVer(0, 0, 0),
: udp(AddrType::IPV6), appVer(0, 0, 0),
disposition(EndDisp::UNKNOWN), dropPackets(false), hashId(0), buffer(nullptr), bufferSize(0),
maxEndpoints(0), lastTSC(0), delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectedCb(nullptr),
activeCb(nullptr), disconnectedCb(nullptr)
@ -47,19 +33,17 @@ namespace ehs
}
Comms::Comms(const Version& ver, const EndDisp disposition, const Str_8& id, const UInt_64 maxEndpoints)
: hdl(EHS_INVALID_SOCKET), type(AddrType::IPV4), port(0), bound(false), appVer(ver), disposition(disposition),
dropPackets(false), id(id), hashId(id.Hash_32()), buffer(nullptr), bufferSize(0),
maxEndpoints(maxEndpoints), lastTSC(CPU::GetTSC()), delta(0.0f), maxTimeout(5.0f), resendRate(0.5f),
connectedCb(nullptr), activeCb(nullptr), disconnectedCb(nullptr)
: udp(AddrType::IPV6), appVer(ver), disposition(disposition), dropPackets(false), hashId(id.Hash_64()), id(id),
buffer(nullptr), bufferSize(0), maxEndpoints(maxEndpoints), lastTSC(CPU::GetTSC()), delta(0.0f),
maxTimeout(5.0f), resendRate(0.5f), connectedCb(nullptr), activeCb(nullptr), disconnectedCb(nullptr)
{
}
Comms::Comms(const Comms& sock)
: BaseObj(sock), hdl(EHS_INVALID_SOCKET), type(sock.type), address(sock.address), port(sock.port), bound(false),
appVer(sock.appVer), disposition(sock.disposition), dropPackets(sock.dropPackets),
id(sock.id), hashId(sock.hashId), buffer(nullptr), bufferSize(0), maxEndpoints(sock.maxEndpoints),
lastTSC(CPU::GetTSC()), delta(0.0f), maxTimeout(sock.maxTimeout), resendRate(sock.resendRate),
connectedCb(nullptr), activeCb(nullptr), disconnectedCb(nullptr)
: BaseObj(sock), udp(sock.udp), appVer(sock.appVer), disposition(sock.disposition),
dropPackets(sock.dropPackets), hashId(sock.hashId), id(sock.id), buffer(nullptr), bufferSize(0),
maxEndpoints(sock.maxEndpoints), lastTSC(CPU::GetTSC()), delta(0.0f), maxTimeout(sock.maxTimeout),
resendRate(sock.resendRate), connectedCb(nullptr), activeCb(nullptr), disconnectedCb(nullptr)
{
AddType("Socket");
}
@ -71,16 +55,12 @@ namespace ehs
BaseObj::operator=(sock);
hdl = EHS_INVALID_SOCKET;
type = sock.type;
address = sock.address;
port = sock.port;
bound = false;
udp = sock.udp;
appVer = sock.appVer;
disposition = sock.disposition;
dropPackets = sock.dropPackets;
id = sock.id;
hashId = sock.hashId;
id = sock.id;
buffer = nullptr;
bufferSize = 0;
systems = Array<CommsSystem*>();
@ -99,53 +79,15 @@ namespace ehs
void Comms::Initialize()
{
if (hdl != EHS_INVALID_SOCKET)
if (!udp.IsValid())
return;
#if defined(EHS_OS_WINDOWS)
WSADATA data = {};
int wsaCode = WSAStartup(MAKEWORD(2, 2), &data);
if (wsaCode)
{
EHS_LOG_INT("Error", 0, "WSAStartup failed with the error #" + Str_8::FromNum(wsaCode) + ".");
return;
}
#endif
if (type == AddrType::IPV6)
hdl = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
else if (type == AddrType::IPV4)
hdl = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
else
return;
if (hdl == EHS_INVALID_SOCKET)
{
UInt_32 code = 0;
#if defined(EHS_OS_WINDOWS)
code = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
code = errno;
#endif
EHS_LOG_INT("Error", 1, "Failed to create socket with error #" + Str_8::FromNum(code) + ".");
#if defined(EHS_OS_WINDOWS)
if (WSACleanup() == SOCKET_ERROR)
EHS_LOG_INT("Error", 2, "Failed to shutdown WSA with error #" + Str_8::FromNum(WSAGetLastError()) + ".");
#endif
return;
}
if (type == AddrType::IPV4)
if (udp.GetLocalAddressType() == AddrType::IPV4)
{
buffer = new Byte[EHS_IPV4_UDP_PAYLOAD];
bufferSize = EHS_IPV4_UDP_PAYLOAD;
}
else if (type == AddrType::IPV6)
else if (udp.GetLocalAddressType() == AddrType::IPV6)
{
buffer = new Byte[EHS_IPV6_UDP_PAYLOAD];
bufferSize = EHS_IPV6_UDP_PAYLOAD;
@ -154,7 +96,7 @@ namespace ehs
void Comms::UnInitialize()
{
if (hdl == EHS_INVALID_SOCKET)
if (!udp.IsValid())
return;
delete[] buffer;
@ -177,67 +119,37 @@ namespace ehs
delete systems[i];
systems.Clear();
Int_32 code = 0;
#if defined(EHS_OS_WINDOWS)
code = closesocket(hdl);
if (code == SOCKET_ERROR)
EHS_LOG_INT("Error", 0, "Failed to close socket with error #" + Str_8::FromNum(GetLastError()) + ".");
#elif defined(EHS_OS_LINUX)
code = close(hdl);
if (code == -1)
EHS_LOG_INT("Error", 0, "Failed to close socket with error #" + Str_8::FromNum(errno) + ".");
#endif
hdl = EHS_INVALID_SOCKET;
#if defined(EHS_OS_WINDOWS)
if (WSACleanup() == SOCKET_ERROR)
EHS_LOG_INT("Error", 1, "Failed to shutdown WSA with error #" + Str_8::FromNum(WSAGetLastError()) + ".");
#endif
bound = false;
udp.Release();
}
void Comms::Bind(const Str_8& newAddress, const UInt_16 newPort)
void Comms::Bind(AddrType newType, const Str_8& newAddress, const UInt_16 newPort)
{
if (hdl == EHS_INVALID_SOCKET || bound)
return;
if (type == AddrType::IPV6)
Bind_v6(newAddress, newPort);
else if (type == AddrType::IPV4)
Bind_v4(newAddress, newPort);
address = newAddress;
port = newPort;
bound = true;
udp.Bind(newType, newAddress, newPort);
}
void Comms::Connect(const Str_8& address, const UInt_16 port)
void Comms::Connect(AddrType rType, const Str_8& rAddress, const UInt_16 rPort)
{
if (hdl == EHS_INVALID_SOCKET)
if (!udp.IsValid())
return;
Serializer payload(Endianness::LE);
payload.Write(CPU::GetArchitecture());
payload.WriteStr(id);
payload.WriteVersion(ver);
payload.WriteVersion(version);
payload.WriteVersion(appVer);
Endpoint* end = new Endpoint(this, type, address, port);
Endpoint* end = new Endpoint(this, rType, rAddress, rPort);
end->Send(false, true, false, "Internal", "Connect", payload);
endpoints.Push(end);
}
bool Comms::Disconnect(const EndDisp disposition, const UInt_64 hashId, const Str_8& msg)
bool Comms::Disconnect(const EndDisp endDisp, const UInt_64 endHashId, const Str_8& msg)
{
if (hdl == EHS_INVALID_SOCKET)
if (!udp.IsValid())
return false;
Endpoint* end = GetEndpoint(disposition, hashId);
Endpoint* end = GetEndpoint(endDisp, endHashId);
if (!end)
return false;
@ -255,50 +167,51 @@ namespace ehs
return true;
}
bool Comms::Disconnect(const EndDisp disposition, const Str_8& id, const Str_8& msg)
bool Comms::Disconnect(const EndDisp endDisp, const Str_8& endId, const Str_8& msg)
{
return Disconnect(disposition, id.Hash_32(), msg);
return Disconnect(endDisp, endId.Hash_64(), msg);
}
void Comms::Broadcast(const EndDisp disposition, const Status status, const bool deltaLocked, const bool encrypted,
void Comms::Broadcast(const EndDisp endDisp, const Status endStatus, const bool deltaLocked, const bool encrypted,
const bool ensure, const UInt_64 sysHashId, const UInt_64 opHashId,
const Serializer<>& payload)
{
if (hdl == EHS_INVALID_SOCKET)
if (!udp.IsValid())
return;
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetStatus() == status)
if (endpoints[i]->GetStatus() == endStatus)
endpoints[i]->Send(deltaLocked, encrypted, ensure, sysHashId, opHashId, payload);
}
}
void Comms::Broadcast(const EndDisp disposition, const Status status, const bool deltaLocked, const bool encrypted,
void Comms::Broadcast(const EndDisp endDisp, const Status endStatus, const bool deltaLocked, const bool encrypted,
const bool ensure, const Str_8& sysId, const Str_8& opId,
const Serializer<>& payload)
{
Broadcast(disposition, status, deltaLocked, encrypted, ensure, sysId.Hash_64(), opId.Hash_64(), payload);
Broadcast(endDisp, endStatus, deltaLocked, encrypted, ensure, sysId.Hash_64(), opId.Hash_64(), payload);
}
void Comms::Poll()
{
if (hdl == EHS_INVALID_SOCKET)
if (udp.IsValid())
return;
UInt_64 newTSC = CPU::GetTSC();
delta = (float)(newTSC - lastTSC) / (float)CPU::GetTSC_Freq();
lastTSC = newTSC;
AddrType rType = AddrType::IPV6;
Str_8 rAddress;
UInt_16 rPort = 0;
UInt_16 received = 0;
UInt_16 received;
while ((received = Receive(&rAddress, &rPort, buffer, bufferSize)))
while ((received = udp.Receive(&rType, &rAddress, &rPort, buffer, bufferSize)))
{
Serializer<> payload(Endianness::LE, buffer, received);
@ -315,18 +228,18 @@ namespace ehs
Architecture rArch = payload.Read<Architecture>();
Str_8 rId = payload.ReadStr<Char_8, UInt_64>();
Endpoint* end = new Endpoint(this, header.disposition, rArch, rId, type, rAddress, rPort);
Endpoint* end = new Endpoint(this, header.disposition, rArch, rId, rType, rAddress, rPort);
end->SetStatus(Status::PENDING);
Serializer sPayload(Endianness::LE);
Version rVer = payload.ReadVersion();
if (rVer != ver)
if (rVer != version)
{
sPayload.WriteStr<Char_8, UInt_64>("Your Event Horizon Socket Layer version " +
Str_8::FromNum(rVer.major) + "." + Str_8::FromNum(rVer.minor) + "." + Str_8::FromNum(rVer.patch) +
" does not match remote endpoint version " +
Str_8::FromNum(ver.major) + "." + Str_8::FromNum(ver.minor) + "." + Str_8::FromNum(ver.patch) +
Str_8::FromNum(version.major) + "." + Str_8::FromNum(version.minor) + "." + Str_8::FromNum(version.patch) +
". Connection rejected.");
end->Send(false, true, false, internalSys, rejectedOp, sPayload);
@ -411,9 +324,9 @@ namespace ehs
continue;
Architecture arch = payload.Read<Architecture>();
Str_8 id = payload.ReadStr<Char_8, UInt_64>();
Str_8 endId = payload.ReadStr<Char_8, UInt_64>();
*end = Endpoint(this, header.disposition, arch, id, type, rAddress, rPort);
*end = Endpoint(this, header.disposition, arch, endId, rType, rAddress, rPort);
end->SetStatus(payload.Read<Status>());
end->SetQueueSlot(payload.Read<UInt_64>());
@ -569,45 +482,37 @@ namespace ehs
}
}
PollEndpoints(endpoints);
PollEndpoints();
}
bool Comms::IsInitialized() const
{
return hdl != EHS_INVALID_SOCKET;
return udp.IsValid();
}
void Comms::SetAddressType(const AddrType newType)
AddrType Comms::GetLocalAddressType() const
{
if (hdl != EHS_INVALID_SOCKET)
return;
type = newType;
return udp.GetLocalAddressType();
}
AddrType Comms::GetAddressType() const
Str_8 Comms::GetLocalAddress() const
{
return type;
return udp.GetLocalAddress();
}
Str_8 Comms::GetAddress() const
UInt_16 Comms::GetLocalPort() const
{
return address;
}
UInt_16 Comms::GetPort() const
{
return port;
return udp.GetLocalPort();
}
bool Comms::IsBound() const
{
return bound;
return udp.IsBound();
}
Version Comms::GetVersion() const
Version Comms::GetVersion()
{
return ver;
return version;
}
Version Comms::GetAppVersion() const
@ -640,21 +545,21 @@ namespace ehs
return hashId;
}
bool Comms::HasSystem(const UInt_64 hashId) const
bool Comms::HasSystem(const UInt_64 sysHashId) const
{
if (internalSys == hashId)
if (internalSys == sysHashId)
return true;
for (UInt_64 i = 0; i < systems.Size(); ++i)
if (systems[i]->GetHashId() == hashId)
if (systems[i]->GetHashId() == sysHashId)
return true;
return false;
}
bool Comms::HasSystem(const Str_8& id) const
bool Comms::HasSystem(const Str_8& sysId) const
{
return HasSystem(id.Hash_64());
return HasSystem(sysId.Hash_64());
}
bool Comms::AddSystem(CommsSystem* sys)
@ -667,131 +572,131 @@ namespace ehs
return true;
}
CommsSystem* Comms::GetSystem(const UInt_64 hashId)
CommsSystem* Comms::GetSystem(const UInt_64 sysHashId)
{
for (UInt_64 i = 0; i < systems.Size(); ++i)
if (systems[i]->GetHashId() == hashId)
if (systems[i]->GetHashId() == sysHashId)
return systems[i];
return nullptr;
}
CommsSystem* Comms::GetSystem(const Str_8& id)
CommsSystem* Comms::GetSystem(const Str_8& sysId)
{
return GetSystem(id.Hash_32());
return GetSystem(sysId.Hash_64());
}
bool Comms::HasEndpoint(const EndDisp disposition, const Status status, const UInt_64 hashId) const
bool Comms::HasEndpoint(const EndDisp endDisp, const Status endStatus, const UInt_64 endHashId) const
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetStatus() != status)
if (endpoints[i]->GetStatus() != endStatus)
continue;
if (endpoints[i]->GetHashId() == hashId)
if (endpoints[i]->GetHashId() == endHashId)
return true;
}
return false;
}
bool Comms::HasEndpoint(const EndDisp disposition, const Status status, const Str_8& id) const
bool Comms::HasEndpoint(const EndDisp endDisp, const Status endStatus, const Str_8& endId) const
{
return HasEndpoint(disposition, status, id.Hash_32());
return HasEndpoint(endDisp, endStatus, endId.Hash_64());
}
bool Comms::HasEndpoint(const EndDisp disposition, const UInt_64 hashId) const
bool Comms::HasEndpoint(const EndDisp endDisp, const UInt_64 endHashId) const
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetHashId() == hashId)
if (endpoints[i]->GetHashId() == endHashId)
return true;
}
return false;
}
bool Comms::HasEndpoint(const EndDisp disposition, const Str_8& id) const
bool Comms::HasEndpoint(const EndDisp endDisp, const Str_8& endId) const
{
return HasEndpoint(disposition, id.Hash_64());
return HasEndpoint(endDisp, endId.Hash_64());
}
bool Comms::HasEndpoint(const Str_8& address, const UInt_16 port) const
bool Comms::HasEndpoint(const Str_8& rAddress, const UInt_16 rPort) const
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
if (endpoints[i]->GetAddress() == address && endpoints[i]->GetPort() == port)
if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort)
return true;
return false;
}
Endpoint* Comms::GetEndpoint(const EndDisp disposition, const Status status, const UInt_64 hashId)
Endpoint* Comms::GetEndpoint(const EndDisp endDisp, const Status endStatus, const UInt_64 endHashId)
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetStatus() != status)
if (endpoints[i]->GetStatus() != endStatus)
continue;
if (endpoints[i]->GetHashId() == hashId)
if (endpoints[i]->GetHashId() == endHashId)
return endpoints[i];
}
return nullptr;
}
Endpoint* Comms::GetEndpoint(const EndDisp disposition, const Status status, const Str_8& id)
Endpoint* Comms::GetEndpoint(const EndDisp endDisp, const Status endStatus, const Str_8& endId)
{
return GetEndpoint(disposition, status, id.Hash_32());
return GetEndpoint(endDisp, endStatus, endId.Hash_64());
}
Endpoint* Comms::GetEndpoint(const EndDisp disposition, const UInt_64 hashId)
Endpoint* Comms::GetEndpoint(const EndDisp endDisp, const UInt_64 endHashId)
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetHashId() == hashId)
if (endpoints[i]->GetHashId() == endHashId)
return endpoints[i];
}
return nullptr;
}
Endpoint* Comms::GetEndpoint(const EndDisp disposition, const Str_8& id)
Endpoint* Comms::GetEndpoint(const EndDisp endDisp, const Str_8& endId)
{
return GetEndpoint(disposition, id.Hash_32());
return GetEndpoint(endDisp, endId.Hash_64());
}
Endpoint* Comms::GetEndpoint(const Str_8& address, const UInt_16 port)
Endpoint* Comms::GetEndpoint(const Str_8& rAddress, const UInt_16 rPort)
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
if (endpoints[i]->GetAddress() == address && endpoints[i]->GetPort() == port)
if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort)
return endpoints[i];
return nullptr;
}
Array<Endpoint*> Comms::GetEndpoints(const EndDisp disposition, const Status status)
Array<Endpoint*> Comms::GetEndpoints(const EndDisp endDisp, const Status endStatus)
{
Array<Endpoint*> result(endpoints.Size());
UInt_64 count = 0;
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetStatus() == status)
if (endpoints[i]->GetStatus() == endStatus)
result[count++] = endpoints[i];
}
@ -800,13 +705,13 @@ namespace ehs
return result;
}
Array<Endpoint*> Comms::GetEndpoints(const EndDisp disposition)
Array<Endpoint*> Comms::GetEndpoints(const EndDisp endDisp)
{
Array<Endpoint*> result(endpoints.Size());
UInt_64 count = 0;
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
if (endpoints[i]->GetDisposition() == disposition)
if (endpoints[i]->GetDisposition() == endDisp)
result[count++] = endpoints[i];
result.Resize(count);
@ -814,28 +719,28 @@ namespace ehs
return result;
}
UInt_64 Comms::GetEndpointsCount(const EndDisp disposition, const Status status)
UInt_64 Comms::GetEndpointsCount(const EndDisp endDisp, const Status endStatus)
{
UInt_64 count = 0;
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetStatus() == status)
if (endpoints[i]->GetStatus() == endStatus)
++count;
}
return count;
}
UInt_64 Comms::GetEndpointsCount(const EndDisp disposition)
UInt_64 Comms::GetEndpointsCount(const EndDisp endDisp)
{
UInt_64 count = 0;
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
if (endpoints[i]->GetDisposition() == disposition)
if (endpoints[i]->GetDisposition() == endDisp)
++count;
return count;
@ -848,37 +753,12 @@ namespace ehs
void Comms::SetBlocking(const bool blocking)
{
if (hdl == EHS_INVALID_SOCKET)
{
EHS_LOG_INT("Error", 0, "Attempted to toggle blocking while socket is not initialized.");
return;
}
#if defined(EHS_OS_WINDOWS)
u_long r = (u_long)!blocking;
int result = ioctlsocket(hdl, FIONBIO, &r);
if (result != NO_ERROR)
EHS_LOG_INT("Error", 1, "Failed to toggle non-blocking mode with error #" + Str_8::FromNum(result) + ".");
#elif defined(EHS_OS_LINUX)
if (fcntl(hdl, F_SETFL, O_NONBLOCK, blocking) == -1)
EHS_LOG_INT("Error", 1, "Failed to toggle non-blocking mode with error #" + Str_8::FromNum(errno) + ".");
#endif
udp.SetBlocking(blocking);
}
bool Comms::IsBlocking() const
{
#if defined(EHS_OS_WINDOWS)
u_long r = 0;
if (ioctlsocket(hdl, FIONREAD, &r) == SOCKET_ERROR)
EHS_LOG_INT("Error", 0, "Failed to retrieve socket info.");
return (bool)r;
#elif defined(EHS_OS_LINUX)
return (bool)fcntl(hdl, F_GETFL, O_NONBLOCK);
#else
return true;
#endif
return udp.IsBlocking();
}
void Comms::SetMaxTimeout(const float seconds)
@ -966,14 +846,14 @@ namespace ehs
UpdateQueue(GetEndpointsCount(EndDisp::ENDPOINT, Status::ACTIVE));
}
bool Comms::RemoveEndpoint(const EndDisp disposition, const UInt_64 hashId)
bool Comms::RemoveEndpoint(const EndDisp endDisp, const UInt_64 endHashId)
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetDisposition() != disposition)
if (endpoints[i]->GetDisposition() != endDisp)
continue;
if (endpoints[i]->GetHashId() == hashId)
if (endpoints[i]->GetHashId() == endHashId)
{
delete endpoints[i];
@ -989,11 +869,11 @@ namespace ehs
return false;
}
bool Comms::RemoveEndpoint(const Str_8& address, const UInt_16 port)
bool Comms::RemoveEndpoint(const Str_8& rAddress, const UInt_16 rPort)
{
for (UInt_64 i = 0; i < endpoints.Size(); ++i)
{
if (endpoints[i]->GetAddress() == address && endpoints[i]->GetPort() == port)
if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort)
{
delete endpoints[i];
@ -1029,7 +909,7 @@ namespace ehs
return false;
}
void Comms::PollEndpoints(Vector<Endpoint*>& endpoints)
void Comms::PollEndpoints()
{
UInt_64 i = 0;
while (i < endpoints.Size())
@ -1103,211 +983,4 @@ namespace ehs
++i;
}
}
void Comms::Bind_v6(const Str_8& address, const UInt_16 port)
{
Int_32 code = 0;
sockaddr_in6 result = {};
result.sin6_family = AF_INET6;
result.sin6_port = htons(port);
if (address.Size())
{
Int_32 code = inet_pton(AF_INET6, address, &result.sin6_addr);
if (!code)
{
EHS_LOG_INT("Error", 0, "The given address, \"" + address + "\" is not valid.");
return;
}
else if (code == -1)
{
Int_32 dCode = 0;
#if defined(EHS_OS_WINDOWS)
dCode = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
dCode = errno;
#endif
EHS_LOG_INT("Error", 1, "Failed to convert address with error #" + Str_8::FromNum(dCode) + ".");
return;
}
}
else
{
result.sin6_addr = in6addr_any;
}
code = bind(hdl, (sockaddr*)&result, sizeof(sockaddr_in6));
#if defined(EHS_OS_WINDOWS)
if (code == SOCKET_ERROR)
{
EHS_LOG_INT("Error", 2, "Failed to bind socket with error #" + Str_8::FromNum(WSAGetLastError()) + ".");
return;
}
#elif defined(EHS_OS_LINUX)
if (code == -1)
{
EHS_LOG_INT("Error", 2, "Failed to bind socket with error #" + Str_8::FromNum(errno) + ".");
return;
}
#endif
}
void Comms::Bind_v4(const Str_8& address, const UInt_16 port)
{
Int_32 code = 0;
sockaddr_in result = {};
result.sin_family = AF_INET;
result.sin_port = htons(port);
if (address.Size())
{
code = inet_pton(AF_INET, address, &result.sin_addr);
if (!code)
{
EHS_LOG_INT("Error", 0, "The given address, \"" + address + "\" is not valid.");
return;
}
else if (code == -1)
{
Int_32 dCode = 0;
#if defined(EHS_OS_WINDOWS)
dCode = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
dCode = errno;
#endif
EHS_LOG_INT("Error", 1, "Failed to convert address with error #" + Str_8::FromNum(dCode) + ".");
return;
}
}
else
{
result.sin_addr.s_addr = htonl(INADDR_ANY);
}
code = bind(hdl, (sockaddr*)&result, sizeof(sockaddr_in));
#if defined(EHS_OS_WINDOWS)
if (code == SOCKET_ERROR)
{
EHS_LOG_INT("Error", 2, "Failed to bind socket with error #" + Str_8::FromNum(WSAGetLastError()) + ".");
return;
}
#elif defined(EHS_OS_LINUX)
if (code == -1)
{
EHS_LOG_INT("Error", 2, "Failed to bind socket with error #" + Str_8::FromNum(errno) + ".");
return;
}
#endif
}
UInt_16 Comms::Receive(Str_8* address, UInt_16* port, Byte* const data, const UInt_16 size)
{
if (hdl == EHS_INVALID_SOCKET)
{
EHS_LOG_INT("Error", 0, "Attempted to receive while socket is not initialized.");
return 0;
}
if (type == AddrType::IPV4 && size > EHS_IPV4_UDP_PAYLOAD)
{
EHS_LOG_INT("Error", 1, "Attempted to receive with a buffer size of, \"" + Str_8::FromNum(size)
+ "\", that exceeds, \"" + Str_8::FromNum(EHS_IPV4_UDP_PAYLOAD) + ".");
}
sockaddr_in6 remote = {};
UInt_32 addrLen = type == AddrType::IPV6 ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
SInt_64 received = 0;
#if defined(EHS_OS_WINDOWS)
received = recvfrom(hdl, (char*)data, (int)size, 0, (sockaddr*)&remote, (int*)&addrLen);
if (received == SOCKET_ERROR)
{
int code = WSAGetLastError();
if (code == WSAEMSGSIZE)
{
UnInitialize();
EHS_LOG_INT("Error", 2, "The buffer size, \"" + Str_8::FromNum(size) + "\" is too small.");
}
else if (code != WSAECONNRESET && code != WSAEWOULDBLOCK)
{
UnInitialize();
EHS_LOG_INT("Error", 3, "Failed to receive with error #" + Str_8::FromNum(code) + ".");
}
return 0;
}
#elif defined(EHS_OS_LINUX)
received = recvfrom(hdl, data, size, 0, (sockaddr*)&remote, &addrLen);
if (received == -1)
{
int code = errno;
if (code != ECONNRESET && code != EWOULDBLOCK)
{
UnInitialize();
EHS_LOG_INT("Error", 2, "Failed to receive with error #" + Str_8::FromNum(code) + ".");
}
return 0;
}
#endif
if (addrLen == sizeof(sockaddr_in6))
{
char tmpAddr[INET6_ADDRSTRLEN];
if (!inet_ntop(remote.sin6_family, &remote.sin6_addr, tmpAddr, INET6_ADDRSTRLEN))
{
Int_32 code = 0;
#if defined(EHS_OS_WINDOWS)
code = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
code = errno;
#endif
EHS_LOG_INT("Error", 2, "Failed to convert IPv6 address with error #" + Str_8::FromNum(code) + ".");
return received;
}
*address = tmpAddr;
*port = ntohs(remote.sin6_port);
}
else if (addrLen == sizeof(sockaddr_in))
{
char tmpAddr[INET_ADDRSTRLEN];
if (!inet_ntop(((sockaddr_in*)&remote)->sin_family, &((sockaddr_in*)&remote)->sin_addr, tmpAddr, INET_ADDRSTRLEN))
{
Int_32 code = 0;
#if defined(EHS_OS_WINDOWS)
code = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
code = errno;
#endif
EHS_LOG_INT("Error", 2, "Failed to convert IPv4 address with error #" + Str_8::FromNum(code) + ".");
return (UInt_16)received;
}
*address = tmpAddr;
*port = ntohs(((sockaddr_in*)&remote)->sin_port);
}
return (UInt_16)received;
}
}

View File

@ -3,51 +3,38 @@
#include "Encryption.h"
#include "system/CPU.h"
#if defined(EHS_OS_WINDOWS)
#include <WinSock2.h>
#include <WS2tcpip.h>
#elif defined(EHS_OS_LINUX)
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#endif
namespace ehs
{
Endpoint::Endpoint()
: owner(nullptr), disposition(EndDisp::UNKNOWN), status(Status::PENDING), arch(Architecture::UNKNOWN),
hashId(0), nextSendId(0), nextRecvId(0), port(0), deltaDuration(0.0f), deltaRate(1.0f / 60.0f),
timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0)
hashId(0), nextSendId(0), nextRecvId(0), type(AddrType::IPV6), port(0), deltaDuration(0.0f),
deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0)
{
AddType("Endpoint");
}
Endpoint::Endpoint(Comms* owner, const EndDisp disposition, const Architecture arch, const Str_8& id,
const AddrType& type, const Str_8& address, const UInt_16 port)
: owner(owner), disposition(disposition), status(Status::ACTIVE), arch(arch), id(id), hashId(id.Hash_32()), nextSendId(0),
nextRecvId(0), address(address), port(port), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f),
lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0)
Endpoint::Endpoint(Comms* owner, const EndDisp disposition, const Architecture arch, Str_8 id,
const AddrType type, Str_8 address, const UInt_16 port)
: owner(owner), disposition(disposition), status(Status::ACTIVE), arch(arch), hashId(id.Hash_64()),
id((Str_8&&)id), nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), port(port),
deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f),
queueSlot(0)
{
AddType("Endpoint");
}
Endpoint::Endpoint(Comms* owner, const AddrType& type, const Str_8& address, const UInt_16 port)
Endpoint::Endpoint(Comms* owner, const AddrType type, Str_8 address, const UInt_16 port)
: owner(owner), disposition(EndDisp::UNKNOWN), status(Status::PENDING), arch(Architecture::UNKNOWN), hashId(0),
nextSendId(0), nextRecvId(0), address(address), port(port), deltaDuration(0.0f), deltaRate(1.0f / 60.0f),
timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0)
nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), port(port), deltaDuration(0.0f),
deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0)
{
AddType("Endpoint");
}
Endpoint::Endpoint(const Endpoint& end)
: BaseObj(end), owner(nullptr), disposition(end.disposition), status(Status::PENDING), arch(end.arch),
id(end.id), hashId(end.hashId), nextSendId(0), nextRecvId(0), address(end.address), port(end.port),
deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f),
hashId(end.hashId), id(end.id), nextSendId(0), nextRecvId(0), type(end.type), address(end.address),
port(end.port), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f),
oldLatency(0.0f), latency(0.0f), queueSlot(0)
{
}
@ -64,12 +51,13 @@ namespace ehs
disposition = end.disposition;
status = end.status;
arch = end.arch;
id = end.id;
hashId = end.hashId;
id = end.id;
nextSendId = end.nextSendId;
sent = end.sent;
nextRecvId = end.nextRecvId;
received = end.received;
type = end.type;
address = end.address;
port = end.port;
deltaDuration = end.deltaDuration;
@ -86,12 +74,13 @@ namespace ehs
disposition = end.disposition;
status = Status::PENDING;
arch = end.arch;
id = end.id;
hashId = end.hashId;
id = end.id;
nextSendId = 0;
sent = Vector<Insurance>();
nextRecvId = 0;
received = Vector<Fragments>();
type = end.type;
address = end.address;
port = end.port;
deltaDuration = 0.0f;
@ -131,10 +120,7 @@ namespace ehs
if (sent[i].header.encrypted)
Encryption::Encrypt_64(result.Size() - sizeof(bool), &result[sizeof(bool)]);
if (owner->GetAddressType() == AddrType::IPV6)
Send_v6(result);
else if (owner->GetAddressType() == AddrType::IPV4)
Send_v4(result);
owner->udp.Send(type, address, port, result, result.Size());
sent[i].lastResend = Math::Mod(sent[i].lastResend, owner->GetResendRate());
}
@ -202,7 +188,7 @@ namespace ehs
op
};
if ((owner->GetAddressType() == AddrType::IPV6 && payload.Size() > COMMS_IPV6_PAYLOAD) || (owner->GetAddressType() == AddrType::IPV4 && payload.Size() > COMMS_IPV4_PAYLOAD))
if ((owner->GetLocalAddressType() == AddrType::IPV6 && payload.Size() > COMMS_IPV6_PAYLOAD) || (owner->GetLocalAddressType() == AddrType::IPV4 && payload.Size() > COMMS_IPV4_PAYLOAD))
{
Fragments frags = FragmentData(header, payload);
for (UInt_64 i = 0; i < frags.Size(); ++i)
@ -368,7 +354,7 @@ namespace ehs
{
Fragments result;
if (owner->GetAddressType() == AddrType::IPV6)
if (owner->GetLocalAddressType() == AddrType::IPV6)
{
UInt_64 frags = data.Size() / COMMS_IPV6_PAYLOAD;
if (data.Size() % COMMS_IPV6_PAYLOAD)
@ -387,7 +373,7 @@ namespace ehs
result[i] = {data.GetEndianness(), &data[i * COMMS_IPV6_PAYLOAD], size};
}
}
else if (owner->GetAddressType() == AddrType::IPV4)
else if (owner->GetLocalAddressType() == AddrType::IPV4)
{
UInt_64 frags = data.Size() / COMMS_IPV4_PAYLOAD;
if (data.Size() % COMMS_IPV4_PAYLOAD)
@ -410,7 +396,7 @@ namespace ehs
return result;
}
void Endpoint::Send(const Header& header, const Serializer<>& payload)
void Endpoint::Send(const Header& header, const Serializer<UInt_64>& payload)
{
Serializer result(Endianness::LE);
result.Write(header);
@ -422,10 +408,7 @@ namespace ehs
if (header.ensure)
sent.Push({header, payload});
if (owner->GetAddressType() == AddrType::IPV6)
Send_v6(result);
else if (owner->GetAddressType() == AddrType::IPV4)
Send_v4(result);
owner->udp.Send(type, address, port, result, result.Size());
}
bool Endpoint::SortingNeeded() const
@ -481,142 +464,4 @@ namespace ehs
received = sorted;
}
UInt_16 Endpoint::Send_v6(const Serializer<>& payload)
{
if (!owner)
{
EHS_LOG_INT("Info", 0, "Attempted to send while socket is not initialized.");
return 0;
}
if (payload.Size() > EHS_IPV6_UDP_PAYLOAD)
{
EHS_LOG_INT("Info", 1, "Attempted to send a packet with the size, \"" + Str_8::FromNum(payload.Size())
+ "\", that exceeds the max payload of, \"" + Str_8::FromNum(EHS_IPV6_UDP_PAYLOAD) + "\".");
return 0;
}
sockaddr_in6 result = {};
result.sin6_family = AF_INET6;
result.sin6_port = htons(port);
Int_32 code = inet_pton(AF_INET6, address, &result.sin6_addr);
if (!code)
{
EHS_LOG_INT("Error", 2, "The given address, \"" + address + "\" is not valid.");
return 0;
}
else if (code == -1)
{
Int_32 dCode = 0;
#if defined(EHS_OS_WINDOWS)
dCode = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
dCode = errno;
#endif
EHS_LOG_INT("Error", 3, "Failed to convert address with error #" + Str_8::FromNum(dCode) + ".");
return 0;
}
Int_32 sent = sendto(owner->hdl, (char*)&payload[0], (int)payload.Size(), 0, (sockaddr*)&result, sizeof(sockaddr_in6));
#if defined(EHS_OS_WINDOWS)
if (sent == SOCKET_ERROR)
#elif defined(EHS_OS_LINUX)
if (sent == -1)
#endif
{
Int_32 dCode = 0;
#if defined(EHS_OS_WINDOWS)
code = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
code = errno;
#endif
EHS_LOG_INT("Error", 4, "Failed to send with error #" + Str_8::FromNum(dCode) + ".");
owner->UnInitialize();
return 0;
}
return (UInt_16)sent;
}
UInt_16 Endpoint::Send_v4(const Serializer<>& payload)
{
if (!owner)
{
EHS_LOG_INT("Info", 0, "Attempted to send while socket is not initialized.");
return 0;
}
if (payload.Size() > EHS_IPV4_UDP_PAYLOAD)
{
EHS_LOG_INT("Info", 1, "Attempted to send a packet with the size, \"" + Str_8::FromNum(payload.Size())
+ "\", that exceeds the max payload of, \"" + Str_8::FromNum(EHS_IPV4_UDP_PAYLOAD) + "\".");
return 0;
}
sockaddr_in result = {};
result.sin_family = AF_INET;
result.sin_port = htons(port);
int code = inet_pton(AF_INET, address, &result.sin_addr);
if (!code)
{
EHS_LOG_INT("Error", 2, "The given address, \"" + address + "\" is not valid.");
return 0;
}
else if (code == -1)
{
#if defined(EHS_OS_WINDOWS)
Int_32 dCode = WSAGetLastError();
#elif defined(EHS_OS_LINUX)
Int_32 dCode = errno;
#else
Int_32 dCode = 0;
#endif
EHS_LOG_INT("Error", 2, "Failed to convert address with error #" + Str_8::FromNum(dCode) + ".");
return 0;
}
SInt_64 sent = sendto(owner->hdl, (char*)&payload[0], (int)payload.Size(), 0, (sockaddr*)&result, sizeof(sockaddr_in));
#if defined(EHS_OS_WINDOWS)
if (sent == SOCKET_ERROR)
#elif defined(EHS_OS_LINUX)
if (sent == -1)
#endif
{
#if defined(EHS_OS_WINDOWS)
Int_32 dCode = WSAGetLastError();
if (dCode != WSAEWOULDBLOCK)
{
EHS_LOG_INT("Error", 3, "Failed to send with error #" + Str_8::FromNum(dCode) + ".");
((Comms*)GetParent())->UnInitialize();
}
#elif defined(EHS_OS_LINUX)
Int_32 dCode = errno;
if (dCode != EWOULDBLOCK)
{
EHS_LOG_INT("Error", 3, "Failed to send with error #" + Str_8::FromNum(dCode) + ".");
owner->UnInitialize();
}
#else
Int_32 dCode = 0;
#endif
return 0;
}
return (UInt_16)sent;
}
}

View File

@ -3,9 +3,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
@ -98,14 +96,14 @@ namespace ehs
bound = false;
}
void UDP::Bind(const Str_8& address, const UInt_16 port)
void UDP::Bind(const AddrType type, const Str_8& address, const UInt_16 port)
{
if (!IsValid() || bound)
return;
if (addrType == AddrType::IPV6)
if (type == AddrType::IPV6)
Bind_v6(address, port);
else if (addrType == AddrType::IPV4)
else if (type == AddrType::IPV4)
Bind_v4(address, port);
this->address = address;
@ -114,17 +112,17 @@ namespace ehs
bound = true;
}
UInt_64 UDP::Send(const Str_8& address, const UInt_16 port, const Byte *const data, const UInt_64 size)
UInt_64 UDP::Send(const AddrType type, const Str_8& address, const UInt_16 port, const Byte *const data, const UInt_64 size)
{
if (addrType == AddrType::IPV6)
if (type == AddrType::IPV6)
return Send_v6(address, port, data, size);
else if (addrType == AddrType::IPV4)
else if (type == AddrType::IPV4)
return Send_v4(address, port, data, size);
return 0;
}
UInt_64 UDP::Receive(Str_8* const address, UInt_16* const port, Byte* const data, const UInt_64 size)
UInt_64 UDP::Receive(AddrType* const type, Str_8* const address, UInt_16* const port, Byte* const data, const UInt_64 size)
{
if (!IsValid())
{
@ -133,8 +131,8 @@ namespace ehs
}
sockaddr_in6 remote = {};
UInt_32 addrLen = sizeof(sockaddr_in);
SInt_64 received = 0;
UInt_32 addrLen = sizeof(sockaddr_in6);
SInt_64 received;
received = recvfrom(hdl, data, size, 0, (sockaddr*)&remote, &addrLen);
if (received == -1)
@ -163,6 +161,7 @@ namespace ehs
return received;
}
*type = AddrType::IPV6;
*address = tmpAddr;
*port = ntohs(remote.sin6_port);
}
@ -179,6 +178,7 @@ namespace ehs
return received;
}
*type = AddrType::IPV4;
*address = tmpAddr;
*port = ntohs(((sockaddr_in*)&remote)->sin_port);
}
@ -208,7 +208,7 @@ namespace ehs
return hdl != EHS_INVALID_SOCKET;
}
void UDP::Bind_v6(const Str_8& address, const UInt_16 port)
void UDP::Bind_v6(const Str_8& address, const UInt_16 port) const
{
sockaddr_in6 result = {};
result.sin6_family = AF_INET6;
@ -243,7 +243,7 @@ namespace ehs
}
}
void UDP::Bind_v4(const Str_8& address, const UInt_16 port)
void UDP::Bind_v4(const Str_8& address, const UInt_16 port) const
{
sockaddr_in result = {};
result.sin_family = AF_INET;
@ -278,7 +278,7 @@ namespace ehs
}
}
UInt_64 UDP::Send_v6(const Str_8& addr, const UInt_16 port, const Byte* const data, const UInt_64 size)
UInt_64 UDP::Send_v6(const Str_8& address, const UInt_16 port, const Byte* const data, const UInt_64 size)
{
if (!IsValid())
{
@ -304,7 +304,7 @@ namespace ehs
return 0;
}
Int_32 sent = sendto(hdl, (char*)&data[0], (int)size, 0, (sockaddr*)&result, sizeof(sockaddr_in6));
SInt_64 sent = sendto(hdl, (char*)&data[0], (int)size, 0, (sockaddr*)&result, sizeof(sockaddr_in6));
if (sent == -1)
{
Int_32 dCode = errno;
@ -319,7 +319,7 @@ namespace ehs
return sent;
}
UInt_64 UDP::Send_v4(const Str_8& addr, const UInt_16 port, const Byte* const data, const UInt_64 size)
UInt_64 UDP::Send_v4(const Str_8& address, const UInt_16 port, const Byte* const data, const UInt_64 size)
{
if (!IsValid())
{
@ -345,7 +345,7 @@ namespace ehs
return 0;
}
int sent = sendto(hdl, (char*)&data[0], (int)size, 0, (sockaddr*)&result, sizeof(sockaddr_in));
SInt_64 sent = sendto(hdl, (char*)&data[0], (int)size, 0, (sockaddr*)&result, sizeof(sockaddr_in));
if (sent == -1)
{
Int_32 dCode = errno;
@ -359,4 +359,4 @@ namespace ehs
return sent;
}
};
}