23#include <dhtnet/multiplexed_socket.h>
24#include <dhtnet/channel_utils.h>
25#include <opendht/thread_pool.h>
/**
 * Build a SyncMsg and send it on a socket.
 *
 * @param socket   Channel socket the data is written to.
 * @param syncMsg  Message supplied by the caller; may be null (one caller in
 *                 this file passes nullptr explicitly).
 *                 NOTE(review): presumably a null value means "build the
 *                 message from the account state" - confirm in the definition.
 */
45 void syncInfos(
const std::shared_ptr<dhtnet::ChannelSocket>& socket,
const std::shared_ptr<SyncMsg>&
syncMsg);
// Store the account ID once at construction; accountId_ is the value printed
// in the "[Account {}]" prefix of the log statements in this file.
51 , accountId_ {
account->getAccountID()}
56 const std::shared_ptr<SyncMsg>&
syncMsg)
// Visible outline of the body: lock the weak account reference, then fill
// individual message fields (ds, cr, p, ms); each visible assignment is
// followed by a socket->write() of `buffer` and a JAMI_ERROR that prints
// ec.message(). The conditions guarding these statements are not visible here.

// account_ is a weak_ptr; lock() yields an owning pointer for the duration of
// the call (the null check is not visible here).
58 auto acc = account_.lock();
// Contact sync data: taken from the account manager's info when it is available.
67 if (
auto info = acc->accountManager()->getInfo()) {
70 msg.
ds = info->contacts->getSyncData();
// Send the contents of `buffer` as raw bytes; `ec` is passed to write() and its
// message is what gets logged right after.
72 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
// Log the error text, tagged with the account ID and the peer device ID.
74 JAMI_ERROR(
"[Account {}] [device {}] {:s}", accountId_, socket->deviceId(),
ec.message());
// Another write of `buffer` (what was placed in the message beforehand is not
// visible here).
86 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
88 JAMI_ERROR(
"[Account {}] [device {}] {:s}", accountId_, socket->deviceId(),
ec.message());
// Conversation requests: the local `cr` is moved into the message, so it must
// not be read again after this line.
97 msg.
cr = std::move(cr);
99 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
101 JAMI_ERROR(
"[Account {}] [device {}] {:s}", accountId_, socket->deviceId(),
ec.message());
// Conversation module handle used for the preferences and message-status
// lookups below. NOTE(review): the meaning of the `true` argument is not
// visible here.
106 auto convModule = acc->convModule(
true);
// Conversation preferences, moved into msg.p and sent.
110 auto p = convModule->convPreferences();
113 msg.
p = std::move(p);
115 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
117 JAMI_ERROR(
"[Account {}] [device {}] {:s}", accountId_, socket->deviceId(),
ec.message());
// Message status, moved into msg.ms and sent.
123 auto ms = convModule->convMessageStatus();
126 msg.
ms = std::move(ms);
128 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
130 JAMI_ERROR(
"[Account {}] [device {}] {:s}", accountId_, socket->deviceId(),
ec.message());
// Final visible write of `buffer` (the preceding message setup is not visible
// here).
138 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
140 JAMI_ERROR(
"[Account {}] [device {}] {:s}", accountId_, socket->deviceId(),
ec.message());
// onChannelShutdown: warning emitted when no connection is found for the
// device (the lookup that leads here is not visible).
156 JAMI_WARNING(
"[Account {}] [device {}] onChannelShutdown: no connection found.",
accountId_, device.to_view());
// Log the removal together with the number of connections remaining; the
// macro's arguments are on lines not visible here.
163 JAMI_LOG(
"[Account {}] [device {}] removed connection, remaining: {:d}",
173 const std::string& peerId,
// Hold syncConnectionsMtx_ while appending the socket to this device's list of
// sync connections.
176 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
177 pimpl_->syncConnections_[device].emplace_back(socket);
// Incoming data is decoded as SyncMsg. The handler captures a copy of the weak
// account reference (plus device and peerId by value) and locks it per message.
179 socket->setOnRecv(dhtnet::buildMsgpackReader<SyncMsg>([acc = pimpl_->account_, device, peerId](
SyncMsg&&
msg) {
180 auto account = acc.lock();
// Early-exit error code; its condition is not visible here (presumably the
// account could not be locked - confirm).
182 return std::make_error_code(std::errc::operation_canceled);
// Hand the ds payload to the account manager when one is available; msg.ds is
// moved-from afterwards.
185 if (auto manager = account->accountManager())
186 manager->onSyncData(std::move(msg.ds), false);
// Forward to the conversation module only if at least one of c, cr, p, ld, ms
// is non-empty.
188 if (!msg.c.empty() || !msg.cr.empty() || !msg.p.empty() || !msg.ld.empty() || !msg.ms.empty())
189 if (auto cm = account->convModule(true))
190 cm->onSyncData(msg, peerId, device.toString());
191 }
catch (
const std::exception&
e) {
// A std::exception raised in the try block is reported as a warning.
192 JAMI_WARNING(
"[Account {}] [device {}] [convInfo] error on sync: {:s}",
// Default-constructed error_code, i.e. "no error".
197 return std::error_code();
// Shutdown callback: captures only weak references to Impl (w) and to the
// socket (s), and forwards to onChannelShutdown while Impl is still alive.
199 socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)](
const std::error_code&) {
200 if (
auto shared = w.lock())
201 shared->onChannelShutdown(s.lock(), device);
// Queue a syncInfos call (with a null syncMsg) for the new socket on the io
// thread pool. `socket` is moved into the task and must not be used after
// this line.
204 dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket = std::move(socket)]() {
205 if (auto s = w.lock())
206 s->syncInfos(socket, nullptr);
// isConnected: look the device up in syncConnections_ under the connections mutex.
213 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
214 auto it = pimpl_->syncConnections_.find(deviceId);
// No entry for this device (the statement executed in this case is not
// visible here).
215 if (
it == pimpl_->syncConnections_.end())
// An entry exists: report connected only if it holds at least one socket.
217 return !
it->second.empty();
// syncWithConnected: walk the tracked connections under the connections mutex.
223 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
225 for (
const auto& [
did,
sockets] : pimpl_->syncConnections_) {
// Only the last socket stored for the device is used. The send runs on the io
// thread pool with copies of the socket pointer and syncMsg, and a weak
// reference to Impl.
// NOTE(review): back() requires a non-empty vector; the guard is presumably on
// the lines not visible here - confirm.
228 dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), s =
sockets.back(),
syncMsg] {
229 if (auto sthis = w.lock())
230 sthis->syncInfos(s, syncMsg);
// Warning for the "no sync connection" case for deviceId (its condition is not
// visible here).
235 JAMI_WARNING(
"[Account {}] [device {}] no sync connection.", pimpl_->accountId_, deviceId.toString());
// Debug trace with the number of devices being synced; `count` is computed on
// lines not visible here.
237 JAMI_DEBUG(
"[Account {}] [device {}] syncing with {:d} devices", pimpl_->accountId_, deviceId.to_view(), count);
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void syncInfos(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::shared_ptr< SyncMsg > &syncMsg)
Build SyncMsg and send it on socket.
std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > syncConnections_
std::recursive_mutex syncConnectionsMtx_
const std::string accountId_
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const DeviceId &device)
std::weak_ptr< JamiAccount > account_
Impl(const std::shared_ptr< JamiAccount > &account)
SyncModule(const std::shared_ptr< JamiAccount > &account)
void syncWithConnected(const std::shared_ptr< SyncMsg > &syncMsg=nullptr, const DeviceId &deviceId={})
Send sync to all connected devices.
bool isConnected(const DeviceId &deviceId) const
void cacheSyncConnection(std::shared_ptr< dhtnet::ChannelSocket > &&socket, const std::string &peerId, const DeviceId &deviceId)
Store a new Sync connection.
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
#define JAMI_LOG(formatstr,...)
void emitSignal(Args... args)
std::map< std::string, std::map< std::string, std::string > > p
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > ms
std::map< std::string, ConversationRequest > cr
std::map< std::string, ConvInfo > c