26#include <opendht/thread_pool.h>
37 , ioContext_(
Manager::instance().ioContext())
38 , saveTimer_(*ioContext_)
40 dhtnet::fileutils::check_dir(savePath_.parent_path());
45 const std::string& deviceId,
46 const std::map<std::string, std::string>& payloads,
49 if (payloads.empty()
or to.empty())
53 std::lock_guard lock(messagesMutex_);
54 auto&
peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
60 m.payloads = payloads;
70 m.payloads = payloads;
74 asio::post(*ioContext_, [
this, to, deviceId]() {
75 retrySend(to, deviceId,
true);
82 const std::string& deviceId,
89MessageEngine::retrySend(
const std::string& peer,
const std::string& deviceId,
bool retryOnTimeout)
94 std::map<std::string, std::string> payloads;
96 std::vector<PendingMsg> pending {};
97 auto now = clock::now();
99 std::lock_guard lock(messagesMutex_);
100 auto&
m = deviceId.empty() ? messages_ : messagesDevices_;
101 auto p =
m.find(deviceId.empty() ? peer : deviceId);
111 pending.emplace_back(
PendingMsg {
m.token,
m.to,
m.payloads});
116 for (
const auto& p : pending) {
118 if (p.payloads.find(
"application/im-gitmessage-id") == p.payloads.end())
123 std::to_string(p.token),
132 std::lock_guard lock(messagesMutex_);
133 for (
const auto& p : messages_) {
134 for (
const auto&
m : p.second) {
146 const std::string& deviceId)
149 std::lock_guard lock(messagesMutex_);
150 auto&
m = deviceId.empty() ? messages_ : messagesDevices_;
152 auto p =
m.find(deviceId.empty() ? peer : deviceId);
154 JAMI_WARNING(
"[Account {:s}] onMessageSent: Peer not found: id:{} device:{}", account_.
getAccountID(), peer, deviceId);
158 auto f = std::find_if(p->second.begin(), p->second.end(), [&](
const Message&
m) {
159 return m.token == token;
161 if (
f != p->second.end()) {
162 auto emit =
f->payloads.find(
"application/im-gitmessage-id")
163 ==
f->payloads.end();
167 JAMI_LOG(
"[Account {:s}] [message {:d}] Status changed to SENT", account_.
getAccountID(), token);
173 std::to_string(token),
177 }
else if (
f->retried >= MAX_RETRIES) {
185 std::to_string(token),
205 decltype(messages_)
root;
207 std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
209 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
210 file.open(savePath_);
211 if (
file.is_open()) {
212 msgpack::unpacker
up;
215 up.buffer_consumed(
file.gcount());
216 msgpack::object_handle
oh;
218 root =
oh.get().as<std::map<std::string, std::list<Message>>>();
225 std::lock_guard lock(messagesMutex_);
226 messages_ = std::move(
root);
227 if (
not messages_.empty()) {
228 JAMI_LOG(
"[Account {}] Loaded {} messages from {}",
233 }
catch (
const std::exception&
e) {
234 JAMI_LOG(
"[Account {}] Unable to load messages from {}: {}",
244 std::lock_guard lock(messagesMutex_);
249MessageEngine::scheduleSave()
251 saveTimer_.expires_after(std::chrono::seconds(5));
252 saveTimer_.async_wait([
this, w = account_.weak_from_this()](
const std::error_code&
ec) {
254 if (auto acc = w.lock())
260MessageEngine::save_()
const
264 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
265 file.open(savePath_, std::ios::trunc);
267 msgpack::pack(
file, messages_);
268 }
catch (
const std::exception&
e) {
269 JAMI_ERROR(
"[Account {}] Unable to serialize pending messages: {}",
const std::string & getAccountID() const
Get the account ID.
std::mt19937_64 rand
Random generator engine Logical account state shall never rely on the state of the random generator.
Manager (controller) of daemon.
virtual void sendMessage(const std::string &to, const std::string &deviceId, const std::map< std::string, std::string > &payloads, uint64_t id, bool retryOnTimeout=true, bool onlyConnected=false)=0
void load()
Load persisted messages.
MessageToken sendMessage(const std::string &to, const std::string &deviceId, const std::map< std::string, std::string > &payloads, uint64_t refreshToken)
Add a message to the engine and try to send it.
void onPeerOnline(const std::string &peer, const std::string &deviceId={}, bool retryOnTimeout=true)
@TODO change MessageEngine by a queue, @NOTE retryOnTimeout is used for failing SIP messages (jamiAcc...
void onMessageSent(const std::string &peer, MessageToken t, bool success, const std::string &deviceId={})
void save() const
Persist messages.
MessageStatus getStatus(MessageToken t) const
MessageEngine(SIPAccountBase &, const std::filesystem::path &path)
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
void emitSignal(Args... args)
static constexpr uint64_t JAMI_ID_MAX_VAL