22#include <dhtnet/multiplexed_socket.h>
35 std::map<
DeviceId , std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
42 void syncInfos(
const std::shared_ptr<dhtnet::ChannelSocket>& socket,
43 const std::shared_ptr<SyncMsg>&
syncMsg);
54 const std::shared_ptr<SyncMsg>&
syncMsg)
56 auto acc = account_.lock();
65 if (
auto info = acc->accountManager()->getInfo()) {
68 msg.
ds = info->contacts->getSyncData();
70 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
86 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
97 msg.
cr = std::move(cr);
99 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
106 auto convModule = acc->convModule(
true);
110 auto p = convModule->convPreferences();
113 msg.
p = std::move(p);
115 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
123 auto ms = convModule->convMessageStatus();
126 msg.
ms = std::move(ms);
128 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
138 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
168 const std::string& peerId,
171 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
172 pimpl_->syncConnections_[device].emplace_back(socket);
174 socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
175 if (auto shared = w.lock())
176 shared->onChannelShutdown(s.lock(), device);
179 struct DecodingContext
181 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t,
void*) {
return true; },
186 socket->setOnRecv([acc = pimpl_->account_.lock(),
189 ctx = std::make_shared<DecodingContext>()](
const uint8_t*
buf,
size_t len) {
193 ctx->pac.reserve_buffer(len);
194 std::copy_n(
buf, len,
ctx->pac.buffer());
195 ctx->pac.buffer_consumed(len);
197 msgpack::object_handle
oh;
199 while (
ctx->pac.next(
oh)) {
201 oh.get().convert(
msg);
202 if (
auto manager = acc->accountManager())
203 manager->onSyncData(std::move(
msg.ds),
false);
205 if (!
msg.c.empty() || !
msg.cr.empty() || !
msg.p.empty() || !
msg.ld.empty()
207 if (
auto cm = acc->convModule(
true))
208 cm->onSyncData(
msg, peerId, device.toString());
210 }
catch (
const std::exception&
e) {
217 pimpl_->syncInfos(socket,
nullptr);
223 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
224 auto it = pimpl_->syncConnections_.find(deviceId);
225 if (
it == pimpl_->syncConnections_.end())
227 return !
it->second.empty();
233 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
234 for (
auto& [
did,
sockets] : pimpl_->syncConnections_) {
236 if (!deviceId || deviceId ==
did) {
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void syncInfos(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::shared_ptr< SyncMsg > &syncMsg)
Build SyncMsg and send it on socket.
std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > syncConnections_
std::recursive_mutex syncConnectionsMtx_
Impl(std::weak_ptr< JamiAccount > &&account)
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const DeviceId &device)
std::weak_ptr< JamiAccount > account_
SyncModule(std::weak_ptr< JamiAccount > &&account)
void syncWithConnected(const std::shared_ptr< SyncMsg > &syncMsg=nullptr, const DeviceId &deviceId={})
Send sync to all connected devices.
bool isConnected(const DeviceId &deviceId) const
void cacheSyncConnection(std::shared_ptr< dhtnet::ChannelSocket > &&socket, const std::string &peerId, const DeviceId &deviceId)
Store a new Sync connection.
#define JAMI_ERROR(formatstr,...)
#define JAMI_WARNING(formatstr,...)
void emitSignal(Args... args)
std::map< std::string, std::map< std::string, std::string > > p
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > ms
std::map< std::string, ConversationRequest > cr
std::map< std::string, ConvInfo > c