Ring Daemon 16.0.0
Loading...
Searching...
No Matches
audio_rtp_session.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "libav_deps.h" // MUST BE INCLUDED FIRST
19
20#include "audio_rtp_session.h"
21
22#include "logger.h"
23#include "noncopyable.h"
24#include "sip/sdp.h"
25
27#include "audio_sender.h"
28#include "socket_pair.h"
29#include "media_recorder.h"
30#include "media_encoder.h"
31#include "media_decoder.h"
32#include "media_io_handle.h"
33#include "media_device.h"
34#include "media_const.h"
35
36#include "audio/audio_input.h"
38#include "audio/resampler.h"
39#include "client/videomanager.h"
40#include "manager.h"
41#include "observer.h"
42
43#include <asio/io_context.hpp>
44#include <asio/post.hpp>
45#include <sstream>
46
47namespace jami {
48
49AudioRtpSession::AudioRtpSession(const std::string& callId,
50 const std::string& streamId,
51 const std::shared_ptr<MediaRecorder>& rec)
52 : RtpSession(callId, streamId, MediaType::MEDIA_AUDIO)
53 , rtcpCheckerThread_([] { return true; }, [this] { processRtcpChecker(); }, [] {})
54
55{
56 recorder_ = rec;
57 JAMI_DEBUG("Created Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
58
59 // don't move this into the initializer list or Cthulus will emerge
60 ringbuffer_ = Manager::instance().getRingBufferPool().createRingBuffer(streamId_);
61}
62
64{
66 stop();
67 JAMI_DEBUG("Destroyed Audio RTP session: {} - stream id {}", fmt::ptr(this), streamId_);
68}
69
70void
71AudioRtpSession::startSender()
72{
73 std::lock_guard lock(mutex_);
74 JAMI_DEBUG("Start audio RTP sender: input [{}] - muted [{}]",
75 input_,
76 muteState_ ? "YES" : "NO");
77
79 JAMI_WARNING("Audio sending disabled");
80 if (sender_) {
81 if (socketPair_)
82 socketPair_->interrupt();
83 if (audioInput_)
84 audioInput_->detach(sender_.get());
85 sender_.reset();
86 }
87 return;
88 }
89
90 if (sender_)
91 JAMI_WARNING("Restarting audio sender");
92 if (audioInput_)
93 audioInput_->detach(sender_.get());
94
95 bool fileAudio = !input_.empty() && input_.find("file://") != std::string::npos;
97 if (fileAudio) {
98 auto suffix = input_;
99 static const std::string& sep = libjami::Media::VideoProtocolPrefix::SEPARATOR;
100 const auto pos = input_.find(sep);
101 if (pos != std::string::npos) {
102 suffix = input_.substr(pos + sep.size());
103 }
105 }
106
107 // sender sets up input correctly, we just keep a reference in case startSender is called
108 audioInput_ = jami::getAudioInput(audioInputId);
109 audioInput_->setRecorderCallback(
110 [w=weak_from_this()](const MediaStream& ms) {
111 asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
112 if (auto shared = w.lock())
113 shared->attachLocalRecorder(ms);
114 });
115 });
116 audioInput_->setMuted(muteState_);
117 audioInput_->setSuccessfulSetupCb(onSuccessfulSetup_);
118 if (!fileAudio) {
119 auto newParams = audioInput_->switchInput(input_);
120 try {
121 if (newParams.valid()
122 && newParams.wait_for(NEWPARAMS_TIMEOUT) == std::future_status::ready) {
123 localAudioParams_ = newParams.get();
124 } else {
125 JAMI_ERROR("No valid new audio parameters");
126 return;
127 }
128 } catch (const std::exception& e) {
129 JAMI_ERROR("Exception while retrieving audio parameters: {}", e.what());
130 return;
131 }
132 }
133 if (streamId_ != audioInput_->getId())
135
136 send_.fecEnabled = true;
137
138 // be sure to not send any packets before saving last RTP seq value
139 socketPair_->stopSendOp();
140 if (sender_)
141 initSeqVal_ = sender_->getLastSeqValue() + 1;
142 try {
143 sender_.reset();
144 socketPair_->stopSendOp(false);
145 sender_.reset(new AudioSender(getRemoteRtpUri(), send_, *socketPair_, initSeqVal_, mtu_));
146 } catch (const MediaEncoderException& e) {
147 JAMI_ERROR("{}", e.what());
148 send_.enabled = false;
149 }
150
151 if (voiceCallback_)
152 sender_->setVoiceCallback(voiceCallback_);
153
154 // NOTE do after sender/encoder are ready
155 auto codec = std::static_pointer_cast<SystemAudioCodecInfo>(send_.codec);
156 audioInput_->setFormat(codec->audioformat);
157 audioInput_->attach(sender_.get());
158
159 if (not rtcpCheckerThread_.isRunning())
160 rtcpCheckerThread_.start();
161}
162
163void
165{
166 std::lock_guard lock(mutex_);
167 // ensure that start has been called before restart
168 if (not socketPair_) {
169 return;
170 }
171
172 startSender();
173}
174
175void
176AudioRtpSession::startReceiver()
177{
178 if (socketPair_)
179 socketPair_->setReadBlockingMode(true);
180
182 JAMI_WARNING("Audio receiving disabled");
183 receiveThread_.reset();
184 return;
185 }
186
187 if (receiveThread_)
188 JAMI_WARNING("Restarting audio receiver");
189
190 auto accountAudioCodec = std::static_pointer_cast<SystemAudioCodecInfo>(receive_.codec);
191 receiveThread_.reset(new AudioReceiveThread(streamId_,
192 accountAudioCodec->audioformat,
194 mtu_));
195
196 receiveThread_->setRecorderCallback([w=weak_from_this()](const MediaStream& ms) {
197 asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
198 if (auto shared = w.lock())
199 shared->attachRemoteRecorder(ms);
200 });
201 });
202 receiveThread_->addIOContext(*socketPair_);
203 receiveThread_->setSuccessfulSetupCb(onSuccessfulSetup_);
204 receiveThread_->startReceiver();
205}
206
207void
208AudioRtpSession::start(std::unique_ptr<dhtnet::IceSocket> rtp_sock, std::unique_ptr<dhtnet::IceSocket> rtcp_sock)
209{
210 std::lock_guard lock(mutex_);
211
213 stop();
214 return;
215 }
216
217 try {
218 if (rtp_sock and rtcp_sock) {
219 if (send_.addr) {
220 rtp_sock->setDefaultRemoteAddress(send_.addr);
221 }
222
224 if (rtcpAddr) {
225 rtcp_sock->setDefaultRemoteAddress(rtcpAddr);
226 }
227
228 socketPair_.reset(new SocketPair(std::move(rtp_sock), std::move(rtcp_sock)));
229 } else {
230 socketPair_.reset(new SocketPair(getRemoteRtpUri().c_str(), receive_.addr.getPort()));
231 }
232
234 socketPair_->createSRTP(receive_.crypto.getCryptoSuite().c_str(),
236 send_.crypto.getCryptoSuite().c_str(),
237 send_.crypto.getSrtpKeyInfo().c_str());
238 }
239 } catch (const std::runtime_error& e) {
240 JAMI_ERROR("Socket creation failed: {}", e.what());
241 return;
242 }
243
244 startSender();
245 startReceiver();
246}
247
248void
250{
251 std::lock_guard lock(mutex_);
252
253 JAMI_DEBUG("[{}] Stopping receiver", fmt::ptr(this));
254
255 if (not receiveThread_)
256 return;
257
258 if (socketPair_)
259 socketPair_->setReadBlockingMode(false);
260
261 receiveThread_->stopReceiver();
262
263 if (audioInput_)
264 audioInput_->detach(sender_.get());
265
266 if (socketPair_)
267 socketPair_->interrupt();
268
269 rtcpCheckerThread_.join();
270
271 receiveThread_.reset();
272 sender_.reset();
273 socketPair_.reset();
274 audioInput_.reset();
275}
276
277void
279{
280 asio::post(*Manager::instance().ioContext(), [w=weak_from_this(), muted, dir]() {
281 if (auto shared = w.lock()) {
282 std::lock_guard lock(shared->mutex_);
283 if (dir == Direction::SEND) {
284 shared->muteState_ = muted;
285 if (shared->audioInput_) {
286 shared->audioInput_->setMuted(muted);
287 }
288 } else {
289 if (shared->receiveThread_) {
290 auto ms = shared->receiveThread_->getInfo();
291 ms.name = shared->streamId_ + ":remote";
292 if (muted) {
293 if (auto ob = shared->recorder_->getStream(ms.name)) {
294 shared->receiveThread_->detach(ob);
295 shared->recorder_->removeStream(ms);
296 }
297 } else {
298 if (auto ob = shared->recorder_->addStream(ms)) {
299 shared->receiveThread_->attach(ob);
300 }
301 }
302 }
303 }
304 }
305 });
306}
307
308void
309AudioRtpSession::setVoiceCallback(std::function<void(bool)> cb)
310{
311 std::lock_guard lock(mutex_);
312 voiceCallback_ = std::move(cb);
313 if (sender_) {
314 sender_->setVoiceCallback(voiceCallback_);
315 }
316}
317
318bool
319AudioRtpSession::check_RCTP_Info_RR(RTCPInfo& rtcpi)
320{
321 auto rtcpInfoVect = socketPair_->getRtcpRR();
322 unsigned totalLost = 0;
323 unsigned totalJitter = 0;
324 unsigned nbDropNotNull = 0;
325 auto vectSize = rtcpInfoVect.size();
326
327 if (vectSize != 0) {
328 for (const auto& it : rtcpInfoVect) {
329 if (it.fraction_lost != 0) // Exclude null drop
330 nbDropNotNull++;
331 totalLost += it.fraction_lost;
332 totalJitter += ntohl(it.jitter);
333 }
334 rtcpi.packetLoss = nbDropNotNull ? (float) (100 * totalLost) / (256.0 * nbDropNotNull) : 0;
335 // Jitter is expressed in timestamp unit -> convert to milliseconds
336 // https://stackoverflow.com/questions/51956520/convert-jitter-from-rtp-timestamp-unit-to-millisseconds
337 rtcpi.jitter = (totalJitter / vectSize / 90000.0f) * 1000;
338 rtcpi.nb_sample = vectSize;
339 rtcpi.latency = socketPair_->getLastLatency();
340 return true;
341 }
342 return false;
343}
344
345void
346AudioRtpSession::adaptQualityAndBitrate()
347{
348 RTCPInfo rtcpi {};
349 if (check_RCTP_Info_RR(rtcpi)) {
350 dropProcessing(&rtcpi);
351 }
352}
353
354void
355AudioRtpSession::dropProcessing(RTCPInfo* rtcpi)
356{
357 auto pondLoss = getPonderateLoss(rtcpi->packetLoss);
358 setNewPacketLoss(pondLoss);
359}
360
361void
362AudioRtpSession::setNewPacketLoss(unsigned int newPL)
363{
364 newPL = std::clamp((int) newPL, 0, 100);
365 if (newPL != packetLoss_) {
366 if (sender_) {
367 auto ret = sender_->setPacketLoss(newPL);
368 packetLoss_ = newPL;
369 if (ret == -1)
370 JAMI_ERROR("Fail to access the encoder");
371 } else {
372 JAMI_ERROR("Fail to access the sender");
373 }
374 }
375}
376
377float
378AudioRtpSession::getPonderateLoss(float lastLoss)
379{
380 static float pond = 10.0f;
381
382 pond = floor(0.5 * lastLoss + 0.5 * pond);
383 if (lastLoss > pond) {
384 return lastLoss;
385 } else {
386 return pond;
387 }
388}
389
390void
391AudioRtpSession::processRtcpChecker()
392{
393 adaptQualityAndBitrate();
394 socketPair_->waitForRTCP(std::chrono::seconds(rtcp_checking_interval));
395}
396
397void
398AudioRtpSession::attachRemoteRecorder(const MediaStream& ms)
399{
400 std::lock_guard lock(mutex_);
401 if (!recorder_ || !receiveThread_)
402 return;
403 MediaStream remoteMS = ms;
404 remoteMS.name = streamId_ + ":remote";
405 if (auto ob = recorder_->addStream(remoteMS)) {
406 receiveThread_->attach(ob);
407 }
408}
409
410void
411AudioRtpSession::attachLocalRecorder(const MediaStream& ms)
412{
413 std::lock_guard lock(mutex_);
414 if (!recorder_ || !audioInput_)
415 return;
416 MediaStream localMS = ms;
417 localMS.name = streamId_ + ":local";
418 if (auto ob = recorder_->addStream(localMS)) {
419 audioInput_->attach(ob);
420 }
421}
422
423void
424AudioRtpSession::initRecorder()
425{
426 if (!recorder_)
427 return;
428 if (receiveThread_)
429 receiveThread_->setRecorderCallback(
430 [w=weak_from_this()](const MediaStream& ms) {
431 asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
432 if (auto shared = w.lock())
433 shared->attachRemoteRecorder(ms);
434 });
435 });
436 if (audioInput_)
437 audioInput_->setRecorderCallback(
438 [w=weak_from_this()](const MediaStream& ms) {
439 asio::post(*Manager::instance().ioContext(), [w=std::move(w), ms]() {
440 if (auto shared = w.lock())
441 shared->attachLocalRecorder(ms);
442 });
443 });
444}
445
446void
447AudioRtpSession::deinitRecorder()
448{
449 if (!recorder_)
450 return;
451 if (receiveThread_) {
452 auto ms = receiveThread_->getInfo();
453 ms.name = streamId_ + ":remote";
454 if (auto ob = recorder_->getStream(ms.name)) {
455 receiveThread_->detach(ob);
456 recorder_->removeStream(ms);
457 }
458 }
459 if (audioInput_) {
460 auto ms = audioInput_->getInfo();
461 ms.name = streamId_ + ":local";
462 if (auto ob = recorder_->getStream(ms.name)) {
463 audioInput_->detach(ob);
464 recorder_->removeStream(ms);
465 }
466 }
467}
468
469} // namespace jami
void setMuted(bool muted, Direction dir=Direction::SEND) override
void start(std::unique_ptr< dhtnet::IceSocket > rtp_sock, std::unique_ptr< dhtnet::IceSocket > rtcp_sock) override
AudioRtpSession(const std::string &callId, const std::string &streamId, const std::shared_ptr< MediaRecorder > &rec)
std::string getSrtpKeyInfo() const
std::string getCryptoSuite() const
static LIBJAMI_TEST_EXPORT Manager & instance()
Definition manager.cpp:676
RingBufferPool & getRingBufferPool()
Return a pointer to the instance of the RingBufferPool.
Definition manager.cpp:3156
std::shared_ptr< RingBuffer > createRingBuffer(const std::string &id)
Create a new ringbuffer with a default readoffset.
void bindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Attaches a reader the specified source.
std::string input_
Definition rtp_session.h:81
std::string getRemoteRtpUri() const
Definition rtp_session.h:88
std::recursive_mutex mutex_
Definition rtp_session.h:76
MediaDescription receive_
Definition rtp_session.h:83
const std::string streamId_
Definition rtp_session.h:78
MediaDescription send_
Definition rtp_session.h:82
std::function< void(MediaType, bool)> onSuccessfulSetup_
Definition rtp_session.h:86
std::unique_ptr< SocketPair > socketPair_
Definition rtp_session.h:80
bool isRunning() const noexcept
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
void emitSignal(Args... args)
Definition ring_signal.h:64
static constexpr auto NEWPARAMS_TIMEOUT
std::shared_ptr< AudioInput > getAudioInput(const std::string &device)
@ MEDIA_AUDIO
Definition media_codec.h:47
static constexpr const char * SEPARATOR
Definition media_const.h:33
Simple macro to hide class' copy constructor and assignment operator.
dhtnet::IpAddr addr
Endpoint socket address.
dhtnet::IpAddr rtcp_addr
RTCP socket address.
std::shared_ptr< SystemCodecInfo > codec
RTP.
CryptoAttribute crypto
Crypto parameters.
std::string receiving_sdp
unsigned int jitter
unsigned int nb_sample