Ring Daemon
Loading...
Searching...
No Matches
sync_module.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "sync_module.h"
19
22
23#include <dhtnet/multiplexed_socket.h>
24#include <dhtnet/channel_utils.h>
25#include <opendht/thread_pool.h>
26
27namespace jami {
28
29class SyncModule::Impl : public std::enable_shared_from_this<Impl>
30{
31public:
32 Impl(const std::shared_ptr<JamiAccount>& account);
33
34 std::weak_ptr<JamiAccount> account_;
35 const std::string accountId_;
36
37 // Sync connections
38 std::recursive_mutex syncConnectionsMtx_;
39 std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
40
45 void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const std::shared_ptr<SyncMsg>& syncMsg);
46 void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
47};
48
49SyncModule::Impl::Impl(const std::shared_ptr<JamiAccount>& account)
50 : account_(account)
51 , accountId_ {account->getAccountID()}
52{}
53
54void
55SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
56 const std::shared_ptr<SyncMsg>& syncMsg)
57{
58 auto acc = account_.lock();
59 if (!acc)
60 return;
61 msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
62 std::error_code ec;
63 if (!syncMsg) {
64 // Send contacts infos
65 // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
66 // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
67 if (auto info = acc->accountManager()->getInfo()) {
68 if (info->contacts) {
70 msg.ds = info->contacts->getSyncData();
71 msgpack::pack(buffer, msg);
72 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
73 if (ec) {
74 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
75 return;
76 }
77 }
78 }
79 buffer.clear();
80 // Sync conversations
81 auto c = ConversationModule::convInfos(acc->getAccountID());
82 if (!c.empty()) {
84 msg.c = std::move(c);
85 msgpack::pack(buffer, msg);
86 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
87 if (ec) {
88 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
89 return;
90 }
91 }
92 buffer.clear();
93 // Sync requests
94 auto cr = ConversationModule::convRequests(acc->getAccountID());
95 if (!cr.empty()) {
97 msg.cr = std::move(cr);
98 msgpack::pack(buffer, msg);
99 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
100 if (ec) {
101 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
102 return;
103 }
104 }
105 buffer.clear();
106 auto convModule = acc->convModule(true);
107 if (!convModule)
108 return;
109 // Sync conversation's preferences
110 auto p = convModule->convPreferences();
111 if (!p.empty()) {
112 SyncMsg msg;
113 msg.p = std::move(p);
114 msgpack::pack(buffer, msg);
115 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
116 if (ec) {
117 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
118 return;
119 }
120 }
121 buffer.clear();
122 // Sync read's status
123 auto ms = convModule->convMessageStatus();
124 if (!ms.empty()) {
125 SyncMsg msg;
126 msg.ms = std::move(ms);
127 msgpack::pack(buffer, msg);
128 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
129 if (ec) {
130 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
131 return;
132 }
133 }
134 buffer.clear();
135
136 } else {
137 msgpack::pack(buffer, *syncMsg);
138 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
139 if (ec)
140 JAMI_ERROR("[Account {}] [device {}] {:s}", accountId_, socket->deviceId(), ec.message());
141 }
142}
143
145
146SyncModule::SyncModule(const std::shared_ptr<JamiAccount>& account)
147 : pimpl_ {std::make_shared<Impl>(account)}
148{}
149
150void
151SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
152{
153 std::lock_guard lk(syncConnectionsMtx_);
154 auto connectionsIt = syncConnections_.find(device);
155 if (connectionsIt == syncConnections_.end()) {
156 JAMI_WARNING("[Account {}] [device {}] onChannelShutdown: no connection found.", accountId_, device.to_view());
157 return;
158 }
159 auto& connections = connectionsIt->second;
160 auto conn = std::find(connections.begin(), connections.end(), socket);
161 if (conn != connections.end())
162 connections.erase(conn);
163 JAMI_LOG("[Account {}] [device {}] removed connection, remaining: {:d}",
165 device.to_view(),
166 connections.size());
167 if (connections.empty())
169}
170
171void
172SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
173 const std::string& peerId,
174 const DeviceId& device)
175{
176 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
177 pimpl_->syncConnections_[device].emplace_back(socket);
178
179 socket->setOnRecv(dhtnet::buildMsgpackReader<SyncMsg>([acc = pimpl_->account_, device, peerId](SyncMsg&& msg) {
180 auto account = acc.lock();
181 if (!account)
182 return std::make_error_code(std::errc::operation_canceled);
183
184 try {
185 if (auto manager = account->accountManager())
186 manager->onSyncData(std::move(msg.ds), false);
187
188 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
189 if (auto cm = account->convModule(true))
190 cm->onSyncData(msg, peerId, device.toString());
191 } catch (const std::exception& e) {
192 JAMI_WARNING("[Account {}] [device {}] [convInfo] error on sync: {:s}",
193 account->getAccountID(),
194 device.to_view(),
195 e.what());
196 }
197 return std::error_code();
198 }));
199 socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)](const std::error_code&) {
200 if (auto shared = w.lock())
201 shared->onChannelShutdown(s.lock(), device);
202 });
203
204 dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket = std::move(socket)]() {
205 if (auto s = w.lock())
206 s->syncInfos(socket, nullptr);
207 });
208}
209
210bool
211SyncModule::isConnected(const DeviceId& deviceId) const
212{
213 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
214 auto it = pimpl_->syncConnections_.find(deviceId);
215 if (it == pimpl_->syncConnections_.end())
216 return false;
217 return !it->second.empty();
218}
219
220void
221SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
222{
223 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
224 size_t count = 0;
225 for (const auto& [did, sockets] : pimpl_->syncConnections_) {
226 if (not sockets.empty() and (!deviceId || deviceId == did)) {
227 count++;
228 dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s = sockets.back(), syncMsg] {
229 if (auto sthis = w.lock())
230 sthis->syncInfos(s, syncMsg);
231 });
232 }
233 }
234 if (count == 0) {
235 JAMI_WARNING("[Account {}] [device {}] no sync connection.", pimpl_->accountId_, deviceId.toString());
236 } else {
237 JAMI_DEBUG("[Account {}] [device {}] syncing with {:d} devices", pimpl_->accountId_, deviceId.to_view(), count);
238 }
239}
240
241} // namespace jami
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void syncInfos(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::shared_ptr< SyncMsg > &syncMsg)
Build SyncMsg and send it on socket.
std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > syncConnections_
std::recursive_mutex syncConnectionsMtx_
const std::string accountId_
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const DeviceId &device)
std::weak_ptr< JamiAccount > account_
Impl(const std::shared_ptr< JamiAccount > &account)
SyncModule(const std::shared_ptr< JamiAccount > &account)
void syncWithConnected(const std::shared_ptr< SyncMsg > &syncMsg=nullptr, const DeviceId &deviceId={})
Send sync to all connected devices.
bool isConnected(const DeviceId &deviceId) const
void cacheSyncConnection(std::shared_ptr< dhtnet::ChannelSocket > &&socket, const std::string &peerId, const DeviceId &deviceId)
Store a new Sync connection.
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:238
#define JAMI_WARNING(formatstr,...)
Definition logger.h:242
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
dht::PkId DeviceId
void emitSignal(Args... args)
Definition jami_signal.h:64
std::map< std::string, std::map< std::string, std::string > > p
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > ms
std::map< std::string, ConversationRequest > cr
std::map< std::string, ConvInfo > c