diff --git a/CMakeLists.txt b/CMakeLists.txt index e38ee59..ebd860c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -121,20 +121,27 @@ set(EHS_SOURCES include/ehs/io/FileMonitor.h include/ehs/io/Window.h + include/ehs/io/socket/Socket.h src/io/socket/Request.cpp include/ehs/io/socket/Request.h src/io/socket/Response.cpp include/ehs/io/socket/Response.h src/io/socket/BaseDNS.cpp include/ehs/io/socket/BaseDNS.h + include/ehs/io/socket/DNS.h src/io/socket/BaseUDP.cpp include/ehs/io/socket/BaseUDP.h + include/ehs/io/socket/UDP.h src/io/socket/BaseTCP.cpp include/ehs/io/socket/BaseTCP.h + include/ehs/io/socket/TCP.h src/io/socket/SSL.cpp include/ehs/io/socket/SSL.h + include/ehs/io/socket/ehc/NetUtils.h + src/io/socket/EHC.cpp include/ehs/io/socket/EHC.h + src/io/socket/ehc/NetFrags.cpp include/ehs/io/socket/ehc/NetFrags.h + src/io/socket/ehc/NetEnd.cpp include/ehs/io/socket/ehc/NetEnd.h + src/io/socket/ehc/NetSys.cpp include/ehs/io/socket/ehc/NetSys.h + src/io/socket/ehc/NetOp.cpp include/ehs/io/socket/ehc/NetOp.h + src/io/socket/rest/Twitch.cpp include/ehs/io/socket/rest/Twitch.h src/io/socket/rest/TwitchChat.cpp include/ehs/io/socket/rest/TwitchChat.h src/io/socket/rest/Spotify.cpp include/ehs/io/socket/rest/Spotify.h - include/ehs/io/socket/Socket.h - include/ehs/io/socket/TCP.h - include/ehs/io/socket/UDP.h - include/ehs/io/socket/DNS.h src/io/audio/Audio.cpp include/ehs/io/audio/Audio.h src/io/audio/BaseAudioDevice.cpp include/ehs/io/audio/BaseAudioDevice.h @@ -178,6 +185,8 @@ set(EHS_SOURCES include/ehs/io/BaseDirectory.h src/io/BaseDirectory.cpp include/ehs/io/Directory.h + include/ehs/io/socket/ehc/NetEnc.h + src/io/socket/ehc/NetEnc.cpp ) if (IS_OS_WINDOWS) diff --git a/include/ehs/io/audio/Audio.h b/include/ehs/io/audio/Audio.h index 48c0b7f..22eff2c 100644 --- a/include/ehs/io/audio/Audio.h +++ b/include/ehs/io/audio/Audio.h @@ -81,10 +81,14 @@ namespace ehs UInt_64 GetSampleCount() const; + UInt_8 GetFrameSize() const; + UInt_64 GetSize() const; float GetLength() const; + Byte* GetFrame(UInt_64 frameIndex) const; + Array FrameAsMono(UInt_64 frameIndex) const; Array FrameAsStereo(UInt_64 frameIndex) const; diff --git a/include/ehs/io/audio/BaseAudioDevice.h b/include/ehs/io/audio/BaseAudioDevice.h index a866ac4..0f3ec82 100644 --- a/include/ehs/io/audio/BaseAudioDevice.h +++ b/include/ehs/io/audio/BaseAudioDevice.h @@ -52,7 +52,9 @@ namespace ehs virtual UInt_64 ReceiveStream(void *data, UInt_64 size); - void BridgeStreams(UInt_64 bufferSize); + void BridgeStreams(BaseAudioDevice *input, UInt_64 frameBufferSize); + + void BridgeStreams(UInt_64 frameBufferSize); AudioDeviceType GetType() const; diff --git a/include/ehs/io/socket/EHC.h b/include/ehs/io/socket/EHC.h new file mode 100644 index 0000000..fbd5fb7 --- /dev/null +++ b/include/ehs/io/socket/EHC.h @@ -0,0 +1,228 @@ +#pragma once + +#include "ehs/io/socket/ehc/NetUtils.h" +#include "ehs/io/socket/ehc/NetEnc.h" +#include "ehs/Serializer.h" +#include "ehs/Vector.h" +#include "ehs/Array.h" +#include "Socket.h" +#include "UDP.h" + +namespace ehs +{ + class NetSys; + class NetEnd; + class EHC; + + typedef bool (*ConnectCb)(EHC *, NetEnd **, Serializer); + typedef void (*ConnectedCb)(EHC *, NetEnd *); + typedef void (*ActiveCb)(EHC *, NetEnd *); + typedef void (*DisconnectCb)(EHC *, NetEnd *, Serializer); + typedef void (*DisconnectedCb)(EHC *, NetEnd *); + typedef void (*TimeoutCb)(EHC *, NetEnd *); + + class EHC + { + private: + friend class NetEnc; + friend class NetEnd; + + static const Version version; + static const UInt_64 internalSys; + static const UInt_64 connectOp; + static const UInt_64 connectedOp; + static const UInt_64 rejectedOp; + static const UInt_64 disconnectOp; + static const UInt_64 disconnectedOp; + static const UInt_64 statusUpdateOp; + static const UInt_64 pingOp; + static const UInt_64 pongOp; + static const UInt_64 latencyOp; + static const UInt_64 receivedOp; + + UDP udp; + Version appVer; + EndDisp disposition; + UInt_64 hashName; + Str_8 name; + bool dropPackets; + Byte* buffer; + UInt_32 bufferSize; + Array encryptions; + Array systems; + UInt_32 maxEndpoints; + UInt_64 lastTSC; + float delta; + float maxTimeout; + float resendRate; + ConnectCb connectCb; + ConnectedCb connectedCb; + ActiveCb activeCb; + DisconnectCb disconnectCb; + DisconnectedCb disconnectedCb; + TimeoutCb timeoutCb; + + protected: + Vector endpoints; + + public: + ~EHC(); + + EHC(); + + EHC(const Version &ver, Str_8 name, UInt_64 maxEndpoints); + + EHC(const Version &ver, Str_8 name); + + EHC(EHC &&sock) noexcept; + + EHC(const EHC &sock); + + EHC &operator=(EHC&& sock) noexcept; + + EHC &operator=(const EHC &sock); + + void Initialize(); + + void Release(); + + void Bind(AddrType newType, const Str_8& newAddress, UInt_16 newPort); + + void Connect(AddrType rType, const Str_8& rAddress, UInt_16 rPort, Serializer data); + + bool Disconnect(EndDisp endDisp, const Char_8 token[64], const Str_8& msg); + + void Broadcast(EndDisp endDisp, Status endStatus, bool deltaLocked, UInt_64 encHashId, + bool ensure, UInt_64 sysHashId, UInt_64 opHashId, + const Serializer &payload); + + void Broadcast(EndDisp endDisp, Status endStatus, bool deltaLocked, const Str_8 &encId, + bool ensure, const Str_8 &sysId, const Str_8 &opId, + const Serializer &payload); + + void Poll(); + + bool IsInitialized() const; + + AddrType GetLocalAddressType() const; + + Str_8 GetLocalAddress() const; + + UInt_16 GetLocalPort() const; + + bool IsBound() const; + + static Version GetVersion(); + + Version GetAppVersion() const; + + EndDisp GetDisposition() const; + + UInt_64 GetHashName() const; + + Str_8 GetName() const; + + void EnableDropPackets(bool enable); + + bool IsDropPacketsEnabled() const; + + bool HasEncryption(UInt_64 encHashId) const; + + bool HasEncryption(const Str_8& encId) const; + + bool AddEncryption(NetEnc *enc); + + NetEnc* GetEncryption(UInt_64 encHashId) const; + + NetEnc* GetEncryption(const Str_8& encId) const; + + bool HasSystem(UInt_64 sysHashId) const; + + bool HasSystem(const Str_8& sysId) const; + + bool AddSystem(NetSys *sys); + + NetSys* GetSystem(UInt_64 sysHashId) const; + + NetSys* GetSystem(const Str_8& sysId) const; + + bool HasEndpoint(EndDisp endDisp, Status endStatus, const Char_8 token[64]) const; + + bool HasEndpoint(EndDisp endDisp, Status endStatus, UInt_64 hashId) const; + + bool HasEndpoint(EndDisp endDisp, Status endStatus, const Str_8 &id) const; + + bool HasEndpoint(EndDisp endDisp, const Char_8 token[64]) const; + + bool HasEndpoint(EndDisp endDisp, UInt_64 hashId) const; + + bool HasEndpoint(EndDisp endDisp, const Str_8 &id) const; + + bool HasEndpoint(const Char_8 token[64]) const; + + bool HasEndpoint(UInt_64 hashId) const; + + bool HasEndpoint(const Str_8 &id) const; + + bool HasEndpoint(const Str_8& rAddress, UInt_16 rPort) const; + + NetEnd *GetEndpoint(EndDisp endDisp, Status endStatus, const Char_8 token[64]) const; + + NetEnd *GetEndpoint(EndDisp endDisp, Status endStatus, UInt_64 hashId) const; + + NetEnd *GetEndpoint(EndDisp endDisp, Status endStatus, const Str_8 &id) const; + + NetEnd *GetEndpoint(EndDisp endDisp, const Char_8 token[64]) const; + + NetEnd *GetEndpoint(EndDisp endDisp, UInt_64 hashId) const; + + NetEnd *GetEndpoint(EndDisp endDisp, const Str_8 &id) const; + + NetEnd *GetEndpoint(const Str_8& rAddress, UInt_16 rPort) const; + + Array GetEndpoints(EndDisp endDisp, Status endStatus); + + Array GetEndpoints(EndDisp endDisp); + + UInt_64 GetEndpointsCount(EndDisp endDisp, Status endStatus); + + UInt_64 GetEndpointsCount(EndDisp endDisp); + + UInt_64 GetMaxEndpoints() const; + + void SetMaxTimeout(float seconds); + + float GetMaxTimeout() const; + + void SetResendRate(float seconds); + + float GetResendRate() const; + + void SetConnectCb(ConnectCb cb); + + void SetConnectedCb(ConnectedCb cb); + + void SetActiveCb(ActiveCb cb); + + void SetDisconnectCb(DisconnectCb cb); + + void SetDisconnectedCb(DisconnectedCb cb); + + void SetTimeoutCb(TimeoutCb cb); + + private: + void GenerateToken(Char_8 in[64]); + + void UpdateQueue(UInt_64 active); + + void UpdateQueue(); + + bool RemoveEndpoint(EndDisp disposition, const Char_8 token[64]); + + bool RemoveEndpoint(const Str_8& address, UInt_16 port); + + bool RemoveEndpoint(const NetEnd* end); + + void PollEndpoints(); + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/NetEnc.h b/include/ehs/io/socket/ehc/NetEnc.h new file mode 100644 index 0000000..769cb34 --- /dev/null +++ b/include/ehs/io/socket/ehc/NetEnc.h @@ -0,0 +1,43 @@ +#pragma once + +#include "ehs/Str.h" + +namespace ehs +{ + class EHC; + + class NetEnc + { + private: + friend class EHC; + + EHC *owner; + UInt_64 hashId; + Str_8 id; + + public: + virtual ~NetEnc() = default; + + NetEnc(); + + NetEnc(Str_8 id); + + NetEnc(NetEnc &&enc) noexcept; + + NetEnc(const NetEnc &enc); + + NetEnc &operator=(NetEnc &&enc) noexcept; + + NetEnc &operator=(const NetEnc &enc); + + EHC *GetOwner() const; + + UInt_64 GetHashId() const; + + Str_8 GetId() const; + + virtual void Encrypt(Byte *data, UInt_64 size) const; + + virtual void Decrypt(Byte *data, UInt_64 size) const; + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/NetEnd.h b/include/ehs/io/socket/ehc/NetEnd.h new file mode 100644 index 0000000..566675f --- /dev/null +++ b/include/ehs/io/socket/ehc/NetEnd.h @@ -0,0 +1,134 @@ +#pragma once + +#include "NetUtils.h" +#include "NetFrags.h" + +#include "ehs/Str.h" +#include "ehs/Vector.h" +#include "ehs/Serializer.h" +#include "ehs/io/socket/Socket.h" + +namespace ehs +{ + class EHC; + + class NetEnd + { + private: + friend class EHC; + + EHC* owner; + EndDisp disposition; + UInt_64 hashName; + Str_8 name; + Status status; + Architecture arch; + Char_8 token[64]; + UInt_64 nextSendId; + Vector sent; + UInt_64 nextRecvId; + Vector received; + AddrType type; + Str_8 address; + UInt_16 port; + float deltaDuration; + float deltaRate; + float timeout; + float lastPing; + float oldLatency; + float latency; + UInt_64 queueSlot; + + public: + NetEnd(); + + NetEnd(EndDisp disposition, Str_8 id, Architecture arch, AddrType type, Str_8 address, UInt_16 port); + + NetEnd(AddrType type, Str_8 address, UInt_16 port); + + NetEnd(NetEnd &&end) noexcept; + + NetEnd(const NetEnd &end); + + NetEnd &operator=(NetEnd &&end) noexcept; + + NetEnd &operator=(const NetEnd &end); + + EndDisp GetDisposition() const; + + UInt_64 GetHashName() const; + + Str_8 GetName() const; + + Status GetStatus() const; + + Architecture GetArchitecture() const; + + UInt_64 GetNextSendId() const; + + /// Sends data to the remote endpoint. + /// @param [in] deltaLocked Whether or not to match the remote endpoint's delta time to prevent overloading the client. This will drop data if delta time does not match. + /// @param [in] encHashId The hash id of the encryption to use. Can be zero for none. + /// @param [in] ensure Whether or not to ensure the data was received by the remote endpoint. + /// @param [in] sys The system hash id to execute an operation from. + /// @param [in] op The operation hash id in the system to execute. + /// @param [in] payload Additional parameters and data to send to the remote endpoint. + void Send(bool deltaLocked, UInt_64 encHashId, bool ensure, UInt_64 sys, UInt_64 op, const Serializer& payload); + + /// Sends data to the remote endpoint. + /// @param [in] deltaLocked Whether or not to match the remote endpoint's delta time to prevent overloading the client. This will drop data if delta time does not match. + /// @param [in] encId The id of the encryption to use. Can be empty for none. + /// @param [in] ensure Whether or not to ensure the data was received by the remote endpoint. + /// @param [in] sys The system string id to execute an operation from. + /// @param [in] op The operation string id in the system to execute. + /// @param [in] payload Additional parameters and data to send to the remote endpoint. + void Send(bool deltaLocked, const Str_8 &encID, bool ensure, const Str_8& sys, const Str_8& op, const Serializer& payload); + + UInt_64 GetNextRecvId() const; + + Str_8 GetAddress() const; + + UInt_16 GetPort() const; + + void SetDeltaRate(float newDeltaRate); + + float GetDeltaRate() const; + + float GetTimeout() const; + + float GetLastPing() const; + + void SendLatency(); + + float GetLatency() const; + + UInt_64 GetQueueSlot() const; + + private: + void Poll(float delta); + + void SetStatus(Status newStatus); + + void RemoveInsurance(UInt_64 msgId, UInt_64 fragment); + + void AddReceived(const Header& header, const Serializer<>& payload); + + Vector* GetReceived(); + + void Ping(float delta); + + void Pong(float delta); + + void SetLatency(float newLatency); + + void SetQueueSlot(UInt_64 slot); + + NetFrags FragmentData(const Header& header, const Serializer<>& data); + + void Send(const Header& header, const Serializer<>& payload); + + bool SortingNeeded() const; + + void SortReceived(); + }; +} diff --git a/include/ehs/io/socket/ehc/NetFrags.h b/include/ehs/io/socket/ehc/NetFrags.h new file mode 100644 index 0000000..13f3ec4 --- /dev/null +++ b/include/ehs/io/socket/ehc/NetFrags.h @@ -0,0 +1,43 @@ +#pragma once + +#include "NetUtils.h" + +#include + +namespace ehs +{ + class NetFrags + { + private: + Header header; + Serializer* data; + UInt_64 size; + + public: + ~NetFrags(); + + NetFrags(); + + NetFrags(const Header &header, const Serializer &payload); + + NetFrags(const Header &header, UInt_64 size); + + NetFrags(NetFrags &&frags) noexcept; + + NetFrags(const NetFrags &frags); + + NetFrags &operator=(NetFrags &&frags) noexcept; + + NetFrags &operator=(const NetFrags &frags); + + operator Serializer *() const; + + Header GetHeader() const; + + UInt_64 Size() const; + + bool IsComplete() const; + + Packet Combine() const; + }; +} diff --git a/include/ehs/io/socket/ehc/NetOp.h b/include/ehs/io/socket/ehc/NetOp.h new file mode 100644 index 0000000..6ff478c --- /dev/null +++ b/include/ehs/io/socket/ehc/NetOp.h @@ -0,0 +1,42 @@ +#pragma once + +#include "ehs/Str.h" +#include "ehs/Serializer.h" + +namespace ehs +{ + class EHC; + class NetSys; + class NetEnd; + + class NetOp + { + private: + friend class NetSys; + + UInt_64 hashId; + Str_8 id; + + public: + virtual ~NetOp() = default; + + NetOp(); + + NetOp(Str_8 id); + + NetOp(NetOp &&op) noexcept; + + NetOp(const NetOp &op); + + NetOp &operator=(NetOp &&op) noexcept; + + NetOp &operator=(const NetOp &op); + + Str_8 GetId() const; + + UInt_64 GetHashId() const; + + private: + virtual void Process(EHC *ehc, NetEnd *endpoint, NetSys *sys, Serializer &payload); + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/NetSys.h b/include/ehs/io/socket/ehc/NetSys.h new file mode 100644 index 0000000..97ef9ed --- /dev/null +++ b/include/ehs/io/socket/ehc/NetSys.h @@ -0,0 +1,48 @@ +#pragma once + +#include "ehs/Str.h" +#include "ehs/Array.h" +#include "ehs/Serializer.h" + +namespace ehs +{ + class EHC; + class NetEnd; + class NetOp; + + class NetSys + { + private: + friend class EHC; + + UInt_64 hashId; + Str_8 id; + Array ops; + + public: + virtual ~NetSys(); + + NetSys(); + + NetSys(Str_8 id); + + NetSys(NetSys &&sys) noexcept; + + NetSys(const NetSys &sys); + + NetSys &operator=(NetSys &&sys) noexcept; + + NetSys &operator=(const NetSys &sys); + + Str_8 GetId() const; + + UInt_64 GetHashId() const; + + bool HasOperation(UInt_64 hashId) const; + + bool AddOperation(NetOp *op); + + private: + void Execute(EHC *ehc, NetEnd *endpoint, UInt_64 hashId, Serializer &payload); + }; +} \ No newline at end of file diff --git a/include/ehs/io/socket/ehc/NetUtils.h b/include/ehs/io/socket/ehc/NetUtils.h new file mode 100644 index 0000000..6f94e5e --- /dev/null +++ b/include/ehs/io/socket/ehc/NetUtils.h @@ -0,0 +1,54 @@ +#pragma once + +#include "ehs/Serializer.h" + +namespace ehs +{ + enum class EndDisp : UInt_8 + { + ENDPOINT, + SERVICE + }; + + enum class Status : UInt_8 + { + ACTIVE, + PENDING, + IN_LOCAL_QUEUE, + IN_REMOTE_QUEUE, + }; + + struct Header + { + UInt_64 encHashId = 0; + UInt_64 id = 0; + UInt_64 fragments = 0; + UInt_64 fragment = 0; + bool ensure = false; + EndDisp disposition = EndDisp::ENDPOINT; + Char_8 token[64] = {}; + UInt_64 system = 0; + UInt_64 op = 0; + }; + + struct Packet + { + Header header; + Serializer payload; + }; + + struct Insurance + { + Header header; + Serializer payload; + float lastResend; + }; +} + +#ifndef EHC_IPV4_PAYLOAD + #define EHC_IPV4_PAYLOAD (EHS_IPV4_UDP_PAYLOAD - (UInt_16)sizeof(Header)) +#endif + +#ifndef EHC_IPV6_PAYLOAD + #define EHC_IPV6_PAYLOAD (EHS_IPV6_UDP_PAYLOAD - (UInt_16)sizeof(Header)) +#endif \ No newline at end of file diff --git a/src/io/audio/Audio.cpp b/src/io/audio/Audio.cpp index d9ed4a2..25e9612 100644 --- a/src/io/audio/Audio.cpp +++ b/src/io/audio/Audio.cpp @@ -290,6 +290,11 @@ namespace ehs return channels * frames; } + UInt_8 Audio::GetFrameSize() const + { + return byteDepth * channels; + } + UInt_64 Audio::GetSize() const { return byteDepth * channels * frames; @@ -300,6 +305,11 @@ namespace ehs return length; } + Byte* Audio::GetFrame(UInt_64 frameIndex) const + { + return &data[frameIndex * GetFrameSize()]; + } + Array Audio::FrameAsMono(const UInt_64 frameIndex) const { Array result(byteDepth); @@ -1743,161 +1753,161 @@ namespace ehs void Audio::SInt_16_to_SInt_8(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - newData[a] = (Byte)((float)((SInt_16*)data)[a] / (float)EHS_SINT_16_MAX * (float)EHS_SINT_8_MAX); + newData[a] = (Byte)((double)((SInt_16*)data)[a] / (double)EHS_SINT_16_MAX * (double)EHS_SINT_8_MAX); - *(SInt_8*)newPeak = (SInt_8)((float)*(SInt_16*)peak / (float)EHS_SINT_16_MAX * (float)EHS_SINT_8_MAX); + *(SInt_8*)newPeak = (SInt_8)((double)*(SInt_16*)peak / (double)EHS_SINT_16_MAX * (double)EHS_SINT_8_MAX); } void Audio::Float_to_SInt_8(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_8*)newData)[a] = (SInt_8)(((float*)data)[a] * (float)EHS_SINT_8_MAX); + ((SInt_8*)newData)[a] = (SInt_8)((double)((float*)data)[a] * (double)EHS_SINT_8_MAX); - *(SInt_8*)newPeak = (SInt_8)(*(float*)peak * (float)EHS_SINT_8_MAX); + *(SInt_8*)newPeak = (SInt_8)((double)(*(float*)peak) * (double)EHS_SINT_8_MAX); } void Audio::SInt_32_to_SInt_8(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - newData[a] = (Byte)((float)((SInt_32*)data)[a] / (float)EHS_SINT_32_MAX * (float)EHS_SINT_8_MAX); + newData[a] = (Byte)((double)((SInt_32*)data)[a] / (double)EHS_SINT_32_MAX * (double)EHS_SINT_8_MAX); - *(SInt_8*)newPeak = (SInt_8)((float)*(SInt_32*)peak / (float)EHS_SINT_32_MAX * (float)EHS_SINT_8_MAX); + *(SInt_8*)newPeak = (SInt_8)((double)*(SInt_32*)peak / (double)EHS_SINT_32_MAX * (double)EHS_SINT_8_MAX); } void Audio::SInt_64_to_SInt_8(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - newData[a] = (Byte)((float)((SInt_64*)data)[a] / (float)EHS_SINT_64_MAX * (float)EHS_SINT_8_MAX); + newData[a] = (Byte)((double)((SInt_64*)data)[a] / (double)EHS_SINT_64_MAX * (double)EHS_SINT_8_MAX); - *(SInt_8*)newPeak = (SInt_8)((float)*(SInt_64*)peak / (float)EHS_SINT_64_MAX * (float)EHS_SINT_8_MAX); + *(SInt_8*)newPeak = (SInt_8)((double)*(SInt_64*)peak / (double)EHS_SINT_64_MAX * (double)EHS_SINT_8_MAX); } void Audio::SInt_8_to_SInt_16(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_16*)newData)[a] = (SInt_16)((float)(SInt_8)data[a] / (float)EHS_SINT_8_MAX * (float)EHS_SINT_16_MAX); + ((SInt_16*)newData)[a] = (SInt_16)((double)(SInt_8)data[a] / (double)EHS_SINT_8_MAX * (double)EHS_SINT_16_MAX); - *(SInt_16*)newPeak = (SInt_16)((float)*(SInt_8*)peak / (float)EHS_SINT_8_MAX * (float)EHS_SINT_16_MAX); + *(SInt_16*)newPeak = (SInt_16)((double)*(SInt_8*)peak / (double)EHS_SINT_8_MAX * (double)EHS_SINT_16_MAX); } void Audio::Float_to_SInt_16(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_16*)newData)[a] = (SInt_16)(((float*)data)[a] * (float)EHS_SINT_16_MAX); + ((SInt_16*)newData)[a] = (SInt_16)((double)((float*)data)[a] * (double)EHS_SINT_16_MAX); - *(SInt_16*)newPeak = (SInt_16)(*(float*)peak * (float)EHS_SINT_16_MAX); + *(SInt_16*)newPeak = (SInt_16)((double)(*(float*)peak) * (double)EHS_SINT_16_MAX); } void Audio::SInt_32_to_SInt_16(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_16*)newData)[a] = (SInt_16)((float)((SInt_32*)data)[a] / (float)EHS_SINT_32_MAX * (float)EHS_SINT_16_MAX); + ((SInt_16*)newData)[a] = (SInt_16)((double)((SInt_32*)data)[a] / (double)EHS_SINT_32_MAX * (double)EHS_SINT_16_MAX); - *(SInt_16*)newPeak = (SInt_16)((float)*(SInt_32*)peak / (float)EHS_SINT_32_MAX * (float)EHS_SINT_16_MAX); + *(SInt_16*)newPeak = (SInt_16)((double)*(SInt_32*)peak / (double)EHS_SINT_32_MAX * (double)EHS_SINT_16_MAX); } void Audio::SInt_64_to_SInt_16(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_16*)newData)[a] = (SInt_16)((float)((SInt_64*)data)[a] / (float)EHS_SINT_64_MAX * (float)EHS_SINT_16_MAX); + ((SInt_16*)newData)[a] = (SInt_16)((double)((SInt_64*)data)[a] / (double)EHS_SINT_64_MAX * (double)EHS_SINT_16_MAX); - *(SInt_16*)newPeak = (SInt_16)((float)*(SInt_64*)peak / (float)EHS_SINT_64_MAX * (float)EHS_SINT_16_MAX); + *(SInt_16*)newPeak = (SInt_16)((double)*(SInt_64*)peak / (double)EHS_SINT_64_MAX * (double)EHS_SINT_16_MAX); } void Audio::SInt_8_to_Float(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((float*)newData)[a] = (float)(((SInt_8*)data)[a]) / (float)EHS_SINT_8_MAX; + ((float*)newData)[a] = (float)((double)(((SInt_8*)data)[a]) / (double)EHS_SINT_8_MAX); - *(float*)newPeak = (float)*(SInt_8*)peak / (float)EHS_SINT_8_MAX; + *(float*)newPeak = (float)((double)*(SInt_8*)peak / (double)EHS_SINT_8_MAX); } void Audio::SInt_16_to_Float(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((float*)newData)[a] = (float)(((SInt_16*)data)[a]) / (float)EHS_SINT_16_MAX; + ((float*)newData)[a] = (float)((double)(((SInt_16*)data)[a]) / (double)EHS_SINT_16_MAX); - *(float*)newPeak = (float)*(SInt_16*)peak / (float)EHS_SINT_16_MAX; + *(float*)newPeak = (float)((double)*(SInt_16*)peak / (double)EHS_SINT_16_MAX); } void Audio::SInt_32_to_Float(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((float*)newData)[a] = (float)(((SInt_32*)data)[a]) / (float)EHS_SINT_32_MAX; + ((float*)newData)[a] = (float)((double)(((SInt_32*)data)[a]) / (double)EHS_SINT_32_MAX); - *(float*)newPeak = (float)*(SInt_32*)peak / (float)EHS_SINT_32_MAX; + *(float*)newPeak = (float)((double)*(SInt_32*)peak / (double)EHS_SINT_32_MAX); } void Audio::SInt_64_to_Float(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((float*)newData)[a] = (float)(((SInt_64*)data)[a]) / (float)EHS_SINT_64_MAX; + ((float*)newData)[a] = (float)((double)(((SInt_64*)data)[a]) / (double)EHS_SINT_64_MAX); - *(float*)newPeak = (float)*(SInt_64*)peak / (float)EHS_SINT_64_MAX; + *(float*)newPeak = (float)((double)*(SInt_64*)peak / (double)EHS_SINT_64_MAX); } void Audio::SInt_8_to_SInt_32(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_32*)newData)[a] = (SInt_32)((float)(SInt_8)data[a] / (float)EHS_SINT_8_MAX * (float)EHS_SINT_32_MAX); + ((SInt_32*)newData)[a] = (SInt_32)((double)(SInt_8)data[a] / (double)EHS_SINT_8_MAX * (double)EHS_SINT_32_MAX); - *(SInt_32*)newPeak = (SInt_32)((float)*(SInt_8*)peak / (float)EHS_SINT_8_MAX * (float)EHS_SINT_32_MAX); + *(SInt_32*)newPeak = (SInt_32)((double)*(SInt_8*)peak / (double)EHS_SINT_8_MAX * (double)EHS_SINT_32_MAX); } void Audio::SInt_16_to_SInt_32(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_32*)newData)[a] = (SInt_32)((float)((SInt_16*)data)[a] / (float)EHS_SINT_16_MAX * (float)EHS_SINT_32_MAX); + ((SInt_32*)newData)[a] = (SInt_32)((double)((SInt_16*)data)[a] / (double)EHS_SINT_16_MAX * (double)EHS_SINT_32_MAX); - *(SInt_32*)newPeak = (SInt_32)((float)*(SInt_16*)peak / (float)EHS_SINT_16_MAX * (float)EHS_SINT_32_MAX); + *(SInt_32*)newPeak = (SInt_32)((double)*(SInt_16*)peak / (double)EHS_SINT_16_MAX * (double)EHS_SINT_32_MAX); } void Audio::Float_to_SInt_32(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_32*)newData)[a] = (SInt_32)(((float*)data)[a] * (float)EHS_SINT_32_MAX); + ((SInt_32*)newData)[a] = (SInt_32)(((double*)data)[a] * (double)EHS_SINT_32_MAX); - *(SInt_32*)newPeak = (SInt_32)(*(float*)peak * (float)EHS_SINT_32_MAX); + *(SInt_32*)newPeak = (SInt_32)(*(double*)peak * (double)EHS_SINT_32_MAX); } void Audio::SInt_64_to_SInt_32(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_32*)newData)[a] = (SInt_32)((float)((SInt_64*)data)[a] / (float)EHS_SINT_64_MAX * (float)EHS_SINT_32_MAX); + ((SInt_32*)newData)[a] = (SInt_32)((double)((SInt_64*)data)[a] / (double)EHS_SINT_64_MAX * (double)EHS_SINT_32_MAX); - *(SInt_32*)newPeak = (SInt_32)((float)*(SInt_64*)peak / (float)EHS_SINT_64_MAX * (float)EHS_SINT_32_MAX); + *(SInt_32*)newPeak = (SInt_32)((double)*(SInt_64*)peak / (double)EHS_SINT_64_MAX * (double)EHS_SINT_32_MAX); } void Audio::SInt_8_to_SInt_64(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_64*)newData)[a] = (SInt_64)((float)(SInt_8)data[a] / (float)EHS_SINT_8_MAX * (float)EHS_SINT_64_MAX); + ((SInt_64*)newData)[a] = (SInt_64)((double)(SInt_8)data[a] / (double)EHS_SINT_8_MAX * (double)EHS_SINT_64_MAX); - *(SInt_64*)newPeak = (SInt_64)((float)*(SInt_8*)peak / (float)EHS_SINT_8_MAX * (float)EHS_SINT_64_MAX); + *(SInt_64*)newPeak = (SInt_64)((double)*(SInt_8*)peak / (double)EHS_SINT_8_MAX * (double)EHS_SINT_64_MAX); } void Audio::SInt_16_to_SInt_64(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_64*)newData)[a] = (SInt_64)((float)((SInt_16*)data)[a] / (float)EHS_SINT_16_MAX * (float)EHS_SINT_64_MAX); + ((SInt_64*)newData)[a] = (SInt_64)((double)((SInt_16*)data)[a] / (double)EHS_SINT_16_MAX * (double)EHS_SINT_64_MAX); - *(SInt_64*)newPeak = (SInt_64)((float)*(SInt_16*)peak / (float)EHS_SINT_16_MAX * (float)EHS_SINT_64_MAX); + *(SInt_64*)newPeak = (SInt_64)((double)*(SInt_16*)peak / (double)EHS_SINT_16_MAX * (double)EHS_SINT_64_MAX); } void Audio::Float_to_SInt_64(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_64*)newData)[a] = (SInt_64)(((float*)data)[a] * (float)EHS_SINT_64_MAX); + ((SInt_64*)newData)[a] = (SInt_64)(((double*)data)[a] * (double)EHS_SINT_64_MAX); - *(SInt_64*)newPeak = (SInt_64)(*(float*)peak * (float)EHS_SINT_64_MAX); + *(SInt_64*)newPeak = (SInt_64)(*(double*)peak * (double)EHS_SINT_64_MAX); } void Audio::SInt_32_to_SInt_64(Byte* newData, Byte* newPeak) const { for (UInt_64 a = 0; a < GetSampleCount(); ++a) - ((SInt_64*)newData)[a] = (SInt_64)((float)((SInt_32*)data)[a] / (float)EHS_SINT_32_MAX * (float)EHS_SINT_64_MAX); + ((SInt_64*)newData)[a] = (SInt_64)((double)((SInt_32*)data)[a] / (double)EHS_SINT_32_MAX * (double)EHS_SINT_64_MAX); - *(SInt_64*)newPeak = (SInt_64)((float)*(SInt_32*)peak / (float)EHS_SINT_32_MAX * (float)EHS_SINT_64_MAX); + *(SInt_64*)newPeak = (SInt_64)((double)*(SInt_32*)peak / (double)EHS_SINT_32_MAX * (double)EHS_SINT_64_MAX); } bool EncodeEHA(const ehs::AudioCodec* const codec, ehs::Serializer& out, const ehs::Audio* const in) diff --git a/src/io/audio/AudioDevice_W32.cpp b/src/io/audio/AudioDevice_W32.cpp index b26f3d8..293811e 100644 --- a/src/io/audio/AudioDevice_W32.cpp +++ b/src/io/audio/AudioDevice_W32.cpp @@ -341,6 +341,8 @@ namespace ehs return 0; } + Util::Copy(buffer, data, frameSize * byteDepth * channels); + r = playbackClient->ReleaseBuffer(frameSize, 0); if (r == AUDCLNT_E_DEVICE_INVALIDATED) { diff --git a/src/io/audio/BaseAudioDevice.cpp b/src/io/audio/BaseAudioDevice.cpp index 7eb2335..be0be64 100644 --- a/src/io/audio/BaseAudioDevice.cpp +++ b/src/io/audio/BaseAudioDevice.cpp @@ -1,4 +1,5 @@ #include "ehs/io/audio/BaseAudioDevice.h" +#include "ehs/log.h" namespace ehs { @@ -49,19 +50,48 @@ namespace ehs return 0; } - void BaseAudioDevice::BridgeStreams(const UInt_64 bufferSize) + void BaseAudioDevice::BridgeStreams(BaseAudioDevice* input, UInt_64 frameBufferSize) { - Byte* buffer = new Byte[bufferSize]; + if (type != AudioDeviceType::OUTPUT) + { + EHS_LOG_INT(LogType::WARN, 0, "The current audio device object is not an output device."); + return; + } - UInt_64 result = ReceiveStream(buffer, bufferSize); - while (result < bufferSize) - result += ReceiveStream(&buffer[result], bufferSize - result); + if (input->type != AudioDeviceType::INPUT) + { + EHS_LOG_INT(LogType::WARN, 1, "The provided audio device is not an input device."); + return; + } + + Byte* buffer = new Byte[frameBufferSize * GetFrameSize()]; + + UInt_64 result = 0; + while (result < frameBufferSize) + result += input->ReceiveStream(&buffer[result * GetFrameSize()], frameBufferSize - result); - result -= SendStream(buffer, result); while (result) - result -= SendStream(&buffer[bufferSize - result], result); + result -= SendStream(&buffer[(frameBufferSize - result) * GetFrameSize()], result); delete[] buffer; + + EHS_LOG_SUCCESS(); + } + + void BaseAudioDevice::BridgeStreams(const UInt_64 frameBufferSize) + { + Byte* buffer = new Byte[frameBufferSize * GetFrameSize()]; + + UInt_64 result = ReceiveStream(buffer, frameBufferSize); + while (result < frameBufferSize) + result += ReceiveStream(&buffer[result * GetFrameSize()], frameBufferSize - result); + + while (result) + result -= SendStream(&buffer[(frameBufferSize - result) * GetFrameSize()], result); + + delete[] buffer; + + EHS_LOG_SUCCESS(); } AudioDeviceType BaseAudioDevice::GetType() const diff --git a/src/io/socket/EHC.cpp b/src/io/socket/EHC.cpp new file mode 100644 index 0000000..2b7197d --- /dev/null +++ b/src/io/socket/EHC.cpp @@ -0,0 +1,1208 @@ +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetEnc.h" +#include "ehs/io/socket/ehc/NetSys.h" +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/Log.h" +#include "ehs/PRNG.h" + + +namespace ehs +{ + const Version EHC::version(1, 0, 0); + const UInt_64 EHC::internalSys = Str_8::Hash_64("Internal"); + const UInt_64 EHC::connectOp = Str_8::Hash_64("Connect"); + const UInt_64 EHC::connectedOp = Str_8::Hash_64("Connected"); + const UInt_64 EHC::rejectedOp = Str_8::Hash_64("Rejected"); + const UInt_64 EHC::disconnectOp = Str_8::Hash_64("Disconnect"); + const UInt_64 EHC::disconnectedOp = Str_8::Hash_64("Disconnected"); + const UInt_64 EHC::statusUpdateOp = Str_8::Hash_64("StatusUpdate"); + const UInt_64 EHC::pingOp = Str_8::Hash_64("Ping"); + const UInt_64 EHC::pongOp = Str_8::Hash_64("Pong"); + const UInt_64 EHC::latencyOp = Str_8::Hash_64("Latency"); + const UInt_64 EHC::receivedOp = Str_8::Hash_64("Received"); + + EHC::~EHC() + { + Release(); + } + + EHC::EHC() + : udp(AddrType::IPV6), disposition(EndDisp::ENDPOINT), hashName(0), dropPackets(false), buffer(nullptr), bufferSize(0), + maxEndpoints(0), lastTSC(0), delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectCb(nullptr), + connectedCb(nullptr), activeCb(nullptr), disconnectCb(nullptr), disconnectedCb(nullptr), timeoutCb(nullptr) + { + } + + EHC::EHC(const Version &ver, Str_8 name, const UInt_64 maxEndpoints) + : udp(AddrType::IPV6), appVer(ver), disposition(EndDisp::ENDPOINT), hashName(name.Hash_64()), name((Str_8&&)name), + dropPackets(false), buffer(nullptr), bufferSize(0), maxEndpoints(maxEndpoints), lastTSC(CPU::GetTSC()), + delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectCb(nullptr), connectedCb(nullptr), activeCb(nullptr), + disconnectCb(nullptr), disconnectedCb(nullptr), timeoutCb(nullptr) + { + Initialize(); + } + + EHC::EHC(const Version &ver, Str_8 name) + : udp(AddrType::IPV6), appVer(ver), disposition(EndDisp::ENDPOINT), hashName(name.Hash_64()), name((Str_8&&)name), + dropPackets(false), buffer(nullptr), bufferSize(0), maxEndpoints(0), lastTSC(CPU::GetTSC()), + delta(0.0f), maxTimeout(5.0f), resendRate(0.5f), connectCb(nullptr), connectedCb(nullptr), activeCb(nullptr), + disconnectCb(nullptr), disconnectedCb(nullptr), timeoutCb(nullptr) + { + Initialize(); + } + + EHC::EHC(EHC &&sock) noexcept + : udp((UDP&&)sock.udp), appVer(sock.appVer), disposition(sock.disposition), hashName(sock.hashName), + name((Str_8&&)sock.name), dropPackets(sock.dropPackets), buffer(sock.buffer), bufferSize(sock.bufferSize), + encryptions((Array &&)sock.encryptions), systems((Array &&)sock.systems), + maxEndpoints(sock.maxEndpoints), lastTSC(sock.lastTSC), delta(sock.delta), maxTimeout(sock.maxTimeout), + resendRate(sock.resendRate), connectCb(sock.connectCb), connectedCb(sock.connectedCb), activeCb(sock.activeCb), + disconnectCb(sock.disconnectCb), disconnectedCb(sock.disconnectedCb), timeoutCb(sock.timeoutCb), + endpoints((Vector &&)sock.endpoints) + { + for (UInt_64 i = 0; i < encryptions.Size(); ++i) + endpoints[i]->owner = this; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + endpoints[i]->owner = this; + + sock.appVer = {}; + sock.disposition = EndDisp::ENDPOINT; + sock.hashName = 0; + sock.dropPackets = false; + sock.buffer = nullptr; + sock.bufferSize = 0; + sock.maxEndpoints = 0; + sock.lastTSC = 0; + sock.delta = 0.0f; + sock.maxTimeout = 5.0f; + sock.resendRate = 0.5f; + sock.connectCb = nullptr; + sock.connectedCb = nullptr; + sock.activeCb = nullptr; + sock.disconnectCb = nullptr; + sock.disconnectedCb = nullptr; + sock.timeoutCb = nullptr; + } + + EHC::EHC(const EHC &sock) + : udp(sock.udp), appVer(sock.appVer), disposition(sock.disposition), hashName(sock.hashName), name(sock.name), + dropPackets(sock.dropPackets), buffer(nullptr), bufferSize(0), maxEndpoints(sock.maxEndpoints), + lastTSC(CPU::GetTSC()), delta(0.0f), maxTimeout(sock.maxTimeout), resendRate(sock.resendRate), + connectCb(nullptr), connectedCb(nullptr), activeCb(nullptr), disconnectCb(nullptr), disconnectedCb(nullptr), + timeoutCb(nullptr) + { + Initialize(); + } + + EHC &EHC::operator=(EHC &&sock) noexcept + { + if (this == &sock) + return *this; + + udp = (UDP&&)sock.udp; + appVer = sock.appVer; + disposition = sock.disposition; + hashName = sock.hashName; + name = (Str_8&&)sock.name; + dropPackets = sock.dropPackets; + + delete[] buffer; + buffer = sock.buffer; + + bufferSize = sock.bufferSize; + + encryptions = (Array &&)sock.encryptions; + for (UInt_64 i = 0; i < encryptions.Size(); ++i) + encryptions[i]->owner = this; + + systems = (Array &&)sock.systems; + maxEndpoints = sock.maxEndpoints; + lastTSC = sock.lastTSC; + delta = sock.delta; + maxTimeout = sock.maxTimeout; + resendRate = sock.resendRate; + connectCb = sock.connectCb; + connectedCb = sock.connectedCb; + activeCb = sock.activeCb; + disconnectCb = sock.disconnectCb; + disconnectedCb = sock.disconnectedCb; + timeoutCb = sock.timeoutCb; + + endpoints = (Vector&&)sock.endpoints; + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + endpoints[i]->owner = this; + + sock.appVer = {}; + sock.disposition = EndDisp::ENDPOINT; + sock.hashName = 0; + sock.dropPackets = false; + sock.buffer = nullptr; + sock.bufferSize = 0; + sock.maxEndpoints = 0; + sock.lastTSC = 0; + sock.delta = 0.0f; + sock.maxTimeout = 5.0f; + sock.resendRate = 0.5f; + sock.connectCb = nullptr; + sock.connectedCb = nullptr; + sock.activeCb = nullptr; + sock.disconnectCb = nullptr; + sock.disconnectedCb = nullptr; + sock.timeoutCb = nullptr; + + return *this; + } + + EHC &EHC::operator=(const EHC &sock) + { + if (this == &sock) + return *this; + + udp = sock.udp; + appVer = sock.appVer; + disposition = sock.disposition; + hashName = sock.hashName; + name = sock.name; + dropPackets = sock.dropPackets; + + delete[] buffer; + buffer = nullptr; + + bufferSize = 0; + systems = Array(); + maxEndpoints = sock.maxEndpoints; + lastTSC = 0; + delta = 0.0f; + maxTimeout = sock.maxTimeout; + resendRate = sock.resendRate; + connectCb = nullptr; + connectedCb = nullptr; + activeCb = nullptr; + disconnectCb = nullptr; + disconnectedCb = nullptr; + timeoutCb = nullptr; + endpoints = Vector(); + + Initialize(); + + return *this; + } + + void EHC::Initialize() + { + if (!udp.IsValid()) + return; + + udp.SetBlocking(false); + + if (udp.GetLocalAddressType() == AddrType::IPV4) + { + buffer = new Byte[EHS_IPV4_UDP_PAYLOAD]; + bufferSize = EHS_IPV4_UDP_PAYLOAD; + } + else if (udp.GetLocalAddressType() == AddrType::IPV6) + { + buffer = new Byte[EHS_IPV6_UDP_PAYLOAD]; + bufferSize = EHS_IPV6_UDP_PAYLOAD; + } + } + + void EHC::Release() + { + if (!udp.IsValid()) + return; + + disposition = EndDisp::ENDPOINT; + + delete[] buffer; + buffer = nullptr; + bufferSize = 0; + + Serializer payload(Endianness::LE); + payload.Write(0); + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetStatus() != Status::PENDING) + endpoints[i]->Send(false, true, false, internalSys, disconnectOp, payload); + + delete endpoints[i]; + } + endpoints.Clear(); + + for (UInt_64 i = 0; i < systems.Size(); ++i) + delete systems[i]; + + systems.Clear(); + + udp.Release(); + } + + void EHC::Bind(AddrType newType, const Str_8& newAddress, const UInt_16 newPort) + { + disposition = EndDisp::SERVICE; + + udp.Bind(newType, newAddress, newPort); + } + + void EHC::Connect(AddrType rType, const Str_8& rAddress, const UInt_16 rPort, Serializer data) + { + if (!udp.IsValid()) + return; + + Serializer payload(Endianness::LE); + payload.WriteStr(name); + payload.Write(CPU::GetArchitecture()); + payload.WriteVersion(version); + payload.WriteVersion(appVer); + payload.WriteSer((Serializer&&)data); + + NetEnd* end = new NetEnd(rType, rAddress, rPort); + end->owner = this; + end->Send(false, true, false, "Internal", "Connect", payload); + + endpoints.Push(end); + } + + bool EHC::Disconnect(const EndDisp endDisp, const Char_8 token[64], const Str_8& msg) + { + if (!udp.IsValid()) + return false; + + NetEnd* end = GetEndpoint(endDisp, token); + if (!end) + return false; + + Serializer<> payload(Endianness::LE); + payload.WriteStr(msg); + + end->Send(false, true, false, internalSys, disconnectOp, payload); + + return true; + } + + void EHC::Broadcast(const EndDisp endDisp, const Status endStatus, const bool deltaLocked, const UInt_64 encHashId, + const bool ensure, const UInt_64 sysHashId, const UInt_64 opHashId, const Serializer &payload) + { + if (!udp.IsValid()) + return; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() == endStatus) + endpoints[i]->Send(deltaLocked, encHashId, ensure, sysHashId, opHashId, payload); + } + } + + void EHC::Broadcast(const EndDisp endDisp, const Status endStatus, const bool deltaLocked, const Str_8 &encId, + const bool ensure, const Str_8 &sysId, const Str_8 &opId, + const Serializer &payload) + { + Broadcast(endDisp, endStatus, deltaLocked, encId.Hash_64(), ensure, sysId.Hash_64(), opId.Hash_64(), payload); + } + + void EHC::Poll() + { + if (!udp.IsValid()) + return; + + UInt_64 newTSC = CPU::GetTSC(); + delta = (float)(newTSC - lastTSC) / (float)CPU::GetTSC_Freq(); + lastTSC = newTSC; + + AddrType rType = AddrType::IPV6; + Str_8 rAddress; + UInt_16 rPort = 0; + + UInt_16 received; + + while ((received = udp.Receive(&rType, &rAddress, &rPort, buffer, bufferSize))) + { + Serializer<> payload(Endianness::LE, buffer, received); + + UInt_64 encHashId = payload.Read(); + if (encHashId) + { + NetEnc* enc = GetEncryption(encHashId); + if (!enc) + continue; + + enc->Encrypt(&payload[payload.GetOffset()], payload.Size() - payload.GetOffset()); + } + + payload.SetOffset(0); + + Header header = payload.Read
(); + + if (!header.ensure && !header.token[0] && header.system == internalSys && header.op == connectOp) + { + Str_8 endId = payload.ReadStr(); + Architecture rArch = payload.Read(); + + NetEnd* end = new NetEnd(header.disposition, (Str_8&&)endId, rArch, rType, rAddress, rPort); + end->owner = this; + end->SetStatus(Status::PENDING); + + Serializer sPayload(Endianness::LE); + + Version rVer = payload.ReadVersion(); + if (rVer != version) + { + sPayload.WriteStr("Your Event Horizon Socket Layer version " + + Str_8::FromNum(rVer.major) + "." + Str_8::FromNum(rVer.minor) + "." + Str_8::FromNum(rVer.patch) + + " does not match remote endpoint version " + + Str_8::FromNum(version.major) + "." + Str_8::FromNum(version.minor) + "." + Str_8::FromNum(version.patch) + + ". Connection rejected."); + + end->Send(false, true, false, internalSys, rejectedOp, sPayload); + continue; + } + + Version rAppVer = payload.ReadVersion(); + if (rAppVer != appVer) + { + sPayload.WriteStr("Your application version " + + Str_8::FromNum(rAppVer.major) + "." + Str_8::FromNum(rAppVer.minor) + "." + Str_8::FromNum(rAppVer.patch) + + " does not match remote endpoint application version " + + Str_8::FromNum(appVer.major) + "." + Str_8::FromNum(appVer.minor) + "." + Str_8::FromNum(appVer.patch) + + ". Connection rejected."); + + end->Send(false, true, false, internalSys, rejectedOp, sPayload); + continue; + } + + GenerateToken(end->token); + + if (connectCb && !connectCb(this, &end, {Endianness::LE, &payload[payload.GetOffset()], payload.Size() - payload.GetOffset()})) + { + sPayload.WriteStr("Connection rejected."); + end->Send(false, true, false, internalSys, rejectedOp, sPayload); + continue; + } + + endpoints.Push(end); + + sPayload.Write(CPU::GetArchitecture()); + sPayload.WriteStr(name); + + UInt_64 active = GetEndpointsCount(EndDisp::ENDPOINT, Status::ACTIVE); + + if (maxEndpoints && active >= maxEndpoints) + { + end->SetStatus(Status::IN_LOCAL_QUEUE); + + UpdateQueue(active); + + sPayload.Write(Status::IN_REMOTE_QUEUE); + sPayload.Write(end->GetQueueSlot()); + } + else + { + end->SetStatus(Status::ACTIVE); + + if (activeCb) + activeCb(this, end); + + sPayload.Write(Status::ACTIVE); + sPayload.Write(0); + } + + end->Send(false, true, false, internalSys, connectedOp, sPayload); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == connectedOp) + { + NetEnd* end = GetEndpoint(rAddress, rPort); + if (!end || end->GetStatus() != Status::PENDING) + continue; + + Str_8 endId = payload.ReadStr(); + Architecture arch = payload.Read(); + + *end = NetEnd(header.disposition, (Str_8&&)endId, arch, rType, rAddress, rPort); + end->owner = this; + + Util::Copy(end->token, header.token, 64); + + end->SetStatus(payload.Read()); + end->SetQueueSlot(payload.Read()); + + if (connectedCb) + connectedCb(this, end); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == rejectedOp) + { + if (!RemoveEndpoint(rAddress, rPort)) + continue; + + Str_8 msg = payload.ReadStr(); + if (msg.Size()) + EHS_LOG_INT(LogType::INFO, 3, msg); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == disconnectOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->Send(false, true, false, internalSys, disconnectedOp, {}); + + if (disconnectCb) + disconnectCb(this, end, {Endianness::LE, &payload[payload.GetOffset()], payload.Size() - payload.GetOffset()}); + + RemoveEndpoint(header.disposition, end->token); + + UpdateQueue(); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == disconnectedOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + if (disconnectedCb) + disconnectedCb(this, end); + + RemoveEndpoint(end); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == statusUpdateOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + Status newStatus = payload.Read(); + UInt_64 newSlot = payload.Read(); + + if (end->GetStatus() == Status::ACTIVE) + { + if (activeCb) + activeCb(this, end); + + //EHS_LOG_INT(LogType::INFO, 5, "Your connection status to " + end->GetId() + " has now become active."); + } + else if (end->GetStatus() == Status::IN_REMOTE_QUEUE && newStatus == Status::IN_REMOTE_QUEUE) + { + //EHS_LOG_INT(LogType::INFO, 5, "Your queue slot for " + end->GetId() + " is now " + newSlot + "."); + } + + end->SetStatus(newStatus); + end->SetQueueSlot(newSlot); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == pingOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->SetDeltaRate(payload.Read()); + end->Pong(delta); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == pongOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->SetDeltaRate(payload.Read()); + end->SendLatency(); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == latencyOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + end->SetLatency(payload.Read()); + } + else if (!header.ensure && header.token[0] && header.system == internalSys && header.op == receivedOp) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + const UInt_64 msgId = payload.Read(); + const UInt_64 fragment = payload.Read(); + + end->RemoveInsurance(msgId, fragment); + } + else if (header.token[0]) + { + NetEnd* end = GetEndpoint(header.disposition, header.token); + if (!end) + continue; + + if (dropPackets && !header.ensure && header.id < end->GetNextRecvId()) + { + EHS_LOG_INT(LogType::INFO, 6, "Old packet intentionally dropped."); + continue; + } + + if (header.ensure) + { + Serializer sPayload(Endianness::LE); + sPayload.Write(header.id); + sPayload.Write(header.fragment); + + end->Send(false, true, false, internalSys, receivedOp, sPayload); + } + + end->AddReceived( + header, + Serializer<>(Endianness::LE, &payload[payload.GetOffset()], payload.Size() - payload.GetOffset()) + ); + } + else + { + EHS_LOG_INT(LogType::INFO, 7, "Corrupted packet."); + } + } + + PollEndpoints(); + } + + bool EHC::IsInitialized() const + { + return udp.IsValid(); + } + + AddrType EHC::GetLocalAddressType() const + { + return udp.GetLocalAddressType(); + } + + Str_8 EHC::GetLocalAddress() const + { + return udp.GetLocalAddress(); + } + + UInt_16 EHC::GetLocalPort() const + { + return udp.GetLocalPort(); + } + + bool EHC::IsBound() const + { + return udp.IsBound(); + } + + Version EHC::GetVersion() + { + return version; + } + + Version EHC::GetAppVersion() const + { + return appVer; + } + + EndDisp EHC::GetDisposition() const + { + return disposition; + } + + UInt_64 EHC::GetHashName() const + { + return hashName; + } + + Str_8 EHC::GetName() const + { + return name; + } + + void EHC::EnableDropPackets(const bool enable) + { + dropPackets = enable; + } + + bool EHC::IsDropPacketsEnabled() const + { + return dropPackets; + } + + bool EHC::HasEncryption(UInt_64 encHashId) const + { + for (UInt_64 i = 0; i < encryptions.Size(); ++i) + if (encryptions[i]->GetHashId() == encHashId) + return true; + + return false; + } + + bool EHC::HasEncryption(const Str_8& encId) const + { + return HasEncryption(encId.Hash_64()); + } + + bool EHC::AddEncryption(NetEnc* enc) + { + if (HasEncryption(enc->GetHashId())) + return false; + + enc->owner = this; + encryptions.Push(enc); + + return true; + } + + NetEnc* EHC::GetEncryption(UInt_64 encHashId) const + { + for (UInt_64 i = 0; i < encryptions.Size(); ++i) + if (encryptions[i]->GetHashId() == encHashId) + return encryptions[i]; + + return nullptr; + } + + NetEnc* EHC::GetEncryption(const Str_8& encId) const + { + return GetEncryption(encId.Hash_64()); + } + + bool EHC::HasSystem(const UInt_64 sysHashId) const + { + if (internalSys == sysHashId) + return true; + + for (UInt_64 i = 0; i < systems.Size(); ++i) + if (systems[i]->GetHashId() == sysHashId) + return true; + + return false; + } + + bool EHC::HasSystem(const Str_8& sysId) const + { + return HasSystem(sysId.Hash_64()); + } + + bool EHC::AddSystem(NetSys *sys) + { + if (HasSystem(sys->GetHashId())) + return false; + + systems.Push(sys); + + return true; + } + + NetSys* EHC::GetSystem(const UInt_64 sysHashId) const + { + for (UInt_64 i = 0; i < systems.Size(); ++i) + if (systems[i]->GetHashId() == sysHashId) + return systems[i]; + + return nullptr; + } + + NetSys* EHC::GetSystem(const Str_8& sysId) const + { + return GetSystem(sysId.Hash_64()); + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Status endStatus, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Status endStatus, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Status endStatus, const Str_8 &id) const + { + return HasEndpoint(endDisp, endStatus, id.Hash_64()); + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return true; + } + + return false; + } + + bool EHC::HasEndpoint(const EndDisp endDisp, const Str_8 &id) const + { + return HasEndpoint(endDisp, id.Hash_64()); + } + + bool EHC::HasEndpoint(const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (Util::Compare(endpoints[i]->token, token, 64)) + return true; + + return false; + } + + bool EHC::HasEndpoint(UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetHashName() == hashName) + return true; + + return false; + } + + bool EHC::HasEndpoint(const Str_8 &id) const + { + return HasEndpoint(id.Hash_64()); + } + + bool EHC::HasEndpoint(const Str_8& rAddress, const UInt_16 rPort) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort) + return true; + + return false; + } + + NetEnd* EHC::GetEndpoint(const EndDisp endDisp, const Status endStatus, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Status endStatus, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() != endStatus) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Status endStatus, const Str_8 &id) const + { + return GetEndpoint(endDisp, endStatus, id.Hash_64()); + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Char_8 token[64]) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const UInt_64 hashName) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetHashName() == hashName) + return endpoints[i]; + } + + return nullptr; + } + + NetEnd *EHC::GetEndpoint(const EndDisp endDisp, const Str_8 &id) const + { + return GetEndpoint(endDisp, id.Hash_64()); + } + + NetEnd *EHC::GetEndpoint(const Str_8& rAddress, const UInt_16 rPort) const + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort) + return endpoints[i]; + + return nullptr; + } + + Array EHC::GetEndpoints(const EndDisp endDisp, const Status endStatus) + { + Array result(endpoints.Size()); + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() == endStatus) + result[count++] = endpoints[i]; + } + + result.Resize(count); + + return result; + } + + Array EHC::GetEndpoints(const EndDisp endDisp) + { + Array result(endpoints.Size()); + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetDisposition() == endDisp) + result[count++] = endpoints[i]; + + result.Resize(count); + + return result; + } + + UInt_64 EHC::GetEndpointsCount(const EndDisp endDisp, const Status endStatus) + { + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (endpoints[i]->GetStatus() == endStatus) + ++count; + } + + return count; + } + + UInt_64 EHC::GetEndpointsCount(const EndDisp endDisp) + { + UInt_64 count = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + if (endpoints[i]->GetDisposition() == endDisp) + ++count; + + return count; + } + + UInt_64 EHC::GetMaxEndpoints() const + { + return maxEndpoints; + } + + void EHC::SetMaxTimeout(const float seconds) + { + maxTimeout = seconds; + } + + float EHC::GetMaxTimeout() const + { + return maxTimeout; + } + + void EHC::SetResendRate(const float seconds) + { + resendRate = seconds; + } + + float EHC::GetResendRate() const + { + return resendRate; + } + + void EHC::SetConnectCb(const ConnectCb cb) + { + connectCb = cb; + } + + void EHC::SetConnectedCb(const ConnectedCb cb) + { + connectedCb = cb; + } + + void EHC::SetActiveCb(const ActiveCb cb) + { + activeCb = cb; + } + + void EHC::SetDisconnectCb(const DisconnectCb cb) + { + disconnectCb = cb; + } + + void EHC::SetDisconnectedCb(const DisconnectedCb cb) + { + disconnectedCb = cb; + } + + void EHC::SetTimeoutCb(const TimeoutCb cb) + { + timeoutCb = cb; + } + + void EHC::GenerateToken(Char_8 in[64]) + { + PRNG_u64 rng(CPU::GetTSC()); + + for (UInt_64 i = 0; i < 8; ++i) + { + do + ((UInt_64*)in)[i] = rng.Generate(); + while (!i && ((UInt_64*)in)[i] == 0); + } + + if (HasEndpoint(in)) + GenerateToken(in); + } + + void EHC::UpdateQueue(UInt_64 active) + { + UInt_64 slot = 0; + + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetStatus() == Status::IN_LOCAL_QUEUE) + { + if (active < maxEndpoints) + { + endpoints[i]->SetStatus(Status::ACTIVE); + endpoints[i]->SetQueueSlot(0); + + Serializer payload(Endianness::LE); + payload.Write(Status::ACTIVE); + payload.Write(0); + + endpoints[i]->Send(false, true, false, internalSys, statusUpdateOp, payload); + + if (activeCb) + activeCb(this, endpoints[i]); + + ++active; + } + else + { + if (endpoints[i]->GetQueueSlot() != slot) + { + Serializer payload(Endianness::LE); + payload.Write(Status::IN_REMOTE_QUEUE); + payload.Write(slot); + + endpoints[i]->Send(false, true, false, internalSys, statusUpdateOp, payload); + + endpoints[i]->SetQueueSlot(slot++); + } + else + { + ++slot; + } + } + } + } + } + + void EHC::UpdateQueue() + { + UpdateQueue(GetEndpointsCount(EndDisp::ENDPOINT, Status::ACTIVE)); + } + + bool EHC::RemoveEndpoint(const EndDisp endDisp, const Char_8 token[64]) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetDisposition() != endDisp) + continue; + + if (Util::Compare(endpoints[i]->token, token, 64)) + { + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + return true; + } + } + + return false; + } + + bool EHC::RemoveEndpoint(const Str_8& rAddress, const UInt_16 rPort) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i]->GetAddress() == rAddress && endpoints[i]->GetPort() == rPort) + { + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + return true; + } + } + + return false; + } + + bool EHC::RemoveEndpoint(const NetEnd* const end) + { + for (UInt_64 i = 0; i < endpoints.Size(); ++i) + { + if (endpoints[i] == end) + { + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + return true; + } + } + + return false; + } + + void EHC::PollEndpoints() + { + UInt_64 i = 0; + while (i < endpoints.Size()) + { + endpoints[i]->Poll(delta); + + if (endpoints[i]->GetStatus() == Status::PENDING) + { + if (endpoints[i]->GetTimeout() >= maxTimeout) + { + if (timeoutCb) + timeoutCb(this, endpoints[i]); + + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + continue; + } + } + else + { + if (endpoints[i]->GetTimeout() >= maxTimeout) + { + if (timeoutCb) + timeoutCb(this, endpoints[i]); + + delete endpoints[i]; + + if (i != endpoints.Size() - 1) + endpoints.Swap(i, endpoints.End()); + + endpoints.Pop(); + + UpdateQueue(); + + continue; + } + + Vector* frags = endpoints[i]->GetReceived(); + + UInt_64 f = 0; + while (f < frags->Size()) + { + if (!(*frags)[f].IsComplete()) + { + ++f; + continue; + } + + Packet packet = (*frags)[f].Combine(); + + NetSys* sys = GetSystem(packet.header.system); + if (!sys) + { + ++f; + continue; + } + + sys->Execute(this, endpoints[i], packet.header.op, packet.payload); + + frags->Swap(f, frags->End()); + frags->Pop(); + } + } + + ++i; + } + } +} diff --git a/src/io/socket/ehc/NetEnc.cpp b/src/io/socket/ehc/NetEnc.cpp new file mode 100644 index 0000000..e9b88f1 --- /dev/null +++ b/src/io/socket/ehc/NetEnc.cpp @@ -0,0 +1,76 @@ +#include "ehs/io/socket/ehc/NetEnc.h" + +namespace ehs +{ + NetEnc::NetEnc() + : owner(nullptr), hashId(0) + { + } + + NetEnc::NetEnc(Str_8 id) + : owner(nullptr), hashId(id.Hash_64()), id((Str_8 &&)id) + { + } + + NetEnc::NetEnc(NetEnc&& enc) noexcept + : owner(enc.owner), hashId(enc.hashId), id((Str_8 &&)enc.id) + { + enc.owner = nullptr; + enc.hashId = 0; + } + + NetEnc::NetEnc(const NetEnc& enc) + : owner(nullptr), hashId(enc.hashId), id(enc.id) + { + } + + NetEnc& NetEnc::operator=(NetEnc&& enc) noexcept + { + if (this == &enc) + return *this; + + owner = enc.owner; + hashId = enc.hashId; + id = (Str_8 &&)enc.id; + + enc.owner = nullptr; + enc.hashId = 0; + + return *this; + } + + NetEnc& NetEnc::operator=(const NetEnc& enc) + { + if (this == &enc) + return *this; + + owner = nullptr; + hashId = enc.hashId; + id = enc.id; + + return *this; + } + + EHC* NetEnc::GetOwner() const + { + return owner; + } + + UInt_64 NetEnc::GetHashId() const + { + return hashId; + } + + Str_8 NetEnc::GetId() const + { + return id; + } + + void NetEnc::Encrypt(Byte *data, UInt_64 size) const + { + } + + void NetEnc::Decrypt(Byte *data, UInt_64 size) const + { + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetEnd.cpp b/src/io/socket/ehc/NetEnd.cpp new file mode 100644 index 0000000..15e20e5 --- /dev/null +++ b/src/io/socket/ehc/NetEnd.cpp @@ -0,0 +1,535 @@ +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetEnc.h" +#include "ehs/system/CPU.h" + +namespace ehs +{ + NetEnd::NetEnd() + : owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(0), status(Status::PENDING), + arch(Architecture::UNKNOWN), token{}, nextSendId(0), nextRecvId(0), type(AddrType::IPV6), port(0), + deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), + queueSlot(0) + { + } + + NetEnd::NetEnd(const EndDisp disposition, Str_8 id, const Architecture arch, const AddrType type, + Str_8 address, const UInt_16 port) + : owner(nullptr), disposition(disposition), hashName(id.Hash_64()), name((Str_8&&)id), status(Status::ACTIVE), + arch(arch), token{}, nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), port(port), + deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), + queueSlot(0) + { + } + + NetEnd::NetEnd(const AddrType type, Str_8 address, const UInt_16 port) + : owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(0), status(Status::PENDING), + arch(Architecture::UNKNOWN), token{}, nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), + port(port), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), + latency(0.0f), queueSlot(0) + { + } + + NetEnd::NetEnd(NetEnd &&end) noexcept + : owner(end.owner), disposition(end.disposition), hashName(end.hashName), name((Str_8&&)end.name), status(end.status), arch(end.arch), token{}, + nextSendId(end.nextSendId), sent((Vector&&)end.sent), nextRecvId(end.nextRecvId), + received((Vector&&)end.received), type(end.type), address((Str_8&&)end.address), port(end.port), + deltaDuration(end.deltaDuration), deltaRate(end.deltaRate), timeout(end.timeout), lastPing(end.lastPing), + oldLatency(end.oldLatency), latency(end.latency), queueSlot(end.queueSlot) + { + end.owner = nullptr; + end.disposition = EndDisp::ENDPOINT; + end.hashName = 0; + end.status = Status::PENDING; + end.arch = Architecture::UNKNOWN; + Util::Copy(token, end.token, 64); + Util::Zero(end.token, 64); + end.nextSendId = 0; + end.nextRecvId = 0; + end.type = AddrType::IPV6; + end.port = 0; + end.deltaDuration = 0.0f; + end.deltaRate = 1.0f / 60.0f; + end.timeout = 0.0f; + end.lastPing = 0.0f; + end.oldLatency = 0.0f; + end.latency = 0.0f; + end.queueSlot = 0; + } + + NetEnd::NetEnd(const NetEnd& end) + : owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(end.hashName), name(end.name), status(Status::PENDING), arch(Architecture::UNKNOWN), + token{}, nextSendId(0), nextRecvId(0), type(end.type), port(0), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), + timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0) + { + } + + NetEnd& NetEnd::operator=(NetEnd&& end) noexcept + { + if (this == &end) + return *this; + + owner = end.owner; + disposition = end.disposition; + hashName = end.hashName; + name = (Str_8&&)end.name; + status = end.status; + arch = end.arch; + Util::Copy(token, end.token, 64); + nextSendId = end.nextSendId; + sent = (Vector&&)end.sent; + nextRecvId = end.nextRecvId; + received = (Vector&&)end.received; + type = end.type; + address = (Str_8&&)end.address; + port = end.port; + deltaDuration = end.deltaDuration; + deltaRate = end.deltaRate; + timeout = end.timeout; + lastPing = end.lastPing; + oldLatency = end.oldLatency; + latency = end.latency; + queueSlot = end.queueSlot; + + end.owner = nullptr; + end.disposition = EndDisp::ENDPOINT; + end.hashName = 0; + end.status = Status::PENDING; + end.arch = Architecture::UNKNOWN; + Util::Zero(end.token, 64); + end.nextSendId = 0; + end.nextRecvId = 0; + end.type = AddrType::IPV6; + end.port = 0; + end.deltaDuration = 0.0f; + end.deltaRate = 1.0f / 60.0f; + end.timeout = 0.0f; + end.lastPing = 0.0f; + end.oldLatency = 0.0f; + end.latency = 0.0f; + end.queueSlot = 0; + + return *this; + } + + NetEnd &NetEnd::operator=(const NetEnd &end) + { + if (this == &end) + return *this; + + owner = nullptr; + disposition = EndDisp::ENDPOINT; + hashName = end.hashName; + name = end.name; + status = Status::PENDING; + arch = Architecture::UNKNOWN; + Util::Zero(token, 64); + nextSendId = 0; + sent = {}; + nextRecvId = 0; + received = {}; + type = AddrType::IPV6; + address = {}; + port = 0; + deltaDuration = 0.0f; + deltaRate = 1.0f / 60.0f; + timeout = 0.0f; + lastPing = 0.0f; + oldLatency = 0.0f; + latency = 0.0f; + queueSlot = 0; + + return *this; + } + + EndDisp NetEnd::GetDisposition() const + { + return disposition; + } + + UInt_64 NetEnd::GetHashName() const + { + return hashName; + } + + Str_8 NetEnd::GetName() const + { + return name; + } + + Status NetEnd::GetStatus() const + { + return status; + } + + Architecture NetEnd::GetArchitecture() const + { + return arch; + } + + UInt_64 NetEnd::GetNextSendId() const + { + return nextSendId; + } + + void NetEnd::Send(const bool deltaLocked, const UInt_64 encHashId, const bool ensure, const UInt_64 sys, + const UInt_64 op, const Serializer &payload) + { + if (deltaLocked && deltaDuration < deltaRate) + return; + + Header header = { + encHashId, + nextSendId++, + 1, + 0, + ensure, + owner->GetDisposition(), + "", + sys, + op + }; + + Util::Copy(header.token, token, 64); + + if ((owner->GetLocalAddressType() == AddrType::IPV6 && payload.Size() > EHC_IPV6_PAYLOAD) || (owner->GetLocalAddressType() == AddrType::IPV4 && payload.Size() > EHC_IPV4_PAYLOAD)) + { + NetFrags frags = FragmentData(header, payload); + for (UInt_64 i = 0; i < frags.Size(); ++i) + { + Header newHeader = frags.GetHeader(); + newHeader.fragment = i; + + Send(newHeader, frags[i]); + } + } + else + { + Send(header, payload); + } + } + + void NetEnd::Send(const bool deltaLocked, const Str_8 &encId, const bool ensure, const Str_8& sys, + const Str_8& op, const Serializer<>& payload) + { + Send(deltaLocked, encId.Hash_64(), ensure, sys.Hash_64(), op.Hash_64(), payload); + } + + UInt_64 NetEnd::GetNextRecvId() const + { + return nextRecvId; + } + + Str_8 NetEnd::GetAddress() const + { + return address; + } + + UInt_16 NetEnd::GetPort() const + { + return port; + } + + float NetEnd::GetDeltaRate() const + { + return deltaRate; + } + + float NetEnd::GetTimeout() const + { + return timeout; + } + + float NetEnd::GetLastPing() const + { + return lastPing; + } + + float NetEnd::GetLatency() const + { + return oldLatency; + } + + UInt_64 NetEnd::GetQueueSlot() const + { + return queueSlot; + } + + void NetEnd::Poll(const float delta) + { + SortReceived(); + + if (deltaDuration >= deltaRate) + deltaDuration = Math::Mod(deltaDuration, deltaRate); + + deltaDuration += delta; + timeout += delta; + latency += delta; + + if (sent.Size()) + { + for (UInt_64 i = 0; i < sent.Size(); ++i) + { + sent[i].lastResend += delta; + if (sent[i].lastResend >= owner->GetResendRate()) + { + Serializer result(Endianness::LE); + result.Write(sent[i].header); + result.WriteSer(sent[i].payload); + + if (sent[i].header.encHashId) + { + NetEnc *enc = owner->GetEncryption(sent[i].header.encHashId); + if (!enc) + { + EHS_LOG_INT(LogType::WARN, 0, "The network encryption with the hash id " + + Str_8::FromNum(sent[i].header.encHashId) + ", does not exist."); + + continue; + } + + enc->Encrypt(&result[sizeof(bool)], result.Size() - sizeof(bool)); + } + + owner->udp.Send(type, address, port, result, result.Size()); + + sent[i].lastResend = Math::Mod(sent[i].lastResend, owner->GetResendRate()); + } + } + } + + if (owner->GetDisposition() == EndDisp::SERVICE) + { + lastPing += delta; + if (lastPing >= 1.0f) + Ping(delta); + } + + EHS_LOG_SUCCESS(); + } + + void NetEnd::SetStatus(const Status newStatus) + { + status = newStatus; + } + + void NetEnd::RemoveInsurance(const UInt_64 msgId, const UInt_64 fragment) + { + for (UInt_64 i = 0; i < sent.Size(); ++i) + { + if (sent[i].header.id == msgId && sent[i].header.fragment == fragment) + { + sent.Remove(i); + break; + } + } + + timeout = 0.0f; + } + + void NetEnd::AddReceived(const Header& header, const Serializer<>& payload) + { + NetFrags* frags = nullptr; + + for (UInt_64 i = 0; i < received.Size(); ++i) + { + if (received[i].GetHeader().id == header.id) + { + if (received[i][header.fragment].Size()) + return; + + frags = &received[i]; + break; + } + } + + if (header.id > nextRecvId) + nextRecvId = header.id + 1; + + if (frags) + (*frags)[header.fragment] = payload; + else + received.Push({header, payload}); + + timeout = 0.0f; + } + + Vector* NetEnd::GetReceived() + { + return &received; + } + + void NetEnd::SetDeltaRate(const float newDeltaRate) + { + deltaRate = newDeltaRate; + } + + void NetEnd::Ping(const float delta) + { + Serializer payload(Endianness::LE); + payload.Write(delta); + + Send(false, true, false, "Internal", "Ping", payload); + lastPing = 0.0f; + latency = 0.0f; + } + + void NetEnd::Pong(const float delta) + { + Serializer payload(Endianness::LE); + payload.Write(delta); + + Send(false, true, false, "Internal", "Pong", payload); + + timeout = 0.0f; + } + + void NetEnd::SendLatency() + { + oldLatency = latency * 1000; + + Serializer sPayload(Endianness::LE); + sPayload.Write(oldLatency); + + Send(false, true, false, "Internal", "Latency", sPayload); + + latency = 0.0f; + timeout = 0.0f; + } + + void NetEnd::SetLatency(const float newLatency) + { + oldLatency = newLatency; + } + + void NetEnd::SetQueueSlot(const UInt_64 slot) + { + queueSlot = slot; + } + + NetFrags NetEnd::FragmentData(const Header& header, const Serializer<>& data) + { + NetFrags result; + + if (owner->GetLocalAddressType() == AddrType::IPV6) + { + UInt_64 frags = data.Size() / EHC_IPV6_PAYLOAD; + if (data.Size() % EHC_IPV6_PAYLOAD) + ++frags; + + result = NetFrags(header, frags); + + UInt_64 size = EHC_IPV6_PAYLOAD; + + for (UInt_64 i = 0; i < result.Size(); ++i) + { + size = EHC_IPV6_PAYLOAD; + if (i == result.Size() - 1) + size = data.Size() % EHC_IPV6_PAYLOAD; + + result[i] = {data.GetEndianness(), &data[i * EHC_IPV6_PAYLOAD], size}; + } + } + else if (owner->GetLocalAddressType() == AddrType::IPV4) + { + UInt_64 frags = data.Size() / EHC_IPV4_PAYLOAD; + if (data.Size() % EHC_IPV4_PAYLOAD) + ++frags; + + result = NetFrags(header, frags); + + UInt_64 size = EHC_IPV4_PAYLOAD; + + for (UInt_64 i = 0; i < result.Size(); ++i) + { + size = EHC_IPV4_PAYLOAD; + if (i == result.Size() - 1) + size = data.Size() % EHC_IPV4_PAYLOAD; + + result[i] = {data.GetEndianness(), &data[i * EHC_IPV4_PAYLOAD], size}; + } + } + + return result; + } + + void NetEnd::Send(const Header& header, const Serializer& payload) + { + Serializer result(Endianness::LE); + result.Write(header); + result.WriteSer(payload); + + if (header.encHashId) + { + NetEnc *enc = owner->GetEncryption(header.encHashId); + if (!enc) + { + EHS_LOG_INT(LogType::WARN, 0, "The network encryption with the hash id " + + Str_8::FromNum(header.encHashId) + ", does not exist."); + + return; + } + + enc->Encrypt(&result[sizeof(bool)], result.Size() - sizeof(bool)); + } + + if (header.ensure) + sent.Push({header, payload}); + + owner->udp.Send(type, address, port, result, result.Size()); + } + + bool NetEnd::SortingNeeded() const + { + UInt_64 lastPacket = 0; + + for (UInt_64 i = 0; i < received.Size(); ++i) + { + if (received[i].GetHeader().id < lastPacket) + return true; + else + lastPacket = received[i].GetHeader().id; + } + + return false; + } + + void NetEnd::SortReceived() + { + if (!SortingNeeded()) + return; + + Vector sorted(0, received.Stride()); + + for (UInt_64 a = 0; a < received.Size(); ++a) + { + if (sorted.Size()) + { + for (UInt_64 b = sorted.Size(); b; --b) + { + if (received[a].GetHeader().id > sorted[b - 1].GetHeader().id) + { + if (b == sorted.Size()) + sorted.Push(received[a]); + else + sorted.Insert(b, received[a]); + + break; + } + else + { + sorted.Insert(b - 1, received[a]); + + break; + } + } + } + else + { + sorted.Push(received[a]); + } + } + + received = sorted; + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetFrags.cpp b/src/io/socket/ehc/NetFrags.cpp new file mode 100644 index 0000000..f0db0ae --- /dev/null +++ b/src/io/socket/ehc/NetFrags.cpp @@ -0,0 +1,122 @@ +#include "ehs/io/socket/ehc/NetFrags.h" + +namespace ehs +{ + NetFrags::~NetFrags() + { + delete[] data; + } + + NetFrags::NetFrags() + : data(nullptr), size(0) + { + } + + NetFrags::NetFrags(const Header &header, const Serializer &payload) + : header(header), data(new Serializer[header.fragments]), size(header.fragments) + { + this->header.fragment = 0; + data[header.fragment] = payload; + } + + NetFrags::NetFrags(const Header &header, const UInt_64 size) + : header(header), data(new Serializer[size]), size(size) + { + this->header.fragments = size; + this->header.fragment = 0; + } + + NetFrags::NetFrags(NetFrags &&frags) noexcept + : header(frags.header), data(frags.data), size(frags.size) + { + frags.header = {}; + frags.data = nullptr; + frags.size = 0; + } + + NetFrags::NetFrags(const NetFrags &frags) + : header(frags.header), data(new Serializer[frags.size]), size(frags.size) + { + for (UInt_64 i = 0; i < size; ++i) + data[i] = frags.data[i]; + } + + NetFrags &NetFrags::operator=(NetFrags &&frags) noexcept + { + if (this == &frags) + return *this; + + header = frags.header; + + delete[] data; + data = frags.data; + + size = frags.size; + + frags.header = {}; + frags.data = nullptr; + frags.size = 0; + + return *this; + } + + NetFrags &NetFrags::operator=(const NetFrags &frags) + { + if (this == &frags) + return *this; + + header = frags.header; + delete[] data; + data = new Serializer[frags.size]; + for (UInt_64 i = 0; i < frags.size; ++i) + data[i] = frags.data[i]; + size = frags.size; + + return *this; + } + + NetFrags::operator Serializer *() const + { + return data; + } + + Header NetFrags::GetHeader() const + { + return header; + } + + UInt_64 NetFrags::Size() const + { + return size; + } + + bool NetFrags::IsComplete() const + { + for (UInt_64 i = 0; i < size; ++i) + if (!data[i].Size()) + return false; + + return true; + } + + Packet NetFrags::Combine() const + { + UInt_64 rSize = 0; + for (UInt_64 i = 0; i < size; ++i) + rSize += data[i].Size(); + + Packet result = + { + header, + {Endianness::LE, rSize} + }; + result.header.fragments = 0; + + for (UInt_64 i = 0; i < size; ++i) + result.payload.WriteSer(data[i]); + + result.payload.SetOffset(0); + + return result; + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetOp.cpp b/src/io/socket/ehc/NetOp.cpp new file mode 100644 index 0000000..40fe058 --- /dev/null +++ b/src/io/socket/ehc/NetOp.cpp @@ -0,0 +1,66 @@ +#include "ehs/io/socket/ehc/NetOp.h" +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/io/socket/ehc/NetSys.h" + +namespace ehs +{ + NetOp::NetOp() + : hashId(0) + { + } + + NetOp::NetOp(Str_8 id) + : hashId(id.Hash_64()), id((Str_8 &&)id) + { + } + + NetOp::NetOp(NetOp&& op) noexcept + : hashId(op.hashId), id((Str_8 &&)op.id) + { + op.hashId = 0; + } + + NetOp::NetOp(const NetOp &op) + : hashId(op.hashId), id(op.id) + { + } + + NetOp& NetOp::operator=(NetOp&& op) noexcept + { + if (this == &op) + return *this; + + hashId = op.hashId; + id = (Str_8 &&)op.id; + + op.hashId = 0; + + return *this; + } + + NetOp &NetOp::operator=(const NetOp &op) + { + if (this == &op) + return *this; + + hashId = op.hashId; + id = op.id; + + return *this; + } + + Str_8 NetOp::GetId() const + { + return id; + } + + UInt_64 NetOp::GetHashId() const + { + return hashId; + } + + void NetOp::Process(EHC *ehc, NetEnd *endpoint, NetSys *sys, Serializer &payload) + { + } +} \ No newline at end of file diff --git a/src/io/socket/ehc/NetSys.cpp b/src/io/socket/ehc/NetSys.cpp new file mode 100644 index 0000000..f660e01 --- /dev/null +++ b/src/io/socket/ehc/NetSys.cpp @@ -0,0 +1,105 @@ +#include "ehs/io/socket/ehc/NetSys.h" +#include "ehs/io/socket/EHC.h" +#include "ehs/io/socket/ehc/NetEnd.h" +#include "ehs/io/socket/ehc/NetOp.h" + +namespace ehs +{ + NetSys::~NetSys() + { + for (UInt_64 i = 0; i < ops.Size(); ++i) + delete ops[i]; + + ops.Clear(); + } + + NetSys::NetSys() + : hashId(0) + { + } + + NetSys::NetSys(Str_8 id) + : hashId(id.Hash_64()), id((Str_8&&)id) + { + } + + NetSys::NetSys(NetSys&& sys) noexcept + : hashId(sys.hashId), id((Str_8&&)sys.id), ops((Array&&)sys.ops) + { + sys.hashId = 0; + } + + NetSys::NetSys(const NetSys& sys) + : hashId(sys.hashId), id(sys.id) + { + } + + NetSys& NetSys::operator=(NetSys&& sys) noexcept + { + if (this == &sys) + return *this; + + hashId = sys.hashId; + id = (Str_8&&)sys.id; + ops = (Array&&)sys.ops; + + sys.hashId = 0; + + return *this; + } + + NetSys& NetSys::operator=(const NetSys& sys) + { + if (this == &sys) + return *this; + + hashId = sys.hashId; + id = sys.id; + ops = Array(); + + return *this; + } + + Str_8 NetSys::GetId() const + { + return id; + } + + UInt_64 NetSys::GetHashId() const + { + return hashId; + } + + bool NetSys::HasOperation(const UInt_64 hashId) const + { + for (UInt_64 i = 0; i < ops.Size(); ++i) + if (ops[i]->GetHashId() == hashId) + return true; + + return false; + } + + bool NetSys::AddOperation(NetOp* op) + { + if (HasOperation(op->GetHashId())) + return false; + + ops.Push(op); + + return true; + } + + void NetSys::Execute(EHC *ehc, NetEnd *endpoint, const UInt_64 hashId, Serializer &payload) + { + for (UInt_64 i = 0; i < ops.Size(); ++i) + { + if (ops[i]->GetHashId() == hashId) + { + ops[i]->Process(ehc, endpoint, this, payload); + return; + } + } + + EHS_LOG_INT(LogType::INFO, 0, "System not found."); + } +} \ No newline at end of file