Ring Daemon 16.0.0
Loading...
Searching...
No Matches
swarm_manager.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "swarm_manager.h"
19#include <dhtnet/multiplexed_socket.h>
20#include <opendht/thread_pool.h>
21
22namespace jami {
23
24using namespace swarm_protocol;
25
26SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
27 : id_(id)
28 , rd(rand)
29 , toConnectCb_(toConnectCb)
30{
31 routing_table.setId(id);
32}
33
35{
36 if (!isShutdown_)
37 shutdown();
38}
39
40bool
41SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
42{
43 isShutdown_ = false;
44 std::vector<NodeId> newNodes;
45 {
46 std::lock_guard lock(mutex);
47 for (const auto& nodeId : known_nodes) {
48 if (addKnownNode(nodeId)) {
49 newNodes.emplace_back(nodeId);
50 }
51 }
52 }
53
54 if (newNodes.empty())
55 return false;
56
57 dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
58 auto shared = w.lock();
59 if (!shared)
60 return;
61 // If we detect a new node which already got a TCP link
62 // we can use it to speed-up the bootstrap (because opening
63 // a new channel will be easy)
64 std::set<NodeId> toConnect;
65 for (const auto& nodeId : newNodes) {
66 if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
67 toConnect.emplace(nodeId);
68 }
69 shared->maintainBuckets(toConnect);
70 });
71 return true;
72}
73
74void
75SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
76{
77 {
78 std::lock_guard lock(mutex);
79 for (const auto& nodeId : mobile_nodes)
80 addMobileNodes(nodeId);
81 }
82}
83
84void
85SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
86{
87 // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
88 if (channel) {
89 auto emit = false;
90 {
91 std::lock_guard lock(mutex);
92 emit = routing_table.findBucket(getId())->isEmpty();
93 auto bucket = routing_table.findBucket(channel->deviceId());
94 if (routing_table.addNode(channel, bucket)) {
95 std::error_code ec;
96 resetNodeExpiry(ec, channel, id_);
97 }
98 }
99 receiveMessage(channel);
100 if (emit && onConnectionChanged_) {
101 // If it's the first channel we add, we're now connected!
102 JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
103 onConnectionChanged_(true);
104 }
105 }
106}
107
108void
110{
111 std::unique_lock lk(mutex);
112 if (isConnectedWith(nodeId)) {
113 removeNodeInternal(nodeId);
114 lk.unlock();
116 }
117}
118
119void
120SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
121{
122 std::lock_guard lock(mutex);
123 auto bucket = routing_table.findBucket(nodeId);
124 bucket->changeMobility(nodeId, isMobile);
125}
126
127bool
129{
130 return routing_table.hasNode(deviceId);
131}
132
133void
135{
136 if (isShutdown_) {
137 return;
138 }
139 isShutdown_ = true;
140 std::lock_guard lock(mutex);
141 routing_table.shutdownAllNodes();
142}
143
144void
146{
147 isShutdown_ = false;
148}
149
150bool
151SwarmManager::addKnownNode(const NodeId& nodeId)
152{
153 return routing_table.addKnownNode(nodeId);
154}
155
156void
157SwarmManager::addMobileNodes(const NodeId& nodeId)
158{
159 if (id_ != nodeId) {
160 routing_table.addMobileNode(nodeId);
161 }
162}
163
164void
166{
167 std::set<NodeId> nodes = toConnect;
168 std::unique_lock lock(mutex);
169 auto& buckets = routing_table.getBuckets();
170 for (auto it = buckets.begin(); it != buckets.end(); ++it) {
171 auto& bucket = *it;
172 bool myBucket = routing_table.contains(it, id_);
173 auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
174 : bucket.getConnectingNodesSize() + bucket.getNodesSize();
175 if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
176 auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
177 rd);
178 for (auto& node : nodesToTry)
179 routing_table.addConnectingNode(node);
180
181 nodes.insert(nodesToTry.begin(), nodesToTry.end());
182 }
183 }
184 lock.unlock();
185 for (auto& node : nodes)
186 tryConnect(node);
187}
188
189void
190SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
191 const NodeId& nodeId,
192 Query q,
193 int numberNodes)
194{
195 dht::ThreadPool::io().run([socket, isMobile=isMobile_, nodeId, q, numberNodes] {
196 msgpack::sbuffer buffer;
197 msgpack::packer<msgpack::sbuffer> pk(&buffer);
198 Message msg;
200 msg.request = Request {q, numberNodes, nodeId};
201 pk.pack(msg);
202
203 std::error_code ec;
204 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
205 if (ec) {
206 JAMI_ERROR("{}", ec.message());
207 }
208 });
209}
210
211void
212SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
213 const Message& msg_)
214{
215 std::lock_guard lock(mutex);
216
217 if (msg_.request->q == Query::FIND) {
218 auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
219 auto bucket = routing_table.findBucket(msg_.request->nodeId);
220 const auto& m_nodes = bucket->getMobileNodes();
221 Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
222
223 Message msg;
224 msg.is_mobile = isMobile_;
225 msg.response = std::move(toResponse);
226
227 msgpack::sbuffer buffer((size_t) 60000);
228 msgpack::packer<msgpack::sbuffer> pk(&buffer);
229 pk.pack(msg);
230
231 std::error_code ec;
232 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
233 if (ec) {
234 JAMI_ERROR("{}", ec.message());
235 return;
236 }
237 }
238}
239
240void
241SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
242{
243 struct DecodingContext
244 {
245 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
246 nullptr,
247 512};
248 };
249
250 socket->setOnRecv([w = weak(),
251 wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
252 ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
253 ctx->pac.reserve_buffer(len);
254 std::copy_n(buf, len, ctx->pac.buffer());
255 ctx->pac.buffer_consumed(len);
256
257 msgpack::object_handle oh;
258 while (ctx->pac.next(oh)) {
259 auto shared = w.lock();
260 auto socket = wsocket.lock();
261 if (!shared || !socket)
262 return size_t {0};
263
264 try {
265 Message msg;
266 oh.get().convert(msg);
267
268 if (msg.is_mobile)
269 shared->changeMobility(socket->deviceId(), msg.is_mobile);
270
271 if (msg.request) {
272 shared->sendAnswer(socket, msg);
273
274 } else if (msg.response) {
275 shared->setKnownNodes(msg.response->nodes);
276 shared->setMobileNodes(msg.response->mobile_nodes);
277 }
278
279 } catch (const std::exception& e) {
280 JAMI_WARNING("Error DRT recv: {}", e.what());
281 return len;
282 }
283 }
284
285 return len;
286 });
287
288 socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
289 dht::ThreadPool::io().run([w, deviceId] {
290 auto shared = w.lock();
291 if (shared && !shared->isShutdown_) {
292 shared->removeNode(deviceId);
293 }
294 });
295 });
296}
297
298void
299SwarmManager::resetNodeExpiry(const asio::error_code& ec,
300 const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
301 NodeId node)
302{
304 std::list<Bucket>::iterator bucket;
305
306 if (ec == asio::error::operation_aborted)
307 return;
308
309 if (!node) {
310 bucket = routing_table.findBucket(socket->deviceId());
311 idToFind = bucket->randomId(rd);
312 } else {
313 bucket = routing_table.findBucket(node);
314 idToFind = node;
315 }
316
317 sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
318
319 if (!node) {
320 auto& nodeTimer = bucket->getNodeTimer(socket);
321 nodeTimer.expires_after(FIND_PERIOD);
322 nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
324 std::placeholders::_1,
325 socket,
326 NodeId {}));
327 }
328}
329
330void
331SwarmManager::tryConnect(const NodeId& nodeId)
332{
333 if (needSocketCb_)
334 needSocketCb_(nodeId.toString(),
335 [w = weak(),
336 nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
337 auto shared = w.lock();
338 if (!shared || shared->isShutdown_)
339 return true;
340 if (socket) {
341 shared->addChannel(socket);
342 return true;
343 }
344 std::unique_lock lk(shared->mutex);
345 auto bucket = shared->routing_table.findBucket(nodeId);
346 bucket->removeConnectingNode(nodeId);
347 bucket->addKnownNode(nodeId);
348 bucket = shared->routing_table.findBucket(shared->getId());
349 if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
350 && shared->onConnectionChanged_) {
351 lk.unlock();
352 JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed",
353 fmt::ptr(shared.get()));
354 shared->onConnectionChanged_(false);
355 }
356 return true;
357 });
358}
359
360void
361SwarmManager::removeNodeInternal(const NodeId& nodeId)
362{
363 routing_table.removeNode(nodeId);
364}
365
366std::vector<NodeId>
368{
369 std::lock_guard lock(mutex);
370 return routing_table.getAllNodes();
371}
372
373void
374SwarmManager::deleteNode(std::vector<NodeId> nodes)
375{
376 {
377 std::lock_guard lock(mutex);
378 for (const auto& node : nodes) {
379 routing_table.deleteNode(node);
380 }
381 }
383}
384
385} // namespace jami
static constexpr int BUCKET_MAX_SIZE
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
void shutdownAllNodes()
Shutdowns all nodes in routing table and add them to known_nodes or mobile_nodes.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
bool removeNode(const NodeId &nodeId)
Removes node from routing table Adds it to known_nodes or mobile_nodes depending on mobility.
bool hasNode(const NodeId &nodeId)
Check if connected node exsits in routing table.
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
std::list< Bucket > & getBuckets()
Returns buckets in routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
void setId(const NodeId &node)
Sets id for routing table.
bool isMobile() const
Get mobility of swarm manager.
NeedSocketCb needSocketCb_
bool isConnectedWith(const NodeId &deviceId)
Check if we're connected with a specific device.
void removeNode(const NodeId &nodeId)
Remove channel from routing table.
void deleteNode(std::vector< NodeId > nodes)
Delete nodes from the different tables in bucket.
void shutdown()
Shutdown swarm manager.
SwarmManager(const NodeId &, const std::mt19937_64 &rand, ToConnectCb &&toConnectCb)
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node.
void maintainBuckets(const std::set< NodeId > &toConnect={})
Maintain/Update buckets.
void setMobileNodes(const std::vector< NodeId > &mobile_nodes)
Set list of nodes to the routing table mobile_nodes.
const NodeId & getId() const
Get swarm manager id.
void restart()
Restart the swarm manager.
bool setKnownNodes(const std::vector< NodeId > &known_nodes)
Set list of nodes to the routing table known_nodes.
std::vector< NodeId > getAllNodes() const
get all nodes from the different tables in bucket
void addChannel(const std::shared_ptr< dhtnet::ChannelSocketInterface > &channel)
Add channel to routing table.
std::weak_ptr< SwarmManager > weak()
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
dht::h256 NodeId
static constexpr const std::chrono::minutes FIND_PERIOD
void emitSignal(Args... args)
Definition ring_signal.h:64
dht::PkId NodeId