Ring Daemon
Loading...
Searching...
No Matches
swarm_manager.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "swarm_manager.h"
19#include <dhtnet/multiplexed_socket.h>
20#include <dhtnet/channel_utils.h>
21#include <opendht/thread_pool.h>
22
23namespace jami {
24
25using namespace swarm_protocol;
26
27SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
28 : id_(id)
29 , rd(rand)
30 , toConnectCb_(toConnectCb)
31{
32 routing_table.setId(id);
33}
34
36{
37 if (!isShutdown_)
38 shutdown();
39}
40
41bool
42SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
43{
44 isShutdown_ = false;
45 std::vector<NodeId> newNodes;
46 {
47 std::lock_guard lock(mutex);
48 for (const auto& nodeId : known_nodes) {
49 if (addKnownNode(nodeId)) {
50 newNodes.emplace_back(nodeId);
51 }
52 }
53 }
54
55 if (newNodes.empty())
56 return false;
57
58 dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
59 auto shared = w.lock();
60 if (!shared)
61 return;
62 // If we detect a new node which already got a TCP link
63 // we can use it to speed-up the bootstrap (because opening
64 // a new channel will be easy)
65 std::set<NodeId> toConnect;
66 for (const auto& nodeId : newNodes) {
67 if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
68 toConnect.emplace(nodeId);
69 }
70 shared->maintainBuckets(toConnect);
71 });
72 return true;
73}
74
75void
76SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
77{
78 {
79 std::lock_guard lock(mutex);
80 for (const auto& nodeId : mobile_nodes)
81 addMobileNodes(nodeId);
82 }
83}
84
85void
86SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
87{
88 // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
89 if (channel) {
90 auto emit = false;
91 {
92 std::lock_guard lock(mutex);
93 emit = routing_table.findBucket(getId())->isEmpty();
94 auto bucket = routing_table.findBucket(channel->deviceId());
95 if (routing_table.addNode(channel, bucket)) {
96 std::error_code ec;
97 resetNodeExpiry(ec, channel, id_);
98 }
99 }
100 receiveMessage(channel);
101 if (emit && onConnectionChanged_) {
102 // If it's the first channel we add, we're now connected!
103 JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
104 onConnectionChanged_(true);
105 }
106 }
107}
108
109void
111{
112 std::unique_lock lk(mutex);
113 if (isConnectedWith(nodeId)) {
114 removeNodeInternal(nodeId);
115 lk.unlock();
117 }
118}
119
120void
121SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
122{
123 std::lock_guard lock(mutex);
124 auto bucket = routing_table.findBucket(nodeId);
125 bucket->changeMobility(nodeId, isMobile);
126}
127
128bool
130{
131 return routing_table.hasNode(deviceId);
132}
133
134void
136{
137 if (isShutdown_) {
138 return;
139 }
140 isShutdown_ = true;
141 std::lock_guard lock(mutex);
142 routing_table.shutdownAllNodes();
143}
144
145void
147{
148 isShutdown_ = false;
149}
150
151bool
152SwarmManager::addKnownNode(const NodeId& nodeId)
153{
154 return routing_table.addKnownNode(nodeId);
155}
156
157void
158SwarmManager::addMobileNodes(const NodeId& nodeId)
159{
160 if (id_ != nodeId) {
161 routing_table.addMobileNode(nodeId);
162 }
163}
164
165void
167{
168 std::set<NodeId> nodes = toConnect;
169 std::unique_lock lock(mutex);
170 auto& buckets = routing_table.getBuckets();
171 for (auto it = buckets.begin(); it != buckets.end(); ++it) {
172 auto& bucket = *it;
173 bool myBucket = routing_table.contains(it, id_);
174 auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
175 : bucket.getConnectingNodesSize() + bucket.getNodesSize();
176 if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
177 auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, rd);
178 for (auto& node : nodesToTry)
179 routing_table.addConnectingNode(node);
180
181 nodes.insert(nodesToTry.begin(), nodesToTry.end());
182 }
183 }
184 lock.unlock();
185 for (const auto& node : nodes)
186 tryConnect(node);
187}
188
189void
190SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
191 const NodeId& nodeId,
192 Query q,
193 int numberNodes)
194{
195 dht::ThreadPool::io().run([socket, isMobile = isMobile_, nodeId, q, numberNodes] {
196 msgpack::sbuffer buffer;
197 msgpack::packer<msgpack::sbuffer> pk(&buffer);
198 Message msg;
200 msg.request = Request {q, numberNodes, nodeId};
201 pk.pack(msg);
202
203 std::error_code ec;
204 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
205 if (ec) {
206 JAMI_ERROR("{}", ec.message());
207 }
208 });
209}
210
211void
212SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket, const Message& msg_)
213{
214 std::lock_guard lock(mutex);
215
216 if (msg_.request->q == Query::FIND) {
217 auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
218 auto bucket = routing_table.findBucket(msg_.request->nodeId);
219 const auto& m_nodes = bucket->getMobileNodes();
220 Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
221
222 Message msg;
223 msg.is_mobile = isMobile_;
224 msg.response = std::move(toResponse);
225
226 msgpack::sbuffer buffer((size_t) 60000);
227 msgpack::packer<msgpack::sbuffer> pk(&buffer);
228 pk.pack(msg);
229
230 std::error_code ec;
231 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
232 if (ec) {
233 JAMI_ERROR("{}", ec.message());
234 return;
235 }
236 }
237}
238
239void
240SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
241{
242 socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
243 [w = weak(), wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&& msg) {
244 auto shared = w.lock();
245 auto socket = wsocket.lock();
246 if (!shared || !socket)
247 return std::make_error_code(std::errc::operation_canceled);
248
249 if (msg.is_mobile)
250 shared->changeMobility(socket->deviceId(), msg.is_mobile);
251
252 if (msg.request) {
253 shared->sendAnswer(socket, msg);
254
255 } else if (msg.response) {
256 shared->setKnownNodes(msg.response->nodes);
257 shared->setMobileNodes(msg.response->mobile_nodes);
258 }
259 return std::error_code();
260 }));
261
262 socket->onShutdown([w = weak(), deviceId = socket->deviceId()](const std::error_code&) {
263 dht::ThreadPool::io().run([w, deviceId] {
264 auto shared = w.lock();
265 if (shared && !shared->isShutdown_) {
266 shared->removeNode(deviceId);
267 }
268 });
269 });
270}
271
272void
273SwarmManager::resetNodeExpiry(const asio::error_code& ec,
274 const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
275 NodeId node)
276{
278 std::list<Bucket>::iterator bucket;
279
280 if (ec == asio::error::operation_aborted)
281 return;
282
283 if (!node) {
284 bucket = routing_table.findBucket(socket->deviceId());
285 idToFind = bucket->randomId(rd);
286 } else {
287 bucket = routing_table.findBucket(node);
288 idToFind = node;
289 }
290
291 sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
292
293 if (!node) {
294 auto& nodeTimer = bucket->getNodeTimer(socket);
295 nodeTimer.expires_after(FIND_PERIOD);
296 nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
298 std::placeholders::_1,
299 socket,
300 NodeId {}));
301 }
302}
303
304void
305SwarmManager::tryConnect(const NodeId& nodeId)
306{
307 if (needSocketCb_)
308 needSocketCb_(nodeId.toString(),
309 [w = weak(), nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
310 auto shared = w.lock();
311 if (!shared || shared->isShutdown_)
312 return true;
313 if (socket) {
314 shared->addChannel(socket);
315 return true;
316 }
317 std::unique_lock lk(shared->mutex);
318 auto bucket = shared->routing_table.findBucket(nodeId);
319 bucket->removeConnectingNode(nodeId);
320 bucket->addKnownNode(nodeId);
321 bucket = shared->routing_table.findBucket(shared->getId());
322 if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
323 && shared->onConnectionChanged_) {
324 lk.unlock();
325 JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
326 shared->onConnectionChanged_(false);
327 }
328 return true;
329 });
330}
331
332void
333SwarmManager::removeNodeInternal(const NodeId& nodeId)
334{
335 routing_table.removeNode(nodeId);
336}
337
338std::vector<NodeId>
340{
341 std::lock_guard lock(mutex);
342 return routing_table.getAllNodes();
343}
344
345std::vector<NodeId>
347{
348 std::lock_guard lock(mutex);
349 return routing_table.getConnectedNodes();
350}
351
352std::vector<std::map<std::string, std::string>>
354{
355 std::lock_guard lock(mutex);
356 auto stats = routing_table.getRoutingTableStats();
357 std::vector<std::map<std::string, std::string>> result;
358 result.reserve(stats.size());
359 for (const auto& stat : stats) {
360 result.push_back({
361 {"id", stat.id },
362 {"device", stat.id },
363 {"status", stat.status },
364 {"remoteAddress", stat.remoteAddress },
365 {"mobile", stat.isMobile ? "true" : "false"}
366 });
367 if (stat.connectionTime != std::chrono::system_clock::time_point::min()) {
368 auto tt = std::chrono::system_clock::to_time_t(stat.connectionTime);
369 result.back().emplace("connectionTime", std::to_string(tt));
370 }
371 }
372 return result;
373}
374
375bool
377{
378 std::lock_guard lock(mutex);
379 return !routing_table.isEmpty();
380}
381
382void
383SwarmManager::deleteNode(const std::vector<NodeId>& nodes)
384{
385 {
386 std::lock_guard lock(mutex);
387 for (const auto& node : nodes) {
388 routing_table.deleteNode(node);
389 }
390 }
392}
393
394} // namespace jami
static constexpr int BUCKET_MAX_SIZE
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
void shutdownAllNodes()
Shuts down all nodes in the routing table and adds them to known_nodes or mobile_nodes.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
std::vector< NodeId > getConnectedNodes() const
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
std::list< Bucket > & getBuckets()
Returns buckets in routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
std::vector< NodeStats > getRoutingTableStats() const
void setId(const NodeId &node)
Sets id for routing table.
bool isMobile() const
Get mobility of swarm manager.
NeedSocketCb needSocketCb_
bool isConnectedWith(const NodeId &deviceId)
Check if we're connected with a specific device.
void removeNode(const NodeId &nodeId)
Remove channel from routing table.
void shutdown()
Shutdown swarm manager.
SwarmManager(const NodeId &, const std::mt19937_64 &rand, ToConnectCb &&toConnectCb)
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node.
void maintainBuckets(const std::set< NodeId > &toConnect={})
Maintain/Update buckets.
void deleteNode(const std::vector< NodeId > &nodes)
Delete nodes from the different tables in bucket.
void setMobileNodes(const std::vector< NodeId > &mobile_nodes)
Set list of nodes to the routing table mobile_nodes.
const NodeId & getId() const
Get swarm manager id.
void restart()
Restart the swarm manager.
bool setKnownNodes(const std::vector< NodeId > &known_nodes)
Set list of nodes to the routing table known_nodes.
std::vector< NodeId > getConnectedNodes() const
std::vector< NodeId > getAllNodes() const
get all nodes from the different tables in bucket
std::vector< std::map< std::string, std::string > > getRoutingTableInfo() const
void addChannel(const std::shared_ptr< dhtnet::ChannelSocketInterface > &channel)
Add channel to routing table.
std::weak_ptr< SwarmManager > weak()
bool isConnected() const
Check if swarm manager is connected.
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:238
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
dht::h256 NodeId
static constexpr const std::chrono::minutes FIND_PERIOD
void emitSignal(Args... args)
Definition jami_signal.h:64
dht::PkId NodeId