Ring Daemon
Loading...
Searching...
No Matches
routing_table.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "routing_table.h"
19
20#include <dhtnet/multiplexed_socket.h>
21#include <opendht/infohash.h>
22#include <opendht/thread_pool.h>
23
24#include <math.h>
25#include <iterator>
26#include <stdlib.h>
27
28using namespace std::placeholders;
29
30namespace jami {
31
32using namespace dht;
33
35 : lowerLimit_(id)
36{}
37
38bool
39Bucket::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
40{
41 return addNode(NodeInfo(socket));
42}
43
44bool
46{
47 auto nodeId = info.socket->deviceId();
48 if (nodes.try_emplace(nodeId, std::move(info)).second) {
49 connecting_nodes.erase(nodeId);
50 known_nodes.erase(nodeId);
51 mobile_nodes.erase(nodeId);
52 return true;
53 }
54 return false;
55}
56
57bool
59{
60 auto node = nodes.find(nodeId);
61 auto isMobile = node->second.isMobile_;
62 if (node == nodes.end())
63 return false;
64 nodes.erase(nodeId);
65 if (isMobile) {
66 addMobileNode(nodeId);
67 } else {
68 addKnownNode(nodeId);
69 }
70
71 return true;
72}
73
74std::set<NodeId>
76{
77 std::set<NodeId> nodesId;
78 for (auto const& key : nodes)
79 nodesId.insert(key.first);
80 return nodesId;
81}
82
83bool
84Bucket::hasNode(const NodeId& nodeId) const
85{
86 return nodes.find(nodeId) != nodes.end();
87}
88
89bool
91{
92 if (!hasNode(nodeId)) {
93 if (known_nodes.emplace(nodeId).second) {
94 return true;
95 }
96 }
97 return false;
98}
99
100NodeId
101Bucket::getKnownNode(unsigned index) const
102{
103 if (index > known_nodes.size()) {
104 throw std::out_of_range("End of table for get known Node Id " + std::to_string(index));
105 }
106 auto it = known_nodes.begin();
107 std::advance(it, index);
108
109 return *it;
110}
111
112bool
114{
115 if (!hasNode(nodeId)) {
116 if (mobile_nodes.emplace(nodeId).second) {
117 known_nodes.erase(nodeId);
118 return true;
119 }
120 }
121 return false;
122}
123
124bool
126{
127 if (!hasNode(nodeId)) {
128 if (connecting_nodes.emplace(nodeId).second) {
129 known_nodes.erase(nodeId);
130 mobile_nodes.erase(nodeId);
131 return true;
132 }
133 }
134 return false;
135}
136
137std::set<NodeId>
138Bucket::getKnownNodesRandom(unsigned numberNodes, std::mt19937_64& rd) const
139{
140 std::set<NodeId> nodesToReturn;
141
143 return getKnownNodes();
144
145 std::uniform_int_distribution<unsigned> distrib(0, getKnownNodesSize() - 1);
146
147 while (nodesToReturn.size() < numberNodes) {
148 nodesToReturn.emplace(getKnownNode(distrib(rd)));
149 }
150
151 return nodesToReturn;
152}
153
154asio::steady_timer&
155Bucket::getNodeTimer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
156{
157 auto node = nodes.find(socket->deviceId());
158 if (node == nodes.end()) {
159 throw std::range_error("Unable to find timer " + socket->deviceId().toString());
160 }
161 return node->second.refresh_timer;
162}
163
164bool
166{
167 auto node = nodes.find(nodeId);
168 if (node != nodes.end()) {
169 auto& socket = node->second.socket;
170 auto node = socket->deviceId();
171 dht::ThreadPool::io().run([socket] { socket->shutdown(); });
172 removeNode(node);
173 return true;
174 }
175 return false;
176}
177
178void
180{
181 while (not nodes.empty()) {
182 auto it = nodes.begin();
183 dht::ThreadPool::io().run([socket = it->second.socket] { socket->shutdown(); });
184 auto nodeId = it->first;
185 removeNode(nodeId);
186 }
187}
188
189void
191{
192 JAMI_ERROR("BUCKET Number: {:d}", number);
193
194 unsigned nodeNum = 1;
195 for (auto it = nodes.begin(); it != nodes.end(); ++it) {
196 JAMI_DEBUG("Node {:s} Id: {:s} isMobile: {:s}",
197 std::to_string(nodeNum),
198 it->first.toString(),
199 std::to_string(it->second.isMobile_));
200 nodeNum++;
201 }
202 JAMI_ERROR("Mobile Nodes");
203 nodeNum = 0;
204 for (auto it = mobile_nodes.begin(); it != mobile_nodes.end(); ++it) {
205 JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
206 nodeNum++;
207 }
208
209 JAMI_ERROR("Known Nodes");
210 nodeNum = 0;
211 for (auto it = known_nodes.begin(); it != known_nodes.end(); ++it) {
212 JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
213 nodeNum++;
214 }
215 JAMI_ERROR("Connecting_nodes");
216 nodeNum = 0;
217 for (auto it = connecting_nodes.begin(); it != connecting_nodes.end(); ++it) {
218 JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString());
219 nodeNum++;
220 }
221};
222
223void
224Bucket::changeMobility(const NodeId& nodeId, bool isMobile)
225{
226 auto itn = nodes.find(nodeId);
227 if (itn != nodes.end()) {
228 itn->second.isMobile_ = isMobile;
229 }
230}
231
232// For tests
233
234std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>>
236{
237 std::set<std::shared_ptr<dhtnet::ChannelSocketInterface>> sockets;
238 for (auto const& info : nodes)
239 sockets.insert(info.second.socket);
240 return sockets;
241}
242
243// ####################################################################################################
244
246{
247 buckets.emplace_back(NodeId::zero());
248}
249
250bool
252{
253 for (const auto& bucket : buckets) {
254 if (!bucket.isEmpty()) {
255 return false;
256 }
257 }
258 return true;
259}
260
261bool
262RoutingTable::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
263{
264 auto bucket = findBucket(socket->deviceId());
265 return addNode(socket, bucket);
266}
267
268bool
269RoutingTable::addNode(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel,
270 std::list<Bucket>::iterator& bucket)
271{
272 NodeId nodeId = channel->deviceId();
273
274 if (bucket->hasNode(nodeId) || id_ == nodeId) {
275 return false;
276 }
277
278 while (bucket->isFull()) {
279 if (contains(bucket, id_)) {
280 split(bucket);
281 bucket = findBucket(nodeId);
282
283 } else {
284 return bucket->addNode(std::move(channel));
285 }
286 }
287 return bucket->addNode(std::move(channel));
288}
289
290bool
292{
293 return findBucket(nodeId)->removeNode(nodeId);
294}
295
296bool
298{
299 return findBucket(nodeId)->hasNode(nodeId);
300}
301
302bool
304{
305 if (id_ == nodeId)
306 return false;
307
308 auto bucket = findBucket(nodeId);
309 if (bucket == buckets.end())
310 return false;
311
312 return bucket->addKnownNode(nodeId);
313}
314
315bool
317{
318 if (id_ == nodeId)
319 return false;
320
321 auto bucket = findBucket(nodeId);
322
323 if (bucket == buckets.end())
324 return false;
325
326 bucket->addMobileNode(nodeId);
327 return true;
328}
329
330void
332{
333 return findBucket(nodeId)->removeMobileNode(nodeId);
334}
335
336bool
338{
339 return findBucket(nodeId)->hasMobileNode(nodeId);
340};
341
342bool
344{
345 if (id_ == nodeId)
346 return false;
347
348 auto bucket = findBucket(nodeId);
349
350 if (bucket == buckets.end())
351 return 0;
352
353 bucket->addConnectingNode(nodeId);
354 return 1;
355}
356
357void
359{
360 findBucket(nodeId)->removeConnectingNode(nodeId);
361}
362
363std::list<Bucket>::iterator
365{
366 if (buckets.empty())
367 throw std::runtime_error("No bucket");
368
369 auto b = buckets.begin();
370
371 while (true) {
372 auto next = std::next(b);
373 if (next == buckets.end())
374 return b;
375 if (std::memcmp(nodeId.data(), next->getLowerLimit().data(), nodeId.size()) < 0)
376 return b;
377 b = next;
378 }
379}
380
381std::vector<NodeId>
382RoutingTable::closestNodes(const NodeId& nodeId, unsigned count)
383{
384 std::vector<NodeId> closestNodes;
385 auto bucket = findBucket(nodeId);
386 auto sortedBucketInsert = [&](const std::list<Bucket>::iterator& b) {
387 auto nodes = b->getNodeIds();
388 for (auto n : nodes) {
389 if (n != nodeId) {
390 auto here = std::find_if(closestNodes.begin(), closestNodes.end(), [&nodeId, &n](NodeId& NodeId) {
391 return nodeId.xorCmp(n, NodeId) < 0;
392 });
393
394 closestNodes.insert(here, n);
395 }
396 }
397 };
398
399 auto itn = bucket;
400 auto itp = (bucket == buckets.begin()) ? buckets.end() : std::prev(bucket);
401 while (itn != buckets.end() || itp != buckets.end()) {
402 if (itn != buckets.end()) {
404 itn = std::next(itn);
405 }
406 if (itp != buckets.end()) {
408 itp = (itp == buckets.begin()) ? buckets.end() : std::prev(itp);
409 }
410 }
411
412 if (closestNodes.size() > count) {
413 closestNodes.resize(count);
414 }
415
416 return closestNodes;
417}
418
419void
421{
422 int counter = 1;
423 JAMI_DEBUG("SWARM: {:s} ", id_.toString());
424 for (auto it = buckets.begin(); it != buckets.end(); ++it) {
425 it->printBucket(counter);
426 counter++;
427 }
428 JAMI_DEBUG("_____________________________________________________________________________");
429}
430
431void
433{
434 findBucket(nodeId)->shutdownNode(nodeId);
435}
436
437std::vector<NodeId>
439{
440 std::lock_guard lock(mutex_);
441 std::vector<NodeId> ret;
442 for (const auto& b : buckets) {
443 const auto& nodes = b.getNodeIds();
444 ret.insert(ret.end(), nodes.begin(), nodes.end());
445 }
446 return ret;
447}
448
449std::vector<NodeId>
451{
452 std::vector<NodeId> ret;
453 for (const auto& b : buckets) {
454 const auto& nodes = b.getKnownNodes();
455 ret.insert(ret.end(), nodes.begin(), nodes.end());
456 }
457 return ret;
458}
459
460std::vector<NodeId>
462{
463 std::vector<NodeId> ret;
464 for (const auto& b : buckets) {
465 const auto& nodes = b.getMobileNodes();
466 ret.insert(ret.end(), nodes.begin(), nodes.end());
467 }
468 return ret;
469}
470
471std::vector<NodeId>
473{
474 std::vector<NodeId> ret;
475 for (const auto& b : buckets) {
476 const auto& nodes = b.getConnectingNodes();
477 ret.insert(ret.end(), nodes.begin(), nodes.end());
478 }
479 return ret;
480}
481
482std::vector<NodeId>
484{
485 std::vector<NodeId> ret;
486 auto bucket = findBucket(id_);
487 const auto& nodes = bucket->getMobileNodes();
488 ret.insert(ret.end(), nodes.begin(), nodes.end());
489
490 return ret;
491}
492
493bool
494RoutingTable::contains(const std::list<Bucket>::iterator& bucket, const NodeId& nodeId) const
495{
496 return NodeId::cmp(bucket->getLowerLimit(), nodeId) <= 0
497 && (std::next(bucket) == buckets.end() || NodeId::cmp(nodeId, std::next(bucket)->getLowerLimit()) < 0);
498}
499
500std::vector<NodeId>
502{
503 std::vector<NodeId> ret;
504 for (const auto& b : buckets) {
505 const auto& nodes = b.getNodeIds();
506 const auto& knownNodes = b.getKnownNodes();
507 const auto& mobileNodes = b.getMobileNodes();
508 const auto& connectingNodes = b.getConnectingNodes();
509 ret.reserve(ret.size() + nodes.size() + knownNodes.size() + mobileNodes.size() + connectingNodes.size());
510 ret.insert(ret.end(), nodes.begin(), nodes.end());
511 ret.insert(ret.end(), knownNodes.begin(), knownNodes.end());
512 ret.insert(ret.end(), mobileNodes.begin(), mobileNodes.end());
513 ret.insert(ret.end(), connectingNodes.begin(), connectingNodes.end());
514 }
515 return ret;
516}
517
518std::vector<NodeId>
520{
521 std::vector<NodeId> ret;
522 for (const auto& b : buckets) {
523 const auto& nodes = b.getNodes();
524 const auto& mobiles = b.getMobileNodes();
525 ret.reserve(ret.size() + nodes.size() + mobiles.size());
526 for (const auto& n : nodes)
527 ret.emplace_back(n.first);
528 ret.insert(ret.end(), mobiles.begin(), mobiles.end());
529 }
530 return ret;
531}
532
533void
535{
536 auto bucket = findBucket(nodeId);
537 bucket->shutdownNode(nodeId);
538 bucket->removeConnectingNode(nodeId);
539 bucket->removeKnownNode(nodeId);
540 bucket->removeMobileNode(nodeId);
541}
542
/**
 * Translates a steady_clock time point into a system_clock time point.
 *
 * The offset between `t` and `now` is converted to the system clock's
 * duration type and applied to `nowSystem`.
 *
 * @param t         steady-clock instant to translate
 * @param now       steady-clock reading used as the reference
 * @param nowSystem system-clock reading paired with `now`
 * @return nowSystem shifted by (t - now)
 */
inline std::chrono::system_clock::time_point
systemTimeFromSteady(std::chrono::steady_clock::time_point t,
                     const std::chrono::steady_clock::time_point& now,
                     const std::chrono::system_clock::time_point& nowSystem)
{
    using SystemDuration = std::chrono::system_clock::duration;
    const auto offset = std::chrono::duration_cast<SystemDuration>(t - now);
    return nowSystem + offset;
}
550
551std::vector<RoutingTable::NodeStats>
553{
554 std::vector<NodeStats> stats;
555 auto now = std::chrono::steady_clock::now();
556 auto nowSystem = std::chrono::system_clock::now();
557 std::lock_guard lock(mutex_);
558 for (const auto& bucket : buckets) {
559 for (const auto& [id, info] : bucket.getNodes()) {
560 if (auto channel = std::dynamic_pointer_cast<dhtnet::ChannelSocket>(info.socket)) {
561 stats.push_back({id.toString(),
562 "connected",
563 channel->getRemoteAddress().toString(true),
564 systemTimeFromSteady(channel->getStartTime(), now, nowSystem),
565 info.isMobile_});
566 } else {
567 stats.push_back(
568 {id.toString(), "connected", "", std::chrono::system_clock::time_point::min(), info.isMobile_});
569 }
570 }
571 for (const auto& id : bucket.getKnownNodes()) {
572 stats.push_back({id.toString(), "known", "", std::chrono::system_clock::time_point::min(), false});
573 }
574 for (const auto& id : bucket.getMobileNodes()) {
575 stats.push_back({id.toString(), "mobile", "", std::chrono::system_clock::time_point::min(), true});
576 }
577 for (const auto& id : bucket.getConnectingNodes()) {
578 stats.push_back({id.toString(), "connecting", "", std::chrono::system_clock::time_point::min(), false});
579 }
580 }
581 return stats;
582}
583
584NodeId
585RoutingTable::middle(std::list<Bucket>::iterator& it) const
586{
587 unsigned bit = depth(it);
588 if (bit >= 8 * HASH_LEN)
589 throw std::out_of_range("End of table");
590
591 NodeId id = it->getLowerLimit();
592 id.setBit(bit, true);
593 return id;
594}
595
596unsigned
597RoutingTable::depth(std::list<Bucket>::iterator& bucket) const
598{
599 int bit1 = bucket->getLowerLimit().lowbit();
600 int bit2 = std::next(bucket) != buckets.end() ? std::next(bucket)->getLowerLimit().lowbit() : -1;
601 return std::max(bit1, bit2) + 1;
602}
603
604bool
605RoutingTable::split(std::list<Bucket>::iterator& bucket)
606{
607 NodeId id = middle(bucket);
608 auto newBucketIt = buckets.emplace(std::next(bucket), id);
609 // Re-assign nodes
610 auto& nodeSwap = bucket->getNodes();
611
612 for (auto it = nodeSwap.begin(); it != nodeSwap.end();) {
613 auto& node = *it;
614
615 auto nodeId = it->first;
616
617 if (!contains(bucket, nodeId)) {
618 newBucketIt->addNode(std::move(node.second));
619 it = nodeSwap.erase(it);
620 } else {
621 ++it;
622 }
623 }
624
625 auto connectingSwap = bucket->getConnectingNodes();
626 for (auto it = connectingSwap.begin(); it != connectingSwap.end();) {
627 auto nodeId = *it;
628
629 if (!contains(bucket, nodeId)) {
630 newBucketIt->addConnectingNode(nodeId);
631 it = connectingSwap.erase(it);
632 bucket->removeConnectingNode(nodeId);
633 } else {
634 ++it;
635 }
636 }
637
638 auto knownSwap = bucket->getKnownNodes();
639 for (auto it = knownSwap.begin(); it != knownSwap.end();) {
640 auto nodeId = *it;
641
642 if (!contains(bucket, nodeId)) {
643 newBucketIt->addKnownNode(nodeId);
644 it = knownSwap.erase(it);
645 bucket->removeKnownNode(nodeId);
646 } else {
647 ++it;
648 }
649 }
650
651 auto mobileSwap = bucket->getMobileNodes();
652 for (auto it = mobileSwap.begin(); it != mobileSwap.end();) {
653 auto nodeId = *it;
654
655 if (!contains(bucket, nodeId)) {
656 newBucketIt->addMobileNode(nodeId);
657 it = mobileSwap.erase(it);
658 bucket->removeMobileNode(nodeId);
659 } else {
660 ++it;
661 }
662 }
663
664 return true;
665}
666
667} // namespace jami
void shutdownAllNodes()
Shuts down all sockets in nodes through shutdownNode.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add Node socket to bucket.
bool removeNode(const NodeId &nodeId)
Remove NodeId socket from bucket and insert it in known_nodes or mobile_nodes depending on its type.
Bucket()=delete
bool shutdownNode(const NodeId &nodeId)
Shuts down socket and removes it from nodes.
unsigned getKnownNodesSize() const
Returns number of known_nodes in bucket.
bool addMobileNode(const NodeId &nodeId)
Add NodeId to mobile_nodes if it doesn't exist in nodes.
bool hasNode(const NodeId &nodeId) const
Test if socket exists in nodes.
std::set< NodeId > getNodeIds() const
Get NodeIds from bucket.
bool addConnectingNode(const NodeId &nodeId)
Add NodeId to connecting_nodes if it doesn't exist in nodes.
NodeId getKnownNode(unsigned index) const
Returns NodeId from known_nodes at index.
void printBucket(unsigned number) const
Prints bucket and bucket's number.
bool addKnownNode(const NodeId &nodeId)
Add NodeId to known_nodes if it doesn't exist in nodes.
std::set< std::shared_ptr< dhtnet::ChannelSocketInterface > > getNodeSockets() const
Get sockets from bucket.
asio::steady_timer & getNodeTimer(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Returns socket's timer.
const std::set< NodeId > & getKnownNodes() const
Get NodeIds from known_nodes.
std::set< NodeId > getKnownNodesRandom(unsigned numberNodes, std::mt19937_64 &rd) const
Returns random numberNodes NodeId from known_nodes.
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node, mobile or not.
bool hasMobileNode(const NodeId &nodeId)
Check if mobile node exists in routing table.
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
std::vector< NodeId > getConnectingNodes() const
Returns all routing table's connecting nodes.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
std::vector< NodeId > getBucketMobileNodes() const
Returns mobile nodes corresponding to the swarm's id.
void shutdownNode(const NodeId &nodeId)
Shuts down a node.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
std::vector< NodeId > getMobileNodes() const
Returns all routing table's mobile nodes.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
void removeConnectingNode(const NodeId &nodeId)
Remove connecting node from routing table.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
std::vector< NodeId > getConnectedNodes() const
std::vector< NodeId > getKnownNodes() const
Returns all routing table's known nodes.
std::vector< NodeId > getNodes() const
Returns all routing table's connected nodes.
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
void printRoutingTable() const
Prints routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
void removeMobileNode(const NodeId &nodeId)
Remove mobile node from routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
std::vector< NodeStats > getRoutingTableStats() const
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:238
Definition account.h:50
dht::h256 NodeId
void emitSignal(Args... args)
Definition jami_signal.h:64
std::chrono::system_clock::time_point systemTimeFromSteady(std::chrono::steady_clock::time_point t, const std::chrono::steady_clock::time_point &now, const std::chrono::system_clock::time_point &nowSystem)
dht::PkId NodeId