23#include <opendht/thread_pool.h>
47 std::map<std::string, std::map<std::string, std::string>>
status {};
49 std::shared_ptr<dhtnet::ChannelSocket>
socket {};
60 std::unique_ptr<PendingConversationFetch>
pending;
84 if (
pending->connectingTo.find(deviceId) !=
pending->connectingTo.end())
87 pending = std::make_unique<PendingConversationFetch>();
88 pending->connectingTo.insert(deviceId);
99 pending->connectingTo.erase(deviceId);
100 if (
pending->connectingTo.empty())
111 std::vector<std::map<std::string, std::string>> result;
114 result.emplace_back(std::map<std::string, std::string> {{
"uri", uri}});
124 std::shared_ptr<AccountManager>&& accountManager,
131 template<
typename S,
typename T>
135 std::lock_guard
lk(
conv->mtx);
140 return decltype(
cb(std::declval<SyncedConversation&>()))();
142 template<
typename S,
typename T>
146 std::lock_guard
lk(
conv->mtx);
147 if (
conv->conversation)
148 return cb(*
conv->conversation);
152 return decltype(
cb(std::declval<Conversation&>()))();
162 const std::string& peer,
163 const std::string&
convId);
165 const std::string& peer,
166 const std::shared_ptr<SyncedConversation>&
conv);
176 const std::string& deviceId,
177 const std::string& conversationId,
185 std::optional<ConversationRequest>
getRequest(
const std::string&
id)
const;
195 const std::string& conversationId,
bool includeBanned =
false)
const;
223 const std::string& deviceId =
"");
227 const std::string& deviceId =
"");
269 c = std::make_shared<SyncedConversation>(
convId);
277 c = std::make_shared<SyncedConversation>(info);
283 std::vector<std::shared_ptr<SyncedConversation>> result;
286 result.emplace_back(c);
292 std::vector<std::shared_ptr<Conversation>> result;
295 if (
auto c =
sc->conversation)
296 result.emplace_back(std::move(c));
302 void sendMessage(
const std::string& conversationId,
304 const std::string&
replyTo =
"",
305 bool announce =
true,
309 void sendMessage(
const std::string& conversationId,
311 const std::string&
replyTo =
"",
312 const std::string& type =
"text/plain",
313 bool announce =
true,
317 void editMessage(
const std::string& conversationId,
362 md =
it->second.metadatas;
363 md[
"syncing"] =
"true";
364 md[
"created"] = std::to_string(
it->second.received);
389 std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>>
conversations_;
406 std::map<std::string, std::vector<std::map<std::string, std::string>>>
replay_;
415 std::shared_ptr<JamiAccount>
account,
416 const std::vector<std::tuple<std::string, std::string, std::string>>&
updateContactConv,
417 const std::set<std::string>&
toRm);
420 const std::string& deviceId,
423 void fallbackClone(
const asio::error_code&
ec,
const std::string& conversationId);
425 const std::string& uri,
433 std::ofstream
file(path /
"syncingMetadatas", std::ios::trunc | std::ios::binary);
442 std::lock_guard lock(dhtnet::fileutils::getFileLock(path /
"syncingMetadatas"));
445 msgpack::unpacked result;
446 msgpack::unpack(result, (
const char*)
file.data(),
file.size(), 0);
448 }
catch (
const std::exception&
e) {
449 JAMI_WARNING(
"[Account {}] [ConversationModule] unable to load syncing metadata: {}",
457 std::shared_ptr<AccountManager>&& accountManager,
464 , accountManager_(accountManager)
465 , accountId_(
account->getAccountID())
468 , onNeedSocket_(onNeedSocket)
473 if (
const auto* info =
accm->getInfo()) {
483 const std::string& peerUri,
484 const std::string&
convId)
486 JAMI_DEBUG(
"[Account {}] [Conversation {}] [device {}] Cloning conversation", accountId_,
convId, deviceId);
489 std::unique_lock
lk(
conv->mtx);
490 cloneConversation(deviceId, peerUri,
conv);
495 const std::string& peerUri,
496 const std::shared_ptr<SyncedConversation>&
conv)
499 if (!
conv->conversation) {
505 if (!
conv->startFetch(deviceId,
true)) {
506 JAMI_WARNING(
"[Account {}] [Conversation {}] [device {}] Already fetching conversation", accountId_,
conv->info.id, deviceId);
513 [w = weak(),
conv, deviceId](
const auto& channel) {
514 std::lock_guard
lk(
conv->mtx);
515 if (
conv->pending && !
conv->pending->ready) {
517 conv->pending->ready =
true;
518 conv->pending->deviceId = channel->deviceId().toString();
519 conv->pending->socket = channel;
520 if (!
conv->pending->cloning) {
521 conv->pending->cloning =
true;
522 dht::ThreadPool::io().run(
523 [w,
convId =
conv->info.id, deviceId =
conv->pending->deviceId]() {
524 if (auto sthis = w.lock())
525 sthis->handlePendingConversation(convId, deviceId);
530 conv->stopFetch(deviceId);
537 JAMI_LOG(
"[Account {}] [Conversation {}] [device {}] Requesting device",
541 conv->info.members.emplace(username_);
542 conv->info.members.emplace(peerUri);
545 JAMI_DEBUG(
"[Account {}] [Conversation {}] Conversation already cloned", accountId_,
conv->info.id);
551 const std::string& deviceId,
552 const std::string& conversationId,
556 std::lock_guard
lk(convInfosMtx_);
557 auto itConv = convInfos_.find(conversationId);
558 if (
itConv != convInfos_.end() &&
itConv->second.isRemoved()) {
562 JAMI_WARNING(
"[Account {:s}] [Conversation {}] Received a commit, but conversation is removed",
568 std::optional<ConversationRequest>
oldReq;
570 std::lock_guard
lk(conversationsRequestsMtx_);
571 oldReq = getRequest(conversationId);
573 JAMI_DEBUG(
"[Account {}] [Conversation {}] Received a request for a conversation already declined.",
574 accountId_, conversationId);
578 JAMI_DEBUG(
"[Account {:s}] [Conversation {}] [device {}] fetching '{:s}'",
586 if (
oldReq == std::nullopt) {
590 JAMI_WARNING(
"[Account {}] [Conversation {}] Unable to find conversation, asking for an invite",
600 std::unique_lock
lk(
conv->mtx);
602 if (
conv->conversation) {
607 if (
conv->conversation->isRemoving()) {
608 JAMI_WARNING(
"[Account {}] [Conversation {}] conversaton is being removed",
613 if (!
conv->conversation->isMember(peer,
true)) {
614 JAMI_WARNING(
"[Account {}] [Conversation {}] {} is not a membe", accountId_, conversationId, peer);
617 if (
conv->conversation->isBanned(deviceId)) {
618 JAMI_WARNING(
"[Account {}] [Conversation {}] device {} is banned",
628 JAMI_ERROR(
"[Account {}] [Conversation {}] No message detected. This is a bug", accountId_, conversationId);
632 if (!
conv->startFetch(deviceId)) {
633 JAMI_WARNING(
"[Account {}] [Conversation {}] Already fetching", accountId_, conversationId);
637 syncCnt.fetch_add(1);
644 peer = std::move(peer),
647 auto sthis = w.lock();
648 auto acc =
sthis ?
sthis->account_.lock() :
nullptr;
649 std::unique_lock
lk(
conv->mtx);
650 auto conversation =
conv->conversation;
651 if (!channel || !acc || !conversation) {
652 conv->stopFetch(deviceId);
654 sthis->syncCnt.fetch_sub(1);
657 conversation->addGitSocket(channel->deviceId(), channel);
664 conversationId = std::move(conversationId),
668 auto shared = w.lock();
672 JAMI_WARNING(
"[Account {}] [Conversation {}] Unable to fetch new commit from "
673 "{}, other peer may be disconnected",
677 JAMI_LOG(
"[Account {}] [Conversation {}] Relaunch sync with {}",
684 std::lock_guard
lk(
conv->mtx);
685 conv->pending.reset();
688 shared->sendMessageNotification(*
conv->conversation,
694 if (shared->syncCnt.fetch_sub(1) == 1) {
704 if (
oldReq != std::nullopt)
710 cloneConversation(deviceId, peer,
conv);
714 JAMI_WARNING(
"[Account {}] [Conversation {}] Unable to find conversation, asking for an invite",
727 const std::string& deviceId)
729 auto acc = account_.lock();
732 std::vector<DeviceId>
kd;
734 std::unique_lock
lk(conversationsMtx_);
735 const auto& devices = accountManager_->getKnownDevices();
736 kd.reserve(devices.size());
737 for (
const auto& [
id,
_] : devices)
743 std::unique_lock
lk(
conv->mtx, std::defer_lock);
746 if (
conv->pending && !
conv->pending->removeId.empty())
747 toRm = std::move(
conv->pending->removeId);
748 conv->pending.reset();
754 auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
755 conversation->onMembersChanged([w =
weak_from_this(), conversationId](
const auto& members) {
757 dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
758 if (
auto sthis = w.lock())
759 sthis->setConversationMembers(conversationId, members);
762 conversation->onMessageStatusChanged([
this, conversationId](
const auto& status) {
763 auto msg = std::make_shared<SyncMsg>();
764 msg->ms = {{conversationId, status}};
765 needsSyncingCb_(std::move(
msg));
767 conversation->onNeedSocket(onNeedSwarmSocket_);
768 if (!conversation->isMember(username_,
true)) {
769 JAMI_ERROR(
"[Account {}] [Conversation {}] Conversation cloned but we do not seem to be a valid member",
772 conversation->erase();
781 setConversationMembers(conversationId, conversation->memberUris(
"", {}));
785 if (
conv->pending &&
conv->pending->socket)
786 conversation->addGitSocket(
DeviceId(deviceId), std::move(
conv->pending->socket));
790 if (
conv->info.isRemoved())
792 std::map<std::string, std::string> preferences;
793 std::map<std::string, std::map<std::string, std::string>> status;
795 preferences = std::move(
conv->pending->preferences);
796 status = std::move(
conv->pending->status);
798 conv->conversation = conversation;
800 removeRepositoryImpl(*
conv,
false,
true);
805 auto commitId = conversation->join();
806 std::vector<std::map<std::string, std::string>>
messages;
808 std::lock_guard
lk(replayMtx_);
809 auto replayIt = replay_.find(conversationId);
816 sendMessageNotification(*conversation,
false,
commitId);
827 if (!preferences.empty())
828 conversation->updatePreferences(preferences);
830 conversation->updateMessageStatus(status);
831 syncingMetadatas_.erase(conversationId);
837 std::vector<Json::Value>
values;
839 for (
const auto& message :
messages) {
842 if (message.at(
"type") ==
"text/plain" && message.at(
"author") == username_) {
844 json[
"body"] = message.at(
"body");
845 json[
"type"] =
"text/plain";
846 values.emplace_back(std::move(json));
850 conversation->sendMessages(std::move(
values),
851 [w = weak(), conversationId](
const auto&
commits) {
852 auto shared = w.lock();
854 shared->sendMessageNotification(conversationId,
863 auto cert = acc->certStore().getCertificate(deviceId);
864 askForProfile =
cert &&
cert->issuer &&
cert->issuer->getId().toString() == username_;
867 for (
const auto&
member : conversation->memberUris(username_)) {
868 acc->askForProfile(conversationId, deviceId,
member);
871 }
catch (
const std::exception&
e) {
872 JAMI_WARNING(
"[Account {}] [Conversation {}] Something went wrong when cloning conversation: {}. Re-clone in {}s",
876 conv->fallbackTimer.count());
877 conv->fallbackClone->expires_at(std::chrono::steady_clock::now() +
conv->fallbackTimer);
878 conv->fallbackTimer *= 2;
883 std::placeholders::_1,
890std::optional<ConversationRequest>
894 auto it = conversationsRequests_.find(
id);
895 if (
it != conversationsRequests_.end())
903 if (
auto details = accountManager_->getContactInfo(uri)) {
911 return details->conversationId;
924 JAMI_DEBUG(
"[Account {}] [Conversation {}] Old conversation is not found in details {} - found: {}",
931 accountManager_->updateContactConversation(uri,
newConv);
941 for (
auto& [
id, request] : conversationsRequests_) {
942 if (request.declined)
944 if (request.isOneToOne() && request.from == uri) {
945 JAMI_WARNING(
"[Account {}] [Conversation {}] Decline conversation request from {}",
946 accountId_,
id, uri);
947 request.declined = std::time(
nullptr);
948 syncingMetadatas_.erase(
id);
955std::vector<std::map<std::string, std::string>>
959 return withConv(conversationId,
969 std::unique_lock
lk(
conv->mtx);
970 removeRepositoryImpl(*
conv, sync,
force);
976 if (
conv.conversation && (
force ||
conv.conversation->isRemoving())) {
978 conv.pending.reset();
980 JAMI_LOG(
"[Account {}] [Conversation {}] Remove conversation", accountId_,
conv.info.id);
983 for (
const auto&
member :
conv.conversation->getInitialMembers()) {
984 if (
member != username_) {
988 accountManager_->removeContactConversation(
member);
993 }
catch (
const std::exception&
e) {
996 conv.conversation->erase();
997 conv.conversation.reset();
1002 conv.info.erased = std::time(
nullptr);
1003 needsSyncingCb_({});
1011 return withConv(conversationId, [
this](
auto&
conv) {
return removeConversationImpl(
conv); });
1017 auto members =
conv.getMembers(
false,
false);
1020 && std::find_if(members.begin(),
1022 [&](
const auto&
member) {
1023 return member.at(
"uri") == username_;
1026 && members.size() != 1;
1027 conv.info.removed = std::time(
nullptr);
1029 conv.info.erased = std::time(
nullptr);
1031 needsSyncingCb_({});
1042 JAMI_LOG(
"Wait that someone sync that user left conversation {}",
conv.info.id);
1046 sendMessageNotification(*
conv.conversation,
false,
commitId);
1048 JAMI_ERROR(
"Failed to send message to conversation {}",
conv.info.id);
1057 for (
const auto&
m : members)
1058 if (username_ !=
m.at(
"uri"))
1062 removeRepositoryImpl(
conv,
true);
1070 const std::string& deviceId)
1073 std::lock_guard
lk(
conv->mtx);
1074 if (
conv->conversation)
1075 sendMessageNotification(*
conv->conversation, sync,
commitId, deviceId);
1083 const std::string& deviceId)
1085 auto acc = account_.lock();
1088 Json::Value message;
1090 message[
"id"] = conversation.
id();
1091 message[
"commit"] = commit;
1092 message[
"deviceId"] = deviceId_;
1100 refreshMessage[username_] = sendMsgCb_(username_,
1102 std::map<std::string, std::string> {
1104 refreshMessage[username_]);
1111 std::vector<NodeId> devices;
1113 std::lock_guard
lk(notSyncedNotificationMtx_);
1118 for (
const auto& device : devices) {
1119 auto cert = acc->certStore().getCertificate(device.toString());
1124 std::set_difference(members.begin(),
1133 JAMI_DEBUG(
"[Conversation {}] Not yet bootstraped, save notification",
1137 notSyncedNotification_[conversation.
id()] = commit;
1144 std::map<std::string, std::string> {
1150 for (
const auto& device : devices) {
1157 std::map<std::string, std::string> {
1165 std::string message,
1167 const std::string& type,
1173 json[
"body"] = std::move(message);
1174 json[
"type"] = type;
1185 Json::Value&&
value,
1192 std::lock_guard
lk(
conv->mtx);
1193 if (
conv->conversation)
1195 ->sendMessage(std::move(
value),
1207 sendMessageNotification(conversationId,
true,
commitId);
1209 JAMI_ERR(
"Failed to send message to conversation %s",
1210 conversationId.c_str());
1222 std::string type,
tid;
1224 std::lock_guard
lk(
conv->mtx);
1225 if (
conv->conversation) {
1227 if (commit != std::nullopt) {
1228 type = commit->at(
"type");
1229 if (type ==
"application/data-transfer+json")
1230 tid = commit->at(
"tid");
1232 && (type ==
"text/plain" || type ==
"application/data-transfer+json");
1242 if (type ==
"application/data-transfer+json") {
1247 dhtnet::fileutils::remove(path,
true);
1252 json[
"type"] = type;
1261 std::lock_guard
lk(notSyncedNotificationMtx_);
1262 auto it = notSyncedNotification_.find(
convId);
1263 if (
it != notSyncedNotification_.end()) {
1265 notSyncedNotification_.erase(
it);
1268 JAMI_DEBUG(
"[Account {}] [Conversation {}] Resend last message notification", accountId_,
convId);
1270 if (
auto sthis = w.lock())
1277 std::shared_ptr<JamiAccount> acc,
1278 const std::vector<std::tuple<std::string, std::string, std::string>>&
updateContactConv,
1279 const std::set<std::string>&
toRm)
1288 auto requests = acc->getTrustRequests();
1289 std::lock_guard
lk(conversationsRequestsMtx_);
1290 for (
const auto& request :
requests) {
1296 auto declined =
itReq == conversationsRequests_.end() ||
itReq->second.declined;
1304 for (
auto it = conversationsRequests_.begin();
it != conversationsRequests_.end();) {
1305 if (
it->second.from == username_) {
1306 JAMI_WARNING(
"Detected request from ourself, this makes no sense. Remove {}",
1308 it = conversationsRequests_.erase(
it);
1322 JAMI_ERROR(
"[Account {}] Remove conversation ({})", accountId_,
conv);
1325 JAMI_DEBUG(
"[Account {}] Conversations loaded!", accountId_);
1330 const std::string& deviceId,
1333 std::lock_guard
lk(
conv->mtx);
1334 const auto& conversationId =
conv->info.id;
1335 if (!
conv->startFetch(deviceId,
true)) {
1336 JAMI_WARNING(
"[Account {}] [Conversation {}] Already fetching", accountId_, conversationId);
1344 std::lock_guard
lk(
conv->mtx);
1345 if (
conv->pending && !
conv->pending->ready) {
1348 conv->pending->ready =
true;
1349 conv->pending->deviceId = channel->deviceId().toString();
1350 conv->pending->socket = channel;
1351 if (!
conv->pending->cloning) {
1352 conv->pending->cloning =
true;
1353 dht::ThreadPool::io().run(
1354 [
wthis, conversationId, deviceId =
conv->pending->deviceId]() {
1355 if (auto sthis = wthis.lock())
1356 sthis->handlePendingConversation(conversationId, deviceId);
1361 conv->stopFetch(deviceId);
1362 JAMI_WARNING(
"[Account {}] [Conversation {}] Clone failed. Re-clone in {}s",
sthis->accountId_, conversationId,
conv->fallbackTimer.count());
1363 conv->fallbackClone->expires_at(std::chrono::steady_clock::now()
1364 +
conv->fallbackTimer);
1365 conv->fallbackTimer *= 2;
1368 conv->fallbackClone->async_wait(
1371 std::placeholders::_1,
1382 const std::string& conversationId)
1384 if (
ec == asio::error::operation_aborted)
1390 for (
const auto&
member : members)
1391 if (
member.at(
"uri") != username_)
1398 std::vector<DeviceId>
kd;
1400 std::unique_lock
lk(conversationsMtx_);
1401 const auto& devices = accountManager_->getKnownDevices();
1402 kd.reserve(devices.size());
1403 for (
const auto& [
id,
_] : devices)
1404 kd.emplace_back(
id);
1414 std::vector<std::string>
toClone;
1416 std::lock_guard
lk(convInfosMtx_);
1417 for (
const auto& [conversationId,
convInfo] : convInfos_) {
1421 if ((!
conv->conversation && !
conv->info.isRemoved())) {
1425 toClone.emplace_back(conversationId);
1426 }
else if (
conv->conversation) {
1431 std::lock_guard
lk(
conv->mtx);
1432 if (
conv->conversation)
1438 for (
const auto&
member : members) {
1439 if (
member.at(
"uri") != username_)
1447 const std::string& uri,
1456 std::lock_guard
lk(
conv->mtx);
1457 conv->info = {conversationId};
1458 conv->info.created = std::time(
nullptr);
1459 conv->info.members.emplace(username_);
1460 conv->info.members.emplace(uri);
1463 const std::shared_ptr<dht::crypto::PublicKey>& pk) {
1464 auto sthis = w.lock();
1465 auto deviceId = pk->getLongId().
toString();
1477 const std::string& accountId,
1478 const std::map<std::string, ConversationRequest>& conversationsRequests)
1486 const std::filesystem::path& path,
1487 const std::map<std::string, ConversationRequest>& conversationsRequests)
1489 auto p = path /
"convRequests";
1490 std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
1491 std::ofstream
file(p, std::ios::trunc | std::ios::binary);
1492 msgpack::pack(
file, conversationsRequests);
1506 std::ofstream
file(path /
"convInfo", std::ios::trunc | std::ios::binary);
1507 msgpack::pack(
file, conversations);
1513 std::shared_ptr<AccountManager> accountManager,
1536 std::unique_lock
lk(pimpl_->conversationsMtx_);
1537 pimpl_->accountManager_ = accountManager;
1542ConversationModule::onBootstrapStatus(
1543 const std::function<
void(std::string, Conversation::BootstrapStatus)>&
cb)
1545 pimpl_->bootstrapCbTest_ =
cb;
1554 auto acc = pimpl_->account_.lock();
1557 JAMI_LOG(
"[Account {}] Start loading conversations…", pimpl_->accountId_);
1561 std::unique_lock
lk(pimpl_->conversationsMtx_);
1562 auto contacts = pimpl_->accountManager_->getContacts(
1564 std::unique_lock
ilk(pimpl_->convInfosMtx_);
1565 pimpl_->convInfos_ =
convInfos(pimpl_->accountId_);
1566 pimpl_->conversations_.clear();
1571 std::condition_variable cv;
1573 std::set<std::string> toRm;
1576 std::vector<std::map<std::string, std::string>> contacts;
1577 std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
1579 auto ctx = std::make_shared<Ctx>();
1581 ctx->contacts = std::move(contacts);
1584 dht::ThreadPool::io().run([
this,
ctx, repository = std::move(r), acc] {
1586 auto sconv = std::make_shared<SyncedConversation>(repository);
1587 auto conv = std::make_shared<Conversation>(acc, repository);
1588 conv->onMessageStatusChanged([
this, repository](
const auto& status) {
1589 auto msg = std::make_shared<SyncMsg>();
1590 msg->ms = {{repository, status}};
1591 pimpl_->needsSyncingCb_(std::move(
msg));
1593 conv->onMembersChanged(
1594 [w = pimpl_->weak_from_this(), repository](
const auto& members) {
1596 dht::ThreadPool::io().run([w, repository, members = std::move(members)] {
1597 if (
auto sthis = w.lock())
1598 sthis->setConversationMembers(repository, members);
1601 conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1602 auto members =
conv->memberUris(acc->getUsername(), {});
1610 ctx->contacts.cend(),
1611 [&](
const auto& c) {
1612 return c.at(
"id") == otherUri;
1616 std::lock_guard
lkCv {
ctx->cvMtx};
1618 ctx->cv.notify_all();
1622 auto removed = std::stoul(
itContact->at(
"removed"));
1623 auto added = std::stoul(
itContact->at(
"added"));
1624 auto isRemoved = removed > added;
1629 JAMI_ERROR(
"Conversation {} detected for {} and should be removed",
1632 std::lock_guard
lkMtx {
ctx->toRmMtx};
1633 ctx->toRm.insert(repository);
1635 JAMI_ERROR(
"No conversation detected for {} but one exists ({}). "
1639 std::lock_guard
lkMtx {
ctx->toRmMtx};
1640 ctx->updateContactConv.emplace_back(
1644 JAMI_ERROR(
"Multiple conversation detected for {} but ({} & {})",
1648 std::lock_guard
lkMtx {
ctx->toRmMtx};
1649 ctx->toRm.insert(repository);
1654 std::lock_guard
lkMtx {
ctx->convMtx};
1655 auto convInfo = pimpl_->convInfos_.find(repository);
1656 if (
convInfo == pimpl_->convInfos_.end()) {
1657 JAMI_ERROR(
"Missing conv info for {}. This is a bug!", repository);
1658 sconv->info.created = std::time(
nullptr);
1659 sconv->info.lastDisplayed
1663 if (
convInfo->second.isRemoved()) {
1665 conv->setRemovingFlag();
1666 std::lock_guard
lkMtx {
ctx->toRmMtx};
1667 ctx->toRm.insert(repository);
1675 members.emplace(acc->getUsername());
1676 sconv->info.members = std::move(members);
1678 pimpl_->convInfos_[repository] =
sconv->info;
1686 pimpl_->sendMessageNotification(*
conv,
true, *
commits.rbegin());
1689 std::lock_guard
lkMtx {
ctx->convMtx};
1690 pimpl_->conversations_.emplace(repository, std::move(
sconv));
1691 }
catch (
const std::logic_error&
e) {
1692 JAMI_WARNING(
"[Account {}] Conversations not loaded: {}",
1696 std::lock_guard
lkCv {
ctx->cvMtx};
1698 ctx->cv.notify_all();
1702 std::unique_lock
lkCv(
ctx->cvMtx);
1703 ctx->cv.wait(
lkCv, [&] {
return ctx->convNb == 0; });
1707 std::set<std::string> removed;
1708 for (
auto itInfo = pimpl_->convInfos_.begin();
itInfo != pimpl_->convInfos_.end();) {
1709 const auto& info =
itInfo->second;
1710 if (info.members.empty()) {
1714 if (info.isRemoved())
1715 removed.insert(info.id);
1716 auto itConv = pimpl_->conversations_.find(info.id);
1717 if (
itConv == pimpl_->conversations_.end()) {
1720 itConv = pimpl_->conversations_
1721 .emplace(info.id, std::make_shared<SyncedConversation>(info))
1724 if (
itConv != pimpl_->conversations_.end() &&
itConv->second &&
itConv->second->conversation
1725 && info.isRemoved())
1726 itConv->second->conversation->setRemovingFlag();
1727 if (!info.isRemoved() &&
itConv == pimpl_->conversations_.end()) {
1729 if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
1730 JAMI_WARNING(
"[Account {:s}] Conversation {:s} seems not present/synced.",
1743 if (!removed.empty())
1744 acc->unlinkConversations(removed);
1746 pimpl_->saveConvInfos();
1751 dht::ThreadPool::io().run([w = pimpl_->weak(),
1754 toRm = std::move(
ctx->toRm)]() {
1756 if (auto shared = w.lock())
1757 shared->fixStructures(acc, updateContactConv, toRm);
1764 auto acc = pimpl_->account_.lock();
1767 JAMI_LOG(
"[Account {}] Start loading conversation {}", pimpl_->accountId_,
convId);
1769 std::unique_lock
lk(pimpl_->conversationsMtx_);
1770 std::unique_lock
ilk(pimpl_->convInfosMtx_);
1772 pimpl_->convInfos_ =
convInfos(pimpl_->accountId_);
1773 pimpl_->conversations_.clear();
1776 auto sconv = std::make_shared<SyncedConversation>(
convId);
1778 auto conv = std::make_shared<Conversation>(acc,
convId);
1780 conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
1783 pimpl_->conversations_.emplace(
convId, std::move(
sconv));
1784 }
catch (
const std::logic_error&
e) {
1785 JAMI_WARNING(
"[Account {}] Conversations not loaded: {}", pimpl_->accountId_,
e.what());
1801 for (
auto itInfo = pimpl_->convInfos_.begin();
itInfo != pimpl_->convInfos_.end();) {
1802 const auto& info =
itInfo->second;
1803 if (info.members.empty()) {
1807 auto itConv = pimpl_->conversations_.find(info.id);
1808 if (
itConv == pimpl_->conversations_.end()) {
1811 pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info));
1823 pimpl_->bootstrap(
convId);
1829 for (
auto&
conv : pimpl_->getConversations())
1843 for (
auto&
conv : pimpl_->getSyncedConversations()) {
1844 std::lock_guard
lk(
conv->mtx);
1846 JAMI_ERR(
"This is a bug, seems to still fetch to some device on initializing");
1847 conv->pending.reset();
1855 pimpl_->conversationsRequests_ =
convRequests(pimpl_->accountId_);
1858std::vector<std::string>
1861 std::vector<std::string> result;
1862 std::lock_guard
lk(pimpl_->convInfosMtx_);
1863 result.reserve(pimpl_->convInfos_.size());
1864 for (
const auto& [key,
conv] : pimpl_->convInfos_) {
1865 if (
conv.isRemoved())
1867 result.emplace_back(key);
1875 return pimpl_->getOneToOneConversation(uri);
1886std::vector<std::map<std::string, std::string>>
1889 std::vector<std::map<std::string, std::string>>
requests;
1890 std::lock_guard
lk(pimpl_->conversationsRequestsMtx_);
1891 requests.reserve(pimpl_->conversationsRequests_.size());
1892 for (
const auto& [
id, request] : pimpl_->conversationsRequests_) {
1893 if (request.declined)
1895 requests.emplace_back(request.toMap());
1902 const std::string& conversationId,
1903 const std::vector<uint8_t>& payload,
1911 "Contact is sending a request for a non active conversation. Ignore. They will "
1912 "clone the old one");
1915 std::unique_lock
lk(pimpl_->conversationsRequestsMtx_);
1921 std::string_view(
reinterpret_cast<const char*
>(payload.data()), payload.size())));
1923 if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
1933 pimpl_->needsSyncingCb_({});
1935 JAMI_DEBUG(
"[Account {}] Received a request for a conversation "
1936 "already existing. Ignore",
1937 pimpl_->accountId_);
1948 oldConv = pimpl_->getOneToOneConversation(from);
1950 std::unique_lock
lk(pimpl_->conversationsRequestsMtx_);
1951 JAMI_DEBUG(
"[Account {}] Receive a new conversation request for conversation {} from {}",
1958 if (pimpl_->isConversation(
convId))
1961 if (
oldReq != std::nullopt) {
1962 JAMI_DEBUG(
"[Account {}] Received a request for a conversation already existing. "
1963 "Ignore. Declined: {}",
1965 static_cast<int>(
oldReq->declined));
1975 "Contact is sending a request for a non active conversation. Ignore. They will "
1976 "clone the old one");
1983 if (pimpl_->addConversationRequest(
convId, std::move(req))) {
1988 pimpl_->oneToOneRecvCb_(
convId, from);
1998 std::lock_guard
lk(pimpl_->conversationsRequestsMtx_);
1999 auto it = pimpl_->conversationsRequests_.find(
convId);
2000 if (
it != pimpl_->conversationsRequests_.end()) {
2001 return it->second.from;
2008 const std::string& conversationId)
2010 pimpl_->withConversation(conversationId, [&](
auto& conversation) {
2011 if (!conversation.isMember(from,
true)) {
2012 JAMI_WARNING(
"{} is asking a new invite for {}, but not a member", from, conversationId);
2015 JAMI_LOG(
"{} is asking a new invite for {}", from, conversationId);
2016 pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
2022 const std::string& deviceId)
2025 std::unique_lock
lkCr(pimpl_->conversationsRequestsMtx_);
2026 auto request = pimpl_->getRequest(conversationId);
2027 if (request == std::nullopt) {
2029 if (
auto conv = pimpl_->getConversation(conversationId)) {
2030 std::unique_lock
lk(
conv->mtx);
2031 if (!
conv->conversation) {
2033 pimpl_->cloneConversationFrom(
conv, deviceId);
2036 JAMI_WARNING(
"[Account {}] Request not found for conversation {}",
2041 pimpl_->rmConversationRequest(conversationId);
2043 pimpl_->accountManager_->acceptTrustRequest(request->from,
true);
2050 std::lock_guard
lk(pimpl_->conversationsRequestsMtx_);
2051 auto it = pimpl_->conversationsRequests_.find(conversationId);
2052 if (
it != pimpl_->conversationsRequests_.end()) {
2053 it->second.declined = std::time(
nullptr);
2054 pimpl_->saveConvRequests();
2056 pimpl_->syncingMetadatas_.erase(conversationId);
2057 pimpl_->saveMetadata();
2060 pimpl_->needsSyncingCb_({});
2066 auto acc = pimpl_->account_.lock();
2069 std::vector<DeviceId>
kd;
2070 for (
const auto& [
id,
_] : acc->getKnownDevices())
2071 kd.emplace_back(
id);
2073 std::shared_ptr<Conversation> conversation;
2075 conversation = std::make_shared<Conversation>(acc, mode,
otherMember.toString());
2076 auto conversationId = conversation->id();
2077 conversation->onMessageStatusChanged([
this, conversationId](
const auto& status) {
2078 auto msg = std::make_shared<SyncMsg>();
2079 msg->ms = {{conversationId, status}};
2080 pimpl_->needsSyncingCb_(std::move(
msg));
2082 conversation->onMembersChanged(
2083 [w = pimpl_->weak_from_this(), conversationId](
const auto& members) {
2085 dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
2086 if (
auto sthis = w.lock())
2087 sthis->setConversationMembers(conversationId, members);
2090 conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
2092 conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
2098 }
catch (
const std::exception&
e) {
2099 JAMI_ERROR(
"[Account {}] Error while generating a conversation {}",
2104 auto convId = conversation->id();
2105 auto conv = pimpl_->startConversation(
convId);
2106 std::unique_lock
lk(
conv->mtx);
2107 conv->info.created = std::time(
nullptr);
2108 conv->info.members.emplace(pimpl_->username_);
2111 conv->conversation = conversation;
2115 pimpl_->needsSyncingCb_({});
2122 const std::string& uri,
2125 pimpl_->cloneConversationFrom(conversationId, uri,
oldConvId);
2131 std::string message,
2133 const std::string& type,
2138 pimpl_->sendMessage(conversationId,
2149 Json::Value&&
value,
2155 pimpl_->sendMessage(conversationId,
2180 json[
"type"] =
"text/plain";
2181 pimpl_->sendMessage(conversationId, std::move(json));
2187 const std::string&
reason)
2189 auto finalUri = uri.substr(0, uri.find(
"@ring.dht"));
2195 value[
"type"] =
"application/call-history+json";
2205 const std::string& conversationId,
2206 const std::string& interactionId)
2208 if (
auto conv = pimpl_->getConversation(conversationId)) {
2209 std::unique_lock
lk(
conv->mtx);
2210 if (
auto conversation =
conv->conversation) {
2212 return conversation->setMessageDisplayed(peer, interactionId);
2218std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
2221 std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
2222 for (
const auto&
conv : pimpl_->getConversations()) {
2223 auto d =
conv->messageStatus();
2225 messageStatus[
conv->id()] = std::move(
d);
2227 return messageStatus;
2235 auto acc = pimpl_->account_.lock();
2236 if (
auto conv = pimpl_->getConversation(conversationId)) {
2237 std::lock_guard
lk(
conv->mtx);
2238 if (
conv->conversation) {
2239 const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2243 conv->conversation->loadMessages(
2244 [accountId = pimpl_->accountId_, conversationId,
id](
auto&&
messages) {
2260 if (
auto conv = pimpl_->getConversation(conversationId)) {
2261 std::lock_guard
lk(
conv->mtx);
2262 if (
conv->conversation) {
2263 conv->conversation->clearCache();
2273 auto acc = pimpl_->account_.lock();
2274 if (
auto conv = pimpl_->getConversation(conversationId)) {
2275 std::lock_guard
lk(
conv->mtx);
2276 if (
conv->conversation) {
2277 const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2281 conv->conversation->loadMessages2(
2282 [accountId = pimpl_->accountId_, conversationId,
id](
auto&&
messages) {
2300 auto acc = pimpl_->account_.lock();
2301 if (
auto conv = pimpl_->getConversation(conversationId)) {
2302 std::lock_guard
lk(
conv->mtx);
2303 if (
conv->conversation) {
2304 const uint32_t id = std::uniform_int_distribution<uint32_t> {1}(acc->rand);
2309 conv->conversation->loadMessages(
2310 [accountId = pimpl_->accountId_, conversationId,
id](
auto&&
messages) {
2328 auto acc = pimpl_->account_.lock();
2329 if (
auto conv = pimpl_->getConversation(conversationId)) {
2330 std::lock_guard
lk(
conv->mtx);
2331 if (
conv->conversation) {
2332 const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
2337 conv->conversation->loadMessages2(
2338 [accountId = pimpl_->accountId_, conversationId,
id](
auto&&
messages) {
2351std::shared_ptr<TransferManager>
2354 return pimpl_->withConversation(conversationId,
2355 [](
auto& conversation) {
return conversation.dataTransfer(); });
2360 const std::string&
member,
2361 const std::string& fileId,
2364 if (
auto conv = pimpl_->getConversation(conversationId)) {
2365 std::filesystem::path path;
2366 std::string sha3sum;
2367 std::unique_lock
lk(
conv->mtx);
2368 if (!
conv->conversation)
2370 if (!
conv->conversation->onFileChannelRequest(
member, fileId, path, sha3sum))
2375 if (!std::filesystem::is_regular_file(path)) {
2376 JAMI_WARNING(
"[Account {:s}] [Conversation {}] {:s} asked for non existing file {}",
2385 JAMI_WARNING(
"[Account {:s}] [Conversation {}] {:s} asked for file {:s}, but our version is not "
2386 "complete or corrupted",
2400 const std::string& interactionId,
2401 const std::string& fileId,
2402 const std::string& path)
2404 if (
auto conv = pimpl_->getConversation(conversationId)) {
2405 std::lock_guard
lk(
conv->mtx);
2406 if (
conv->conversation)
2407 return conv->conversation->downloadFile(interactionId, fileId, path,
"",
"");
2416 std::set<std::string>
toFetch;
2417 std::set<std::string>
toClone;
2418 for (
const auto&
conv : pimpl_->getSyncedConversations()) {
2419 std::lock_guard
lk(
conv->mtx);
2420 if (
conv->conversation) {
2421 if (!
conv->conversation->isRemoving() &&
conv->conversation->isMember(peer,
false)) {
2424 }
else if (!
conv->info.isRemoved()
2425 && std::find(
conv->info.members.begin(),
conv->info.members.end(), peer)
2426 !=
conv->info.members.end()) {
2432 pimpl_->fetchNewCommits(peer, deviceId,
cid);
2434 pimpl_->cloneConversation(deviceId, peer,
cid);
2435 if (pimpl_->syncCnt.load() == 0)
2441 const std::string& peerId,
2442 const std::string& deviceId)
2444 std::vector<std::string>
toClone;
2448 std::lock_guard
lk(pimpl_->conversationsRequestsMtx_);
2449 pimpl_->rmConversationRequest(
convId);
2453 std::unique_lock
lk(
conv->mtx);
2456 < std::max(
conv->info.created,
conv->info.removed))
2461 if (
conv->info.removed) {
2470 if (!
conv->conversation) {
2471 if (deviceId !=
"") {
2472 pimpl_->cloneConversation(deviceId, peerId,
conv);
2482 if (
conv->conversation && !
conv->conversation->isRemoving()) {
2485 conv->conversation->setRemovingFlag();
2487 auto update =
false;
2488 if (!
conv->info.removed) {
2490 conv->info.removed = std::time(
nullptr);
2493 conv->info.erased = std::time(
nullptr);
2494 pimpl_->addConvInfo(
conv->info);
2495 pimpl_->removeRepositoryImpl(*
conv,
false);
2496 }
else if (update) {
2497 pimpl_->addConvInfo(
conv->info);
2504 for (
const auto&
member : members) {
2505 if (
member.at(
"uri") != pimpl_->username_)
2510 for (
const auto& [
convId, req] :
msg.cr) {
2511 if (req.from == pimpl_->username_) {
2515 std::unique_lock
lk(pimpl_->conversationsRequestsMtx_);
2516 if (pimpl_->isConversation(
convId)) {
2518 pimpl_->rmConversationRequest(
convId);
2523 if (!pimpl_->addConversationRequest(
convId, req))
2527 if (req.declined != 0) {
2529 JAMI_LOG(
"[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
2533 pimpl_->syncingMetadatas_.erase(
convId);
2534 pimpl_->saveMetadata();
2540 JAMI_LOG(
"[Account {:s}] New request detected for conversation {:s} (device {:s})",
2552 if (
auto conv = pimpl_->getConversation(
convId)) {
2553 std::unique_lock
lk(
conv->mtx);
2554 if (
conv->conversation) {
2555 auto conversation =
conv->conversation;
2557 conversation->updatePreferences(p);
2558 }
else if (
conv->pending) {
2559 conv->pending->preferences = p;
2565 for (
const auto& [
convId, ms] :
msg.ms) {
2566 if (
auto conv = pimpl_->getConversation(
convId)) {
2567 std::unique_lock
lk(
conv->mtx);
2568 if (
conv->conversation) {
2569 auto conversation =
conv->conversation;
2571 conversation->updateMessageStatus(ms);
2572 }
else if (
conv->pending) {
2573 conv->pending->status = ms;
2583 std::lock_guard
lk(pimpl_->conversationsMtx_);
2584 for (
const auto& [key,
ci] : pimpl_->conversations_) {
2585 std::lock_guard
lk(
ci->mtx);
2586 if (
ci->conversation) {
2587 if (
ci->conversation->isRemoving() &&
ci->conversation->isMember(
memberUri,
false))
2589 }
else if (!
ci->info.removed
2590 && std::find(
ci->info.members.begin(),
ci->info.members.end(),
memberUri)
2591 !=
ci->info.members.end()) {
2601 const std::string& deviceId,
2604 if (
auto conv = pimpl_->getConversation(conversationId)) {
2605 std::lock_guard
lk(
conv->mtx);
2606 if (
conv->conversation) {
2607 bool remove =
conv->conversation->isRemoving();
2610 pimpl_->removeRepositoryImpl(*
conv,
true);
2617 const std::string& deviceId,
2618 const std::string& conversationId,
2621 pimpl_->fetchNewCommits(peer, deviceId, conversationId,
commitId);
2629 auto conv = pimpl_->getConversation(conversationId);
2631 JAMI_ERROR(
"Conversation {:s} does not exist", conversationId);
2634 std::unique_lock
lk(
conv->mtx);
2641 auto invite =
conv->conversation->generateInvitation();
2647 conv->conversation->addMember(
2652 std::unique_lock
lk(
conv->mtx);
2653 pimpl_->sendMessageNotification(*
conv->conversation,
2657 auto invite =
conv->conversation->generateInvitation();
2671 if (
auto conv = pimpl_->getConversation(conversationId)) {
2672 std::lock_guard
lk(
conv->mtx);
2673 if (
conv->conversation)
2674 return conv->conversation
2677 [
this, conversationId](
bool ok,
const std::string&
commitId) {
2679 pimpl_->sendMessageNotification(conversationId,
2687std::vector<std::map<std::string, std::string>>
2691 return pimpl_->getConversationMembers(conversationId,
includeBanned);
2696 const std::string&
toId,
2697 const std::string&
fromId,
2698 const std::string& authorUri)
const
2700 if (
auto conv = pimpl_->getConversation(
convId)) {
2701 std::lock_guard
lk(
conv->mtx);
2702 if (
conv->conversation)
2703 return conv->conversation->countInteractions(
toId,
fromId, authorUri);
2712 auto convs = pimpl_->getConversations();
2713 if (
convs.empty()) {
2718 std::vector<std::map<std::string, std::string>> {});
2725 }
else if (
auto conv = pimpl_->getConversation(
convId)) {
2726 std::lock_guard
lk(
conv->mtx);
2727 if (
conv->conversation)
2728 conv->conversation->search(req,
filter, std::make_shared<std::atomic_int>(1));
2734 const std::map<std::string, std::string>& infos,
2737 auto conv = pimpl_->getConversation(conversationId);
2739 JAMI_ERROR(
"Conversation {:s} does not exist", conversationId);
2742 std::lock_guard
lk(
conv->mtx);
2744 ->updateInfos(infos, [
this, conversationId, sync](
bool ok,
const std::string&
commitId) {
2746 pimpl_->sendMessageNotification(conversationId,
true,
commitId);
2748 JAMI_WARNING(
"Unable to update info on {:s}", conversationId);
2752std::map<std::string, std::string>
2756 std::lock_guard
lk(pimpl_->conversationsRequestsMtx_);
2757 auto itReq = pimpl_->conversationsRequests_.find(conversationId);
2758 if (
itReq != pimpl_->conversationsRequests_.end())
2759 return itReq->second.metadatas;
2761 if (
auto conv = pimpl_->getConversation(conversationId)) {
2762 std::lock_guard
lk(
conv->mtx);
2763 std::map<std::string, std::string>
md;
2767 if (
conv->conversation) {
2769 pimpl_->saveMetadata();
2775 if (
conv->conversation)
2776 return conv->conversation->infos();
2780 JAMI_ERROR(
"Conversation {:s} does not exist", conversationId);
2786 const std::map<std::string, std::string>&
prefs)
2788 if (
auto conv = pimpl_->getConversation(conversationId)) {
2789 std::unique_lock
lk(
conv->mtx);
2791 JAMI_ERROR(
"Conversation {:s} does not exist", conversationId);
2794 auto conversation =
conv->conversation;
2796 conversation->updatePreferences(
prefs);
2797 auto msg = std::make_shared<SyncMsg>();
2798 msg->p = {{conversationId, conversation->preferences(
true)}};
2799 pimpl_->needsSyncingCb_(std::move(
msg));
2803std::map<std::string, std::string>
2807 if (
auto conv = pimpl_->getConversation(conversationId)) {
2808 std::lock_guard
lk(
conv->mtx);
2809 if (
conv->conversation)
2815std::map<std::string, std::map<std::string, std::string>>
2818 std::map<std::string, std::map<std::string, std::string>> p;
2819 for (
const auto&
conv : pimpl_->getConversations()) {
2830 if (
auto conv = pimpl_->getConversation(conversationId)) {
2831 std::lock_guard
lk(
conv->mtx);
2832 if (
conv->conversation)
2833 return conv->conversation->vCard();
2835 JAMI_ERROR(
"Conversation {:s} does not exist", conversationId);
2842 if (
auto conv = pimpl_->getConversation(
convId)) {
2843 std::lock_guard
lk(
conv->mtx);
2844 if (!
conv->conversation)
2847 return conv->conversation->isBanned(uri);
2850 std::lock_guard
lk(pimpl_->conversationsMtx_);
2851 return pimpl_->accountManager_->getCertificateStatus(uri)
2852 == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
2860 std::lock_guard
lk(pimpl_->conversationsRequestsMtx_);
2861 auto update =
false;
2862 for (
auto it = pimpl_->conversationsRequests_.begin();
2863 it != pimpl_->conversationsRequests_.end();
2865 if (
it->second.from == uri && !
it->second.declined) {
2866 JAMI_DEBUG(
"Declining conversation request {:s} from {:s}",
it->first, uri);
2867 pimpl_->syncingMetadatas_.erase(
it->first);
2868 pimpl_->saveMetadata();
2870 pimpl_->accountId_,
it->first);
2872 it->second.declined = std::time(
nullptr);
2876 pimpl_->saveConvRequests();
2877 pimpl_->needsSyncingCb_({});
2882 pimpl_->withConversation(conversationId, [&](
auto&
conv) {
conv.shutdownConnections(); });
2887 pimpl_->accountManager_->updateContactConversation(uri,
"");
2890 auto isSelf = uri == pimpl_->username_;
2891 std::vector<std::string>
toRm;
2893 if ((
isSelf && members.size() == 1)
2894 || (!
isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
2896 if (!
conv->info.isRemoved()) {
2897 conv->info.removed = std::time(
nullptr);
2900 pimpl_->addConvInfo(
conv->info);
2907 std::lock_guard
lk(pimpl_->conversationsMtx_);
2908 for (
auto& [
convId,
conv] : pimpl_->conversations_) {
2909 std::lock_guard
lk(
conv->mtx);
2910 if (
conv->conversation) {
2915 auto initMembers =
conv->conversation->getInitialMembers();
2919 }
catch (
const std::exception&
e) {
2927 for (
const auto&
id :
toRm)
2928 pimpl_->removeRepository(
id,
true,
true);
2934 return pimpl_->removeConversation(conversationId);
2941 std::lock_guard
lk(
conv->mtx);
2942 if (
conv->conversation) {
2946 conv->conversation->loadMessages(
2950 std::lock_guard
lk(pimpl_->replayMtx_);
2963 if (conversationId.empty()) {
2964 std::lock_guard
lk(pimpl_->conversationsMtx_);
2965 return std::find_if(pimpl_->conversations_.cbegin(),
2966 pimpl_->conversations_.cend(),
2967 [&](
const auto&
conv) {
2968 return conv.second->conversation
2969 && conv.second->conversation->isHosting(confId);
2971 != pimpl_->conversations_.cend();
2972 }
else if (
auto conv = pimpl_->getConversation(conversationId)) {
2973 if (
conv->conversation) {
2974 return conv->conversation->isHosting(
confId);
2980std::vector<std::map<std::string, std::string>>
2983 return pimpl_->withConversation(conversationId, [](
const auto& conversation) {
2984 return conversation.currentCalls();
2988std::shared_ptr<SIPCall>
2990 const std::string& url,
2991 const std::vector<libjami::MediaMap>&
mediaList,
2992 std::function<
void(
const std::string&,
const DeviceId&,
const std::shared_ptr<SIPCall>&)>&&
cb)
2994 std::string conversationId =
"",
confId =
"", uri =
"", deviceId =
"";
2995 if (url.find(
'/') == std::string::npos) {
2996 conversationId = url;
2999 if (parameters.size() != 4) {
3003 conversationId = parameters[0];
3004 uri = parameters[1];
3005 deviceId = parameters[2];
3009 auto conv = pimpl_->getConversation(conversationId);
3012 std::unique_lock
lk(
conv->mtx);
3013 if (!
conv->conversation) {
3014 JAMI_ERROR(
"Conversation {:s} not found", conversationId);
3022 auto infos =
conv->conversation->infos();
3035 deviceId =
ac.at(
"device");
3043 JAMI_DEBUG(
"Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
3047 auto account = pimpl_->account_.lock();
3048 std::vector<libjami::MediaMap>
mediaMap
3050 pimpl_->account_.lock()->createDefaultMediaList(
3051 pimpl_->account_.lock()->isVideoEnabled()))
3054 if (!
sendCallRequest || (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_)) {
3070 auto callUri = fmt::format(
"{}/{}/{}/{}", conversationId, uri, deviceId,
confId);
3072 accountId =
account->getAccountID(),
3074 uri = std::move(uri),
3078 if (call->isIceEnabled()) {
3079 if (not call->createIceMediaTransport(false)
3080 or not call->initIceMediaTransport(true,
3081 std::forward<dhtnet::IceTransportOptions>(opts))) {
3085 JAMI_DEBUG(
"New outgoing call with {}", uri);
3086 call->setPeerNumber(uri);
3087 call->setPeerUri(
"swarm:" + uri);
3110ConversationModule::hostConference(
const std::string& conversationId,
3111 const std::string& confId,
3112 const std::string& callId,
3113 const std::vector<libjami::MediaMap>& mediaList)
3115 auto acc = pimpl_->account_.lock();
3118 auto conf = acc->getConference(confId);
3119 auto createConf = !conf;
3120 std::shared_ptr<SIPCall> call;
3121 if (!callId.empty()) {
3122 call = std::dynamic_pointer_cast<SIPCall>(acc->getCall(callId));
3129 conf = std::make_shared<Conference>(acc, confId);
3133 if (!callId.empty())
3134 conf->addSubCall(callId);
3137 conf->attachHost(mediaList);
3140 emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(),
3144 conf->reportMediaNegotiationStatus();
3145 emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(),
3147 conf->getStateStr());
3151 auto conv = pimpl_->getConversation(conversationId);
3154 std::unique_lock lk(conv->mtx);
3155 if (!conv->conversation) {
3156 JAMI_ERROR(
"Conversation {} not found", conversationId);
3161 value[
"uri"] = pimpl_->username_;
3162 value[
"device"] = pimpl_->deviceId_;
3163 value[
"confId"] = conf->getConfId();
3164 value[
"type"] =
"application/call-history+json";
3165 conv->conversation->hostConference(std::move(value),
3166 [w = pimpl_->weak(),
3167 conversationId](
bool ok,
const std::string& commitId) {
3169 if (auto shared = w.lock())
3170 shared->sendMessageNotification(conversationId,
3174 JAMI_ERR(
"Failed to send message to conversation %s",
3175 conversationId.c_str());
3182 conf->onShutdown([w = pimpl_->weak(),
3183 accountUri = pimpl_->username_,
3184 confId = conf->getConfId(),
3186 conv](
int duration) {
3187 auto shared = w.lock();
3190 value[
"uri"] = accountUri;
3191 value[
"device"] = shared->deviceId_;
3192 value[
"confId"] = confId;
3193 value[
"type"] =
"application/call-history+json";
3194 value[
"duration"] = std::to_string(duration);
3196 std::lock_guard lk(conv->mtx);
3197 if (!conv->conversation) {
3198 JAMI_ERROR(
"Conversation {} not found", conversationId);
3201 conv->conversation->removeActiveConference(
3202 std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
3204 if (auto shared = w.lock()) {
3205 shared->sendMessageNotification(conversationId, true, commitId);
3208 JAMI_ERROR(
"Failed to send message to conversation {}", conversationId);
3215std::map<std::string, ConvInfo>
3216ConversationModule::convInfos(
const std::string& accountId)
3218 return convInfosFromPath(fileutils::get_data_dir() / accountId);
3221std::map<std::string, ConvInfo>
3222ConversationModule::convInfosFromPath(
const std::filesystem::path& path)
3224 std::map<std::string, ConvInfo> convInfos;
3227 std::lock_guard lock(dhtnet::fileutils::getFileLock(path /
"convInfo"));
3228 auto file = fileutils::loadFile(
"convInfo", path);
3230 msgpack::unpacked result;
3231 msgpack::unpack(result, (
const char*) file.data(), file.size());
3232 result.get().convert(convInfos);
3233 }
catch (
const std::exception& e) {
3234 JAMI_WARN(
"[convInfo] error loading convInfo: %s", e.what());
3239std::map<std::string, ConversationRequest>
3240ConversationModule::convRequests(
const std::string& accountId)
3242 auto path = fileutils::get_data_dir() / accountId;
3243 return convRequestsFromPath(path.string());
3246std::map<std::string, ConversationRequest>
3247ConversationModule::convRequestsFromPath(
const std::filesystem::path& path)
3249 std::map<std::string, ConversationRequest> convRequests;
3252 std::lock_guard lock(dhtnet::fileutils::getFileLock(path /
"convRequests"));
3253 auto file = fileutils::loadFile(
"convRequests", path);
3255 msgpack::unpacked result;
3256 msgpack::unpack(result, (
const char*) file.data(), file.size(), 0);
3257 result.get().convert(convRequests);
3258 }
catch (
const std::exception& e) {
3259 JAMI_WARN(
"[convInfo] error loading convInfo: %s", e.what());
3261 return convRequests;
3267 pimpl_->addConvInfo(info);
3271ConversationModule::Impl::setConversationMembers(
const std::string& convId,
3272 const std::set<std::string>& members)
3274 if (
auto conv = getConversation(convId)) {
3275 std::lock_guard lk(conv->mtx);
3276 conv->info.members = members;
3277 addConvInfo(conv->info);
3281std::shared_ptr<Conversation>
3282ConversationModule::getConversation(
const std::string& convId)
3284 if (
auto conv = pimpl_->getConversation(convId)) {
3285 std::lock_guard lk(conv->mtx);
3286 return conv->conversation;
3291std::shared_ptr<dhtnet::ChannelSocket>
3292ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId)
const
3294 if (
auto conv = pimpl_->getConversation(convId)) {
3295 std::lock_guard lk(conv->mtx);
3296 if (conv->conversation)
3297 return conv->conversation->gitSocket(
DeviceId(deviceId));
3298 else if (conv->pending)
3299 return conv->pending->socket;
3305ConversationModule::addGitSocket(std::string_view deviceId,
3306 std::string_view convId,
3307 const std::shared_ptr<dhtnet::ChannelSocket>& channel)
3309 if (
auto conv = pimpl_->getConversation(convId)) {
3310 std::lock_guard lk(conv->mtx);
3311 conv->conversation->addGitSocket(
DeviceId(deviceId), channel);
3313 JAMI_WARNING(
"addGitSocket: Unable to find conversation {:s}", convId);
3317ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
3319 pimpl_->withConversation(convId, [&](
auto& conv) { conv.removeGitSocket(
DeviceId(deviceId)); });
3323ConversationModule::shutdownConnections()
3325 for (
const auto& c : pimpl_->getSyncedConversations()) {
3326 std::lock_guard lkc(c->mtx);
3327 if (c->conversation)
3328 c->conversation->shutdownConnections();
3330 c->pending->socket = {};
3334ConversationModule::addSwarmChannel(
const std::string& conversationId,
3335 std::shared_ptr<dhtnet::ChannelSocket> channel)
3337 pimpl_->withConversation(conversationId,
3338 [&](
auto& conv) { conv.addSwarmChannel(std::move(channel)); });
3342ConversationModule::connectivityChanged()
3344 for (
const auto& conv : pimpl_->getConversations())
3345 conv->connectivityChanged();
3348std::shared_ptr<Typers>
3349ConversationModule::getTypers(
const std::string& convId)
3351 if (
auto c = pimpl_->getConversation(convId)) {
3352 std::lock_guard lk(c->mtx);
3353 if (c->conversation)
3354 return c->conversation->typers();
std::string getNewCallID() const
ConnectionState
Tell where we're at with the call.
std::map< std::string, ConversationRequest > conversationsRequests_
std::string getOneToOneConversation(const std::string &uri) const noexcept
void fallbackClone(const asio::error_code &ec, const std::string &conversationId)
bool removeConversationImpl(SyncedConversation &conv)
std::shared_ptr< AccountManager > accountManager_
void editMessage(const std::string &conversationId, const std::string &newBody, const std::string &editedId)
void saveConvInfos() const
void saveConvRequests() const
bool isConversation(const std::string &convId) const
std::vector< std::map< std::string, std::string > > getConversationMembers(const std::string &conversationId, bool includeBanned=false) const
Get members.
std::optional< ConversationRequest > getRequest(const std::string &id) const
std::vector< std::shared_ptr< SyncedConversation > > getSyncedConversations() const
std::map< std::string, std::vector< std::map< std::string, std::string > > > replay_
std::shared_ptr< SyncedConversation > startConversation(const ConvInfo &info)
void rmConversationRequest(const std::string &id)
auto withConversation(const S &convId, T &&cb)
void fetchNewCommits(const std::string &peer, const std::string &deviceId, const std::string &conversationId, const std::string &commitId="")
Pull remote device.
std::shared_ptr< SyncedConversation > getConversation(std::string_view convId) const
std::map< std::string, std::shared_ptr< SyncedConversation >, std::less<> > conversations_
void sendMessage(const std::string &conversationId, Json::Value &&value, const std::string &replyTo="", bool announce=true, OnCommitCb &&onCommit={}, OnDoneCb &&cb={})
void declineOtherConversationWith(const std::string &uri)
void fixStructures(std::shared_ptr< JamiAccount > account, const std::vector< std::tuple< std::string, std::string, std::string > > &updateContactConv, const std::set< std::string > &toRm)
void bootstrap(const std::string &convId)
std::map< std::string, std::string > notSyncedNotification_
void cloneConversation(const std::string &deviceId, const std::string &peer, const std::string &convId)
Clone a conversation (initial) from device.
void removeRepositoryImpl(SyncedConversation &conv, bool sync, bool force=false)
std::mutex conversationsMtx_
bool addConversationRequest(const std::string &id, const ConversationRequest &req)
void bootstrapCb(std::string convId)
void removeRepository(const std::string &convId, bool sync, bool force=false)
Remove a repository and all files.
OneToOneRecvCb oneToOneRecvCb_
NeedsSyncingCb needsSyncingCb_
void cloneConversationFrom(const std::shared_ptr< SyncedConversation > conv, const std::string &deviceId, const std::string &oldConvId="")
std::vector< std::shared_ptr< Conversation > > getConversations() const
std::map< std::string, ConvInfo > convInfos_
bool removeConversation(const std::string &conversationId)
Remove a conversation.
std::shared_ptr< SyncedConversation > getConversation(std::string_view convId)
std::mutex notSyncedNotificationMtx_
std::map< std::string, std::map< std::string, std::string > > syncingMetadatas_
std::mutex conversationsRequestsMtx_
std::map< std::string, uint64_t > refreshMessage
std::weak_ptr< Impl > weak()
auto withConv(const S &convId, T &&cb) const
NeedSocketCb onNeedSwarmSocket_
NeedSocketCb onNeedSocket_
std::weak_ptr< JamiAccount > account_
void addConvInfo(const ConvInfo &info)
void handlePendingConversation(const std::string &conversationId, const std::string &deviceId)
Handle events to receive new commits.
void setConversationMembers(const std::string &convId, const std::set< std::string > &members)
void sendMessageNotification(const std::string &conversationId, bool sync, const std::string &commitId="", const std::string &deviceId="")
Send a message notification to all members.
std::shared_ptr< SyncedConversation > startConversation(const std::string &convId)
Impl(std::shared_ptr< JamiAccount > &&account, std::shared_ptr< AccountManager > &&accountManager, NeedsSyncingCb &&needsSyncingCb, SengMsgCb &&sendMsgCb, NeedSocketCb &&onNeedSocket, NeedSocketCb &&onNeedSwarmSocket, OneToOneRecvCb &&oneToOneRecvCb)
bool updateConvForContact(const std::string &uri, const std::string &oldConv, const std::string &newConv)
const std::string accountId_
void removeContact(const std::string &uri, bool ban)
Remove one to one conversations related to a contact.
void onConversationRequest(const std::string &from, const Json::Value &value)
Called when receiving a new conversation's request.
bool onMessageDisplayed(const std::string &peer, const std::string &conversationId, const std::string &interactionId)
void editMessage(const std::string &conversationId, const std::string &newBody, const std::string &editedId)
static void saveConvInfosToPath(const std::filesystem::path &path, const std::map< std::string, ConvInfo > &conversations)
static void saveConvInfos(const std::string &accountId, const std::map< std::string, ConvInfo > &conversations)
void search(uint32_t req, const std::string &convId, const Filter &filter) const
Search in conversations via a filter.
void initReplay(const std::string &oldConvId, const std::string &newConvId)
std::vector< std::map< std::string, std::string > > getConversationRequests() const
Return conversation's requests.
void syncConversations(const std::string &peer, const std::string &deviceId)
Sync conversations with detected peer.
std::shared_ptr< Conversation > getConversation(const std::string &convId)
Get a conversation.
void setAccountManager(std::shared_ptr< AccountManager > accountManager)
void addConversationMember(const std::string &conversationId, const dht::InfoHash &contactUri, bool sendRequest=true)
Adds a new member to a conversation (this will triggers a member event + new message on success)
bool onFileChannelRequest(const std::string &conversationId, const std::string &member, const std::string &fileId, bool verifyShaSum=true) const
Choose if we can accept channel request.
void clearPendingFetch()
Clear not removed fetch.
void reloadRequests()
Reload requests from file.
void onNeedConversationRequest(const std::string &from, const std::string &conversationId)
Called when a peer needs an invite for a conversation (generally after that they received a commit no...
uint32_t loadConversationMessages(const std::string &conversationId, const std::string &fromMessage="", size_t n=0)
Load conversation's messages.
static void saveConvRequestsToPath(const std::filesystem::path &path, const std::map< std::string, ConversationRequest > &conversationsRequests)
std::map< std::string, std::map< std::string, std::string > > convPreferences() const
Retrieve all conversation preferences to sync with other devices.
std::vector< std::string > getConversations() const
Return all conversation's id (including syncing ones)
static void saveConvRequests(const std::string &accountId, const std::map< std::string, ConversationRequest > &conversationsRequests)
void setConversationPreferences(const std::string &conversationId, const std::map< std::string, std::string > &prefs)
Update user's preferences (like color, notifications, etc) to be synced across devices.
bool needsSyncingWith(const std::string &memberUri, const std::string &deviceId) const
Check if we need to share infos with a contact.
ConversationModule(std::shared_ptr< JamiAccount > account, std::shared_ptr< AccountManager > accountManager, NeedsSyncingCb &&needsSyncingCb, SengMsgCb &&sendMsgCb, NeedSocketCb &&onNeedSocket, NeedSocketCb &&onNeedSwarmSocket, OneToOneRecvCb &&oneToOneRecvCb, bool autoLoadConversations=true)
std::map< std::string, std::string > getConversationPreferences(const std::string &conversationId, bool includeCreated=false) const
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
std::map< std::string, std::string > conversationInfos(const std::string &conversationId) const
void acceptConversationRequest(const std::string &conversationId, const std::string &deviceId="")
Accept a conversation's request.
std::shared_ptr< SIPCall > call(const std::string &url, const std::vector< libjami::MediaMap > &mediaList, std::function< void(const std::string &, const DeviceId &, const std::shared_ptr< SIPCall > &)> &&cb)
Call the conversation.
bool updateConvForContact(const std::string &uri, const std::string &oldConv, const std::string &newConv)
Replace linked conversation in contact's details.
std::string getOneToOneConversation(const std::string &uri) const noexcept
Get related conversation with member.
void bootstrap(const std::string &convId="")
Bootstrap swarm managers to other peers.
std::string startConversation(ConversationMode mode=ConversationMode::INVITES_ONLY, const dht::InfoHash &otherMember={})
Starts a new conversation.
void fetchNewCommits(const std::string &peer, const std::string &deviceId, const std::string &conversationId, const std::string &commitId)
Launch fetch on new commit.
uint32_t loadConversationUntil(const std::string &conversationId, const std::string &fromMessage, const std::string &to)
void setFetched(const std::string &conversationId, const std::string &deviceId, const std::string &commit)
Notify that a peer fetched a commit.
void sendMessage(const std::string &conversationId, Json::Value &&value, const std::string &replyTo="", bool announce=true, OnCommitCb &&onCommit={}, OnDoneCb &&cb={})
bool isHosting(const std::string &conversationId, const std::string &confId) const
Check if we're hosting a specific conference.
uint32_t loadConversation(const std::string &conversationId, const std::string &fromMessage="", size_t n=0)
void removeConversationMember(const std::string &conversationId, const dht::InfoHash &contactUri, bool isDevice=false)
Remove a member from a conversation (this will trigger a member event + new message on success)
uint32_t countInteractions(const std::string &convId, const std::string &toId, const std::string &fromId, const std::string &authorUri) const
Retrieve the number of interactions from interactionId to HEAD.
bool downloadFile(const std::string &conversationId, const std::string &interactionId, const std::string &fileId, const std::string &path)
Ask conversation's members to send a file to this device.
std::vector< uint8_t > conversationVCard(const std::string &conversationId) const
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void addCallHistoryMessage(const std::string &uri, uint64_t duration_ms, const std::string &reason)
Add to the related conversation the call history message.
void clearCache(const std::string &conversationId)
Clear loaded interactions.
void reactToMessage(const std::string &conversationId, const std::string &newBody, const std::string &reactToId)
std::vector< std::map< std::string, std::string > > getConversationMembers(const std::string &conversationId, bool includeBanned=false) const
Get members.
void hostConference(const std::string &conversationId, const std::string &confId, const std::string &callId, const std::vector< libjami::MediaMap > &mediaList={})
uint32_t loadSwarmUntil(const std::string &conversationId, const std::string &fromMessage, const std::string &toMessage)
void onSyncData(const SyncMsg &msg, const std::string &peerId, const std::string &deviceId)
Detect new conversations and request from other devices.
void loadConversations()
Refresh information about conversations.
void addConvInfo(const ConvInfo &info)
void loadSingleConversation(const std::string &convId)
bool isBanned(const std::string &convId, const std::string &uri) const
Return if a device or member is banned from a conversation.
std::vector< std::map< std::string, std::string > > getActiveCalls(const std::string &conversationId) const
Return active calls.
void updateConversationInfos(const std::string &conversationId, const std::map< std::string, std::string > &infos, bool sync=true)
Update metadatas from conversations (like title, avatar, etc)
void cloneConversationFrom(const std::string &conversationId, const std::string &uri, const std::string &oldConvId="")
Clone conversation from a member.
std::string peerFromConversationRequest(const std::string &convId) const
Retrieve author of a conversation request.
void declineConversationRequest(const std::string &conversationId)
Decline a conversation's request.
void onTrustRequest(const std::string &uri, const std::string &conversationId, const std::vector< uint8_t > &payload, time_t received)
Called when detecting a new trust request with linked one to one.
std::shared_ptr< TransferManager > dataTransfer(const std::string &id) const
Returns related transfer manager.
bool removeConversation(const std::string &conversationId)
Remove a conversation, but not the contact.
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > convMessageStatus() const
static std::map< std::string, std::string > infosFromVCard(std::map< std::string, std::string > &&details)
std::set< std::string > memberUris(std::string_view filter={}, const std::set< MemberRole > &filteredRoles={MemberRole::INVITED, MemberRole::LEFT, MemberRole::BANNED}) const
std::string id() const
Get conversation's id.
std::vector< NodeId > peersToSyncWith() const
Get peers to sync with.
std::string uriFromDevice(const std::string &deviceId) const
Retrieve the uri from a deviceId.
bool isBootstraped() const
Check if we're at least connected to one node.
std::string lastCommitId() const
Get last commit id.
static LIBJAMI_TEST_EXPORT Manager & instance()
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
static constexpr const char * LAST_DISPLAYED
const std::filesystem::path & get_data_dir()
std::string sha3File(const std::filesystem::path &path)
std::vector< uint8_t > loadFile(const std::filesystem::path &path, const std::filesystem::path &default_dir)
Read the full content of a file at path.
std::string toString(const Json::Value &jsonVal)
static constexpr const char MIME_TYPE_INVITE[]
std::function< void(const std::string &, const std::string &, ChannelCb &&, const std::string &)> NeedSocketCb
static constexpr std::string_view toString(AuthDecodingState state)
static constexpr const char MIME_TYPE_GIT[]
void emitSignal(Args... args)
std::function< void(bool, const std::string &)> OnDoneCb
constexpr std::chrono::seconds MAX_FALLBACK
std::function< void(const std::string &, const std::string &)> OneToOneRecvCb
static constexpr float kd
std::function< void(const std::string &)> OnCommitCb
std::vector< std::string_view > split_string(std::string_view str, char delim)
std::function< void(std::shared_ptr< SyncMsg > &&)> NeedsSyncingCb
std::function< uint64_t(const std::string &, const DeviceId &, std::map< std::string, std::string >, uint64_t)> SengMsgCb
std::map< std::string, ConvInfo > ConvInfoMap
static constexpr const char CONVERSATIONID[]
static constexpr const char FROM[]
std::map< std::string, std::string > toMap(std::string_view content)
Payload to vCard.
SIPCall are SIP implementation of a normal Call.
std::set< std::string > members
A ConversationRequest is a request which corresponds to a trust request, but for conversations It's s...
std::map< std::string, std::string > metadatas
std::string conversationId
std::map< std::string, std::string > toMap() const
std::set< std::string > connectingTo
std::map< std::string, std::map< std::string, std::string > > status
std::map< std::string, std::string > preferences
std::shared_ptr< dhtnet::ChannelSocket > socket
std::unique_ptr< asio::steady_timer > fallbackClone
void stopFetch(const std::string &deviceId)
std::chrono::seconds fallbackTimer
bool startFetch(const std::string &deviceId, bool checkIfConv=false)
SyncedConversation(const std::string &convId)
std::vector< std::map< std::string, std::string > > getMembers(bool includeLeft, bool includeBanned) const
std::unique_ptr< PendingConversationFetch > pending
SyncedConversation(const ConvInfo &info)
std::shared_ptr< Conversation > conversation