Ring Daemon 16.0.0
Loading...
Searching...
No Matches
message_engine.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "message_engine.h"
19#include "sip/sipaccountbase.h"
20#include "manager.h"
21#include "fileutils.h"
22
23#include "client/ring_signal.h"
24#include "jami/account_const.h"
25
26#include <opendht/thread_pool.h>
27#include <fmt/std.h>
28
29#include <fstream>
30
31namespace jami {
32namespace im {
33
34MessageEngine::MessageEngine(SIPAccountBase& acc, const std::filesystem::path& path)
35 : account_(acc)
36 , savePath_(path)
37 , ioContext_(Manager::instance().ioContext())
38 , saveTimer_(*ioContext_)
39{
40 dhtnet::fileutils::check_dir(savePath_.parent_path());
41}
42
44MessageEngine::sendMessage(const std::string& to,
45 const std::string& deviceId,
46 const std::map<std::string, std::string>& payloads,
48{
49 if (payloads.empty() or to.empty())
50 return 0;
51 MessageToken token = 0;
52 {
53 std::lock_guard lock(messagesMutex_);
54 auto& peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
55 if (refreshToken != 0) {
56 for (auto& m : peerMessages) {
57 if (m.token == refreshToken) {
58 token = refreshToken;
59 m.to = to;
60 m.payloads = payloads;
61 m.status = MessageStatus::IDLE;
62 break;
63 }
64 }
65 }
66 if (token == 0) {
67 token = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(account_.rand);
68 auto& m = peerMessages.emplace_back(Message {token});
69 m.to = to;
70 m.payloads = payloads;
71 }
72 scheduleSave();
73 }
74 asio::post(*ioContext_, [this, to, deviceId]() {
75 retrySend(to, deviceId, true);
76 });
77 return token;
78}
79
80void
81MessageEngine::onPeerOnline(const std::string& peer,
82 const std::string& deviceId,
83 bool retryOnTimeout)
84{
85 retrySend(peer, deviceId, retryOnTimeout);
86}
87
88void
89MessageEngine::retrySend(const std::string& peer, const std::string& deviceId, bool retryOnTimeout)
90{
91 struct PendingMsg {
92 MessageToken token;
93 std::string to;
94 std::map<std::string, std::string> payloads;
95 };
96 std::vector<PendingMsg> pending {};
97 auto now = clock::now();
98 {
99 std::lock_guard lock(messagesMutex_);
100 auto& m = deviceId.empty() ? messages_ : messagesDevices_;
101 auto p = m.find(deviceId.empty() ? peer : deviceId);
102 if (p == m.end())
103 return;
104 auto& messages = p->second;
105
106 for (auto& m: messages) {
107 if (m.status == MessageStatus::IDLE) {
108 m.status = MessageStatus::SENDING;
109 m.retried++;
110 m.last_op = now;
111 pending.emplace_back(PendingMsg {m.token, m.to, m.payloads});
112 }
113 }
114 }
115 // avoid locking while calling callback
116 for (const auto& p : pending) {
117 JAMI_DEBUG("[Account {:s}] [message {:d}] Reattempt sending", account_.getAccountID(), p.token);
118 if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
120 account_.getAccountID(),
121 "",
122 p.to,
123 std::to_string(p.token),
125 account_.sendMessage(p.to, deviceId, p.payloads, p.token, retryOnTimeout, false);
126 }
127}
128
131{
132 std::lock_guard lock(messagesMutex_);
133 for (const auto& p : messages_) {
134 for (const auto& m : p.second) {
135 if (m.token == t)
136 return m.status;
137 }
138 }
140}
141
142void
143MessageEngine::onMessageSent(const std::string& peer,
144 MessageToken token,
145 bool ok,
146 const std::string& deviceId)
147{
148 JAMI_DEBUG("[Account {:s}] [message {:d}] Message sent: {:s}", account_.getAccountID(), token, ok ? "success"sv : "failure"sv);
149 std::lock_guard lock(messagesMutex_);
150 auto& m = deviceId.empty() ? messages_ : messagesDevices_;
151
152 auto p = m.find(deviceId.empty() ? peer : deviceId);
153 if (p == m.end()) {
154 JAMI_WARNING("[Account {:s}] onMessageSent: Peer not found: id:{} device:{}", account_.getAccountID(), peer, deviceId);
155 return;
156 }
157
158 auto f = std::find_if(p->second.begin(), p->second.end(), [&](const Message& m) {
159 return m.token == token;
160 });
161 if (f != p->second.end()) {
162 auto emit = f->payloads.find("application/im-gitmessage-id")
163 == f->payloads.end();
164 if (f->status == MessageStatus::SENDING) {
165 if (ok) {
166 f->status = MessageStatus::SENT;
167 JAMI_LOG("[Account {:s}] [message {:d}] Status changed to SENT", account_.getAccountID(), token);
168 if (emit)
170 account_.getAccountID(),
171 "",
172 f->to,
173 std::to_string(token),
174 static_cast<int>(libjami::Account::MessageStates::SENT));
175 p->second.erase(f);
176 scheduleSave();
177 } else if (f->retried >= MAX_RETRIES) {
178 f->status = MessageStatus::FAILURE;
179 JAMI_WARNING("[Account {:s}] [message {:d}] Status changed to FAILURE", account_.getAccountID(), token);
180 if (emit)
182 account_.getAccountID(),
183 "",
184 f->to,
185 std::to_string(token),
187 p->second.erase(f);
188 scheduleSave();
189 } else {
190 f->status = MessageStatus::IDLE;
191 JAMI_DEBUG("[Account {:s}] [message {:d}] Status changed to IDLE", account_.getAccountID(), token);
192 }
193 } else {
194 JAMI_DEBUG("[Account {:s}] [message {:d}] State is not SENDING", account_.getAccountID(), token);
195 }
196 } else {
197 JAMI_DEBUG("[Account {:s}] [message {:d}] Unable to find message", account_.getAccountID(), token);
198 }
199}
200
201void
203{
204 try {
205 decltype(messages_) root;
206 {
207 std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
208 std::ifstream file;
209 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
210 file.open(savePath_);
211 if (file.is_open()) {
212 msgpack::unpacker up;
213 up.reserve_buffer(UINT16_MAX);
214 while (file.read(up.buffer(), UINT16_MAX)) {
215 up.buffer_consumed(file.gcount());
216 msgpack::object_handle oh;
217 if (up.next(oh)) {
218 root = oh.get().as<std::map<std::string, std::list<Message>>>();
219 break;
220 }
221 up.reserve_buffer(UINT16_MAX);
222 }
223 }
224 }
225 std::lock_guard lock(messagesMutex_);
226 messages_ = std::move(root);
227 if (not messages_.empty()) {
228 JAMI_LOG("[Account {}] Loaded {} messages from {}",
229 account_.getAccountID(),
230 messages_.size(),
231 savePath_);
232 }
233 } catch (const std::exception& e) {
234 JAMI_LOG("[Account {}] Unable to load messages from {}: {}",
235 account_.getAccountID(),
236 savePath_,
237 e.what());
238 }
239}
240
241void
243{
244 std::lock_guard lock(messagesMutex_);
245 save_();
246}
247
248void
249MessageEngine::scheduleSave()
250{
251 saveTimer_.expires_after(std::chrono::seconds(5));
252 saveTimer_.async_wait([this, w = account_.weak_from_this()](const std::error_code& ec) {
253 if (!ec)
254 if (auto acc = w.lock())
255 save();
256 });
257}
258
259void
260MessageEngine::save_() const
261{
262 try {
263 std::ofstream file;
264 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
265 file.open(savePath_, std::ios::trunc);
266 if (file.is_open())
267 msgpack::pack(file, messages_);
268 } catch (const std::exception& e) {
269 JAMI_ERROR("[Account {}] Unable to serialize pending messages: {}",
270 account_.getAccountID(), e.what());
271 }
272}
273
274} // namespace im
275} // namespace jami
const std::string & getAccountID() const
Get the account ID.
Definition account.h:154
std::mt19937_64 rand
Random generator engine. Logical account state shall never rely on the state of the random generator.
Definition account.h:349
Manager (controller) of daemon.
Definition manager.h:67
virtual void sendMessage(const std::string &to, const std::string &deviceId, const std::map< std::string, std::string > &payloads, uint64_t id, bool retryOnTimeout=true, bool onlyConnected=false)=0
void load()
Load persisted messages.
MessageToken sendMessage(const std::string &to, const std::string &deviceId, const std::map< std::string, std::string > &payloads, uint64_t refreshToken)
Add a message to the engine and try to send it.
void onPeerOnline(const std::string &peer, const std::string &deviceId={}, bool retryOnTimeout=true)
@TODO replace MessageEngine with a queue. @NOTE retryOnTimeout is used for failing SIP messages (jamiAcc...
void onMessageSent(const std::string &peer, MessageToken t, bool success, const std::string &deviceId={})
void save() const
Persist messages.
MessageStatus getStatus(MessageToken t) const
MessageEngine(SIPAccountBase &, const std::filesystem::path &path)
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
uint64_t MessageToken
void emitSignal(Args... args)
Definition ring_signal.h:64
static constexpr uint64_t JAMI_ID_MAX_VAL
Definition account.h:62