Ring Daemon 16.0.0
Loading...
Searching...
No Matches
swarm_manager.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "swarm_manager.h"
19#include <dhtnet/multiplexed_socket.h>
20#include <opendht/thread_pool.h>
21
22namespace jami {
23
24using namespace swarm_protocol;
25
26SwarmManager::SwarmManager(const NodeId& id, const std::mt19937_64& rand, ToConnectCb&& toConnectCb)
27 : id_(id)
28 , rd(rand)
29 , toConnectCb_(toConnectCb)
30{
31 routing_table.setId(id);
32}
33
// NOTE(review): the signature line that precedes this body (listing line 34)
// is missing from this view. No return-type line precedes it, so this is
// presumably SwarmManager::~SwarmManager() -- confirm against the real file.
//
// Calls shutdown() unless the manager is already flagged as shut down.
35{
36 if (!isShutdown_)
37 shutdown();
38}
39
40bool
41SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes)
42{
43 isShutdown_ = false;
44 std::vector<NodeId> newNodes;
45 {
46 std::lock_guard lock(mutex);
47 for (const auto& nodeId : known_nodes) {
48 if (addKnownNode(nodeId)) {
49 newNodes.emplace_back(nodeId);
50 }
51 }
52 }
53
54 if (newNodes.empty())
55 return false;
56
57 dht::ThreadPool::io().run([w = weak(), newNodes = std::move(newNodes)] {
58 auto shared = w.lock();
59 if (!shared)
60 return;
61 // If we detect a new node which already got a TCP link
62 // we can use it to speed-up the bootstrap (because opening
63 // a new channel will be easy)
64 std::set<NodeId> toConnect;
65 for (const auto& nodeId : newNodes) {
66 if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
67 toConnect.emplace(nodeId);
68 }
69 shared->maintainBuckets(toConnect);
70 });
71 return true;
72}
73
74void
75SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes)
76{
77 {
78 std::lock_guard lock(mutex);
79 for (const auto& nodeId : mobile_nodes)
80 addMobileNodes(nodeId);
81 }
82}
83
84void
85SwarmManager::addChannel(const std::shared_ptr<dhtnet::ChannelSocketInterface>& channel)
86{
87 // JAMI_WARNING("[SwarmManager {}] addChannel! with {}", fmt::ptr(this), channel->deviceId().to_view());
88 if (channel) {
89 auto emit = false;
90 {
91 std::lock_guard lock(mutex);
92 emit = routing_table.findBucket(getId())->isEmpty();
93 auto bucket = routing_table.findBucket(channel->deviceId());
94 if (routing_table.addNode(channel, bucket)) {
95 std::error_code ec;
96 resetNodeExpiry(ec, channel, id_);
97 }
98 }
99 receiveMessage(channel);
100 if (emit && onConnectionChanged_) {
101 // If it's the first channel we add, we're now connected!
102 JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this));
103 onConnectionChanged_(true);
104 }
105 }
106}
107
// Remove a node from the routing table if we are currently connected to it.
//
// NOTE(review): the signature line (listing line 109) is missing from this
// view. The reference index of this page lists
// `void removeNode(const NodeId &nodeId)`, which matches the body -- confirm.
108void
110{
// unique_lock rather than lock_guard: the mutex is released explicitly below.
111 std::unique_lock lk(mutex);
112 if (isConnectedWith(nodeId)) {
113 removeNodeInternal(nodeId);
114 lk.unlock();
// NOTE(review): listing line 115 is missing here. Whatever statement followed
// ran after the mutex had been released -- confirm against the real file.
116 }
117}
118
119void
120SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile)
121{
122 std::lock_guard lock(mutex);
123 auto bucket = routing_table.findBucket(nodeId);
124 bucket->changeMobility(nodeId, isMobile);
125}
126
// Report whether the routing table holds a node for `deviceId`.
//
// NOTE(review): the signature line (listing line 128) is missing from this
// view. The reference index lists
// `bool isConnectedWith(const NodeId &deviceId)` -- confirm.
//
// No lock is taken here; the caller at listing line 112 invokes it with
// `mutex` already held.
127bool
129{
130 return routing_table.hasNode(deviceId);
131}
132
// Mark the manager as shut down and shut down every node of the routing table.
//
// NOTE(review): the signature line (listing line 134) is missing from this
// view. The reference index lists `void shutdown()` -- confirm.
133void
135{
// Already shut down: nothing to do, so repeated calls are harmless.
136 if (isShutdown_) {
137 return;
138 }
// The flag is raised before the mutex is taken.
139 isShutdown_ = true;
140 std::lock_guard lock(mutex);
141 routing_table.shutdownAllNodes();
142}
143
// Clear the shutdown flag; nothing else is touched here.
//
// NOTE(review): the signature line (listing line 145) is missing from this
// view. The reference index lists `void restart()` -- confirm.
144void
146{
147 isShutdown_ = false;
148}
149
150bool
151SwarmManager::addKnownNode(const NodeId& nodeId)
152{
153 return routing_table.addKnownNode(nodeId);
154}
155
156void
157SwarmManager::addMobileNodes(const NodeId& nodeId)
158{
159 if (id_ != nodeId) {
160 routing_table.addMobileNode(nodeId);
161 }
162}
163
// Top up every bucket with connection attempts, then try to connect to the
// selected nodes plus the ones passed in `toConnect`.
//
// NOTE(review): the signature line (listing line 165) is missing from this
// view. The reference index lists
// `void maintainBuckets(const std::set< NodeId > &toConnect={})` -- confirm.
164void
166{
// Start from the caller's set; nodes picked below are merged into it.
167 std::set<NodeId> nodes = toConnect;
168 std::unique_lock lock(mutex);
169 auto& buckets = routing_table.getBuckets();
170 for (auto it = buckets.begin(); it != buckets.end(); ++it) {
171 auto& bucket = *it;
172 bool myBucket = routing_table.contains(it, id_);
// For the bucket containing our own id only the connecting nodes are counted;
// for every other bucket the connected nodes are counted as well.
173 auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize()
174 : bucket.getConnectingNodesSize() + bucket.getNodesSize();
175 if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) {
// Pick random known nodes to fill the remaining slots of this bucket.
176 auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes,
177 rd);
178 for (auto& node : nodesToTry)
179 routing_table.addConnectingNode(node);
180
181 nodes.insert(nodesToTry.begin(), nodesToTry.end());
182 }
183 }
// Release the mutex before the connection attempts are started.
184 lock.unlock();
185 for (auto& node : nodes)
186 tryConnect(node);
187}
188
189void
190SwarmManager::sendRequest(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
191 NodeId& nodeId,
192 Query q,
193 int numberNodes)
194{
195 msgpack::sbuffer buffer;
196 msgpack::packer<msgpack::sbuffer> pk(&buffer);
197 std::error_code ec;
198
199 Request toRequest {q, numberNodes, nodeId};
200 Message msg;
201 msg.is_mobile = isMobile_;
202 msg.request = std::move(toRequest);
203
204 pk.pack(msg);
205
206 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
207
208 if (ec) {
209 JAMI_ERROR("{}", ec.message());
210 return;
211 }
212}
213
214void
215SwarmManager::sendAnswer(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
216 const Message& msg_)
217{
218 std::lock_guard lock(mutex);
219
220 if (msg_.request->q == Query::FIND) {
221 auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num);
222 auto bucket = routing_table.findBucket(msg_.request->nodeId);
223 const auto& m_nodes = bucket->getMobileNodes();
224 Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}};
225
226 Message msg;
227 msg.is_mobile = isMobile_;
228 msg.response = std::move(toResponse);
229
230 msgpack::sbuffer buffer((size_t) 60000);
231 msgpack::packer<msgpack::sbuffer> pk(&buffer);
232 pk.pack(msg);
233
234 std::error_code ec;
235
236 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
237 if (ec) {
238 JAMI_ERROR("{}", ec.message());
239 return;
240 }
241 }
242
243 else {
244 }
245}
246
247void
248SwarmManager::receiveMessage(const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
249{
250 struct DecodingContext
251 {
252 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
253 nullptr,
254 512};
255 };
256
257 socket->setOnRecv([w = weak(),
258 wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
259 ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
260 ctx->pac.reserve_buffer(len);
261 std::copy_n(buf, len, ctx->pac.buffer());
262 ctx->pac.buffer_consumed(len);
263
264 msgpack::object_handle oh;
265 while (ctx->pac.next(oh)) {
266 auto shared = w.lock();
267 auto socket = wsocket.lock();
268 if (!shared || !socket)
269 return size_t {0};
270
271 try {
272 Message msg;
273 oh.get().convert(msg);
274
275 if (msg.is_mobile)
276 shared->changeMobility(socket->deviceId(), msg.is_mobile);
277
278 if (msg.request) {
279 shared->sendAnswer(socket, msg);
280
281 } else if (msg.response) {
282 shared->setKnownNodes(msg.response->nodes);
283 shared->setMobileNodes(msg.response->mobile_nodes);
284 }
285
286 } catch (const std::exception& e) {
287 JAMI_WARNING("Error DRT recv: {}", e.what());
288 return len;
289 }
290 }
291
292 return len;
293 });
294
295 socket->onShutdown([w = weak(), deviceId = socket->deviceId()] {
296 dht::ThreadPool::io().run([w, deviceId] {
297 auto shared = w.lock();
298 if (shared && !shared->isShutdown_) {
299 shared->removeNode(deviceId);
300 }
301 });
302 });
303}
304
// Send a FIND request on `socket` and, on the periodic path, re-arm the node
// timer so that this function is called again after FIND_PERIOD.
//
// ec     - timer completion status; an aborted timer stops the cycle.
// socket - channel the request is sent on.
// node   - id to look for. When it tests false (presumably an empty/default
//          NodeId -- confirm), a random id of the peer's bucket is used
//          instead and the timer is re-armed.
305void
306SwarmManager::resetNodeExpiry(const asio::error_code& ec,
307 const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
308 NodeId node)
309{
// NOTE(review): listing line 310 is missing from this view. `idToFind` is used
// below without a visible declaration, so it was presumably declared there.
311 std::list<Bucket>::iterator bucket;
312
// The timer was cancelled: do not send anything and do not re-arm.
313 if (ec == asio::error::operation_aborted)
314 return;
315
316 if (!node) {
// No explicit target: look for a random id of the bucket covering the peer.
317 bucket = routing_table.findBucket(socket->deviceId());
318 idToFind = bucket->randomId(rd);
319 } else {
320 bucket = routing_table.findBucket(node);
321 idToFind = node;
322 }
323
324 sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE);
325
326 if (!node) {
// Re-arm the timer; the next call is bound with an empty NodeId.
327 auto& nodeTimer = bucket->getNodeTimer(socket);
328 nodeTimer.expires_after(FIND_PERIOD);
329 nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
// NOTE(review): listing line 330 is missing here; it presumably supplied the
// object the member function is bound to -- confirm against the real file.
331 std::placeholders::_1,
332 socket,
333 NodeId {}));
334 }
335}
336
337void
338SwarmManager::tryConnect(const NodeId& nodeId)
339{
340 if (needSocketCb_)
341 needSocketCb_(nodeId.toString(),
342 [w = weak(),
343 nodeId](const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
344 auto shared = w.lock();
345 if (!shared || shared->isShutdown_)
346 return true;
347 if (socket) {
348 shared->addChannel(socket);
349 return true;
350 }
351 std::unique_lock lk(shared->mutex);
352 auto bucket = shared->routing_table.findBucket(nodeId);
353 bucket->removeConnectingNode(nodeId);
354 bucket->addKnownNode(nodeId);
355 bucket = shared->routing_table.findBucket(shared->getId());
356 if (bucket->getConnectingNodesSize() == 0 && bucket->isEmpty()
357 && shared->onConnectionChanged_) {
358 lk.unlock();
359 JAMI_LOG("[SwarmManager {:p}] Bootstrap: all connections failed",
360 fmt::ptr(shared.get()));
361 shared->onConnectionChanged_(false);
362 }
363 return true;
364 });
365}
366
367void
368SwarmManager::removeNodeInternal(const NodeId& nodeId)
369{
370 routing_table.removeNode(nodeId);
371}
372
// Return the routing table's full node list, read with the mutex held.
//
// NOTE(review): the signature line (listing line 374) is missing from this
// view. The reference index lists
// `std::vector< NodeId > getAllNodes() const` -- confirm.
373std::vector<NodeId>
375{
376 std::lock_guard lock(mutex);
377 return routing_table.getAllNodes();
378}
379
// Delete each of the given nodes from the routing table.
//
// nodes - ids to delete; the vector is taken by value.
380void
381SwarmManager::deleteNode(std::vector<NodeId> nodes)
382{
// Inner scope: the mutex is held only while the routing table is modified.
383 {
384 std::lock_guard lock(mutex);
385 for (const auto& node : nodes) {
386 routing_table.deleteNode(node);
387 }
388 }
// NOTE(review): listing line 389 is missing here. Whatever statement followed
// ran after the mutex had been released -- confirm against the real file.
390}
391
392} // namespace jami
static constexpr int BUCKET_MAX_SIZE
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
void shutdownAllNodes()
Shuts down all nodes in the routing table and adds them to known_nodes or mobile_nodes.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
std::list< Bucket > & getBuckets()
Returns buckets in routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
void setId(const NodeId &node)
Sets id for routing table.
bool isMobile() const
Get mobility of swarm manager.
NeedSocketCb needSocketCb_
bool isConnectedWith(const NodeId &deviceId)
Check if we're connected with a specific device.
void removeNode(const NodeId &nodeId)
Remove channel from routing table.
void deleteNode(std::vector< NodeId > nodes)
Delete nodes from the different tables in bucket.
void shutdown()
Shutdown swarm manager.
SwarmManager(const NodeId &, const std::mt19937_64 &rand, ToConnectCb &&toConnectCb)
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node.
void maintainBuckets(const std::set< NodeId > &toConnect={})
Maintain/Update buckets.
void setMobileNodes(const std::vector< NodeId > &mobile_nodes)
Set list of nodes to the routing table mobile_nodes.
const NodeId & getId() const
Get swarm manager id.
void restart()
Restart the swarm manager.
bool setKnownNodes(const std::vector< NodeId > &known_nodes)
Set list of nodes to the routing table known_nodes.
std::vector< NodeId > getAllNodes() const
Get all nodes from the different tables in bucket.
void addChannel(const std::shared_ptr< dhtnet::ChannelSocketInterface > &channel)
Add channel to routing table.
std::weak_ptr< SwarmManager > weak()
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
dht::h256 NodeId
static constexpr const std::chrono::minutes FIND_PERIOD
void emitSignal(Args... args)
Definition ring_signal.h:64
dht::PkId NodeId