Ring Daemon 16.0.0
Loading...
Searching...
No Matches
sync_module.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "sync_module.h"
19
22#include <dhtnet/multiplexed_socket.h>
23
24namespace jami {
25
26class SyncModule::Impl : public std::enable_shared_from_this<Impl>
27{
28public:
29 Impl(std::weak_ptr<JamiAccount>&& account);
30
31 std::weak_ptr<JamiAccount> account_;
32
33 // Sync connections
34 std::recursive_mutex syncConnectionsMtx_;
35 std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
37
42 void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
43 const std::shared_ptr<SyncMsg>& syncMsg);
44 void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
45 const DeviceId& device);
46};
47
48SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
49 : account_(account)
50{}
51
52void
53SyncModule::Impl::syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
54 const std::shared_ptr<SyncMsg>& syncMsg)
55{
56 auto acc = account_.lock();
57 if (!acc)
58 return;
59 msgpack::sbuffer buffer(UINT16_MAX); // Use max pkt size
60 std::error_code ec;
61 if (!syncMsg) {
62 // Send contacts infos
63 // This message can be big. TODO rewrite to only take UINT16_MAX bytes max or split it multiple
64 // messages. For now, write 3 messages (UINT16_MAX*3 should be enough for all information).
65 if (auto info = acc->accountManager()->getInfo()) {
66 if (info->contacts) {
68 msg.ds = info->contacts->getSyncData();
69 msgpack::pack(buffer, msg);
70 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()),
71 buffer.size(),
72 ec);
73 if (ec) {
74 JAMI_ERROR("{:s}", ec.message());
75 return;
76 }
77 }
78 }
79 buffer.clear();
80 // Sync conversations
81 auto c = ConversationModule::convInfos(acc->getAccountID());
82 if (!c.empty()) {
84 msg.c = std::move(c);
85 msgpack::pack(buffer, msg);
86 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
87 if (ec) {
88 JAMI_ERROR("{:s}", ec.message());
89 return;
90 }
91 }
92 buffer.clear();
93 // Sync requests
94 auto cr = ConversationModule::convRequests(acc->getAccountID());
95 if (!cr.empty()) {
97 msg.cr = std::move(cr);
98 msgpack::pack(buffer, msg);
99 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
100 if (ec) {
101 JAMI_ERROR("{:s}", ec.message());
102 return;
103 }
104 }
105 buffer.clear();
106 auto convModule = acc->convModule(true);
107 if (!convModule)
108 return;
109 // Sync conversation's preferences
110 auto p = convModule->convPreferences();
111 if (!p.empty()) {
112 SyncMsg msg;
113 msg.p = std::move(p);
114 msgpack::pack(buffer, msg);
115 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
116 if (ec) {
117 JAMI_ERROR("{:s}", ec.message());
118 return;
119 }
120 }
121 buffer.clear();
122 // Sync read's status
123 auto ms = convModule->convMessageStatus();
124 if (!ms.empty()) {
125 SyncMsg msg;
126 msg.ms = std::move(ms);
127 msgpack::pack(buffer, msg);
128 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
129 if (ec) {
130 JAMI_ERROR("{:s}", ec.message());
131 return;
132 }
133 }
134 buffer.clear();
135
136 } else {
137 msgpack::pack(buffer, *syncMsg);
138 socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec);
139 if (ec)
140 JAMI_ERROR("{:s}", ec.message());
141 }
142}
143
145
146SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
147 : pimpl_ {std::make_shared<Impl>(std::move(account))}
148{}
149
150void
151SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
152 const DeviceId& device)
153{
154 std::lock_guard lk(syncConnectionsMtx_);
155 auto connectionsIt = syncConnections_.find(device);
156 if (connectionsIt == syncConnections_.end())
157 return;
158 auto& connections = connectionsIt->second;
159 auto conn = std::find(connections.begin(), connections.end(), socket);
160 if (conn != connections.end())
161 connections.erase(conn);
162 if (connections.empty())
164}
165
166void
167SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
168 const std::string& peerId,
169 const DeviceId& device)
170{
171 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
172 pimpl_->syncConnections_[device].emplace_back(socket);
173
174 socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
175 if (auto shared = w.lock())
176 shared->onChannelShutdown(s.lock(), device);
177 });
178
179 struct DecodingContext
180 {
181 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
182 nullptr,
183 512};
184 };
185
186 socket->setOnRecv([acc = pimpl_->account_.lock(),
187 device,
188 peerId,
189 ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
190 if (!buf || !acc)
191 return len;
192
193 ctx->pac.reserve_buffer(len);
194 std::copy_n(buf, len, ctx->pac.buffer());
195 ctx->pac.buffer_consumed(len);
196
197 msgpack::object_handle oh;
198 try {
199 while (ctx->pac.next(oh)) {
200 SyncMsg msg;
201 oh.get().convert(msg);
202 if (auto manager = acc->accountManager())
203 manager->onSyncData(std::move(msg.ds), false);
204
205 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty()
206 || !msg.ms.empty())
207 if (auto cm = acc->convModule(true))
208 cm->onSyncData(msg, peerId, device.toString());
209 }
210 } catch (const std::exception& e) {
211 JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
212 }
213
214 return len;
215 });
216
217 pimpl_->syncInfos(socket, nullptr);
218}
219
220bool
221SyncModule::isConnected(const DeviceId& deviceId) const
222{
223 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
224 auto it = pimpl_->syncConnections_.find(deviceId);
225 if (it == pimpl_->syncConnections_.end())
226 return false;
227 return !it->second.empty();
228}
229
230void
231SyncModule::syncWithConnected(const std::shared_ptr<SyncMsg>& syncMsg, const DeviceId& deviceId)
232{
233 std::lock_guard lk(pimpl_->syncConnectionsMtx_);
234 for (auto& [did, sockets] : pimpl_->syncConnections_) {
235 if (not sockets.empty()) {
236 if (!deviceId || deviceId == did) {
237 pimpl_->syncInfos(sockets[0], syncMsg);
238 }
239 }
240 }
241}
242} // namespace jami
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void syncInfos(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::shared_ptr< SyncMsg > &syncMsg)
Build SyncMsg and send it on socket.
std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > syncConnections_
std::recursive_mutex syncConnectionsMtx_
Impl(std::weak_ptr< JamiAccount > &&account)
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const DeviceId &device)
std::weak_ptr< JamiAccount > account_
SyncModule(std::weak_ptr< JamiAccount > &&account)
void syncWithConnected(const std::shared_ptr< SyncMsg > &syncMsg=nullptr, const DeviceId &deviceId={})
Send sync to all connected devices.
bool isConnected(const DeviceId &deviceId) const
void cacheSyncConnection(std::shared_ptr< dhtnet::ChannelSocket > &&socket, const std::string &peerId, const DeviceId &deviceId)
Store a new Sync connection.
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
dht::PkId DeviceId
void emitSignal(Args... args)
Definition ring_signal.h:64
std::map< std::string, std::map< std::string, std::string > > p
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > ms
std::map< std::string, ConversationRequest > cr
std::map< std::string, ConvInfo > c