Ring Daemon 16.0.0
Loading...
Searching...
No Matches
message_channel_handler.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
18
/// Prefix of message channel names: a channel is named "msg:" followed by the device id.
static constexpr char MESSAGE_SCHEME[] = "msg:";
20
21namespace jami {
22
// (string, DeviceId) pair; presumably a (peer id, device id) key, mirroring the
// two levels of Impl::connections_. NOTE(review): not referenced anywhere in the
// visible part of this file; confirm it is still needed.
23using Key = std::pair<std::string, DeviceId>;
24
// Private state of MessageChannelHandler. It derives from
// std::enable_shared_from_this so that socket callbacks can keep a weak
// reference to it (see the weak_from_this() capture in onReady).
// NOTE(review): this listing skips embedded source lines 28-29 and 32-37, so
// some members are not visible here (the constructor, and the onMessage_ /
// onPeerStateChanged_ callbacks that the rest of the file uses).
25struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
26{
// Connection manager used to open channels; held by reference.
27 dhtnet::ConnectionManager& connectionManager_;
// Locked around every visible access to connections_. The peer state callback
// is invoked while it is held; presumably that is why it is recursive (TODO confirm).
30 std::recursive_mutex connectionsMtx_;
// Open channels: peer id (string) -> device id -> sockets, in insertion order.
31 std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>> connections_;
32

38
// Drops `socket` from connections_ for (peerId, device), prunes entries that
// become empty and reports the peer as disconnected once it has no channel left.
39 void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
40 const std::string& peerId,
41 const DeviceId& device);
42};
43
49
51
// Asks the connection manager to open a message channel to `deviceId`.
// NOTE(review): this listing is missing embedded source lines 53, 57, 65, 68
// and 69, i.e. the line naming the function, the last parameter and part of the
// connectDevice() argument list. The visible parameters match the
// MessageChannelHandler::connect signature shown in the page trailer.
52void
54 const std::string&,
55 ConnectCb&& cb,
56 const std::string& connectionType,
58{
// The channel name is the "msg:" scheme followed by the device id string.
59 auto channelName = MESSAGE_SCHEME + deviceId.toString();
// Do not start a second attempt while one is already pending for this
// device/channel. Note that `cb` is not invoked on this path.
60 if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
61 JAMI_LOG("Already connecting to {}", deviceId);
62 return;
63 }
// Hand the request over to the connection manager; `cb` is moved into the call.
64 pimpl_->connectionManager_.connectDevice(deviceId,
66 std::move(cb),
67 false,
70}
71
72void
73MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
74 const std::string& peerId,
75 const DeviceId& device)
76{
77 std::lock_guard lk(connectionsMtx_);
78 auto peerIt = connections_.find(peerId);
79 if (peerIt == connections_.end())
80 return;
81 auto connectionsIt = peerIt->second.find(device);
82 if (connectionsIt == peerIt->second.end())
83 return;
84 auto& connections = connectionsIt->second;
85 auto conn = std::find(connections.begin(), connections.end(), socket);
86 if (conn != connections.end())
87 connections.erase(conn);
88 if (connections.empty()) {
89 peerIt->second.erase(connectionsIt);
90 }
91 if (peerIt->second.empty()) {
92 connections_.erase(peerIt);
93 onPeerStateChanged_(peerId, false);
94 }
95}
96
97std::shared_ptr<dhtnet::ChannelSocket>
98MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
99{
100 std::lock_guard lk(pimpl_->connectionsMtx_);
101 auto it = pimpl_->connections_.find(peer);
102 if (it == pimpl_->connections_.end())
103 return nullptr;
104 auto deviceIt = it->second.find(deviceId);
105 if (deviceIt == it->second.end())
106 return nullptr;
107 if (deviceIt->second.empty())
108 return nullptr;
109 return deviceIt->second.back();
110}
111
112std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
113MessageChannelHandler::getChannels(const std::string& peer) const
114{
115 std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
116 std::lock_guard lk(pimpl_->connectionsMtx_);
117 auto it = pimpl_->connections_.find(peer);
118 if (it == pimpl_->connections_.end())
119 return sockets;
120 sockets.reserve(it->second.size());
121 for (auto& [deviceId, channels] : it->second) {
122 for (auto& channel : channels) {
123 sockets.push_back(channel);
124 }
125 }
126 return sockets;
127}
128
129bool
130MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert,
131 const std::string& /* name */)
132{
133 if (!cert || !cert->issuer)
134 return false;
135 return true;
136}
137
138void
139MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& cert,
140 const std::string&,
141 std::shared_ptr<dhtnet::ChannelSocket> socket)
142{
143 if (!cert || !cert->issuer)
144 return;
145 auto peerId = cert->issuer->getId().toString();
146 auto device = cert->getLongId();
147 std::lock_guard lk(pimpl_->connectionsMtx_);
148 auto& connections = pimpl_->connections_[peerId];
149 bool newPeerConnection = connections.empty();
150 auto& deviceConnections = connections[device];
151 deviceConnections.push_back(socket);
153 pimpl_->onPeerStateChanged_(peerId, true);
154
155 socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)]() {
156 if (auto shared = w.lock())
157 shared->onChannelShutdown(s.lock(), peerId, device);
158 });
159
160 struct DecodingContext
161 {
162 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
163 nullptr,
164 16 * 1024};
165 };
166
167 socket->setOnRecv([onMessage = pimpl_->onMessage_,
168 peerId,
169 cert,
170 ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
171 if (!buf)
172 return len;
173
174 ctx->pac.reserve_buffer(len);
175 std::copy_n(buf, len, ctx->pac.buffer());
176 ctx->pac.buffer_consumed(len);
177
178 msgpack::object_handle oh;
179 try {
180 while (ctx->pac.next(oh)) {
181 Message msg;
182 oh.get().convert(msg);
183 onMessage(cert, msg.t, msg.c);
184 }
185 } catch (const std::exception& e) {
186 JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
187 }
188 return len;
189 });
190}
191
192void
193MessageChannelHandler::closeChannel(const std::string& peer, const DeviceId& device, const std::shared_ptr<dhtnet::ChannelSocket>& conn)
194{
195 if (!conn)
196 return;
197 std::unique_lock lk(pimpl_->connectionsMtx_);
198 auto it = pimpl_->connections_.find(peer);
199 if (it != pimpl_->connections_.end()) {
200 auto deviceIt = it->second.find(device);
201 if (deviceIt != it->second.end()) {
202 auto& channels = deviceIt->second;
203 channels.erase(std::remove(channels.begin(), channels.end(), conn), channels.end());
204 if (channels.empty()) {
205 it->second.erase(deviceIt);
206 if (it->second.empty()) {
207 pimpl_->connections_.erase(it);
208 }
209 }
210 }
211 }
212 lk.unlock();
213 conn->stop();
214}
215
216bool
217MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
218 const Message& message)
219{
220 if (!socket)
221 return false;
222 msgpack::sbuffer buffer(UINT16_MAX); // Use max
223 msgpack::pack(buffer, message);
224 std::error_code ec;
225 auto sent = socket->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size(), ec);
226 if (ec) {
227 JAMI_WARNING("Error sending message: {:s}", ec.message());
228 }
229 return !ec && sent == buffer.size();
230}
231
232} // namespace jami
A Channel handler is used to make the link between JamiAccount and ConnectionManager. Its role is to m...
static bool sendMessage(const std::shared_ptr< dhtnet::ChannelSocket > &, const Message &message)
std::vector< std::shared_ptr< dhtnet::ChannelSocket > > getChannels(const std::string &peer) const
void closeChannel(const std::string &peer, const DeviceId &device, const std::shared_ptr< dhtnet::ChannelSocket > &conn)
std::shared_ptr< dhtnet::ChannelSocket > getChannel(const std::string &peer, const DeviceId &deviceId) const
std::function< void(const std::string &, bool)> OnPeerStateChanged
std::function< void(const std::shared_ptr< dht::crypto::Certificate > &, std::string &, const std::string &)> OnMessage
bool onRequest(const std::shared_ptr< dht::crypto::Certificate > &peer, const std::string &name) override
Determine if we accept or not the message request.
MessageChannelHandler(dhtnet::ConnectionManager &cm, OnMessage onMessage, OnPeerStateChanged onPeer)
void onReady(const std::shared_ptr< dht::crypto::Certificate > &peer, const std::string &name, std::shared_ptr< dhtnet::ChannelSocket > channel) override
Launch message process.
void connect(const DeviceId &deviceId, const std::string &, ConnectCb &&cb, const std::string &connectionType, bool forceNewConnection=false) override
Ask for a new message channel.
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
static constexpr const char MESSAGE_SCHEME[]
std::function< void(std::shared_ptr< dhtnet::ChannelSocket >, const DeviceId &)> ConnectCb
dht::PkId DeviceId
void emitSignal(Args... args)
Definition ring_signal.h:64
std::pair< std::string, DeviceId > Key
std::map< std::string, std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > > connections_
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::string &peerId, const DeviceId &device)
Impl(dhtnet::ConnectionManager &cm, OnMessage onMessage, OnPeerStateChanged onPeer)
dhtnet::ConnectionManager & connectionManager_