38#include <opendht/thread_pool.h>
39#include <opendht/infohash.h>
40#include <fmt/compile.h>
41#include <asio/error_code.hpp>
68 members.emplace(v[
"uri"].asString());
102 for (
const auto&
member :
md.getMemberNames()) {
116 for (
const auto& [key, value] :
metadatas) {
122std::map<std::string, std::string>
134using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
142 std::condition_variable
cv {};
145 std::map<std::string, std::shared_ptr<libjami::SwarmMessage>>
quickAccess {};
146 std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>>
pendingEditions {};
153 Impl(std::unique_ptr<ConversationRepository>&& repository,
154 const std::shared_ptr<JamiAccount>&
account,
155 std::vector<ConversationCommit>&&
commits = {})
156 :
repository_(std::move(repository ? repository :
throw std::logic_error(
"Invalid repository")))
164 if (
auto acc =
account.lock()) {
165 return acc->isConnectedWith(deviceId);
190 Impl(std::pair<std::unique_ptr<ConversationRepository>, std::vector<ConversationCommit>>&&
repoAndCommits,
191 const std::shared_ptr<JamiAccount>&
account)
200 Impl(
const std::shared_ptr<JamiAccount>&
account,
const std::string& conversationId)
232 std::vector<std::string>
vec;
260 for (
const auto& commit :
commits) {
261 if (commit.at(
"type") ==
"member") {
271 }
else if (commit.find(
"confId") != commit.end() && commit.find(
"uri") != commit.end()
272 && commit.find(
"device") != commit.end()) {
276 auto confId = commit.at(
"confId");
277 auto uri = commit.at(
"uri");
278 auto device = commit.at(
"device");
282 std::map<std::string, std::string>
activeCall;
287 fmt::print(
"swarm:{} new active call detected: {} (on device {}, account {})\n",
316 if (commit.at(
"type") ==
"member") {
322 if (
it->at(
"uri") == commit.at(
"uri") ||
it->at(
"device") == commit.at(
"uri")) {
323 JAMI_DEBUG(
"Removing {:s} from the active calls, because {:s} left",
it->at(
"id"), commit.at(
"uri"));
340 if (commit.find(
"confId") != commit.end() && commit.find(
"uri") != commit.end()
341 && commit.find(
"device") != commit.end()) {
343 auto confId = commit.at(
"confId");
344 auto uri = commit.at(
"uri");
345 auto device = commit.at(
"device");
348 return value.at(
"id") == confId && value.at(
"uri") == uri && value.at(
"device") == device;
350 if (commit.find(
"duration") == commit.end()) {
352 JAMI_DEBUG(
"swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
357 activeCalls_.emplace_back(std::map<std::string, std::string> {
374 return value.at(
"id") == confId && value.at(
"uri") == uri && value.at(
"device") == device;
377 JAMI_ERROR(
"Duplicate call found. (This is a bug)");
383 JAMI_WARNING(
"previous swarm:{:s} call finished detected: {:s} on device "
384 "{:s}, account {:s}",
390 JAMI_DEBUG(
"swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
416 for (
const auto& c :
commits) {
418 if (c.at(
"type") ==
"member") {
419 if (c.find(
"uri") != c.end() && c.find(
"action") != c.end()) {
420 const auto& uri = c.at(
"uri");
446 }
else if (c.at(
"type") ==
"application/call-history+json") {
471 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
474 }
catch (
const std::exception&
e) {
489 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
492 }
catch (
const std::exception&
e) {
509 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
512 }
catch (
const std::exception&
e) {
533 auto crt = fmt::format(
"{}.crt", uri);
590 void pull(
const std::string& deviceId);
630 mutable std::vector<std::map<std::string, std::string>>
activeCalls_ {};
641 std::vector<std::shared_ptr<libjami::SwarmMessage>>
addToHistory(
643 const std::vector<std::map<std::string, std::string>>&
commits,
649 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
652 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
682 const std::string&
ts,
698 acc->presenceManager()->untrackBuddy(
it->first);
716 auto acc = account_.lock();
721 std::lock_guard
lk(trackedMembersMtx_);
725 presenceDeviceListenerToken_ = acc->presenceManager()->addDeviceListener(
726 [w](
const std::string& uri,
const DeviceId& deviceId,
bool online) {
727 if (
auto sthis = w.lock()) {
728 if (online && sthis->isMember(uri)) {
729 sthis->addKnownDevices({deviceId}, uri);
735 rotateTrackedMembers();
765 acc->presenceManager()->removeDeviceListener(token);
769 acc->presenceManager()->untrackBuddy(uri);
776 auto acc = account_.lock();
780 std::lock_guard
lk(trackedMembersMtx_);
785 if (
auto it = trackedMembers_.find(
memberUri);
it != trackedMembers_.end()) {
786 JAMI_WARNING(
"{} [device {}] Rotating tracked members after connection failure",
toString(), deviceId);
787 auto& info =
it->second;
788 info.failedDevices.insert(deviceId);
789 if (std::includes(info.failedDevices.begin(),
790 info.failedDevices.end(),
791 info.devices.begin(),
792 info.devices.end())) {
793 acc->presenceManager()->untrackBuddy(
it->first);
794 trackedMembers_.erase(
it);
801 auto members = repository_->members();
802 size_t N = members.size();
803 size_t K = std::min(N, 3 + (
size_t) std::log2(N));
810 JAMI_WARNING(
"{} Refreshing tracked members: {}/{} active ({}/{} devices)",
812 trackedMembers_.size(),
818 if (trackedMembers_.size() <
K) {
820 candidates.reserve(N - trackedMembers_.size());
821 for (
const auto&
m : members) {
822 if (
m.uri !=
memberUri && trackedMembers_.find(
m.uri) == trackedMembers_.end()) {
827 std::vector<std::string>
chosen;
830 std::back_inserter(
chosen),
831 K - trackedMembers_.size(),
833 for (
const auto& uri :
chosen) {
834 acc->presenceManager()->trackBuddy(uri);
844 rotateTrackedMembers(
memberUri, deviceId);
850 if (!swarmManager_->isConnected()) {
854 swarmManager_->onConnectionChanged([w](
bool ok) {
855 dht::ThreadPool::io().run([w,
ok] {
856 if (
auto sthis = w.lock()) {
863 if (sthis->pimpl_->swarmManager_->isConnected()) {
864 sthis->pimpl_->stopTracking();
866 sthis->pimpl_->startTracking(w);
869 if (
sthis->pimpl_->bootstrapCb_)
870 sthis->pimpl_->bootstrapCb_();
873 if (
sthis->pimpl_->bootstrapCbTest_)
875 ok ? BootstrapStatus::SUCCESS : BootstrapStatus::FAILED);
893 const auto nodes = swarmManager_->getAllNodes();
895 for (
const auto node : nodes)
896 if (peerUri == repository_->uriFromDevice(node.toString()))
898 swarmManager_->deleteNode(
toRemove);
901 for (
auto it = gitSocketList_.begin();
it != gitSocketList_.end();) {
902 if (peerUri == repository_->uriFromDevice(
it->first.toString()))
903 it = gitSocketList_.erase(
it);
909std::vector<std::map<std::string, std::string>>
912 std::vector<std::map<std::string, std::string>> result;
913 auto members = repository_->members();
914 std::lock_guard
lk(messageStatusMtx_);
915 for (
const auto&
member : members) {
924 auto it = messagesStatus_.find(
member.uri);
925 if (
it != messagesStatus_.end()) {
926 auto readIt =
it->second.find(
"read");
930 result.emplace_back(std::move(
mm));
935std::vector<std::map<std::string, std::string>>
938 std::vector<std::map<std::string, std::string>> result;
939 std::lock_guard
lk(trackedMembersMtx_);
940 for (
const auto& [uri,
member] : trackedMembers_) {
941 std::map<std::string, std::string> map;
950 result.emplace_back(std::move(map));
955std::vector<std::string>
959 std::vector<std::string>
commits {};
960 std::unique_lock
lk(writeMtx_);
961 std::unique_lock
lkA(activeCallsMtx_);
968 value[
"uri"] = userId_;
969 value[
"device"] = deviceId_;
971 value[
"type"] =
"application/call-history+json";
972 auto now = std::chrono::system_clock::now();
973 auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
975 auto itActive = std::find_if(activeCalls_.begin(),
978 return value.at(
"id") == confId && value.at(
"uri") == userId_
979 && value.at(
"device") == deviceId_;
987 hostedCalls_.clear();
993std::vector<libjami::SwarmMessage>
999 if (!repository_ ||
history->loading) {
1011 std::vector<std::string>
replies;
1012 std::vector<std::shared_ptr<libjami::SwarmMessage>>
msgList;
1015 [&](
const auto&
id,
const auto& author,
const auto& commit) {
1017 return CallbackResult::Skip;
1020 if (options.includeTo)
1021 breakLogging = true;
1026 if ((limitNbOfCommits
1027 && (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
1028 return CallbackResult::Break;
1030 return CallbackResult::Break;
1031 if (id == options.to && !options.includeTo) {
1032 return CallbackResult::Break;
1042 if (
options.authorUri !=
"") {
1043 if (
options.authorUri == repository_->uriFromDevice(author.email)) {
1055 auto optMessage = repository_->convCommitToMap(cc);
1059 if (message.find(
"reply-to") != message.end()) {
1060 auto it = std::find(
replies.begin(),
replies.end(), message.at(
"reply-to"));
1062 replies.emplace_back(message.at(
"reply-to"));
1069 std::shared_ptr<libjami::SwarmMessage>
firstMsg;
1070 if ((
history == &loadedHistory_) &&
msgList.empty() && !loadedHistory_.messageList.empty()) {
1071 firstMsg = *loadedHistory_.messageList.rbegin();
1073 auto added = addToHistory(*
history, {message},
false,
false);
1080 [&](
auto,
auto,
auto) {
1095 std::vector<libjami::SwarmMessage>
ret;
1128 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
1137 std::string
toReplace = (
baseCommit->type ==
"application/data-transfer+json") ?
"tid" :
"body";
1149 return reaction.at(
"id") == editId;
1165 [&](
const auto&
reaction) { return reaction.at(
"id") == editId; });
1178 it->second->editions.emplace(
it->second->editions.begin(),
it->second->body);
1182 it->second->body[
"fileId"] =
"";
1186 it->second->reactions.clear();
1198 const std::shared_ptr<libjami::SwarmMessage>&
sharedCommit,
1203 if (!
history.messageList.empty())
1209 if (!
history.messageList.empty())
1223 if (
sharedCommit->type ==
"application/data-transfer+json") {
1230 for (
const auto& commit :
peditIt->second) {
1250 for (
const auto& [peer, value] : message->status) {
1253 parent->status[peer] = value;
1268std::vector<std::shared_ptr<libjami::SwarmMessage>>
1270 const std::vector<std::map<std::string, std::string>>&
commits,
1285 auto acc = account_.lock();
1288 auto username = acc->getUsername();
1297 std::vector<std::shared_ptr<libjami::SwarmMessage>>
sharedCommits;
1298 for (
const auto& commit :
commits) {
1302 auto typeIt = commit.find(
"type");
1304 if (
typeIt != commit.end() &&
typeIt->second ==
"merge")
1307 auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
1311 std::lock_guard
lk(messageStatusMtx_);
1314 if (
itFuture != futureStatus.end()) {
1327 std::lock_guard
lk(messageStatusMtx_);
1328 for (
const auto&
member : repository_->members()) {
1344 auto status = SENDING;
1385 memberToStatus[
member.uri] = status;
1390 std::vector<std::shared_ptr<libjami::SwarmMessage>>
messages;
1421 const std::string& conversationId)
1430 return pimpl_->repository_ ? pimpl_->repository_->id() :
"";
1442 JAMI_WARN(
"Unable to add new member in one to one conversation");
1447 }
catch (
const std::exception&
e) {
1448 JAMI_WARN(
"Unable to get mode: %s",
e.what());
1458 if (pimpl_->isAdmin()) {
1460 if (
auto sthis = w.lock()) {
1461 auto members = sthis->pimpl_->repository_->members();
1462 auto type = sthis->pimpl_->bannedType(contactUri);
1478 if (
auto sthis = w.lock()) {
1480 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1481 auto commit = sthis->pimpl_->repository_->addMember(contactUri);
1482 if (not commit.empty())
1483 sthis->pimpl_->announce(commit, true);
1486 cb(!commit.empty(), commit);
1491std::shared_ptr<dhtnet::ChannelSocket>
1494 return pimpl_->gitSocket(deviceId);
1500 pimpl_->addGitSocket(deviceId, socket);
1506 pimpl_->removeGitSocket(deviceId);
1512 pimpl_->gitSocketList_.clear();
1513 if (pimpl_->swarmManager_)
1514 pimpl_->swarmManager_->shutdown();
1520 if (pimpl_->swarmManager_)
1521 pimpl_->swarmManager_->maintainBuckets();
1524std::vector<jami::DeviceId>
1527 return pimpl_->swarmManager_->getAllNodes();
1530std::shared_ptr<Typers>
1533 return pimpl_->typers_;
1536std::vector<std::map<std::string, std::string>>
1539 return pimpl_->getConnectivity();
1545 if (!pimpl_->swarmManager_)
1547 return pimpl_->swarmManager_->isConnectedWith(
DeviceId(deviceId));
1570 std::vector<std::string>
commits;
1583 cb(!lastId.empty(), lastId);
1589 dht::ThreadPool::io().run(
1591 if (
auto sthis = w.lock()) {
1593 if (!sthis->pimpl_->isAdmin()) {
1594 JAMI_WARN(
"You're not an admin of this repo. Unable to block %s", contactUri.c_str());
1604 auto members =
sthis->pimpl_->repository_->members();
1605 for (
const auto&
member : members) {
1624 std::unique_lock
lk(
sthis->pimpl_->writeMtx_);
1633 std::vector<std::string>
commits;
1647 cb(!lastId.empty(), lastId);
1652std::vector<std::map<std::string, std::string>>
1658std::vector<std::map<std::string, std::string>>
1661 return pimpl_->getTrackedMembers();
1664std::set<std::string>
1673 auto s = pimpl_->swarmManager_->getConnectedNodes();
1674 for (
const auto& [deviceId,
_] : pimpl_->gitSocketList_)
1675 if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
1676 s.emplace_back(deviceId);
1683 return pimpl_->swarmManager_->isConnected();
1689 return pimpl_->repository_->uriFromDevice(deviceId);
1695 pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
1701 return pimpl_->repository_->join();
1729 return !pimpl_->bannedType(uri).empty();
1736 if (!pimpl_->repository_->hasCommit(
replyTo)) {
1742 dht::ThreadPool::io().run(
1744 if (
auto sthis = w.lock()) {
1745 std::unique_lock lk(sthis->pimpl_->writeMtx_);
1746 auto commit = sthis->pimpl_->repository_->commitMessage(json::toString(value));
1750 sthis->pimpl_->announce(commit, true);
1752 cb(!commit.empty(), commit);
1760 return pimpl_->repository_->hasCommit(
commitId);
1763std::optional<std::map<std::string, std::string>>
1766 auto commit = pimpl_->repository_->getCommit(
commitId);
1767 if (commit == std::nullopt)
1768 return std::nullopt;
1769 return pimpl_->repository_->convCommitToMap(*commit);
1777 dht::ThreadPool::io().run([w = weak(),
cb = std::move(
cb),
options] {
1778 if (
auto sthis = w.lock()) {
1779 std::unique_lock lk(sthis->pimpl_->loadedHistory_.mutex);
1780 auto result = sthis->pimpl_->loadMessages(options);
1782 cb(std::move(result));
1790 std::lock_guard
lk(pimpl_->loadedHistory_.mutex);
1791 pimpl_->loadedHistory_.messageList.clear();
1792 pimpl_->loadedHistory_.quickAccess.clear();
1793 pimpl_->loadedHistory_.pendingEditions.clear();
1794 pimpl_->loadedHistory_.pendingReactions.clear();
1796 std::lock_guard
lk(pimpl_->messageStatusMtx_);
1797 pimpl_->memberToStatus.clear();
1805 std::lock_guard
lk(pimpl_->loadedHistory_.mutex);
1806 if (!pimpl_->loadedHistory_.messageList.empty())
1807 return (*pimpl_->loadedHistory_.messageList.begin())->id;
1813 std::scoped_lock lock(pimpl_->writeMtx_,
optHistory.mutex);
1817 return (*
optHistory.messageList.begin())->id;
1823 std::lock_guard
lk(pimpl_->pullcbsMtx_);
1825 std::deque<std::pair<std::string, OnPullCb>>());
1828 return std::get<0>(elem) == commitId;
1831 JAMI_DEBUG(
"{} Ignoring request to pull from {:s} with commit {:s}: pull already in progress",
1841 dht::ThreadPool::io().run([w = weak(), deviceId] {
1842 if (
auto sthis_ = w.lock())
1843 sthis_->pimpl_->pull(deviceId);
1851 auto&
repo = repository_;
1857 std::lock_guard
lk(pullcbsMtx_);
1858 auto it = fetchingRemotes_.find(deviceId);
1859 if (
it == fetchingRemotes_.end()) {
1860 JAMI_ERROR(
"Could not find device {:s} in fetchingRemotes", deviceId);
1865 fetchingRemotes_.erase(
it);
1870 cb = std::move(std::get<1>(
elem));
1887 std::unique_lock
lk(writeMtx_);
1889 [
this](
const std::string& peerUri) { this->disconnectFromPeer(peerUri); });
1900 for (
const auto& commit :
commits) {
1910 JAMI_WARNING(
"Successfully fetched from device {} but didn't receive expected commit {}",
1925 auto diffStats =
repo->diffStats(
"HEAD",
oldHead);
1926 auto changedFiles =
repo->changedFiles(diffStats);
1927 if (find(changedFiles.begin(), changedFiles.end(),
"profile.vcf") != changedFiles.end()) {
1941 auto sthis = w.lock();
1943 for (
const auto&
wr :
sthis->dataTransfer()->waitingRequests()) {
1949std::map<std::string, std::string>
1955 for (
const auto& [
k, v] :
infos()) {
1956 if (v.size() >= 64000) {
1957 JAMI_WARNING(
"Cutting invite because the SIP message will be too long");
1970 std::lock_guard
lk(pimpl_->writeMtx_);
1971 return pimpl_->repository_->leave();
1977 pimpl_->isRemoving_ =
true;
1983 return pimpl_->isRemoving_;
1989 if (pimpl_->conversationDataPath_ !=
"")
1990 dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_,
true);
1991 if (!pimpl_->repository_)
1993 std::lock_guard
lk(pimpl_->writeMtx_);
1994 pimpl_->repository_->erase();
2000 return pimpl_->repository_->mode();
2003std::vector<std::string>
2006 return pimpl_->repository_->getInitialMembers();
2013 return std::find(members.begin(), members.end(), uri) != members.end();
2019 dht::ThreadPool::io().run([w = weak(), map = std::move(map),
cb = std::move(
cb)] {
2020 if (
auto sthis = w.lock()) {
2021 auto& repo = sthis->pimpl_->repository_;
2022 std::unique_lock lk(sthis->pimpl_->writeMtx_);
2023 auto commit = repo->updateInfos(map);
2024 sthis->pimpl_->announce(commit, true);
2027 cb(!commit.empty(), commit);
2028 emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(sthis->pimpl_->accountId_,
2035std::map<std::string, std::string>
2038 return pimpl_->repository_->infos();
2044 const auto&
filePath = pimpl_->preferencesPath_;
2049 if (std::filesystem::is_regular_file(
filePath,
ec)) {
2061 std::ofstream
file(
filePath, std::ios::trunc | std::ios::binary);
2066std::map<std::string, std::string>
2071 const auto&
filePath = pimpl_->preferencesPath_;
2073 msgpack::object_handle
oh = msgpack::unpack((
const char*)
file.data(),
file.size());
2078 }
catch (
const std::exception&
e) {
2093std::shared_ptr<TransferManager>
2096 return pimpl_->transferManager_;
2101 const std::string& fileId,
2102 std::filesystem::path& path,
2103 std::string& sha3sum)
const
2108 auto sep = fileId.find(
'_');
2109 if (
sep == std::string::npos)
2112 auto interactionId = fileId.substr(0,
sep);
2114 if (commit == std::nullopt || commit->find(
"type") == commit->end() || commit->find(
"tid") == commit->end()
2115 || commit->find(
"sha3sum") == commit->end() || commit->at(
"type") !=
"application/data-transfer+json") {
2116 JAMI_WARNING(
"[Account {:s}] {} requested invalid file transfer commit {}",
2124 sha3sum = commit->at(
"sha3sum");
2131 const std::string& fileId,
2132 const std::string& path,
2134 const std::string& deviceId)
2137 if (commit == std::nullopt || commit->at(
"type") !=
"application/data-transfer+json") {
2138 JAMI_ERROR(
"Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId,
id());
2141 auto tid = commit->find(
"tid");
2142 auto sha3sum = commit->find(
"sha3sum");
2143 auto size_str = commit->find(
"totalSize");
2145 if (
tid == commit->end() || sha3sum == commit->end() ||
size_str == commit->end()) {
2146 JAMI_ERROR(
"Invalid file transfer commit (missing tid, size or sha3)");
2151 if (totalSize < 0) {
2152 JAMI_ERROR(
"Invalid file size {}", totalSize);
2157 dht::ThreadPool().io().run(
2158 [w = weak(), deviceId, fileId, interactionId, sha3sum = sha3sum->second, path, totalSize] {
2159 if (auto shared = w.lock()) {
2160 std::filesystem::path filePath(path);
2161 if (filePath.empty()) {
2162 filePath = shared->dataTransfer()->path(fileId);
2166 if (std::filesystem::file_size(filePath, ec) == static_cast<size_t>(totalSize)) {
2167 if (fileutils::sha3File(filePath) == sha3sum) {
2168 JAMI_WARNING(
"Ignoring request to download existing file: {}", filePath);
2173 std::filesystem::path tempFilePath(filePath);
2174 tempFilePath +=
".tmp";
2175 auto start = std::filesystem::file_size(tempFilePath, ec);
2176 if (ec || start == static_cast<decltype(start)>(-1)) {
2181 auto acc = shared->pimpl_->account_.lock();
2184 shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
2185 acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
2194 dht::ThreadPool::io().run([w = weak(), deviceId,
commitId]() {
2195 auto sthis = w.lock();
2199 auto uri =
sthis->uriFromDevice(deviceId);
2200 if (uri.empty() || uri ==
sthis->pimpl_->userId_)
2203 sthis->pimpl_->updateStatus(uri,
2206 std::to_string(std::time(
nullptr)),
2215 const std::string&
ts,
2221 std::map<std::string, std::map<std::string, std::string>>
newStatus;
2224 std::lock_guard
lk(messageStatusMtx_);
2225 auto& status = messagesStatus_[uri];
2235 newStatus[uri].insert(status.begin(), status.end());
2237 if (emit && messageStatusCb_) {
2241 options.logIfNotFound =
false;
2246 std::unique_lock
mlk(messageStatusMtx_);
2248 if (
res.size() == 0) {
2253 auto message = loadedHistory_.quickAccess.find(
cid);
2254 if (message != loadedHistory_.quickAccess.end()) {
2256 if (
static_cast<int32_t>(
st) > message->second->status[uri]) {
2257 message->second->status[uri] =
static_cast<int32_t>(
st);
2273 static_cast<int>(status));
2279 std::lock_guard
lk(pimpl_->messageStatusMtx_);
2280 if (pimpl_->messagesStatus_[uri][
"read"] == interactionId)
2282 dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
2283 auto sthis = w.lock();
2286 sthis->pimpl_->updateStatus(uri,
2289 std::to_string(std::time(
nullptr)),
2295std::map<std::string, std::map<std::string, std::string>>
2298 std::lock_guard
lk(pimpl_->messageStatusMtx_);
2299 return pimpl_->messagesStatus_;
2305 std::unique_lock
lk(pimpl_->messageStatusMtx_);
2306 std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>>
stVec;
2308 auto&
oldMs = pimpl_->messagesStatus_[uri];
2309 if (status.find(
"fetched_ts") != status.end() && status.at(
"fetched") !=
oldMs[
"fetched"]) {
2310 if (
oldMs[
"fetched_ts"].empty() || std::stol(
oldMs[
"fetched_ts"]) <= std::stol(status.at(
"fetched_ts"))) {
2313 status.at(
"fetched"),
2314 status.at(
"fetched_ts"));
2317 if (status.find(
"read_ts") != status.end() && status.at(
"read") !=
oldMs[
"read"]) {
2318 if (
oldMs[
"read_ts"].empty() || std::stol(
oldMs[
"read_ts"]) <= std::stol(status.at(
"read_ts"))) {
2322 status.at(
"read_ts"));
2329 pimpl_->updateStatus(uri, status,
commitId,
ts);
2335 const std::function<
void(
const std::map<std::string, std::map<std::string, std::string>>&)>&
cb)
2337 std::unique_lock
lk(pimpl_->messageStatusMtx_);
2338 pimpl_->messageStatusCb_ =
cb;
2343Conversation::onBootstrapStatus(
const std::function<
void(std::string,
BootstrapStatus)>&
cb)
2345 std::lock_guard lock(pimpl_->bootstrapMtx_);
2346 pimpl_->bootstrapCbTest_ =
cb;
2349std::vector<libjami::SwarmMessage>
2350Conversation::loadMessagesSync(
const LogOptions&
options)
2352 std::lock_guard
lk(pimpl_->loadedHistory_.mutex);
2353 auto result = pimpl_->loadMessages(
options);
2358Conversation::announce(
const std::vector<std::map<std::string, std::string>>&
commits,
bool commitFromSelf)
2373 std::lock_guard lock(pimpl_->bootstrapMtx_);
2374 if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
2380 JAMI_DEBUG(
"{} Bootstrap with {} device(s)", pimpl_->toString(), devices.size());
2382 if (!devices.empty()) {
2383 pimpl_->swarmManager_->setKnownNodes(devices);
2390 if (pimpl_->swarmManager_->isShutdown()) {
2391 pimpl_->swarmManager_->restart();
2392 pimpl_->swarmManager_->maintainBuckets();
2399 if (devices.empty())
2403 std::lock_guard
lk(pimpl_->trackedMembersMtx_);
2404 auto it = pimpl_->trackedMembers_.find(
memberUri);
2405 if (
it != pimpl_->trackedMembers_.end()) {
2406 it->second.devices.insert(devices.begin(), devices.end());
2409 JAMI_ERROR(
"{} Adding {} known devices without member URI", pimpl_->toString(), devices.size());
2411 pimpl_->swarmManager_->setKnownNodes(devices);
2414std::vector<std::string>
2417 pimpl_->loadActiveCalls();
2418 pimpl_->loadHostedCalls();
2419 auto commits = pimpl_->commitsEndedCalls();
2422 dht::ThreadPool::io().run([w = weak(),
commits] {
2423 if (
auto sthis = w.lock())
2433 pimpl_->onMembersChanged_ = std::move(
cb);
2439 pimpl_->swarmManager_->needSocketCb_ = [
needSocket = std::move(
needSocket), w = weak()](
const std::string& deviceId,
2441 if (
auto sthis = w.lock()) {
2442 auto wrappedCb = [
cb = std::move(
cb), w, deviceId](
const std::shared_ptr<dhtnet::ChannelSocket>& socket) {
2443 if (
auto sthis = w.lock()) {
2445 if (
auto acc =
sthis->pimpl_->account_.lock()) {
2446 if (
auto cert = acc->certStore().getCertificate(deviceId)) {
2447 sthis->pimpl_->onConnectionFailed(
DeviceId(deviceId),
cert->issuer->getId().toString());
2449 JAMI_WARNING(
"{} Unable to get member URI from device ID {}",
2450 sthis->pimpl_->toString(),
2468 auto deviceId = channel->deviceId();
2473 auto cert = channel->peerCertificate();
2476 auto member =
cert->issuer->getId().toString();
2477 pimpl_->swarmManager_->addChannel(std::move(channel));
2479 auto sthis = w.lock();
2481 account->sendProfile(sthis->id(), member, deviceId.toString());
2492 options.authorUri = authorUri;
2493 options.logIfNotFound =
false;
2506 dht::ThreadPool::io().run([w = weak(), req,
filter,
flag] {
2507 if (
auto sthis = w.lock()) {
2509 std::vector<std::map<std::string, std::string>> commits {};
2511 auto re = std::regex(
filter.regexSearch,
2512 filter.caseSensitive ? std::regex_constants::ECMAScript : std::regex_constants::icase);
2513 sthis->pimpl_->repository_->log(
2536 if (
auto optMessage =
sthis->pimpl_->repository_->convCommitToMap(cc))
2547 for (
auto& message :
history.messageList) {
2556 auto body =
contentType ==
"text/plain" ? message->body.at(
"body")
2557 : message->body.at(
"displayName");
2560 auto commit = message->body;
2561 commit[
"id"] = message->id;
2562 commit[
"type"] = message->type;
2567 commits.emplace_back(message->body);
2577 sthis->pimpl_->accountId_,
2581 if ((*
flag)-- == 1 ) {
2583 req,
sthis->pimpl_->accountId_, std::string {}, std::vector<std::map<std::string, std::string>> {});
2592 if (!message.isMember(
"confId")) {
2593 JAMI_ERROR(
"{}Malformed commit: no confId", pimpl_->toString());
2597 auto now = std::chrono::system_clock::now();
2598 auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
2600 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2601 pimpl_->hostedCalls_[message[
"confId"].asString()] =
nowSecs;
2602 pimpl_->saveHostedCalls();
2611 auto info =
infos();
2612 if (info[
"rdvDevice"] == pimpl_->deviceId_ && info[
"rdvHost"] == pimpl_->userId_)
2614 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2615 return pimpl_->hostedCalls_.find(
confId) != pimpl_->hostedCalls_.end();
2621 if (!message.isMember(
"confId")) {
2622 JAMI_ERROR(
"{}Malformed commit: no confId", pimpl_->toString());
2626 auto erased =
false;
2628 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2629 erased = pimpl_->hostedCalls_.erase(message[
"confId"].asString());
2632 pimpl_->saveHostedCalls();
2638std::vector<std::map<std::string, std::string>>
2641 std::lock_guard
lk(pimpl_->activeCallsMtx_);
2642 return pimpl_->activeCalls_;
This class gives access to the git repository that represents the conversation.
std::vector< std::map< std::string, std::string > > getTrackedMembers() const
std::map< std::string, int32_t > memberToStatus
Status: 0 = commited, 1 = fetched, 2 = read This cache the curent status to add in the messages.
uint64_t presenceDeviceListenerToken_
const std::string userId_
GitSocketList gitSocketList_
const std::filesystem::path conversationDataPath_
const std::weak_ptr< JamiAccount > account_
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
void onConnectionFailed(const DeviceId &deviceId, const std::string &memberUri="")
void updateStatus(const std::string &uri, libjami::Account::MessageStates status, const std::string &commitId, const std::string &ts, bool emit=false)
void rotateTrackedMembers(const std::string &memberUri="", const DeviceId &deviceId={})
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &remoteDevice, const std::string &conversationId)
std::vector< std::string > commitsEndedCalls()
If, for whatever reason, the daemon is stopped while hosting a conference, we need to announce the en...
std::shared_ptr< Typers > typers_
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
void removeGitSocket(const DeviceId &deviceId)
const std::filesystem::path preferencesPath_
const std::unique_ptr< ConversationRepository > repository_
void disconnectFromPeer(const std::string &peerUri)
Remove all git sockets and all DRT nodes associated with the given peer.
const std::filesystem::path fetchedPath_
History loadedHistory_
Loaded history represents the linearized history to show for clients.
const std::filesystem::path sendingPath_
void announce(const std::vector< std::string > &commits, bool commitFromSelf=false)
std::map< std::string, std::map< std::string, std::string > > messagesStatus_
std::mutex activeCallsMtx_
const std::string deviceId_
std::vector< libjami::SwarmMessage > loadMessages(const LogOptions &options, History *optHistory=nullptr)
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
std::filesystem::path activeCallsPath_
std::map< std::string, TrackedMember > trackedMembers_
const std::filesystem::path statusPath_
const std::shared_ptr< asio::io_context > ioContext_
void updateActiveCalls(const std::map< std::string, std::string > &commit, bool eraseOnly=false, bool emitSig=true) const
Update activeCalls_ via announced commits (in load or via new commits)
void handleReaction(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit) const
std::vector< std::shared_ptr< libjami::SwarmMessage > > addToHistory(History &history, const std::vector< std::map< std::string, std::string > > &commits, bool messageReceived=false, bool commitFromSelf=false)
Impl(const std::shared_ptr< JamiAccount > &account, const std::string &conversationId)
void voteUnban(const std::string &contactUri, const std::string_view type, const OnDoneCb &cb)
void announce(const std::string &commitId, bool commitFromSelf=false)
void saveHostedCalls() const
const std::string accountId_
void initActiveCalls(const std::vector< std::map< std::string, std::string > > &commits) const
Initialize activeCalls_ from the list of commits in the repository.
std::atomic_bool isRemoving_
Impl(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
OnMembersChanged onMembersChanged_
std::mutex trackedMembersMtx_
std::map< std::string, std::deque< std::pair< std::string, OnPullCb > > > fetchingRemotes_
std::vector< std::map< std::string, std::string > > activeCalls_
const std::shared_ptr< TransferManager > transferManager_
void pull(const std::string &deviceId)
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited, bool includeLeft) const
void monitorConnection(std::weak_ptr< Conversation > w)
std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> messageStatusCb_
void announce(const std::vector< std::map< std::string, std::string > > &commits, bool commitFromSelf=false)
std::string_view bannedType(const std::string &uri) const
void handleEdition(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
void saveActiveCalls() const
void setupMemberCallback()
void loadHostedCalls() const
std::filesystem::path hostedCallsPath_
std::string toString() const
void startTracking(std::weak_ptr< Conversation > w)
const std::filesystem::path repoPath_
std::map< std::string, std::map< std::string, int32_t > > futureStatus
void loadActiveCalls() const
std::mutex messageStatusMtx_
{uri, { {"fetch", "commitId"}, {"fetched_ts", "timestamp"}, {"read", "commitId"}, {"read_ts",...
std::function< void()> bootstrapCb_
std::vector< std::map< std::string, std::string > > getConnectivity() const
std::map< std::string, uint64_t > hostedCalls_
void rectifyStatus(const std::shared_ptr< libjami::SwarmMessage > &message, History &history) const
bool handleMessage(History &history, const std::shared_ptr< libjami::SwarmMessage > &sharedCommit, bool messageReceived) const
const std::shared_ptr< SwarmManager > swarmManager_
bool pull(const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch and merge from peer.
std::set< std::string > memberUris(std::string_view filter={}, const std::set< MemberRole > &filteredRoles={MemberRole::INVITED, MemberRole::LEFT, MemberRole::BANNED}) const
std::shared_ptr< Typers > typers() const
Get Typers object.
bool downloadFile(const std::string &interactionId, const std::string &fileId, const std::string &path, const std::string &member="", const std::string &deviceId="")
Adds a file to the waiting list and ask members.
void addGitSocket(const DeviceId &deviceId, const std::shared_ptr< dhtnet::ChannelSocket > &socket)
std::string leave()
Leave a conversation.
std::string id() const
Get conversation's id.
void clearCache()
Clear all cached messages.
std::vector< std::string > commitsEndedCalls()
Refresh active calls.
std::map< std::string, std::map< std::string, std::string > > messageStatus() const
Retrieve last displayed and fetch status per member.
bool isMember(const std::string &uri, bool includeInvited=false) const
Test if an URI is a member.
void search(uint32_t req, const Filter &filter, const std::shared_ptr< std::atomic_int > &flag) const
Search in the conversation via a filter.
std::vector< std::map< std::string, std::string > > getConnectivity() const
Get connectivity information for the conversation.
std::vector< uint8_t > vCard() const
std::vector< std::string > getInitialMembers() const
One to one util, get initial members.
void connectivityChanged()
If we change from one network to one another, we will need to update the state of the connections.
std::shared_ptr< TransferManager > dataTransfer() const
Access to transfer manager.
void removeGitSocket(const DeviceId &deviceId)
std::vector< std::map< std::string, std::string > > currentCalls() const
Return current detected calls.
void onMembersChanged(OnMembersChanged &&cb)
std::map< std::string, std::string > infos() const
Retrieve current infos (title, description, avatar, mode)
void monitor()
Print the state of the DRT linked to the conversation.
std::vector< NodeId > peersToSyncWith() const
Get peers to sync with.
void sendMessage(Json::Value &&message, const std::string &replyTo="", OnCommitCb &&onCommit={}, OnDoneCb &&cb={})
void bootstrap(std::function< void()> onBootstrapped, const std::vector< DeviceId > &knownDevices={})
Bootstrap swarm manager to other peers.
void removeActiveConference(Json::Value &&message, OnDoneCb &&cb={})
Announce the end of a call.
bool isInitialMember(const std::string &uri) const
ConversationMode mode() const
Get conversation's mode.
std::string join()
Join a conversation.
void addSwarmChannel(std::shared_ptr< dhtnet::ChannelSocket > channel)
Add swarm connection to the DRT.
std::vector< jami::DeviceId > getDeviceIdList() const
bool setMessageDisplayed(const std::string &uri, const std::string &interactionId)
Store last read commit (returned in getMembers)
void updateInfos(const std::map< std::string, std::string > &map, const OnDoneCb &cb={})
Change repository's infos.
void updateMessageStatus(const std::map< std::string, std::map< std::string, std::string > > &messageStatus)
Update fetch/read status.
void onMessageStatusChanged(const std::function< void(const std::map< std::string, std::map< std::string, std::string > > &)> &cb)
void shutdownConnections()
Stop SwarmManager, bootstrap and gitSockets.
void updatePreferences(const std::map< std::string, std::string > &map)
Change user's preferences.
std::vector< std::map< std::string, std::string > > getMembers(bool includeInvited=false, bool includeLeft=false, bool includeBanned=false) const
std::map< std::string, std::string > preferences(bool includeLastModified) const
Retrieve current preferences (color, notification, etc)
bool hasCommit(const std::string &commitId) const
Check if a commit exists in the repository.
void sync(const std::string &member, const std::string &deviceId, OnPullCb &&cb, std::string commitId="")
Fetch new commits and re-ask for waiting files.
std::shared_ptr< dhtnet::ChannelSocket > gitSocket(const DeviceId &deviceId) const
Git operations will need a ChannelSocket for cloning/fetching commits Because libgit2 is a C library,...
void onNeedSocket(NeedSocketCb cb)
Set the callback that will be called whenever a new socket will be needed.
std::string uriFromDevice(const std::string &deviceId) const
Retrieve the uri from a deviceId.
std::optional< std::map< std::string, std::string > > getCommit(const std::string &commitId) const
Retrieve one commit.
void erase()
Erase all related datas.
void setRemovingFlag()
Set a conversation as removing (when loading convInfo and still not sync)
void hasFetched(const std::string &deviceId, const std::string &commitId)
Store information about who fetch or not.
bool isHosting(const std::string &confId) const
Check if we're currently hosting this conference.
std::string lastCommitId() const
Get last commit id.
bool isBanned(const std::string &uri) const
void loadMessages(const OnLoadMessages &cb, const LogOptions &options)
Get a range of messages.
Conversation(const std::shared_ptr< JamiAccount > &account, ConversationMode mode, const std::string &otherMember="")
bool onFileChannelRequest(const std::string &member, const std::string &fileId, std::filesystem::path &path, std::string &sha3sum) const
Choose if we can accept channel request.
void addMember(const std::string &contactUri, const OnDoneCb &cb={})
Add conversation member.
bool hasSwarmChannel(const std::string &deviceId)
Used to avoid multiple connections, we just check if we got a swarm channel with a specific device.
void addKnownDevices(const std::vector< DeviceId > &devices, const std::string &memberUri)
Add known devices to the swarm manager.
bool isBootstrapped() const
Check if we're at least connected to one node.
bool isRemoving()
Check if we are removing the conversation.
void removeMember(const std::string &contactUri, bool isDevice, const OnDoneCb &cb={})
void hostConference(Json::Value &&message, OnDoneCb &&cb={})
Host a conference in the conversation.
std::vector< std::map< std::string, std::string > > getTrackedMembers() const
uint32_t countInteractions(const std::string &toId, const std::string &fromId="", const std::string &authorUri="") const
Retrieve how many interactions there is from HEAD to interactionId.
std::map< std::string, std::string > generateInvitation() const
Generate an invitation to send to new contacts.
static LIBJAMI_TEST_EXPORT Manager & instance()
std::mt19937_64 getSeededRandomEngine()
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
static constexpr std::string_view SENDING
static constexpr std::string_view ACTIVE_CALLS
static constexpr std::string_view HOSTED_CALLS
static constexpr std::string_view PREFERENCES
static constexpr std::string_view FETCHED
static constexpr std::string_view STATUS
static constexpr const char * CREATED
static constexpr const char * REMOVED
static constexpr const char * ID
static constexpr const char * RECEIVED
static constexpr const char * METADATAS
static constexpr const char * MEMBERS
static constexpr const char * DECLINED
static constexpr const char * ERASED
static constexpr const char * FROM
static constexpr const char * CONVERSATIONID
static constexpr const char * LAST_DISPLAYED
static const std::filesystem::path DEVICES
static const std::filesystem::path INVITED
static const std::filesystem::path BANNED
static const std::filesystem::path ADMINS
static const std::filesystem::path MEMBERS
const std::filesystem::path & get_data_dir()
std::vector< uint8_t > loadFile(const std::filesystem::path &path, const std::filesystem::path &default_dir)
Read the full content of a file at path.
uint64_t lastWriteTimeInSeconds(const std::filesystem::path &filePath)
Return the last write time (epoch time) of a given file path (in seconds).
std::filesystem::path getFullPath(const std::filesystem::path &base, const std::filesystem::path &path)
If path is relative, it is appended to base.
std::string toString(const Json::Value &jsonVal)
std::function< void(const std::string &, const std::string &, ChannelCb &&, const std::string &)> NeedSocketCb
static constexpr std::string_view toString(AuthDecodingState state)
void emitSignal(Args... args)
std::function< void(std::vector< libjami::SwarmMessage > &&messages)> OnLoadMessages
std::unique_ptr< git_commit, GitCommitDeleter > GitCommit
std::function< void(bool, const std::string &)> OnDoneCb
std::list< std::shared_ptr< libjami::SwarmMessage > > MessageList
std::function< bool(const std::shared_ptr< dhtnet::ChannelSocket > &)> ChannelCb
std::function< void(bool fetchOk)> OnPullCb
std::map< DeviceId, std::shared_ptr< dhtnet::ChannelSocket > > GitSocketList
std::function< void(const std::string &)> OnCommitCb
static const char *const LAST_MODIFIED
std::function< void(const std::set< std::string > &)> OnMembersChanged
bool regex_search(string_view sv, svmatch &m, const regex &e, regex_constants::match_flag_type flags=regex_constants::match_default)
std::set< std::string > members
std::string lastDisplayed
Json::Value toJson() const
ConversationRequest()=default
std::map< std::string, std::string > metadatas
Json::Value toJson() const
std::string conversationId
std::map< std::string, std::string > toMap() const
std::set< DeviceId > failedDevices
std::set< DeviceId > devices
std::map< std::string, std::shared_ptr< libjami::SwarmMessage > > quickAccess
std::map< std::string, std::list< std::map< std::string, std::string > > > pendingReactions
std::map< std::string, std::list< std::shared_ptr< libjami::SwarmMessage > > > pendingEditions
std::condition_variable cv