23using Key = std::pair<std::string, DeviceId>;
31 std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>>
connections_;
40 const std::string& peerId,
60 if (pimpl_->connectionManager_.isConnecting(deviceId,
channelName)) {
61 JAMI_LOG(
"Already connecting to {}", deviceId);
64 pimpl_->connectionManager_.connectDevice(deviceId,
74 const std::string& peerId,
91 if (
peerIt->second.empty()) {
97std::shared_ptr<dhtnet::ChannelSocket>
100 std::lock_guard
lk(pimpl_->connectionsMtx_);
101 auto it = pimpl_->connections_.find(peer);
102 if (
it == pimpl_->connections_.end())
112std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
115 std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
sockets;
116 std::lock_guard
lk(pimpl_->connectionsMtx_);
117 auto it = pimpl_->connections_.find(peer);
118 if (
it == pimpl_->connections_.end())
121 for (
auto& [deviceId, channels] :
it->second) {
122 for (
auto& channel : channels) {
141 std::shared_ptr<dhtnet::ChannelSocket> socket)
145 auto peerId =
cert->issuer->getId().toString();
146 auto device =
cert->getLongId();
147 std::lock_guard
lk(pimpl_->connectionsMtx_);
153 pimpl_->onPeerStateChanged_(peerId,
true);
155 socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)]() {
156 if (auto shared = w.lock())
157 shared->onChannelShutdown(s.lock(), peerId, device);
160 struct DecodingContext
162 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t,
void*) {
return true; },
167 socket->setOnRecv([
onMessage = pimpl_->onMessage_,
170 ctx = std::make_shared<DecodingContext>()](
const uint8_t*
buf,
size_t len) {
174 ctx->pac.reserve_buffer(len);
175 std::copy_n(
buf, len,
ctx->pac.buffer());
176 ctx->pac.buffer_consumed(len);
178 msgpack::object_handle
oh;
180 while (
ctx->pac.next(
oh)) {
182 oh.get().convert(
msg);
185 }
catch (
const std::exception&
e) {
197 std::unique_lock
lk(pimpl_->connectionsMtx_);
198 auto it = pimpl_->connections_.find(peer);
199 if (
it != pimpl_->connections_.end()) {
203 channels.erase(std::remove(channels.begin(), channels.end(),
conn), channels.end());
204 if (channels.empty()) {
206 if (
it->second.empty()) {
207 pimpl_->connections_.erase(
it);
223 msgpack::pack(
buffer, message);
A Channel handler is used to make the link between JamiAccount and ConnectionManager. Its role is to m...
static bool sendMessage(const std::shared_ptr< dhtnet::ChannelSocket > &, const Message &message)
std::vector< std::shared_ptr< dhtnet::ChannelSocket > > getChannels(const std::string &peer) const
void closeChannel(const std::string &peer, const DeviceId &device, const std::shared_ptr< dhtnet::ChannelSocket > &conn)
std::shared_ptr< dhtnet::ChannelSocket > getChannel(const std::string &peer, const DeviceId &deviceId) const
std::function< void(const std::string &, bool)> OnPeerStateChanged
std::function< void(const std::shared_ptr< dht::crypto::Certificate > &, std::string &, const std::string &)> OnMessage
bool onRequest(const std::shared_ptr< dht::crypto::Certificate > &peer, const std::string &name) override
Determine whether to accept the message request.
MessageChannelHandler(dhtnet::ConnectionManager &cm, OnMessage onMessage, OnPeerStateChanged onPeer)
void onReady(const std::shared_ptr< dht::crypto::Certificate > &peer, const std::string &name, std::shared_ptr< dhtnet::ChannelSocket > channel) override
Launch message process.
void connect(const DeviceId &deviceId, const std::string &, ConnectCb &&cb, const std::string &connectionType, bool forceNewConnection=false) override
Ask for a new message channel.
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
static constexpr const char MESSAGE_SCHEME[]
std::function< void(std::shared_ptr< dhtnet::ChannelSocket >, const DeviceId &)> ConnectCb
void emitSignal(Args... args)
std::pair< std::string, DeviceId > Key
std::map< std::string, std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > > connections_
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::string &peerId, const DeviceId &device)
OnPeerStateChanged onPeerStateChanged_
Impl(dhtnet::ConnectionManager &cm, OnMessage onMessage, OnPeerStateChanged onPeer)
std::recursive_mutex connectionsMtx_
dhtnet::ConnectionManager & connectionManager_