23#include <dhtnet/multiplexed_socket.h>
24#include <opendht/thread_pool.h>
37 std::map<
DeviceId , std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>
44 void syncInfos(
const std::shared_ptr<dhtnet::ChannelSocket>& socket,
45 const std::shared_ptr<SyncMsg>&
syncMsg);
56 const std::shared_ptr<SyncMsg>&
syncMsg)
58 auto acc = account_.lock();
67 if (
auto info = acc->accountManager()->getInfo()) {
70 msg.
ds = info->contacts->getSyncData();
72 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
88 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
99 msg.
cr = std::move(cr);
101 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
108 auto convModule = acc->convModule(
true);
112 auto p = convModule->convPreferences();
115 msg.
p = std::move(p);
117 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
125 auto ms = convModule->convMessageStatus();
128 msg.
ms = std::move(ms);
130 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
140 socket->write(
reinterpret_cast<const unsigned char*
>(
buffer.data()),
buffer.size(),
ec);
170 const std::string& peerId,
173 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
174 pimpl_->syncConnections_[device].emplace_back(socket);
176 socket->onShutdown([w = pimpl_->weak_from_this(), device, s = std::weak_ptr(socket)]() {
177 if (auto shared = w.lock())
178 shared->onChannelShutdown(s.lock(), device);
181 struct DecodingContext
183 msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t,
void*) {
return true; },
188 socket->setOnRecv([acc = pimpl_->account_.lock(),
191 ctx = std::make_shared<DecodingContext>()](
const uint8_t*
buf,
size_t len) {
195 ctx->pac.reserve_buffer(len);
196 std::copy_n(
buf, len,
ctx->pac.buffer());
197 ctx->pac.buffer_consumed(len);
199 msgpack::object_handle
oh;
201 while (
ctx->pac.next(
oh)) {
203 oh.get().convert(
msg);
204 if (
auto manager = acc->accountManager())
205 manager->onSyncData(std::move(
msg.ds),
false);
207 if (!
msg.c.empty() || !
msg.cr.empty() || !
msg.p.empty() || !
msg.ld.empty()
209 if (
auto cm = acc->convModule(
true))
210 cm->onSyncData(
msg, peerId, device.toString());
212 }
catch (
const std::exception&
e) {
219 dht::ThreadPool::io().run([w = pimpl_->weak_from_this(), socket]() {
220 if (auto s = w.lock())
221 s->syncInfos(socket, nullptr);
228 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
229 auto it = pimpl_->syncConnections_.find(deviceId);
230 if (
it == pimpl_->syncConnections_.end())
232 return !
it->second.empty();
238 std::lock_guard
lk(pimpl_->syncConnectionsMtx_);
239 for (
auto& [
did,
sockets] : pimpl_->syncConnections_) {
241 if (!deviceId || deviceId ==
did) {
static std::map< std::string, ConversationRequest > convRequests(const std::string &accountId)
static std::map< std::string, ConvInfo > convInfos(const std::string &accountId)
void syncInfos(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const std::shared_ptr< SyncMsg > &syncMsg)
Build SyncMsg and send it on socket.
std::map< DeviceId, std::vector< std::shared_ptr< dhtnet::ChannelSocket > > > syncConnections_
std::recursive_mutex syncConnectionsMtx_
Impl(std::weak_ptr< JamiAccount > &&account)
void onChannelShutdown(const std::shared_ptr< dhtnet::ChannelSocket > &socket, const DeviceId &device)
std::weak_ptr< JamiAccount > account_
SyncModule(std::weak_ptr< JamiAccount > &&account)
void syncWithConnected(const std::shared_ptr< SyncMsg > &syncMsg=nullptr, const DeviceId &deviceId={})
Send sync to all connected devices.
bool isConnected(const DeviceId &deviceId) const
void cacheSyncConnection(std::shared_ptr< dhtnet::ChannelSocket > &&socket, const std::string &peerId, const DeviceId &deviceId)
Store a new Sync connection.
#define JAMI_ERROR(formatstr,...)
#define JAMI_WARNING(formatstr,...)
void emitSignal(Args... args)
std::map< std::string, std::map< std::string, std::string > > p
std::map< std::string, std::map< std::string, std::map< std::string, std::string > > > ms
std::map< std::string, ConversationRequest > cr
std::map< std::string, ConvInfo > c