Ring Daemon 16.0.0
Loading...
Searching...
No Matches
audio_rtp_session.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "libav_deps.h" // MUST BE INCLUDED FIRST
19
20#include "audio_rtp_session.h"
21
22#include "logger.h"
23#include "noncopyable.h"
24#include "sip/sdp.h"
25
27#include "audio_sender.h"
28#include "socket_pair.h"
29#include "media_recorder.h"
30#include "media_encoder.h"
31#include "media_decoder.h"
32#include "media_io_handle.h"
33#include "media_device.h"
34#include "media_const.h"
35
36#include "audio/audio_input.h"
38#include "audio/resampler.h"
39#include "client/videomanager.h"
40#include "manager.h"
41#include "observer.h"
42
43#include <asio/io_context.hpp>
44#include <sstream>
45
46namespace jami {
47
48AudioRtpSession::AudioRtpSession(const std::string& callId,
49 const std::string& streamId,
50 const std::shared_ptr<MediaRecorder>& rec)
51 : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
52 , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
53
54{
55 recorder_ = rec;
56 JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
57
58 // don't move this into the initializer list or Cthulus will emerge
59 ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
60}
61
63{
65 stop();
66 JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
67}
68
69void
70AudioRtpSession::startSender()
71{
72 std::lock_guard lock(mutex_);
73 JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]",
74 input_,
75 muteState_ ? "YES" : "NO");
76
78 JAMI_WARNING("Audio sending disabled");
79 if (sender_) {
80 if (socketPair_)
81 socketPair_->interrupt();
82 if (audioInput_)
83 audioInput_->detach(sender_.get());
84 sender_.reset();
85 }
86 return;
87 }
88
89 if (sender_)
90 JAMI_WARNING("Restarting audio sender");
91 if (audioInput_)
92 audioInput_->detach(sender_.get());
93
94 bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
96 if (fileAudio) {
97 auto suffix = input_;
98 static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
99 const auto pos = input_.find(sep);
100 if (pos != std::string::npos) {
101 suffix = input_.substr(pos + sep.size());
102 }
104 }
105
106 // sender sets up input correctly, we just keep a reference in case startSender is called
107 audioInput_ = jami::getAudioInput(audioInputId);
108 audioInput_->setRecorderCallback(
109 [w=weak_from_this()](const MediaStream& ms) {
110 Manager::instance().ioContext()->post([w=std::move(w), ms]() {
111 if (auto shared = w.lock())
112 shared->attachLocalRecorder(ms);
113 });
114 });
115 audioInput_->setMuted(muteState_);
116 audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
117 if (!fileAudio) {
118 auto newParams = audioInput_->switchInput(input_);
119 try {
120 if (newParams.valid()
121 && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
122 localAudioParams_ = newParams.get();
123 } else {
124 JAMI_ERROR("No valid new audio parameters");
125 return;
126 }
127 } catch (const std::exception& e) {
128 JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
129 return;
130 }
131 }
132 if (streamId_ != audioInput_->getId())
134
135 send_.fecEnabled = true;
136
137 // be sure to not send any packets before saving last RTP seq value
138 socketPair_->stopSendOp();
139 if (sender_)
140 initSeqVal_ = sender_->getLastSeqValue() + 1;
141 try {
142 sender_.reset();
143 socketPair_->stopSendOp(false);
144 sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
145 } catch (const MediaEncoderException& e) {
146 JAMI_ERROR("{}", e.what());
147 send_.enabled = false;
148 }
149
150 if (voiceCallback_)
151 sender_->setVoiceCallback(voiceCallback_);
152
153 // NOTE do after sender/encoder are ready
154 auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
155 audioInput_->setFormat(codec->audioformat);
156 audioInput_->attach(sender_.get());
157
158 if (not rtcpCheckerThread_.isRunning())
159 rtcpCheckerThread_.start();
160}
161
162void
164{
165 std::lock_guard lock(mutex_);
166 // ensure that start has been called before restart
167 if (not socketPair_) {
168 return;
169 }
170
171 startSender();
172}
173
174void
175AudioRtpSession::startReceiver()
176{
177 if (socketPair_)
178 socketPair_->setReadBlockingMode(true);
179
181 JAMI_WARNING("Audio receiving disabled");
182 receiveThread_.reset();
183 return;
184 }
185
186 if (receiveThread_)
187 JAMI_WARNING("Restarting audio receiver");
188
189 auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
190 receiveThread_.reset(new AudioReceiveThread(streamId_,
191 accountAudioCodec->audioformat,
193 mtu_));
194
195 receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
196 Manager::instance().ioContext()->post([w=std::move(w), ms]() {
197 if (auto shared = w.lock())
198 shared->attachRemoteRecorder(ms);
199 });
200 });
201 receiveThread_->addIOContext(*socketPair_);
202 receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
203 receiveThread_->startReceiver();
204}
205
206void
207AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
208{
209 std::lock_guard lock(mutex_);
210
212 stop();
213 return;
214 }
215
216 try {
217 if (rtp_sock and rtcp_sock) {
218 if (send_.addr) {
219 rtp_sock->setDefaultRemoteAddress(send_.addr);
220 }
221
223 if (rtcpAddr) {
224 rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
225 }
226
227 socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
228 } else {
229 socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
230 }
231
233 socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
235 send_.crypto.getCryptoSuite().c_str(),
236 send_.crypto.getSrtpKeyInfo().c_str());
237 }
238 } catch (const std::runtime_error& e) {
239 JAMI_ERROR("Socket creation failed: {}", e.what());
240 return;
241 }
242
243 startSender();
244 startReceiver();
245}
246
247void
249{
250 std::lock_guard lock(mutex_);
251
252 JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
253
254 if (not receiveThread_)
255 return;
256
257 if (socketPair_)
258 socketPair_->setReadBlockingMode(false);
259
260 receiveThread_->stopReceiver();
261
262 if (audioInput_)
263 audioInput_->detach(sender_.get());
264
265 if (socketPair_)
266 socketPair_->interrupt();
267
268 rtcpCheckerThread_.join();
269
270 receiveThread_.reset();
271 sender_.reset();
272 socketPair_.reset();
273 audioInput_.reset();
274}
275
276void
278{
280 if (auto shared = w.lock()) {
281 std::lock_guard lock(shared->mutex_);
282 if (dir == Direction::SEND) {
283 shared->muteState_ = muted;
284 if (shared->audioInput_) {
285 shared->audioInput_->setMuted(muted);
286 }
287 } else {
288 if (shared->receiveThread_) {
289 auto ms = shared->receiveThread_->getInfo();
290 ms.name = shared->streamId_ + ":remote";
291 if (muted) {
292 if (auto ob = shared->recorder_->getStream(ms.name)) {
293 shared->receiveThread_->detach(ob);
294 shared->recorder_->removeStream(ms);
295 }
296 } else {
297 if (auto ob = shared->recorder_->addStream(ms)) {
298 shared->receiveThread_->attach(ob);
299 }
300 }
301 }
302 }
303 }
304 });
305}
306
307void
308AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
309{
310 std::lock_guard lock(mutex_);
311 voiceCallback_ = std::move(cb);
312 if (sender_) {
313 sender_->setVoiceCallback(voiceCallback_);
314 }
315}
316
317bool
318AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
319{
320 auto rtcpInfoVect = socketPair_->getRtcpRR();
321 unsigned totalLost = 0;
322 unsigned totalJitter = 0;
323 unsigned nbDropNotNull = 0;
324 auto vectSize = rtcpInfoVect.size();
325
326 if (vectSize != 0) {
327 for (const auto& it : rtcpInfoVect) {
328 if (it.fraction_lost != 0) // Exclude null drop
329 nbDropNotNull++;
330 totalLost += it.fraction_lost;
331 totalJitter += ntohl(it.jitter);
332 }
333 rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
334 // Jitter is expressed in timestamp unit -> convert to milliseconds
335 // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
336 rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
337 rtcpi.nb_sample = vectSize;
338 rtcpi.latency = socketPair_->getLastLatency();
339 return true;
340 }
341 return false;
342}
343
344void
345AudioRtpSession::adaptQualityAndBitrate()
346{
347 RTCPInfo rtcpi {};
348 if (check_RCTP_Info_RR(rtcpi)) {
349 dropProcessing(&rtcpi);
350 }
351}
352
353void
354AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
355{
356 auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
357 setNewPacketLoss(pondLoss);
358}
359
360void
361AudioRtpSession::setNewPacketLoss(unsigned int newPL)
362{
363 newPL = std::clamp((int) newPL, 0, 100);
364 if (newPL != packetLoss_) {
365 if (sender_) {
366 auto ret = sender_->setPacketLoss(newPL);
367 packetLoss_ = newPL;
368 if (ret == -1)
369 JAMI_ERROR("Fail to access the encoder");
370 } else {
371 JAMI_ERROR("Fail to access the sender");
372 }
373 }
374}
375
376float
377AudioRtpSession::getPonderateLoss(float lastLoss)
378{
379 static float pond = 10.0f;
380
381 pond = floor(0.5 * lastLoss + 0.5 * pond);
382 if (lastLoss > pond) {
383 return lastLoss;
384 } else {
385 return pond;
386 }
387}
388
389void
390AudioRtpSession::processRtcpChecker()
391{
392 adaptQualityAndBitrate();
393 socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
394}
395
396void
397AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
398{
399 std::lock_guard lock(mutex_);
400 if (!recorder_ || !receiveThread_)
401 return;
402 MediaStream remoteMS = ms;
403 remoteMS.name = streamId_ + ":remote";
404 if (auto ob = recorder_->addStream(remoteMS)) {
405 receiveThread_->attach(ob);
406 }
407}
408
409void
410AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
411{
412 std::lock_guard lock(mutex_);
413 if (!recorder_ || !audioInput_)
414 return;
415 MediaStream localMS = ms;
416 localMS.name = streamId_ + ":local";
417 if (auto ob = recorder_->addStream(localMS)) {
418 audioInput_->attach(ob);
419 }
420}
421
422void
423AudioRtpSession::initRecorder()
424{
425 if (!recorder_)
426 return;
427 if (receiveThread_)
428 receiveThread_->setRecorderCallback(
429 [w=weak_from_this()](const MediaStream& ms) {
430 Manager::instance().ioContext()->post([w=std::move(w), ms]() {
431 if (auto shared = w.lock())
432 shared->attachRemoteRecorder(ms);
433 });
434 });
435 if (audioInput_)
436 audioInput_->setRecorderCallback(
437 [w=weak_from_this()](const MediaStream& ms) {
438 Manager::instance().ioContext()->post([w=std::move(w), ms]() {
439 if (auto shared = w.lock())
440 shared->attachLocalRecorder(ms);
441 });
442 });
443}
444
445void
446AudioRtpSession::deinitRecorder()
447{
448 if (!recorder_)
449 return;
450 if (receiveThread_) {
451 auto ms = receiveThread_->getInfo();
452 ms.name = streamId_ + ":remote";
453 if (auto ob = recorder_->getStream(ms.name)) {
454 receiveThread_->detach(ob);
455 recorder_->removeStream(ms);
456 }
457 }
458 if (audioInput_) {
459 auto ms = audioInput_->getInfo();
460 ms.name = streamId_ + ":local";
461 if (auto ob = recorder_->getStream(ms.name)) {
462 audioInput_->detach(ob);
463 recorder_->removeStream(ms);
464 }
465 }
466}
467
468} // namespace jami
void setMuted(bool muted, Direction dir=Direction::SEND) override
void start(std::unique_ptr< dhtnet::IceSocket > rtp_sock, std::unique_ptr< dhtnet::IceSocket > rtcp_sock) override
AudioRtpSession(const std::string &callId, const std::string &streamId, const std::shared_ptr< MediaRecorder > &rec)
std::string getSrtpKeyInfo() const
std::string getCryptoSuite() const
static LIBJAMI_TEST_EXPORT Manager & instance()
Definition manager.cpp:676
std::shared_ptr< asio::io_context > ioContext() const
Definition manager.cpp:1712
RingBufferPool & getRingBufferPool()
Return a pointer to the instance of the RingBufferPool.
Definition manager.cpp:3157
std::shared_ptr< RingBuffer > createRingBuffer(const std::string &id)
Create a new ringbuffer with a default readoffset.
void bindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Attaches a reader the specified source.
std::string input_
Definition rtp_session.h:81
std::string getRemoteRtpUri() const
Definition rtp_session.h:88
std::recursive_mutex mutex_
Definition rtp_session.h:76
MediaDescription receive_
Definition rtp_session.h:83
const std::string streamId_
Definition rtp_session.h:78
MediaDescription send_
Definition rtp_session.h:82
std::function< void(MediaType, bool)> onSuccessfulSetup_
Definition rtp_session.h:86
std::unique_ptr< SocketPair > socketPair_
Definition rtp_session.h:80
bool isRunning() const noexcept
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
void emitSignal(Args... args)
Definition ring_signal.h:64
static constexpr auto NEWPARAMS_TIMEOUT
std::shared_ptr< AudioInput > getAudioInput(const std::string &device)
@ MEDIA_AUDIO
Definition media_codec.h:47
static constexpr const char * SEPARATOR
Definition media_const.h:33
Simple macro to hide class' copy constructor and assignment operator.
dhtnet::IpAddr addr
Endpoint socket address.
dhtnet::IpAddr rtcp_addr
RTCP socket address.
std::shared_ptr< SystemCodecInfo > codec
RTP.
CryptoAttribute crypto
Crypto parameters.
std::string receiving_sdp
unsigned int jitter
unsigned int nb_sample