20#include <dhtnet/multiplexed_socket.h>
21#include <opendht/infohash.h>
22#include <opendht/thread_pool.h>
28using namespace std::placeholders;
47 auto nodeId = info.socket->deviceId();
48 if (nodes.try_emplace(nodeId, std::move(info)).second) {
49 connecting_nodes.erase(nodeId);
50 known_nodes.erase(nodeId);
51 mobile_nodes.erase(nodeId);
60 auto node = nodes.find(nodeId);
61 auto isMobile = node->second.isMobile_;
62 if (node == nodes.end())
78 for (
auto const& key : nodes)
86 return nodes.find(nodeId) != nodes.end();
93 if (known_nodes.emplace(nodeId).second) {
103 if (index > known_nodes.size()) {
104 throw std::out_of_range(
"End of table for get known Node Id " + std::to_string(index));
106 auto it = known_nodes.begin();
107 std::advance(
it, index);
116 if (mobile_nodes.emplace(nodeId).second) {
117 known_nodes.erase(nodeId);
128 if (connecting_nodes.emplace(nodeId).second) {
129 known_nodes.erase(nodeId);
130 mobile_nodes.erase(nodeId);
157 auto node = nodes.find(socket->deviceId());
158 if (node == nodes.end()) {
159 throw std::range_error(
"Unable to find timer " + socket->deviceId().toString());
161 return node->second.refresh_timer;
167 auto node = nodes.find(nodeId);
168 if (node != nodes.end()) {
169 auto& socket = node->second.socket;
170 auto node = socket->deviceId();
171 dht::ThreadPool::io().run([socket] { socket->shutdown(); });
181 while (
not nodes.empty()) {
182 auto it = nodes.begin();
183 dht::ThreadPool::io().run([socket =
it->second.socket] { socket->shutdown(); });
184 auto nodeId =
it->first;
195 for (
auto it = nodes.begin();
it != nodes.end(); ++
it) {
196 JAMI_DEBUG(
"Node {:s} Id: {:s} isMobile: {:s}",
198 it->first.toString(),
199 std::to_string(
it->second.isMobile_));
204 for (
auto it = mobile_nodes.begin();
it != mobile_nodes.end(); ++
it) {
211 for (
auto it = known_nodes.begin();
it != known_nodes.end(); ++
it) {
217 for (
auto it = connecting_nodes.begin();
it != connecting_nodes.end(); ++
it) {
226 auto itn = nodes.find(nodeId);
227 if (
itn != nodes.end()) {
228 itn->second.isMobile_ = isMobile;
234std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>>
237 std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>>
sockets;
238 for (
auto const& info : nodes)
239 sockets.insert(info.second.socket);
247 buckets.emplace_back(NodeId::zero());
253 for (
const auto&
bucket : buckets) {
270 std::list<Bucket>::iterator&
bucket)
272 NodeId nodeId = channel->deviceId();
274 if (
bucket->hasNode(nodeId) || id_ == nodeId) {
278 while (
bucket->isFull()) {
284 return bucket->addNode(std::move(channel));
287 return bucket->addNode(std::move(channel));
293 return findBucket(nodeId)->removeNode(nodeId);
309 if (
bucket == buckets.end())
312 return bucket->addKnownNode(nodeId);
323 if (
bucket == buckets.end())
326 bucket->addMobileNode(nodeId);
333 return findBucket(nodeId)->removeMobileNode(nodeId);
339 return findBucket(nodeId)->hasMobileNode(nodeId);
350 if (
bucket == buckets.end())
353 bucket->addConnectingNode(nodeId);
360 findBucket(nodeId)->removeConnectingNode(nodeId);
363std::list<Bucket>::iterator
367 throw std::runtime_error(
"No bucket");
369 auto b = buckets.begin();
372 auto next = std::next(
b);
373 if (next == buckets.end())
375 if (std::memcmp(nodeId.data(), next->getLowerLimit().data(), nodeId.size()) < 0)
387 auto nodes =
b->getNodeIds();
388 for (
auto n : nodes) {
391 return nodeId.xorCmp(n, NodeId) < 0;
400 auto itp = (
bucket == buckets.begin()) ? buckets.end() : std::prev(
bucket);
401 while (
itn != buckets.end() ||
itp != buckets.end()) {
402 if (
itn != buckets.end()) {
406 if (
itp != buckets.end()) {
408 itp = (
itp == buckets.begin()) ? buckets.end() : std::prev(
itp);
424 for (
auto it = buckets.begin();
it != buckets.end(); ++
it) {
428 JAMI_DEBUG(
"_____________________________________________________________________________");
440 std::lock_guard lock(mutex_);
441 std::vector<NodeId>
ret;
442 for (
const auto&
b : buckets) {
443 const auto& nodes =
b.getNodeIds();
444 ret.insert(
ret.end(), nodes.begin(), nodes.end());
452 std::vector<NodeId>
ret;
453 for (
const auto&
b : buckets) {
454 const auto& nodes =
b.getKnownNodes();
455 ret.insert(
ret.end(), nodes.begin(), nodes.end());
463 std::vector<NodeId>
ret;
464 for (
const auto&
b : buckets) {
465 const auto& nodes =
b.getMobileNodes();
466 ret.insert(
ret.end(), nodes.begin(), nodes.end());
474 std::vector<NodeId>
ret;
475 for (
const auto&
b : buckets) {
476 const auto& nodes =
b.getConnectingNodes();
477 ret.insert(
ret.end(), nodes.begin(), nodes.end());
485 std::vector<NodeId>
ret;
487 const auto& nodes =
bucket->getMobileNodes();
488 ret.insert(
ret.end(), nodes.begin(), nodes.end());
496 return NodeId::cmp(
bucket->getLowerLimit(), nodeId) <= 0
497 && (std::next(
bucket) == buckets.end() || NodeId::cmp(nodeId, std::next(
bucket)->getLowerLimit()) < 0);
503 std::vector<NodeId>
ret;
504 for (
const auto&
b : buckets) {
505 const auto& nodes =
b.getNodeIds();
510 ret.insert(
ret.end(), nodes.begin(), nodes.end());
521 std::vector<NodeId>
ret;
522 for (
const auto&
b : buckets) {
523 const auto& nodes =
b.getNodes();
524 const auto&
mobiles =
b.getMobileNodes();
526 for (
const auto&
n : nodes)
527 ret.emplace_back(
n.first);
537 bucket->shutdownNode(nodeId);
538 bucket->removeConnectingNode(nodeId);
539 bucket->removeKnownNode(nodeId);
540 bucket->removeMobileNode(nodeId);
543inline std::chrono::system_clock::time_point
545 const std::chrono::steady_clock::time_point&
now,
546 const std::chrono::system_clock::time_point&
nowSystem)
548 return nowSystem + std::chrono::duration_cast<std::chrono::system_clock::duration>(t -
now);
551std::vector<RoutingTable::NodeStats>
554 std::vector<NodeStats>
stats;
555 auto now = std::chrono::steady_clock::now();
556 auto nowSystem = std::chrono::system_clock::now();
557 std::lock_guard lock(mutex_);
558 for (
const auto&
bucket : buckets) {
559 for (
const auto& [
id, info] :
bucket.getNodes()) {
560 if (
auto channel = std::dynamic_pointer_cast<dhtnet::ChannelSocket>(info.socket)) {
561 stats.push_back({
id.toString(),
563 channel->getRemoteAddress().toString(
true),
568 {
id.toString(),
"connected",
"", std::chrono::system_clock::time_point::min(), info.isMobile_});
571 for (
const auto&
id :
bucket.getKnownNodes()) {
572 stats.push_back({
id.toString(),
"known",
"", std::chrono::system_clock::time_point::min(),
false});
574 for (
const auto&
id :
bucket.getMobileNodes()) {
575 stats.push_back({
id.toString(),
"mobile",
"", std::chrono::system_clock::time_point::min(),
true});
577 for (
const auto&
id :
bucket.getConnectingNodes()) {
578 stats.push_back({
id.toString(),
"connecting",
"", std::chrono::system_clock::time_point::min(),
false});
585RoutingTable::middle(std::list<Bucket>::iterator&
it)
const
587 unsigned bit = depth(
it);
589 throw std::out_of_range(
"End of table");
592 id.setBit(
bit,
true);
597RoutingTable::depth(std::list<Bucket>::iterator&
bucket)
const
600 int bit2 = std::next(
bucket) != buckets.end() ? std::next(
bucket)->getLowerLimit().lowbit() : -1;
605RoutingTable::split(std::list<Bucket>::iterator&
bucket)
615 auto nodeId =
it->first;
632 bucket->removeConnectingNode(nodeId);
645 bucket->removeKnownNode(nodeId);
658 bucket->removeMobileNode(nodeId);
void shutdownAllNodes()
Shuts down all sockets in nodes through shutdownNode.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add Node socket to bucket.
bool removeNode(const NodeId &nodeId)
Remove NodeId socket from bucket and insert it in known_nodes or mobile_nodes depending on its type.
bool shutdownNode(const NodeId &nodeId)
Shuts down socket and removes it from nodes.
unsigned getKnownNodesSize() const
Returns number of known_nodes in bucket.
bool addMobileNode(const NodeId &nodeId)
Add NodeId to mobile_nodes if it doesn't exist in nodes.
bool hasNode(const NodeId &nodeId) const
Test if socket exists in nodes.
std::set< NodeId > getNodeIds() const
Get NodeIds from bucket.
bool addConnectingNode(const NodeId &nodeId)
Add NodeId to connecting_nodes if it doesn't exist in nodes.
NodeId getKnownNode(unsigned index) const
Returns NodeId from known_nodes at index.
void printBucket(unsigned number) const
Prints bucket and bucket's number.
bool addKnownNode(const NodeId &nodeId)
Add NodeId to known_nodes if it doesn't exist in nodes.
std::set< std::shared_ptr< dhtnet::ChannelSocketInterface > > getNodeSockets() const
Get sockets from bucket.
asio::steady_timer & getNodeTimer(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Returns socket's timer.
const std::set< NodeId > & getKnownNodes() const
Get NodeIds from known_nodes.
std::set< NodeId > getKnownNodesRandom(unsigned numberNodes, std::mt19937_64 &rd) const
Returns random numberNodes NodeId from known_nodes.
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node, mobile or not.
bool hasMobileNode(const NodeId &nodeId)
Check if mobile node exists in routing table.
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
std::vector< NodeId > getConnectingNodes() const
Returns all routing table's connecting nodes.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
std::vector< NodeId > getBucketMobileNodes() const
Returns mobile nodes corresponding to the swarm's id.
void shutdownNode(const NodeId &nodeId)
Shuts down a node.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
std::vector< NodeId > getMobileNodes() const
Returns all routing table's mobile nodes.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
void removeConnectingNode(const NodeId &nodeId)
Remove connecting node from routing table.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
std::vector< NodeId > getConnectedNodes() const
std::vector< NodeId > getKnownNodes() const
Returns all routing table's known nodes.
std::vector< NodeId > getNodes() const
Returns all routing table's connected nodes.
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
void printRoutingTable() const
Prints routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
void removeMobileNode(const NodeId &nodeId)
Remove mobile node from routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
std::vector< NodeStats > getRoutingTableStats() const
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
void emitSignal(Args... args)
std::chrono::system_clock::time_point systemTimeFromSteady(std::chrono::steady_clock::time_point t, const std::chrono::steady_clock::time_point &now, const std::chrono::system_clock::time_point &nowSystem)