EHS/src/io/socket/ehc/NetEnd.cpp

512 lines
12 KiB
C++

#include "ehs/io/socket/ehc/NetEnd.h"
#include "ehs/io/socket/EHC.h"
#include <ehs/system/CPU.h>
#include <ehe/Encryption.h>
namespace ehs
{
NetEnd::NetEnd()
: owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(0), status(Status::PENDING),
arch(Architecture::UNKNOWN), token{}, nextSendId(0), nextRecvId(0), type(AddrType::IPV6), port(0),
deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f),
queueSlot(0)
{
}
NetEnd::NetEnd(const EndDisp disposition, Str_8 id, const Architecture arch, const AddrType type,
Str_8 address, const UInt_16 port)
: owner(nullptr), disposition(disposition), hashName(id.Hash_64()), name((Str_8&&)id), status(Status::ACTIVE),
arch(arch), token{}, nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address), port(port),
deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f),
queueSlot(0)
{
}
NetEnd::NetEnd(const AddrType type, Str_8 address, const UInt_16 port)
: owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(0), status(Status::PENDING),
arch(Architecture::UNKNOWN), token{}, nextSendId(0), nextRecvId(0), type(type), address((Str_8&&)address),
port(port), deltaDuration(0.0f), deltaRate(1.0f / 60.0f), timeout(0.0f), lastPing(0.0f), oldLatency(0.0f),
latency(0.0f), queueSlot(0)
{
}
NetEnd::NetEnd(NetEnd &&end) noexcept
: owner(end.owner), disposition(end.disposition), hashName(end.hashName), name((Str_8&&)end.name), status(end.status), arch(end.arch), token{},
nextSendId(end.nextSendId), sent((Vector<Insurance>&&)end.sent), nextRecvId(end.nextRecvId),
received((Vector<Fragments>&&)end.received), type(end.type), address((Str_8&&)end.address), port(end.port),
deltaDuration(end.deltaDuration), deltaRate(end.deltaRate), timeout(end.timeout), lastPing(end.lastPing),
oldLatency(end.oldLatency), latency(end.latency), queueSlot(end.queueSlot)
{
end.owner = nullptr;
end.disposition = EndDisp::ENDPOINT;
end.hashName = 0;
end.status = Status::PENDING;
end.arch = Architecture::UNKNOWN;
Util::Copy(token, end.token, 64);
Util::Zero(end.token, 64);
end.nextSendId = 0;
end.nextRecvId = 0;
end.type = AddrType::IPV6;
end.port = 0;
end.deltaDuration = 0.0f;
end.deltaRate = 1.0f / 60.0f;
end.timeout = 0.0f;
end.lastPing = 0.0f;
end.oldLatency = 0.0f;
end.latency = 0.0f;
end.queueSlot = 0;
}
NetEnd::NetEnd(const NetEnd& end)
: owner(nullptr), disposition(EndDisp::ENDPOINT), hashName(end.hashName), name(end.name), status(Status::PENDING), arch(Architecture::UNKNOWN),
token{}, nextSendId(0), nextRecvId(0), type(end.type), port(0), deltaDuration(0.0f), deltaRate(1.0f / 60.0f),
timeout(0.0f), lastPing(0.0f), oldLatency(0.0f), latency(0.0f), queueSlot(0)
{
}
NetEnd& NetEnd::operator=(NetEnd&& end) noexcept
{
if (this == &end)
return *this;
owner = end.owner;
disposition = end.disposition;
hashName = end.hashName;
name = (Str_8&&)end.name;
status = end.status;
arch = end.arch;
Util::Copy(token, end.token, 64);
nextSendId = end.nextSendId;
sent = (Vector<Insurance>&&)end.sent;
nextRecvId = end.nextRecvId;
received = (Vector<Fragments>&&)end.received;
type = end.type;
address = (Str_8&&)end.address;
port = end.port;
deltaDuration = end.deltaDuration;
deltaRate = end.deltaRate;
timeout = end.timeout;
lastPing = end.lastPing;
oldLatency = end.oldLatency;
latency = end.latency;
queueSlot = end.queueSlot;
end.owner = nullptr;
end.disposition = EndDisp::ENDPOINT;
end.hashName = 0;
end.status = Status::PENDING;
end.arch = Architecture::UNKNOWN;
Util::Zero(end.token, 64);
end.nextSendId = 0;
end.nextRecvId = 0;
end.type = AddrType::IPV6;
end.port = 0;
end.deltaDuration = 0.0f;
end.deltaRate = 1.0f / 60.0f;
end.timeout = 0.0f;
end.lastPing = 0.0f;
end.oldLatency = 0.0f;
end.latency = 0.0f;
end.queueSlot = 0;
return *this;
}
NetEnd &NetEnd::operator=(const NetEnd &end)
{
if (this == &end)
return *this;
owner = nullptr;
disposition = EndDisp::ENDPOINT;
hashName = end.hashName;
name = end.name;
status = Status::PENDING;
arch = Architecture::UNKNOWN;
Util::Zero(token, 64);
nextSendId = 0;
sent = {};
nextRecvId = 0;
received = {};
type = AddrType::IPV6;
address = {};
port = 0;
deltaDuration = 0.0f;
deltaRate = 1.0f / 60.0f;
timeout = 0.0f;
lastPing = 0.0f;
oldLatency = 0.0f;
latency = 0.0f;
queueSlot = 0;
return *this;
}
EndDisp NetEnd::GetDisposition() const
{
return disposition;
}
UInt_64 NetEnd::GetHashName() const
{
return hashName;
}
Str_8 NetEnd::GetName() const
{
return name;
}
Status NetEnd::GetStatus() const
{
return status;
}
Architecture NetEnd::GetArchitecture() const
{
return arch;
}
UInt_64 NetEnd::GetNextSendId() const
{
return nextSendId;
}
void NetEnd::Send(const bool deltaLocked, const bool encrypted, const bool ensure, const UInt_64 sys,
const UInt_64 op, const Serializer<>& payload)
{
if (deltaLocked && deltaDuration < deltaRate)
return;
Header header = {
encrypted,
nextSendId++,
1,
0,
ensure,
owner->GetDisposition(),
"",
sys,
op
};
Util::Copy(header.token, token, 64);
if ((owner->GetLocalAddressType() == AddrType::IPV6 && payload.Size() > COMMS_IPV6_PAYLOAD) || (owner->GetLocalAddressType() == AddrType::IPV4 && payload.Size() > COMMS_IPV4_PAYLOAD))
{
Fragments frags = FragmentData(header, payload);
for (UInt_64 i = 0; i < frags.Size(); ++i)
{
Header newHeader = frags.GetHeader();
newHeader.fragment = i;
Send(newHeader, frags[i]);
}
}
else
{
Send(header, payload);
}
}
void NetEnd::Send(const bool deltaLocked, const bool encrypted, const bool ensure, const Str_8& sys,
const Str_8& op, const Serializer<>& payload)
{
Send(deltaLocked, encrypted, ensure, sys.Hash_64(), op.Hash_64(), payload);
}
UInt_64 NetEnd::GetNextRecvId() const
{
return nextRecvId;
}
Str_8 NetEnd::GetAddress() const
{
return address;
}
UInt_16 NetEnd::GetPort() const
{
return port;
}
float NetEnd::GetDeltaRate() const
{
return deltaRate;
}
float NetEnd::GetTimeout() const
{
return timeout;
}
float NetEnd::GetLastPing() const
{
return lastPing;
}
float NetEnd::GetLatency() const
{
return oldLatency;
}
UInt_64 NetEnd::GetQueueSlot() const
{
return queueSlot;
}
void NetEnd::Poll(const float delta)
{
SortReceived();
if (deltaDuration >= deltaRate)
deltaDuration = Math::Mod(deltaDuration, deltaRate);
deltaDuration += delta;
timeout += delta;
latency += delta;
if (sent.Size())
{
for (UInt_64 i = 0; i < sent.Size(); ++i)
{
sent[i].lastResend += delta;
if (sent[i].lastResend >= owner->GetResendRate())
{
Serializer result(Endianness::LE);
result.Write(sent[i].header);
result.WriteSer(sent[i].payload);
if (sent[i].header.encrypted)
Encryption::Encrypt_64(result.Size() - sizeof(bool), &result[sizeof(bool)]);
owner->udp.Send(type, address, port, result, result.Size());
sent[i].lastResend = Math::Mod(sent[i].lastResend, owner->GetResendRate());
}
}
}
if (owner->GetDisposition() == EndDisp::SERVICE)
{
lastPing += delta;
if (lastPing >= 1.0f)
Ping(delta);
}
}
void NetEnd::SetStatus(const Status newStatus)
{
status = newStatus;
}
void NetEnd::RemoveInsurance(const UInt_64 msgId, const UInt_64 fragment)
{
for (UInt_64 i = 0; i < sent.Size(); ++i)
{
if (sent[i].header.id == msgId && sent[i].header.fragment == fragment)
{
sent.Remove(i);
break;
}
}
timeout = 0.0f;
}
void NetEnd::AddReceived(const Header& header, const Serializer<>& payload)
{
Fragments* frags = nullptr;
for (UInt_64 i = 0; i < received.Size(); ++i)
{
if (received[i].GetHeader().id == header.id)
{
if (received[i][header.fragment].Size())
return;
frags = &received[i];
break;
}
}
if (header.id > nextRecvId)
nextRecvId = header.id + 1;
if (frags)
(*frags)[header.fragment] = payload;
else
received.Push({header, payload});
timeout = 0.0f;
}
Vector<Fragments>* NetEnd::GetReceived()
{
return &received;
}
void NetEnd::SetDeltaRate(const float newDeltaRate)
{
deltaRate = newDeltaRate;
}
void NetEnd::Ping(const float delta)
{
Serializer payload(Endianness::LE);
payload.Write(delta);
Send(false, true, false, "Internal", "Ping", payload);
lastPing = 0.0f;
latency = 0.0f;
}
void NetEnd::Pong(const float delta)
{
Serializer payload(Endianness::LE);
payload.Write(delta);
Send(false, true, false, "Internal", "Pong", payload);
timeout = 0.0f;
}
void NetEnd::SendLatency()
{
oldLatency = latency * 1000;
Serializer sPayload(Endianness::LE);
sPayload.Write(oldLatency);
Send(false, true, false, "Internal", "Latency", sPayload);
latency = 0.0f;
timeout = 0.0f;
}
void NetEnd::SetLatency(const float newLatency)
{
oldLatency = newLatency;
}
void NetEnd::SetQueueSlot(const UInt_64 slot)
{
queueSlot = slot;
}
Fragments NetEnd::FragmentData(const Header& header, const Serializer<>& data)
{
Fragments result;
if (owner->GetLocalAddressType() == AddrType::IPV6)
{
UInt_64 frags = data.Size() / COMMS_IPV6_PAYLOAD;
if (data.Size() % COMMS_IPV6_PAYLOAD)
++frags;
result = Fragments(header, frags);
UInt_64 size = COMMS_IPV6_PAYLOAD;
for (UInt_64 i = 0; i < result.Size(); ++i)
{
size = COMMS_IPV6_PAYLOAD;
if (i == result.Size() - 1)
size = data.Size() % COMMS_IPV6_PAYLOAD;
result[i] = {data.GetEndianness(), &data[i * COMMS_IPV6_PAYLOAD], size};
}
}
else if (owner->GetLocalAddressType() == AddrType::IPV4)
{
UInt_64 frags = data.Size() / COMMS_IPV4_PAYLOAD;
if (data.Size() % COMMS_IPV4_PAYLOAD)
++frags;
result = Fragments(header, frags);
UInt_64 size = COMMS_IPV4_PAYLOAD;
for (UInt_64 i = 0; i < result.Size(); ++i)
{
size = COMMS_IPV4_PAYLOAD;
if (i == result.Size() - 1)
size = data.Size() % COMMS_IPV4_PAYLOAD;
result[i] = {data.GetEndianness(), &data[i * COMMS_IPV4_PAYLOAD], size};
}
}
return result;
}
void NetEnd::Send(const Header& header, const Serializer<UInt_64>& payload)
{
Serializer result(Endianness::LE);
result.Write(header);
result.WriteSer(payload);
if (header.encrypted)
Encryption::Encrypt_64(result.Size() - sizeof(bool), &result[sizeof(bool)]);
if (header.ensure)
sent.Push({header, payload});
owner->udp.Send(type, address, port, result, result.Size());
}
bool NetEnd::SortingNeeded() const
{
UInt_64 lastPacket = 0;
for (UInt_64 i = 0; i < received.Size(); ++i)
{
if (received[i].GetHeader().id < lastPacket)
return true;
else
lastPacket = received[i].GetHeader().id;
}
return false;
}
void NetEnd::SortReceived()
{
if (!SortingNeeded())
return;
Vector<Fragments> sorted(0, received.Stride());
for (UInt_64 a = 0; a < received.Size(); ++a)
{
if (sorted.Size())
{
for (UInt_64 b = sorted.Size(); b; --b)
{
if (received[a].GetHeader().id > sorted[b - 1].GetHeader().id)
{
if (b == sorted.Size())
sorted.Push(received[a]);
else
sorted.Insert(b, received[a]);
break;
}
else
{
sorted.Insert(b - 1, received[a]);
break;
}
}
}
else
{
sorted.Push(received[a]);
}
}
received = sorted;
}
}