Ring Daemon 16.0.0
Loading...
Searching...
No Matches
sync_module.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "sync_module.h"
19
22
23#include <dhtnet/multiplexed_socket.h>
24#include <opendht/thread_pool.h>
25
26namespace jami {
27
28class SyncModule::Impl : public std::enable_shared_from_this<Impl>
29{
30public:
31 Impl(std::weak_ptr<JamiAccount>&& account);
32
33 std::weak_ptr<JamiAccount> account_;
34
35 // Sync connections
36 std::recursive_mutex syncConnectionsMtx_;
37 std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
39
44 void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
45 const std::shared_ptr<SyncMsg>& syncMsg);
46 void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
47 const DeviceId& device);
48};
49
50SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
51 : account_(account)
52{}
53
54void
55SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
56 const std::shared_ptr<SyncMsg>& syncMsg)
57{
58 auto acc = account_.lock();
59 if (!acc)
60 return;
61 msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
62 std::error_code ec;
63 if (!syncMsg) {
64 // Send contacts infos
65 // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
66 // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
67 if (auto info = acc->accountManager()->getInfo()) {
68 if (info->contacts) {
70 msg.ds = info->contacts->getSyncData();
71 msgpack::pack(buffer, msg);
72 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
73 buffer.size(),
74 ec);
75 if (ec) {
76 JAMI_ERROR("{:s}", ec.message());
77 return;
78 }
79 }
80 }
81 buffer.clear();
82 // Sync conversations
83 auto c = ConversationModule::convInfos(acc->getAccountID());
84 if (!c.empty()) {
86 msg.c = std::move(c);
87 msgpack::pack(buffer, msg);
88 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
89 if (ec) {
90 JAMI_ERROR("{:s}", ec.message());
91 return;
92 }
93 }
94 buffer.clear();
95 // Sync requests
96 auto cr = ConversationModule::convRequests(acc->getAccountID());
97 if (!cr.empty()) {
99 msg.cr = std::move(cr);
100 msgpack::pack(buffer, msg);
101 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
102 if (ec) {
103 JAMI_ERROR("{:s}", ec.message());
104 return;
105 }
106 }
107 buffer.clear();
108 auto convModule = acc->convModule(true);
109 if (!convModule)
110 return;
111 // Sync conversation's preferences
112 auto p = convModule->convPreferences();
113 if (!p.empty()) {
114 SyncMsg msg;
115 msg.p = std::move(p);
116 msgpack::pack(buffer, msg);
117 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
118 if (ec) {
119 JAMI_ERROR("{:s}", ec.message());
120 return;
121 }
122 }
123 buffer.clear();
124 // Sync read's status
125 auto ms = convModule->convMessageStatus();
126 if (!ms.empty()) {
127 SyncMsg msg;
128 msg.ms = std::move(ms);
129 msgpack::pack(buffer, msg);
130 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
131 if (ec) {
132 JAMI_ERROR("{:s}", ec.message());
133 return;
134 }
135 }
136 buffer.clear();
137
138 } else {
139 msgpack::pack(buffer, *syncMsg);
140 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
141 if (ec)
142 JAMI_ERROR("{:s}", ec.message());
143 }
144}
145
147
148SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
149 : pimpl_ {std::make_shared<Impl>(std::move(account))}
150{}
151
152void
153SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
154 const DeviceId& device)
155{
156 std::lock_guard lk(syncConnectionsMtx_);
157 auto connectionsIt = syncConnections_.find(device);
158 if (connectionsIt == syncConnections_.end())
159 return;
160 auto& connections = connectionsIt->second;
161 auto conn = std::find(connections.begin(), connections.end(), socket);
162 if (conn != connections.end())
163 connections.erase(conn);
164 if (connections.empty())
166}
167
168void
169SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
170 const std::string& peerId,
171 const DeviceId& device)
172{
173 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
174 pimpl_->syncConnections_[device].emplace_back(socket);
175
176 socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
177 if (auto shared = w.lock())
178 shared->onChannelShutdown(s.lock(), device);
179 });
180
181 struct DecodingContext
182 {
183 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
184 nullptr,
185 512};
186 };
187
188 socket->setOnRecv([acc = pimpl_->account_.lock(),
189 device,
190 peerId,
191 ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
192 if (!buf || !acc)
193 return len;
194
195 ctx->pac.reserve_buffer(len);
196 std::copy_n(buf, len, ctx->pac.buffer());
197 ctx->pac.buffer_consumed(len);
198
199 msgpack::object_handle oh;
200 try {
201 while (ctx->pac.next(oh)) {
202 SyncMsg msg;
203 oh.get().convert(msg);
204 if (auto manager = acc->accountManager())
205 manager->onSyncData(std::move(msg.ds), false);
206
207 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty()
208 || !msg.ms.empty())
209 if (auto cm = acc->convModule(true))
210 cm->onSyncData(msg, peerId, device.toString());
211 }
212 } catch (const std::exception& e) {
213 JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
214 }
215
216 return len;
217 });
218
219 dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket]() {
220 if (auto s = w.lock())
221 s->syncInfos(socket, nullptr);
222 });
223}
224
225bool
226SyncModule::isConnected(const DeviceId& deviceId) const
227{
228 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
229 auto it = pimpl_->syncConnections_.find(deviceId);
230 if (it == pimpl_->syncConnections_.end())
231 return false;
232 return !it->second.empty();
233}
234
235void
236SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
237{
238 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
239 for (auto& [did, sockets] : pimpl_->syncConnections_) {
240 if (not sockets.empty()) {
241 if (!deviceId || deviceId == did) {
242 pimpl_->syncInfos(sockets[0], syncMsg);
243 }
244 }
245 }
246}
247} // namespace jami
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void syncInfos(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::shared_ptr< SyncMsg > &syncMsg)
Build SyncMsg and send it on socket.
std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > syncConnections_
std::recursive_mutex syncConnectionsMtx_
Impl(std::weak_ptr< JamiAccount > &&account)
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const DeviceId &device)
std::weak_ptr< JamiAccount > account_
SyncModule(std::weak_ptr< JamiAccount > &&account)
void syncWithConnected(const std::shared_ptr< SyncMsg > &syncMsg=nullptr, const DeviceId &deviceId={})
Send sync to all connected devices.
bool isConnected(const DeviceId &deviceId) const
void cacheSyncConnection(std::shared_ptr< dhtnet::ChannelSocket > &&socket, const std::string &peerId, const DeviceId &deviceId)
Store a new Sync connection.
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
dht::PkId DeviceId
void emitSignal(Args... args)
Definition ring_signal.h:64
std::map< std::string, std::map< std::string, std::string > > p
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > ms
std::map< std::string, ConversationRequest > cr
std::map< std::string, ConvInfo > c