197 if (
auto shared = w.lock()) {
198 std::lock_guard<std::mutex> lk(shared->streamMtx_);
199 if (shared->stream_.is_open())
200 shared->stream_.write(reinterpret_cast<const char*>(buf), static_cast<long>(len));
201 shared->info_.bytesProgress = shared->stream_.tellp();
202 return static_cast<int>(len);
205 JAMI_ERROR(
"{} bytes received after IncomingFile destruction.", len);
209 auto shared = w.lock();
213 std::lock_guard<std::mutex>
lk(shared->streamMtx_);
214 if (shared->stream_ && shared->stream_.is_open())
215 shared->stream_.close();
217 auto correct = shared->sha3Sum_.empty();
220 if (shared->isUserCancelled_) {
221 std::filesystem::remove(shared->path_,
ec);
222 }
else if (shared->info_.bytesProgress < shared->info_.totalSize) {
223 JAMI_WARNING(
"Channel for {} shut down before transfer was complete (progress: {}/{})",
225 shared->info_.bytesProgress,
226 shared->info_.totalSize);
227 }
else if (shared->info_.totalSize != 0 && shared->info_.bytesProgress > shared->info_.totalSize) {
228 JAMI_WARNING(
"Removing {} larger than announced: {}/{}",
230 shared->info_.bytesProgress,
231 shared->info_.totalSize);
232 std::filesystem::remove(shared->path_,
ec);
235 if (shared->sha3Sum_ ==
sha3Sum) {
236 JAMI_LOG(
"New file received: {}", shared->info_.path);
240 "Removing {} with expected size ({} bytes) but invalid sha3sum (expected: {}, actual: {})",
242 shared->info_.totalSize,
245 std::filesystem::remove(shared->path_,
ec);
249 JAMI_ERROR(
"Failed to remove file {}: {}", shared->path_,
ec.message());
253 std::filesystem::rename(shared->path_, shared->info_.path,
ec);
255 JAMI_ERROR(
"Failed to rename file from {} to {}: {}", shared->path_, shared->info_.path,
ec.message());
259 if (shared->isUserCancelled_)
261 auto code =
correct ? libjami::DataTransferEventCode::finished : libjami::DataTransferEventCode::closed_by_host;
263 dht::ThreadPool::io().run([s = std::move(shared)] {});
346 const std::string& fileId,
347 const std::string& interactionId,
348 const std::string& path,
353 std::lock_guard
lk {pimpl_->mapMutex_};
354 if (pimpl_->outgoings_.find(channel) != pimpl_->outgoings_.end())
358 info.conversationId = pimpl_->to_;
360 auto f = std::make_shared<OutgoingFile>(channel, fileId, interactionId,
info, start, end);
361 f->onFinished([w = weak(), channel, onFinished = std::move(onFinished)](
uint32_t code) {
362 if (code ==
uint32_t(libjami::DataTransferEventCode::finished) && onFinished) {
366 dht::ThreadPool().computation().run([w, channel] {
367 if (
auto sthis_ = w.lock()) {
368 auto& pimpl = sthis_->pimpl_;
369 std::lock_guard lk {pimpl->mapMutex_};
370 auto itO =
pimpl->outgoings_.find(channel);
376 auto [
outFile,
_] = pimpl_->outgoings_.emplace(channel, std::move(
f));
377 dht::ThreadPool::io().run([w = std::weak_ptr<OutgoingFile>(
outFile->second)] {
378 if (auto of = w.lock())
402TransferManager::info(
const std::string& fileId, std::string& path, int64_t& total, int64_t& progress)
const noexcept
404 std::unique_lock lk {pimpl_->mapMutex_};
405 if (pimpl_->to_.empty())
408 auto itI = pimpl_->incomings_.find(fileId);
409 auto itW = pimpl_->waitingIds_.find(fileId);
410 path = this->path(fileId).string();
411 if (itI != pimpl_->incomings_.end()) {
412 total = itI->second->info().totalSize;
413 progress = itI->second->info().bytesProgress;
415 }
else if (std::filesystem::is_regular_file(path)) {
416 std::ifstream transfer(path, std::ios::binary);
417 transfer.seekg(0, std::ios::end);
418 progress = transfer.tellg();
419 if (itW != pimpl_->waitingIds_.end()) {
420 total =
static_cast<int64_t
>(itW->second.totalSize);
426 }
else if (itW != pimpl_->waitingIds_.end()) {
427 total =
static_cast<int64_t
>(itW->second.totalSize);
452TransferManager::onIncomingFileTransfer(
const std::string& fileId,
453 const std::shared_ptr<dhtnet::ChannelSocket>& channel,
456 std::lock_guard lk(pimpl_->mapMutex_);
458 auto itC = pimpl_->incomings_.find(fileId);
459 if (itC != pimpl_->incomings_.end()) {
460 dht::ThreadPool().io().run([channel] { channel->shutdown(); });
463 auto itW = pimpl_->waitingIds_.find(fileId);
464 if (itW == pimpl_->waitingIds_.end()) {
465 dht::ThreadPool().io().run([channel] { channel->shutdown(); });
470 info.accountId = pimpl_->accountId_;
471 info.conversationId = pimpl_->to_;
472 info.path = itW->second.path;
473 info.totalSize =
static_cast<int64_t
>(itW->second.totalSize);
474 info.bytesProgress =
static_cast<int64_t
>(start);
479 auto filePath = path(fileId);
480 if (info.path.empty()) {
481 info.path = filePath.string();
485 fileutils::createFileLink(filePath, info.path);
488 auto ifile = std::make_shared<IncomingFile>(std::move(channel),
491 itW->second.interactionId,
492 itW->second.sha3sum);
493 auto res = pimpl_->incomings_.emplace(fileId, std::move(ifile));
495 res.first->second->onFinished([w = weak(), fileId](uint32_t code) {
497 dht::ThreadPool().computation().run([w, fileId, code] {
498 if (
auto sthis_ = w.lock()) {
499 auto& pimpl = sthis_->pimpl_;
500 std::lock_guard lk {pimpl->mapMutex_};
501 auto itO = pimpl->incomings_.find(fileId);
502 if (itO != pimpl->incomings_.end())
503 pimpl->incomings_.erase(itO);
504 if (code == uint32_t(libjami::DataTransferEventCode::finished)) {
505 auto itW = pimpl->waitingIds_.find(fileId);
506 if (itW != pimpl->waitingIds_.end()) {
507 pimpl->waitingIds_.erase(itW);
508 pimpl->saveWaiting();
514 res.first->second->process();
525TransferManager::onIncomingProfile(
const std::shared_ptr<dhtnet::ChannelSocket>& channel,
const std::string& sha3Sum)
530 auto chName = channel->name();
531 std::string_view name = chName;
532 auto sep = name.find_last_of(
'?');
533 if (sep != std::string::npos)
534 name = name.substr(0, sep);
536 auto lastSep = name.find_last_of(
'/');
537 auto fileId = name.substr(lastSep + 1);
539 auto deviceId = channel->deviceId().toString();
540 auto cert = channel->peerCertificate();
541 if (!cert || !cert->issuer || fileId.find(
".vcf") == std::string::npos)
544 auto uri = fileId ==
"profile.vcf" ? cert->issuer->getId().toString()
545 : std::string(fileId.substr(0, fileId.size() - 4 ));
547 std::lock_guard lk(pimpl_->mapMutex_);
548 auto idx = std::make_pair(deviceId, uri);
550 auto itV = pimpl_->vcards_.find(idx);
551 if (itV != pimpl_->vcards_.end()) {
552 dht::ThreadPool().io().run([channel] { channel->shutdown(); });
558 info.accountId = pimpl_->accountId_;
559 info.conversationId = pimpl_->to_;
561 auto recvDir = fileutils::get_cache_dir() / pimpl_->accountId_ /
"vcard";
562 dhtnet::fileutils::recursive_mkdir(recvDir);
563 info.path = (recvDir / fmt::format(
"{:s}_{:s}_{}", deviceId, uri, tid)).
string();
565 auto ifile = std::make_shared<IncomingFile>(std::move(channel), info,
"profile.vcf",
"", sha3Sum);
566 auto res = pimpl_->vcards_.emplace(idx, std::move(ifile));
568 res.first->second->onFinished([w = weak(),
569 uri = std::move(uri),
570 deviceId = std::move(deviceId),
571 accountId = pimpl_->accountId_,
572 cert = std::move(cert),
573 path = info.path](uint32_t code) {
574 dht::ThreadPool().computation().run([w,
575 uri = std::move(uri),
576 deviceId = std::move(deviceId),
577 accountId = std::move(accountId),
578 path = std::move(path),
580 if (auto sthis_ = w.lock()) {
581 auto& pimpl = sthis_->pimpl_;
583 auto destPath = sthis_->profilePath(uri);
586 std::lock_guard lock(dhtnet::fileutils::getFileLock(destPath));
587 dhtnet::fileutils::recursive_mkdir(destPath.parent_path());
588 std::filesystem::rename(path, destPath);
589 if (!pimpl->accountUri_.empty() && uri == pimpl->accountUri_) {
591 if (!fileutils::createFileLink(pimpl->accountProfilePath_, destPath)) {
593 std::filesystem::copy_file(destPath, pimpl->accountProfilePath_, ec);
596 } catch (const std::exception& e) {
597 JAMI_ERROR(
"{}", e.what());
600 std::lock_guard lk {pimpl->mapMutex_};
601 auto itO = pimpl->vcards_.find({deviceId, uri});
602 if (itO != pimpl->vcards_.end())
603 pimpl->vcards_.erase(itO);
604 if (code == uint32_t(libjami::DataTransferEventCode::finished)) {
605 emitSignal<libjami::ConfigurationSignal::ProfileReceived>(accountId, uri, destPath.string());
610 res.first->second->process();