33#include <opendht/thread_pool.h>
34#include <fmt/compile.h>
52 members.emplace(v[
"uri"].asString());
86 for (
const auto&
member :
md.getMemberNames()) {
106std::map<std::string, std::string>
118using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
126 std::condition_variable
cv {};
129 std::map<std::string, std::shared_ptr<libjami::SwarmMessage>>
quickAccess {};
130 std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>>
pendingEditions {};
147 throw std::logic_error(
"Unable to create repository");
152 Impl(
const std::shared_ptr<JamiAccount>&
account,
const std::string& conversationId)
160 throw std::logic_error(
"Unable to create repository");
167 const std::string& conversationId)
173 std::vector<ConversationCommit>
commits;
183 throw std::logic_error(
"Unable to clone repository");
187 /
"conversation_data" / conversationId;
200 if (
auto acc =
account.lock()) {
201 return acc->isConnectedWith(deviceId);
207 = std::make_shared<TransferManager>(
accountId_,
234 }
catch (
const std::exception&
e) {
246 std::filesystem::path
repoPath()
const;
250 std::vector<std::string>
vec;
262 if (commit != std::nullopt) {
279 for (
const auto& commit :
commits) {
280 if (commit.at(
"type") ==
"member") {
290 }
else if (commit.find(
"confId") != commit.end() && commit.find(
"uri") != commit.end()
291 && commit.find(
"device") != commit.end()) {
295 auto confId = commit.at(
"confId");
296 auto uri = commit.at(
"uri");
297 auto device = commit.at(
"device");
299 if (commit.find(
"duration") == commit.end()
302 std::map<std::string, std::string>
activeCall;
307 fmt::print(
"swarm:{} new active call detected: {} (on device {}, account {})\n",
338 if (commit.at(
"type") ==
"member") {
344 if (
it->at(
"uri") == commit.at(
"uri") ||
it->at(
"device") == commit.at(
"uri")) {
345 JAMI_DEBUG(
"Removing {:s} from the active calls, because {:s} left",
364 if (commit.find(
"confId") != commit.end() && commit.find(
"uri") != commit.end()
365 && commit.find(
"device") != commit.end()) {
367 auto confId = commit.at(
"confId");
368 auto uri = commit.at(
"uri");
369 auto device = commit.at(
"device");
373 [&](
const auto&
value) {
374 return value.at(
"id") == confId
375 && value.at(
"uri") == uri
376 && value.at(
"device") == device;
378 if (commit.find(
"duration") == commit.end()) {
381 "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
386 std::map<std::string, std::string>
activeCall;
404 return value.at(
"id") == confId && value.at(
"uri") == uri
405 && value.at(
"device") == device;
408 JAMI_ERROR(
"Duplicate call found. (This is a bug)");
414 JAMI_WARNING(
"previous swarm:{:s} call finished detected: {:s} on device "
415 "{:s}, account {:s}",
421 JAMI_DEBUG(
"swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
447 for (
const auto& c :
commits) {
449 if (c.at(
"type") ==
"member") {
450 if (c.find(
"uri") != c.end() && c.find(
"action") != c.end()) {
451 const auto& uri = c.at(
"uri");
475 }
else if (c.at(
"type") ==
"application/call-history+json") {
507 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
510 }
catch (
const std::exception&
e) {
525 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
528 }
catch (
const std::exception&
e) {
545 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
548 }
catch (
const std::exception&
e) {
568 auto crt = fmt::format(
"{}.crt", uri);
629 void pull(
const std::string& deviceId);
630 std::vector<std::map<std::string, std::string>>
mergeHistory(
const std::string& uri);
650 mutable std::vector<std::map<std::string, std::string>>
activeCalls_ {};
663 std::vector<std::shared_ptr<libjami::SwarmMessage>>
addToHistory(
665 const std::vector<std::map<std::string, std::string>>&
commits,
670 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit)
const;
672 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
675 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
677 void rectifyStatus(
const std::shared_ptr<libjami::SwarmMessage>& message,
703 mutable std::map<std::string, std::map<std::string, int32_t>>
futureStatus;
708 const std::string&
ts,
726 const auto nodes = swarmManager_->getRoutingTable().getAllNodes();
728 for (
const auto node : nodes)
729 if (peerUri == repository_->uriFromDevice(node.toString()))
731 swarmManager_->deleteNode(
toRemove);
734 for (
auto it = gitSocketList_.begin();
it != gitSocketList_.end();) {
735 if (peerUri == repository_->uriFromDevice(
it->first.toString()))
736 it = gitSocketList_.erase(
it);
742std::vector<std::map<std::string, std::string>>
745 std::vector<std::map<std::string, std::string>> result;
746 auto members = repository_->members();
747 std::lock_guard
lk(messageStatusMtx_);
748 for (
const auto&
member : members) {
758 result.emplace_back(std::move(
mm));
763std::vector<std::string>
767 std::vector<std::string>
commits {};
768 std::unique_lock
lk(writeMtx_);
769 std::unique_lock
lkA(activeCallsMtx_);
776 value[
"uri"] = userId_;
777 value[
"device"] = deviceId_;
779 value[
"type"] =
"application/call-history+json";
780 auto now = std::chrono::system_clock::now();
781 auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch())
784 auto itActive = std::find_if(activeCalls_.begin(),
787 return value.at(
"id") == confId && value.at(
"uri") == userId_
788 && value.at(
"device") == deviceId_;
796 hostedCalls_.clear();
808std::vector<std::map<std::string, std::string>>
813 std::vector<ConversationCommit>
commits;
817 [&](
const auto&
id,
const auto& author,
const auto& commit) {
820 commits.rbegin()->linearized_parent = id;
823 return CallbackResult::Skip;
843 if (
options.authorUri == repository_->uriFromDevice(author.email)) {
854 [&](
auto&& cc) {
commits.emplace(
commits.end(), std::forward<decltype(cc)>(cc)); },
855 [](
auto,
auto,
auto) {
return false; },
858 return repository_->convCommitsToMap(
commits);
861std::vector<libjami::SwarmMessage>
867 if (!repository_ ||
history->loading) {
879 std::vector<std::string>
replies;
880 std::vector<std::shared_ptr<libjami::SwarmMessage>>
msgList;
883 [&](
const auto&
id,
const auto& author,
const auto& commit) {
885 return CallbackResult::Skip;
888 if (options.includeTo)
894 if ((limitNbOfCommits
895 && (loadedHistory_.messageList.size() - currentHistorySize)
896 == options.nbOfCommits))
897 return CallbackResult::Break;
899 return CallbackResult::Break;
900 if (id == options.to && !options.includeTo) {
901 return CallbackResult::Break;
912 if (
options.authorUri == repository_->uriFromDevice(author.email)) {
924 auto optMessage = repository_->convCommitToMap(cc);
928 if (message.find(
"reply-to") != message.end()) {
929 auto it = std::find(
replies.begin(),
replies.end(), message.at(
"reply-to"));
931 replies.emplace_back(message.at(
"reply-to"));
938 std::shared_ptr<libjami::SwarmMessage>
firstMsg;
939 if ((
history == &loadedHistory_) &&
msgList.empty() && !loadedHistory_.messageList.empty()) {
940 firstMsg = *loadedHistory_.messageList.rbegin();
942 auto added = addToHistory(*
history, {message},
false,
false);
951 [&](
auto,
auto,
auto) {
966 std::vector<libjami::SwarmMessage>
ret;
976 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit)
const
1000 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
1022 return reaction.at(
"id") == editId;
1040 return reaction.at(
"id") == editId;
1054 it->second->editions.emplace(
it->second->editions.begin(),
it->second->body);
1058 it->second->body[
"fileId"] =
"";
1062 it->second->reactions.clear();
1074 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
1079 if (!
history.messageList.empty())
1085 if (!
history.messageList.empty())
1099 if (
sharedCommit->type ==
"application/data-transfer+json") {
1106 for (
const auto& commit :
peditIt->second) {
1129 for (
const auto& [peer,
value] : message->status) {
1149std::vector<std::shared_ptr<libjami::SwarmMessage>>
1151 const std::vector<std::map<std::string, std::string>>&
commits,
1166 auto acc = account_.lock();
1169 auto username = acc->getUsername();
1178 std::vector<std::shared_ptr<libjami::SwarmMessage>>
sharedCommits;
1179 for (
const auto& commit :
commits) {
1183 auto typeIt = commit.find(
"type");
1185 if (
typeIt != commit.end() &&
typeIt->second ==
"merge")
1188 auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1194 if (
itFuture != futureStatus.end()) {
1208 std::lock_guard
lk(messageStatusMtx_);
1209 for (
const auto&
member: repository_->members()) {
1225 auto status = SENDING;
1264 memberToStatus[
member.uri] = status;
1269 std::vector<std::shared_ptr<libjami::SwarmMessage>>
messages;
1295 const std::string& conversationId)
1301 const std::string& conversationId)
1310 return pimpl_->repository_ ? pimpl_->repository_->id() :
"";
1322 JAMI_WARN(
"Unable to add new member in one to one conversation");
1327 }
catch (
const std::exception&
e) {
1328 JAMI_WARN(
"Unable to get mode: %s",
e.what());
1338 if (pimpl_->isAdmin()) {
1339 dht::ThreadPool::io().run(
1341 if (
auto sthis = w.lock()) {
1342 auto members = sthis->pimpl_->repository_->members();
1343 auto type = sthis->pimpl_->bannedType(contactUri);
1359 if (
auto sthis = w.lock()) {
1361 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1362 auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1363 sthis->pimpl_->announce(commit, true);
1366 cb(!commit.empty(), commit);
1371std::shared_ptr<dhtnet::ChannelSocket>
1374 return pimpl_->gitSocket(deviceId);
1379 const std::shared_ptr<dhtnet::ChannelSocket>& socket)
1381 pimpl_->addGitSocket(deviceId, socket);
1387 pimpl_->removeGitSocket(deviceId);
1393 pimpl_->fallbackTimer_->cancel();
1394 pimpl_->gitSocketList_.clear();
1395 if (pimpl_->swarmManager_)
1396 pimpl_->swarmManager_->shutdown();
1397 std::lock_guard
lk(pimpl_->membersMtx_);
1398 pimpl_->checkedMembers_.clear();
1404 if (pimpl_->swarmManager_)
1405 pimpl_->swarmManager_->maintainBuckets();
1408std::vector<jami::DeviceId>
1411 return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
1414std::shared_ptr<Typers>
1417 return pimpl_->typers_;
1423 if (!pimpl_->swarmManager_)
1425 return pimpl_->swarmManager_->isConnectedWith(
DeviceId(deviceId));
1430 const std::string_view type,
1450 std::vector<std::string>
commits;
1463 cb(!lastId.empty(), lastId);
1469 dht::ThreadPool::io().run([w = weak(),
1472 cb = std::move(
cb)] {
1473 if (
auto sthis = w.lock()) {
1475 if (!sthis->pimpl_->isAdmin()) {
1476 JAMI_WARN(
"You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1486 auto members =
sthis->pimpl_->repository_->members();
1487 for (
const auto&
member : members) {
1506 std::unique_lock
lk(
sthis->pimpl_->writeMtx_);
1515 std::vector<std::string>
commits;
1523 JAMI_WARN(
"Vote solved for %s. %s banned",
1531 cb(!lastId.empty(), lastId);
1536std::vector<std::map<std::string, std::string>>
1542std::set<std::string>
1551 const auto&
routingTable = pimpl_->swarmManager_->getRoutingTable();
1554 std::vector<NodeId> s;
1555 s.reserve(nodes.size() +
mobiles.size());
1556 s.insert(s.end(), nodes.begin(), nodes.end());
1558 for (
const auto& [deviceId,
_] : pimpl_->gitSocketList_)
1559 if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1560 s.emplace_back(deviceId);
1567 const auto&
routingTable = pimpl_->swarmManager_->getRoutingTable();
1574 return pimpl_->repository_->uriFromDevice(deviceId);
1580 pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1586 return pimpl_->repository_->join();
1592 auto repoPath = pimpl_->repoPath();
1600 for (
const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
1601 std::string_view
crtUri = certificate;
1604 JAMI_WARNING(
"Incorrect file found: {}/{}", path, certificate);
1607 if (
crtIt != std::string_view::npos)
1627 return !pimpl_->bannedType(uri).empty();
1632 const std::string& type,
1638 json[
"body"] = std::move(message);
1639 json[
"type"] = type;
1650 auto commit = pimpl_->repository_->getCommit(
replyTo);
1651 if (commit == std::nullopt) {
1657 dht::ThreadPool::io().run(
1659 if (
auto sthis = w.lock()) {
1660 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1661 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1665 sthis->pimpl_->announce(commit, true);
1667 cb(!commit.empty(), commit);
1676 if (
auto sthis = w.lock()) {
1677 std::vector<std::string> commits;
1678 commits.reserve(messages.size());
1679 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1680 for (const auto& message : messages) {
1681 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(message));
1682 commits.emplace_back(std::move(commit));
1692std::optional<std::map<std::string, std::string>>
1695 auto commit = pimpl_->repository_->getCommit(
commitId);
1696 if (commit == std::nullopt)
1697 return std::nullopt;
1698 return pimpl_->repository_->convCommitToMap(*commit);
1706 dht::ThreadPool::io().run([w = weak(),
cb = std::move(
cb),
options] {
1707 if (
auto sthis = w.lock()) {
1708 cb(sthis->pimpl_->loadMessages(options));
1718 dht::ThreadPool::io().run([w = weak(),
cb = std::move(
cb),
options] {
1719 if (
auto sthis = w.lock()) {
1720 std::lock_guard lk(sthis->pimpl_->loadedHistory_.mutex);
1721 cb(sthis->pimpl_->loadMessages2(options));
1729 std::lock_guard
lk(pimpl_->loadedHistory_.mutex);
1730 pimpl_->loadedHistory_.messageList.clear();
1731 pimpl_->loadedHistory_.quickAccess.clear();
1732 pimpl_->loadedHistory_.pendingEditions.clear();
1733 pimpl_->loadedHistory_.pendingReactions.clear();
1735 std::lock_guard
lk(pimpl_->messageStatusMtx_);
1736 pimpl_->memberToStatus.clear();
1744 std::lock_guard
lk(pimpl_->loadedHistory_.mutex);
1745 if (!pimpl_->loadedHistory_.messageList.empty())
1746 return (*pimpl_->loadedHistory_.messageList.begin())->id;
1752 std::scoped_lock lock(pimpl_->writeMtx_,
optHistory.mutex);
1756 return (*
optHistory.messageList.begin())->id;
1759std::vector<std::map<std::string, std::string>>
1762 if (
not repository_) {
1766 auto remoteHead = repository_->remoteHead(uri);
1767 if (remoteHead.empty()) {
1777 repository_->removeBranchWith(uri);
1782 auto [
ok,
cid] = repository_->merge(remoteHead);
1785 repository_->removeBranchWith(uri);
1790 auto commit = repository_->getCommit(
cid);
1791 if (commit != std::nullopt)
1796 auto result = repository_->convCommitsToMap(
newCommits);
1797 for (
auto& commit : result) {
1798 auto it = commit.find(
"type");
1799 if (
it != commit.end() &&
it->second ==
"member") {
1800 repository_->refreshMembers();
1802 if (commit[
"action"] ==
"ban")
1803 disconnectFromPeer(commit[
"uri"]);
1812 std::lock_guard
lk(pimpl_->pullcbsMtx_);
1813 auto [
it,
notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>());
1817 [&](
const auto&
elem) { return std::get<0>(elem) == commitId; });
1819 JAMI_DEBUG(
"{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress", pimpl_->toString(), deviceId,
commitId);
1826 dht::ThreadPool::io().run([w = weak(), deviceId] {
1827 if (
auto sthis_ = w.lock())
1828 sthis_->pimpl_->pull(deviceId);
1836 auto&
repo = repository_;
1842 std::lock_guard
lk(pullcbsMtx_);
1843 auto it = fetchingRemotes_.find(deviceId);
1844 if (
it == fetchingRemotes_.end()) {
1845 JAMI_ERROR(
"Could not find device {:s} in fetchingRemotes", deviceId);
1850 fetchingRemotes_.erase(
it);
1855 cb = std::move(std::get<1>(
elem));
1871 std::unique_lock
lk(writeMtx_);
1872 auto commits = mergeHistory(deviceId);
1879 std::string mergeBase =
oldHead;
1901 for (
const auto& commit :
commits) {
1911 JAMI_WARNING(
"Successfully fetched from device {} but didn't receive expected commit {}",
1925 auto changedFiles =
repo->changedFiles(diffStats);
1926 if (find(changedFiles.begin(), changedFiles.end(),
"profile.vcf")
1927 != changedFiles.end()) {
1929 accountId_,
repo->id(),
repo->infos());
1937 const std::string& deviceId,
1943 auto sthis = w.lock();
1945 for (
const auto&
wr :
sthis->dataTransfer()->waitingRequests()) {
1951std::map<std::string, std::string>
1957 for (
const auto& [
k, v] :
infos()) {
1958 if (v.size() >= 64000) {
1959 JAMI_WARNING(
"Cutting invite because the SIP message will be too long");
1972 std::lock_guard
lk(pimpl_->writeMtx_);
1973 return pimpl_->repository_->leave();
1979 pimpl_->isRemoving_ =
true;
1985 return pimpl_->isRemoving_;
1991 if (pimpl_->conversationDataPath_ !=
"")
1992 dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_,
true);
1993 if (!pimpl_->repository_)
1995 std::lock_guard
lk(pimpl_->writeMtx_);
1996 pimpl_->repository_->erase();
2002 return pimpl_->repository_->mode();
2005std::vector<std::string>
2008 return pimpl_->repository_->getInitialMembers();
2015 return std::find(members.begin(), members.end(), uri) != members.end();
2021 dht::ThreadPool::io().run([w = weak(), map = std::move(map),
cb = std::move(
cb)] {
2022 if (
auto sthis = w.lock()) {
2023 auto& repo = sthis->pimpl_->repository_;
2024 std::unique_lock lk(sthis->pimpl_->writeMtx_);
2025 auto commit = repo->updateInfos(map);
2026 sthis->pimpl_->announce(commit, true);
2029 cb(!commit.empty(), commit);
2030 emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
2031 sthis->pimpl_->accountId_, repo->id(), repo->infos());
2036std::map<std::string, std::string>
2039 return pimpl_->repository_->infos();
2045 auto filePath = pimpl_->conversationDataPath_ /
"preferences";
2049 if (std::filesystem::is_regular_file(
filePath)) {
2061 std::ofstream
file(
filePath, std::ios::trunc | std::ios::binary);
2068std::map<std::string, std::string>
2073 auto filePath = pimpl_->conversationDataPath_ /
"preferences";
2075 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
2080 }
catch (
const std::exception&
e) {
2095std::shared_ptr<TransferManager>
2098 return pimpl_->transferManager_;
2103 const std::string& fileId,
2104 std::filesystem::path& path,
2105 std::string& sha3sum)
const
2110 auto sep = fileId.find(
'_');
2111 if (
sep == std::string::npos)
2114 auto interactionId = fileId.substr(0,
sep);
2116 if (commit == std::nullopt || commit->find(
"type") == commit->end()
2117 || commit->find(
"tid") == commit->end() || commit->find(
"sha3sum") == commit->end()
2118 || commit->at(
"type") !=
"application/data-transfer+json") {
2119 JAMI_WARNING(
"[Account {:s}] {} requested invalid file transfer commit {}",
2127 sha3sum = commit->at(
"sha3sum");
2134 const std::string& fileId,
2135 const std::string& path,
2137 const std::string& deviceId)
2140 if (commit == std::nullopt || commit->at(
"type") !=
"application/data-transfer+json") {
2141 JAMI_ERROR(
"Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId,
id());
2144 auto tid = commit->find(
"tid");
2145 auto sha3sum = commit->find(
"sha3sum");
2146 auto size_str = commit->find(
"totalSize");
2148 if (
tid == commit->end() || sha3sum == commit->end() ||
size_str == commit->end()) {
2149 JAMI_ERROR(
"Invalid file transfer commit (missing tid, size or sha3)");
2154 if (totalSize < 0) {
2155 JAMI_ERROR(
"Invalid file size {}", totalSize);
2160 dht::ThreadPool().io().run([w = weak(),
2164 sha3sum = sha3sum->second,
2167 if (auto shared = w.lock()) {
2168 std::filesystem::path filePath(path);
2169 if (filePath.empty()) {
2170 filePath = shared->dataTransfer()->path(fileId);
2173 if (fileutils::size(filePath) == totalSize) {
2174 if (fileutils::sha3File(filePath) == sha3sum) {
2175 JAMI_WARNING(
"Ignoring request to download existing file: {}", filePath);
2180 std::filesystem::path tempFilePath(filePath);
2181 tempFilePath +=
".tmp";
2182 auto start = fileutils::size(tempFilePath);
2187 auto acc = shared->pimpl_->account_.lock();
2190 shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2191 acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2200 dht::ThreadPool::io().run([w = weak(), deviceId,
commitId]() {
2201 auto sthis = w.lock();
2205 auto uri =
sthis->uriFromDevice(deviceId);
2206 if (uri.empty() || uri ==
sthis->pimpl_->userId_)
2218 const std::string&
ts,
2223 std::map<std::string, std::map<std::string, std::string>>
newStatus;
2226 std::lock_guard
lk(messageStatusMtx_);
2227 auto& status = messagesStatus_[uri];
2237 newStatus[uri].insert(status.begin(), status.end());
2239 if (emit && messageStatusCb_) {
2243 options.logIfNotFound =
false;
2248 if (
res.size() == 0) {
2253 auto message = loadedHistory_.quickAccess.find(
cid);
2254 if (message != loadedHistory_.quickAccess.end()) {
2256 if(
static_cast<int32_t>(
st) > message->second->status[uri]){
2257 message->second->status[uri] =
static_cast<int32_t>(
st);
2263 static_cast<int>(
st));
2276 std::lock_guard
lk(pimpl_->messageStatusMtx_);
2277 if (pimpl_->messagesStatus_[uri][
"read"] == interactionId)
2279 dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2280 auto sthis = w.lock();
2288std::map<std::string, std::map<std::string, std::string>>
2291 std::lock_guard
lk(pimpl_->messageStatusMtx_);
2292 return pimpl_->messagesStatus_;
2298 std::unique_lock
lk(pimpl_->messageStatusMtx_);
2299 std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>>
stVec;
2301 auto&
oldMs = pimpl_->messagesStatus_[uri];
2302 if (status.find(
"fetched_ts") != status.end() && status.at(
"fetched") !=
oldMs[
"fetched"]) {
2303 if (
oldMs[
"fetched_ts"].empty() || std::stol(
oldMs[
"fetched_ts"]) <= std::stol(status.at(
"fetched_ts"))) {
2307 if (status.find(
"read_ts") != status.end() && status.at(
"read") !=
oldMs[
"read"]) {
2308 if (
oldMs[
"read_ts"].empty() || std::stol(
oldMs[
"read_ts"]) <= std::stol(status.at(
"read_ts"))) {
2316 pimpl_->updateStatus(uri, status,
commitId,
ts);
2323 std::unique_lock
lk(pimpl_->messageStatusMtx_);
2324 pimpl_->messageStatusCb_ =
cb;
2329Conversation::onBootstrapStatus(
const std::function<
void(std::string,
BootstrapStatus)>&
cb)
2331 pimpl_->bootstrapCbTest_ =
cb;
2336Conversation::checkBootstrapMember(
const asio::error_code&
ec,
2337 std::vector<std::map<std::string, std::string>> members)
2339 if (
ec == asio::error::operation_aborted)
2341 auto acc = pimpl_->account_.lock();
2342 if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0
or not acc)
2347 std::unique_lock lock(pimpl_->membersMtx_);
2350 while (!members.empty()) {
2351 auto member = std::move(members.back());
2353 uri = std::move(
member.at(
"uri"));
2354 if (uri != pimpl_->userId_
2355 && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
2359 JAMI_LOG(
"{} Bootstrap: Fallback failed. Wait for remote connections.",
2360 sthis->pimpl_->toString());
2362 if (
sthis->pimpl_->bootstrapCbTest_)
2363 sthis->pimpl_->bootstrapCbTest_(
sthis->id(), BootstrapStatus::FAILED);
2367 if (members.empty() && uri.empty()) {
2374 pimpl_->checkedMembers_.emplace(uri);
2375 auto devices = std::make_shared<std::vector<NodeId>>();
2378 [w = weak(), devices](
const std::shared_ptr<dht::crypto::PublicKey>&
dev) {
2380 if (
auto sthis = w.lock()) {
2381 if (!
sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(
dev->getLongId()))
2382 devices->emplace_back(
dev->getLongId());
2386 auto sthis = w.lock();
2390 if (
ok && devices->size() != 0) {
2392 if (
sthis->pimpl_->bootstrapCbTest_)
2393 sthis->pimpl_->bootstrapCbTest_(
sthis->id(), BootstrapStatus::FALLBACK);
2395 JAMI_LOG(
"{} Bootstrap: Fallback with member: {}",
2396 sthis->pimpl_->toString(),
2398 if (
sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
2403 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2404 sthis->pimpl_->fallbackTimer_->async_wait(
2405 std::bind(&Conversation::checkBootstrapMember,
2407 std::placeholders::_1,
2408 std::move(members)));
2420 if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2437 auto acc =
sthis->pimpl_->account_.lock();
2440 auto members =
sthis->getMembers(
false,
false);
2441 std::shuffle(members.begin(), members.end(), acc->rand);
2443 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
2446 sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
2449 sthis->pimpl_->toString(),
2452 sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
2454 std::placeholders::_1,
2455 std::move(members)));
2458 pimpl_->swarmManager_->onConnectionChanged([w = weak(),
fallback](
bool ok) {
2461 auto sthis = w.lock();
2467 std::lock_guard lock(
sthis->pimpl_->membersMtx_);
2468 sthis->pimpl_->checkedMembers_.clear();
2470 if (
sthis->pimpl_->bootstrapCb_)
2471 sthis->pimpl_->bootstrapCb_();
2473 if (
sthis->pimpl_->bootstrapCbTest_)
2474 sthis->pimpl_->bootstrapCbTest_(
sthis->id(), BootstrapStatus::SUCCESS);
2482 std::lock_guard lock(pimpl_->membersMtx_);
2483 pimpl_->checkedMembers_.clear();
2486 if (pimpl_->swarmManager_->isShutdown()) {
2487 pimpl_->swarmManager_->restart();
2488 pimpl_->swarmManager_->maintainBuckets();
2489 }
else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
2494std::vector<std::string>
2497 pimpl_->loadActiveCalls();
2498 pimpl_->loadHostedCalls();
2499 auto commits = pimpl_->commitsEndedCalls();
2502 dht::ThreadPool::io().run([w = weak(),
commits] {
2503 if (
auto sthis = w.lock())
2513 pimpl_->onMembersChanged_ = std::move(
cb);
2514 pimpl_->repository_->onMembersChanged([w=weak()] (
const std::set<std::string>&
memberUris) {
2515 if (
auto sthis = w.lock())
2524 w=weak()](
const std::string& deviceId,
ChannelCb&&
cb) {
2525 if (
auto sthis = w.lock())
2526 needSocket(
sthis->id(), deviceId, std::move(
cb),
"application/im-gitmessage-id");
2533 auto deviceId = channel->deviceId();
2538 auto cert = channel->peerCertificate();
2541 auto member =
cert->issuer->getId().toString();
2542 pimpl_->swarmManager_->addChannel(std::move(channel));
2544 auto sthis = w.lock();
2546 account->sendProfile(sthis->id(), member, deviceId.toString());
2553 const std::string&
fromId,
2554 const std::string& authorUri)
const
2559 options.authorUri = authorUri;
2560 options.logIfNotFound =
false;
2571 const std::shared_ptr<std::atomic_int>&
flag)
const
2575 dht::ThreadPool::io().run([w = weak(), req,
filter,
flag] {
2576 if (
auto sthis = w.lock()) {
2578 std::vector<std::map<std::string, std::string>> commits {};
2580 auto re = std::regex(
filter.regexSearch,
2581 filter.caseSensitive ? std::regex_constants::ECMAScript
2582 : std::regex_constants::icase);
2583 sthis->pimpl_->repository_->log(
2584 [&](
const auto&
id,
const auto& author,
auto& commit) {
2585 if (!
filter.author.empty()
2586 &&
filter.author !=
sthis->uriFromDevice(author.email)) {
2607 if (
auto optMessage =
sthis->pimpl_->repository_->convCommitToMap(cc))
2610 [&](
auto id,
auto,
auto) {
2618 for (
auto& message :
history.messageList) {
2621 ||
contentType ==
"application/data-transfer+json";
2628 auto body =
contentType ==
"text/plain" ? message->body.at(
"body")
2629 : message->body.at(
"displayName");
2632 auto commit = message->body;
2633 commit[
"id"] = message->id;
2634 commit[
"type"] = message->type;
2639 commits.emplace_back(message->body);
2649 sthis->pimpl_->accountId_,
2653 if ((*
flag)-- == 1 ) {
2656 sthis->pimpl_->accountId_,
2658 std::vector<std::map<std::string, std::string>> {});
2667 if (!message.isMember(
"confId")) {
2668 JAMI_ERROR(
"{}Malformed commit: no confId", pimpl_->toString());
2672 auto now = std::chrono::system_clock::now();
2673 auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
2675 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2676 pimpl_->hostedCalls_[message[
"confId"].asString()] =
nowSecs;
2677 pimpl_->saveHostedCalls();
2686 auto info =
infos();
2687 if (info[
"rdvDevice"] == pimpl_->deviceId_ && info[
"rdvHost"] == pimpl_->userId_)
2689 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2690 return pimpl_->hostedCalls_.find(
confId) != pimpl_->hostedCalls_.end();
2696 if (!message.isMember(
"confId")) {
2697 JAMI_ERROR(
"{}Malformed commit: no confId", pimpl_->toString());
2701 auto erased =
false;
2703 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2704 erased = pimpl_->hostedCalls_.erase(message[
"confId"].asString());
2707 pimpl_->saveHostedCalls();
2713std::vector<std::map<std::string, std::string>>
2716 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2717 return pimpl_->activeCalls_;
This class gives access to the git repository that represents the conversation.
static LIBJAMI_TEST_EXPORT std::unique_ptr< ConversationRepository > cloneConversation(const std::shared_ptr< JamiAccount > &account, const std::string &deviceId, const std::string &conversationId, std::function< void(std::vector< ConversationCommit >)> &&checkCommitCb={})
Clones a conversation on a remote device.
std::map< std::string, int32_t > memberToStatus
Status: 0 = committed, 1 = fetched, 2 = read. This caches the current status to add to the messages.
GitSocketList gitSocketList_
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
void updateStatus(const std::string &uri, libjami::Account::MessageStates status, const std::string &commitId, const std::string &ts, bool emit=false)
std::filesystem::path conversationDataPath_
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &remoteDevice, const std::string &conversationId)
std::vector< std::string > commitsEndedCalls()
If, for whatever reason, the daemon is stopped while hosting a conference, we need to announce the en...
std::shared_ptr< Typers > typers_
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
void removeGitSocket(const DeviceId &deviceId)
std::set< std::string > checkedMembers_
std::filesystem::path statusPath_
std::filesystem::path repoPath() const
void disconnectFromPeer(const std::string &peerUri)
Remove all git sockets and all DRT nodes associated with the given peer.
History loadedHistory_
Loaded history represents the linearized history to show for clients.
void announce(const std::vector< std::string > &commits, bool commitFromSelf=false)
std::map< std::string, std::map< std::string, std::string > > messagesStatus_
std::shared_ptr< asio::io_context > ioContext_
std::mutex activeCallsMtx_
std::unique_ptr< ConversationRepository > repository_
std::filesystem::path preferencesPath_
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
std::filesystem::path activeCallsPath_
std::filesystem::path fetchedPath_
void updateActiveCalls(const std::map< std::string, std::string > &commit, bool eraseOnly=false, bool emitSig=true) const
Update activeCalls_ via announced commits (in load or via new commits)
std::vector< std::map< std::string, std::string > > loadMessages(const LogOptions &options)
void handleReaction(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit) const
std::vector< std::shared_ptr< libjami::SwarmMessage > > addToHistory(History &history, const std::vector< std::map< std::string, std::string > > &commits, bool messageReceived=false, bool commitFromSelf=false)
std::shared_ptr< SwarmManager > swarmManager_
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &conversationId)
void voteUnban(const std::string &contactUri, const std::string_view type, const OnDoneCb &cb)
void announce(const std::string &commitId, bool commitFromSelf=false)
std::shared_ptr< TransferManager > transferManager_
void saveHostedCalls() const
void initActiveCalls(const std::vector< std::map< std::string, std::string > > &commits) const
Initialize activeCalls_ from the list of commits in the repository.
std::atomic_bool isRemoving_
Impl(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
OnMembersChanged onMembersChanged_
std::unique_ptr< asio::steady_timer > fallbackTimer_
std::map< std::string, std::deque< std::pair< std::string, OnPullCb > > > fetchingRemotes_
std::vector< std::map< std::string, std::string > > activeCalls_
void pull(const std::string &deviceId)
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft) const
std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> messageStatusCb_
void announce(const std::vector< std::map< std::string, std::string > > &commits, bool commitFromSelf=false)
std::weak_ptr< JamiAccount > account_
std::string_view bannedType(const std::string &uri) const
void handleEdition(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
void saveActiveCalls() const
std::vector< libjami::SwarmMessage > loadMessages2(const LogOptions &options, History *optHistory=nullptr)
void loadHostedCalls() const
std::filesystem::path hostedCallsPath_
std::string toString() const
std::filesystem::path sendingPath_
std::vector< std::map< std::string, std::string > > mergeHistory(const std::string &uri)
std::map< std::string, std::map< std::string, int32_t > > futureStatus
void init(const std::shared_ptr< JamiAccount > &account)
void loadActiveCalls() const
std::mutex messageStatusMtx_
{uri, { {"fetch", "commitId"}, {"fetched_ts", "timestamp"}, {"read", "commitId"}, {"read_ts",...
std::function< void()> bootstrapCb_
std::map< std::string, uint64_t > hostedCalls_
void rectifyStatus(const std::shared_ptr< libjami::SwarmMessage > &message, History &history) const
bool handleMessage(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
bool pull(const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch and merge from peer.
std::set< std::string > memberUris(std::string_view filter={}, const std::set< MemberRole > &filteredRoles={MemberRole::INVITED, MemberRole::LEFT, MemberRole::BANNED}) const
std::shared_ptr< Typers > typers() const
Get Typers object.
bool downloadFile(const std::string &interactionId, const std::string &fileId, const std::string &path, const std::string &member="", const std::string &deviceId="")
Adds a file to the waiting list and asks members.
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
std::string leave()
Leave a conversation.
std::string id() const
Get conversation's id.
void clearCache()
Clear all cached messages.
std::vector< std::string > commitsEndedCalls()
Refresh active calls.
std::map< std::string, std::map< std::string, std::string > > messageStatus() const
Retrieve last displayed and fetch status per member.
void loadMessages2(const OnLoadMessages2 &cb, const LogOptions &options)
Get a range of messages.
bool isMember(const std::string &uri, bool includeInvited=false) const
Test if a URI is a member.
void search(uint32_t req, const Filter &filter, const std::shared_ptr< std::atomic_int > &flag) const
Search in the conversation via a filter.
std::vector< uint8_t > vCard() const
std::vector< std::string > getInitialMembers() const
One to one util, get initial members.
void connectivityChanged()
If we switch from one network to another, we will need to update the state of the connections.
std::shared_ptr< TransferManager > dataTransfer() const
Access to transfer manager.
void removeGitSocket(const DeviceId &deviceId)
std::vector< std::map< std::string, std::string > > currentCalls() const
Return current detected calls.
void onMembersChanged(OnMembersChanged &&cb)
void bootstrap(std::function< void()> onBootstraped, const std::vector< DeviceId > &knownDevices)
Bootstrap swarm manager to other peers.
std::map< std::string, std::string > infos() const
Retrieve current infos (title, description, avatar, mode)
void monitor()
Print the state of the DRT linked to the conversation.
std::vector< NodeId > peersToSyncWith() const
Get peers to sync with.
void removeActiveConference(Json::Value &&message, OnDoneCb &&cb={})
Announce the end of a call.
bool isInitialMember(const std::string &uri) const
ConversationMode mode() const
Get conversation's mode.
std::string join()
Join a conversation.
void addSwarmChannel(std::shared_ptr< dhtnet::ChannelSocket > channel)
Add swarm connection to the DRT.
std::vector< jami::DeviceId > getDeviceIdList() const
bool setMessageDisplayed(const std::string &uri, const std::string &interactionId)
Store last read commit (returned in getMembers)
void loadMessages(OnLoadMessages cb, const LogOptions &options)
Get a range of messages.
void updateInfos(const std::map< std::string, std::string > &map, const OnDoneCb &cb={})
Change repository's infos.
void updateMessageStatus(const std::map< std::string, std::map< std::string, std::string > > &messageStatus)
Update fetch/read status.
void onMessageStatusChanged(const std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> &cb)
void shutdownConnections()
Stop SwarmManager, bootstrap and gitSockets.
void updatePreferences(const std::map< std::string, std::string > &map)
Change user's preferences.
void sendMessages(std::vector< Json::Value > &&messages, OnMultiDoneCb &&cb={})
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited=false, bool includeLeft=false, bool includeBanned=false) const
std::map< std::string, std::string > preferences(bool includeLastModified) const
Retrieve current preferences (color, notification, etc)
void sync(const std::string &member, const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch new commits and re-ask for waiting files.
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
Git operations will need a ChannelSocket for cloning/fetching commits. Because libgit2 is a C library,...
void onNeedSocket(NeedSocketCb cb)
Set the callback that will be called whenever a new socket is needed.
std::string uriFromDevice(const std::string &deviceId) const
Retrieve the uri from a deviceId.
std::optional< std::map< std::string, std::string > > getCommit(const std::string &commitId) const
Retrieve one commit.
void erase()
Erase all related data.
void setRemovingFlag()
Set a conversation as removing (when loading convInfo and still not sync)
void sendMessage(std::string &&message, const std::string &type="text/plain", const std::string &replyTo="", OnCommitCb &&onCommit={}, OnDoneCb &&cb={})
void hasFetched(const std::string &deviceId, const std::string &commitId)
Store information about who has fetched or not.
bool isHosting(const std::string &confId) const
Check if we're currently hosting this conference.
bool isBootstraped() const
Check if we're connected to at least one node.
std::string lastCommitId() const
Get last commit id.
bool isBanned(const std::string &uri) const
Conversation(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
bool onFileChannelRequest(const std::string &member, const std::string &fileId, std::filesystem::path &path, std::string &sha3sum) const
Choose if we can accept channel request.
void addMember(const std::string &contactUri, const OnDoneCb &cb={})
Add conversation member.
bool hasSwarmChannel(const std::string &deviceId)
Used to avoid multiple connections: we just check whether we have a swarm channel with a specific device.
bool isRemoving()
Check if we are removing the conversation.
void removeMember(const std::string &contactUri, bool isDevice, const OnDoneCb &cb={})
void hostConference(Json::Value &&message, OnDoneCb &&cb={})
Host a conference in the conversation.
uint32_t countInteractions(const std::string &toId, const std::string &fromId="", const std::string &authorUri="") const
Retrieve how many interactions there are from HEAD to interactionId.
std::map< std::string, std::string > generateInvitation() const
Generate an invitation to send to new contacts.
static LIBJAMI_TEST_EXPORT Manager & instance()
std::shared_ptr< asio::io_context > ioContext() const
std::mt19937_64 getSeededRandomEngine()
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
static constexpr const char * CREATED
static constexpr const char * REMOVED
static constexpr const char * ID
static constexpr const char * RECEIVED
static constexpr const char * METADATAS
static constexpr const char * MEMBERS
static constexpr const char * DECLINED
static constexpr const char * ERASED
static constexpr const char * FROM
static constexpr const char * CONVERSATIONID
static constexpr const char * HOSTED_CALLS
static constexpr const char * PREFERENCES
static constexpr const char * ACTIVE_CALLS
static constexpr const char * LAST_DISPLAYED
const std::filesystem::path & get_data_dir()
std::vector< uint8_t > loadFile(const std::filesystem::path &path, const std::filesystem::path &default_dir)
Read the full content of a file at path.
uint64_t lastWriteTimeInSeconds(const std::filesystem::path &filePath)
Return the last write time (epoch time) of a given file path (in seconds).
std::filesystem::path getFullPath(const std::filesystem::path &base, const std::filesystem::path &path)
If path is relative, it is appended to base.
std::string toString(const Json::Value &jsonVal)
std::function< void(const std::string &, const std::string &, ChannelCb &&, const std::string &)> NeedSocketCb
std::function< void(std::vector< libjami::SwarmMessage > &&messages)> OnLoadMessages2
static constexpr std::string_view toString(AuthDecodingState state)
std::function< void(const std::vector< std::string > &)> OnMultiDoneCb
void emitSignal(Args... args)
std::function< void(bool, const std::string &)> OnDoneCb
std::list< std::shared_ptr< libjami::SwarmMessage > > MessageList
std::function< void(std::vector< std::map< std::string, std::string > > &&messages)> OnLoadMessages
std::function< bool(const std::shared_ptr< dhtnet::ChannelSocket > &)> ChannelCb
std::function< void(bool fetchOk)> OnPullCb
std::map< DeviceId, std::shared_ptr< dhtnet::ChannelSocket > > GitSocketList
std::function< void(const std::string &)> OnCommitCb
static const char *const LAST_MODIFIED
std::function< void(const std::set< std::string > &)> OnMembersChanged
bool regex_search(string_view sv, svmatch &m, const regex &e, regex_constants::match_flag_type flags=regex_constants::match_default)
std::set< std::string > members
std::string lastDisplayed
Json::Value toJson() const
ConversationRequest()=default
std::map< std::string, std::string > metadatas
Json::Value toJson() const
std::string conversationId
std::map< std::string, std::string > toMap() const
std::map< std::string, std::shared_ptr< libjami::SwarmMessage > > quickAccess
std::map< std::string, std::list< std::map< std::string, std::string > > > pendingReactions
std::map< std::string, std::list< std::shared_ptr< libjami::SwarmMessage > > > pendingEditions
std::condition_variable cv