26#include <opendht/thread_pool.h>
37 , ioContext_(
Manager::instance().ioContext())
38 , saveTimer_(*ioContext_)
40 dhtnet::fileutils::check_dir(savePath_.parent_path());
45 const std::string& deviceId,
46 const std::map<std::string, std::string>& payloads,
49 if (payloads.empty()
or to.empty())
53 std::lock_guard lock(messagesMutex_);
54 auto&
peerMessages = deviceId.empty() ? messages_[to] : messagesDevices_[deviceId];
60 m.payloads = payloads;
70 m.payloads = payloads;
74 ioContext_->post([
this, to, deviceId]() { retrySend(to, deviceId,
true); });
80 const std::string& deviceId,
87MessageEngine::retrySend(
const std::string& peer,
const std::string& deviceId,
bool retryOnTimeout)
92 std::map<std::string, std::string> payloads;
94 std::vector<PendingMsg> pending {};
95 auto now = clock::now();
97 std::lock_guard lock(messagesMutex_);
98 auto&
m = deviceId.empty() ? messages_ : messagesDevices_;
99 auto p =
m.find(deviceId.empty() ? peer : deviceId);
109 pending.emplace_back(
PendingMsg {
m.token,
m.to,
m.payloads});
114 for (
const auto& p : pending) {
116 if (p.payloads.find(
"application/im-gitmessage-id") == p.payloads.end())
121 std::to_string(p.token),
130 std::lock_guard lock(messagesMutex_);
131 for (
const auto& p : messages_) {
132 for (
const auto&
m : p.second) {
144 const std::string& deviceId)
147 std::lock_guard lock(messagesMutex_);
148 auto&
m = deviceId.empty() ? messages_ : messagesDevices_;
150 auto p =
m.find(deviceId.empty() ? peer : deviceId);
152 JAMI_WARNING(
"[Account {:s}] onMessageSent: Peer not found: id:{} device:{}", account_.
getAccountID(), peer, deviceId);
156 auto f = std::find_if(p->second.begin(), p->second.end(), [&](
const Message&
m) {
157 return m.token == token;
159 if (
f != p->second.end()) {
160 auto emit =
f->payloads.find(
"application/im-gitmessage-id")
161 ==
f->payloads.end();
165 JAMI_LOG(
"[Account {:s}] [message {:d}] Status changed to SENT", account_.
getAccountID(), token);
171 std::to_string(token),
175 }
else if (
f->retried >= MAX_RETRIES) {
183 std::to_string(token),
203 decltype(messages_)
root;
205 std::lock_guard lock(dhtnet::fileutils::getFileLock(savePath_));
207 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
208 file.open(savePath_);
209 if (
file.is_open()) {
210 msgpack::unpacker
up;
213 up.buffer_consumed(
file.gcount());
214 msgpack::object_handle
oh;
216 root =
oh.get().as<std::map<std::string, std::list<Message>>>();
223 std::lock_guard lock(messagesMutex_);
224 messages_ = std::move(
root);
225 if (
not messages_.empty()) {
226 JAMI_LOG(
"[Account {}] Loaded {} messages from {}",
231 }
catch (
const std::exception&
e) {
232 JAMI_LOG(
"[Account {}] Unable to load messages from {}: {}",
242 std::lock_guard lock(messagesMutex_);
247MessageEngine::scheduleSave()
249 saveTimer_.expires_after(std::chrono::seconds(5));
250 saveTimer_.async_wait([
this, w = account_.weak_from_this()](
const std::error_code&
ec) {
252 if (auto acc = w.lock())
258MessageEngine::save_()
const
262 file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
263 file.open(savePath_, std::ios::trunc);
265 msgpack::pack(
file, messages_);
266 }
catch (
const std::exception&
e) {
267 JAMI_ERROR(
"[Account {}] Unable to serialize pending messages: {}",
const std::string & getAccountID() const
Get the account ID.
std::mt19937_64 rand
Random generator engine. Logical account state shall never rely on the state of the random generator.
Manager (controller) of daemon.
virtual void sendMessage(const std::string &to, const std::string &deviceId, const std::map< std::string, std::string > &payloads, uint64_t id, bool retryOnTimeout=true, bool onlyConnected=false)=0
void load()
Load persisted messages.
MessageToken sendMessage(const std::string &to, const std::string &deviceId, const std::map< std::string, std::string > &payloads, uint64_t refreshToken)
Add a message to the engine and try to send it.
void onPeerOnline(const std::string &peer, const std::string &deviceId={}, bool retryOnTimeout=true)
@TODO Replace MessageEngine with a queue. @NOTE retryOnTimeout is used for failing SIP messages (jamiAcc...
void onMessageSent(const std::string &peer, MessageToken t, bool success, const std::string &deviceId={})
void save() const
Persist messages.
MessageStatus getStatus(MessageToken t) const
MessageEngine(SIPAccountBase &, const std::filesystem::path &path)
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
void emitSignal(Args... args)
static constexpr uint64_t JAMI_ID_MAX_VAL