37#include <asio/io_context.hpp>
38#include <asio/post.hpp>
43 const std::string& streamId,
44 const std::shared_ptr<MediaRecorder>&
rec)
46 , rtcpCheckerThread_([] {
return true; }, [
this] { processRtcpChecker(); }, [] {})
50 JAMI_DEBUG(
"Created Audio RTP session: {} - stream id {}", fmt::ptr(
this), streamId_);
64AudioRtpSession::startSender()
66 std::lock_guard lock(
mutex_);
67 JAMI_DEBUG(
"Start audio RTP sender: input [{}] - muted [{}]",
input_, muteState_ ?
"YES" :
"NO");
75 audioInput_->detach(sender_.get());
84 audioInput_->detach(sender_.get());
92 if (
pos != std::string::npos) {
100 audioInput_->setRecorderCallback([w =
weak_from_this()](
const MediaStream& ms) {
102 if (
auto shared = w.lock())
103 shared->attachLocalRecorder(ms);
106 audioInput_->setMuted(muteState_);
117 }
catch (
const std::exception&
e) {
118 JAMI_ERROR(
"Exception while retrieving audio parameters: {}",
e.what());
130 initSeqVal_ = sender_->getLastSeqValue() + 1;
135 }
catch (
const MediaEncoderException&
e) {
141 sender_->setVoiceCallback(voiceCallback_);
144 auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(
send_.
codec);
145 audioInput_->setFormat(codec->audioformat);
146 audioInput_->attach(sender_.get());
149 rtcpCheckerThread_.
start();
155 std::lock_guard lock(
mutex_);
165AudioRtpSession::startReceiver()
172 receiveThread_.reset();
180 receiveThread_.reset(
183 receiveThread_->setRecorderCallback([w =
weak_from_this()](
const MediaStream& ms) {
185 if (
auto shared = w.lock())
186 shared->attachRemoteRecorder(ms);
191 receiveThread_->startReceiver();
200 std::lock_guard lock(
mutex_);
229 }
catch (
const std::runtime_error&
e) {
230 JAMI_ERROR(
"Socket creation failed: {}",
e.what());
241 std::lock_guard lock(
mutex_);
243 JAMI_DEBUG(
"[{}] Stopping receiver", fmt::ptr(
this));
245 if (
not receiveThread_)
251 receiveThread_->stopReceiver();
257 audioInput_->detach(sender_.get());
262 rtcpCheckerThread_.
join();
264 receiveThread_.reset();
274 if (
auto shared = w.lock()) {
275 std::lock_guard lock(shared->mutex_);
276 if (dir == Direction::SEND) {
277 shared->muteState_ = muted;
278 if (shared->audioInput_) {
279 shared->audioInput_->setMuted(muted);
282 if (shared->receiveThread_) {
283 auto ms = shared->receiveThread_->getInfo();
284 ms.name = shared->streamId_ +
":remote";
286 if (auto* ob = shared->recorder_->getStream(ms.name)) {
287 shared->receiveThread_->detach(ob);
288 shared->recorder_->removeStream(ms);
291 if (auto* ob = shared->recorder_->addStream(ms)) {
292 shared->receiveThread_->attach(ob);
// Stores the voice-activity callback in voiceCallback_ while holding mutex_,
// then hands the stored callback to sender_.
// @param cb  callable taking a bool; taken by value and moved into the member.
// NOTE(review): original line 306 is missing from this listing; presumably it
// guards the sender_ dereference below - confirm against the full file.
302AudioRtpSession::setVoiceCallback(std::function<
void(
bool)> cb)
304 std::lock_guard lock(mutex_);
305 voiceCallback_ = std::move(cb);
// The member copy is passed on, because cb has already been moved from.
307 sender_->setVoiceCallback(voiceCallback_);
// Aggregates the receiver reports returned by socketPair_->getRtcpRR() into
// rtcpi (packetLoss, jitter, latency).
// @param rtcpi  output structure; its packetLoss, jitter and latency fields
//               are overwritten.
// NOTE(review): this listing omits original lines 313, 319-320, 323, 326,
// 328-329, 332 and everything after 333, so the return statements and any
// guard for an empty report vector are not visible here.
312AudioRtpSession::check_RCTP_Info_RR(
RTCPInfo& rtcpi)
314 auto rtcpInfoVect = socketPair_->getRtcpRR();
315 unsigned totalLost = 0;
316 unsigned totalJitter = 0;
317 unsigned nbDropNotNull = 0;
318 auto vectSize = rtcpInfoVect.size();
// Accumulate loss and jitter over every report.
321 for (
const auto& it : rtcpInfoVect) {
// NOTE(review): the statement controlled by this `if` (original line 323) is
// missing; presumably it increments nbDropNotNull - confirm.
322 if (it.fraction_lost != 0)
324 totalLost += it.fraction_lost;
// jitter is converted from network to host byte order before being summed.
325 totalJitter += ntohl(it.jitter);
// Summed fraction_lost scaled by 100/256 and averaged over nbDropNotNull;
// yields 0 when nbDropNotNull is zero. Presumably fraction_lost is the RTCP
// 8-bit fixed-point loss fraction, making this a percentage - confirm.
327 rtcpi.
packetLoss = nbDropNotNull ?
static_cast<float>((100 * totalLost) / (256.0 * nbDropNotNull)) : 0;
// Mean jitter over all reports, divided by 90000 and multiplied by 1000.
// Presumably this converts 90 kHz timestamp units to milliseconds - TODO
// confirm that clock rate is the intended one for this stream.
// NOTE(review): divides by vectSize; verify that the missing lines 319-320
// reject an empty vector before this point.
330 rtcpi.
jitter =
static_cast<unsigned int>(
331 (
static_cast<float>(totalJitter) /
static_cast<float>(vectSize) / 90000.0f) * 1000.0f);
// Latency is taken as-is from the socket pair and converted to float.
333 rtcpi.
latency =
static_cast<float>(socketPair_->getLastLatency());
// Collects RTCP statistics and, when check_RCTP_Info_RR() reports success,
// feeds them to dropProcessing().
// NOTE(review): the declaration of rtcpi (original lines 341-342) and the
// closing lines are missing from this listing.
340AudioRtpSession::adaptQualityAndBitrate()
343 if (check_RCTP_Info_RR(rtcpi)) {
344 dropProcessing(&rtcpi);
// Passes rtcpi->packetLoss through getPonderateLoss() and applies the result
// with setNewPacketLoss().
// @param rtcpi  statistics to read; dereferenced without a null check in the
//               lines visible here.
349AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
351 auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
// The conversion to unsigned int discards any fractional part.
352 setNewPacketLoss(
static_cast<unsigned int>(pondLoss));
// Clamps the requested packet-loss value to [0, 100] and, when it differs
// from packetLoss_, forwards it to sender_->setPacketLoss().
// @param newPL  requested packet-loss value.
// NOTE(review): original lines 360 and 362 onward are missing from this
// listing, so the handling of `ret` and any update of packetLoss_ are not
// visible here.
356AudioRtpSession::setNewPacketLoss(
unsigned int newPL)
// newPL is converted to int so that all three std::clamp arguments share one
// type; the int result is assigned back to the unsigned parameter.
// NOTE(review): a value above INT_MAX would presumably turn negative in the
// cast and clamp to 0 rather than 100 - confirm whether that can occur.
358 newPL = std::clamp((
int) newPL, 0, 100);
359 if (newPL != packetLoss_) {
361 auto ret = sender_->setPacketLoss(newPL);
// Smooths the latest loss figure against a running value.
// @param lastLoss  most recent loss measurement.
// NOTE(review): original lines 375 and 378 onward (including the return) are
// missing from this listing.
372AudioRtpSession::getPonderateLoss(
float lastLoss)
// Function-local static: the running value persists between calls and is a
// single variable shared by every object that calls this member function.
374 static float pond = 10.0f;
// New running value = floor of the equal-weight mean of the latest loss and
// the previous running value.
376 pond = floor(0.5 * lastLoss + 0.5 * pond);
377 if (lastLoss > pond) {
// One iteration of the RTCP checker: re-evaluates quality via
// adaptQualityAndBitrate(), then calls socketPair_->waitForRTCP() with a
// duration of rtcp_checking_interval seconds.
// This is the function invoked by the second lambda passed to
// rtcpCheckerThread_ in the member initializer list.
385AudioRtpSession::processRtcpChecker()
387 adaptQualityAndBitrate();
388 socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
// Registers the remote stream with recorder_ and attaches the observer the
// recorder returns to receiveThread_. Runs with mutex_ held.
// @param ms  stream description; copied so its name can be overridden.
392AudioRtpSession::attachRemoteRecorder(
const MediaStream& ms)
394 std::lock_guard lock(mutex_);
// NOTE(review): the statement controlled by this check (original line 396) is
// missing from the listing; presumably an early return - confirm.
395 if (!recorder_ || !receiveThread_)
397 MediaStream remoteMS = ms;
// The stream is recorded under the name "<streamId_>:remote".
398 remoteMS.name = streamId_ +
":remote";
// Attach only when addStream() returns a non-null observer.
399 if (
auto* ob = recorder_->addStream(remoteMS)) {
400 receiveThread_->attach(ob);
// Registers the local stream with recorder_ and attaches the observer the
// recorder returns to audioInput_. Runs with mutex_ held.
// @param ms  stream description; copied so its name can be overridden.
405AudioRtpSession::attachLocalRecorder(
const MediaStream& ms)
407 std::lock_guard lock(mutex_);
// NOTE(review): the statement controlled by this check (original line 409) is
// missing from the listing; presumably an early return - confirm.
408 if (!recorder_ || !audioInput_)
410 MediaStream localMS = ms;
// The stream is recorded under the name "<streamId_>:local".
411 localMS.name = streamId_ +
":local";
// Attach only when addStream() returns a non-null observer.
412 if (
auto* ob = recorder_->addStream(localMS)) {
413 audioInput_->attach(ob);
// Installs recorder callbacks on receiveThread_ and audioInput_. Each callback
// posts a task to the Manager's io context; the task calls
// attachRemoteRecorder() / attachLocalRecorder() only if the weak pointer to
// this session can still be locked.
// NOTE(review): original lines 419-422, 427-429 and 434 onward are missing
// from this listing, so any null checks around the two pointers are not
// visible here.
418AudioRtpSession::initRecorder()
// The outer lambda is not `mutable`, so `w` is const inside it and
// std::move(w) ends up copying the weak pointer; the callback's own capture
// is left intact.
423 receiveThread_->setRecorderCallback([w = weak_from_this()](
const MediaStream& ms) {
// ms is captured by value, so the posted task does not depend on the
// caller's reference staying valid.
424 asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
425 if (
auto shared = w.lock())
426 shared->attachRemoteRecorder(ms);
// Same pattern for the local (audio input) side.
430 audioInput_->setRecorderCallback([w = weak_from_this()](
const MediaStream& ms) {
431 asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
432 if (
auto shared = w.lock())
433 shared->attachLocalRecorder(ms);
// Detaches the recorder observers for the "<streamId_>:remote" and
// "<streamId_>:local" streams and removes those streams from recorder_.
// NOTE(review): original lines 440-442, 449-451 and 457 onward are missing
// from this listing; any guards on recorder_ or audioInput_ are not visible.
439AudioRtpSession::deinitRecorder()
443 if (receiveThread_) {
444 auto ms = receiveThread_->getInfo();
445 ms.name = streamId_ +
":remote";
// Detach and remove only when the recorder knows a stream of that name.
446 if (
auto* ob = recorder_->getStream(ms.name)) {
447 receiveThread_->detach(ob);
448 recorder_->removeStream(ms);
// Local side: same lookup, detach and removal against audioInput_.
452 auto ms = audioInput_->getInfo();
453 ms.name = streamId_ +
":local";
454 if (
auto* ob = recorder_->getStream(ms.name)) {
455 audioInput_->detach(ob);
456 recorder_->removeStream(ms);
void deinitRecorder() override
void setMuted(bool muted, Direction dir=Direction::SEND) override
void start(std::unique_ptr< dhtnet::IceSocket > rtp_sock, std::unique_ptr< dhtnet::IceSocket > rtcp_sock) override
virtual ~AudioRtpSession()
AudioRtpSession(const std::string &callId, const std::string &streamId, const std::shared_ptr< MediaRecorder > &rec)
void restartSender() override
std::string getSrtpKeyInfo() const
std::string getCryptoSuite() const
static LIBJAMI_TEST_EXPORT Manager & instance()
RingBufferPool & getRingBufferPool()
Return a pointer to the instance of the RingBufferPool.
std::shared_ptr< RingBuffer > createRingBuffer(const std::string &id)
Create a new ringbuffer with a default readoffset.
void bindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Attaches a reader to the specified source.
static const char *const DEFAULT_ID
void unBindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Detaches a reader from the specified source.
std::string getRemoteRtpUri() const
std::recursive_mutex mutex_
MediaDescription receive_
const std::string streamId_
std::function< void(MediaType, bool)> onSuccessfulSetup_
std::unique_ptr< SocketPair > socketPair_
bool isRunning() const noexcept
#define JAMI_ERROR(formatstr,...)
#define JAMI_DEBUG(formatstr,...)
#define JAMI_WARNING(formatstr,...)
void emitSignal(Args... args)
static constexpr auto NEWPARAMS_TIMEOUT
std::shared_ptr< AudioInput > getAudioInput(const std::string &device)