19#include <dhtnet/multiplexed_socket.h>
20#include <opendht/thread_pool.h>
24using namespace swarm_protocol;
31 routing_table.
setId(
id);
46 std::lock_guard lock(mutex);
47 for (
const auto& nodeId : known_nodes) {
48 if (addKnownNode(nodeId)) {
58 auto shared = w.lock();
65 for (
const auto& nodeId :
newNodes) {
66 if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
78 std::lock_guard lock(mutex);
79 for (
const auto& nodeId : mobile_nodes)
80 addMobileNodes(nodeId);
91 std::lock_guard lock(mutex);
96 resetNodeExpiry(
ec, channel, id_);
99 receiveMessage(channel);
100 if (emit && onConnectionChanged_) {
102 JAMI_DEBUG(
"[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(
this));
103 onConnectionChanged_(
true);
111 std::unique_lock
lk(mutex);
113 removeNodeInternal(nodeId);
122 std::lock_guard lock(mutex);
130 return routing_table.
hasNode(deviceId);
140 std::lock_guard lock(mutex);
151SwarmManager::addKnownNode(
const NodeId& nodeId)
157SwarmManager::addMobileNodes(
const NodeId& nodeId)
168 std::unique_lock lock(mutex);
170 for (
auto it = buckets.begin();
it != buckets.end(); ++
it) {
173 auto connecting_nodes =
myBucket ?
bucket.getConnectingNodesSize()
174 :
bucket.getConnectingNodesSize() +
bucket.getNodesSize();
185 for (
auto& node : nodes)
190SwarmManager::sendRequest(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
197 msgpack::packer<msgpack::sbuffer> pk(&
buffer);
204 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
212SwarmManager::sendAnswer(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
215 std::lock_guard lock(mutex);
217 if (
msg_.request->q == Query::FIND) {
224 msg.is_mobile = isMobile_;
227 msgpack::sbuffer
buffer((
size_t) 60000);
228 msgpack::packer<msgpack::sbuffer> pk(&
buffer);
232 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
241SwarmManager::receiveMessage(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
243 struct DecodingContext
245 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t,
void*) {
return true; },
250 socket->setOnRecv([w =
weak(),
251 wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
252 ctx = std::make_shared<DecodingContext>()](
const uint8_t*
buf,
size_t len) {
253 ctx->pac.reserve_buffer(len);
254 std::copy_n(
buf, len, ctx->pac.buffer());
255 ctx->pac.buffer_consumed(len);
257 msgpack::object_handle
oh;
258 while (ctx->pac.next(
oh)) {
259 auto shared = w.lock();
261 if (!shared || !socket)
266 oh.get().convert(
msg);
269 shared->changeMobility(socket->deviceId(),
msg.is_mobile);
272 shared->sendAnswer(socket,
msg);
274 }
else if (
msg.response) {
275 shared->setKnownNodes(
msg.response->nodes);
276 shared->setMobileNodes(
msg.response->mobile_nodes);
279 }
catch (
const std::exception&
e) {
288 socket->onShutdown([w =
weak(), deviceId = socket->deviceId()] {
289 dht::ThreadPool::io().run([w, deviceId] {
290 auto shared = w.lock();
291 if (shared && !shared->isShutdown_) {
292 shared->removeNode(deviceId);
299SwarmManager::resetNodeExpiry(
const asio::error_code&
ec,
300 const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
304 std::list<Bucket>::iterator
bucket;
306 if (
ec == asio::error::operation_aborted)
322 nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
324 std::placeholders::_1,
331SwarmManager::tryConnect(
const NodeId& nodeId)
336 nodeId](
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
337 auto shared = w.lock();
338 if (!shared || shared->isShutdown_)
341 shared->addChannel(socket);
344 std::unique_lock
lk(shared->mutex);
345 auto bucket = shared->routing_table.findBucket(nodeId);
346 bucket->removeConnectingNode(nodeId);
347 bucket->addKnownNode(nodeId);
348 bucket = shared->routing_table.findBucket(shared->getId());
349 if (
bucket->getConnectingNodesSize() == 0 &&
bucket->isEmpty()
350 && shared->onConnectionChanged_) {
352 JAMI_LOG(
"[SwarmManager {:p}] Bootstrap: all connections failed",
353 fmt::ptr(shared.get()));
354 shared->onConnectionChanged_(
false);
361SwarmManager::removeNodeInternal(
const NodeId& nodeId)
369 std::lock_guard lock(mutex);
377 std::lock_guard lock(mutex);
378 for (
const auto& node : nodes) {
static constexpr int BUCKET_MAX_SIZE
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the `count` nodes closest to a specific nodeId.
void shutdownAllNodes()
Shuts down all nodes in the routing table and adds them to known_nodes or mobile_nodes.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
std::list< Bucket > & getBuckets()
Returns buckets in routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
void setId(const NodeId &node)
Sets id for routing table.
bool isMobile() const
Get mobility of swarm manager.
NeedSocketCb needSocketCb_
bool isConnectedWith(const NodeId &deviceId)
Check if we're connected with a specific device.
void removeNode(const NodeId &nodeId)
Remove channel from routing table.
void deleteNode(std::vector< NodeId > nodes)
Delete nodes from the different tables in bucket.
void shutdown()
Shutdown swarm manager.
SwarmManager(const NodeId &, const std::mt19937_64 &rand, ToConnectCb &&toConnectCb)
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node.
void maintainBuckets(const std::set< NodeId > &toConnect={})
Maintain/Update buckets.
void setMobileNodes(const std::vector< NodeId > &mobile_nodes)
Set list of nodes to the routing table mobile_nodes.
const NodeId & getId() const
Get swarm manager id.
void restart()
Restart the swarm manager.
bool setKnownNodes(const std::vector< NodeId > &known_nodes)
Set list of nodes to the routing table known_nodes.
std::vector< NodeId > getAllNodes() const
Get all nodes from the different tables in bucket.
void addChannel(const std::shared_ptr< dhtnet::ChannelSocketInterface > &channel)
Add channel to routing table.
std::weak_ptr< SwarmManager > weak()
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
static constexpr const std::chrono::minutes FIND_PERIOD
void emitSignal(Args... args)