19#include <dhtnet/multiplexed_socket.h>
20#include <opendht/thread_pool.h>
24using namespace swarm_protocol;
31 routing_table.
setId(
id);
46 std::lock_guard lock(mutex);
47 for (
const auto& nodeId : known_nodes) {
48 if (addKnownNode(nodeId)) {
58 auto shared = w.lock();
65 for (
const auto& nodeId :
newNodes) {
66 if (shared->toConnectCb_ && shared->toConnectCb_(nodeId))
78 std::lock_guard lock(mutex);
79 for (
const auto& nodeId : mobile_nodes)
80 addMobileNodes(nodeId);
91 std::lock_guard lock(mutex);
96 resetNodeExpiry(
ec, channel, id_);
99 receiveMessage(channel);
100 if (emit && onConnectionChanged_) {
102 JAMI_DEBUG(
"[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(
this));
103 onConnectionChanged_(
true);
111 std::unique_lock
lk(mutex);
113 removeNodeInternal(nodeId);
122 std::lock_guard lock(mutex);
130 return routing_table.
hasNode(deviceId);
140 std::lock_guard lock(mutex);
151SwarmManager::addKnownNode(
const NodeId& nodeId)
157SwarmManager::addMobileNodes(
const NodeId& nodeId)
168 std::unique_lock lock(mutex);
170 for (
auto it = buckets.begin();
it != buckets.end(); ++
it) {
173 auto connecting_nodes =
myBucket ?
bucket.getConnectingNodesSize()
174 :
bucket.getConnectingNodesSize() +
bucket.getNodesSize();
185 for (
auto& node : nodes)
190SwarmManager::sendRequest(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
196 msgpack::packer<msgpack::sbuffer> pk(&
buffer);
201 msg.is_mobile = isMobile_;
206 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
215SwarmManager::sendAnswer(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
218 std::lock_guard lock(mutex);
220 if (
msg_.request->q == Query::FIND) {
227 msg.is_mobile = isMobile_;
230 msgpack::sbuffer
buffer((
size_t) 60000);
231 msgpack::packer<msgpack::sbuffer> pk(&
buffer);
236 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
248SwarmManager::receiveMessage(
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket)
250 struct DecodingContext
252 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t,
void*) {
return true; },
257 socket->setOnRecv([w =
weak(),
258 wsocket = std::weak_ptr<dhtnet::ChannelSocketInterface>(socket),
259 ctx = std::make_shared<DecodingContext>()](
const uint8_t*
buf,
size_t len) {
260 ctx->pac.reserve_buffer(len);
261 std::copy_n(
buf, len, ctx->pac.buffer());
262 ctx->pac.buffer_consumed(len);
264 msgpack::object_handle
oh;
265 while (ctx->pac.next(
oh)) {
266 auto shared = w.lock();
268 if (!shared || !socket)
273 oh.get().convert(
msg);
276 shared->changeMobility(socket->deviceId(),
msg.is_mobile);
279 shared->sendAnswer(socket,
msg);
281 }
else if (
msg.response) {
282 shared->setKnownNodes(
msg.response->nodes);
283 shared->setMobileNodes(
msg.response->mobile_nodes);
286 }
catch (
const std::exception&
e) {
295 socket->onShutdown([w =
weak(), deviceId = socket->deviceId()] {
296 dht::ThreadPool::io().run([w, deviceId] {
297 auto shared = w.lock();
298 if (shared && !shared->isShutdown_) {
299 shared->removeNode(deviceId);
306SwarmManager::resetNodeExpiry(
const asio::error_code&
ec,
307 const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket,
311 std::list<Bucket>::iterator
bucket;
313 if (
ec == asio::error::operation_aborted)
329 nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry,
331 std::placeholders::_1,
338SwarmManager::tryConnect(
const NodeId& nodeId)
343 nodeId](
const std::shared_ptr<dhtnet::ChannelSocketInterface>& socket) {
344 auto shared = w.lock();
345 if (!shared || shared->isShutdown_)
348 shared->addChannel(socket);
351 std::unique_lock
lk(shared->mutex);
352 auto bucket = shared->routing_table.findBucket(nodeId);
353 bucket->removeConnectingNode(nodeId);
354 bucket->addKnownNode(nodeId);
355 bucket = shared->routing_table.findBucket(shared->getId());
356 if (
bucket->getConnectingNodesSize() == 0 &&
bucket->isEmpty()
357 && shared->onConnectionChanged_) {
359 JAMI_LOG(
"[SwarmManager {:p}] Bootstrap: all connections failed",
360 fmt::ptr(shared.get()));
361 shared->onConnectionChanged_(
false);
368SwarmManager::removeNodeInternal(
const NodeId& nodeId)
376 std::lock_guard lock(mutex);
384 std::lock_guard lock(mutex);
385 for (
const auto& node : nodes) {
static constexpr int BUCKET_MAX_SIZE
std::vector< NodeId > getAllNodes() const
Return every node from each bucket.
void deleteNode(const NodeId &nodeId)
Delete node from every table in bucket.
bool addKnownNode(const NodeId &nodeId)
Add known node to routing table.
std::vector< NodeId > closestNodes(const NodeId &nodeId, unsigned count)
Returns the count closest nodes to a specific nodeId.
void shutdownAllNodes()
Shuts down all nodes in routing table and adds them to known_nodes or mobile_nodes.
bool addMobileNode(const NodeId &nodeId)
Add mobile node to routing table.
std::list< Bucket >::iterator findBucket(const NodeId &nodeId)
Returns bucket iterator containing nodeId.
bool removeNode(const NodeId &nodeId)
Removes node from routing table. Adds it to known_nodes or mobile_nodes depending on mobility.
bool hasNode(const NodeId &nodeId)
Check if connected node exists in routing table.
bool contains(const std::list< Bucket >::iterator &it, const NodeId &nodeId) const
Test if connected nodeId is in specific bucket.
std::list< Bucket > & getBuckets()
Returns buckets in routing table.
bool addConnectingNode(const NodeId &nodeId)
Add connecting node to routing table.
bool addNode(const std::shared_ptr< dhtnet::ChannelSocketInterface > &socket)
Add socket to bucket.
void setId(const NodeId &node)
Sets id for routing table.
bool isMobile() const
Get mobility of swarm manager.
NeedSocketCb needSocketCb_
bool isConnectedWith(const NodeId &deviceId)
Check if we're connected with a specific device.
void removeNode(const NodeId &nodeId)
Remove channel from routing table.
void deleteNode(std::vector< NodeId > nodes)
Delete nodes from the different tables in bucket.
void shutdown()
Shutdown swarm manager.
SwarmManager(const NodeId &, const std::mt19937_64 &rand, ToConnectCb &&toConnectCb)
void changeMobility(const NodeId &nodeId, bool isMobile)
Change mobility of specific node.
void maintainBuckets(const std::set< NodeId > &toConnect={})
Maintain/Update buckets.
void setMobileNodes(const std::vector< NodeId > &mobile_nodes)
Set list of nodes to the routing table mobile_nodes.
const NodeId & getId() const
Get swarm manager id.
void restart()
Restart the swarm manager.
bool setKnownNodes(const std::vector< NodeId > &known_nodes)
Set list of nodes to the routing table known_nodes.
std::vector< NodeId > getAllNodes() const
Get all nodes from the different tables in bucket.
void addChannel(const std::shared_ptr< dhtnet::ChannelSocketInterface > &channel)
Add channel to routing table.
std::weak_ptr< SwarmManager > weak()
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
static constexpr const std::chrono::minutes FIND_PERIOD
void emitSignal(Args... args)