Ring Daemon
Loading...
Searching...
No Matches
message_channel_handler.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
18
19#include <dhtnet/channel_utils.h>
20#include <string_view>
21
22using namespace std::literals;
23
// Prefix placed in front of the device id to build the name of a message channel.
24static constexpr auto MESSAGE_SCHEME = "msg:"sv;
25
26namespace jami {
27
// Pair of a string and a DeviceId (presumably peer id + device id).
// NOTE(review): not referenced anywhere in the visible part of this file - confirm it is still needed.
28using Key = std::pair<std::string, DeviceId>;
29
// Private state of MessageChannelHandler, reached through pimpl_.
// It derives from enable_shared_from_this so that socket callbacks can keep a
// weak reference to it instead of a raw pointer.
// NOTE(review): other members used by this file (onMessage_, onPeerStateChanged_)
// are declared on lines that are not visible in this listing.
30struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
31{
 // Connection manager used by connect() to query and open channels.
32 dhtnet::ConnectionManager& connectionManager_;
 // Held around every visible access to connections_.
35 std::recursive_mutex connectionsMtx_;
 // Open channel sockets, indexed by peer id string, then by device id.
36 std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>> connections_;
37

43
 // Removes `socket` from the entry for (peerId, device) and prunes entries that become empty.
44 void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
45 const std::string& peerId,
46 const DeviceId& device);
47};
48
55
57{
 // NOTE(review): the signature line of this definition is not visible in this listing;
 // the body looks like teardown code (presumably the destructor) - confirm.
58 std::unique_lock lk(pimpl_->connectionsMtx_);
 // Report every peer that still has an entry in the table as disconnected.
59 for (const auto& [peerId, _] : pimpl_->connections_) {
60 pimpl_->onPeerStateChanged_(peerId, false);
61 }
 // Move the sockets out of the shared table while the lock is held; the local
 // `connections` is destroyed at the end of the scope, after the explicit unlock below.
62 auto connections = std::move(pimpl_->connections_);
63 pimpl_->connections_.clear();
64 lk.unlock();
65}
66
// Asks the connection manager to open a message channel to `deviceId`.
// `cb`, `connectionType` and `forceNewConnection` are forwarded to connectDevice().
// NOTE(review): the signature lines naming the function and declaring `deviceId` and
// `forceNewConnection` are not visible in this listing.
67void
69 const std::string&,
70 ConnectCb&& cb,
71 const std::string& connectionType,
73{
 // The channel name is MESSAGE_SCHEME followed by the device id.
74 auto channelName = concat(MESSAGE_SCHEME, deviceId.to_view());
 // A request for this channel is already pending: log and return. `cb` is not invoked on this path.
75 if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
76 JAMI_LOG("Already connecting to {}", deviceId);
77 return;
78 }
 // NOTE(review): the meaning of the literal `false` argument is defined by connectDevice() - see dhtnet.
79 pimpl_->connectionManager_
80 .connectDevice(deviceId, channelName, std::move(cb), false, forceNewConnection, connectionType);
81}
82
83void
84MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
85 const std::string& peerId,
86 const DeviceId& device)
87{
88 std::lock_guard lk(connectionsMtx_);
89 auto peerIt = connections_.find(peerId);
90 if (peerIt == connections_.end()) {
91 JAMI_WARNING("onChannelShutdown: No connections found for peer {}", peerId);
92 return;
93 }
94 auto connectionsIt = peerIt->second.find(device);
95 if (connectionsIt == peerIt->second.end()) {
96 JAMI_WARNING("onChannelShutdown: No connections found for device {} of peer {}", device.toString(), peerId);
97 return;
98 }
99 auto& connections = connectionsIt->second;
100 auto conn = std::find(connections.begin(), connections.end(), socket);
101 if (conn != connections.end())
102 connections.erase(conn);
103 if (connections.empty()) {
104 peerIt->second.erase(connectionsIt);
105 }
106 if (peerIt->second.empty()) {
107 connections_.erase(peerIt);
108 onPeerStateChanged_(peerId, false);
109 }
110}
111
112std::shared_ptr<dhtnet::ChannelSocket>
113MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
114{
115 std::lock_guard lk(pimpl_->connectionsMtx_);
116 auto it = pimpl_->connections_.find(peer);
117 if (it == pimpl_->connections_.end())
118 return nullptr;
119 auto deviceIt = it->second.find(deviceId);
120 if (deviceIt == it->second.end())
121 return nullptr;
122 if (deviceIt->second.empty())
123 return nullptr;
124 return deviceIt->second.back();
125}
126
127std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
128MessageChannelHandler::getChannels(const std::string& peer) const
129{
130 std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
131 std::lock_guard lk(pimpl_->connectionsMtx_);
132 auto it = pimpl_->connections_.find(peer);
133 if (it == pimpl_->connections_.end())
134 return sockets;
135 sockets.reserve(it->second.size());
136 for (auto& [deviceId, channels] : it->second) {
137 for (auto& channel : channels) {
138 sockets.push_back(channel);
139 }
140 }
141 return sockets;
142}
143
144bool
145MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& /* name */)
146{
147 if (!cert || !cert->issuer)
148 return false;
149 return true;
150}
151
152void
153MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
154 const std::string&,
155 std::shared_ptr<dhtnet::ChannelSocket> socket)
156{
157 if (!cert || !cert->issuer)
158 return;
159 auto peerId = cert->issuer->getId().toString();
160 auto device = cert->getLongId();
161 std::lock_guard lk(pimpl_->connectionsMtx_);
162 auto& connections = pimpl_->connections_[peerId];
163 bool newPeerConnection = connections.empty();
164 auto& deviceConnections = connections[device];
165 deviceConnections.push_back(socket);
167 pimpl_->onPeerStateChanged_(peerId, true);
168
169 socket->setOnRecv(dhtnet::buildMsgpackReader<Message>([onMessage = pimpl_->onMessage_, cert](Message&& msg) {
170 onMessage(cert, msg.t, msg.c);
171 return std::error_code();
172 }));
173
174 socket->onShutdown(
175 [w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)](const std::error_code& /*ec*/) {
176 if (auto shared = w.lock())
177 shared->onChannelShutdown(s.lock(), peerId, device);
178 });
179}
180
// Unregisters `conn` from the entry kept for (peer, device), if present, and then stops it.
// NOTE(review): the signature line naming the function and declaring `peer` is not
// visible in this listing.
181void
183 const DeviceId& device,
184 const std::shared_ptr<dhtnet::ChannelSocket>& conn)
185{
 // Nothing to unregister or stop.
186 if (!conn)
187 return;
188 std::unique_lock lk(pimpl_->connectionsMtx_);
189 auto it = pimpl_->connections_.find(peer);
190 if (it != pimpl_->connections_.end()) {
191 auto deviceIt = it->second.find(device);
192 if (deviceIt != it->second.end()) {
193 auto& channels = deviceIt->second;
 // Remove every occurrence of conn from this device's list.
194 channels.erase(std::remove(channels.begin(), channels.end(), conn), channels.end());
 // Prune the device entry, then the peer entry, when they become empty;
 // the peer is reported as disconnected once its entry is removed.
195 if (channels.empty()) {
196 it->second.erase(deviceIt);
197 if (it->second.empty()) {
198 pimpl_->connections_.erase(it);
199 pimpl_->onPeerStateChanged_(peer, false);
200 }
201 }
202 }
203 }
 // The lock is released before the socket is stopped; stop() is called even
 // when conn was not found in the table.
204 lk.unlock();
205 conn->stop();
206}
207
208bool
209MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const Message& message)
210{
211 if (!socket)
212 return false;
213 msgpack::sbuffer buffer(UINT16_MAX); // Use max
214 msgpack::pack(buffer, message);
215 std::error_code ec;
216 auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
217 if (ec) {
218 JAMI_WARNING("Error sending message: {:s}", ec.message());
219 }
220 return !ec && sent == buffer.size();
221}
222
223} // namespace jami
#define _(S)
Definition SHA3.cpp:119
A Channel handler is used to make the link between JamiAccount and ConnectionManager. Its role is to m...
static bool sendMessage(const std::shared_ptr< dhtnet::ChannelSocket > &, const Message &message)
std::vector< std::shared_ptr< dhtnet::ChannelSocket > > getChannels(const std::string &peer) const
void closeChannel(const std::string &peer, const DeviceId &device, const std::shared_ptr< dhtnet::ChannelSocket > &conn)
std::shared_ptr< dhtnet::ChannelSocket > getChannel(const std::string &peer, const DeviceId &deviceId) const
std::function< void(const std::string &, bool)> OnPeerStateChanged
std::function< void(const std::shared_ptr< dht::crypto::Certificate > &, std::string &, const std::string &)> OnMessage
bool onRequest(const std::shared_ptr< dht::crypto::Certificate > &peer, const std::string &name) override
Determine if we accept or not the message request.
MessageChannelHandler(dhtnet::ConnectionManager &cm, OnMessage onMessage, OnPeerStateChanged onPeer)
void onReady(const std::shared_ptr< dht::crypto::Certificate > &peer, const std::string &name, std::shared_ptr< dhtnet::ChannelSocket > channel) override
Launch message process.
void connect(const DeviceId &deviceId, const std::string &, ConnectCb &&cb, const std::string &connectionType, bool forceNewConnection=false) override
Ask for a new message channel.
#define JAMI_WARNING(formatstr,...)
Definition logger.h:242
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
static constexpr auto MESSAGE_SCHEME
std::function< void(std::shared_ptr< dhtnet::ChannelSocket >, const DeviceId &)> ConnectCb
dht::PkId DeviceId
void emitSignal(Args... args)
Definition jami_signal.h:64
std::pair< std::string, DeviceId > Key
std::string concat(Args &&... args)
std::map< std::string, std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > > connections_
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::string &peerId, const DeviceId &device)
Impl(dhtnet::ConnectionManager &cm, OnMessage onMessage, OnPeerStateChanged onPeer)
dhtnet::ConnectionManager & connectionManager_