19#include <dhtnet/multiplexed_socket.h>
20#include <dhtnet/channel_utils.h>
21#include <opendht/thread_pool.h>
25using namespace swarm_protocol;
32 routing_table.
setId(
id);
47 std::lock_guard lock(mutex);
48 for (
const auto& nodeId : known_nodes) {
49 if (addKnownNode(nodeId)) {
59 auto shared = w.lock();
66 for (
const auto& nodeId :
newNodes) {
67 if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
79 std::lock_guard lock(mutex);
80 for (
const auto& nodeId : mobile_nodes)
81 addMobileNodes(nodeId);
92 std::lock_guard lock(mutex);
97 resetNodeExpiry(
ec, channel, id_);
100 receiveMessage(channel);
101 if (emit && onConnectionChanged_) {
103 JAMI_DEBUG(
"[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(
this));
104 onConnectionChanged_(
true);
112 std::unique_lock
lk(mutex);
114 removeNodeInternal(nodeId);
123 std::lock_guard lock(mutex);
131 return routing_table.
hasNode(deviceId);
141 std::lock_guard lock(mutex);
152SwarmManager::addKnownNode(
const NodeId& nodeId)
158SwarmManager::addMobileNodes(
const NodeId& nodeId)
169 std::unique_lock lock(mutex);
171 for (
auto it = buckets.begin();
it != buckets.end(); ++
it) {
174 auto connecting_nodes =
myBucket ?
bucket.getConnectingNodesSize()
175 :
bucket.getConnectingNodesSize() +
bucket.getNodesSize();
185 for (
const auto& node : nodes)
190SwarmManager::sendRequest(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
197 msgpack::packer<msgpack::sbuffer> pk(&
buffer);
204 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
212SwarmManager::sendAnswer(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
const Message&
msg_)
214 std::lock_guard lock(mutex);
216 if (
msg_.request->q == Query::FIND) {
223 msg.is_mobile = isMobile_;
226 msgpack::sbuffer
buffer((
size_t) 60000);
227 msgpack::packer<msgpack::sbuffer> pk(&
buffer);
231 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
240SwarmManager::receiveMessage(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
242 socket->setOnRecv(dhtnet::buildMsgpackReader<Message>(
243 [w =
weak(),
wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket)](Message&&
msg) {
244 auto shared = w.lock();
246 if (!shared || !socket)
247 return std::make_error_code(std::errc::operation_canceled);
250 shared->changeMobility(socket->deviceId(),
msg.is_mobile);
253 shared->sendAnswer(socket,
msg);
255 }
else if (
msg.response) {
256 shared->setKnownNodes(
msg.response->nodes);
257 shared->setMobileNodes(
msg.response->mobile_nodes);
259 return std::error_code();
262 socket->onShutdown([w =
weak(), deviceId = socket->deviceId()](
const std::error_code&) {
263 dht::ThreadPool::io().run([w, deviceId] {
264 auto shared = w.lock();
265 if (shared && !shared->isShutdown_) {
266 shared->removeNode(deviceId);
273SwarmManager::resetNodeExpiry(
const asio::error_code&
ec,
274 const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
278 std::list<Bucket>::iterator
bucket;
280 if (
ec == asio::error::operation_aborted)
296 nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
298 std::placeholders::_1,
305SwarmManager::tryConnect(
const NodeId& nodeId)
309 [w =
weak(), nodeId](
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
310 auto shared = w.lock();
311 if (!shared || shared->isShutdown_)
314 shared->addChannel(socket);
317 std::unique_lock
lk(shared->mutex);
318 auto bucket = shared->routing_table.findBucket(nodeId);
319 bucket->removeConnectingNode(nodeId);
320 bucket->addKnownNode(nodeId);
321 bucket = shared->routing_table.findBucket(shared->getId());
322 if (
bucket->getConnectingNodesSize() == 0 &&
bucket->isEmpty()
323 && shared->onConnectionChanged_) {
325 JAMI_LOG(
"[SwarmManager {:p}] Bootstrap: all connections failed", fmt::ptr(shared.get()));
326 shared->onConnectionChanged_(
false);
333SwarmManager::removeNodeInternal(
const NodeId& nodeId)
341 std::lock_guard lock(mutex);
348 std::lock_guard lock(mutex);
352std::vector<std::map<std::string, std::string>>
355 std::lock_guard lock(mutex);
357 std::vector<std::map<std::string, std::string>> result;
358 result.reserve(
stats.size());
362 {
"device",
stat.id },
363 {
"status",
stat.status },
364 {
"remoteAddress",
stat.remoteAddress },
365 {
"mobile",
stat.isMobile ?
"true" :
"false"}
367 if (
stat.connectionTime != std::chrono::system_clock::time_point::min()) {
368 auto tt = std::chrono::system_clock::to_time_t(
stat.connectionTime);
369 result.back().emplace(
"connectionTime", std::to_string(
tt));
378 std::lock_guard lock(mutex);
379 return !routing_table.
isEmpty();
386 std::lock_guard lock(mutex);
387 for (
const auto& node : nodes) {
static constexpr int BUCKET_MAX_SIZE
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
void shutdownAllNodes()
Shuts down all nodes in routing table and adds them to known_nodes or mobile_nodes.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
std::vector< NodeId > getConnectedNodes() const
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
std::list< Bucket > & getBuckets()
Returns buckets in routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
std::vector< NodeStats > getRoutingTableStats() const
void setId(const NodeId &node)
Sets id for routing table.
bool isMobile() const
Get mobility of swarm manager.
NeedSocketCb needSocketCb_
bool isConnectedWith(const NodeId &deviceId)
Check if we're connected with a specific device.
void removeNode(const NodeId &nodeId)
Remove channel from routing table.
void shutdown()
Shutdown swarm manager.
SwarmManager(const NodeId &, const std::mt19937_64 &rand, ToConnectCb &&toConnectCb)
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node.
void maintainBuckets(const std::set< NodeId > &toConnect={})
Maintain/Update buckets.
void deleteNode(const std::vector< NodeId > &nodes)
Delete nodes from the different tables in bucket.
void setMobileNodes(const std::vector< NodeId > &mobile_nodes)
Set list of nodes to the routing table mobile_nodes.
const NodeId & getId() const
Get swarm manager id.
void restart()
Restart the swarm manager.
bool setKnownNodes(const std::vector< NodeId > &known_nodes)
Set list of nodes to the routing table known_nodes.
std::vector< NodeId > getConnectedNodes() const
std::vector< NodeId > getAllNodes() const
Get all nodes from the different tables in bucket.
std::vector< std::map< std::string, std::string > > getRoutingTableInfo() const
void addChannel(const std::shared_ptr< dhtnet::ChannelSocketInterface > &channel)
Add channel to routing table.
std::weak_ptr< SwarmManager > weak()
bool isConnected() const
Check if swarm manager is connected.
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_LOG(formatstr,...)
static constexpr const std::chrono::minutes FIND_PERIOD
void emitSignal(Args... args)