Ring Daemon 16.0.0
Loading...
Searching...
No Matches
conversation.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "conversation.h"
18
19#include "account_const.h"
20#include "jamiaccount.h"
21#include "client/ring_signal.h"
22#include "swarm/swarm_manager.h"
23#ifdef ENABLE_PLUGIN
24#include "manager.h"
26#include "plugin/streamdata.h"
27#endif
29
30#include "fileutils.h"
31#include "json_utils.h"
32
33#include <opendht/thread_pool.h>
34#include <fmt/compile.h>
35
36#include <charconv>
37#include <string_view>
38#include <tuple>
39#include <optional>
40
41namespace jami {
42
43static const char* const LAST_MODIFIED = "lastModified";
44
// Deserialize a ConvInfo from its JSON form: the id is read as a string, the
// created/removed/erased fields as unsigned integers, and one member URI is
// taken from the "uri" field of every entry stored under the MEMBERS key.
// NOTE(review): this listing skips one line between the member loop and the
// closing brace (numbering jumps from 53 to 55); another field may be read there.
45ConvInfo::ConvInfo(const Json::Value& json)
46{
47 id = json[ConversationMapKeys::ID].asString();
48 created = json[ConversationMapKeys::CREATED].asLargestUInt();
49 removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
50 erased = json[ConversationMapKeys::ERASED].asLargestUInt();
// Each member entry is an object; only its "uri" field is used here.
51 for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
52 members.emplace(v["uri"].asString());
53 }
55}
56
// Serialize the conversation info into a Json::Value.
// NOTE(review): the signature line and two statements are absent from this
// listing (numbering skips 58, 61, 72 and 74); presumably this is the
// serializer matching the ConvInfo(const Json::Value&) constructor - confirm.
// `created` is always written; `removed` and `erased` only when non-zero.
57Json::Value
59{
60 Json::Value json;
62 json[ConversationMapKeys::CREATED] = Json::Int64(created);
63 if (removed) {
64 json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
65 }
66 if (erased) {
67 json[ConversationMapKeys::ERASED] = Json::Int64(erased);
68 }
// One JSON object per member, carrying the member URI under "uri".
// NOTE(review): the statement storing `member` into `json` is not visible here.
69 for (const auto& m : members) {
70 Json::Value member;
71 member["uri"] = m;
73 }
75 return json;
76}
77
78// ConversationRequest
80{
81 received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
82 declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
83 from = json[ConversationMapKeys::FROM].asString();
86 for (const auto& member : md.getMemberNames()) {
87 metadatas.emplace(member, md[member].asString());
88 }
89}
90
91Json::Value
93{
94 Json::Value json;
98 if (declined)
100 for (const auto& [key, value] : metadatas) {
102 }
103 return json;
104}
105
// Flatten a conversation request into a string-to-string map, starting from a
// copy of `metadatas`.
// NOTE(review): the signature line and two statements are absent from this
// listing (numbering skips 107, 110 and 111), so other keys may be added too.
// RECEIVED is always written; DECLINED only when `declined` is non-zero.
// Both timestamps are converted with std::to_string.
106std::map<std::string, std::string>
108{
109 auto result = metadatas;
112 if (declined)
113 result[ConversationMapKeys::DECLINED] = std::to_string(declined);
114 result[ConversationMapKeys::RECEIVED] = std::to_string(received);
115 return result;
116}
117
118using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
119
121{
122 // While loading the history, we need to avoid:
123 // - reloading history (can just be ignored)
124 // - adding new commits (should wait for history to be loaded)
125 std::mutex mutex {};
126 std::condition_variable cv {};
127 bool loading {false};
129 std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
130 std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
131 std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
132};
133
135{
136public:
137 Impl(const std::shared_ptr<JamiAccount>& account,
139 const std::string& otherMember = "")
142 , accountId_(account->getAccountID())
143 , userId_(account->getUsername())
144 , deviceId_(account->currentDeviceId())
145 {
146 if (!repository_) {
147 throw std::logic_error("Unable to create repository");
148 }
149 init(account);
150 }
151
152 Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
154 , accountId_(account->getAccountID())
155 , userId_(account->getUsername())
156 , deviceId_(account->currentDeviceId())
157 {
158 repository_ = std::make_unique<ConversationRepository>(account, conversationId);
159 if (!repository_) {
160 throw std::logic_error("Unable to create repository");
161 }
162 init(account);
163 }
164
165 Impl(const std::shared_ptr<JamiAccount>& account,
166 const std::string& remoteDevice,
167 const std::string& conversationId)
169 , accountId_(account->getAccountID())
170 , userId_(account->getUsername())
171 , deviceId_(account->currentDeviceId())
172 {
173 std::vector<ConversationCommit> commits;
176 conversationId,
177 [&](auto c) {
178 commits = std::move(c);
179 });
180 if (!repository_) {
182 accountId_, conversationId, EFETCH, "Unable to clone repository");
183 throw std::logic_error("Unable to clone repository");
184 }
185 // To detect current active calls, we need to check history
187 / "conversation_data" / conversationId;
189 initActiveCalls(repository_->convCommitsToMap(commits));
190 init(account);
191 }
192
193 void init(const std::shared_ptr<JamiAccount>& account) {
195 fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
197 = std::make_shared<SwarmManager>(NodeId(deviceId_),
198 Manager::instance().getSeededRandomEngine(),
199 [account = account_](const DeviceId& deviceId) {
200 if (auto acc = account.lock()) {
201 return acc->isConnectedWith(deviceId);
202 }
203 return false;
204 });
205 swarmManager_->setMobility(account->isMobile());
207 = std::make_shared<TransferManager>(accountId_,
208 "",
209 repository_->id(),
212 / "conversation_data" / repository_->id();
220 loadStatus();
221 typers_ = std::make_shared<Typers>(account, repository_->id());
222 }
223
224 std::string toString() const {
225 return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
226 }
227 mutable std::string fmtStr_;
228
230 {
231 try {
232 if (fallbackTimer_)
233 fallbackTimer_->cancel();
234 } catch (const std::exception& e) {
235 JAMI_ERROR("{:s} {:s}", toString(), e.what());
236 }
237 }
238
244 std::vector<std::string> commitsEndedCalls();
245 bool isAdmin() const;
246 std::filesystem::path repoPath() const;
247
248 void announce(const std::string& commitId, bool commitFromSelf = false)
249 {
250 std::vector<std::string> vec;
251 if (!commitId.empty())
252 vec.emplace_back(commitId);
254 }
255
256 void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
257 {
258 std::vector<ConversationCommit> convcommits;
259 convcommits.reserve(commits.size());
260 for (const auto& cid : commits) {
261 auto commit = repository_->getCommit(cid);
262 if (commit != std::nullopt) {
263 convcommits.emplace_back(*commit);
264 }
265 }
266 announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
267 }
268
// Rebuild activeCalls_ from a list of commits (each commit is a string map).
// Holds activeCallsMtx_ for the whole scan. A call commit (one carrying
// "confId", "uri" and "device") is recorded as active only if it has no
// "duration" field and neither its confId nor its host URI was invalidated by
// a commit seen earlier in the list.
// NOTE(review): this presumes `commits` is ordered newest first, so that a
// later end-of-call or membership commit is met before the call start - confirm.
// NOTE(review): the statements after the loop are partly missing from this
// listing (numbering skips 319, 320 and 322).
273 void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
274 {
275 std::unordered_set<std::string> invalidHostUris;
276 std::unordered_set<std::string> invalidCallIds;
277
278 std::lock_guard lk(activeCallsMtx_);
279 for (const auto& commit : commits) {
// commit.at("type") throws std::out_of_range if a commit has no "type" key.
280 if (commit.at("type") == "member") {
281 // Each commit of type "member" has an "action" field whose value can be one
282 // of the following: "add", "join", "remove", "ban", "unban"
283 // In the case of "remove" and "ban", we need to add the member's URI to
284 // invalidHostUris to ensure that any call they may have started in the past
285 // is no longer considered active.
286 // For the other actions, there's no harm in adding the member's URI anyway,
287 // since it's not possible to start hosting a call before joining the swarm (or
288 // before getting unbanned in the case of previously banned members).
289 invalidHostUris.emplace(commit.at("uri"));
290 } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
291 && commit.find("device") != commit.end()) {
292 // The commit indicates either the end or the beginning of a call
293 // (depending on whether there is a "duration" field or not).
294 auto convId = repository_->id();
295 auto confId = commit.at("confId");
296 auto uri = commit.at("uri");
297 auto device = commit.at("device");
298
299 if (commit.find("duration") == commit.end()
300 && invalidCallIds.find(confId) == invalidCallIds.end()
301 && invalidHostUris.find(uri) == invalidHostUris.end()) {
302 std::map<std::string, std::string> activeCall;
303 activeCall["id"] = confId;
304 activeCall["uri"] = uri;
305 activeCall["device"] = device;
306 activeCalls_.emplace_back(activeCall);
// NOTE(review): this writes to stdout through fmt::print, whereas
// updateActiveCalls reports the same kind of event through the JAMI_* log macros.
307 fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
308 convId,
309 confId,
310 device,
311 uri);
312 }
313 // Even if the call was active, we still add its ID to invalidCallIds to make sure it
314 // doesn't get added a second time. (This shouldn't happen normally, but in practice
315 // there are sometimes multiple commits indicating the beginning of the same call.)
316 invalidCallIds.emplace(confId);
317 }
318 }
321 repository_->id(),
323 }
324
332 void updateActiveCalls(const std::map<std::string, std::string>& commit,
333 bool eraseOnly = false,
334 bool emitSig = true) const
335 {
336 if (!repository_)
337 return;
338 if (commit.at("type") == "member") {
339 // In this case, we need to check if we are not removing a hosting member or device
340 std::lock_guard lk(activeCallsMtx_);
341 auto it = activeCalls_.begin();
342 auto updateActives = false;
343 while (it != activeCalls_.end()) {
344 if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
345 JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
346 it->at("id"),
347 commit.at("uri"));
348 it = activeCalls_.erase(it);
349 updateActives = true;
350 } else {
351 ++it;
352 }
353 }
354 if (updateActives) {
356 if (emitSig)
358 repository_->id(),
360 }
361 return;
362 }
363 // Else, it's a call information
364 if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
365 && commit.find("device") != commit.end()) {
366 auto convId = repository_->id();
367 auto confId = commit.at("confId");
368 auto uri = commit.at("uri");
369 auto device = commit.at("device");
370 std::lock_guard lk(activeCallsMtx_);
371 auto itActive = std::find_if(activeCalls_.begin(),
372 activeCalls_.end(),
373 [&](const auto& value) {
374 return value.at("id") == confId
375 && value.at("uri") == uri
376 && value.at("device") == device;
377 });
378 if (commit.find("duration") == commit.end()) {
379 if (itActive == activeCalls_.end() && !eraseOnly) {
381 "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
382 convId,
383 confId,
384 device,
385 uri);
386 std::map<std::string, std::string> activeCall;
387 activeCall["id"] = confId;
388 activeCall["uri"] = uri;
389 activeCall["device"] = device;
390 activeCalls_.emplace_back(activeCall);
392 if (emitSig)
395 ->id(),
397 }
398 } else {
399 if (itActive != activeCalls_.end()) {
401 // Unlikely, but we must ensure that no duplicate exists
402 while (itActive != activeCalls_.end()) {
403 itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
404 return value.at("id") == confId && value.at("uri") == uri
405 && value.at("device") == device;
406 });
407 if (itActive != activeCalls_.end()) {
408 JAMI_ERROR("Duplicate call found. (This is a bug)");
410 }
411 }
412
413 if (eraseOnly) {
414 JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
415 "{:s}, account {:s}",
416 convId,
417 confId,
418 device,
419 uri);
420 } else {
421 JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
422 convId,
423 confId,
424 device,
425 uri);
426 }
427 }
429 if (emitSig)
431 repository_->id(),
433 }
434 }
435 }
436
437 void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
438 {
439 if (!repository_)
440 return;
441 auto convId = repository_->id();
442 auto ok = !commits.empty();
443 auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
445 if (ok) {
446 bool announceMember = false;
447 for (const auto& c : commits) {
448 // Announce member events
449 if (c.at("type") == "member") {
450 if (c.find("uri") != c.end() && c.find("action") != c.end()) {
451 const auto& uri = c.at("uri");
452 const auto& actionStr = c.at("action");
453 auto action = -1;
454 if (actionStr == "add")
455 action = 0;
456 else if (actionStr == "join")
457 action = 1;
458 else if (actionStr == "remove")
459 action = 2;
460 else if (actionStr == "ban")
461 action = 3;
462 else if (actionStr == "unban")
463 action = 4;
464 if (actionStr == "ban" || actionStr == "remove") {
465 // In this case, a potential host was removed during a call.
467 typers_->removeTyper(uri);
468 }
469 if (action != -1) {
470 announceMember = true;
472 accountId_, convId, uri, action);
473 }
474 }
475 } else if (c.at("type") == "application/call-history+json") {
477 }
478#ifdef ENABLE_PLUGIN
480 = Manager::instance().getJamiPluginManager().getChatServicesManager();
481 if (pluginChatManager.hasHandlers()) {
482 auto cm = std::make_shared<JamiMessage>(accountId_,
483 convId,
484 c.at("author") != userId_,
485 c,
486 false);
487 cm->isSwarm = true;
488 pluginChatManager.publishMessage(std::move(cm));
489 }
490#endif
491 // announce message
493 }
494
496 onMembersChanged_(repository_->memberUris("", {}));
497 }
498 }
499 }
500
502 {
503 try {
504 // read file
506 // load values
507 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
508 std::lock_guard lk {messageStatusMtx_};
509 oh.get().convert(messagesStatus_);
510 } catch (const std::exception& e) {
511 }
512 }
514 {
515 std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
516 msgpack::pack(file, messagesStatus_);
517 }
518
// Restore activeCalls_ from a msgpack-encoded buffer.
// NOTE(review): the statement that reads `file` is absent from this listing
// (numbering skips 523); presumably it loads activeCallsPath_ - confirm.
// Any std::exception raised while reading or decoding is swallowed, leaving
// activeCalls_ as it was before the failing step.
519 void loadActiveCalls() const
520 {
521 try {
522 // read file
524 // load values
525 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
// The lock is taken only around the write into the shared container.
526 std::lock_guard lk {activeCallsMtx_};
527 oh.get().convert(activeCalls_);
528 } catch (const std::exception& e) {
529 return;
530 }
531 }
532
533 void saveActiveCalls() const
534 {
535 std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
536 msgpack::pack(file, activeCalls_);
537 }
538
// Restore hostedCalls_ from a msgpack-encoded buffer.
// NOTE(review): the statement that reads `file` is absent from this listing
// (numbering skips 543); presumably it loads hostedCallsPath_ - confirm.
// Any std::exception raised while reading or decoding is swallowed, leaving
// hostedCalls_ as it was before the failing step.
539 void loadHostedCalls() const
540 {
541 try {
542 // read file
544 // load values
545 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
// hostedCalls_ is written under activeCallsMtx_, the same mutex used for activeCalls_.
546 std::lock_guard lk {activeCallsMtx_};
547 oh.get().convert(hostedCalls_);
548 } catch (const std::exception& e) {
549 return;
550 }
551 }
552
553 void saveHostedCalls() const
554 {
555 std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
556 msgpack::pack(file, hostedCalls_);
557 }
558
559 void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
560
561 std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
562 bool includeLeft,
563 bool includeBanned) const;
564
565 std::string_view bannedType(const std::string& uri) const
566 {
567 auto repo = repoPath();
568 auto crt = fmt::format("{}.crt", uri);
569 auto bannedMember = repo / "banned" / "members" / crt;
570 if (std::filesystem::is_regular_file(bannedMember))
571 return "members"sv;
572 auto bannedAdmin = repo / "banned" / "admins" / crt;
573 if (std::filesystem::is_regular_file(bannedAdmin))
574 return "admins"sv;
575 auto bannedInvited = repo / "banned" / "invited" / uri;
576 if (std::filesystem::is_regular_file(bannedInvited))
577 return "invited"sv;
578 auto bannedDevice = repo / "banned" / "devices" / crt;
579 if (std::filesystem::is_regular_file(bannedDevice))
580 return "devices"sv;
581 return {};
582 }
583
584 std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
585 {
586 auto deviceSockets = gitSocketList_.find(deviceId);
587 return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
588 }
589
590 void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
591 {
592 gitSocketList_[deviceId] = socket;
593 }
594 void removeGitSocket(const DeviceId& deviceId)
595 {
596 auto deviceSockets = gitSocketList_.find(deviceId);
597 if (deviceSockets != gitSocketList_.end())
599 }
600
606 void disconnectFromPeer(const std::string& peerUri);
607
608 std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
609 bool includeLeft) const;
610
611 std::mutex membersMtx_ {};
612 std::set<std::string> checkedMembers_; // Store members we tried
613 std::function<void()> bootstrapCb_;
614#ifdef LIBJAMI_TEST
615 std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
616#endif
617
618 std::mutex writeMtx_ {};
619 std::unique_ptr<ConversationRepository> repository_;
620 std::shared_ptr<SwarmManager> swarmManager_;
621 std::weak_ptr<JamiAccount> account_;
622 std::string accountId_ {};
623 std::string userId_;
624 std::string deviceId_;
625 std::atomic_bool isRemoving_ {false};
626 std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
627 std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
628 History* optHistory = nullptr);
629 void pull(const std::string& deviceId);
630 std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
631
632 // Avoid multiple fetch/merges at the same time.
633 std::mutex pullcbsMtx_ {};
634 std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch
635 std::shared_ptr<TransferManager> transferManager_ {};
636 std::filesystem::path conversationDataPath_ {};
637 std::filesystem::path fetchedPath_ {};
638
639 // Manage last message displayed and status
640 std::filesystem::path sendingPath_ {};
641 std::filesystem::path preferencesPath_ {};
643
644 // Manage hosted calls on this device
645 std::filesystem::path hostedCallsPath_ {};
646 mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
647 // Manage active calls for this conversation (can be hosted by other devices)
648 std::filesystem::path activeCallsPath_ {};
649 mutable std::mutex activeCallsMtx_ {};
650 mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
651
653
654 // Bootstrap
655 std::shared_ptr<asio::io_context> ioContext_;
656 std::unique_ptr<asio::steady_timer> fallbackTimer_;
657
658
663 std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
665 const std::vector<std::map<std::string, std::string>>& commits,
666 bool messageReceived = false,
667 bool commitFromSelf = false);
668
670 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
672 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
673 bool messageReceived) const;
675 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
676 bool messageReceived) const;
677 void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
678 History& history) const;
688 mutable std::mutex messageStatusMtx_;
689 std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
690 std::filesystem::path statusPath_ {};
691 mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
696 // Note: only store int32_t cause it's easy to pass to dbus this way
697 // memberToStatus serves as a cache for loading messages
698 mutable std::map<std::string, int32_t> memberToStatus;
699
700
701 // futureStatus is used to store the status for receiving messages
702 // (because we're not sure to fetch the commit before receiving a status change for this)
703 mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
704 // Update internal structures regarding status
705 void updateStatus(const std::string& uri,
707 const std::string& commitId,
708 const std::string& ts,
709 bool emit = false);
710
711
712 std::shared_ptr<Typers> typers_;
713};
714
// Report whether a certificate file named "<userId_>.crt" exists as a regular
// file in the "admins" directory of the repository path.
// NOTE(review): the signature line is absent from this listing (numbering
// skips 716); the class declares `bool isAdmin() const;`, presumably this
// definition - confirm.
715bool
717{
718 auto adminsPath = repoPath() / "admins";
719 return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
720}
721
722void
723Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
724{
725 // Remove nodes from swarmManager
726 const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
727 std::vector<NodeId> toRemove;
728 for (const auto node : nodes)
729 if (peerUri == repository_->uriFromDevice(node.toString()))
730 toRemove.emplace_back(node);
731 swarmManager_->deleteNode(toRemove);
732
733 // Remove git sockets with this member
734 for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
735 if (peerUri == repository_->uriFromDevice(it->first.toString()))
736 it = gitSocketList_.erase(it);
737 else
738 ++it;
739 }
740}
741
// List the members of the repository as string maps, filtered by role.
// NOTE(review): the signature line and the condition guarding the second
// `continue` are absent from this listing (numbering skips 743 and 752);
// the class declares getMembers(includeInvited, includeLeft, includeBanned),
// so the hidden test presumably filters invited members - confirm.
// Each returned map is member.map() plus a LAST_DISPLAYED entry taken from
// the "read" status stored for that member.
742std::vector<std::map<std::string, std::string>>
744{
745 std::vector<std::map<std::string, std::string>> result;
746 auto members = repository_->members();
// messagesStatus_ is read (and possibly extended, see below) under its mutex.
747 std::lock_guard lk(messageStatusMtx_);
748 for (const auto& member : members) {
749 if (member.role == MemberRole::BANNED && !includeBanned) {
750 continue;
751 }
753 continue;
754 if (member.role == MemberRole::LEFT && !includeLeft)
755 continue;
756 auto mm = member.map();
// operator[] inserts an empty entry for members (and a "read" key) that have
// no recorded status yet, so this lookup can grow messagesStatus_.
757 mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
758 result.emplace_back(std::move(mm));
759 }
760 return result;
761}
762
// Close every call still listed in hostedCalls_: for each one, commit a
// "application/call-history+json" message carrying a "duration" field, drop
// the matching entry from activeCalls_, then clear hostedCalls_ and persist
// both containers. Returns the values returned by commitMessage(), one per
// hosted call.
// NOTE(review): the signature line is absent from this listing (numbering
// skips 764); the class declares `std::vector<std::string> commitsEndedCalls();`,
// presumably this definition - confirm.
763std::vector<std::string>
765{
766 // Handle current calls
767 std::vector<std::string> commits {};
// Both mutexes are held until the function returns: writeMtx_ first, then activeCallsMtx_.
768 std::unique_lock lk(writeMtx_);
769 std::unique_lock lkA(activeCallsMtx_);
770 for (const auto& hostedCall : hostedCalls_) {
771 // In this case, this means that we left
772 // the conference while still hosting it, so activeCalls
773 // will not be correctly updated
774 // We don't need to send notifications there, as peers will sync with presence
775 Json::Value value;
776 value["uri"] = userId_;
777 value["device"] = deviceId_;
778 value["confId"] = hostedCall.first;
779 value["type"] = "application/call-history+json";
780 auto now = std::chrono::system_clock::now();
781 auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
782 .count();
// hostedCall.second is the stored start time; the difference in seconds is
// multiplied by 1000 (presumably to express the duration in milliseconds - confirm).
783 value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
// Only the first active call matching this conference, user and device is erased.
784 auto itActive = std::find_if(activeCalls_.begin(),
785 activeCalls_.end(),
786 [this, confId = hostedCall.first](const auto& value) {
787 return value.at("id") == confId && value.at("uri") == userId_
788 && value.at("device") == deviceId_;
789 });
790 if (itActive != activeCalls_.end())
791 activeCalls_.erase(itActive);
792 commits.emplace_back(repository_->commitMessage(json::toString(value)));
793
794 JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
795 }
796 hostedCalls_.clear();
797 saveActiveCalls();
798 saveHostedCalls();
799 return commits;
800}
801
// Build the on-disk location of this conversation's repository:
// <data dir>/<accountId_>/conversations/<repository id>.
// NOTE(review): the signature line is absent from this listing (numbering
// skips 803); the class declares `std::filesystem::path repoPath() const;`,
// presumably this definition - confirm.
802std::filesystem::path
804{
805 return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
806}
807
// Walk the repository log according to `options` and return the selected
// commits converted with convCommitsToMap(). Returns an empty vector when
// there is no repository.
// NOTE(review): the signature line and one statement inside the fastLog
// branch are absent from this listing (numbering skips 809 and 849); the class
// declares loadMessages(const LogOptions& options), presumably this
// definition - confirm.
// repository_->log() receives three callbacks: a pre-condition returning
// Skip/Break/Ok for each commit, a callback storing an accepted commit, and a
// post-condition that here always returns false.
808std::vector<std::map<std::string, std::string>>
810{
811 if (!repository_)
812 return {};
813 std::vector<ConversationCommit> commits;
// With an empty `from`, collection starts at the first commit visited.
814 auto startLogging = options.from == "";
815 auto breakLogging = false;
816 repository_->log(
817 [&](const auto& id, const auto& author, const auto& commit) {
// The previously stored commit gets the commit visited right after it as its
// linearized parent; this runs before any Skip/Break decision below.
818 if (!commits.empty()) {
819 // Set linearized parent
820 commits.rbegin()->linearized_parent = id;
821 }
// Commits with more than one parent are merges.
822 if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
823 return CallbackResult::Skip;
824 }
// nbOfCommits == 0 means no limit on the number of commits.
825 if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
826 return CallbackResult::Break; // Stop logging
827 if (breakLogging)
828 return CallbackResult::Break; // Stop logging
829 if (id == options.to) {
830 if (options.includeTo)
831 breakLogging = true; // For the next commit
832 else
833 return CallbackResult::Break; // Stop logging
834 }
835
836 if (!startLogging && options.from != "" && options.from == id)
837 startLogging = true;
838 if (!startLogging)
839 return CallbackResult::Skip; // Start logging after this one
840
841 if (options.fastLog) {
842 if (options.authorUri != "") {
843 if (options.authorUri == repository_->uriFromDevice(author.email)) {
844 return CallbackResult::Break; // Found author, stop
845 }
846 }
847 // Used to only count commit
848 commits.emplace(commits.end(), ConversationCommit {});
850 }
851
852 return CallbackResult::Ok; // Continue
853 },
854 [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
855 [](auto, auto, auto) { return false; },
856 options.from,
857 options.logIfNotFound);
858 return repository_->convCommitsToMap(commits);
859}
860
861std::vector<libjami::SwarmMessage>
863{
864 auto history = optHistory ? optHistory : &loadedHistory_;
865
866 // history->mutex is locked by the caller
867 if (!repository_ || history->loading) {
868 return {};
869 }
870 history->loading = true;
871
872 // By convention, if options.nbOfCommits is zero, then we
873 // don't impose a limit on the number of commits returned.
874 bool limitNbOfCommits = options.nbOfCommits > 0;
875
876 auto startLogging = options.from == "";
877 auto breakLogging = false;
878 auto currentHistorySize = loadedHistory_.messageList.size();
879 std::vector<std::string> replies;
880 std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
881 repository_->log(
882 /* preCondition */
883 [&](const auto& id, const auto& author, const auto& commit) {
884 if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
885 return CallbackResult::Skip;
886 }
887 if (id == options.to) {
888 if (options.includeTo)
889 breakLogging = true; // For the next commit
890 }
891 if (replies.empty()) { // This avoid load until
892 // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
893 // until this commit
894 if ((limitNbOfCommits
895 && (loadedHistory_.messageList.size() - currentHistorySize)
896 == options.nbOfCommits))
897 return CallbackResult::Break; // Stop logging
898 if (breakLogging)
899 return CallbackResult::Break; // Stop logging
900 if (id == options.to && !options.includeTo) {
901 return CallbackResult::Break; // Stop logging
902 }
903 }
904
905 if (!startLogging && options.from != "" && options.from == id)
906 startLogging = true;
907 if (!startLogging)
908 return CallbackResult::Skip; // Start logging after this one
909
910 if (options.fastLog) {
911 if (options.authorUri != "") {
912 if (options.authorUri == repository_->uriFromDevice(author.email)) {
913 return CallbackResult::Break; // Found author, stop
914 }
915 }
916 }
917
918 return CallbackResult::Ok; // Continue
919 },
920 /* emplaceCb */
921 [&](auto&& cc) {
922 if(limitNbOfCommits && (msgList.size() == options.nbOfCommits))
923 return;
924 auto optMessage = repository_->convCommitToMap(cc);
925 if (!optMessage.has_value())
926 return;
927 auto message = optMessage.value();
928 if (message.find("reply-to") != message.end()) {
929 auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
930 if(it == replies.end()) {
931 replies.emplace_back(message.at("reply-to"));
932 }
933 }
934 auto it = std::find(replies.begin(), replies.end(), message.at("id"));
935 if (it != replies.end()) {
936 replies.erase(it);
937 }
938 std::shared_ptr<libjami::SwarmMessage> firstMsg;
939 if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
940 firstMsg = *loadedHistory_.messageList.rbegin();
941 }
942 auto added = addToHistory(*history, {message}, false, false);
943 if (!added.empty() && firstMsg) {
945 repository_->id(),
946 *firstMsg);
947 }
948 msgList.insert(msgList.end(), added.begin(), added.end());
949 },
950 /* postCondition */
951 [&](auto, auto, auto) {
952 // Stop logging if there was a limit set on the number of commits
953 // to return and we reached it. This isn't strictly necessary since
954 // the check at the beginning of `emplaceCb` ensures that we won't
955 // return too many messages, but it prevents us from needlessly
956 // iterating over a (potentially) large number of commits.
957 return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
958 },
959 options.from,
960 options.logIfNotFound);
961
962 history->loading = false;
963 history->cv.notify_all();
964
965 // Convert for client (remove ptr)
966 std::vector<libjami::SwarmMessage> ret;
967 ret.reserve(msgList.size());
968 for (const auto& msg: msgList) {
969 ret.emplace_back(*msg);
970 }
971 return ret;
972}
973
// Attach a reaction commit to the message it reacts to.
// NOTE(review): the signature line (function name and first parameter) and
// the head of the signal call are absent from this listing (numbering skips
// 975 and 989); `history` is used as the container holding quickAccess,
// pendingEditions and pendingReactions.
// If an edition of this reaction is already pending, its body is applied
// first; an edition with an empty body makes the function return without
// recording the reaction. The reaction is appended to the target message when
// that message is known, otherwise queued in pendingReactions under the
// target id. sharedCommit->body.at("react-to") throws std::out_of_range when
// the key is missing.
974void
976 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
977{
978 auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
979 auto peditIt = history.pendingEditions.find(sharedCommit->id);
980 if (peditIt != history.pendingEditions.end()) {
// NOTE(review): oldBody is copied but not used in the visible code.
981 auto oldBody = sharedCommit->body;
982 sharedCommit->body["body"] = peditIt->second.front()->body["body"];
// On this early return the pending edition entry is left in place.
983 if (sharedCommit->body.at("body").empty())
984 return;
985 history.pendingEditions.erase(peditIt);
986 }
987 if (it != history.quickAccess.end()) {
988 it->second->reactions.emplace_back(sharedCommit->body);
990 repository_->id(),
991 it->second->id,
992 sharedCommit->body);
993 } else {
994 history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
995 }
996}
997
998void
1000 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1001 bool messageReceived) const
1002{
1003 auto editId = sharedCommit->body.at("edit");
1004 auto it = history.quickAccess.find(editId);
1005 if (it != history.quickAccess.end()) {
1006 auto baseCommit = it->second;
1007 if (baseCommit) {
1008 auto itReact = baseCommit->body.find("react-to");
1009 std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
1010 "tid" : "body";
1011 auto body = sharedCommit->body.at(toReplace);
1012 // Edit reaction
1013 if (itReact != baseCommit->body.end()) {
1014 baseCommit->body[toReplace] = body; // Replace body if pending
1015 it = history.quickAccess.find(itReact->second);
1016 auto itPending = history.pendingReactions.find(itReact->second);
1017 if (it != history.quickAccess.end()) {
1018 baseCommit = it->second; // Base commit
1019 auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1020 baseCommit->reactions.end(),
1021 [&](const auto& reaction) {
1022 return reaction.at("id") == editId;
1023 });
1024 if (itPreviousReact != baseCommit->reactions.end()) {
1025 (*itPreviousReact)[toReplace] = body;
1026 if (body.empty()) {
1027 baseCommit->reactions.erase(itPreviousReact);
1029 repository_
1030 ->id(),
1031 baseCommit->id,
1032 editId);
1033 }
1034 }
1035 } else if (itPending != history.pendingReactions.end()) {
1036 // Else edit if pending
1037 auto itReaction = std::find_if(itPending->second.begin(),
1038 itPending->second.end(),
1039 [&](const auto& reaction) {
1040 return reaction.at("id") == editId;
1041 });
1042 if (itReaction != itPending->second.end()) {
1043 (*itReaction)[toReplace] = body;
1044 if (body.empty())
1045 itPending->second.erase(itReaction);
1046 }
1047 } else {
1048 // Add to pending edtions
1049 messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1050 : history.pendingEditions[editId].emplace_back(sharedCommit);
1051 }
1052 } else {
1053 // Normal message
1054 it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1055 it->second->body[toReplace] = sharedCommit->body[toReplace];
1056 if (toReplace == "tid") {
1057 // Avoid to replace fileId in client
1058 it->second->body["fileId"] = "";
1059 }
1060 // Remove reactions
1061 if (sharedCommit->body.at(toReplace).empty())
1062 it->second->reactions.clear();
1063 emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1064 }
1065 }
1066 } else {
1067 messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1068 : history.pendingEditions[editId].emplace_back(sharedCommit);
1069 }
1070}
1071
// Insert a message into history.messageList and apply any reactions or
// editions that were queued for it before it was known.
// NOTE(review): the signature line (function name and first parameter) and
// the head of the call under "Announce to client" are absent from this listing
// (numbering skips 1073 and 1114).
// Returns !messageReceived, i.e. true for loaded messages and false for
// received ones.
1072bool
1074 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1075 bool messageReceived) const
1076{
1077 if (messageReceived) {
1078 // For a received message, we place it at the beginning of the list
1079 if (!history.messageList.empty())
1080 sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1081 history.messageList.emplace_front(sharedCommit);
1082 } else {
1083 // For a loaded message, we load from newest to oldest
1084 // So we change the parent of the last message.
1085 if (!history.messageList.empty())
1086 (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1087 history.messageList.emplace_back(sharedCommit);
1088 }
1089 // Handle pending reactions/editions
1090 auto reactIt = history.pendingReactions.find(sharedCommit->id);
1091 if (reactIt != history.pendingReactions.end()) {
1092 for (const auto& commitBody : reactIt->second)
1093 sharedCommit->reactions.emplace_back(commitBody);
1094 history.pendingReactions.erase(reactIt);
1095 }
1096 auto peditIt = history.pendingEditions.find(sharedCommit->id);
1097 if (peditIt != history.pendingEditions.end()) {
// Keep the original body so it can be recorded as the last entry of `editions`.
1098 auto oldBody = sharedCommit->body;
// The front pending edition supplies the current content: "tid" for file
// transfers (with "fileId" blanked), "body" for any other message type.
1099 if (sharedCommit->type == "application/data-transfer+json") {
1100 sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1101 sharedCommit->body["fileId"] = "";
1102 } else {
1103 sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1104 }
// The remaining pending editions, then the original body, form the edition history.
1105 peditIt->second.pop_front();
1106 for (const auto& commit : peditIt->second) {
1107 sharedCommit->editions.emplace_back(commit->body);
1108 }
1109 sharedCommit->editions.emplace_back(oldBody);
1110 history.pendingEditions.erase(peditIt);
1111 }
1112 // Announce to client
1113 if (messageReceived)
1115 repository_->id(),
1116 *sharedCommit);
1117 return !messageReceived;
1118}
1119
// Propagates the per-peer status of `message` to its ancestors: starting at
// message->linearizedParent, follows the linearizedParent chain through
// history.quickAccess and, for every (peer, value) in message->status, raises the
// ancestor's status when it is absent or lower than `value`.
// NOTE(review): the `break` below only leaves the inner for-loop over peers; the outer
// while-loop keeps walking the parent chain. Confirm this is intended.
// NOTE(review): two lines are missing from this listing: the head of the call that takes
// the five arguments inside the `if`, and one statement between the for-loop and the
// parentIt update.
1120void Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
1121 History& history) const
1122{
1123
1124 auto parentIt = history.quickAccess.find(message->linearizedParent);
1125 auto currentMessage = message;
1126
1127 while(parentIt != history.quickAccess.end()){
1128 const auto& parent = parentIt->second;
1129 for (const auto& [peer, value] : message->status) {
1130 auto parentStatusIt = parent->status.find(peer);
1131 if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1132 parent->status[peer] = value;
1134 accountId_,
1135 repository_->id(),
1136 peer,
1137 parent->id,
1138 value);
1139 }
1140 else if(parentStatusIt->second >= value){
1141 break;
1142 }
1143 }
1145 parentIt = history.quickAccess.find(parent->linearizedParent);
1146 }
1147}
1148
// Converts raw commits into SwarmMessage objects, computes their per-member status and
// inserts them into `history`.
// Steps visible below:
//  1. Skip commits already in history.quickAccess and commits of type "merge".
//  2. For each member, walk the new commits from last to first and derive a status
//     from messagesStatus_ ("fetched"/"read"), authorship and any existing status.
//  3. Register every message in quickAccess and dispatch it to handleReaction,
//     handleEdition or handleMessage, then call rectifyStatus.
// Returns the messages for which handleMessage returned true.
// Returns an empty vector when account_ can no longer be locked.
// NOTE(review): the line naming this function (and its first parameter, used as
// `history`) is missing from this listing.
1149std::vector<std::shared_ptr<libjami::SwarmMessage>>
1151 const std::vector<std::map<std::string, std::string>>& commits,
1152 bool messageReceived,
1153 bool commitFromSelf)
1154{
1155 //
1156 // NOTE: This function makes the following assumptions on its arguments:
1157 // - The messages in "history" are in reverse chronological order (newest message
1158 // first, oldest message last).
1159 // - If messageReceived is true, then the commits in "commits" are assumed to be in
1160 // chronological order (oldest to newest) and to be newer than the ones in "history".
1161 // They are therefore inserted at the beginning of the message list.
1162 // - If messageReceived is false, then the commits in "commits" are assumed to be in
1163 // reverse chronological order (newest to oldest) and to be older than the ones in
1164 // "history". They are therefore appended at the end of the message list.
1165 //
1166 auto acc = account_.lock();
1167 if (!acc)
1168 return {};
1169 auto username = acc->getUsername();
// NOTE(review): history.loading is read here before history.mutex is taken.
1170 if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1171 std::unique_lock lk(history.mutex);
1172 history.cv.wait(lk, [&] { return !history.loading; });
1173 }
1174
1175 // Only set messages' status on history for client
1176 bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1177
1178 std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1179 for (const auto& commit : commits) {
1180 auto commitId = commit.at("id");
1181 if (history.quickAccess.find(commitId) != history.quickAccess.end())
1182 continue; // Already present
1183 auto typeIt = commit.find("type");
1184 // Nothing to show for the client, skip
1185 if (typeIt != commit.end() && typeIt->second == "merge")
1186 continue;
1187
1188 auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1189 sharedCommit->fromMapStringString(commit);
1190
// NOTE(review): the line opening the block closed two lines after the futureStatus
// lookup is missing from this listing.
1192 // Check if we already have status information for the commit.
1193 auto itFuture = futureStatus.find(sharedCommit->id);
1194 if (itFuture != futureStatus.end()) {
1195 sharedCommit->status = std::move(itFuture->second);
1196 futureStatus.erase(itFuture);
1197 }
1198 }
1199
1200 sharedCommits.emplace_back(sharedCommit);
1201 }
1202
// NOTE(review): the line opening the block that contains the status computation below
// is missing from this listing.
1204 constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1205 constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1206 constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1207
1208 std::lock_guard lk(messageStatusMtx_);
1209 for (const auto& member: repository_->members()) {
1210 // For each member, we iterate over the commits to add in reverse chronological
1211 // order (i.e. from newest to oldest) and set their status from the point of view
1212 // of that member (as best we can given the information we have).
1213 //
1214 // The key assumption made in order to compute the status is that it can never decrease
1215 // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1216 // therefore start by setting the "status" variable below to the lowest possible value,
1217 // and increase it when we encounter a commit for which it is justified to do so.
1218 //
1219 // If messageReceived is true, then the commits we are adding are the most recent in the
1220 // conversation history, so the lowest possible value is SENDING.
1221 //
1222 // If messageReceived is false, then the commits we are adding are older than the ones
1223 // that are already in the history, so the lowest possible value is the status of the
1224 // oldest message in the history so far, which is stored in memberToStatus.
1225 auto status = SENDING;
1226 if (!messageReceived) {
1227 auto cache = memberToStatus[member.uri];
1228 if (cache > status)
1229 status = cache;
1230 }
1231
1232 for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1233 auto sharedCommit = *it;
1234 auto previousStatus = status;
1235
1236 // Compute status for the current commit.
1237 if (status < SENT && messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1238 status = SENT;
1239 }
1240 if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1241 status = DISPLAYED;
1242 }
1243 if (member.uri == sharedCommit->body.at("author")) {
1244 status = DISPLAYED;
1245 }
1246 if(status < sharedCommit->status[member.uri]){
1247 status = sharedCommit->status[member.uri];
1248 }
1249
1250 // Store computed value.
1251 sharedCommit->status[member.uri] = status;
1252
1253 // Update messagesStatus_ if needed.
1254 if (previousStatus == SENDING && status >= SENT) {
1255 messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1256 }
1257 if (previousStatus <= SENT && status == DISPLAYED) {
1258 messagesStatus_[member.uri]["read"] = sharedCommit->id;
1259 }
1260 }
1261
1262 if (!messageReceived) {
1263 // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1264 memberToStatus[member.uri] = status;
1265 }
1266 }
1267 }
1268
1269 std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1270 for (const auto& sharedCommit : sharedCommits) {
1271 history.quickAccess[sharedCommit->id] = sharedCommit;
1272
1273 auto reactToIt = sharedCommit->body.find("react-to");
1274 auto editIt = sharedCommit->body.find("edit");
1275 if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1276 handleReaction(history, sharedCommit);
1277 } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1278 handleEdition(history, sharedCommit, messageReceived);
1279 } else if (handleMessage(history, sharedCommit, messageReceived)) {
1280 messages.emplace_back(sharedCommit);
1281 }
1282 rectifyStatus(sharedCommit, history);
1283 }
1284
1285 return messages;
1286}
1287
// Constructs a conversation by creating its Impl from (account, mode, otherMember).
// NOTE(review): the line declaring the `mode` parameter is missing from this listing.
1288Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1290 const std::string& otherMember)
1291 : pimpl_ {new Impl {account, mode, otherMember}}
1292{}
1293
1294Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1295 const std::string& conversationId)
1296 : pimpl_ {new Impl {account, conversationId}}
1297{}
1298
1299Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1300 const std::string& remoteDevice,
1301 const std::string& conversationId)
1302 : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1303{}
1304
1306
// Returns the repository id, or an empty string when no repository is set.
// NOTE(review): the signature line is missing from this listing.
1307std::string
1309{
1310 return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1311}
1312
// Adds `contactUri` to the conversation and reports the outcome through `cb`.
//  - Already a member (invited included): cb(false, "").
//  - Banned: an admin starts an unban vote on the io thread pool via Impl::voteUnban;
//    a non-admin gets cb(false, "").
//  - Otherwise: on the io thread pool, under writeMtx_, the member is added to the
//    repository and the resulting commit is announced; cb(!commit.empty(), commit).
// NOTE(review): the signature line and two lines inside the try block (the condition
// guarding the "one to one" check and the declaration of `initialMembers`) are missing
// from this listing.
// NOTE(review): cb is only null-checked on the final success path; every earlier path
// invokes it unconditionally.
1313void
1315{
1316 try {
1318 // Only authorize to add left members
1320 auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1321 if (it == initialMembers.end()) {
1322 JAMI_WARN("Unable to add new member in one to one conversation");
1323 cb(false, "");
1324 return;
1325 }
1326 }
1327 } catch (const std::exception& e) {
1328 JAMI_WARN("Unable to get mode: %s", e.what());
1329 cb(false, "");
1330 return;
1331 }
1332 if (isMember(contactUri, true)) {
1333 JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1334 cb(false, "");
1335 return;
1336 }
1337 if (isBanned(contactUri)) {
1338 if (pimpl_->isAdmin()) {
1339 dht::ThreadPool::io().run(
1340 [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1341 if (auto sthis = w.lock()) {
1342 auto members = sthis->pimpl_->repository_->members();
1343 auto type = sthis->pimpl_->bannedType(contactUri);
1344 if (type.empty()) {
1345 cb(false, {});
1346 return;
1347 }
1348 sthis->pimpl_->voteUnban(contactUri, type, cb);
1349 }
1350 });
1351 } else {
1352 JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1353 cb(false, "");
1354 }
1355 return;
1356 }
1357
1358 dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1359 if (auto sthis = w.lock()) {
1360 // Add member files and commit
1361 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1362 auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1363 sthis->pimpl_->announce(commit, true);
1364 lk.unlock();
1365 if (cb)
1366 cb(!commit.empty(), commit);
1367 }
1368 });
1369}
1370
1371std::shared_ptr<dhtnet::ChannelSocket>
1372Conversation::gitSocket(const DeviceId& deviceId) const
1373{
1374 return pimpl_->gitSocket(deviceId);
1375}
1376
// Forwards (deviceId, socket) to Impl::addGitSocket.
// NOTE(review): the first signature line (function name and `deviceId` parameter) is
// missing from this listing.
1377void
1379 const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1380{
1381 pimpl_->addGitSocket(deviceId, socket);
1382}
1383
// Forwards deviceId to Impl::removeGitSocket.
// NOTE(review): the signature line is missing from this listing.
1384void
1386{
1387 pimpl_->removeGitSocket(deviceId);
1388}
1389
// Cancels the fallback timer, clears the git socket list, shuts the swarm manager down
// when one is set, then clears checkedMembers_ under membersMtx_.
// NOTE(review): the signature line is missing from this listing.
// NOTE(review): fallbackTimer_ is dereferenced without a null check, unlike
// swarmManager_ — confirm it is always set.
1390void
1392{
1393 pimpl_->fallbackTimer_->cancel();
1394 pimpl_->gitSocketList_.clear();
1395 if (pimpl_->swarmManager_)
1396 pimpl_->swarmManager_->shutdown();
1397 std::lock_guard lk(pimpl_->membersMtx_);
1398 pimpl_->checkedMembers_.clear();
1399}
1400
// Asks the swarm manager, when one is set, to maintain its buckets.
// NOTE(review): the signature line is missing from this listing.
1401void
1403{
1404 if (pimpl_->swarmManager_)
1405 pimpl_->swarmManager_->maintainBuckets();
1406}
1407
// Returns every node of the swarm manager's routing table.
// NOTE(review): the signature line is missing from this listing.
// NOTE(review): swarmManager_ is dereferenced without the null check that other
// functions in this file perform before using it.
1408std::vector<jami::DeviceId>
1410{
1411 return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1412}
1413
// Returns the Typers object held by the implementation.
// NOTE(review): the signature line is missing from this listing.
1414std::shared_ptr<Typers>
1416{
1417 return pimpl_->typers_;
1418}
1419
1420bool
1421Conversation::hasSwarmChannel(const std::string& deviceId)
1422{
1423 if (!pimpl_->swarmManager_)
1424 return false;
1425 return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1426}
1427
1428void
1430 const std::string_view type,
1431 const OnDoneCb& cb)
1432{
1433 // Check if admin
1434 if (!isAdmin()) {
1435 JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1436 cb(false, {});
1437 return;
1438 }
1439
1440 // Vote for removal
1441 std::unique_lock lk(writeMtx_);
1442 auto voteCommit = repository_->voteUnban(contactUri, type);
1443 if (voteCommit.empty()) {
1444 JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1445 cb(false, "");
1446 return;
1447 }
1448
1449 auto lastId = voteCommit;
1450 std::vector<std::string> commits;
1451 commits.emplace_back(voteCommit);
1452
1453 // If admin, check vote
1454 auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1455 if (!resolveCommit.empty()) {
1456 commits.emplace_back(resolveCommit);
1457 lastId = resolveCommit;
1458 JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1459 }
1460 announce(commits, true);
1461 lk.unlock();
1462 if (cb)
1463 cb(!lastId.empty(), lastId);
1464}
1465
// On the io thread pool: votes to kick `contactUri` (a device when isDevice is true,
// otherwise a member whose category is derived from its role), resolves the vote,
// announces the resulting commits and reports through cb.
//  - Not an admin, member not found, or vote commit empty: cb(false, ...).
//  - When the vote resolves, the peer is disconnected via Impl::disconnectFromPeer.
// NOTE(review): the signature line is missing from this listing.
// NOTE(review): cb is invoked without a null check on every path, and the failure
// after voteKick invokes it while writeMtx_ is still held.
1466void
1468{
1469 dht::ThreadPool::io().run([w = weak(),
1470 contactUri = std::move(contactUri),
1471 isDevice = std::move(isDevice),
1472 cb = std::move(cb)] {
1473 if (auto sthis = w.lock()) {
1474 // Check if admin
1475 if (!sthis->pimpl_->isAdmin()) {
1476 JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1477 cb(false, {});
1478 return;
1479 }
1480
1481 // Get current user type
1482 std::string type;
1483 if (isDevice) {
1484 type = "devices";
1485 } else {
1486 auto members = sthis->pimpl_->repository_->members();
1487 for (const auto& member : members) {
1488 if (member.uri == contactUri) {
1489 if (member.role == MemberRole::INVITED) {
1490 type = "invited";
1491 } else if (member.role == MemberRole::ADMIN) {
1492 type = "admins";
1493 } else if (member.role == MemberRole::MEMBER) {
1494 type = "members";
1495 }
1496 break;
1497 }
1498 }
1499 if (type.empty()) {
1500 cb(false, {});
1501 return;
1502 }
1503 }
1504
1505 // Vote for removal
1506 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1507 auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1508 if (voteCommit.empty()) {
1509 JAMI_WARN("Kicking %s failed", contactUri.c_str());
1510 cb(false, "");
1511 return;
1512 }
1513
1514 auto lastId = voteCommit;
1515 std::vector<std::string> commits;
1516 commits.emplace_back(voteCommit);
1517
1518 // If admin, check vote
1519 auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1520 if (!resolveCommit.empty()) {
1521 commits.emplace_back(resolveCommit);
1522 lastId = resolveCommit;
1523 JAMI_WARN("Vote solved for %s. %s banned",
1524 contactUri.c_str(),
1525 isDevice ? "Device" : "Member");
1526 sthis->pimpl_->disconnectFromPeer(contactUri);
1527 }
1528
1529 sthis->pimpl_->announce(commits, true);
1530 lk.unlock();
1531 cb(!lastId.empty(), lastId);
1532 }
1533 });
1534}
1535
// Forwards (includeInvited, includeLeft, includeBanned) to Impl::getMembers.
// NOTE(review): the signature line is missing from this listing.
1536std::vector<std::map<std::string, std::string>>
1538{
1539 return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1540}
1541
1542std::set<std::string>
1543Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1544{
1545 return pimpl_->repository_->memberUris(filter, filteredRoles);
1546}
1547
// Builds the list of nodes from the routing table (regular nodes, then mobile nodes)
// and appends every device of gitSocketList_ that is not already in the list.
// NOTE(review): the signature line is missing from this listing.
// NOTE(review): swarmManager_ is dereferenced without a null check.
1548std::vector<NodeId>
1550{
1551 const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1552 const auto& nodes = routingTable.getNodes();
1553 const auto& mobiles = routingTable.getMobileNodes();
1554 std::vector<NodeId> s;
1555 s.reserve(nodes.size() + mobiles.size());
1556 s.insert(s.end(), nodes.begin(), nodes.end());
1557 s.insert(s.end(), mobiles.begin(), mobiles.end());
1558 for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1559 if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1560 s.emplace_back(deviceId);
1561 return s;
1562}
1563
// Returns true when the routing table holds at least one node.
// NOTE(review): the signature line is missing from this listing.
1564bool
1566{
1567 const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1568 return !routingTable.getNodes().empty();
1569}
1570
1571std::string
1572Conversation::uriFromDevice(const std::string& deviceId) const
1573{
1574 return pimpl_->repository_->uriFromDevice(deviceId);
1575}
1576
// Prints the swarm manager's routing table.
// NOTE(review): the signature line is missing from this listing.
1577void
1579{
1580 pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1581}
1582
// Returns the result of the repository's join().
// NOTE(review): the signature line is missing from this listing.
1583std::string
1585{
1586 return pimpl_->repository_->join();
1587}
1588
// Checks whether `uri` is a member by scanning file names in the repository's
// "admins" and "members" directories (plus "invited" when includeInvited is true).
// A trailing ".crt" is stripped before comparing; outside "invited", entries without
// ".crt" are logged and skipped. Initial members are also checked under a condition.
// NOTE(review): the line opening the block around the initial-members loop is missing
// from this listing, so its exact condition cannot be read here.
1589bool
1590Conversation::isMember(const std::string& uri, bool includeInvited) const
1591{
1592 auto repoPath = pimpl_->repoPath();
1593 auto invitedPath = repoPath / "invited";
1594 auto adminsPath = repoPath / "admins";
1595 auto membersPath = repoPath / "members";
1596 std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1597 if (includeInvited)
1598 pathsToCheck.emplace_back(invitedPath);
1599 for (const auto& path : pathsToCheck) {
1600 for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1601 std::string_view crtUri = certificate;
1602 auto crtIt = crtUri.find(".crt");
1603 if (path != invitedPath && crtIt == std::string_view::npos) {
1604 JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1605 continue;
1606 }
1607 if (crtIt != std::string_view::npos)
1608 crtUri = crtUri.substr(0, crtIt);
1609 if (crtUri == uri)
1610 return true;
1611 }
1612 }
1613
1615 for (const auto& member : getInitialMembers()) {
1616 if (member == uri)
1617 return true;
1618 }
1619 }
1620
1621 return false;
1622}
1623
1624bool
1625Conversation::isBanned(const std::string& uri) const
1626{
1627 return !pimpl_->bannedType(uri).empty();
1628}
1629
// Wraps a message body and its type into a JSON value ("body", "type") and forwards it
// to the JSON overload of sendMessage.
// NOTE(review): the line declaring the `onCommit` parameter is missing from this listing.
1630void
1631Conversation::sendMessage(std::string&& message,
1632 const std::string& type,
1633 const std::string& replyTo,
1635 OnDoneCb&& cb)
1636{
1637 Json::Value json;
1638 json["body"] = std::move(message);
1639 json["type"] = type;
1640 sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1641}
1642
// Commits a JSON message on the io thread pool. When replyTo is non-empty it must name
// an existing commit and is stored under "reply-to". The commit is created under
// writeMtx_; then onCommit (if set) is called, the commit is announced, and cb (if set)
// receives (!commit.empty(), commit).
// NOTE(review): the first signature line and the line declaring `onCommit` are missing
// from this listing.
// NOTE(review): on an invalid replyTo the function returns without invoking cb.
1643void
1645 const std::string& replyTo,
1647 OnDoneCb&& cb)
1648{
1649 if (!replyTo.empty()) {
1650 auto commit = pimpl_->repository_->getCommit(replyTo);
1651 if (commit == std::nullopt) {
1652 JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1653 return;
1654 }
1655 value["reply-to"] = replyTo;
1656 }
1657 dht::ThreadPool::io().run(
1658 [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1659 if (auto sthis = w.lock()) {
1660 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1661 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1662 lk.unlock();
1663 if (onCommit)
1664 onCommit(commit);
1665 sthis->pimpl_->announce(commit, true);
1666 if (cb)
1667 cb(!commit.empty(), commit);
1668 }
1669 });
1670}
1671
// On the io thread pool: commits every message under writeMtx_, announces all the
// resulting commits together, then passes the commit list to cb when it is set.
// NOTE(review): the signature line is missing from this listing.
1672void
1674{
1675 dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1676 if (auto sthis = w.lock()) {
1677 std::vector<std::string> commits;
1678 commits.reserve(messages.size());
1679 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1680 for (const auto& message : messages) {
1681 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1682 commits.emplace_back(std::move(commit));
1683 }
1684 lk.unlock();
1685 sthis->pimpl_->announce(commits, true);
1686 if (cb)
1687 cb(commits);
1688 }
1689 });
1690}
1691
1692std::optional<std::map<std::string, std::string>>
1693Conversation::getCommit(const std::string& commitId) const
1694{
1695 auto commit = pimpl_->repository_->getCommit(commitId);
1696 if (commit == std::nullopt)
1697 return std::nullopt;
1698 return pimpl_->repository_->convCommitToMap(*commit);
1699}
1700
// Does nothing when cb is empty; otherwise runs Impl::loadMessages(options) on the io
// thread pool and hands the result to cb.
// NOTE(review): the signature line is missing from this listing.
1701void
1703{
1704 if (!cb)
1705 return;
1706 dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1707 if (auto sthis = w.lock()) {
1708 cb(sthis->pimpl_->loadMessages(options));
1709 }
1710 });
1711}
1712
// Does nothing when cb is empty; otherwise runs Impl::loadMessages2(options) on the io
// thread pool and hands the result to cb. Both the load and the callback run while
// loadedHistory_.mutex is held.
// NOTE(review): the signature line is missing from this listing.
1713void
1715{
1716 if (!cb)
1717 return;
1718 dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1719 if (auto sthis = w.lock()) {
1720 std::lock_guard lk(sthis->pimpl_->loadedHistory_.mutex);
1721 cb(sthis->pimpl_->loadMessages2(options));
1722 }
1723 });
1724}
1725
// Empties loadedHistory_ (message list, quick access map, pending editions and
// reactions) under its mutex, then clears memberToStatus under messageStatusMtx_.
// NOTE(review): the signature line is missing from this listing.
1726void
1728{
1729 std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1730 pimpl_->loadedHistory_.messageList.clear();
1731 pimpl_->loadedHistory_.quickAccess.clear();
1732 pimpl_->loadedHistory_.pendingEditions.clear();
1733 pimpl_->loadedHistory_.pendingReactions.clear();
1734 {
1735 std::lock_guard lk(pimpl_->messageStatusMtx_);
1736 pimpl_->memberToStatus.clear();
1737 }
1738}
1739
// Returns the id of the first message of loadedHistory_ when it is not empty.
// Otherwise loads a single commit (nbOfCommits = 1, skipMerge = true) into a temporary
// history under writeMtx_ and returns its id, or an empty string when nothing loads.
// NOTE(review): the signature line and the declarations of `options` and `optHistory`
// are missing from this listing.
1740std::string
1742{
1743 {
1744 std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1745 if (!pimpl_->loadedHistory_.messageList.empty())
1746 return (*pimpl_->loadedHistory_.messageList.begin())->id;
1747 }
1749 options.nbOfCommits = 1;
1750 options.skipMerge = true;
1752 std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1753 auto res = pimpl_->loadMessages2(options, &optHistory);
1754 if (res.empty())
1755 return {};
1756 return (*optHistory.messageList.begin())->id;
1757}
1758
1759std::vector<std::map<std::string, std::string>>
1761{
1762 if (not repository_) {
1763 JAMI_WARNING("{} Invalid repo. Abort merge", toString());
1764 return {};
1765 }
1766 auto remoteHead = repository_->remoteHead(uri);
1767 if (remoteHead.empty()) {
1768 JAMI_WARNING("{} Unable to get HEAD of {}", toString(), uri);
1769 return {};
1770 }
1771
1772 // Validate commit
1773 auto [newCommits, err] = repository_->validFetch(uri);
1774 if (newCommits.empty()) {
1775 if (err)
1776 JAMI_ERROR("{} Unable to validate history with {}", toString(), uri);
1777 repository_->removeBranchWith(uri);
1778 return {};
1779 }
1780
1781 // If validated, merge
1782 auto [ok, cid] = repository_->merge(remoteHead);
1783 if (!ok) {
1784 JAMI_ERROR("{} Unable to merge history with {}", toString(), uri);
1785 repository_->removeBranchWith(uri);
1786 return {};
1787 }
1788 if (!cid.empty()) {
1789 // A merge commit was generated, should be added in new commits
1790 auto commit = repository_->getCommit(cid);
1791 if (commit != std::nullopt)
1792 newCommits.emplace_back(*commit);
1793 }
1794
1795 JAMI_LOG("{} Successfully merged history with {:s}", toString(), uri);
1796 auto result = repository_->convCommitsToMap(newCommits);
1797 for (auto& commit : result) {
1798 auto it = commit.find("type");
1799 if (it != commit.end() && it->second == "member") {
1800 repository_->refreshMembers();
1801
1802 if (commit["action"] == "ban")
1803 disconnectFromPeer(commit["uri"]);
1804 }
1805 }
1806 return result;
1807}
1808
1809bool
1810Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1811{
1812 std::lock_guard lk(pimpl_->pullcbsMtx_);
1813 auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1814 auto& pullcbs = it->second;
1815 auto itPull = std::find_if(pullcbs.begin(),
1816 pullcbs.end(),
1817 [&](const auto& elem) { return std::get<0>(elem) == commitId; });
1818 if (itPull != pullcbs.end()) {
1819 JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress", pimpl_->toString(), deviceId, commitId);
1820 cb(false);
1821 return false;
1822 }
1823 JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1824 pullcbs.emplace_back(std::move(commitId), std::move(cb));
1825 if (notInProgress)
1826 dht::ThreadPool::io().run([w = weak(), deviceId] {
1827 if (auto sthis_ = w.lock())
1828 sthis_->pimpl_->pull(deviceId);
1829 });
1830 return true;
1831}
1832
// Worker draining the queue of pull requests stored in fetchingRemotes_ for one device.
// For each queued (commitId, cb):
//  - if commitId is non-empty and already in the repository, cb(true);
//  - otherwise fetch from the device (cb(false) on failure), merge under writeMtx_,
//    announce the new commits, then report whether the expected commit was received;
//  - when HEAD changed and "profile.vcf" is among the changed files, a signal is emitted.
// The loop ends, erasing the device entry, once its queue is empty.
// NOTE(review): three lines are missing from this listing: the declaration of `options`,
// the declaration of `updatedCommits`, and the head of the call taking
// (accountId_, repo->id(), repo->infos()).
// NOTE(review): cb is invoked unguarded by `cb(true)` / `cb(false)` in the early paths
// but guarded by `if (cb)` at the end.
1833void
1834Conversation::Impl::pull(const std::string& deviceId)
1835{
1836 auto& repo = repository_;
1837
1838 std::string commitId;
1839 OnPullCb cb;
1840 while (true) {
1841 {
1842 std::lock_guard lk(pullcbsMtx_);
1843 auto it = fetchingRemotes_.find(deviceId);
1844 if (it == fetchingRemotes_.end()) {
1845 JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1846 break;
1847 }
1848 auto& pullcbs = it->second;
1849 if (pullcbs.empty()) {
1850 fetchingRemotes_.erase(it);
1851 break;
1852 }
1853 auto& elem = pullcbs.front();
1854 commitId = std::move(std::get<0>(elem));
1855 cb = std::move(std::get<1>(elem));
1856 pullcbs.pop_front();
1857 }
1858 // If recently fetched, the commit can already be there, so no need to do complex operations
1859 if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1860 cb(true);
1861 continue;
1862 }
1863 // Pull from remote
1864 auto fetched = repo->fetch(deviceId);
1865 if (!fetched) {
1866 cb(false);
1867 continue;
1868 }
1869 auto oldHead = repo->getHead();
1870 std::string newHead = oldHead;
1871 std::unique_lock lk(writeMtx_);
1872 auto commits = mergeHistory(deviceId);
1873 if (!commits.empty()) {
1874 newHead = commits.rbegin()->at("id");
1875 // Note: Because clients need to linearize the history, they need to know all commits
1876 // that can be updated.
1877 // In this case, all commits until the common merge base should be announced.
1878 // The client will need to update its model after this.
1879 std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1880 auto newHeadCommit = repo->getCommit(newHead);
1881 if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1882 mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1884 options.to = mergeBase;
1886 // We announce commits from oldest to update to newest. This generally avoids
1887 // getting detached commits until they are all announced.
1888 std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1889 announce(updatedCommits);
1890 } else {
1891 announce(commits);
1892 }
1893 }
1894 lk.unlock();
1895
1896 bool commitFound = false;
1897 if (commitId != "") {
1898 // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1899 // We need to check if we actually got it; the fact that the fetch above was
1900 // successful doesn't guarantee that we did.
1901 for (const auto& commit : commits) {
1902 if (commit.at("id") == commitId) {
1903 commitFound = true;
1904 break;
1905 }
1906 }
1907 } else {
1908 commitFound = true;
1909 }
1910 if (!commitFound)
1911 JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1912 deviceId, commitId);
1913 // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1914 // for commit `commitId` to other members of the swarm. It's important that we only
1915 // send these notifications if we actually have the commit. Otherwise, we can end up
1916 // in a situation where the members of the swarm keep sending notifications to each
1917 // other for a commit that none of them have (note that we are unable to rule this out, as
1918 // nothing prevents a malicious user from intentionally sending a notification with
1919 // a fake commit ID).
1920 if (cb)
1921 cb(commitFound);
1922 // Announce if profile changed
1923 if (oldHead != newHead) {
1924 auto diffStats = repo->diffStats(newHead, oldHead);
1925 auto changedFiles = repo->changedFiles(diffStats);
1926 if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1927 != changedFiles.end()) {
1929 accountId_, repo->id(), repo->infos());
1930 }
1931 }
1932 }
1933}
1934
1935void
1936Conversation::sync(const std::string& member,
1937 const std::string& deviceId,
1938 OnPullCb&& cb,
1939 std::string commitId)
1940{
1941 pull(deviceId, std::move(cb), commitId);
1942 dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1943 auto sthis = w.lock();
1944 // For waiting request, downloadFile
1945 for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1946 sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1947 }
1948 });
1949}
1950
// Builds the invitation payload: a single-entry map whose key is
// "application/invite+json" and whose value is the serialized JSON `root`.
// Entries of infos() whose value is 64000 characters or longer are left out of the
// metadata.
// NOTE(review): the signature line, the declaration of `metadata`, and one statement
// before the return are missing from this listing.
1951std::map<std::string, std::string>
1953{
1954 // Invite the new member to the conversation
1955 Json::Value root;
1957 for (const auto& [k, v] : infos()) {
1958 if (v.size() >= 64000) {
1959 JAMI_WARNING("Cutting invite because the SIP message will be too long");
1960 continue;
1961 }
1962 metadata[k] = v;
1963 }
1965 return {{"application/invite+json", json::toString(root)}};
1966}
1967
// Under writeMtx_, returns the result of the repository's leave().
// NOTE(review): the signature line and one statement before the lock are missing from
// this listing.
1968std::string
1970{
1972 std::lock_guard lk(pimpl_->writeMtx_);
1973 return pimpl_->repository_->leave();
1974}
1975
// Sets the implementation's isRemoving_ flag to true.
// NOTE(review): the signature line is missing from this listing.
1976void
1978{
1979 pimpl_->isRemoving_ = true;
1980}
1981
// Returns the implementation's isRemoving_ flag.
// NOTE(review): the signature line is missing from this listing.
1982bool
1984{
1985 return pimpl_->isRemoving_;
1986}
1987
// Removes the conversation data directory when its path is not empty, then erases the
// repository under writeMtx_ when a repository is set.
// NOTE(review): the signature line is missing from this listing.
1988void
1990{
1991 if (pimpl_->conversationDataPath_ != "")
1992 dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1993 if (!pimpl_->repository_)
1994 return;
1995 std::lock_guard lk(pimpl_->writeMtx_);
1996 pimpl_->repository_->erase();
1997}
1998
// Returns the repository's mode().
// NOTE(review): the return type and signature lines are missing from this listing.
2001{
2002 return pimpl_->repository_->mode();
2003}
2004
2005std::vector<std::string>
2007{
2008 return pimpl_->repository_->getInitialMembers();
2009}
2010
2011bool
2012Conversation::isInitialMember(const std::string& uri) const
2013{
2014 auto members = getInitialMembers();
2015 return std::find(members.begin(), members.end(), uri) != members.end();
2016}
2017
2018void
2019Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2020{
2021 dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2022 if (auto sthis = w.lock()) {
2023 auto& repo = sthis->pimpl_->repository_;
2024 std::unique_lock lk(sthis->pimpl_->writeMtx_);
2025 auto commit = repo->updateInfos(map);
2026 sthis->pimpl_->announce(commit, true);
2027 lk.unlock();
2028 if (cb)
2029 cb(!commit.empty(), commit);
2030 emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2031 sthis->pimpl_->accountId_, repo->id(), repo->infos());
2032 }
2033 });
2034}
2035
// Returns the repository's infos().
// NOTE(review): the signature line is missing from this listing.
2036std::map<std::string, std::string>
2038{
2039 return pimpl_->repository_->infos();
2040}
2041
// Writes the preferences to "<conversationDataPath_>/preferences" with msgpack.
// When the map holds a LAST_MODIFIED entry and the file already exists, the update is
// dropped if `lastModified` is not older than that entry, or if the entry cannot be
// parsed by std::stoul. The LAST_MODIFIED entry itself is never written to the file.
// NOTE(review): two lines are missing from this listing: the declaration of
// `lastModified`, and the head of the call taking (id(), std::move(prefs)) at the end.
2042void
2043Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2044{
2045 auto filePath = pimpl_->conversationDataPath_ / "preferences";
2046 auto prefs = map;
2047 auto itLast = prefs.find(LAST_MODIFIED);
2048 if (itLast != prefs.end()) {
2049 if (std::filesystem::is_regular_file(filePath)) {
2051 try {
2052 if (lastModified >= std::stoul(itLast->second))
2053 return;
2054 } catch (...) {
2055 return;
2056 }
2057 }
2058 prefs.erase(itLast);
2059 }
2060
2061 std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2062 msgpack::pack(file, prefs);
2064 id(),
2065 std::move(prefs));
2066}
2067
// Reads the preferences map back from "<conversationDataPath_>/preferences" (msgpack).
// Any std::exception raised while loading or decoding is swallowed and an empty map is
// returned instead.
// NOTE(review): the signature line, the declaration of `file`, and two lines before the
// return are missing from this listing.
2068std::map<std::string, std::string>
2070{
2071 try {
2072 std::map<std::string, std::string> preferences;
2073 auto filePath = pimpl_->conversationDataPath_ / "preferences";
2075 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2076 oh.get().convert(preferences);
2079 return preferences;
2080 } catch (const std::exception& e) {
2081 }
2082 return {};
2083}
2084
// Returns the content of "profile.vcf" from the repository path, or an empty vector
// when loading throws.
// NOTE(review): the signature line is missing from this listing.
2085std::vector<uint8_t>
2087{
2088 try {
2089 return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
2090 } catch (...) {
2091 }
2092 return {};
2093}
2094
2095std::shared_ptr<TransferManager>
2097{
2098 return pimpl_->transferManager_;
2099}
2100
// Validates an incoming file request and fills `path` and `sha3sum` on success.
// Returns false when `member` is not a member, when fileId holds no '_', or when the
// commit named by the part of fileId before '_' is missing, lacks "type"/"tid"/"sha3sum",
// or is not of type "application/data-transfer+json".
// NOTE(review): the first signature line (function name and `member` parameter) is
// missing from this listing.
2101bool
2103 const std::string& fileId,
2104 std::filesystem::path& path,
2105 std::string& sha3sum) const
2106{
2107 if (!isMember(member))
2108 return false;
2109
2110 auto sep = fileId.find('_');
2111 if (sep == std::string::npos)
2112 return false;
2113
2114 auto interactionId = fileId.substr(0, sep);
2115 auto commit = getCommit(interactionId);
2116 if (commit == std::nullopt || commit->find("type") == commit->end()
2117 || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
2118 || commit->at("type") != "application/data-transfer+json") {
2119 JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2120 pimpl_->accountId_,
2121 member,
2122 interactionId);
2123 return false;
2124 }
2125
2126 path = dataTransfer()->path(fileId);
2127 sha3sum = commit->at("sha3sum");
2128
2129 return true;
2130}
2131
2132bool
2133Conversation::downloadFile(const std::string& interactionId,
2134 const std::string& fileId,
2135 const std::string& path,
2136 const std::string&,
2137 const std::string& deviceId)
2138{
2139 auto commit = getCommit(interactionId);
2140 if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2141 JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2142 return false;
2143 }
2144 auto tid = commit->find("tid");
2145 auto sha3sum = commit->find("sha3sum");
2146 auto size_str = commit->find("totalSize");
2147
2148 if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2149 JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2150 return false;
2151 }
2152
2153 auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2154 if (totalSize < 0) {
2155 JAMI_ERROR("Invalid file size {}", totalSize);
2156 return false;
2157 }
2158
2159 // Be sure to not lock conversation
2160 dht::ThreadPool().io().run([w = weak(),
2161 deviceId,
2162 fileId,
2163 interactionId,
2164 sha3sum = sha3sum->second,
2165 path,
2166 totalSize] {
2167 if (auto shared = w.lock()) {
2168 std::filesystem::path filePath(path);
2169 if (filePath.empty()) {
2170 filePath = shared->dataTransfer()->path(fileId);
2171 }
2172
2173 if (fileutils::size(filePath) == totalSize) {
2174 if (fileutils::sha3File(filePath) == sha3sum) {
2175 JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2176 return;
2177 }
2178 }
2179
2180 std::filesystem::path tempFilePath(filePath);
2181 tempFilePath += ".tmp";
2182 auto start = fileutils::size(tempFilePath);
2183 if (start < 0)
2184 start = 0;
2185 size_t end = 0;
2186
2187 auto acc = shared->pimpl_->account_.lock();
2188 if (!acc)
2189 return;
2190 shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2191 acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2192 }
2193 });
2194 return true;
2195}
2196
2197void
2198Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2199{
2200 dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2201 auto sthis = w.lock();
2202 if (!sthis)
2203 return;
2204 // Update fetched for Uri
2205 auto uri = sthis->uriFromDevice(deviceId);
2206 if (uri.empty() || uri == sthis->pimpl_->userId_)
2207 return;
2208 // When a user fetches a commit, the message is sent for this person
2209 sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2210 });
2211}
2212
2213
2214void
2217 const std::string& commitId,
2218 const std::string& ts,
2219 bool emit)
2220{
2221 // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2223 std::map<std::string, std::map<std::string, std::string>> newStatus;
2224 {
2225 // Update internal structures.
2226 std::lock_guard lk(messageStatusMtx_);
2227 auto& status = messagesStatus_[uri];
2228 auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2229 if (oldStatus == commitId)
2230 return; // Nothing to do
2231 options.to = oldStatus;
2232 options.from = commitId;
2234 status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2235 saveStatus();
2236 if (emit)
2237 newStatus[uri].insert(status.begin(), status.end());
2238 }
2239 if (emit && messageStatusCb_) {
2240 messageStatusCb_(newStatus);
2241 }
2242 // Update messages status for all commit between the old and new one
2243 options.logIfNotFound = false;
2244 options.fastLog = true;
2246 std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status.
2248 if (res.size() == 0) {
2249 // In this case, commit is not received yet, so we cache it
2250 futureStatus[commitId][uri] = static_cast<int32_t>(st);
2251 }
2252 for (const auto& [cid, _]: optHistory.quickAccess) {
2253 auto message = loadedHistory_.quickAccess.find(cid);
2254 if (message != loadedHistory_.quickAccess.end()) {
2255 // Update message and emit to client,
2256 if(static_cast<int32_t>(st) > message->second->status[uri]){
2257 message->second->status[uri] = static_cast<int32_t>(st);
2259 accountId_,
2260 repository_->id(),
2261 uri,
2262 cid,
2263 static_cast<int>(st));
2264 }
2265 } else {
2266 // In this case, commit is not loaded by client, so we cache it
2267 // No need to emit to client, they will get a correct status on load.
2268 futureStatus[cid][uri] = static_cast<int32_t>(st);
2269 }
2270 }
2271}
2272
2273bool
2274Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2275{
2276 std::lock_guard lk(pimpl_->messageStatusMtx_);
2277 if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2278 return false; // Nothing to do
2279 dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2280 auto sthis = w.lock();
2281 if (!sthis)
2282 return;
2283 sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2284 });
2285 return true;
2286}
2287
2288std::map<std::string, std::map<std::string, std::string>>
2290{
2291 std::lock_guard lk(pimpl_->messageStatusMtx_);
2292 return pimpl_->messagesStatus_;
2293}
2294
2295void
2296Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2297{
2298 std::unique_lock lk(pimpl_->messageStatusMtx_);
2299 std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2300 for (const auto& [uri, status] : messageStatus) {
2301 auto& oldMs = pimpl_->messagesStatus_[uri];
2302 if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2303 if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2304 stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2305 }
2306 }
2307 if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2308 if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2309 stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2310 }
2311 }
2312 }
2313 lk.unlock();
2314
2315 for (const auto& [status, uri, commitId, ts] : stVec) {
2316 pimpl_->updateStatus(uri, status, commitId, ts);
2317 }
2318}
2319
2320void
2321Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2322{
2323 std::unique_lock lk(pimpl_->messageStatusMtx_);
2324 pimpl_->messageStatusCb_ = cb;
2325}
2326
2327#ifdef LIBJAMI_TEST
2328void
2329Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2330{
2331 pimpl_->bootstrapCbTest_ = cb;
2332}
2333#endif
2334
2335void
2336Conversation::checkBootstrapMember(const asio::error_code& ec,
2337 std::vector<std::map<std::string, std::string>> members)
2338{
2339 if (ec == asio::error::operation_aborted)
2340 return;
2341 auto acc = pimpl_->account_.lock();
2342 if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
2343 return;
2344 // We bootstrap the DRT with devices who already wrote in the repository.
2345 // However, in a conversation, a large number of devices may just watch
2346 // the conversation, but never write any message.
2347 std::unique_lock lock(pimpl_->membersMtx_);
2348
2349 std::string uri;
2350 while (!members.empty()) {
2351 auto member = std::move(members.back());
2352 members.pop_back();
2353 uri = std::move(member.at("uri"));
2354 if (uri != pimpl_->userId_
2355 && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2356 break;
2357 }
2358 auto fallbackFailed = [](auto sthis) {
2359 JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.",
2360 sthis->pimpl_->toString());
2361#ifdef LIBJAMI_TEST
2362 if (sthis->pimpl_->bootstrapCbTest_)
2363 sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2364#endif
2365 };
2366 // If members is empty, we finished the fallback un-successfully
2367 if (members.empty() && uri.empty()) {
2368 lock.unlock();
2369 fallbackFailed(this);
2370 return;
2371 }
2372
2373 // Fallback, check devices of a member (we didn't check yet) in the conversation
2374 pimpl_->checkedMembers_.emplace(uri);
2375 auto devices = std::make_shared<std::vector<NodeId>>();
2376 acc->forEachDevice(
2377 dht::InfoHash(uri),
2378 [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2379 // Test if already sent
2380 if (auto sthis = w.lock()) {
2381 if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2382 devices->emplace_back(dev->getLongId());
2383 }
2384 },
2385 [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
2386 auto sthis = w.lock();
2387 if (!sthis)
2388 return;
2389 auto checkNext = true;
2390 if (ok && devices->size() != 0) {
2391#ifdef LIBJAMI_TEST
2392 if (sthis->pimpl_->bootstrapCbTest_)
2393 sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2394#endif
2395 JAMI_LOG("{} Bootstrap: Fallback with member: {}",
2396 sthis->pimpl_->toString(),
2397 uri);
2398 if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2399 checkNext = false;
2400 }
2401 if (checkNext) {
2402 // Check next member
2403 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2404 sthis->pimpl_->fallbackTimer_->async_wait(
2405 std::bind(&Conversation::checkBootstrapMember,
2406 sthis,
2407 std::placeholders::_1,
2408 std::move(members)));
2409 } else {
2410 // In this case, all members are checked. Fallback failed
2412 }
2413 });
2414}
2415
2416void
2418 const std::vector<DeviceId>& knownDevices)
2419{
2420 if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2421 return;
2422 // Here, we bootstrap the DRT with devices who already wrote in the conversation
2423 // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2424 // If it works, the callback onConnectionChanged will be called with ok=true
2425 pimpl_->bootstrapCb_ = std::move(onBootstraped);
2426 std::vector<DeviceId> devices = knownDevices;
2427 for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2428 if (!isBanned(member))
2429 devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2430 }
2431 JAMI_DEBUG("{} Bootstrap with {} device(s)",
2432 pimpl_->toString(),
2433 devices.size());
2434 // set callback
2435 auto fallback = [](auto sthis, bool now = false) {
2436 // Fallback
2437 auto acc = sthis->pimpl_->account_.lock();
2438 if (!acc)
2439 return;
2440 auto members = sthis->getMembers(false, false);
2441 std::shuffle(members.begin(), members.end(), acc->rand);
2442 if (now) {
2443 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2444 } else {
2445 auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2446 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2447 - std::chrono::seconds(timeForBootstrap));
2448 JAMI_DEBUG("{} Fallback in {} seconds",
2449 sthis->pimpl_->toString(),
2450 (20 - timeForBootstrap));
2451 }
2452 sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2453 sthis,
2454 std::placeholders::_1,
2455 std::move(members)));
2456 };
2457
2458 pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2459 // This will call methods from accounts, so trigger on another thread.
2460 dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
2461 auto sthis = w.lock();
2462 if (!sthis)
2463 return;
2464 if (ok) {
2465 // Bootstrap succeeded!
2466 {
2467 std::lock_guard lock(sthis->pimpl_->membersMtx_);
2468 sthis->pimpl_->checkedMembers_.clear();
2469 }
2470 if (sthis->pimpl_->bootstrapCb_)
2471 sthis->pimpl_->bootstrapCb_();
2472#ifdef LIBJAMI_TEST
2473 if (sthis->pimpl_->bootstrapCbTest_)
2474 sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2475#endif
2476 return;
2477 }
2478 fallback(sthis);
2479 });
2480 });
2481 {
2482 std::lock_guard lock(pimpl_->membersMtx_);
2483 pimpl_->checkedMembers_.clear();
2484 }
2485 // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
2486 if (pimpl_->swarmManager_->isShutdown()) {
2487 pimpl_->swarmManager_->restart();
2488 pimpl_->swarmManager_->maintainBuckets();
2489 } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2490 fallback(this, true);
2491 }
2492}
2493
2494std::vector<std::string>
2496{
2497 pimpl_->loadActiveCalls();
2498 pimpl_->loadHostedCalls();
2499 auto commits = pimpl_->commitsEndedCalls();
2500 if (!commits.empty()) {
2501 // Announce to client
2502 dht::ThreadPool::io().run([w = weak(), commits] {
2503 if (auto sthis = w.lock())
2504 sthis->pimpl_->announce(commits, true);
2505 });
2506 }
2507 return commits;
2508}
2509
2510void
2512{
2513 pimpl_->onMembersChanged_ = std::move(cb);
2514 pimpl_->repository_->onMembersChanged([w=weak()] (const std::set<std::string>& memberUris) {
2515 if (auto sthis = w.lock())
2516 sthis->pimpl_->onMembersChanged_(memberUris);
2517 });
2518}
2519
2520void
2522{
2523 pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2524 w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2525 if (auto sthis = w.lock())
2526 needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2527 };
2528}
2529
2530void
2531Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2532{
2533 auto deviceId = channel->deviceId();
2534 // Transmit avatar if necessary
2535 // We do this here, because at this point we know both sides are connected and in
2536 // the same conversation
2537 // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2538 auto cert = channel->peerCertificate();
2539 if (!cert || !cert->issuer)
2540 return;
2541 auto member = cert->issuer->getId().toString();
2542 pimpl_->swarmManager_->addChannel(std::move(channel));
2543 dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2544 auto sthis = w.lock();
2545 if (auto account = a.lock()) {
2546 account->sendProfile(sthis->id(), member, deviceId.toString());
2547 }
2548 });
2549}
2550
2553 const std::string& fromId,
2554 const std::string& authorUri) const
2555{
2557 options.to = toId;
2558 options.from = fromId;
2559 options.authorUri = authorUri;
2560 options.logIfNotFound = false;
2561 options.fastLog = true;
2563 std::lock_guard lk(history.mutex);
2564 auto res = pimpl_->loadMessages2(options, &history);
2565 return res.size();
2566}
2567
2568void
2570 const Filter& filter,
2571 const std::shared_ptr<std::atomic_int>& flag) const
2572{
2573 // Because logging a conversation can take quite some time,
2574 // do it asynchronously
2575 dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2576 if (auto sthis = w.lock()) {
2577 History history;
2578 std::vector<std::map<std::string, std::string>> commits {};
2579 // std::regex_constants::ECMAScript is the default flag.
2580 auto re = std::regex(filter.regexSearch,
2581 filter.caseSensitive ? std::regex_constants::ECMAScript
2582 : std::regex_constants::icase);
2583 sthis->pimpl_->repository_->log(
2584 [&](const auto& id, const auto& author, auto& commit) {
2585 if (!filter.author.empty()
2586 && filter.author != sthis->uriFromDevice(author.email)) {
2587 // Filter author
2588 return CallbackResult::Skip;
2589 }
2590 auto commitTime = git_commit_time(commit.get());
2591 if (filter.before && filter.before < commitTime) {
2592 // Only get commits before this date
2593 return CallbackResult::Skip;
2594 }
2595 if (filter.after && filter.after > commitTime) {
2596 // Only get commits before this date
2597 if (git_commit_parentcount(commit.get()) <= 1)
2598 return CallbackResult::Break;
2599 else
2600 return CallbackResult::Skip; // Because we are sorting it with
2601 // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2602 }
2603
2604 return CallbackResult::Ok; // Continue
2605 },
2606 [&](auto&& cc) {
2607 if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2608 sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2609 },
2610 [&](auto id, auto, auto) {
2611 if (id == filter.lastId)
2612 return true;
2613 return false;
2614 },
2615 "",
2616 false);
2617 // Search on generated history
2618 for (auto& message : history.messageList) {
2619 auto contentType = message->type;
2620 auto isSearchable = contentType == "text/plain"
2621 || contentType == "application/data-transfer+json";
2622 if (filter.type.empty() && !isSearchable) {
2623 // Not searchable, at least for now
2624 continue;
2625 } else if (contentType == filter.type || filter.type.empty()) {
2626 if (isSearchable) {
2627 // If it's a text match the body, else the display name
2628 auto body = contentType == "text/plain" ? message->body.at("body")
2629 : message->body.at("displayName");
2630 std::smatch body_match;
2631 if (std::regex_search(body, body_match, re)) {
2632 auto commit = message->body;
2633 commit["id"] = message->id;
2634 commit["type"] = message->type;
2635 commits.emplace_back(commit);
2636 }
2637 } else {
2638 // Matching type, just add it to the results
2639 commits.emplace_back(message->body);
2640 }
2641
2642 if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2643 break;
2644 }
2645 }
2646
2647 if (commits.size() > 0)
2649 sthis->pimpl_->accountId_,
2650 sthis->id(),
2651 std::move(commits));
2652 // If we're the latest thread, inform client that the search is finished
2653 if ((*flag)-- == 1 /* decrement return the old value */) {
2655 req,
2656 sthis->pimpl_->accountId_,
2657 std::string {},
2658 std::vector<std::map<std::string, std::string>> {});
2659 }
2660 }
2661 });
2662}
2663
2664void
2666{
2667 if (!message.isMember("confId")) {
2668 JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2669 return;
2670 }
2671
2672 auto now = std::chrono::system_clock::now();
2673 auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2674 {
2675 std::lock_guard lk(pimpl_->activeCallsMtx_);
2676 pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2677 pimpl_->saveHostedCalls();
2678 }
2679
2680 sendMessage(std::move(message), "", {}, std::move(cb));
2681}
2682
2683bool
2684Conversation::isHosting(const std::string& confId) const
2685{
2686 auto info = infos();
2687 if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2688 return true; // We are the current device Host
2689 std::lock_guard lk(pimpl_->activeCallsMtx_);
2690 return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2691}
2692
2693void
2695{
2696 if (!message.isMember("confId")) {
2697 JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2698 return;
2699 }
2700
2701 auto erased = false;
2702 {
2703 std::lock_guard lk(pimpl_->activeCallsMtx_);
2704 erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2705 }
2706 if (erased) {
2707 pimpl_->saveHostedCalls();
2708 sendMessage(std::move(message), "", {}, std::move(cb));
2709 } else
2710 cb(false, "");
2711}
2712
2713std::vector<std::map<std::string, std::string>>
2715{
2716 std::lock_guard lk(pimpl_->activeCallsMtx_);
2717 return pimpl_->activeCalls_;
2718}
2719} // namespace jami
#define _(S)
Definition SHA3.cpp:123
This class gives access to the git repository that represents the conversation.
static LIBJAMI_TEST_EXPORT std::unique_ptr< ConversationRepository > cloneConversation(const std::shared_ptr< JamiAccount > &account, const std::string &deviceId, const std::string &conversationId, std::function< void(std::vector< ConversationCommit >)> &&checkCommitCb={})
Clones a conversation on a remote device.
std::map< std::string, int32_t > memberToStatus
Status: 0 = commited, 1 = fetched, 2 = read This cache the curent status to add in the messages.
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
void updateStatus(const std::string &uri, libjami::Account::MessageStates status, const std::string &commitId, const std::string &ts, bool emit=false)
std::filesystem::path conversationDataPath_
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &remoteDevice, const std::string &conversationId)
std::vector< std::string > commitsEndedCalls()
If, for whatever reason, the daemon is stopped while hosting a conference, we need to announce the en...
std::shared_ptr< Typers > typers_
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
void removeGitSocket(const DeviceId &deviceId)
std::set< std::string > checkedMembers_
std::filesystem::path statusPath_
std::filesystem::path repoPath() const
void disconnectFromPeer(const std::string &peerUri)
Remove all git sockets and all DRT nodes associated with the given peer.
History loadedHistory_
Loaded history represents the linearized history to show for clients.
void announce(const std::vector< std::string > &commits, bool commitFromSelf=false)
std::map< std::string, std::map< std::string, std::string > > messagesStatus_
std::shared_ptr< asio::io_context > ioContext_
std::unique_ptr< ConversationRepository > repository_
std::filesystem::path preferencesPath_
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
std::filesystem::path activeCallsPath_
std::filesystem::path fetchedPath_
void updateActiveCalls(const std::map< std::string, std::string > &commit, bool eraseOnly=false, bool emitSig=true) const
Update activeCalls_ via announced commits (in load or via new commits)
std::vector< std::map< std::string, std::string > > loadMessages(const LogOptions &options)
void handleReaction(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit) const
std::vector< std::shared_ptr< libjami::SwarmMessage > > addToHistory(History &history, const std::vector< std::map< std::string, std::string > > &commits, bool messageReceived=false, bool commitFromSelf=false)
std::shared_ptr< SwarmManager > swarmManager_
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &conversationId)
void voteUnban(const std::string &contactUri, const std::string_view type, const OnDoneCb &cb)
void announce(const std::string &commitId, bool commitFromSelf=false)
std::shared_ptr< TransferManager > transferManager_
void initActiveCalls(const std::vector< std::map< std::string, std::string > > &commits) const
Initialize activeCalls_ from the list of commits in the repository.
std::atomic_bool isRemoving_
Impl(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
OnMembersChanged onMembersChanged_
std::unique_ptr< asio::steady_timer > fallbackTimer_
std::map< std::string, std::deque< std::pair< std::string, OnPullCb > > > fetchingRemotes_
std::vector< std::map< std::string, std::string > > activeCalls_
void pull(const std::string &deviceId)
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft) const
std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> messageStatusCb_
void announce(const std::vector< std::map< std::string, std::string > > &commits, bool commitFromSelf=false)
std::weak_ptr< JamiAccount > account_
std::string_view bannedType(const std::string &uri) const
void handleEdition(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
std::vector< libjami::SwarmMessage > loadMessages2(const LogOptions &options, History *optHistory=nullptr)
std::filesystem::path hostedCallsPath_
std::string toString() const
std::filesystem::path sendingPath_
std::vector< std::map< std::string, std::string > > mergeHistory(const std::string &uri)
std::map< std::string, std::map< std::string, int32_t > > futureStatus
void init(const std::shared_ptr< JamiAccount > &account)
std::mutex messageStatusMtx_
{uri, { {"fetch", "commitId"}, {"fetched_ts", "timestamp"}, {"read", "commitId"}, {"read_ts",...
std::function< void()> bootstrapCb_
std::map< std::string, uint64_t > hostedCalls_
void rectifyStatus(const std::shared_ptr< libjami::SwarmMessage > &message, History &history) const
bool handleMessage(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
bool pull(const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch and merge from peer.
std::set< std::string > memberUris(std::string_view filter={}, const std::set< MemberRole > &filteredRoles={MemberRole::INVITED, MemberRole::LEFT, MemberRole::BANNED}) const
std::shared_ptr< Typers > typers() const
Get Typers object.
bool downloadFile(const std::string &interactionId, const std::string &fileId, const std::string &path, const std::string &member="", const std::string &deviceId="")
Adds a file to the waiting list and ask members.
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
std::string leave()
Leave a conversation.
std::string id() const
Get conversation's id.
void clearCache()
Clear all cached messages.
std::vector< std::string > commitsEndedCalls()
Refresh active calls.
std::map< std::string, std::map< std::string, std::string > > messageStatus() const
Retrieve last displayed and fetch status per member.
void loadMessages2(const OnLoadMessages2 &cb, const LogOptions &options)
Get a range of messages.
bool isMember(const std::string &uri, bool includeInvited=false) const
Test if an URI is a member.
void search(uint32_t req, const Filter &filter, const std::shared_ptr< std::atomic_int > &flag) const
Search in the conversation via a filter.
std::vector< uint8_t > vCard() const
std::vector< std::string > getInitialMembers() const
One to one util, get initial members.
void connectivityChanged()
If we change from one network to one another, we will need to update the state of the connections.
std::shared_ptr< TransferManager > dataTransfer() const
Access to transfer manager.
void removeGitSocket(const DeviceId &deviceId)
std::vector< std::map< std::string, std::string > > currentCalls() const
Return current detected calls.
void onMembersChanged(OnMembersChanged &&cb)
void bootstrap(std::function< void()> onBootstraped, const std::vector< DeviceId > &knownDevices)
Bootstrap swarm manager to other peers.
std::map< std::string, std::string > infos() const
Retrieve current infos (title, description, avatar, mode)
void monitor()
Print the state of the DRT linked to the conversation.
std::vector< NodeId > peersToSyncWith() const
Get peers to sync with.
void removeActiveConference(Json::Value &&message, OnDoneCb &&cb={})
Announce the end of a call.
bool isInitialMember(const std::string &uri) const
ConversationMode mode() const
Get conversation's mode.
std::string join()
Join a conversation.
void addSwarmChannel(std::shared_ptr< dhtnet::ChannelSocket > channel)
Add swarm connection to the DRT.
std::vector< jami::DeviceId > getDeviceIdList() const
bool setMessageDisplayed(const std::string &uri, const std::string &interactionId)
Store last read commit (returned in getMembers)
void loadMessages(OnLoadMessages cb, const LogOptions &options)
Get a range of messages.
void updateInfos(const std::map< std::string, std::string > &map, const OnDoneCb &cb={})
Change repository's infos.
void updateMessageStatus(const std::map< std::string, std::map< std::string, std::string > > &messageStatus)
Update fetch/read status.
void onMessageStatusChanged(const std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> &cb)
void shutdownConnections()
Stop SwarmManager, bootstrap and gitSockets.
void updatePreferences(const std::map< std::string, std::string > &map)
Change user's preferences.
void sendMessages(std::vector< Json::Value > &&messages, OnMultiDoneCb &&cb={})
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited=false, bool includeLeft=false, bool includeBanned=false) const
std::map< std::string, std::string > preferences(bool includeLastModified) const
Retrieve current preferences (color, notification, etc)
void sync(const std::string &member, const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch new commits and re-ask for waiting files.
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
Git operations will need a ChannelSocket for cloning/fetching commits Because libgit2 is a C library,...
void onNeedSocket(NeedSocketCb cb)
Set the callback that will be called whenever a new socket will be needed.
std::string uriFromDevice(const std::string &deviceId) const
Retrieve the uri from a deviceId.
std::optional< std::map< std::string, std::string > > getCommit(const std::string &commitId) const
Retrieve one commit.
void erase()
Erase all related datas.
void setRemovingFlag()
Set a conversation as removing (when loading convInfo and still not sync)
void sendMessage(std::string &&message, const std::string &type="text/plain", const std::string &replyTo="", OnCommitCb &&onCommit={}, OnDoneCb &&cb={})
void hasFetched(const std::string &deviceId, const std::string &commitId)
Store information about who fetch or not.
bool isHosting(const std::string &confId) const
Check if we're currently hosting this conference.
bool isBootstraped() const
Check if we're at least connected to one node.
std::string lastCommitId() const
Get last commit id.
bool isBanned(const std::string &uri) const
Conversation(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
bool onFileChannelRequest(const std::string &member, const std::string &fileId, std::filesystem::path &path, std::string &sha3sum) const
Choose if we can accept channel request.
void addMember(const std::string &contactUri, const OnDoneCb &cb={})
Add conversation member.
bool hasSwarmChannel(const std::string &deviceId)
Used to avoid multiple connections, we just check if we got a swarm channel with a specific device.
bool isRemoving()
Check if we are removing the conversation.
void removeMember(const std::string &contactUri, bool isDevice, const OnDoneCb &cb={})
void hostConference(Json::Value &&message, OnDoneCb &&cb={})
Host a conference in the conversation.
uint32_t countInteractions(const std::string &toId, const std::string &fromId="", const std::string &authorUri="") const
Retrieve how many interactions there is from HEAD to interactionId.
std::map< std::string, std::string > generateInvitation() const
Generate an invitation to send to new contacts.
static LIBJAMI_TEST_EXPORT Manager & instance()
Definition manager.cpp:676
std::shared_ptr< asio::io_context > ioContext() const
Definition manager.cpp:1712
std::mt19937_64 getSeededRandomEngine()
Definition manager.cpp:2738
#define JAMI_ERR(...)
Definition logger.h:218
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARN(...)
Definition logger.h:217
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
Definition Address.h:25
static constexpr const char * CREATED
static constexpr const char * REMOVED
static constexpr const char * ID
static constexpr const char * RECEIVED
static constexpr const char * METADATAS
static constexpr const char * MEMBERS
static constexpr const char * DECLINED
static constexpr const char * ERASED
static constexpr const char * FROM
static constexpr const char * CONVERSATIONID
static constexpr const char * HOSTED_CALLS
static constexpr const char * PREFERENCES
static constexpr const char * ACTIVE_CALLS
static constexpr const char * LAST_DISPLAYED
const std::filesystem::path & get_data_dir()
std::vector< uint8_t > loadFile(const std::filesystem::path &path, const std::filesystem::path &default_dir)
Read the full content of a file at path.
uint64_t lastWriteTimeInSeconds(const std::filesystem::path &filePath)
Return the last write time (epoch time) of a given file path (in seconds).
std::filesystem::path getFullPath(const std::filesystem::path &base, const std::filesystem::path &path)
If path is relative, it is appended to base.
std::string toString(const Json::Value &jsonVal)
Definition json_utils.h:42
std::function< void(const std::string &, const std::string &, ChannelCb &&, const std::string &)> NeedSocketCb
std::function< void(std::vector< libjami::SwarmMessage > &&messages)> OnLoadMessages2
dht::PkId DeviceId
static constexpr std::string_view toString(AuthDecodingState state)
dht::h256 NodeId
std::function< void(const std::vector< std::string > &)> OnMultiDoneCb
void emitSignal(Args... args)
Definition ring_signal.h:64
std::function< void(bool, const std::string &)> OnDoneCb
std::list< std::shared_ptr< libjami::SwarmMessage > > MessageList
std::function< void(std::vector< std::map< std::string, std::string > > &&messages)> OnLoadMessages
std::function< bool(const std::shared_ptr< dhtnet::ChannelSocket > &)> ChannelCb
std::function< void(bool fetchOk)> OnPullCb
std::map< DeviceId, std::shared_ptr< dhtnet::ChannelSocket > > GitSocketList
std::function< void(const std::string &)> OnCommitCb
static const char *const LAST_MODIFIED
constexpr auto EFETCH
std::function< void(const std::set< std::string > &)> OnMembersChanged
bool regex_search(string_view sv, svmatch &m, const regex &e, regex_constants::match_flag_type flags=regex_constants::match_default)
std::set< std::string > members
ConvInfo()=default
std::string lastDisplayed
Json::Value toJson() const
std::string id
std::map< std::string, std::string > metadatas
Json::Value toJson() const
std::map< std::string, std::string > toMap() const
std::mutex mutex
std::map< std::string, std::shared_ptr< libjami::SwarmMessage > > quickAccess
std::map< std::string, std::list< std::map< std::string, std::string > > > pendingReactions
MessageList messageList
std::map< std::string, std::list< std::shared_ptr< libjami::SwarmMessage > > > pendingEditions
std::condition_variable cv