From 1feff0a25c133b7101ff8032a09b34e23388daf1 Mon Sep 17 00:00:00 2001 From: Karutoh Date: Sun, 6 Oct 2024 20:08:22 -0700 Subject: [PATCH] Moved the Event Horizon Communications to this project. --- CMakeLists.txt | 15 +- include/ehs/io/socket/EHC.h | 216 +++++ include/ehs/io/socket/ehc/Fragments.h | 43 + include/ehs/io/socket/ehc/NetEnd.h | 134 +++ include/ehs/io/socket/ehc/NetOp.h | 42 + include/ehs/io/socket/ehc/NetSystem.h | 48 + include/ehs/io/socket/ehc/Utils.h | 54 ++ src/io/socket/EHC.cpp | 1154 +++++++++++++++++++++++++ src/io/socket/ehc/Fragments.cpp | 122 +++ src/io/socket/ehc/NetEnd.cpp | 512 +++++++++++ src/io/socket/ehc/NetOp.cpp | 66 ++ src/io/socket/ehc/NetSystem.cpp | 105 +++ 12 files changed, 2507 insertions(+), 4 deletions(-) create mode 100644 include/ehs/io/socket/EHC.h create mode 100644 include/ehs/io/socket/ehc/Fragments.h create mode 100644 include/ehs/io/socket/ehc/NetEnd.h create mode 100644 include/ehs/io/socket/ehc/NetOp.h create mode 100644 include/ehs/io/socket/ehc/NetSystem.h create mode 100644 include/ehs/io/socket/ehc/Utils.h create mode 100644 src/io/socket/EHC.cpp create mode 100644 src/io/socket/ehc/Fragments.cpp create mode 100644 src/io/socket/ehc/NetEnd.cpp create mode 100644 src/io/socket/ehc/NetOp.cpp create mode 100644 src/io/socket/ehc/NetSystem.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e38ee59..9f556d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -121,20 +121,27 @@ set(EHS_SOURCES include/ehs/io/FileMonitor.h include/ehs/io/Window.h + include/ehs/io/socket/Socket.h src/io/socket/Request.cpp include/ehs/io/socket/Request.h src/io/socket/Response.cpp include/ehs/io/socket/Response.h src/io/socket/BaseDNS.cpp include/ehs/io/socket/BaseDNS.h + include/ehs/io/socket/DNS.h src/io/socket/BaseUDP.cpp include/ehs/io/socket/BaseUDP.h + include/ehs/io/socket/UDP.h src/io/socket/BaseTCP.cpp include/ehs/io/socket/BaseTCP.h + include/ehs/io/socket/TCP.h src/io/socket/SSL.cpp include/ehs/io/socket/SSL.h + include/ehs/io/socket/ehc/Utils.h + src/io/socket/EHC.cpp include/ehs/io/socket/EHC.h + src/io/socket/ehc/Fragments.cpp include/ehs/io/socket/ehc/Fragments.h + src/io/socket/ehc/NetEnd.cpp include/ehs/io/socket/ehc/NetEnd.h + src/io/socket/ehc/NetSystem.cpp include/ehs/io/socket/ehc/NetSystem.h + src/io/socket/ehc/NetOp.cpp include/ehs/io/socket/ehc/NetOp.h + src/io/socket/rest/Twitch.cpp include/ehs/io/socket/rest/Twitch.h src/io/socket/rest/TwitchChat.cpp include/ehs/io/socket/rest/TwitchChat.h src/io/socket/rest/Spotify.cpp include/ehs/io/socket/rest/Spotify.h - include/ehs/io/socket/Socket.h - include/ehs/io/socket/TCP.h - include/ehs/io/socket/UDP.h - include/ehs/io/socket/DNS.h src/io/audio/Audio.cpp include/ehs/io/audio/Audio.h src/io/audio/BaseAudioDevice.cpp include/ehs/io/audio/BaseAudioDevice.h diff --git a/include/ehs/io/socket/EHC.h b/include/ehs/io/socket/EHC.h new file mode 100644 index 0000000..3a81050 --- /dev/null +++ b/include/ehs/io/socket/EHC.h @@ -0,0 +1,216 @@ +#pragma once + +#include "ehs/io/socket/ehc/Utils.h" + +#include "ehs/Serializer.h" +#include "ehs/Vector.h" +#include "ehs/Array.h" +#include "Socket.h" +#include "UDP.h" + +namespace ehs +{ + class NetSystem; + class NetEnd; + class EHC; + + typedef bool (*ConnectCb)(EHC *, NetEnd **, Serializer); + typedef void (*ConnectedCb)(EHC *, NetEnd *); + typedef void (*ActiveCb)(EHC *, NetEnd *); + typedef void (*DisconnectCb)(EHC *, NetEnd *, Serializer); + typedef void (*DisconnectedCb)(EHC *, NetEnd *); + typedef void (*TimeoutCb)(EHC *, NetEnd *); + + class EHC + { + private: + friend class NetEnd; + + static const Version version; + static const UInt_64 internalSys; + static const UInt_64 connectOp; + static const UInt_64 connectedOp; + static const UInt_64 rejectedOp; + static const UInt_64 disconnectOp; + static const UInt_64 disconnectedOp; + static const UInt_64 statusUpdateOp; + static const UInt_64 pingOp; + static const UInt_64 pongOp; + static const UInt_64 latencyOp; + static const UInt_64 receivedOp; + + UDP udp; + Version appVer; + EndDisp disposition; + UInt_64 hashName; + Str_8 name; + bool dropPackets; + Byte* buffer; + UInt_32 bufferSize; + Array systems; + UInt_32 maxEndpoints; + UInt_64 lastTSC; + float delta; + float maxTimeout; + float resendRate; + ConnectCb connectCb; + ConnectedCb connectedCb; + ActiveCb activeCb; + DisconnectCb disconnectCb; + DisconnectedCb disconnectedCb; + TimeoutCb timeoutCb; + + protected: + Vector endpoints; + + public: + ~EHC(); + + EHC(); + + EHC(const Version &ver, Str_8 name, UInt_64 maxEndpoints); + + EHC(const Version &ver, Str_8 name); + + EHC(EHC &&sock) noexcept; + + EHC(const EHC &sock); + + EHC &operator=(EHC&& sock) noexcept; + + EHC &operator=(const EHC &sock); + + void Initialize(); + + void Release(); + + void Bind(AddrType newType, const Str_8& newAddress, UInt_16 newPort); + + void Connect(AddrType rType, const Str_8& rAddress, UInt_16 rPort, Serializer data); + + bool Disconnect(EndDisp endDisp, const Char_8 token[64], const Str_8& msg); + + void Broadcast(EndDisp endDisp, Status endStatus, bool deltaLocked, bool encrypted, + bool ensure, UInt_64 sysHashId, UInt_64 opHashId, + const Serializer<>& payload); + + void Broadcast(EndDisp endDisp, Status endStatus, bool deltaLocked, bool encrypted, + bool ensure, const Str_8& sysId, const Str_8& opId, + const Serializer<>& payload); + + void Poll(); + + bool IsInitialized() const; + + AddrType GetLocalAddressType() const; + + Str_8 GetLocalAddress() const; + + UInt_16 GetLocalPort() const; + + bool IsBound() const; + + static Version GetVersion(); + + Version GetAppVersion() const; + + EndDisp GetDisposition() const; + + UInt_64 GetHashName() const; + + Str_8 GetName() const; + + void EnableDropPackets(bool enable); + + bool IsDropPacketsEnabled() const; + + bool HasSystem(UInt_64 sysHashId) const; + + bool HasSystem(const Str_8& sysId) const; + + bool AddSystem(NetSystem *sys); + + NetSystem* GetSystem(UInt_64 sysHashId) const; + + NetSystem* GetSystem(const Str_8& sysId) const; + + bool HasEndpoint(EndDisp endDisp, Status endStatus, const Char_8 token[64]) const; + + bool HasEndpoint(EndDisp endDisp, Status endStatus, UInt_64 hashId) const; + + bool HasEndpoint(EndDisp endDisp, Status endStatus, const Str_8 &id) const; + + bool HasEndpoint(EndDisp endDisp, const Char_8 token[64]) const; + + bool HasEndpoint(EndDisp endDisp, UInt_64 hashId) const; + + bool HasEndpoint(EndDisp endDisp, const Str_8 &id) const; + + bool HasEndpoint(const Char_8 token[64]) const; + + bool HasEndpoint(UInt_64 hashId) const; + + bool HasEndpoint(const Str_8 &id) const; + + bool HasEndpoint(const Str_8& rAddress, UInt_16 rPort) const; + + NetEnd *GetEndpoint(EndDisp endDisp, Status endStatus, const Char_8 token[64]) const; + + NetEnd *GetEndpoint(EndDisp endDisp, Status endStatus, UInt_64 hashId) const; + + NetEnd *GetEndpoint(EndDisp endDisp, Status endStatus, const Str_8 &id) const; + + NetEnd *GetEndpoint(EndDisp endDisp, const Char_8 token[64]) const; + + NetEnd *GetEndpoint(EndDisp endDisp, UInt_64 hashId) const; + + NetEnd *GetEndpoint(EndDisp endDisp, const Str_8 &id) const; + + NetEnd *GetEndpoint(const Str_8& rAddress, UInt_16 rPort) const; + + Array GetEndpoints(EndDisp endDisp, Status endStatus); + + Array GetEndpoints(EndDisp endDisp); + + UInt_64 GetEndpointsCount(EndDisp endDisp, Status endStatus); + + UInt_64 GetEndpointsCount(EndDisp endDisp); + + UInt_64 GetMaxEndpoints() const; + + void SetMaxTimeout(float seconds); + + float GetMaxTimeout() const; + + void SetResendRate(float seconds); + + float GetResendRate() const; + + void SetConnectCb(ConnectCb cb); + + void SetConnectedCb(ConnectedCb cb); + + void SetActiveCb(ActiveCb cb); + + void SetDisconnectCb(DisconnectCb cb); + + void SetDisconnectedCb(DisconnectedCb cb); + + void SetTimeoutCb(TimeoutCb cb); + + private: + void GenerateToken(Char_8 in[64]); + + void UpdateQueue(UInt_64 active); + + void UpdateQueue(); + + bool RemoveEndpoint(EndDisp disposition, const Char_8 token[64]); + + bool RemoveEndpoint(const Str_8& address, UInt_16 port); + + bool RemoveEndpoint(const NetEnd* end); + + void PollEndpoints(); + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/Fragments.h b/include/ehs/io/socket/ehc/Fragments.h new file mode 100644 index 0000000..6182e60 --- /dev/null +++ b/include/ehs/io/socket/ehc/Fragments.h @@ -0,0 +1,43 @@ +#pragma once + +#include "Utils.h" + +#include + +namespace ehs +{ + class Fragments + { + private: + Header header; + Serializer* data; + UInt_64 size; + + public: + ~Fragments(); + + Fragments(); + + Fragments(const Header &header, const Serializer &payload); + + Fragments(const Header &header, UInt_64 size); + + Fragments(Fragments &&frags) noexcept; + + Fragments(const Fragments &frags); + + Fragments &operator=(Fragments &&frags) noexcept; + + Fragments &operator=(const Fragments &frags); + + operator Serializer *() const; + + Header GetHeader() const; + + UInt_64 Size() const; + + bool IsComplete() const; + + Packet Combine() const; + }; +} diff --git a/include/ehs/io/socket/ehc/NetEnd.h b/include/ehs/io/socket/ehc/NetEnd.h new file mode 100644 index 0000000..0af1fdd --- /dev/null +++ b/include/ehs/io/socket/ehc/NetEnd.h @@ -0,0 +1,134 @@ +#pragma once + +#include "Utils.h" +#include "Fragments.h" + +#include +#include +#include +#include + +namespace ehs +{ + class EHC; + + class NetEnd + { + private: + friend class EHC; + + EHC* owner; + EndDisp disposition; + UInt_64 hashName; + Str_8 name; + Status status; + Architecture arch; + Char_8 token[64]; + UInt_64 nextSendId; + Vector sent; + UInt_64 nextRecvId; + Vector received; + AddrType type; + Str_8 address; + UInt_16 port; + float deltaDuration; + float deltaRate; + float timeout; + float lastPing; + float oldLatency; + float latency; + UInt_64 queueSlot; + + public: + NetEnd(); + + NetEnd(EndDisp disposition, Str_8 id, Architecture arch, AddrType type, Str_8 address, UInt_16 port); + + NetEnd(AddrType type, Str_8 address, UInt_16 port); + + NetEnd(NetEnd &&end) noexcept; + + NetEnd(const NetEnd &end); + + NetEnd &operator=(NetEnd &&end) noexcept; + + NetEnd &operator=(const NetEnd &end); + + EndDisp GetDisposition() const; + + UInt_64 GetHashName() const; + + Str_8 GetName() const; + + Status GetStatus() const; + + Architecture GetArchitecture() const; + + UInt_64 GetNextSendId() const; + + /// Sends data to the remote endpoint. + /// @param [in] deltaLocked Whether or not to match the remote endpoint's delta time to prevent overloading the client. This will drop data if delta time does not match. + /// @param [in] encrypted Whether or not to encrypt this data before sending to the remote endpoint. + /// @param [in] ensure Whether or not to ensure the data was received by the remote endpoint. + /// @param [in] sys The system hash id to execute an operation from. + /// @param [in] op The operation hash id in the system to execute. + /// @param [in] payload Additional parameters and data to send to the remote endpoint. + void Send(bool deltaLocked, bool encrypted, bool ensure, UInt_64 sys, UInt_64 op, const Serializer& payload); + + /// Sends data to the remote endpoint. + /// @param [in] deltaLocked Whether or not to match the remote endpoint's delta time to prevent overloading the client. This will drop data if delta time does not match. + /// @param [in] encrypted Whether or not to encrypt this data before sending to the remote endpoint. + /// @param [in] ensure Whether or not to ensure the data was received by the remote endpoint. + /// @param [in] sys The system string id to execute an operation from. + /// @param [in] op The operation string id in the system to execute. + /// @param [in] payload Additional parameters and data to send to the remote endpoint. + void Send(bool deltaLocked, bool encrypted, bool ensure, const Str_8& sys, const Str_8& op, const Serializer& payload); + + UInt_64 GetNextRecvId() const; + + Str_8 GetAddress() const; + + UInt_16 GetPort() const; + + void SetDeltaRate(float newDeltaRate); + + float GetDeltaRate() const; + + float GetTimeout() const; + + float GetLastPing() const; + + void SendLatency(); + + float GetLatency() const; + + UInt_64 GetQueueSlot() const; + + private: + void Poll(float delta); + + void SetStatus(Status newStatus); + + void RemoveInsurance(UInt_64 msgId, UInt_64 fragment); + + void AddReceived(const Header& header, const Serializer<>& payload); + + Vector* GetReceived(); + + void Ping(float delta); + + void Pong(float delta); + + void SetLatency(float newLatency); + + void SetQueueSlot(UInt_64 slot); + + Fragments FragmentData(const Header& header, const Serializer<>& data); + + void Send(const Header& header, const Serializer<>& payload); + + bool SortingNeeded() const; + + void SortReceived(); + }; +} diff --git a/include/ehs/io/socket/ehc/NetOp.h b/include/ehs/io/socket/ehc/NetOp.h new file mode 100644 index 0000000..4a86fdc --- /dev/null +++ b/include/ehs/io/socket/ehc/NetOp.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + +namespace ehs +{ + class EHC; + class NetSystem; + class NetEnd; + + class NetOp + { + private: + friend class NetSystem; + + UInt_64 hashId; + Str_8 id; + + public: + virtual ~NetOp() = default; + + NetOp(); + + NetOp(Str_8 id); + + NetOp(NetOp &&op) noexcept; + + NetOp(const NetOp &op); + + NetOp &operator=(NetOp &&op) noexcept; + + NetOp &operator=(const NetOp &op); + + Str_8 GetId() const; + + UInt_64 GetHashId() const; + + private: + virtual void Process(EHC *ehc, NetEnd *endpoint, NetSystem *sys, Serializer &payload); + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/NetSystem.h b/include/ehs/io/socket/ehc/NetSystem.h new file mode 100644 index 0000000..cd90cf5 --- /dev/null +++ b/include/ehs/io/socket/ehc/NetSystem.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include + +namespace ehs +{ + class EHC; + class NetEnd; + class NetOp; + + class NetSystem + { + private: + friend class EHC; + + UInt_64 hashId; + Str_8 id; + Array ops; + + public: + virtual ~NetSystem(); + + NetSystem(); + + NetSystem(Str_8 id); + + NetSystem(NetSystem &&sys) noexcept; + + NetSystem(const NetSystem &sys); + + NetSystem &operator=(NetSystem &&sys) noexcept; + + NetSystem &operator=(const NetSystem &sys); + + Str_8 GetId() const; + + UInt_64 GetHashId() const; + + bool HasOperation(UInt_64 hashId) const; + + bool AddOperation(NetOp *op); + + private: + void Execute(EHC *ehc, NetEnd *endpoint, UInt_64 hashId, Serializer &payload); + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/Utils.h b/include/ehs/io/socket/ehc/Utils.h new file mode 100644 index 0000000..ee6d4f0 --- /dev/null +++ b/include/ehs/io/socket/ehc/Utils.h @@ -0,0 +1,54 @@ +#pragma once + +#include + +namespace ehs +{ + enum class EndDisp : UInt_8 + { + ENDPOINT, + SERVICE + }; + + enum class Status : UInt_8 + { + ACTIVE, + PENDING, + IN_LOCAL_QUEUE, + IN_REMOTE_QUEUE, + }; + + struct Header + { + bool encrypted = true; + UInt_64 id = 0; + UInt_64 fragments = 0; + UInt_64 fragment = 0; + bool ensure = false; + EndDisp disposition = EndDisp::ENDPOINT; + Char_8 token[64] = {}; + UInt_64 system = 0; + UInt_64 op = 0; + }; + + struct Packet + { + Header header; + Serializer payload; + }; + + struct Insurance + { + Header header; + Serializer payload; + float lastResend; + }; +} + +#ifndef COMMS_IPV4_PAYLOAD + #define COMMS_IPV4_PAYLOAD (EHS_IPV4_UDP_PAYLOAD - (UInt_16)sizeof(Header)) +#endif + +#ifndef COMMS_IPV6_PAYLOAD + #define COMMS_IPV6_PAYLOAD (EHS_IPV6_UDP_PAYLOAD - (UInt_16)sizeof(Header)) +#endif \ No newline at end of file diff --git a/src/io/socket/EHC.cpp b/src/io/socket/EHC.cpp new file mode 100644 index 0000000..f6ae877 --- /dev/null +++ b/src/io/socket/EHC.cpp @@ -0,0 +1,1154 @@ +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetSystem.h" +#include "ehs/io/socket/ehc/NetEnd.h" + +#include "ehs/Log.h" +#include "ehs/PRNG.h" +#include + +namespace ehs +{ + const Version EHC::version(1, 0, 0); + const UInt_64 EHC::internalSys = Str_8::Hash_64("Internal"); + const UInt_64 EHC::connectOp = Str_8::Hash_64("Connect"); + const UInt_64 EHC::connectedOp = Str_8::Hash_64("Connected"); + const UInt_64 EHC::rejectedOp = Str_8::Hash_64("Rejected"); + const UInt_64 EHC::disconnectOp = Str_8::Hash_64("Disconnect"); + const UInt_64 EHC::disconnectedOp = Str_8::Hash_64("Disconnected"); + const UInt_64 EHC::statusUpdateOp = Str_8::Hash_64("StatusUpdate"); + const UInt_64 EHC::pingOp = Str_8::Hash_64("Ping"); + const UInt_64 EHC::pongOp = Str_8::Hash_64("Pong"); + const UInt_64 EHC::latencyOp = Str_8::Hash_64("Latency"); + const UInt_64 EHC::receivedOp = Str_8::Hash_64("Received"); + + EHC::~EHC() + { + Release(); + } + + EHC::EHC() + : udp(AddrType::IPV6), disposition(EndDisp::ENDPOINT), hashName(0), dropPackets(false), buffer(nullptr), bufferSize(0), + maxEndpoints(0), lastTSC(0), delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectCb(nullptr), + connectedCb(nullptr), activeCb(nullptr), disconnectCb(nullptr), disconnectedCb(nullptr), timeoutCb(nullptr) + { + } + + EHC::EHC(const Version &ver, Str_8 name, const UInt_64 maxEndpoints) + : udp(AddrType::IPV6), appVer(ver), disposition(EndDisp::ENDPOINT), hashName(name.Hash_64()), name((Str_8&&)name), + dropPackets(false), buffer(nullptr), bufferSize(0), maxEndpoints(maxEndpoints), lastTSC(CPU::GetTSC()), + delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectCb(nullptr), connectedCb(nullptr), activeCb(nullptr), + disconnectCb(nullptr), disconnectedCb(nullptr), timeoutCb(nullptr) + { + Initialize(); + } + + EHC::EHC(const Version &ver, Str_8 name) + : udp(AddrType::IPV6), appVer(ver), disposition(EndDisp::ENDPOINT), hashName(name.Hash_64()), name((Str_8&&)name), + dropPackets(false), buffer(nullptr), bufferSize(0), maxEndpoints(0), lastTSC(CPU::GetTSC()), + delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectCb(nullptr), connectedCb(nullptr), activeCb(nullptr), + disconnectCb(nullptr), disconnectedCb(nullptr), timeoutCb(nullptr) + { + Initialize(); + } + + EHC::EHC(EHC &&sock) noexcept + : udp((UDP&&)sock.udp), appVer(sock.appVer), disposition(sock.disposition), hashName(sock.hashName), + name((Str_8&&)sock.name), dropPackets(sock.dropPackets), buffer(sock.buffer), bufferSize(sock.bufferSize), + systems((Array &&)sock.systems), maxEndpoints(sock.maxEndpoints), lastTSC(sock.lastTSC), + delta(sock.delta), maxTimeout(sock.maxTimeout), resendRate(sock.resendRate), connectCb(sock.connectCb), + connectedCb(sock.connectedCb), activeCb(sock.activeCb), disconnectCb(sock.disconnectCb), + disconnectedCb(sock.disconnectedCb), timeoutCb(sock.timeoutCb), endpoints((Vector &&)sock.endpoints) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + endpoints[i]->owner = this; + + sock.appVer = {}; + sock.disposition = EndDisp::ENDPOINT; + sock.hashName = 0; + sock.dropPackets = false; + sock.buffer = nullptr; + sock.bufferSize = 0; + sock.maxEndpoints = 0; + sock.lastTSC = 0; + sock.delta = 0.0f; + sock.maxTimeout = 5.0f; + sock.resendRate = 0.5f; + sock.connectCb = nullptr; + sock.connectedCb = nullptr; + sock.activeCb = nullptr; + sock.disconnectCb = nullptr; + sock.disconnectedCb = nullptr; + sock.timeoutCb = nullptr; + } + + EHC::EHC(const EHC &sock) + : udp(sock.udp), appVer(sock.appVer), disposition(sock.disposition), hashName(sock.hashName), name(sock.name), + dropPackets(sock.dropPackets), buffer(nullptr), bufferSize(0), maxEndpoints(sock.maxEndpoints), + lastTSC(CPU::GetTSC()), delta(0.0f), maxTimeout(sock.maxTimeout), resendRate(sock.resendRate), + connectCb(nullptr), connectedCb(nullptr), activeCb(nullptr), disconnectCb(nullptr), disconnectedCb(nullptr), + timeoutCb(nullptr) + { + Initialize(); + } + + EHC &EHC::operator=(EHC &&sock) noexcept + { + if (this == &sock) + return *this; + + udp = (UDP&&)sock.udp; + appVer = sock.appVer; + disposition = sock.disposition; + hashName = sock.hashName; + name = (Str_8&&)sock.name; + dropPackets = sock.dropPackets; + + delete[] buffer; + buffer = sock.buffer; + + bufferSize = sock.bufferSize; + systems = (Array &&)sock.systems; + maxEndpoints = sock.maxEndpoints; + lastTSC = sock.lastTSC; + delta = sock.delta; + maxTimeout = sock.maxTimeout; + resendRate = sock.resendRate; + connectCb = sock.connectCb; + connectedCb = sock.connectedCb; + activeCb = sock.activeCb; + disconnectCb = sock.disconnectCb; + disconnectedCb = sock.disconnectedCb; + timeoutCb = sock.timeoutCb; + + endpoints = (Vector&&)sock.endpoints; + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + endpoints[i]->owner = this; + + sock.appVer = {}; + sock.disposition = EndDisp::ENDPOINT; + sock.hashName = 0; + sock.dropPackets = false; + sock.buffer = nullptr; + sock.bufferSize = 0; + sock.maxEndpoints = 0; + sock.lastTSC = 0; + sock.delta = 0.0f; + sock.maxTimeout = 5.0f; + sock.resendRate = 0.5f; + sock.connectCb = nullptr; + sock.connectedCb = nullptr; + sock.activeCb = nullptr; + sock.disconnectCb = nullptr; + sock.disconnectedCb = nullptr; + sock.timeoutCb = nullptr; + + return *this; + } + + EHC &EHC::operator=(const EHC &sock) + { + if (this == &sock) + return *this; + + udp = sock.udp; + appVer = sock.appVer; + disposition = sock.disposition; + hashName = sock.hashName; + name = sock.name; + dropPackets = sock.dropPackets; + + delete[] buffer; + buffer = nullptr; + + bufferSize = 0; + systems = Array(); + maxEndpoints = sock.maxEndpoints; + lastTSC = 0; + delta = 0.0f; + maxTimeout = sock.maxTimeout; + resendRate = sock.resendRate; + connectCb = nullptr; + connectedCb = nullptr; + activeCb = nullptr; + disconnectCb = nullptr; + disconnectedCb = nullptr; + timeoutCb = nullptr; + endpoints = Vector(); + + Initialize(); + + return *this; + } + + void EHC::Initialize() + { + if (!udp.IsValid()) + return; + + udp.SetBlocking(false); + + if (udp.GetLocalAddressType() == AddrType::IPV4) + { + buffer = new Byte[EHS_IPV4_UDP_PAYLOAD]; + bufferSize = EHS_IPV4_UDP_PAYLOAD; + } + else if (udp.GetLocalAddressType() == AddrType::IPV6) + { + buffer = new Byte[EHS_IPV6_UDP_PAYLOAD]; + bufferSize = EHS_IPV6_UDP_PAYLOAD; + } + } + + void EHC::Release() + { + if (!udp.IsValid()) + return; + + disposition = EndDisp::ENDPOINT; + + delete[] buffer; + buffer = nullptr; + bufferSize = 0; + + Serializer payload(Endianness::LE); + payload.Write(0); + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetStatus() != Status::PENDING) + endpoints[i]->Send(false, true, false, internalSys, disconnectOp, payload); + + delete endpoints[i]; + } + endpoints.Clear(); + + for (UInt_64 i = 0; i < systems.Size(); ++i) + delete systems[i]; + + systems.Clear(); + + udp.Release(); + } + + void EHC::Bind(AddrType newType, const Str_8& newAddress, const UInt_16 newPort) + { + disposition = EndDisp::SERVICE; + + udp.Bind(newType, newAddress, newPort); + } + + void EHC::Connect(AddrType rType, const Str_8& rAddress, const UInt_16 rPort, Serializer data) + { + if (!udp.IsValid()) + return; + + Serializer payload(Endianness::LE); + payload.WriteStr(name); + payload.Write(CPU::GetArchitecture()); + payload.WriteVersion(version); + payload.WriteVersion(appVer); + payload.WriteSer((Serializer&&)data); + + NetEnd* end = new NetEnd(rType, rAddress, rPort); + end->owner = this; + end->Send(false, true, false, "Internal", "Connect", payload); + + endpoints.Push(end); + } + + bool EHC::Disconnect(const EndDisp endDisp, const Char_8 token[64], const Str_8& msg) + { + if (!udp.IsValid()) + return false; + + NetEnd* end = GetEndpoint(endDisp, token); + if (!end) + return false; + + Serializer<> payload(Endianness::LE); + payload.WriteStr(msg); + + end->Send(false, true, false, internalSys, disconnectOp, payload); + + return true; + } + + void EHC::Broadcast(const EndDisp endDisp, const Status endStatus, const bool deltaLocked, const bool encrypted, + const bool ensure, const UInt_64 sysHashId, const UInt_64 opHashId, const Serializer &payload) + { + if (!udp.IsValid()) + return; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() == endStatus) + endpoints[i]->Send(deltaLocked, encrypted, ensure, sysHashId, opHashId, payload); + } + } + + void EHC::Broadcast(const EndDisp endDisp, const Status endStatus, const bool deltaLocked, const bool encrypted, + const bool ensure, const Str_8 &sysId, const Str_8 &opId, + const Serializer &payload) + { + Broadcast(endDisp, endStatus, deltaLocked, encrypted, ensure, sysId.Hash_64(), opId.Hash_64(), payload); + } + + void EHC::Poll() + { + if (!udp.IsValid()) + return; + + UInt_64 newTSC = CPU::GetTSC(); + delta = (float)(newTSC - lastTSC) / (float)CPU::GetTSC_Freq(); + lastTSC = newTSC; + + AddrType rType = AddrType::IPV6; + Str_8 rAddress; + UInt_16 rPort = 0; + + UInt_16 received; + + while ((received = udp.Receive(&rType, &rAddress, &rPort, buffer, bufferSize))) + { + Serializer<> payload(Endianness::LE, buffer, received); + + bool encrypted = payload.Read(); + if (encrypted) + Encryption::Encrypt_64(payload.Size() - payload.GetOffset(), &payload[payload.GetOffset()]); + + payload.SetOffset(0); + + Header header = payload.Read
(); + + if (!header.ensure && !header.token[0] && header.system == internalSys && header.op == connectOp) + { + Str_8 endId = payload.ReadStr(); + Architecture rArch = payload.Read(); + + NetEnd* end = new NetEnd(header.disposition, (Str_8&&)endId, rArch, rType, rAddress, rPort); + end->owner = this; + end->SetStatus(Status::PENDING); + + Serializer sPayload(Endianness::LE); + + Version rVer = payload.ReadVersion(); + if (rVer != version) + { + sPayload.WriteStr("Your Event Horizon Socket Layer version " + + Str_8::FromNum(rVer.major) + "." + Str_8::FromNum(rVer.minor) + "." + Str_8::FromNum(rVer.patch) + + " does not match remote endpoint version " + + Str_8::FromNum(version.major) + "." + Str_8::FromNum(version.minor) + "." + Str_8::FromNum(version.patch) + + ". Connection rejected."); + + end->Send(false, true, false, internalSys, rejectedOp, sPayload); + continue; + } + + Version rAppVer = payload.ReadVersion(); + if (rAppVer != appVer) + { + sPayload.WriteStr("Your application version " + + Str_8::FromNum(rAppVer.major) + "." + Str_8::FromNum(rAppVer.minor) + "." + Str_8::FromNum(rAppVer.patch) + + " does not match remote endpoint application version " + + Str_8::FromNum(appVer.major) + "." + Str_8::FromNum(appVer.minor) + "." + Str_8::FromNum(appVer.patch) + + ". Connection rejected."); + + end->Send(false, true, false, internalSys, rejectedOp, sPayload); + continue; + } + + GenerateToken(end->token); + + if (connectCb && !connectCb(this, &end, {Endianness::LE, &payload[payload.GetOffset()], payload.Size() - payload.GetOffset()})) + { + sPayload.WriteStr("Connection rejected."); + end->Send(false, true, false, internalSys, rejectedOp, sPayload); + continue; + } + + endpoints.Push(end); + + sPayload.Write(CPU::GetArchitecture()); + sPayload.WriteStr(name); + + UInt_64 active = GetEndpointsCount(EndDisp::ENDPOINT, Status::ACTIVE); + + if (maxEndpoints && active >= maxEndpoints) + { + end->SetStatus(Status::IN_LOCAL_QUEUE); + + UpdateQueue(active); + + sPayload.Write(Status::IN_REMOTE_QUEUE); + sPayload.Write(end->GetQueueSlot()); + } + else + { + end->SetStatus(Status::ACTIVE); + + if (activeCb) + activeCb(this, end); + + sPayload.Write(Status::ACTIVE); + sPayload.Write(0); + } + + end->Send(false, true, false, internalSys, connectedOp, sPayload); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == connectedOp) + { + NetEnd* end = GetEndpoint(rAddress, rPort); + if (!end || end->GetStatus() != Status::PENDING) + continue; + + Str_8 endId = payload.ReadStr(); + Architecture arch = payload.Read(); + + *end = NetEnd(header.disposition, (Str_8&&)endId, arch, rType, rAddress, rPort); + end->owner = this; + + Util::Copy(end->token, header.token, 64); + + end->SetStatus(payload.Read()); + end->SetQueueSlot(payload.Read()); + + if (connectedCb) + connectedCb(this, end); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == rejectedOp) + { + if (!RemoveEndpoint(rAddress, rPort)) + continue; + + Str_8 msg = payload.ReadStr(); + if (msg.Size()) + EHS_LOG_INT(LogType::INFO, 3, msg); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == disconnectOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->Send(false, true, false, internalSys, disconnectedOp, {}); + + if (disconnectCb) + disconnectCb(this, end, {Endianness::LE, &payload[payload.GetOffset()], payload.Size() - payload.GetOffset()}); + + RemoveEndpoint(header.disposition, end->token); + + UpdateQueue(); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == disconnectedOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + if (disconnectedCb) + disconnectedCb(this, end); + + RemoveEndpoint(end); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == statusUpdateOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + Status newStatus = payload.Read(); + UInt_64 newSlot = payload.Read(); + + if (end->GetStatus() == Status::ACTIVE) + { + if (activeCb) + activeCb(this, end); + + //EHS_LOG_INT(LogType::INFO, 5, "Your connection status to " + end->GetId() + " has now become active."); + } + else if (end->GetStatus() == Status::IN_REMOTE_QUEUE && newStatus == Status::IN_REMOTE_QUEUE) + { + //EHS_LOG_INT(LogType::INFO, 5, "Your queue slot for " + end->GetId() + " is now " + newSlot + "."); + } + + end->SetStatus(newStatus); + end->SetQueueSlot(newSlot); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == pingOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->SetDeltaRate(payload.Read()); + end->Pong(delta); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == pongOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->SetDeltaRate(payload.Read()); + end->SendLatency(); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == latencyOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->SetLatency(payload.Read()); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == receivedOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + const UInt_64 msgId = payload.Read(); + const UInt_64 fragment = payload.Read(); + + end->RemoveInsurance(msgId, fragment); + } + else if (header.token[0]) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + if (dropPackets && !header.ensure && header.id < end->GetNextRecvId()) + { + EHS_LOG_INT(LogType::INFO, 6, "Old packet intentionally dropped."); + continue; + } + + if (header.ensure) + { + Serializer sPayload(Endianness::LE); + sPayload.Write(header.id); + sPayload.Write(header.fragment); + + end->Send(false, true, false, internalSys, receivedOp, sPayload); + } + + end->AddReceived( + header, + Serializer<>(Endianness::LE, &payload[payload.GetOffset()], payload.Size() - payload.GetOffset()) + ); + } + else + { + EHS_LOG_INT(LogType::INFO, 7, "Corrupted packet."); + } + } + + PollEndpoints(); + } + + bool EHC::IsInitialized() const + { + return udp.IsValid(); + } + + AddrType EHC::GetLocalAddressType() const + { + return udp.GetLocalAddressType(); + } + + Str_8 EHC::GetLocalAddress() const + { + return udp.GetLocalAddress(); + } + + UInt_16 EHC::GetLocalPort() const + { + return udp.GetLocalPort(); + } + + bool EHC::IsBound() const + { + return udp.IsBound(); + } + + Version EHC::GetVersion() + { + return version; + } + + Version EHC::GetAppVersion() const + { + return appVer; + } + + EndDisp EHC::GetDisposition() const + { + return disposition; + } + + UInt_64 EHC::GetHashName() const + { + return hashName; + } + + Str_8 EHC::GetName() const + { + return name; + } + + void EHC::EnableDropPackets(const bool enable) + { + dropPackets = enable; + } + + bool EHC::IsDropPacketsEnabled() const + { + return dropPackets; + } + + bool EHC::HasSystem(const UInt_64 sysHashId) const + { + if (internalSys == sysHashId) + return true; + + for (UInt_64 i = 0; i < systems.Size(); ++i) + if (systems[i]->GetHashId() == sysHashId) + return true; + + return false; + } + + bool EHC::HasSystem(const Str_8& sysId) const + { + return HasSystem(sysId.Hash_64()); + } + + bool EHC::AddSystem(NetSystem *sys) + { + if (HasSystem(sys->GetHashId())) + return false; + + systems.Push(sys); + + return true; + } + + NetSystem* EHC::GetSystem(const UInt_64 sysHashId) const + { + for (UInt_64 i = 0; i < systems.Size(); ++i) + if (systems[i]->GetHashId() == sysHashId) + return systems[i]; + + return nullptr; + } + + NetSystem* EHC::GetSystem(const Str_8& sysId) const + { + return GetSystem(sysId.Hash_64()); + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Status endStatus, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Status endStatus, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Status endStatus, const Str_8 &id) const + { + return HasEndpoint(endDisp, endStatus, id.Hash_64()); + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Str_8 &id) const + { + return HasEndpoint(endDisp, id.Hash_64()); + } + + bool EHC::HasEndpoint(const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (Util::Compare(endpoints[i]->token, token, 64)) + return true; + + return false; + } + + bool EHC::HasEndpoint(UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetHashName() == hashName) + return true; + + return false; + } + + bool EHC::HasEndpoint(const Str_8 &id) const + { + return HasEndpoint(id.Hash_64()); + } + + bool EHC::HasEndpoint(const Str_8& rAddress, const UInt_16 rPort) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort) + return true; + + return false; + } + + NetEnd* EHC::GetEndpoint(const EndDisp endDisp, const Status endStatus, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Status endStatus, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Status endStatus, const Str_8 &id) const + { + return GetEndpoint(endDisp, endStatus, id.Hash_64()); + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Str_8 &id) const + { + return GetEndpoint(endDisp, id.Hash_64()); + } + + NetEnd *EHC::GetEndpoint(const Str_8& rAddress, const UInt_16 rPort) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort) + return endpoints[i]; + + return nullptr; + } + + Array EHC::GetEndpoints(const EndDisp endDisp, const Status endStatus) + { + Array result(endpoints.Size()); + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() == endStatus) + result[count++] = endpoints[i]; + } + + result.Resize(count); + + return result; + } + + Array EHC::GetEndpoints(const EndDisp endDisp) + { + Array result(endpoints.Size()); + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetDisposition() == endDisp) + result[count++] = endpoints[i]; + + result.Resize(count); + + return result; + } + + UInt_64 EHC::GetEndpointsCount(const EndDisp endDisp, const Status endStatus) + { + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() == endStatus) + ++count; + } + + return count; + } + + UInt_64 EHC::GetEndpointsCount(const EndDisp endDisp) + { + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetDisposition() == endDisp) + ++count; + + return count; + } + + UInt_64 EHC::GetMaxEndpoints() const + { + return maxEndpoints; + } + + void EHC::SetMaxTimeout(const float seconds) + { + maxTimeout = seconds; + } + + float EHC::GetMaxTimeout() const + { + return maxTimeout; + } + + void EHC::SetResendRate(const float seconds) + { + resendRate = seconds; + } + + float EHC::GetResendRate() const + { + return resendRate; + } + + void EHC::SetConnectCb(const ConnectCb cb) + { + connectCb = cb; + } + + void EHC::SetConnectedCb(const ConnectedCb cb) + { + connectedCb = cb; + } + + void EHC::SetActiveCb(const ActiveCb cb) + { + activeCb = cb; + } + + void EHC::SetDisconnectCb(const DisconnectCb cb) + { + disconnectCb = cb; + } + + void EHC::SetDisconnectedCb(const DisconnectedCb cb) + { + disconnectedCb = cb; + } + + void EHC::SetTimeoutCb(const TimeoutCb cb) + { + timeoutCb = cb; + } + + void EHC::GenerateToken(Char_8 in[64]) + { + PRNG_u64 rng(CPU::GetTSC()); + + for (UInt_64 i = 0; i < 8; ++i) + { + do + ((UInt_64*)in)[i] = rng.Generate(); + while (!i && ((UInt_64*)in)[i] == 0); + } + + if (HasEndpoint(in)) + GenerateToken(in); + } + + void EHC::UpdateQueue(UInt_64 active) + { + UInt_64 slot = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetStatus() == Status::IN_LOCAL_QUEUE) + { + if (active < maxEndpoints) + { + endpoints[i]->SetStatus(Status::ACTIVE); + endpoints[i]->SetQueueSlot(0); + + Serializer payload(Endianness::LE); + payload.Write(Status::ACTIVE); + payload.Write(0); + + endpoints[i]->Send(false, true, false, internalSys, statusUpdateOp, payload); + + if (activeCb) + activeCb(this, endpoints[i]); + + ++active; + } + else + { + if (endpoints[i]->GetQueueSlot() != slot) + { + Serializer payload(Endianness::LE); + payload.Write(Status::IN_REMOTE_QUEUE); + payload.Write(slot); + + endpoints[i]->Send(false, true, false, internalSys, statusUpdateOp, payload); + + endpoints[i]->SetQueueSlot(slot++); + } + else + { + ++slot; + } + } + } + } + } + + void EHC::UpdateQueue() + { + UpdateQueue(GetEndpointsCount(EndDisp::ENDPOINT, Status::ACTIVE)); + } + + bool EHC::RemoveEndpoint(const EndDisp endDisp, const Char_8 token[64]) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + { + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + return true; + } + } + + return false; + } + + bool EHC::RemoveEndpoint(const Str_8& rAddress, const UInt_16 rPort) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort) + { + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + return true; + } + } + + return false; + } + + bool EHC::RemoveEndpoint(const NetEnd* const end) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i] == end) + { + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + return true; + } + } + + return false; + } + + void EHC::PollEndpoints() + { + UInt_64 i = 0; + while (i < endpoints.Size()) + { + endpoints[i]->Poll(delta); + + if (endpoints[i]->GetStatus() == Status::PENDING) + { + if (endpoints[i]->GetTimeout() >= maxTimeout) + { + if (timeoutCb) + timeoutCb(this, endpoints[i]); + + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + continue; + } + } + else + { + if (endpoints[i]->GetTimeout() >= maxTimeout) + { + if (timeoutCb) + timeoutCb(this, endpoints[i]); + + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + UpdateQueue(); + + continue; + } + + Vector* frags = endpoints[i]->GetReceived(); + + UInt_64 f = 0; + while (f < frags->Size()) + { + if (!(*frags)[f].IsComplete()) + { + ++f; + continue; + } + + Packet packet = (*frags)[f].Combine(); + + NetSystem* sys = GetSystem(packet.header.system); + if (!sys) + { + ++f; + continue; + } + + sys->Execute(this, endpoints[i], packet.header.op, packet.payload); + + frags->Swap(f, frags->End()); + frags->Pop(); + } + } + + ++i; + } + } +} diff --git a/src/io/socket/ehc/Fragments.cpp b/src/io/socket/ehc/Fragments.cpp new file mode 100644 index 0000000..4c2f8f4 --- /dev/null +++ b/src/io/socket/ehc/Fragments.cpp @@ -0,0 +1,122 @@ +#include "ehs/io/socket/ehc/Fragments.h" + +namespace ehs +{ + Fragments::~Fragments() + { + delete[] data; + } + + Fragments::Fragments() + : data(nullptr), size(0) + { + } + + Fragments::Fragments(const Header &header, const Serializer &payload) + : header(header), data(new Serializer[header.fragments]), size(header.fragments) + { + this->header.fragment = 0; + data[header.fragment] = payload; + } + + Fragments::Fragments(const Header &header, const UInt_64 size) + : header(header), data(new Serializer[size]), size(size) + { + this->header.fragments = size; + this->header.fragment = 0; + } + + Fragments::Fragments(Fragments &&frags) noexcept + : header(frags.header), data(frags.data), size(frags.size) + { + frags.header = {}; + frags.data = nullptr; + frags.size = 0; + } + + Fragments::Fragments(const Fragments &frags) + : header(frags.header), data(new Serializer[frags.size]), size(frags.size) + { + for (UInt_64 i = 0; i < size; ++i) + data[i] = frags.data[i]; + } + + Fragments &Fragments::operator=(Fragments &&frags) noexcept + { + if (this == &frags) + return *this; + + header = frags.header; + + delete[] data; + data = frags.data; + + size = frags.size; + + frags.header = {}; + frags.data = nullptr; + frags.size = 0; + + return *this; + } + + Fragments &Fragments::operator=(const Fragments &frags) + { + if (this == &frags) + return *this; + + header = frags.header; + delete[] data; + data = new Serializer[frags.size]; + for (UInt_64 i = 0; i < frags.size; ++i) + data[i] = frags.data[i]; + size = frags.size; + + return *this; + } + + Fragments::operator Serializer *() const + { + return data; + } + + Header Fragments::GetHeader() const + { + return header; + } + + UInt_64 Fragments::Size() const + { + return size; + } + + bool Fragments::IsComplete() const + { + for (UInt_64 i = 0; i < size; ++i) + if (!data[i].Size()) + return false; + + return true; + } + + Packet Fragments::Combine() const + { + UInt_64 rSize = 0; + for (UInt_64 i = 0; i < size; ++i) + rSize += data[i].Size(); + + Packet result = + { + header, + {Endianness::LE, rSize} + }; + result.header.fragments = 0; + + for (UInt_64 i = 0; i < size; ++i) + result.payload.WriteSer(data[i]); + + result.payload.SetOffset(0); + + return result; + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetEnd.cpp b/src/io/socket/ehc/NetEnd.cpp new file mode 100644 index 0000000..2ea1166 --- /dev/null +++ b/src/io/socket/ehc/NetEnd.cpp @@ -0,0 +1,512 @@ +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/io/socket/EHC.h" + +#include +#include + +namespace ehs +{ + NetEnd::NetEnd() + : owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(0), status(Status::PENDING), + arch(Architecture::UNKNOWN), token{}, nextSendId(0), nextRecvId(0), type(AddrType::IPV6), port(0), + deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), + queueSlot(0) + { + } + + NetEnd::NetEnd(const EndDisp disposition, Str_8 id, const Architecture arch, const AddrType type, + Str_8 address, const UInt_16 port) + : owner(nullptr), disposition(disposition), hashName(id.Hash_64()), name((Str_8&&)id), status(Status::ACTIVE), + arch(arch), token{}, nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), port(port), + deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), + queueSlot(0) + { + } + + NetEnd::NetEnd(const AddrType type, Str_8 address, const UInt_16 port) + : owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(0), status(Status::PENDING), + arch(Architecture::UNKNOWN), token{}, nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), + port(port), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), + latency(0.0f), queueSlot(0) + { + } + + NetEnd::NetEnd(NetEnd &&end) noexcept + : owner(end.owner), disposition(end.disposition), hashName(end.hashName), name((Str_8&&)end.name), status(end.status), arch(end.arch), token{}, + nextSendId(end.nextSendId), sent((Vector&&)end.sent), nextRecvId(end.nextRecvId), + received((Vector&&)end.received), type(end.type), address((Str_8&&)end.address), port(end.port), + deltaDuration(end.deltaDuration), deltaRate(end.deltaRate), timeout(end.timeout), lastPing(end.lastPing), + oldLatency(end.oldLatency), latency(end.latency), queueSlot(end.queueSlot) + { + end.owner = nullptr; + end.disposition = EndDisp::ENDPOINT; + end.hashName = 0; + end.status = Status::PENDING; + end.arch = Architecture::UNKNOWN; + Util::Copy(token, end.token, 64); + Util::Zero(end.token, 64); + end.nextSendId = 0; + end.nextRecvId = 0; + end.type = AddrType::IPV6; + end.port = 0; + end.deltaDuration = 0.0f; + end.deltaRate = 1.0f / 60.0f; + end.timeout = 0.0f; + end.lastPing = 0.0f; + end.oldLatency = 0.0f; + end.latency = 0.0f; + end.queueSlot = 0; + } + + NetEnd::NetEnd(const NetEnd& end) + : owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(end.hashName), name(end.name), status(Status::PENDING), arch(Architecture::UNKNOWN), + token{}, nextSendId(0), nextRecvId(0), type(end.type), port(0), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), + timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0) + { + } + + NetEnd& NetEnd::operator=(NetEnd&& end) noexcept + { + if (this == &end) + return *this; + + owner = end.owner; + disposition = end.disposition; + hashName = end.hashName; + name = (Str_8&&)end.name; + status = end.status; + arch = end.arch; + Util::Copy(token, end.token, 64); + nextSendId = end.nextSendId; + sent = (Vector&&)end.sent; + nextRecvId = end.nextRecvId; + received = (Vector&&)end.received; + type = end.type; + address = (Str_8&&)end.address; + port = end.port; + deltaDuration = end.deltaDuration; + deltaRate = end.deltaRate; + timeout = end.timeout; + lastPing = end.lastPing; + oldLatency = end.oldLatency; + latency = end.latency; + queueSlot = end.queueSlot; + + end.owner = nullptr; + end.disposition = EndDisp::ENDPOINT; + end.hashName = 0; + end.status = Status::PENDING; + end.arch = Architecture::UNKNOWN; + Util::Zero(end.token, 64); + end.nextSendId = 0; + end.nextRecvId = 0; + end.type = AddrType::IPV6; + end.port = 0; + end.deltaDuration = 0.0f; + end.deltaRate = 1.0f / 60.0f; + end.timeout = 0.0f; + end.lastPing = 0.0f; + end.oldLatency = 0.0f; + end.latency = 0.0f; + end.queueSlot = 0; + + return *this; + } + + NetEnd &NetEnd::operator=(const NetEnd &end) + { + if (this == &end) + return *this; + + owner = nullptr; + disposition = EndDisp::ENDPOINT; + hashName = end.hashName; + name = end.name; + status = Status::PENDING; + arch = Architecture::UNKNOWN; + Util::Zero(token, 64); + nextSendId = 0; + sent = {}; + nextRecvId = 0; + received = {}; + type = AddrType::IPV6; + address = {}; + port = 0; + deltaDuration = 0.0f; + deltaRate = 1.0f / 60.0f; + timeout = 0.0f; + lastPing = 0.0f; + oldLatency = 0.0f; + latency = 0.0f; + queueSlot = 0; + + return *this; + } + + EndDisp NetEnd::GetDisposition() const + { + return disposition; + } + + UInt_64 NetEnd::GetHashName() const + { + return hashName; + } + + Str_8 NetEnd::GetName() const + { + return name; + } + + Status NetEnd::GetStatus() const + { + return status; + } + + Architecture NetEnd::GetArchitecture() const + { + return arch; + } + + UInt_64 NetEnd::GetNextSendId() const + { + return nextSendId; + } + + void NetEnd::Send(const bool deltaLocked, const bool encrypted, const bool ensure, const UInt_64 sys, + const UInt_64 op, const Serializer<>& payload) + { + if (deltaLocked && deltaDuration < deltaRate) + return; + + Header header = { + encrypted, + nextSendId++, + 1, + 0, + ensure, + owner->GetDisposition(), + "", + sys, + op + }; + + Util::Copy(header.token, token, 64); + + if ((owner->GetLocalAddressType() == AddrType::IPV6 && payload.Size() > COMMS_IPV6_PAYLOAD) || (owner->GetLocalAddressType() == AddrType::IPV4 && payload.Size() > COMMS_IPV4_PAYLOAD)) + { + Fragments frags = FragmentData(header, payload); + for (UInt_64 i = 0; i < frags.Size(); ++i) + { + Header newHeader = frags.GetHeader(); + newHeader.fragment = i; + + Send(newHeader, frags[i]); + } + } + else + { + Send(header, payload); + } + } + + void NetEnd::Send(const bool deltaLocked, const bool encrypted, const bool ensure, const Str_8& sys, + const Str_8& op, const Serializer<>& payload) + { + Send(deltaLocked, encrypted, ensure, sys.Hash_64(), op.Hash_64(), payload); + } + + UInt_64 NetEnd::GetNextRecvId() const + { + return nextRecvId; + } + + Str_8 NetEnd::GetAddress() const + { + return address; + } + + UInt_16 NetEnd::GetPort() const + { + return port; + } + + float NetEnd::GetDeltaRate() const + { + return deltaRate; + } + + float NetEnd::GetTimeout() const + { + return timeout; + } + + float NetEnd::GetLastPing() const + { + return lastPing; + } + + float NetEnd::GetLatency() const + { + return oldLatency; + } + + UInt_64 NetEnd::GetQueueSlot() const + { + return queueSlot; + } + + void NetEnd::Poll(const float delta) + { + SortReceived(); + + if (deltaDuration >= deltaRate) + deltaDuration = Math::Mod(deltaDuration, deltaRate); + + deltaDuration += delta; + timeout += delta; + latency += delta; + + if (sent.Size()) + { + for (UInt_64 i = 0; i < sent.Size(); ++i) + { + sent[i].lastResend += delta; + if (sent[i].lastResend >= owner->GetResendRate()) + { + Serializer result(Endianness::LE); + result.Write(sent[i].header); + result.WriteSer(sent[i].payload); + + if (sent[i].header.encrypted) + Encryption::Encrypt_64(result.Size() - sizeof(bool), &result[sizeof(bool)]); + + owner->udp.Send(type, address, port, result, result.Size()); + + sent[i].lastResend = Math::Mod(sent[i].lastResend, owner->GetResendRate()); + } + } + } + + if (owner->GetDisposition() == EndDisp::SERVICE) + { + lastPing += delta; + if (lastPing >= 1.0f) + Ping(delta); + } + } + + void NetEnd::SetStatus(const Status newStatus) + { + status = newStatus; + } + + void NetEnd::RemoveInsurance(const UInt_64 msgId, const UInt_64 fragment) + { + for (UInt_64 i = 0; i < sent.Size(); ++i) + { + if (sent[i].header.id == msgId && sent[i].header.fragment == fragment) + { + sent.Remove(i); + break; + } + } + + timeout = 0.0f; + } + + void NetEnd::AddReceived(const Header& header, const Serializer<>& payload) + { + Fragments* frags = nullptr; + + for (UInt_64 i = 0; i < received.Size(); ++i) + { + if (received[i].GetHeader().id == header.id) + { + if (received[i][header.fragment].Size()) + return; + + frags = &received[i]; + break; + } + } + + if (header.id > nextRecvId) + nextRecvId = header.id + 1; + + if (frags) + (*frags)[header.fragment] = payload; + else + received.Push({header, payload}); + + timeout = 0.0f; + } + + Vector* NetEnd::GetReceived() + { + return &received; + } + + void NetEnd::SetDeltaRate(const float newDeltaRate) + { + deltaRate = newDeltaRate; + } + + void NetEnd::Ping(const float delta) + { + Serializer payload(Endianness::LE); + payload.Write(delta); + + Send(false, true, false, "Internal", "Ping", payload); + lastPing = 0.0f; + latency = 0.0f; + } + + void NetEnd::Pong(const float delta) + { + Serializer payload(Endianness::LE); + payload.Write(delta); + + Send(false, true, false, "Internal", "Pong", payload); + + timeout = 0.0f; + } + + void NetEnd::SendLatency() + { + oldLatency = latency * 1000; + + Serializer sPayload(Endianness::LE); + sPayload.Write(oldLatency); + + Send(false, true, false, "Internal", "Latency", sPayload); + + latency = 0.0f; + timeout = 0.0f; + } + + void NetEnd::SetLatency(const float newLatency) + { + oldLatency = newLatency; + } + + void NetEnd::SetQueueSlot(const UInt_64 slot) + { + queueSlot = slot; + } + + Fragments NetEnd::FragmentData(const Header& header, const Serializer<>& data) + { + Fragments result; + + if (owner->GetLocalAddressType() == AddrType::IPV6) + { + UInt_64 frags = data.Size() / COMMS_IPV6_PAYLOAD; + if (data.Size() % COMMS_IPV6_PAYLOAD) + ++frags; + + result = Fragments(header, frags); + + UInt_64 size = COMMS_IPV6_PAYLOAD; + + for (UInt_64 i = 0; i < result.Size(); ++i) + { + size = COMMS_IPV6_PAYLOAD; + if (i == result.Size() - 1) + size = data.Size() % COMMS_IPV6_PAYLOAD; + + result[i] = {data.GetEndianness(), &data[i * COMMS_IPV6_PAYLOAD], size}; + } + } + else if (owner->GetLocalAddressType() == AddrType::IPV4) + { + UInt_64 frags = data.Size() / COMMS_IPV4_PAYLOAD; + if (data.Size() % COMMS_IPV4_PAYLOAD) + ++frags; + + result = Fragments(header, frags); + + UInt_64 size = COMMS_IPV4_PAYLOAD; + + for (UInt_64 i = 0; i < result.Size(); ++i) + { + size = COMMS_IPV4_PAYLOAD; + if (i == result.Size() - 1) + size = data.Size() % COMMS_IPV4_PAYLOAD; + + result[i] = {data.GetEndianness(), &data[i * COMMS_IPV4_PAYLOAD], size}; + } + } + + return result; + } + + void NetEnd::Send(const Header& header, const Serializer& payload) + { + Serializer result(Endianness::LE); + result.Write(header); + result.WriteSer(payload); + + if (header.encrypted) + Encryption::Encrypt_64(result.Size() - sizeof(bool), &result[sizeof(bool)]); + + if (header.ensure) + sent.Push({header, payload}); + + owner->udp.Send(type, address, port, result, result.Size()); + } + + bool NetEnd::SortingNeeded() const + { + UInt_64 lastPacket = 0; + + for (UInt_64 i = 0; i < received.Size(); ++i) + { + if (received[i].GetHeader().id < lastPacket) + return true; + else + lastPacket = received[i].GetHeader().id; + } + + return false; + } + + void NetEnd::SortReceived() + { + if (!SortingNeeded()) + return; + + Vector sorted(0, received.Stride()); + + for (UInt_64 a = 0; a < received.Size(); ++a) + { + if (sorted.Size()) + { + for (UInt_64 b = sorted.Size(); b; --b) + { + if (received[a].GetHeader().id > sorted[b - 1].GetHeader().id) + { + if (b == sorted.Size()) + sorted.Push(received[a]); + else + sorted.Insert(b, received[a]); + + break; + } + else + { + sorted.Insert(b - 1, received[a]); + + break; + } + } + } + else + { + sorted.Push(received[a]); + } + } + + received = sorted; + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetOp.cpp b/src/io/socket/ehc/NetOp.cpp new file mode 100644 index 0000000..82f51af --- /dev/null +++ b/src/io/socket/ehc/NetOp.cpp @@ -0,0 +1,66 @@ +#include "ehs/io/socket/ehc/NetOp.h" +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/io/socket/ehc/NetSystem.h" + +namespace ehs +{ + NetOp::NetOp() + : hashId(0) + { + } + + NetOp::NetOp(Str_8 id) + : hashId(id.Hash_64()), id((Str_8 &&)id) + { + } + + NetOp::NetOp(NetOp&& op) noexcept + : hashId(op.hashId), id((Str_8 &&)op.id) + { + op.hashId = 0; + } + + NetOp::NetOp(const NetOp &op) + : hashId(op.hashId), id(op.id) + { + } + + NetOp& NetOp::operator=(NetOp&& op) noexcept + { + if (this == &op) + return *this; + + hashId = op.hashId; + id = (Str_8 &&)op.id; + + op.hashId = 0; + + return *this; + } + + NetOp &NetOp::operator=(const NetOp &op) + { + if (this == &op) + return *this; + + hashId = op.hashId; + id = op.id; + + return *this; + } + + Str_8 NetOp::GetId() const + { + return id; + } + + UInt_64 NetOp::GetHashId() const + { + return hashId; + } + + void NetOp::Process(EHC *ehc, NetEnd *endpoint, NetSystem *sys, Serializer &payload) + { + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetSystem.cpp b/src/io/socket/ehc/NetSystem.cpp new file mode 100644 index 0000000..520933a --- /dev/null +++ b/src/io/socket/ehc/NetSystem.cpp @@ -0,0 +1,105 @@ +#include "ehs/io/socket/ehc/NetSystem.h" +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/io/socket/ehc/NetOp.h" + +namespace ehs +{ + NetSystem::~NetSystem() + { + for (UInt_64 i = 0; i < ops.Size(); ++i) + delete ops[i]; + + ops.Clear(); + } + + NetSystem::NetSystem() + : hashId(0) + { + } + + NetSystem::NetSystem(Str_8 id) + : hashId(id.Hash_64()), id((Str_8&&)id) + { + } + + NetSystem::NetSystem(NetSystem&& sys) noexcept + : hashId(sys.hashId), id((Str_8&&)sys.id), ops((Array&&)sys.ops) + { + sys.hashId = 0; + } + + NetSystem::NetSystem(const NetSystem& sys) + : hashId(sys.hashId), id(sys.id) + { + } + + NetSystem& NetSystem::operator=(NetSystem&& sys) noexcept + { + if (this == &sys) + return *this; + + hashId = sys.hashId; + id = (Str_8&&)sys.id; + ops = (Array&&)sys.ops; + + sys.hashId = 0; + + return *this; + } + + NetSystem& NetSystem::operator=(const NetSystem& sys) + { + if (this == &sys) + return *this; + + hashId = sys.hashId; + id = sys.id; + ops = Array(); + + return *this; + } + + Str_8 NetSystem::GetId() const + { + return id; + } + + UInt_64 NetSystem::GetHashId() const + { + return hashId; + } + + bool NetSystem::HasOperation(const UInt_64 hashId) const + { + for (UInt_64 i = 0; i < ops.Size(); ++i) + if (ops[i]->GetHashId() == hashId) + return true; + + return false; + } + + bool NetSystem::AddOperation(NetOp* op) + { + if (HasOperation(op->GetHashId())) + return false; + + ops.Push(op); + + return true; + } + + void NetSystem::Execute(EHC *ehc, NetEnd *endpoint, const UInt_64 hashId, Serializer &payload) + { + for (UInt_64 i = 0; i < ops.Size(); ++i) + { + if (ops[i]->GetHashId() == hashId) + { + ops[i]->Process(ehc, endpoint, this, payload); + return; + } + } + + EHS_LOG_INT(LogType::INFO, 0, "System not found."); + } +} \ No newline at end of file