Ring Daemon
Loading...
Searching...
No Matches
ringbufferpool.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "ringbufferpool.h"
19#include "ringbuffer.h"
20#include "logger.h"
21
22#include <cstring>
23
24namespace jami {
25
26const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
27
29 : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
30{}
31
33{
34 readBindingsMap_.clear();
35 defaultRingBuffer_.reset();
36
37 // Verify ringbuffer not removed yet
38 // XXXX: With a good design this should never happen! :-P
39 for (const auto& item : ringBufferMap_) {
40 const auto& weak = item.second;
41 if (not weak.expired())
42 JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
43 }
44}
45
46void
48{
49 std::lock_guard lk(stateLock_);
50
51 if (sr != internalAudioFormat_.sample_rate) {
52 flushAllBuffersLocked();
53 internalAudioFormat_.sample_rate = sr;
54 }
55}
56
57void
59{
60 std::lock_guard lk(stateLock_);
61
62 if (format != internalAudioFormat_) {
63 flushAllBuffersLocked();
64 internalAudioFormat_ = format;
65 for (auto& wrb : ringBufferMap_)
66 if (auto rb = wrb.second.lock())
67 rb->setFormat(internalAudioFormat_);
68 }
69}
70
71std::shared_ptr<RingBuffer>
72RingBufferPool::getRingBufferLocked(const std::string& id)
73{
74 const auto& it = ringBufferMap_.find(id);
75 if (it != ringBufferMap_.cend()) {
76 if (const auto& sptr = it->second.lock())
77 return sptr;
78 ringBufferMap_.erase(it);
79 }
80
81 return nullptr;
82}
83
84std::shared_ptr<RingBuffer>
85RingBufferPool::getRingBufferLocked(const std::string& id) const
86{
87 const auto& it = ringBufferMap_.find(id);
88 if (it != ringBufferMap_.cend())
89 return it->second.lock();
90
91 return nullptr;
92}
93
94std::shared_ptr<RingBuffer>
95RingBufferPool::getRingBuffer(const std::string& id)
96{
97 std::lock_guard lk(stateLock_);
98 return getRingBufferLocked(id);
99}
100
101std::shared_ptr<RingBuffer>
102RingBufferPool::getRingBuffer(const std::string& id) const
103{
104 std::lock_guard lk(stateLock_);
105 return getRingBufferLocked(id);
106}
107
108std::shared_ptr<RingBuffer>
110{
111 std::lock_guard lk(stateLock_);
112
113 auto rbuf = getRingBufferLocked(id);
114 if (rbuf) {
115 JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
116 return rbuf;
117 }
118
119 rbuf.reset(new RingBuffer(id, internalAudioFormat_));
120 ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
121 return rbuf;
122}
123
124const RingBufferPool::ReadBindings*
125RingBufferPool::getReadBindings(const std::string& ringbufferId) const
126{
127 const auto& iter = readBindingsMap_.find(ringbufferId);
128 return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
129}
130
131RingBufferPool::ReadBindings*
132RingBufferPool::getReadBindings(const std::string& ringbufferId)
133{
134 const auto& iter = readBindingsMap_.find(ringbufferId);
135 return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
136}
137
138void
139RingBufferPool::removeReadBindings(const std::string& ringbufferId)
140{
141 if (not readBindingsMap_.erase(ringbufferId))
142 JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
143}
144
145void
146RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer>& sourceBuffer, const std::string& readerBufferId)
147{
149 JAMI_WARNING("RingBuffer has a readoffset on itself");
150
151 sourceBuffer->createReadOffset(readerBufferId);
152 readBindingsMap_[readerBufferId].insert(sourceBuffer);
153}
154
155void
156RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer>& sourceBuffer,
157 const std::string& readerBufferId)
158{
159 if (auto* bindings = getReadBindings(readerBufferId)) {
160 bindings->erase(sourceBuffer);
161 if (bindings->empty())
162 removeReadBindings(readerBufferId);
163 }
164
165 sourceBuffer->removeReadOffset(readerBufferId);
166}
167
168void
169RingBufferPool::bindRingBuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
170{
171 JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
172
173 std::lock_guard lk(stateLock_);
174
175 const auto& rb1 = getRingBufferLocked(ringbufferId1);
176 if (not rb1) {
177 JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
178 return;
179 }
180
181 const auto& rb2 = getRingBufferLocked(ringbufferId2);
182 if (not rb2) {
183 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
184 return;
185 }
186
187 addReaderToRingBuffer(rb1, ringbufferId2);
188 addReaderToRingBuffer(rb2, ringbufferId1);
189}
190
191void
193{
194 /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
195 * do nothing */
196 std::lock_guard lk(stateLock_);
197
198 if (const auto& rb = getRingBufferLocked(sourceBufferId)) {
199 // p1 est le binding de p2 (p2 lit le stream de p1)
200 addReaderToRingBuffer(rb, readerBufferId);
201 }
202}
203
204void
205RingBufferPool::unbindRingBuffers(const std::string& ringbufferId1, const std::string& ringbufferId2)
206{
207 JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
208
209 std::lock_guard lk(stateLock_);
210
211 const auto& rb1 = getRingBufferLocked(ringbufferId1);
212 if (not rb1) {
213 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
214 return;
215 }
216
217 const auto& rb2 = getRingBufferLocked(ringbufferId2);
218 if (not rb2) {
219 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
220 return;
221 }
222
223 removeReaderFromRingBuffer(rb1, ringbufferId2);
224 removeReaderFromRingBuffer(rb2, ringbufferId1);
225}
226
227void
229{
230 std::lock_guard lk(stateLock_);
231
232 if (const auto& rb = getRingBufferLocked(sourceBufferId))
233 removeReaderFromRingBuffer(rb, readerBufferId);
234}
235
236void
238{
239 std::lock_guard lk(stateLock_);
240
241 const auto& rb = getRingBufferLocked(ringbufferId);
242 if (not rb) {
243 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
244 return;
245 }
246
247 auto* bindings = getReadBindings(ringbufferId);
248 if (not bindings)
249 return;
250 const auto bindings_copy = *bindings; // temporary copy
251 for (const auto& rbuf : bindings_copy) {
252 removeReaderFromRingBuffer(rb, rbuf->getId());
253 }
254}
255
256void
258{
259 std::lock_guard lk(stateLock_);
260
261 auto ringBuffer = getRingBufferLocked(sourceBufferId);
262 if (not ringBuffer) {
263 JAMI_ERROR("No ringbuffer associated to id '{}'", sourceBufferId);
264 return;
265 }
266
267 const std::vector<std::string>& subscribers = ringBuffer->getSubscribers();
268 for (const auto& subscriber : subscribers) {
269 removeReaderFromRingBuffer(ringBuffer, subscriber);
270 }
271}
272
273void
275{
276 JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
277
278 std::lock_guard lk(stateLock_);
279
280 const auto& rb = getRingBufferLocked(ringbufferId);
281 if (not rb) {
282 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
283 return;
284 }
285
286 auto* bindings = getReadBindings(ringbufferId);
287 if (not bindings)
288 return;
289
290 const auto bindings_copy = *bindings; // temporary copy
291 for (const auto& rbuf : bindings_copy) {
292 removeReaderFromRingBuffer(rbuf, ringbufferId);
293 removeReaderFromRingBuffer(rb, rbuf->getId());
294 }
295}
296
297std::shared_ptr<AudioFrame>
299{
300 std::lock_guard lk(stateLock_);
301
302 auto* const bindings = getReadBindings(ringbufferId);
303 if (not bindings)
304 return {};
305
306 // No mixing
307 if (bindings->size() == 1)
308 return (*bindings->cbegin())->get(ringbufferId);
309
310 auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
311 auto mixed = false;
312 for (const auto& rbuf : *bindings) {
313 if (auto b = rbuf->get(ringbufferId)) {
314 mixed = true;
315 mixBuffer->mix(*b);
316
317 // voice is true if any of mixed frames has voice
318 mixBuffer->has_voice |= b->has_voice;
319 }
320 }
321
322 return mixed ? mixBuffer : nullptr;
323}
324
325bool
327{
328 return waitForDataAvailable(ringbufferId, clock::now() + max_wait);
329}
330
331bool
333{
334 std::unique_lock lk(stateLock_);
335 const auto* bindings = getReadBindings(ringbufferId);
336 if (not bindings)
337 return false;
338 const auto bindings_copy = *bindings; // temporary copy
339
340 lk.unlock();
341 for (const auto& rbuf : bindings_copy) {
342 if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
343 return false;
344 }
345 return true;
346}
347
348std::shared_ptr<AudioFrame>
350{
351 std::lock_guard lk(stateLock_);
352
353 auto* bindings = getReadBindings(ringbufferId);
354 if (not bindings)
355 return {};
356
357 // No mixing
358 if (bindings->size() == 1) {
359 return (*bindings->cbegin())->get(ringbufferId);
360 }
361
362 size_t availableFrames = 0;
363
364 for (const auto& rbuf : *bindings)
365 availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
366
367 if (availableFrames == 0)
368 return {};
369
370 auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
371 for (const auto& rbuf : *bindings) {
372 if (auto b = rbuf->get(ringbufferId)) {
373 buf->mix(*b);
374
375 // voice is true if any of mixed frames has voice
376 buf->has_voice |= b->has_voice;
377 }
378 }
379
380 return buf;
381}
382
383size_t
385{
386 std::lock_guard lk(stateLock_);
387
388 const auto* const bindings = getReadBindings(ringbufferId);
389 if (not bindings)
390 return 0;
391
392 // No mixing
393 if (bindings->size() == 1) {
394 return (*bindings->begin())->availableForGet(ringbufferId);
395 }
396
397 size_t availableSamples = std::numeric_limits<size_t>::max();
398
399 for (const auto& rbuf : *bindings) {
400 const size_t nbSamples = rbuf->availableForGet(ringbufferId);
401 if (nbSamples != 0)
403 }
404
405 return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
406}
407
408size_t
410{
411 std::lock_guard lk(stateLock_);
412
413 auto* const bindings = getReadBindings(ringbufferId);
414 if (not bindings)
415 return 0;
416
417 for (const auto& rbuf : *bindings)
418 rbuf->discard(toDiscard, ringbufferId);
419
420 return toDiscard;
421}
422
423void
425{
426 std::lock_guard lk(stateLock_);
427
428 auto* const bindings = getReadBindings(ringbufferId);
429 if (not bindings)
430 return;
431
432 for (const auto& rbuf : *bindings)
433 rbuf->flush(ringbufferId);
434}
435
436void
437RingBufferPool::flushAllBuffersLocked()
438{
439 for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
440 if (const auto rb = item->second.lock()) {
441 rb->flushAll();
442 ++item;
443 } else {
444 // Use this version of erase to avoid using invalidated iterator
445 item = ringBufferMap_.erase(item);
446 }
447 }
448}
449
450void
452{
453 std::lock_guard lk(stateLock_);
454 flushAllBuffersLocked();
455}
456
457bool
459{
460 std::lock_guard lk(stateLock_);
461 if (!id.empty()) {
462 if (auto rb = getRingBufferLocked(id)) {
463 return rb->isAudioMeterActive();
464 }
465 } else {
466 for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
467 if (const auto rb = item->second.lock()) {
468 if (rb->isAudioMeterActive()) {
469 return true;
470 }
471 }
472 }
473 }
474 return false;
475}
476
477void
478RingBufferPool::setAudioMeterState(const std::string& id, bool state)
479{
480 std::lock_guard lk(stateLock_);
481 if (!id.empty()) {
482 if (auto rb = getRingBufferLocked(id)) {
483 rb->setAudioMeterState(state);
484 }
485 } else {
486 for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
487 if (const auto rb = item->second.lock()) {
488 rb->setAudioMeterState(state);
489 }
490 }
491 }
492}
493
494} // namespace jami
clock::time_point time_point
std::shared_ptr< RingBuffer > createRingBuffer(const std::string &id)
Create a new ringbuffer with a default readoffset.
std::shared_ptr< RingBuffer > getRingBuffer(const std::string &id)
Obtain a shared pointer on a RingBuffer given by its ID.
size_t availableForGet(const std::string &ringbufferId) const
bool waitForDataAvailable(const std::string &ringbufferId, const duration &max_wait) const
void unBindAllHalfDuplexIn(const std::string &sourceBufferId)
Detaches a source from all its readers.
size_t discard(size_t toDiscard, const std::string &ringbufferId)
void setAudioMeterState(const std::string &id, bool state)
void bindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Attaches a reader to the specified source.
std::shared_ptr< AudioFrame > getAvailableData(const std::string &ringbufferId)
clock::duration duration
void unBindAllHalfDuplexOut(const std::string &ringbufferId)
Detaches a reader from all its sources.
void setInternalSamplingRate(unsigned sr)
bool isAudioMeterActive(const std::string &id)
void bindRingBuffers(const std::string &ringbufferId1, const std::string &ringbufferId2)
Bind two RingBuffer together (full duplex).
std::shared_ptr< AudioFrame > getData(const std::string &ringbufferId)
static const char *const DEFAULT_ID
void unbindRingBuffers(const std::string &ringbufferId1, const std::string &ringbufferId2)
Unbind two RingBuffer (full duplex).
void unBindAll(const std::string &ringbufferId)
void flush(const std::string &ringbufferId)
void unBindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Detaches a reader from the specified source.
void setInternalAudioFormat(AudioFormat format)
A ring buffer for multichannel audio samples.
Definition ringbuffer.h:37
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:238
#define JAMI_WARNING(formatstr,...)
Definition logger.h:242
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
void emitSignal(Args... args)
Definition jami_signal.h:64
Structure to hold sample rate and channel number associated with audio data.