Ring Daemon
Loading...
Searching...
No Matches
audio_rtp_session.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "libav_deps.h" // MUST BE INCLUDED FIRST
19
20#include "audio_rtp_session.h"
21
22#include "logger.h"
23
25#include "audio_sender.h"
26#include "socket_pair.h"
27#include "media_recorder.h"
28#include "media_encoder.h"
29#include "media_device.h"
30#include "media_const.h"
31
32#include "audio/audio_input.h"
34#include "client/videomanager.h"
35#include "manager.h"
36
37#include <asio/io_context.hpp>
38#include <asio/post.hpp>
39
40namespace jami {
41
42AudioRtpSession::AudioRtpSession(const std::string& callId,
43 const std::string& streamId,
44 const std::shared_ptr<MediaRecorder>& rec)
45 : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
46 , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
47
48{
49 recorder_ = rec;
50 JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
51
52 // don't move this into the initializer list or Cthulus will emerge
53 ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
54}
55
57{
59 stop();
60 JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
61}
62
63void
64AudioRtpSession::startSender()
65{
66 std::lock_guard lock(mutex_);
67 JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]", input_, muteState_ ? "YES" : "NO");
68
70 JAMI_WARNING("Audio sending disabled");
71 if (sender_) {
72 if (socketPair_)
73 socketPair_->interrupt();
74 if (audioInput_)
75 audioInput_->detach(sender_.get());
76 sender_.reset();
77 }
78 return;
79 }
80
81 if (sender_)
82 JAMI_WARNING("Restarting audio sender");
83 if (audioInput_)
84 audioInput_->detach(sender_.get());
85
86 bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
88 if (fileAudio) {
89 auto suffix = input_;
90 static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
91 const auto pos = input_.find(sep);
92 if (pos != std::string::npos) {
93 suffix = input_.substr(pos + sep.size());
94 }
96 }
97
98 // sender sets up input correctly, we just keep a reference in case startSender is called
99 audioInput_ = jami::getAudioInput(audioInputId);
100 audioInput_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
101 asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
102 if (auto shared = w.lock())
103 shared->attachLocalRecorder(ms);
104 });
105 });
106 audioInput_->setMuted(muteState_);
107 audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
108 if (!fileAudio) {
109 auto newParams = audioInput_->switchInput(input_);
110 try {
111 if (newParams.valid() && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
112 localAudioParams_ = newParams.get();
113 } else {
114 JAMI_ERROR("No valid new audio parameters");
115 return;
116 }
117 } catch (const std::exception& e) {
118 JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
119 return;
120 }
121 }
122 if (streamId_ != audioInput_->getId())
124
125 send_.fecEnabled = true;
126
127 // be sure to not send any packets before saving last RTP seq value
128 socketPair_->stopSendOp();
129 if (sender_)
130 initSeqVal_ = sender_->getLastSeqValue() + 1;
131 try {
132 sender_.reset();
133 socketPair_->stopSendOp(false);
134 sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
135 } catch (const MediaEncoderException& e) {
136 JAMI_ERROR("{}", e.what());
137 send_.enabled = false;
138 }
139
140 if (voiceCallback_)
141 sender_->setVoiceCallback(voiceCallback_);
142
143 // NOTE do after sender/encoder are ready
144 auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
145 audioInput_->setFormat(codec->audioformat);
146 audioInput_->attach(sender_.get());
147
148 if (not rtcpCheckerThread_.isRunning())
149 rtcpCheckerThread_.start();
150}
151
152void
154{
155 std::lock_guard lock(mutex_);
156 // ensure that start has been called before restart
157 if (not socketPair_) {
158 return;
159 }
160
161 startSender();
162}
163
164void
165AudioRtpSession::startReceiver()
166{
167 if (socketPair_)
168 socketPair_->setReadBlockingMode(true);
169
171 JAMI_WARNING("Audio receiving disabled");
172 receiveThread_.reset();
173 return;
174 }
175
176 if (receiveThread_)
177 JAMI_WARNING("Restarting audio receiver");
178
179 auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
180 receiveThread_.reset(
181 new AudioReceiveThread(streamId_, accountAudioCodec->audioformat, receive_.receiving_sdp, mtu_));
182
183 receiveThread_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
184 asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
185 if (auto shared = w.lock())
186 shared->attachRemoteRecorder(ms);
187 });
188 });
189 receiveThread_->addIOContext(*socketPair_);
190 receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
191 receiveThread_->startReceiver();
192
193 // Make the default ring buffer read the audio from the stream
195}
196
197void
198AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
199{
200 std::lock_guard lock(mutex_);
201
203 stop();
204 return;
205 }
206
207 try {
208 if (rtp_sock and rtcp_sock) {
209 if (send_.addr) {
210 rtp_sock->setDefaultRemoteAddress(send_.addr);
211 }
212
214 if (rtcpAddr) {
215 rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
216 }
217
218 socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
219 } else {
220 socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
221 }
222
224 socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
226 send_.crypto.getCryptoSuite().c_str(),
227 send_.crypto.getSrtpKeyInfo().c_str());
228 }
229 } catch (const std::runtime_error& e) {
230 JAMI_ERROR("Socket creation failed: {}", e.what());
231 return;
232 }
233
234 startSender();
235 startReceiver();
236}
237
238void
240{
241 std::lock_guard lock(mutex_);
242
243 JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
244
245 if (not receiveThread_)
246 return;
247
248 if (socketPair_)
249 socketPair_->setReadBlockingMode(false);
250
251 receiveThread_->stopReceiver();
252
253 // Unbind the default ring buffer from this audio stream
255
256 if (audioInput_)
257 audioInput_->detach(sender_.get());
258
259 if (socketPair_)
260 socketPair_->interrupt();
261
262 rtcpCheckerThread_.join();
263
264 receiveThread_.reset();
265 sender_.reset();
266 socketPair_.reset();
267 audioInput_.reset();
268}
269
270void
272{
273 asio::post(*Manager::instance().ioContext(), [w = weak_from_this(), muted, dir]() {
274 if (auto shared = w.lock()) {
275 std::lock_guard lock(shared->mutex_);
276 if (dir == Direction::SEND) {
277 shared->muteState_ = muted;
278 if (shared->audioInput_) {
279 shared->audioInput_->setMuted(muted);
280 }
281 } else {
282 if (shared->receiveThread_) {
283 auto ms = shared->receiveThread_->getInfo();
284 ms.name = shared->streamId_ + ":remote";
285 if (muted) {
286 if (auto* ob = shared->recorder_->getStream(ms.name)) {
287 shared->receiveThread_->detach(ob);
288 shared->recorder_->removeStream(ms);
289 }
290 } else {
291 if (auto* ob = shared->recorder_->addStream(ms)) {
292 shared->receiveThread_->attach(ob);
293 }
294 }
295 }
296 }
297 }
298 });
299}
300
301void
302AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
303{
304 std::lock_guard lock(mutex_);
305 voiceCallback_ = std::move(cb);
306 if (sender_) {
307 sender_->setVoiceCallback(voiceCallback_);
308 }
309}
310
311bool
312AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
313{
314 auto rtcpInfoVect = socketPair_->getRtcpRR();
315 unsigned totalLost = 0;
316 unsigned totalJitter = 0;
317 unsigned nbDropNotNull = 0;
318 auto vectSize = rtcpInfoVect.size();
319
320 if (vectSize != 0) {
321 for (const auto& it : rtcpInfoVect) {
322 if (it.fraction_lost != 0) // Exclude null drop
323 nbDropNotNull++;
324 totalLost += it.fraction_lost;
325 totalJitter += ntohl(it.jitter);
326 }
327 rtcpi.packetLoss = nbDropNotNull ? static_cast<float>((100 * totalLost) / (256.0 * nbDropNotNull)) : 0;
328 // Jitter is expressed in timestamp unit -> convert to milliseconds
329 // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
330 rtcpi.jitter = static_cast<unsigned int>(
331 (static_cast<float>(totalJitter) / static_cast<float>(vectSize) / 90000.0f) * 1000.0f);
332 rtcpi.nb_sample = vectSize;
333 rtcpi.latency = static_cast<float>(socketPair_->getLastLatency());
334 return true;
335 }
336 return false;
337}
338
339void
340AudioRtpSession::adaptQualityAndBitrate()
341{
342 RTCPInfo rtcpi {};
343 if (check_RCTP_Info_RR(rtcpi)) {
344 dropProcessing(&rtcpi);
345 }
346}
347
348void
349AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
350{
351 auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
352 setNewPacketLoss(static_cast<unsigned int>(pondLoss));
353}
354
355void
356AudioRtpSession::setNewPacketLoss(unsigned int newPL)
357{
358 newPL = std::clamp((int) newPL, 0, 100);
359 if (newPL != packetLoss_) {
360 if (sender_) {
361 auto ret = sender_->setPacketLoss(newPL);
362 packetLoss_ = newPL;
363 if (ret == -1)
364 JAMI_ERROR("Fail to access the encoder");
365 } else {
366 JAMI_ERROR("Fail to access the sender");
367 }
368 }
369}
370
371float
372AudioRtpSession::getPonderateLoss(float lastLoss)
373{
374 static float pond = 10.0f;
375
376 pond = floor(0.5 * lastLoss + 0.5 * pond);
377 if (lastLoss > pond) {
378 return lastLoss;
379 } else {
380 return pond;
381 }
382}
383
384void
385AudioRtpSession::processRtcpChecker()
386{
387 adaptQualityAndBitrate();
388 socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
389}
390
391void
392AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
393{
394 std::lock_guard lock(mutex_);
395 if (!recorder_ || !receiveThread_)
396 return;
397 MediaStream remoteMS = ms;
398 remoteMS.name = streamId_ + ":remote";
399 if (auto* ob = recorder_->addStream(remoteMS)) {
400 receiveThread_->attach(ob);
401 }
402}
403
404void
405AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
406{
407 std::lock_guard lock(mutex_);
408 if (!recorder_ || !audioInput_)
409 return;
410 MediaStream localMS = ms;
411 localMS.name = streamId_ + ":local";
412 if (auto* ob = recorder_->addStream(localMS)) {
413 audioInput_->attach(ob);
414 }
415}
416
417void
418AudioRtpSession::initRecorder()
419{
420 if (!recorder_)
421 return;
422 if (receiveThread_)
423 receiveThread_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
424 asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
425 if (auto shared = w.lock())
426 shared->attachRemoteRecorder(ms);
427 });
428 });
429 if (audioInput_)
430 audioInput_->setRecorderCallback([w = weak_from_this()](const MediaStream& ms) {
431 asio::post(*Manager::instance().ioContext(), [w = std::move(w), ms]() {
432 if (auto shared = w.lock())
433 shared->attachLocalRecorder(ms);
434 });
435 });
436}
437
438void
439AudioRtpSession::deinitRecorder()
440{
441 if (!recorder_)
442 return;
443 if (receiveThread_) {
444 auto ms = receiveThread_->getInfo();
445 ms.name = streamId_ + ":remote";
446 if (auto* ob = recorder_->getStream(ms.name)) {
447 receiveThread_->detach(ob);
448 recorder_->removeStream(ms);
449 }
450 }
451 if (audioInput_) {
452 auto ms = audioInput_->getInfo();
453 ms.name = streamId_ + ":local";
454 if (auto* ob = recorder_->getStream(ms.name)) {
455 audioInput_->detach(ob);
456 recorder_->removeStream(ms);
457 }
458 }
459}
460
461} // namespace jami
void setMuted(bool muted, Direction dir=Direction::SEND) override
void start(std::unique_ptr< dhtnet::IceSocket > rtp_sock, std::unique_ptr< dhtnet::IceSocket > rtcp_sock) override
AudioRtpSession(const std::string &callId, const std::string &streamId, const std::shared_ptr< MediaRecorder > &rec)
std::string getSrtpKeyInfo() const
std::string getCryptoSuite() const
static LIBJAMI_TEST_EXPORT Manager & instance()
Definition manager.cpp:694
RingBufferPool & getRingBufferPool()
Return a pointer to the instance of the RingBufferPool.
Definition manager.cpp:3197
std::shared_ptr< RingBuffer > createRingBuffer(const std::string &id)
Create a new ringbuffer with a default readoffset.
void bindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Attaches a reader the specified source.
static const char *const DEFAULT_ID
void unBindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Detaches a reader from the specified source.
std::string input_
Definition rtp_session.h:77
std::string getRemoteRtpUri() const
Definition rtp_session.h:84
std::recursive_mutex mutex_
Definition rtp_session.h:72
MediaDescription receive_
Definition rtp_session.h:79
const std::string streamId_
Definition rtp_session.h:74
MediaDescription send_
Definition rtp_session.h:78
std::function< void(MediaType, bool)> onSuccessfulSetup_
Definition rtp_session.h:82
std::unique_ptr< SocketPair > socketPair_
Definition rtp_session.h:76
bool isRunning() const noexcept
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:238
#define JAMI_WARNING(formatstr,...)
Definition logger.h:242
void emitSignal(Args... args)
Definition jami_signal.h:64
static constexpr auto NEWPARAMS_TIMEOUT
std::shared_ptr< AudioInput > getAudioInput(const std::string &device)
@ MEDIA_AUDIO
Definition media_codec.h:46
static constexpr const char * SEPARATOR
Definition media_const.h:32
dhtnet::IpAddr addr
Endpoint socket address.
dhtnet::IpAddr rtcp_addr
RTCP socket address.
std::shared_ptr< SystemCodecInfo > codec
RTP.
CryptoAttribute crypto
Crypto parameters.
std::string receiving_sdp
unsigned int jitter
unsigned int nb_sample