Ring Daemon 16.0.0
Loading...
Searching...
No Matches
conversation.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "conversation.h"
18
19#include "account_const.h"
20#include "jamiaccount.h"
21#include "client/ring_signal.h"
22#include "swarm/swarm_manager.h"
23#ifdef ENABLE_PLUGIN
24#include "manager.h"
26#include "plugin/streamdata.h"
27#endif
29
30#include "fileutils.h"
31#include "json_utils.h"
32
33#include <opendht/thread_pool.h>
34#include <fmt/compile.h>
35
36#include <charconv>
37#include <string_view>
38#include <tuple>
39#include <optional>
40
41namespace jami {
42
43static const char* const LAST_MODIFIED = "lastModified";
44
45ConvInfo::ConvInfo(const Json::Value& json)
46{
47 id = json[ConversationMapKeys::ID].asString();
48 created = json[ConversationMapKeys::CREATED].asLargestUInt();
49 removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
50 erased = json[ConversationMapKeys::ERASED].asLargestUInt();
51 for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
52 members.emplace(v["uri"].asString());
53 }
55}
56
57Json::Value
59{
60 Json::Value json;
62 json[ConversationMapKeys::CREATED] = Json::Int64(created);
63 if (removed) {
64 json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
65 }
66 if (erased) {
67 json[ConversationMapKeys::ERASED] = Json::Int64(erased);
68 }
69 for (const auto& m : members) {
70 Json::Value member;
71 member["uri"] = m;
73 }
75 return json;
76}
77
78// ConversationRequest
80{
81 received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
82 declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
83 from = json[ConversationMapKeys::FROM].asString();
86 for (const auto& member : md.getMemberNames()) {
87 metadatas.emplace(member, md[member].asString());
88 }
89}
90
91Json::Value
93{
94 Json::Value json;
98 if (declined)
100 for (const auto& [key, value] : metadatas) {
102 }
103 return json;
104}
105
106std::map<std::string, std::string>
108{
109 auto result = metadatas;
112 if (declined)
113 result[ConversationMapKeys::DECLINED] = std::to_string(declined);
114 result[ConversationMapKeys::RECEIVED] = std::to_string(received);
115 return result;
116}
117
118using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
119
121{
122 // While loading the history, we need to avoid:
123 // - reloading history (can just be ignored)
124 // - adding new commits (should wait for history to be loaded)
125 std::mutex mutex {};
126 std::condition_variable cv {};
127 bool loading {false};
129 std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
130 std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
131 std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
132};
133
135{
136public:
137 Impl(const std::shared_ptr<JamiAccount>& account,
139 const std::string& otherMember = "")
142 , accountId_(account->getAccountID())
143 , userId_(account->getUsername())
144 , deviceId_(account->currentDeviceId())
145 {
146 if (!repository_) {
147 throw std::logic_error("Unable to create repository");
148 }
149 init(account);
150 }
151
152 Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
154 , accountId_(account->getAccountID())
155 , userId_(account->getUsername())
156 , deviceId_(account->currentDeviceId())
157 {
158 repository_ = std::make_unique<ConversationRepository>(account, conversationId);
159 if (!repository_) {
160 throw std::logic_error("Unable to create repository");
161 }
162 init(account);
163 }
164
165 Impl(const std::shared_ptr<JamiAccount>& account,
166 const std::string& remoteDevice,
167 const std::string& conversationId)
169 , accountId_(account->getAccountID())
170 , userId_(account->getUsername())
171 , deviceId_(account->currentDeviceId())
172 {
173 std::vector<ConversationCommit> commits;
176 conversationId,
177 [&](auto c) {
178 commits = std::move(c);
179 });
180 if (!repository_) {
182 accountId_, conversationId, EFETCH, "Unable to clone repository");
183 throw std::logic_error("Unable to clone repository");
184 }
185 // To detect current active calls, we need to check history
187 / "conversation_data" / conversationId;
189 initActiveCalls(repository_->convCommitsToMap(commits));
190 init(account);
191 }
192
193 void init(const std::shared_ptr<JamiAccount>& account) {
195 fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
197 = std::make_shared<SwarmManager>(NodeId(deviceId_),
198 Manager::instance().getSeededRandomEngine(),
199 [account = account_](const DeviceId& deviceId) {
200 if (auto acc = account.lock()) {
201 return acc->isConnectedWith(deviceId);
202 }
203 return false;
204 });
205 swarmManager_->setMobility(account->isMobile());
207 = std::make_shared<TransferManager>(accountId_,
208 "",
209 repository_->id(),
212 / "conversation_data" / repository_->id();
220 loadStatus();
221 typers_ = std::make_shared<Typers>(account, repository_->id());
222 }
223
224 std::string toString() const {
225 return fmt::format(FMT_COMPILE("[Account {}] [Conversation {}]"), accountId_, repository_->id());
226 }
227 mutable std::string fmtStr_;
228
230 {
231 try {
232 if (fallbackTimer_)
233 fallbackTimer_->cancel();
234 } catch (const std::exception& e) {
235 JAMI_ERROR("{:s} {:s}", toString(), e.what());
236 }
237 }
238
244 std::vector<std::string> commitsEndedCalls();
245 bool isAdmin() const;
246 std::filesystem::path repoPath() const;
247
248 void announce(const std::string& commitId, bool commitFromSelf = false)
249 {
250 std::vector<std::string> vec;
251 if (!commitId.empty())
252 vec.emplace_back(commitId);
254 }
255
256 void announce(const std::vector<std::string>& commits, bool commitFromSelf = false)
257 {
258 std::vector<ConversationCommit> convcommits;
259 convcommits.reserve(commits.size());
260 for (const auto& cid : commits) {
261 auto commit = repository_->getCommit(cid);
262 if (commit != std::nullopt) {
263 convcommits.emplace_back(*commit);
264 }
265 }
266 announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
267 }
268
273 void initActiveCalls(const std::vector<std::map<std::string, std::string>>& commits) const
274 {
275 std::unordered_set<std::string> invalidHostUris;
276 std::unordered_set<std::string> invalidCallIds;
277
278 std::lock_guard lk(activeCallsMtx_);
279 for (const auto& commit : commits) {
280 if (commit.at("type") == "member") {
281 // Each commit of type "member" has an "action" field whose value can be one
282 // of the following: "add", "join", "remove", "ban", "unban"
283 // In the case of "remove" and "ban", we need to add the member's URI to
284 // invalidHostUris to ensure that any call they may have started in the past
285 // is no longer considered active.
286 // For the other actions, there's no harm in adding the member's URI anyway,
287 // since it's not possible to start hosting a call before joining the swarm (or
288 // before getting unbanned in the case of previously banned members).
289 invalidHostUris.emplace(commit.at("uri"));
290 } else if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
291 && commit.find("device") != commit.end()) {
292 // The commit indicates either the end or the beginning of a call
293 // (depending on whether there is a "duration" field or not).
294 auto convId = repository_->id();
295 auto confId = commit.at("confId");
296 auto uri = commit.at("uri");
297 auto device = commit.at("device");
298
299 if (commit.find("duration") == commit.end()
300 && invalidCallIds.find(confId) == invalidCallIds.end()
301 && invalidHostUris.find(uri) == invalidHostUris.end()) {
302 std::map<std::string, std::string> activeCall;
303 activeCall["id"] = confId;
304 activeCall["uri"] = uri;
305 activeCall["device"] = device;
306 activeCalls_.emplace_back(activeCall);
307 fmt::print("swarm:{} new active call detected: {} (on device {}, account {})\n",
308 convId,
309 confId,
310 device,
311 uri);
312 }
313 // Even if the call was active, we still add its ID to invalidCallIds to make sure it
314 // doesn't get added a second time. (This shouldn't happen normally, but in practice
315 // there are sometimes multiple commits indicating the beginning of the same call.)
316 invalidCallIds.emplace(confId);
317 }
318 }
321 repository_->id(),
323 }
324
332 void updateActiveCalls(const std::map<std::string, std::string>& commit,
333 bool eraseOnly = false,
334 bool emitSig = true) const
335 {
336 if (!repository_)
337 return;
338 if (commit.at("type") == "member") {
339 // In this case, we need to check if we are not removing a hosting member or device
340 std::lock_guard lk(activeCallsMtx_);
341 auto it = activeCalls_.begin();
342 auto updateActives = false;
343 while (it != activeCalls_.end()) {
344 if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
345 JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
346 it->at("id"),
347 commit.at("uri"));
348 it = activeCalls_.erase(it);
349 updateActives = true;
350 } else {
351 ++it;
352 }
353 }
354 if (updateActives) {
356 if (emitSig)
358 repository_->id(),
360 }
361 return;
362 }
363 // Else, it's a call information
364 if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
365 && commit.find("device") != commit.end()) {
366 auto convId = repository_->id();
367 auto confId = commit.at("confId");
368 auto uri = commit.at("uri");
369 auto device = commit.at("device");
370 std::lock_guard lk(activeCallsMtx_);
371 auto itActive = std::find_if(activeCalls_.begin(),
372 activeCalls_.end(),
373 [&](const auto& value) {
374 return value.at("id") == confId
375 && value.at("uri") == uri
376 && value.at("device") == device;
377 });
378 if (commit.find("duration") == commit.end()) {
379 if (itActive == activeCalls_.end() && !eraseOnly) {
381 "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
382 convId,
383 confId,
384 device,
385 uri);
386 std::map<std::string, std::string> activeCall;
387 activeCall["id"] = confId;
388 activeCall["uri"] = uri;
389 activeCall["device"] = device;
390 activeCalls_.emplace_back(activeCall);
392 if (emitSig)
395 ->id(),
397 }
398 } else {
399 if (itActive != activeCalls_.end()) {
401 // Unlikely, but we must ensure that no duplicate exists
402 while (itActive != activeCalls_.end()) {
403 itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
404 return value.at("id") == confId && value.at("uri") == uri
405 && value.at("device") == device;
406 });
407 if (itActive != activeCalls_.end()) {
408 JAMI_ERROR("Duplicate call found. (This is a bug)");
410 }
411 }
412
413 if (eraseOnly) {
414 JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
415 "{:s}, account {:s}",
416 convId,
417 confId,
418 device,
419 uri);
420 } else {
421 JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
422 convId,
423 confId,
424 device,
425 uri);
426 }
427 }
429 if (emitSig)
431 repository_->id(),
433 }
434 }
435 }
436
437 void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false)
438 {
439 if (!repository_)
440 return;
441 auto convId = repository_->id();
442 auto ok = !commits.empty();
443 auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
445 if (ok) {
446 bool announceMember = false;
447 for (const auto& c : commits) {
448 // Announce member events
449 if (c.at("type") == "member") {
450 if (c.find("uri") != c.end() && c.find("action") != c.end()) {
451 const auto& uri = c.at("uri");
452 const auto& actionStr = c.at("action");
453 auto action = -1;
454 if (actionStr == "add")
455 action = 0;
456 else if (actionStr == "join")
457 action = 1;
458 else if (actionStr == "remove")
459 action = 2;
460 else if (actionStr == "ban")
461 action = 3;
462 else if (actionStr == "unban")
463 action = 4;
464 if (actionStr == "ban" || actionStr == "remove") {
465 // In this case, a potential host was removed during a call.
467 typers_->removeTyper(uri);
468 }
469 if (action != -1) {
470 announceMember = true;
472 accountId_, convId, uri, action);
473 }
474 }
475 } else if (c.at("type") == "application/call-history+json") {
477 }
478#ifdef ENABLE_PLUGIN
480 = Manager::instance().getJamiPluginManager().getChatServicesManager();
481 if (pluginChatManager.hasHandlers()) {
482 auto cm = std::make_shared<JamiMessage>(accountId_,
483 convId,
484 c.at("author") != userId_,
485 c,
486 false);
487 cm->isSwarm = true;
488 pluginChatManager.publishMessage(std::move(cm));
489 }
490#endif
491 // announce message
493 }
494
496 onMembersChanged_(repository_->memberUris("", {}));
497 }
498 }
499 }
500
502 {
503 try {
504 // read file
506 // load values
507 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
508 std::lock_guard lk {messageStatusMtx_};
509 oh.get().convert(messagesStatus_);
510 } catch (const std::exception& e) {
511 }
512 }
514 {
515 std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
516 msgpack::pack(file, messagesStatus_);
517 }
518
519 void loadActiveCalls() const
520 {
521 try {
522 // read file
524 // load values
525 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
526 std::lock_guard lk {activeCallsMtx_};
527 oh.get().convert(activeCalls_);
528 } catch (const std::exception& e) {
529 return;
530 }
531 }
532
533 void saveActiveCalls() const
534 {
535 std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
536 msgpack::pack(file, activeCalls_);
537 }
538
539 void loadHostedCalls() const
540 {
541 try {
542 // read file
544 // load values
545 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
546 std::lock_guard lk {activeCallsMtx_};
547 oh.get().convert(hostedCalls_);
548 } catch (const std::exception& e) {
549 return;
550 }
551 }
552
553 void saveHostedCalls() const
554 {
555 std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
556 msgpack::pack(file, hostedCalls_);
557 }
558
559 void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
560
561 std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
562 bool includeLeft,
563 bool includeBanned) const;
564
565 std::string_view bannedType(const std::string& uri) const
566 {
567 auto repo = repoPath();
568 auto crt = fmt::format("{}.crt", uri);
569 auto bannedMember = repo / "banned" / "members" / crt;
570 if (std::filesystem::is_regular_file(bannedMember))
571 return "members"sv;
572 auto bannedAdmin = repo / "banned" / "admins" / crt;
573 if (std::filesystem::is_regular_file(bannedAdmin))
574 return "admins"sv;
575 auto bannedInvited = repo / "banned" / "invited" / uri;
576 if (std::filesystem::is_regular_file(bannedInvited))
577 return "invited"sv;
578 auto bannedDevice = repo / "banned" / "devices" / crt;
579 if (std::filesystem::is_regular_file(bannedDevice))
580 return "devices"sv;
581 return {};
582 }
583
584 std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
585 {
586 auto deviceSockets = gitSocketList_.find(deviceId);
587 return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
588 }
589
590 void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
591 {
592 gitSocketList_[deviceId] = socket;
593 }
594 void removeGitSocket(const DeviceId& deviceId)
595 {
596 auto deviceSockets = gitSocketList_.find(deviceId);
597 if (deviceSockets != gitSocketList_.end())
599 }
600
606 void disconnectFromPeer(const std::string& peerUri);
607
608 std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
609 bool includeLeft) const;
610
611 std::mutex membersMtx_ {};
612 std::set<std::string> checkedMembers_; // Store members we tried
613 std::function<void()> bootstrapCb_;
614#ifdef LIBJAMI_TEST
615 std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
616#endif
617
618 std::mutex writeMtx_ {};
619 std::unique_ptr<ConversationRepository> repository_;
620 std::shared_ptr<SwarmManager> swarmManager_;
621 std::weak_ptr<JamiAccount> account_;
622 std::string accountId_ {};
623 std::string userId_;
624 std::string deviceId_;
625 std::atomic_bool isRemoving_ {false};
626 std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
627 std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
628 History* optHistory = nullptr);
629 void pull(const std::string& deviceId);
630 std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
631
632 // Avoid multiple fetch/merges at the same time.
633 std::mutex pullcbsMtx_ {};
634 std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch
635 std::shared_ptr<TransferManager> transferManager_ {};
636 std::filesystem::path conversationDataPath_ {};
637 std::filesystem::path fetchedPath_ {};
638
639 // Manage last message displayed and status
640 std::filesystem::path sendingPath_ {};
641 std::filesystem::path preferencesPath_ {};
643
644 // Manage hosted calls on this device
645 std::filesystem::path hostedCallsPath_ {};
646 mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
647 // Manage active calls for this conversation (can be hosted by other devices)
648 std::filesystem::path activeCallsPath_ {};
649 mutable std::mutex activeCallsMtx_ {};
650 mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
651
653
654 // Bootstrap
655 std::shared_ptr<asio::io_context> ioContext_;
656 std::unique_ptr<asio::steady_timer> fallbackTimer_;
657
658
663 std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
665 const std::vector<std::map<std::string, std::string>>& commits,
666 bool messageReceived = false,
667 bool commitFromSelf = false);
668
670 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
672 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
673 bool messageReceived) const;
675 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
676 bool messageReceived) const;
677 void rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
678 History& history) const;
688 mutable std::mutex messageStatusMtx_;
689 std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
690 std::filesystem::path statusPath_ {};
691 mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
696 // Note: only store int32_t cause it's easy to pass to dbus this way
697 // memberToStatus serves as a cache for loading messages
698 mutable std::map<std::string, int32_t> memberToStatus;
699
700
701 // futureStatus is used to store the status for receiving messages
702 // (because we're not sure to fetch the commit before receiving a status change for this)
703 mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
704 // Update internal structures regarding status
705 void updateStatus(const std::string& uri,
707 const std::string& commitId,
708 const std::string& ts,
709 bool emit = false);
710
711
712 std::shared_ptr<Typers> typers_;
713};
714
715bool
717{
718 auto adminsPath = repoPath() / "admins";
719 return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, userId_ + ".crt"));
720}
721
722void
723Conversation::Impl::disconnectFromPeer(const std::string& peerUri)
724{
725 // Remove nodes from swarmManager
726 const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
727 std::vector<NodeId> toRemove;
728 for (const auto node : nodes)
729 if (peerUri == repository_->uriFromDevice(node.toString()))
730 toRemove.emplace_back(node);
731 swarmManager_->deleteNode(toRemove);
732
733 // Remove git sockets with this member
734 for (auto it = gitSocketList_.begin(); it != gitSocketList_.end();) {
735 if (peerUri == repository_->uriFromDevice(it->first.toString()))
736 it = gitSocketList_.erase(it);
737 else
738 ++it;
739 }
740}
741
742std::vector<std::map<std::string, std::string>>
744{
745 std::vector<std::map<std::string, std::string>> result;
746 auto members = repository_->members();
747 std::lock_guard lk(messageStatusMtx_);
748 for (const auto& member : members) {
749 if (member.role == MemberRole::BANNED && !includeBanned) {
750 continue;
751 }
753 continue;
754 if (member.role == MemberRole::LEFT && !includeLeft)
755 continue;
756 auto mm = member.map();
757 mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
758 result.emplace_back(std::move(mm));
759 }
760 return result;
761}
762
763std::vector<std::string>
765{
766 // Handle current calls
767 std::vector<std::string> commits {};
768 std::unique_lock lk(writeMtx_);
769 std::unique_lock lkA(activeCallsMtx_);
770 for (const auto& hostedCall : hostedCalls_) {
771 // In this case, this means that we left
772 // the conference while still hosting it, so activeCalls
773 // will not be correctly updated
774 // We don't need to send notifications there, as peers will sync with presence
775 Json::Value value;
776 value["uri"] = userId_;
777 value["device"] = deviceId_;
778 value["confId"] = hostedCall.first;
779 value["type"] = "application/call-history+json";
780 auto now = std::chrono::system_clock::now();
781 auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
782 .count();
783 value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
784 auto itActive = std::find_if(activeCalls_.begin(),
785 activeCalls_.end(),
786 [this, confId = hostedCall.first](const auto& value) {
787 return value.at("id") == confId && value.at("uri") == userId_
788 && value.at("device") == deviceId_;
789 });
790 if (itActive != activeCalls_.end())
791 activeCalls_.erase(itActive);
792 commits.emplace_back(repository_->commitMessage(json::toString(value)));
793
794 JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
795 }
796 hostedCalls_.clear();
797 saveActiveCalls();
798 saveHostedCalls();
799 return commits;
800}
801
802std::filesystem::path
804{
805 return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
806}
807
808std::vector<std::map<std::string, std::string>>
810{
811 if (!repository_)
812 return {};
813 std::vector<ConversationCommit> commits;
814 auto startLogging = options.from == "";
815 auto breakLogging = false;
816 repository_->log(
817 [&](const auto& id, const auto& author, const auto& commit) {
818 if (!commits.empty()) {
819 // Set linearized parent
820 commits.rbegin()->linearized_parent = id;
821 }
822 if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
823 return CallbackResult::Skip;
824 }
825 if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
826 return CallbackResult::Break; // Stop logging
827 if (breakLogging)
828 return CallbackResult::Break; // Stop logging
829 if (id == options.to) {
830 if (options.includeTo)
831 breakLogging = true; // For the next commit
832 else
833 return CallbackResult::Break; // Stop logging
834 }
835
836 if (!startLogging && options.from != "" && options.from == id)
837 startLogging = true;
838 if (!startLogging)
839 return CallbackResult::Skip; // Start logging after this one
840
841 if (options.fastLog) {
842 if (options.authorUri != "") {
843 if (options.authorUri == repository_->uriFromDevice(author.email)) {
844 return CallbackResult::Break; // Found author, stop
845 }
846 }
847 // Used to only count commit
848 commits.emplace(commits.end(), ConversationCommit {});
850 }
851
852 return CallbackResult::Ok; // Continue
853 },
854 [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
855 [](auto, auto, auto) { return false; },
856 options.from,
857 options.logIfNotFound);
858 return repository_->convCommitsToMap(commits);
859}
860
861std::vector<libjami::SwarmMessage>
863{
864 auto history = optHistory ? optHistory : &loadedHistory_;
865
866 // history->mutex is locked by the caller
867 if (!repository_ || history->loading) {
868 return {};
869 }
870 history->loading = true;
871
872 // By convention, if options.nbOfCommits is zero, then we
873 // don't impose a limit on the number of commits returned.
874 bool limitNbOfCommits = options.nbOfCommits > 0;
875
876 auto startLogging = options.from == "";
877 auto breakLogging = false;
878 auto currentHistorySize = loadedHistory_.messageList.size();
879 std::vector<std::string> replies;
880 std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
881 repository_->log(
882 /* preCondition */
883 [&](const auto& id, const auto& author, const auto& commit) {
884 if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
885 return CallbackResult::Skip;
886 }
887 if (id == options.to) {
888 if (options.includeTo)
889 breakLogging = true; // For the next commit
890 }
891 if (replies.empty()) { // This avoid load until
892 // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
893 // until this commit
894 if ((limitNbOfCommits
895 && (loadedHistory_.messageList.size() - currentHistorySize)
896 == options.nbOfCommits))
897 return CallbackResult::Break; // Stop logging
898 if (breakLogging)
899 return CallbackResult::Break; // Stop logging
900 if (id == options.to && !options.includeTo) {
901 return CallbackResult::Break; // Stop logging
902 }
903 }
904
905 if (!startLogging && options.from != "" && options.from == id)
906 startLogging = true;
907 if (!startLogging)
908 return CallbackResult::Skip; // Start logging after this one
909
910 if (options.fastLog) {
911 if (options.authorUri != "") {
912 if (options.authorUri == repository_->uriFromDevice(author.email)) {
913 return CallbackResult::Break; // Found author, stop
914 }
915 }
916 }
917
918 return CallbackResult::Ok; // Continue
919 },
920 /* emplaceCb */
921 [&](auto&& cc) {
922 if(limitNbOfCommits && (msgList.size() == options.nbOfCommits))
923 return;
924 auto optMessage = repository_->convCommitToMap(cc);
925 if (!optMessage.has_value())
926 return;
927 auto message = optMessage.value();
928 if (message.find("reply-to") != message.end()) {
929 auto it = std::find(replies.begin(), replies.end(), message.at("reply-to"));
930 if(it == replies.end()) {
931 replies.emplace_back(message.at("reply-to"));
932 }
933 }
934 auto it = std::find(replies.begin(), replies.end(), message.at("id"));
935 if (it != replies.end()) {
936 replies.erase(it);
937 }
938 std::shared_ptr<libjami::SwarmMessage> firstMsg;
939 if ((history == &loadedHistory_) && msgList.empty() && !loadedHistory_.messageList.empty()) {
940 firstMsg = *loadedHistory_.messageList.rbegin();
941 }
942 auto added = addToHistory(*history, {message}, false, false);
943 if (!added.empty() && firstMsg) {
945 repository_->id(),
946 *firstMsg);
947 }
948 msgList.insert(msgList.end(), added.begin(), added.end());
949 },
950 /* postCondition */
951 [&](auto, auto, auto) {
952 // Stop logging if there was a limit set on the number of commits
953 // to return and we reached it. This isn't strictly necessary since
954 // the check at the beginning of `emplaceCb` ensures that we won't
955 // return too many messages, but it prevents us from needlessly
956 // iterating over a (potentially) large number of commits.
957 return limitNbOfCommits && (msgList.size() == options.nbOfCommits);
958 },
959 options.from,
960 options.logIfNotFound);
961
962 history->loading = false;
963 history->cv.notify_all();
964
965 // Convert for client (remove ptr)
966 std::vector<libjami::SwarmMessage> ret;
967 ret.reserve(msgList.size());
968 for (const auto& msg: msgList) {
969 ret.emplace_back(*msg);
970 }
971 return ret;
972}
973
974void
976 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
977{
978 auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
979 auto peditIt = history.pendingEditions.find(sharedCommit->id);
980 if (peditIt != history.pendingEditions.end()) {
981 auto oldBody = sharedCommit->body;
982 sharedCommit->body["body"] = peditIt->second.front()->body["body"];
983 if (sharedCommit->body.at("body").empty())
984 return;
985 history.pendingEditions.erase(peditIt);
986 }
987 if (it != history.quickAccess.end()) {
988 it->second->reactions.emplace_back(sharedCommit->body);
990 repository_->id(),
991 it->second->id,
992 sharedCommit->body);
993 } else {
994 history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
995 }
996}
997
998void
1000 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1001 bool messageReceived) const
1002{
1003 auto editId = sharedCommit->body.at("edit");
1004 auto it = history.quickAccess.find(editId);
1005 if (it != history.quickAccess.end()) {
1006 auto baseCommit = it->second;
1007 if (baseCommit) {
1008 auto itReact = baseCommit->body.find("react-to");
1009 std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
1010 "tid" : "body";
1011 auto body = sharedCommit->body.at(toReplace);
1012 // Edit reaction
1013 if (itReact != baseCommit->body.end()) {
1014 baseCommit->body[toReplace] = body; // Replace body if pending
1015 it = history.quickAccess.find(itReact->second);
1016 auto itPending = history.pendingReactions.find(itReact->second);
1017 if (it != history.quickAccess.end()) {
1018 baseCommit = it->second; // Base commit
1019 auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
1020 baseCommit->reactions.end(),
1021 [&](const auto& reaction) {
1022 return reaction.at("id") == editId;
1023 });
1024 if (itPreviousReact != baseCommit->reactions.end()) {
1025 (*itPreviousReact)[toReplace] = body;
1026 if (body.empty()) {
1027 baseCommit->reactions.erase(itPreviousReact);
1029 repository_
1030 ->id(),
1031 baseCommit->id,
1032 editId);
1033 }
1034 }
1035 } else if (itPending != history.pendingReactions.end()) {
1036 // Else edit if pending
1037 auto itReaction = std::find_if(itPending->second.begin(),
1038 itPending->second.end(),
1039 [&](const auto& reaction) {
1040 return reaction.at("id") == editId;
1041 });
1042 if (itReaction != itPending->second.end()) {
1043 (*itReaction)[toReplace] = body;
1044 if (body.empty())
1045 itPending->second.erase(itReaction);
1046 }
1047 } else {
1048 // Add to pending edtions
1049 messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1050 : history.pendingEditions[editId].emplace_back(sharedCommit);
1051 }
1052 } else {
1053 // Normal message
1054 it->second->editions.emplace(it->second->editions.begin(), it->second->body);
1055 it->second->body[toReplace] = sharedCommit->body[toReplace];
1056 if (toReplace == "tid") {
1057 // Avoid to replace fileId in client
1058 it->second->body["fileId"] = "";
1059 }
1060 // Remove reactions
1061 if (sharedCommit->body.at(toReplace).empty())
1062 it->second->reactions.clear();
1063 emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
1064 }
1065 }
1066 } else {
1067 messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
1068 : history.pendingEditions[editId].emplace_back(sharedCommit);
1069 }
1070}
1071
1072bool
1074 const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
1075 bool messageReceived) const
1076{
1077 if (messageReceived) {
1078 // For a received message, we place it at the beginning of the list
1079 if (!history.messageList.empty())
1080 sharedCommit->linearizedParent = (*history.messageList.begin())->id;
1081 history.messageList.emplace_front(sharedCommit);
1082 } else {
1083 // For a loaded message, we load from newest to oldest
1084 // So we change the parent of the last message.
1085 if (!history.messageList.empty())
1086 (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
1087 history.messageList.emplace_back(sharedCommit);
1088 }
1089 // Handle pending reactions/editions
1090 auto reactIt = history.pendingReactions.find(sharedCommit->id);
1091 if (reactIt != history.pendingReactions.end()) {
1092 for (const auto& commitBody : reactIt->second)
1093 sharedCommit->reactions.emplace_back(commitBody);
1094 history.pendingReactions.erase(reactIt);
1095 }
1096 auto peditIt = history.pendingEditions.find(sharedCommit->id);
1097 if (peditIt != history.pendingEditions.end()) {
1098 auto oldBody = sharedCommit->body;
1099 if (sharedCommit->type == "application/data-transfer+json") {
1100 sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
1101 sharedCommit->body["fileId"] = "";
1102 } else {
1103 sharedCommit->body["body"] = peditIt->second.front()->body["body"];
1104 }
1105 peditIt->second.pop_front();
1106 for (const auto& commit : peditIt->second) {
1107 sharedCommit->editions.emplace_back(commit->body);
1108 }
1109 sharedCommit->editions.emplace_back(oldBody);
1110 history.pendingEditions.erase(peditIt);
1111 }
1112 // Announce to client
1113 if (messageReceived)
1115 repository_->id(),
1116 *sharedCommit);
1117 return !messageReceived;
1118}
1119
1120void Conversation::Impl::rectifyStatus(const std::shared_ptr<libjami::SwarmMessage>& message,
1121 History& history) const
1122{
1123
1124 auto parentIt = history.quickAccess.find(message->linearizedParent);
1125 auto currentMessage = message;
1126
1127 while(parentIt != history.quickAccess.end()){
1128 const auto& parent = parentIt->second;
1129 for (const auto& [peer, value] : message->status) {
1130 auto parentStatusIt = parent->status.find(peer);
1131 if (parentStatusIt == parent->status.end() || parentStatusIt->second < value) {
1132 parent->status[peer] = value;
1134 accountId_,
1135 repository_->id(),
1136 peer,
1137 parent->id,
1138 value);
1139 }
1140 else if(parentStatusIt->second >= value){
1141 break;
1142 }
1143 }
1145 parentIt = history.quickAccess.find(parent->linearizedParent);
1146 }
1147}
1148
1149std::vector<std::shared_ptr<libjami::SwarmMessage>>
1151 const std::vector<std::map<std::string, std::string>>& commits,
1152 bool messageReceived,
1153 bool commitFromSelf)
1154{
1155 //
1156 // NOTE: This function makes the following assumptions on its arguments:
1157 // - The messages in "history" are in reverse chronological order (newest message
1158 // first, oldest message last).
1159 // - If messageReceived is true, then the commits in "commits" are assumed to be in
1160 // chronological order (oldest to newest) and to be newer than the ones in "history".
1161 // They are therefore inserted at the beginning of the message list.
1162 // - If messageReceived is false, then the commits in "commits" are assumed to be in
1163 // reverse chronological order (newest to oldest) and to be older than the ones in
1164 // "history". They are therefore appended at the end of the message list.
1165 //
1166 auto acc = account_.lock();
1167 if (!acc)
1168 return {};
1169 auto username = acc->getUsername();
1170 if (messageReceived && (&history == &loadedHistory_ && history.loading)) {
1171 std::unique_lock lk(history.mutex);
1172 history.cv.wait(lk, [&] { return !history.loading; });
1173 }
1174
1175 // Only set messages' status on history for client
1176 bool needToSetMessageStatus = !commitFromSelf && &history == &loadedHistory_;
1177
1178 std::vector<std::shared_ptr<libjami::SwarmMessage>> sharedCommits;
1179 for (const auto& commit : commits) {
1180 auto commitId = commit.at("id");
1181 if (history.quickAccess.find(commitId) != history.quickAccess.end())
1182 continue; // Already present
1183 auto typeIt = commit.find("type");
1184 // Nothing to show for the client, skip
1185 if (typeIt != commit.end() && typeIt->second == "merge")
1186 continue;
1187
1188 auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1189 sharedCommit->fromMapStringString(commit);
1190
1192 // Check if we already have status information for the commit.
1193 auto itFuture = futureStatus.find(sharedCommit->id);
1194 if (itFuture != futureStatus.end()) {
1195 sharedCommit->status = std::move(itFuture->second);
1196 futureStatus.erase(itFuture);
1197 }
1198 }
1199
1200 sharedCommits.emplace_back(sharedCommit);
1201 }
1202
1204 constexpr int32_t SENDING = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
1205 constexpr int32_t SENT = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
1206 constexpr int32_t DISPLAYED = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
1207
1208 std::lock_guard lk(messageStatusMtx_);
1209 for (const auto& member: repository_->members()) {
1210 // For each member, we iterate over the commits to add in reverse chronological
1211 // order (i.e. from newest to oldest) and set their status from the point of view
1212 // of that member (as best we can given the information we have).
1213 //
1214 // The key assumption made in order to compute the status is that it can never decrease
1215 // (with respect to the ordering SENDING < SENT < DISPLAYED) as we go back in time. We
1216 // therefore start by setting the "status" variable below to the lowest possible value,
1217 // and increase it when we encounter a commit for which it is justified to do so.
1218 //
1219 // If messageReceived is true, then the commits we are adding are the most recent in the
1220 // conversation history, so the lowest possible value is SENDING.
1221 //
1222 // If messageReceived is false, then the commits we are adding are older than the ones
1223 // that are already in the history, so the lowest possible value is the status of the
1224 // oldest message in the history so far, which is stored in memberToStatus.
1225 auto status = SENDING;
1226 if (!messageReceived) {
1227 auto cache = memberToStatus[member.uri];
1228 if (cache > status)
1229 status = cache;
1230 }
1231
1232 for (auto it = sharedCommits.rbegin(); it != sharedCommits.rend(); it++) {
1233 auto sharedCommit = *it;
1234 auto previousStatus = status;
1235
1236 // Compute status for the current commit.
1237 if (status < SENT && messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
1238 status = SENT;
1239 }
1240 if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
1241 status = DISPLAYED;
1242 }
1243 if (member.uri == sharedCommit->body.at("author")) {
1244 status = DISPLAYED;
1245 }
1246 if(status < sharedCommit->status[member.uri]){
1247 status = sharedCommit->status[member.uri];
1248 }
1249
1250 // Store computed value.
1251 sharedCommit->status[member.uri] = status;
1252
1253 // Update messagesStatus_ if needed.
1254 if (previousStatus == SENDING && status >= SENT) {
1255 messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
1256 }
1257 if (previousStatus <= SENT && status == DISPLAYED) {
1258 messagesStatus_[member.uri]["read"] = sharedCommit->id;
1259 }
1260 }
1261
1262 if (!messageReceived) {
1263 // Update memberToStatus with the status of the last (i.e. oldest) added commit.
1264 memberToStatus[member.uri] = status;
1265 }
1266 }
1267 }
1268
1269 std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
1270 for (const auto& sharedCommit : sharedCommits) {
1271 history.quickAccess[sharedCommit->id] = sharedCommit;
1272
1273 auto reactToIt = sharedCommit->body.find("react-to");
1274 auto editIt = sharedCommit->body.find("edit");
1275 if (reactToIt != sharedCommit->body.end() && !reactToIt->second.empty()) {
1276 handleReaction(history, sharedCommit);
1277 } else if (editIt != sharedCommit->body.end() && !editIt->second.empty()) {
1278 handleEdition(history, sharedCommit, messageReceived);
1279 } else if (handleMessage(history, sharedCommit, messageReceived)) {
1280 messages.emplace_back(sharedCommit);
1281 }
1282 rectifyStatus(sharedCommit, history);
1283 }
1284
1285 return messages;
1286}
1287
1288Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1290 const std::string& otherMember)
1291 : pimpl_ {new Impl {account, mode, otherMember}}
1292{}
1293
1294Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1295 const std::string& conversationId)
1296 : pimpl_ {new Impl {account, conversationId}}
1297{}
1298
1299Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
1300 const std::string& remoteDevice,
1301 const std::string& conversationId)
1302 : pimpl_ {new Impl {account, remoteDevice, conversationId}}
1303{}
1304
1306
1307std::string
1309{
1310 return pimpl_->repository_ ? pimpl_->repository_->id() : "";
1311}
1312
1313void
1315{
1316 try {
1318 // Only authorize to add left members
1320 auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
1321 if (it == initialMembers.end()) {
1322 JAMI_WARN("Unable to add new member in one to one conversation");
1323 cb(false, "");
1324 return;
1325 }
1326 }
1327 } catch (const std::exception& e) {
1328 JAMI_WARN("Unable to get mode: %s", e.what());
1329 cb(false, "");
1330 return;
1331 }
1332 if (isMember(contactUri, true)) {
1333 JAMI_WARN("Unable to add member %s because it's already a member", contactUri.c_str());
1334 cb(false, "");
1335 return;
1336 }
1337 if (isBanned(contactUri)) {
1338 if (pimpl_->isAdmin()) {
1339 dht::ThreadPool::io().run(
1340 [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1341 if (auto sthis = w.lock()) {
1342 auto members = sthis->pimpl_->repository_->members();
1343 auto type = sthis->pimpl_->bannedType(contactUri);
1344 if (type.empty()) {
1345 cb(false, {});
1346 return;
1347 }
1348 sthis->pimpl_->voteUnban(contactUri, type, cb);
1349 }
1350 });
1351 } else {
1352 JAMI_WARN("Unable to add member %s because this member is blocked", contactUri.c_str());
1353 cb(false, "");
1354 }
1355 return;
1356 }
1357
1358 dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
1359 if (auto sthis = w.lock()) {
1360 // Add member files and commit
1361 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1362 auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1363 sthis->pimpl_->announce(commit, true);
1364 lk.unlock();
1365 if (cb)
1366 cb(!commit.empty(), commit);
1367 }
1368 });
1369}
1370
1371std::shared_ptr<dhtnet::ChannelSocket>
1372Conversation::gitSocket(const DeviceId& deviceId) const
1373{
1374 return pimpl_->gitSocket(deviceId);
1375}
1376
1377void
1379 const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1380{
1381 pimpl_->addGitSocket(deviceId, socket);
1382}
1383
1384void
1386{
1387 pimpl_->removeGitSocket(deviceId);
1388}
1389
1390void
1392{
1393 pimpl_->fallbackTimer_->cancel();
1394 pimpl_->gitSocketList_.clear();
1395 if (pimpl_->swarmManager_)
1396 pimpl_->swarmManager_->shutdown();
1397 std::lock_guard lk(pimpl_->membersMtx_);
1398 pimpl_->checkedMembers_.clear();
1399}
1400
1401void
1403{
1404 if (pimpl_->swarmManager_)
1405 pimpl_->swarmManager_->maintainBuckets();
1406}
1407
1408std::vector<jami::DeviceId>
1410{
1411 return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1412}
1413
1414std::shared_ptr<Typers>
1416{
1417 return pimpl_->typers_;
1418}
1419
1420bool
1421Conversation::hasSwarmChannel(const std::string& deviceId)
1422{
1423 if (!pimpl_->swarmManager_)
1424 return false;
1425 return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
1426}
1427
1428void
1430 const std::string_view type,
1431 const OnDoneCb& cb)
1432{
1433 // Check if admin
1434 if (!isAdmin()) {
1435 JAMI_WARN("You're not an admin of this repo. Unable to unblock %s", contactUri.c_str());
1436 cb(false, {});
1437 return;
1438 }
1439
1440 // Vote for removal
1441 std::unique_lock lk(writeMtx_);
1442 auto voteCommit = repository_->voteUnban(contactUri, type);
1443 if (voteCommit.empty()) {
1444 JAMI_WARN("Unbanning %s failed", contactUri.c_str());
1445 cb(false, "");
1446 return;
1447 }
1448
1449 auto lastId = voteCommit;
1450 std::vector<std::string> commits;
1451 commits.emplace_back(voteCommit);
1452
1453 // If admin, check vote
1454 auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
1455 if (!resolveCommit.empty()) {
1456 commits.emplace_back(resolveCommit);
1457 lastId = resolveCommit;
1458 JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
1459 }
1460 announce(commits, true);
1461 lk.unlock();
1462 if (cb)
1463 cb(!lastId.empty(), lastId);
1464}
1465
1466void
1468{
1469 dht::ThreadPool::io().run([w = weak(),
1470 contactUri = std::move(contactUri),
1471 isDevice = std::move(isDevice),
1472 cb = std::move(cb)] {
1473 if (auto sthis = w.lock()) {
1474 // Check if admin
1475 if (!sthis->pimpl_->isAdmin()) {
1476 JAMI_WARN("You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1477 cb(false, {});
1478 return;
1479 }
1480
1481 // Get current user type
1482 std::string type;
1483 if (isDevice) {
1484 type = "devices";
1485 } else {
1486 auto members = sthis->pimpl_->repository_->members();
1487 for (const auto& member : members) {
1488 if (member.uri == contactUri) {
1489 if (member.role == MemberRole::INVITED) {
1490 type = "invited";
1491 } else if (member.role == MemberRole::ADMIN) {
1492 type = "admins";
1493 } else if (member.role == MemberRole::MEMBER) {
1494 type = "members";
1495 }
1496 break;
1497 }
1498 }
1499 if (type.empty()) {
1500 cb(false, {});
1501 return;
1502 }
1503 }
1504
1505 // Vote for removal
1506 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1507 auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
1508 if (voteCommit.empty()) {
1509 JAMI_WARN("Kicking %s failed", contactUri.c_str());
1510 cb(false, "");
1511 return;
1512 }
1513
1514 auto lastId = voteCommit;
1515 std::vector<std::string> commits;
1516 commits.emplace_back(voteCommit);
1517
1518 // If admin, check vote
1519 auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
1520 if (!resolveCommit.empty()) {
1521 commits.emplace_back(resolveCommit);
1522 lastId = resolveCommit;
1523 JAMI_WARN("Vote solved for %s. %s banned",
1524 contactUri.c_str(),
1525 isDevice ? "Device" : "Member");
1526 sthis->pimpl_->disconnectFromPeer(contactUri);
1527 }
1528
1529 sthis->pimpl_->announce(commits, true);
1530 lk.unlock();
1531 cb(!lastId.empty(), lastId);
1532 }
1533 });
1534}
1535
1536std::vector<std::map<std::string, std::string>>
1538{
1539 return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
1540}
1541
1542std::set<std::string>
1543Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
1544{
1545 return pimpl_->repository_->memberUris(filter, filteredRoles);
1546}
1547
1548std::vector<NodeId>
1550{
1551 const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1552 const auto& nodes = routingTable.getNodes();
1553 const auto& mobiles = routingTable.getMobileNodes();
1554 std::vector<NodeId> s;
1555 s.reserve(nodes.size() + mobiles.size());
1556 s.insert(s.end(), nodes.begin(), nodes.end());
1557 s.insert(s.end(), mobiles.begin(), mobiles.end());
1558 for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
1559 if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1560 s.emplace_back(deviceId);
1561 return s;
1562}
1563
1564bool
1566{
1567 const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
1568 return !routingTable.getNodes().empty();
1569}
1570
1571std::string
1572Conversation::uriFromDevice(const std::string& deviceId) const
1573{
1574 return pimpl_->repository_->uriFromDevice(deviceId);
1575}
1576
1577void
1579{
1580 pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1581}
1582
1583std::string
1585{
1586 return pimpl_->repository_->join();
1587}
1588
1589bool
1590Conversation::isMember(const std::string& uri, bool includeInvited) const
1591{
1592 auto repoPath = pimpl_->repoPath();
1593 auto invitedPath = repoPath / "invited";
1594 auto adminsPath = repoPath / "admins";
1595 auto membersPath = repoPath / "members";
1596 std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
1597 if (includeInvited)
1598 pathsToCheck.emplace_back(invitedPath);
1599 for (const auto& path : pathsToCheck) {
1600 for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1601 std::string_view crtUri = certificate;
1602 auto crtIt = crtUri.find(".crt");
1603 if (path != invitedPath && crtIt == std::string_view::npos) {
1604 JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
1605 continue;
1606 }
1607 if (crtIt != std::string_view::npos)
1608 crtUri = crtUri.substr(0, crtIt);
1609 if (crtUri == uri)
1610 return true;
1611 }
1612 }
1613
1615 for (const auto& member : getInitialMembers()) {
1616 if (member == uri)
1617 return true;
1618 }
1619 }
1620
1621 return false;
1622}
1623
1624bool
1625Conversation::isBanned(const std::string& uri) const
1626{
1627 return !pimpl_->bannedType(uri).empty();
1628}
1629
1630void
1631Conversation::sendMessage(std::string&& message,
1632 const std::string& type,
1633 const std::string& replyTo,
1635 OnDoneCb&& cb)
1636{
1637 Json::Value json;
1638 json["body"] = std::move(message);
1639 json["type"] = type;
1640 sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
1641}
1642
1643void
1645 const std::string& replyTo,
1647 OnDoneCb&& cb)
1648{
1649 if (!replyTo.empty()) {
1650 auto commit = pimpl_->repository_->getCommit(replyTo);
1651 if (commit == std::nullopt) {
1652 JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
1653 return;
1654 }
1655 value["reply-to"] = replyTo;
1656 }
1657 dht::ThreadPool::io().run(
1658 [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
1659 if (auto sthis = w.lock()) {
1660 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1661 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1662 lk.unlock();
1663 if (onCommit)
1664 onCommit(commit);
1665 sthis->pimpl_->announce(commit, true);
1666 if (cb)
1667 cb(!commit.empty(), commit);
1668 }
1669 });
1670}
1671
1672void
1674{
1675 dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
1676 if (auto sthis = w.lock()) {
1677 std::vector<std::string> commits;
1678 commits.reserve(messages.size());
1679 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1680 for (const auto& message : messages) {
1681 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1682 commits.emplace_back(std::move(commit));
1683 }
1684 lk.unlock();
1685 sthis->pimpl_->announce(commits, true);
1686 if (cb)
1687 cb(commits);
1688 }
1689 });
1690}
1691
1692std::optional<std::map<std::string, std::string>>
1693Conversation::getCommit(const std::string& commitId) const
1694{
1695 auto commit = pimpl_->repository_->getCommit(commitId);
1696 if (commit == std::nullopt)
1697 return std::nullopt;
1698 return pimpl_->repository_->convCommitToMap(*commit);
1699}
1700
1701void
1703{
1704 if (!cb)
1705 return;
1706 dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1707 if (auto sthis = w.lock()) {
1708 cb(sthis->pimpl_->loadMessages(options));
1709 }
1710 });
1711}
1712
1713void
1715{
1716 if (!cb)
1717 return;
1718 dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
1719 if (auto sthis = w.lock()) {
1720 std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1721 auto result = sthis->pimpl_->loadMessages2(options);
1722 lk.unlock();
1723 cb(std::move(result));
1724 }
1725 });
1726}
1727
1728void
1730{
1731 std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1732 pimpl_->loadedHistory_.messageList.clear();
1733 pimpl_->loadedHistory_.quickAccess.clear();
1734 pimpl_->loadedHistory_.pendingEditions.clear();
1735 pimpl_->loadedHistory_.pendingReactions.clear();
1736 {
1737 std::lock_guard lk(pimpl_->messageStatusMtx_);
1738 pimpl_->memberToStatus.clear();
1739 }
1740}
1741
1742std::string
1744{
1745 {
1746 std::lock_guard lk(pimpl_->loadedHistory_.mutex);
1747 if (!pimpl_->loadedHistory_.messageList.empty())
1748 return (*pimpl_->loadedHistory_.messageList.begin())->id;
1749 }
1751 options.nbOfCommits = 1;
1752 options.skipMerge = true;
1754 std::scoped_lock lock(pimpl_->writeMtx_, optHistory.mutex);
1755 auto res = pimpl_->loadMessages2(options, &optHistory);
1756 if (res.empty())
1757 return {};
1758 return (*optHistory.messageList.begin())->id;
1759}
1760
1761std::vector<std::map<std::string, std::string>>
1763{
1764 if (not repository_) {
1765 JAMI_WARNING("{} Invalid repo. Abort merge", toString());
1766 return {};
1767 }
1768 auto remoteHead = repository_->remoteHead(uri);
1769 if (remoteHead.empty()) {
1770 JAMI_WARNING("{} Unable to get HEAD of {}", toString(), uri);
1771 return {};
1772 }
1773
1774 // Validate commit
1775 auto [newCommits, err] = repository_->validFetch(uri);
1776 if (newCommits.empty()) {
1777 if (err)
1778 JAMI_ERROR("{} Unable to validate history with {}", toString(), uri);
1779 repository_->removeBranchWith(uri);
1780 return {};
1781 }
1782
1783 // If validated, merge
1784 auto [ok, cid] = repository_->merge(remoteHead);
1785 if (!ok) {
1786 JAMI_ERROR("{} Unable to merge history with {}", toString(), uri);
1787 repository_->removeBranchWith(uri);
1788 return {};
1789 }
1790 if (!cid.empty()) {
1791 // A merge commit was generated, should be added in new commits
1792 auto commit = repository_->getCommit(cid);
1793 if (commit != std::nullopt)
1794 newCommits.emplace_back(*commit);
1795 }
1796
1797 JAMI_LOG("{} Successfully merged history with {:s}", toString(), uri);
1798 auto result = repository_->convCommitsToMap(newCommits);
1799 for (auto& commit : result) {
1800 auto it = commit.find("type");
1801 if (it != commit.end() && it->second == "member") {
1802 repository_->refreshMembers();
1803
1804 if (commit["action"] == "ban")
1805 disconnectFromPeer(commit["uri"]);
1806 }
1807 }
1808 return result;
1809}
1810
1811bool
1812Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
1813{
1814 std::lock_guard lk(pimpl_->pullcbsMtx_);
1815 auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1816 auto& pullcbs = it->second;
1817 auto itPull = std::find_if(pullcbs.begin(),
1818 pullcbs.end(),
1819 [&](const auto& elem) { return std::get<0>(elem) == commitId; });
1820 if (itPull != pullcbs.end()) {
1821 JAMI_DEBUG("{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress", pimpl_->toString(), deviceId, commitId);
1822 cb(false);
1823 return false;
1824 }
1825 JAMI_DEBUG("{} [device {}] Pulling '{:s}'", pimpl_->toString(), deviceId, commitId);
1826 pullcbs.emplace_back(std::move(commitId), std::move(cb));
1827 if (notInProgress)
1828 dht::ThreadPool::io().run([w = weak(), deviceId] {
1829 if (auto sthis_ = w.lock())
1830 sthis_->pimpl_->pull(deviceId);
1831 });
1832 return true;
1833}
1834
1835void
1836Conversation::Impl::pull(const std::string& deviceId)
1837{
1838 auto& repo = repository_;
1839
1840 std::string commitId;
1841 OnPullCb cb;
1842 while (true) {
1843 {
1844 std::lock_guard lk(pullcbsMtx_);
1845 auto it = fetchingRemotes_.find(deviceId);
1846 if (it == fetchingRemotes_.end()) {
1847 JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId);
1848 break;
1849 }
1850 auto& pullcbs = it->second;
1851 if (pullcbs.empty()) {
1852 fetchingRemotes_.erase(it);
1853 break;
1854 }
1855 auto& elem = pullcbs.front();
1856 commitId = std::move(std::get<0>(elem));
1857 cb = std::move(std::get<1>(elem));
1858 pullcbs.pop_front();
1859 }
1860 // If recently fetched, the commit can already be there, so no need to do complex operations
1861 if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
1862 cb(true);
1863 continue;
1864 }
1865 // Pull from remote
1866 auto fetched = repo->fetch(deviceId);
1867 if (!fetched) {
1868 cb(false);
1869 continue;
1870 }
1871 auto oldHead = repo->getHead();
1872 std::string newHead = oldHead;
1873 std::unique_lock lk(writeMtx_);
1874 auto commits = mergeHistory(deviceId);
1875 if (!commits.empty()) {
1876 newHead = commits.rbegin()->at("id");
1877 // Note: Because clients needs to linearize the history, they need to know all commits
1878 // that can be updated.
1879 // In this case, all commits until the common merge base should be announced.
1880 // The client ill need to update it's model after this.
1881 std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
1882 auto newHeadCommit = repo->getCommit(newHead);
1883 if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
1884 mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
1886 options.to = mergeBase;
1888 // We announce commits from oldest to update to newest. This generally avoid
1889 // to get detached commits until they are all announced.
1890 std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
1891 announce(updatedCommits);
1892 } else {
1893 announce(commits);
1894 }
1895 }
1896 lk.unlock();
1897
1898 bool commitFound = false;
1899 if (commitId != "") {
1900 // If `commitId` is non-empty, then we were attempting to pull a specific commit.
1901 // We need to check if we actually got it; the fact that the fetch above was
1902 // successful doesn't guarantee that we did.
1903 for (const auto& commit : commits) {
1904 if (commit.at("id") == commitId) {
1905 commitFound = true;
1906 break;
1907 }
1908 }
1909 } else {
1910 commitFound = true;
1911 }
1912 if (!commitFound)
1913 JAMI_WARNING("Successfully fetched from device {} but didn't receive expected commit {}",
1914 deviceId, commitId);
1915 // WARNING: If its argument is `true`, this callback will attempt to send a message notification
1916 // for commit `commitId` to other members of the swarm. It's important that we only
1917 // send these notifications if we actually have the commit. Otherwise, we can end up
1918 // in a situation where the members of the swarm keep sending notifications to each
1919 // other for a commit that none of them have (note that we are unable to rule this out, as
1920 // nothing prevents a malicious user from intentionally sending a notification with
1921 // a fake commit ID).
1922 if (cb)
1923 cb(commitFound);
1924 // Announce if profile changed
1925 if (oldHead != newHead) {
1926 auto diffStats = repo->diffStats(newHead, oldHead);
1927 auto changedFiles = repo->changedFiles(diffStats);
1928 if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
1929 != changedFiles.end()) {
1931 accountId_, repo->id(), repo->infos());
1932 }
1933 }
1934 }
1935}
1936
1937void
1938Conversation::sync(const std::string& member,
1939 const std::string& deviceId,
1940 OnPullCb&& cb,
1941 std::string commitId)
1942{
1943 pull(deviceId, std::move(cb), commitId);
1944 dht::ThreadPool::io().run([member, deviceId, w = weak_from_this()] {
1945 auto sthis = w.lock();
1946 // For waiting request, downloadFile
1947 for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
1948 sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId);
1949 }
1950 });
1951}
1952
1953std::map<std::string, std::string>
1955{
1956 // Invite the new member to the conversation
1957 Json::Value root;
1959 for (const auto& [k, v] : infos()) {
1960 if (v.size() >= 64000) {
1961 JAMI_WARNING("Cutting invite because the SIP message will be too long");
1962 continue;
1963 }
1964 metadata[k] = v;
1965 }
1967 return {{"application/invite+json", json::toString(root)}};
1968}
1969
1970std::string
1972{
1974 std::lock_guard lk(pimpl_->writeMtx_);
1975 return pimpl_->repository_->leave();
1976}
1977
1978void
1980{
1981 pimpl_->isRemoving_ = true;
1982}
1983
1984bool
1986{
1987 return pimpl_->isRemoving_;
1988}
1989
1990void
1992{
1993 if (pimpl_->conversationDataPath_ != "")
1994 dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
1995 if (!pimpl_->repository_)
1996 return;
1997 std::lock_guard lk(pimpl_->writeMtx_);
1998 pimpl_->repository_->erase();
1999}
2000
2003{
2004 return pimpl_->repository_->mode();
2005}
2006
2007std::vector<std::string>
2009{
2010 return pimpl_->repository_->getInitialMembers();
2011}
2012
2013bool
2014Conversation::isInitialMember(const std::string& uri) const
2015{
2016 auto members = getInitialMembers();
2017 return std::find(members.begin(), members.end(), uri) != members.end();
2018}
2019
2020void
2021Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
2022{
2023 dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
2024 if (auto sthis = w.lock()) {
2025 auto& repo = sthis->pimpl_->repository_;
2026 std::unique_lock lk(sthis->pimpl_->writeMtx_);
2027 auto commit = repo->updateInfos(map);
2028 sthis->pimpl_->announce(commit, true);
2029 lk.unlock();
2030 if (cb)
2031 cb(!commit.empty(), commit);
2032 emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2033 sthis->pimpl_->accountId_, repo->id(), repo->infos());
2034 }
2035 });
2036}
2037
2038std::map<std::string, std::string>
2040{
2041 return pimpl_->repository_->infos();
2042}
2043
2044void
2045Conversation::updatePreferences(const std::map<std::string, std::string>& map)
2046{
2047 auto filePath = pimpl_->conversationDataPath_ / "preferences";
2048 auto prefs = map;
2049 auto itLast = prefs.find(LAST_MODIFIED);
2050 if (itLast != prefs.end()) {
2051 if (std::filesystem::is_regular_file(filePath)) {
2053 try {
2054 if (lastModified >= std::stoul(itLast->second))
2055 return;
2056 } catch (...) {
2057 return;
2058 }
2059 }
2060 prefs.erase(itLast);
2061 }
2062
2063 std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
2064 msgpack::pack(file, prefs);
2066 id(),
2067 std::move(prefs));
2068}
2069
2070std::map<std::string, std::string>
2072{
2073 try {
2074 std::map<std::string, std::string> preferences;
2075 auto filePath = pimpl_->conversationDataPath_ / "preferences";
2077 msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
2078 oh.get().convert(preferences);
2081 return preferences;
2082 } catch (const std::exception& e) {
2083 }
2084 return {};
2085}
2086
2087std::vector<uint8_t>
2089{
2090 try {
2091 return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
2092 } catch (...) {
2093 }
2094 return {};
2095}
2096
2097std::shared_ptr<TransferManager>
2099{
2100 return pimpl_->transferManager_;
2101}
2102
2103bool
2105 const std::string& fileId,
2106 std::filesystem::path& path,
2107 std::string& sha3sum) const
2108{
2109 if (!isMember(member))
2110 return false;
2111
2112 auto sep = fileId.find('_');
2113 if (sep == std::string::npos)
2114 return false;
2115
2116 auto interactionId = fileId.substr(0, sep);
2117 auto commit = getCommit(interactionId);
2118 if (commit == std::nullopt || commit->find("type") == commit->end()
2119 || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
2120 || commit->at("type") != "application/data-transfer+json") {
2121 JAMI_WARNING("[Account {:s}] {} requested invalid file transfer commit {}",
2122 pimpl_->accountId_,
2123 member,
2124 interactionId);
2125 return false;
2126 }
2127
2128 path = dataTransfer()->path(fileId);
2129 sha3sum = commit->at("sha3sum");
2130
2131 return true;
2132}
2133
2134bool
2135Conversation::downloadFile(const std::string& interactionId,
2136 const std::string& fileId,
2137 const std::string& path,
2138 const std::string&,
2139 const std::string& deviceId)
2140{
2141 auto commit = getCommit(interactionId);
2142 if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
2143 JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
2144 return false;
2145 }
2146 auto tid = commit->find("tid");
2147 auto sha3sum = commit->find("sha3sum");
2148 auto size_str = commit->find("totalSize");
2149
2150 if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
2151 JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
2152 return false;
2153 }
2154
2155 auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
2156 if (totalSize < 0) {
2157 JAMI_ERROR("Invalid file size {}", totalSize);
2158 return false;
2159 }
2160
2161 // Be sure to not lock conversation
2162 dht::ThreadPool().io().run([w = weak(),
2163 deviceId,
2164 fileId,
2165 interactionId,
2166 sha3sum = sha3sum->second,
2167 path,
2168 totalSize] {
2169 if (auto shared = w.lock()) {
2170 std::filesystem::path filePath(path);
2171 if (filePath.empty()) {
2172 filePath = shared->dataTransfer()->path(fileId);
2173 }
2174
2175 if (fileutils::size(filePath) == totalSize) {
2176 if (fileutils::sha3File(filePath) == sha3sum) {
2177 JAMI_WARNING("Ignoring request to download existing file: {}", filePath);
2178 return;
2179 }
2180 }
2181
2182 std::filesystem::path tempFilePath(filePath);
2183 tempFilePath += ".tmp";
2184 auto start = fileutils::size(tempFilePath);
2185 if (start < 0)
2186 start = 0;
2187 size_t end = 0;
2188
2189 auto acc = shared->pimpl_->account_.lock();
2190 if (!acc)
2191 return;
2192 shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2193 acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2194 }
2195 });
2196 return true;
2197}
2198
2199void
2200Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
2201{
2202 dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
2203 auto sthis = w.lock();
2204 if (!sthis)
2205 return;
2206 // Update fetched for Uri
2207 auto uri = sthis->uriFromDevice(deviceId);
2208 if (uri.empty() || uri == sthis->pimpl_->userId_)
2209 return;
2210 // When a user fetches a commit, the message is sent for this person
2211 sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
2212 });
2213}
2214
2215
2216void
2219 const std::string& commitId,
2220 const std::string& ts,
2221 bool emit)
2222{
2223 // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
2225 std::map<std::string, std::map<std::string, std::string>> newStatus;
2226 {
2227 // Update internal structures.
2228 std::lock_guard lk(messageStatusMtx_);
2229 auto& status = messagesStatus_[uri];
2230 auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
2231 if (oldStatus == commitId)
2232 return; // Nothing to do
2233 options.to = oldStatus;
2234 options.from = commitId;
2236 status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
2237 saveStatus();
2238 if (emit)
2239 newStatus[uri].insert(status.begin(), status.end());
2240 }
2241 if (emit && messageStatusCb_) {
2242 messageStatusCb_(newStatus);
2243 }
2244 // Update messages status for all commit between the old and new one
2245 options.logIfNotFound = false;
2246 options.fastLog = true;
2248 std::lock_guard lk(optHistory.mutex); // Avoid to announce messages while updating status.
2250 if (res.size() == 0) {
2251 // In this case, commit is not received yet, so we cache it
2252 futureStatus[commitId][uri] = static_cast<int32_t>(st);
2253 }
2254 for (const auto& [cid, _]: optHistory.quickAccess) {
2255 auto message = loadedHistory_.quickAccess.find(cid);
2256 if (message != loadedHistory_.quickAccess.end()) {
2257 // Update message and emit to client,
2258 if(static_cast<int32_t>(st) > message->second->status[uri]){
2259 message->second->status[uri] = static_cast<int32_t>(st);
2261 accountId_,
2262 repository_->id(),
2263 uri,
2264 cid,
2265 static_cast<int>(st));
2266 }
2267 } else {
2268 // In this case, commit is not loaded by client, so we cache it
2269 // No need to emit to client, they will get a correct status on load.
2270 futureStatus[cid][uri] = static_cast<int32_t>(st);
2271 }
2272 }
2273}
2274
2275bool
2276Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
2277{
2278 std::lock_guard lk(pimpl_->messageStatusMtx_);
2279 if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
2280 return false; // Nothing to do
2281 dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2282 auto sthis = w.lock();
2283 if (!sthis)
2284 return;
2285 sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
2286 });
2287 return true;
2288}
2289
2290std::map<std::string, std::map<std::string, std::string>>
2292{
2293 std::lock_guard lk(pimpl_->messageStatusMtx_);
2294 return pimpl_->messagesStatus_;
2295}
2296
2297void
2298Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
2299{
2300 std::unique_lock lk(pimpl_->messageStatusMtx_);
2301 std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
2302 for (const auto& [uri, status] : messageStatus) {
2303 auto& oldMs = pimpl_->messagesStatus_[uri];
2304 if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
2305 if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
2306 stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
2307 }
2308 }
2309 if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
2310 if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
2311 stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
2312 }
2313 }
2314 }
2315 lk.unlock();
2316
2317 for (const auto& [status, uri, commitId, ts] : stVec) {
2318 pimpl_->updateStatus(uri, status, commitId, ts);
2319 }
2320}
2321
2322void
2323Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
2324{
2325 std::unique_lock lk(pimpl_->messageStatusMtx_);
2326 pimpl_->messageStatusCb_ = cb;
2327}
2328
2329#ifdef LIBJAMI_TEST
2330void
2331Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
2332{
2333 pimpl_->bootstrapCbTest_ = cb;
2334}
2335#endif
2336
2337void
2338Conversation::checkBootstrapMember(const asio::error_code& ec,
2339 std::vector<std::map<std::string, std::string>> members)
2340{
2341 if (ec == asio::error::operation_aborted)
2342 return;
2343 auto acc = pimpl_->account_.lock();
2344 if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
2345 return;
2346 // We bootstrap the DRT with devices who already wrote in the repository.
2347 // However, in a conversation, a large number of devices may just watch
2348 // the conversation, but never write any message.
2349 std::unique_lock lock(pimpl_->membersMtx_);
2350
2351 std::string uri;
2352 while (!members.empty()) {
2353 auto member = std::move(members.back());
2354 members.pop_back();
2355 uri = std::move(member.at("uri"));
2356 if (uri != pimpl_->userId_
2357 && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2358 break;
2359 }
2360 auto fallbackFailed = [](auto sthis) {
2361 JAMI_LOG("{} Bootstrap: Fallback failed. Wait for remote connections.",
2362 sthis->pimpl_->toString());
2363#ifdef LIBJAMI_TEST
2364 if (sthis->pimpl_->bootstrapCbTest_)
2365 sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
2366#endif
2367 };
2368 // If members is empty, we finished the fallback un-successfully
2369 if (members.empty() && uri.empty()) {
2370 lock.unlock();
2371 fallbackFailed(this);
2372 return;
2373 }
2374
2375 // Fallback, check devices of a member (we didn't check yet) in the conversation
2376 pimpl_->checkedMembers_.emplace(uri);
2377 auto devices = std::make_shared<std::vector<NodeId>>();
2378 acc->forEachDevice(
2379 dht::InfoHash(uri),
2380 [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
2381 // Test if already sent
2382 if (auto sthis = w.lock()) {
2383 if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
2384 devices->emplace_back(dev->getLongId());
2385 }
2386 },
2387 [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
2388 auto sthis = w.lock();
2389 if (!sthis)
2390 return;
2391 auto checkNext = true;
2392 if (ok && devices->size() != 0) {
2393#ifdef LIBJAMI_TEST
2394 if (sthis->pimpl_->bootstrapCbTest_)
2395 sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
2396#endif
2397 JAMI_LOG("{} Bootstrap: Fallback with member: {}",
2398 sthis->pimpl_->toString(),
2399 uri);
2400 if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2401 checkNext = false;
2402 }
2403 if (checkNext) {
2404 // Check next member
2405 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2406 sthis->pimpl_->fallbackTimer_->async_wait(
2407 std::bind(&Conversation::checkBootstrapMember,
2408 sthis,
2409 std::placeholders::_1,
2410 std::move(members)));
2411 } else {
2412 // In this case, all members are checked. Fallback failed
2414 }
2415 });
2416}
2417
2418void
2420 const std::vector<DeviceId>& knownDevices)
2421{
2422 if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2423 return;
2424 // Here, we bootstrap the DRT with devices who already wrote in the conversation
2425 // If this doesn't work, it will attempt to fallback with checkBootstrapMember
2426 // If it works, the callback onConnectionChanged will be called with ok=true
2427 pimpl_->bootstrapCb_ = std::move(onBootstraped);
2428 std::vector<DeviceId> devices = knownDevices;
2429 for (const auto& [member, memberDevices] : pimpl_->repository_->devices()) {
2430 if (!isBanned(member))
2431 devices.insert(devices.end(), memberDevices.begin(), memberDevices.end());
2432 }
2433 JAMI_DEBUG("{} Bootstrap with {} device(s)",
2434 pimpl_->toString(),
2435 devices.size());
2436 // set callback
2437 auto fallback = [](auto sthis, bool now = false) {
2438 // Fallback
2439 auto acc = sthis->pimpl_->account_.lock();
2440 if (!acc)
2441 return;
2442 auto members = sthis->getMembers(false, false);
2443 std::shuffle(members.begin(), members.end(), acc->rand);
2444 if (now) {
2445 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2446 } else {
2447 auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
2448 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2449 - std::chrono::seconds(timeForBootstrap));
2450 JAMI_DEBUG("{} Fallback in {} seconds",
2451 sthis->pimpl_->toString(),
2452 (20 - timeForBootstrap));
2453 }
2454 sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2455 sthis,
2456 std::placeholders::_1,
2457 std::move(members)));
2458 };
2459
2460 pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
2461 // This will call methods from accounts, so trigger on another thread.
2462 dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
2463 auto sthis = w.lock();
2464 if (!sthis)
2465 return;
2466 if (ok) {
2467 // Bootstrap succeeded!
2468 {
2469 std::lock_guard lock(sthis->pimpl_->membersMtx_);
2470 sthis->pimpl_->checkedMembers_.clear();
2471 }
2472 if (sthis->pimpl_->bootstrapCb_)
2473 sthis->pimpl_->bootstrapCb_();
2474#ifdef LIBJAMI_TEST
2475 if (sthis->pimpl_->bootstrapCbTest_)
2476 sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
2477#endif
2478 return;
2479 }
2480 fallback(sthis);
2481 });
2482 });
2483 {
2484 std::lock_guard lock(pimpl_->membersMtx_);
2485 pimpl_->checkedMembers_.clear();
2486 }
2487 // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
2488 if (pimpl_->swarmManager_->isShutdown()) {
2489 pimpl_->swarmManager_->restart();
2490 pimpl_->swarmManager_->maintainBuckets();
2491 } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2492 fallback(this, true);
2493 }
2494}
2495
2496std::vector<std::string>
2498{
2499 pimpl_->loadActiveCalls();
2500 pimpl_->loadHostedCalls();
2501 auto commits = pimpl_->commitsEndedCalls();
2502 if (!commits.empty()) {
2503 // Announce to client
2504 dht::ThreadPool::io().run([w = weak(), commits] {
2505 if (auto sthis = w.lock())
2506 sthis->pimpl_->announce(commits, true);
2507 });
2508 }
2509 return commits;
2510}
2511
2512void
2514{
2515 pimpl_->onMembersChanged_ = std::move(cb);
2516 pimpl_->repository_->onMembersChanged([w=weak()] (const std::set<std::string>& memberUris) {
2517 if (auto sthis = w.lock())
2518 sthis->pimpl_->onMembersChanged_(memberUris);
2519 });
2520}
2521
2522void
2524{
2525 pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
2526 w=weak()](const std::string& deviceId, ChannelCb&& cb) {
2527 if (auto sthis = w.lock())
2528 needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
2529 };
2530}
2531
2532void
2533Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
2534{
2535 auto deviceId = channel->deviceId();
2536 // Transmit avatar if necessary
2537 // We do this here, because at this point we know both sides are connected and in
2538 // the same conversation
2539 // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
2540 auto cert = channel->peerCertificate();
2541 if (!cert || !cert->issuer)
2542 return;
2543 auto member = cert->issuer->getId().toString();
2544 pimpl_->swarmManager_->addChannel(std::move(channel));
2545 dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
2546 auto sthis = w.lock();
2547 if (auto account = a.lock()) {
2548 account->sendProfile(sthis->id(), member, deviceId.toString());
2549 }
2550 });
2551}
2552
2555 const std::string& fromId,
2556 const std::string& authorUri) const
2557{
2559 options.to = toId;
2560 options.from = fromId;
2561 options.authorUri = authorUri;
2562 options.logIfNotFound = false;
2563 options.fastLog = true;
2565 std::lock_guard lk(history.mutex);
2566 auto res = pimpl_->loadMessages2(options, &history);
2567 return res.size();
2568}
2569
2570void
2572 const Filter& filter,
2573 const std::shared_ptr<std::atomic_int>& flag) const
2574{
2575 // Because logging a conversation can take quite some time,
2576 // do it asynchronously
2577 dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
2578 if (auto sthis = w.lock()) {
2579 History history;
2580 std::vector<std::map<std::string, std::string>> commits {};
2581 // std::regex_constants::ECMAScript is the default flag.
2582 auto re = std::regex(filter.regexSearch,
2583 filter.caseSensitive ? std::regex_constants::ECMAScript
2584 : std::regex_constants::icase);
2585 sthis->pimpl_->repository_->log(
2586 [&](const auto& id, const auto& author, auto& commit) {
2587 if (!filter.author.empty()
2588 && filter.author != sthis->uriFromDevice(author.email)) {
2589 // Filter author
2590 return CallbackResult::Skip;
2591 }
2592 auto commitTime = git_commit_time(commit.get());
2593 if (filter.before && filter.before < commitTime) {
2594 // Only get commits before this date
2595 return CallbackResult::Skip;
2596 }
2597 if (filter.after && filter.after > commitTime) {
2598 // Only get commits before this date
2599 if (git_commit_parentcount(commit.get()) <= 1)
2600 return CallbackResult::Break;
2601 else
2602 return CallbackResult::Skip; // Because we are sorting it with
2603 // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
2604 }
2605
2606 return CallbackResult::Ok; // Continue
2607 },
2608 [&](auto&& cc) {
2609 if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
2610 sthis->pimpl_->addToHistory(history, {optMessage.value()}, false, false);
2611 },
2612 [&](auto id, auto, auto) {
2613 if (id == filter.lastId)
2614 return true;
2615 return false;
2616 },
2617 "",
2618 false);
2619 // Search on generated history
2620 for (auto& message : history.messageList) {
2621 auto contentType = message->type;
2622 auto isSearchable = contentType == "text/plain"
2623 || contentType == "application/data-transfer+json";
2624 if (filter.type.empty() && !isSearchable) {
2625 // Not searchable, at least for now
2626 continue;
2627 } else if (contentType == filter.type || filter.type.empty()) {
2628 if (isSearchable) {
2629 // If it's a text match the body, else the display name
2630 auto body = contentType == "text/plain" ? message->body.at("body")
2631 : message->body.at("displayName");
2632 std::smatch body_match;
2633 if (std::regex_search(body, body_match, re)) {
2634 auto commit = message->body;
2635 commit["id"] = message->id;
2636 commit["type"] = message->type;
2637 commits.emplace_back(commit);
2638 }
2639 } else {
2640 // Matching type, just add it to the results
2641 commits.emplace_back(message->body);
2642 }
2643
2644 if (filter.maxResult != 0 && commits.size() == filter.maxResult)
2645 break;
2646 }
2647 }
2648
2649 if (commits.size() > 0)
2651 sthis->pimpl_->accountId_,
2652 sthis->id(),
2653 std::move(commits));
2654 // If we're the latest thread, inform client that the search is finished
2655 if ((*flag)-- == 1 /* decrement return the old value */) {
2657 req,
2658 sthis->pimpl_->accountId_,
2659 std::string {},
2660 std::vector<std::map<std::string, std::string>> {});
2661 }
2662 }
2663 });
2664}
2665
2666void
2668{
2669 if (!message.isMember("confId")) {
2670 JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2671 return;
2672 }
2673
2674 auto now = std::chrono::system_clock::now();
2675 auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
2676 {
2677 std::lock_guard lk(pimpl_->activeCallsMtx_);
2678 pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
2679 pimpl_->saveHostedCalls();
2680 }
2681
2682 sendMessage(std::move(message), "", {}, std::move(cb));
2683}
2684
2685bool
2686Conversation::isHosting(const std::string& confId) const
2687{
2688 auto info = infos();
2689 if (info["rdvDevice"] == pimpl_->deviceId_ && info["rdvHost"] == pimpl_->userId_)
2690 return true; // We are the current device Host
2691 std::lock_guard lk(pimpl_->activeCallsMtx_);
2692 return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
2693}
2694
2695void
2697{
2698 if (!message.isMember("confId")) {
2699 JAMI_ERROR("{}Malformed commit: no confId", pimpl_->toString());
2700 return;
2701 }
2702
2703 auto erased = false;
2704 {
2705 std::lock_guard lk(pimpl_->activeCallsMtx_);
2706 erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
2707 }
2708 if (erased) {
2709 pimpl_->saveHostedCalls();
2710 sendMessage(std::move(message), "", {}, std::move(cb));
2711 } else
2712 cb(false, "");
2713}
2714
2715std::vector<std::map<std::string, std::string>>
2717{
2718 std::lock_guard lk(pimpl_->activeCallsMtx_);
2719 return pimpl_->activeCalls_;
2720}
2721} // namespace jami
#define _(S)
Definition SHA3.cpp:123
This class gives access to the git repository that represents the conversation.
static LIBJAMI_TEST_EXPORT std::unique_ptr< ConversationRepository > cloneConversation(const std::shared_ptr< JamiAccount > &account, const std::string &deviceId, const std::string &conversationId, std::function< void(std::vector< ConversationCommit >)> &&checkCommitCb={})
Clones a conversation on a remote device.
std::map< std::string, int32_t > memberToStatus
Status: 0 = commited, 1 = fetched, 2 = read This cache the curent status to add in the messages.
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
void updateStatus(const std::string &uri, libjami::Account::MessageStates status, const std::string &commitId, const std::string &ts, bool emit=false)
std::filesystem::path conversationDataPath_
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &remoteDevice, const std::string &conversationId)
std::vector< std::string > commitsEndedCalls()
If, for whatever reason, the daemon is stopped while hosting a conference, we need to announce the en...
std::shared_ptr< Typers > typers_
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
void removeGitSocket(const DeviceId &deviceId)
std::set< std::string > checkedMembers_
std::filesystem::path statusPath_
std::filesystem::path repoPath() const
void disconnectFromPeer(const std::string &peerUri)
Remove all git sockets and all DRT nodes associated with the given peer.
History loadedHistory_
Loaded history represents the linearized history to show for clients.
void announce(const std::vector< std::string > &commits, bool commitFromSelf=false)
std::map< std::string, std::map< std::string, std::string > > messagesStatus_
std::shared_ptr< asio::io_context > ioContext_
std::unique_ptr< ConversationRepository > repository_
std::filesystem::path preferencesPath_
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
std::filesystem::path activeCallsPath_
std::filesystem::path fetchedPath_
void updateActiveCalls(const std::map< std::string, std::string > &commit, bool eraseOnly=false, bool emitSig=true) const
Update activeCalls_ via announced commits (in load or via new commits)
std::vector< std::map< std::string, std::string > > loadMessages(const LogOptions &options)
void handleReaction(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit) const
std::vector< std::shared_ptr< libjami::SwarmMessage > > addToHistory(History &history, const std::vector< std::map< std::string, std::string > > &commits, bool messageReceived=false, bool commitFromSelf=false)
std::shared_ptr< SwarmManager > swarmManager_
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &conversationId)
void voteUnban(const std::string &contactUri, const std::string_view type, const OnDoneCb &cb)
void announce(const std::string &commitId, bool commitFromSelf=false)
std::shared_ptr< TransferManager > transferManager_
void initActiveCalls(const std::vector< std::map< std::string, std::string > > &commits) const
Initialize activeCalls_ from the list of commits in the repository.
std::atomic_bool isRemoving_
Impl(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
OnMembersChanged onMembersChanged_
std::unique_ptr< asio::steady_timer > fallbackTimer_
std::map< std::string, std::deque< std::pair< std::string, OnPullCb > > > fetchingRemotes_
std::vector< std::map< std::string, std::string > > activeCalls_
void pull(const std::string &deviceId)
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft) const
std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> messageStatusCb_
void announce(const std::vector< std::map< std::string, std::string > > &commits, bool commitFromSelf=false)
std::weak_ptr< JamiAccount > account_
std::string_view bannedType(const std::string &uri) const
void handleEdition(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
std::vector< libjami::SwarmMessage > loadMessages2(const LogOptions &options, History *optHistory=nullptr)
std::filesystem::path hostedCallsPath_
std::string toString() const
std::filesystem::path sendingPath_
std::vector< std::map< std::string, std::string > > mergeHistory(const std::string &uri)
std::map< std::string, std::map< std::string, int32_t > > futureStatus
void init(const std::shared_ptr< JamiAccount > &account)
std::mutex messageStatusMtx_
{uri, { {"fetch", "commitId"}, {"fetched_ts", "timestamp"}, {"read", "commitId"}, {"read_ts",...
std::function< void()> bootstrapCb_
std::map< std::string, uint64_t > hostedCalls_
void rectifyStatus(const std::shared_ptr< libjami::SwarmMessage > &message, History &history) const
bool handleMessage(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
bool pull(const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch and merge from peer.
std::set< std::string > memberUris(std::string_view filter={}, const std::set< MemberRole > &filteredRoles={MemberRole::INVITED, MemberRole::LEFT, MemberRole::BANNED}) const
std::shared_ptr< Typers > typers() const
Get Typers object.
bool downloadFile(const std::string &interactionId, const std::string &fileId, const std::string &path, const std::string &member="", const std::string &deviceId="")
Adds a file to the waiting list and ask members.
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
std::string leave()
Leave a conversation.
std::string id() const
Get conversation's id.
void clearCache()
Clear all cached messages.
std::vector< std::string > commitsEndedCalls()
Refresh active calls.
std::map< std::string, std::map< std::string, std::string > > messageStatus() const
Retrieve last displayed and fetch status per member.
void loadMessages2(const OnLoadMessages2 &cb, const LogOptions &options)
Get a range of messages.
bool isMember(const std::string &uri, bool includeInvited=false) const
Test if an URI is a member.
void search(uint32_t req, const Filter &filter, const std::shared_ptr< std::atomic_int > &flag) const
Search in the conversation via a filter.
std::vector< uint8_t > vCard() const
std::vector< std::string > getInitialMembers() const
One to one util, get initial members.
void connectivityChanged()
If we change from one network to one another, we will need to update the state of the connections.
std::shared_ptr< TransferManager > dataTransfer() const
Access to transfer manager.
void removeGitSocket(const DeviceId &deviceId)
std::vector< std::map< std::string, std::string > > currentCalls() const
Return current detected calls.
void onMembersChanged(OnMembersChanged &&cb)
void bootstrap(std::function< void()> onBootstraped, const std::vector< DeviceId > &knownDevices)
Bootstrap swarm manager to other peers.
std::map< std::string, std::string > infos() const
Retrieve current infos (title, description, avatar, mode)
void monitor()
Print the state of the DRT linked to the conversation.
std::vector< NodeId > peersToSyncWith() const
Get peers to sync with.
void removeActiveConference(Json::Value &&message, OnDoneCb &&cb={})
Announce the end of a call.
bool isInitialMember(const std::string &uri) const
ConversationMode mode() const
Get conversation's mode.
std::string join()
Join a conversation.
void addSwarmChannel(std::shared_ptr< dhtnet::ChannelSocket > channel)
Add swarm connection to the DRT.
std::vector< jami::DeviceId > getDeviceIdList() const
bool setMessageDisplayed(const std::string &uri, const std::string &interactionId)
Store last read commit (returned in getMembers)
void loadMessages(OnLoadMessages cb, const LogOptions &options)
Get a range of messages.
void updateInfos(const std::map< std::string, std::string > &map, const OnDoneCb &cb={})
Change repository's infos.
void updateMessageStatus(const std::map< std::string, std::map< std::string, std::string > > &messageStatus)
Update fetch/read status.
void onMessageStatusChanged(const std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> &cb)
void shutdownConnections()
Stop SwarmManager, bootstrap and gitSockets.
void updatePreferences(const std::map< std::string, std::string > &map)
Change user's preferences.
void sendMessages(std::vector< Json::Value > &&messages, OnMultiDoneCb &&cb={})
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited=false, bool includeLeft=false, bool includeBanned=false) const
std::map< std::string, std::string > preferences(bool includeLastModified) const
Retrieve current preferences (color, notification, etc)
void sync(const std::string &member, const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch new commits and re-ask for waiting files.
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
Git operations will need a ChannelSocket for cloning/fetching commits Because libgit2 is a C library,...
void onNeedSocket(NeedSocketCb cb)
Set the callback that will be called whenever a new socket will be needed.
std::string uriFromDevice(const std::string &deviceId) const
Retrieve the uri from a deviceId.
std::optional< std::map< std::string, std::string > > getCommit(const std::string &commitId) const
Retrieve one commit.
void erase()
Erase all related datas.
void setRemovingFlag()
Set a conversation as removing (when loading convInfo and still not sync)
void sendMessage(std::string &&message, const std::string &type="text/plain", const std::string &replyTo="", OnCommitCb &&onCommit={}, OnDoneCb &&cb={})
void hasFetched(const std::string &deviceId, const std::string &commitId)
Store information about who fetch or not.
bool isHosting(const std::string &confId) const
Check if we're currently hosting this conference.
bool isBootstraped() const
Check if we're at least connected to one node.
std::string lastCommitId() const
Get last commit id.
bool isBanned(const std::string &uri) const
Conversation(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
bool onFileChannelRequest(const std::string &member, const std::string &fileId, std::filesystem::path &path, std::string &sha3sum) const
Choose if we can accept channel request.
void addMember(const std::string &contactUri, const OnDoneCb &cb={})
Add conversation member.
bool hasSwarmChannel(const std::string &deviceId)
Used to avoid multiple connections, we just check if we got a swarm channel with a specific device.
bool isRemoving()
Check if we are removing the conversation.
void removeMember(const std::string &contactUri, bool isDevice, const OnDoneCb &cb={})
void hostConference(Json::Value &&message, OnDoneCb &&cb={})
Host a conference in the conversation.
uint32_t countInteractions(const std::string &toId, const std::string &fromId="", const std::string &authorUri="") const
Retrieve how many interactions there is from HEAD to interactionId.
std::map< std::string, std::string > generateInvitation() const
Generate an invitation to send to new contacts.
static LIBJAMI_TEST_EXPORT Manager & instance()
Definition manager.cpp:676
std::shared_ptr< asio::io_context > ioContext() const
Definition manager.cpp:1711
std::mt19937_64 getSeededRandomEngine()
Definition manager.cpp:2737
#define JAMI_ERR(...)
Definition logger.h:218
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARN(...)
Definition logger.h:217
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
Definition Address.h:25
static constexpr const char * CREATED
static constexpr const char * REMOVED
static constexpr const char * ID
static constexpr const char * RECEIVED
static constexpr const char * METADATAS
static constexpr const char * MEMBERS
static constexpr const char * DECLINED
static constexpr const char * ERASED
static constexpr const char * FROM
static constexpr const char * CONVERSATIONID
static constexpr const char * HOSTED_CALLS
static constexpr const char * PREFERENCES
static constexpr const char * ACTIVE_CALLS
static constexpr const char * LAST_DISPLAYED
const std::filesystem::path & get_data_dir()
std::vector< uint8_t > loadFile(const std::filesystem::path &path, const std::filesystem::path &default_dir)
Read the full content of a file at path.
uint64_t lastWriteTimeInSeconds(const std::filesystem::path &filePath)
Return the last write time (epoch time) of a given file path (in seconds).
std::filesystem::path getFullPath(const std::filesystem::path &base, const std::filesystem::path &path)
If path is relative, it is appended to base.
std::string toString(const Json::Value &jsonVal)
Definition json_utils.h:42
std::function< void(const std::string &, const std::string &, ChannelCb &&, const std::string &)> NeedSocketCb
std::function< void(std::vector< libjami::SwarmMessage > &&messages)> OnLoadMessages2
dht::PkId DeviceId
static constexpr std::string_view toString(AuthDecodingState state)
dht::h256 NodeId
std::function< void(const std::vector< std::string > &)> OnMultiDoneCb
void emitSignal(Args... args)
Definition ring_signal.h:64
std::function< void(bool, const std::string &)> OnDoneCb
std::list< std::shared_ptr< libjami::SwarmMessage > > MessageList
std::function< void(std::vector< std::map< std::string, std::string > > &&messages)> OnLoadMessages
std::function< bool(const std::shared_ptr< dhtnet::ChannelSocket > &)> ChannelCb
std::function< void(bool fetchOk)> OnPullCb
std::map< DeviceId, std::shared_ptr< dhtnet::ChannelSocket > > GitSocketList
std::function< void(const std::string &)> OnCommitCb
static const char *const LAST_MODIFIED
constexpr auto EFETCH
std::function< void(const std::set< std::string > &)> OnMembersChanged
bool regex_search(string_view sv, svmatch &m, const regex &e, regex_constants::match_flag_type flags=regex_constants::match_default)
std::set< std::string > members
ConvInfo()=default
std::string lastDisplayed
Json::Value toJson() const
std::string id
std::map< std::string, std::string > metadatas
Json::Value toJson() const
std::map< std::string, std::string > toMap() const
std::mutex mutex
std::map< std::string, std::shared_ptr< libjami::SwarmMessage > > quickAccess
std::map< std::string, std::list< std::map< std::string, std::string > > > pendingReactions
MessageList messageList
std::map< std::string, std::list< std::shared_ptr< libjami::SwarmMessage > > > pendingEditions
std::condition_variable cv