Ring Daemon 16.0.0
Loading...
Searching...
No Matches
ringbufferpool.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "ringbufferpool.h"
19#include "ringbuffer.h"
20#include "ring_types.h" // for SIZEBUF
21#include "logger.h"
22
23#include <limits>
24#include <utility> // for std::pair
25#include <cstring>
26#include <algorithm>
27
28namespace jami {
29
30const char* const RingBufferPool::DEFAULT_ID = "audiolayer_id";
31
33 : defaultRingBuffer_(createRingBuffer(DEFAULT_ID))
34{}
35
37{
38 readBindingsMap_.clear();
39 defaultRingBuffer_.reset();
40
41 // Verify ringbuffer not removed yet
42 // XXXX: With a good design this should never happen! :-P
43 for (const auto& item : ringBufferMap_) {
44 const auto& weak = item.second;
45 if (not weak.expired())
46 JAMI_WARNING("Leaking RingBuffer '{}'", item.first);
47 }
48}
49
50void
52{
53 std::lock_guard lk(stateLock_);
54
55 if (sr != internalAudioFormat_.sample_rate) {
57 internalAudioFormat_.sample_rate = sr;
58 }
59}
60
61void
63{
64 std::lock_guard lk(stateLock_);
65
66 if (format != internalAudioFormat_) {
68 internalAudioFormat_ = format;
69 for (auto& wrb : ringBufferMap_)
70 if (auto rb = wrb.second.lock())
71 rb->setFormat(internalAudioFormat_);
72 }
73}
74
75std::shared_ptr<RingBuffer>
76RingBufferPool::getRingBuffer(const std::string& id)
77{
78 std::lock_guard lk(stateLock_);
79
80 const auto& it = ringBufferMap_.find(id);
81 if (it != ringBufferMap_.cend()) {
82 if (const auto& sptr = it->second.lock())
83 return sptr;
84 ringBufferMap_.erase(it);
85 }
86
87 return nullptr;
88}
89
90std::shared_ptr<RingBuffer>
91RingBufferPool::getRingBuffer(const std::string& id) const
92{
93 std::lock_guard lk(stateLock_);
94
95 const auto& it = ringBufferMap_.find(id);
96 if (it != ringBufferMap_.cend())
97 return it->second.lock();
98
99 return nullptr;
100}
101
102std::shared_ptr<RingBuffer>
104{
105 std::lock_guard lk(stateLock_);
106
107 auto rbuf = getRingBuffer(id);
108 if (rbuf) {
109 JAMI_DEBUG("Ringbuffer already exists for id '{}'", id);
110 return rbuf;
111 }
112
113 rbuf.reset(new RingBuffer(id, SIZEBUF, internalAudioFormat_));
114 ringBufferMap_.emplace(id, std::weak_ptr<RingBuffer>(rbuf));
115 return rbuf;
116}
117
118const RingBufferPool::ReadBindings*
119RingBufferPool::getReadBindings(const std::string& ringbufferId) const
120{
121 const auto& iter = readBindingsMap_.find(ringbufferId);
122 return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
123}
124
125RingBufferPool::ReadBindings*
126RingBufferPool::getReadBindings(const std::string& ringbufferId)
127{
128 const auto& iter = readBindingsMap_.find(ringbufferId);
129 return iter != readBindingsMap_.cend() ? &iter->second : nullptr;
130}
131
132void
133RingBufferPool::removeReadBindings(const std::string& ringbufferId)
134{
135 if (not readBindingsMap_.erase(ringbufferId))
136 JAMI_ERROR("Ringbuffer {} does not exist!", ringbufferId);
137}
138
139void
140RingBufferPool::addReaderToRingBuffer(const std::shared_ptr<RingBuffer> &sourceBuffer,
141 const std::string &readerBufferId) {
143 JAMI_WARNING("RingBuffer has a readoffset on itself");
144
145 sourceBuffer->createReadOffset(readerBufferId);
146 readBindingsMap_[readerBufferId].insert(sourceBuffer);
147}
148
149void
150RingBufferPool::removeReaderFromRingBuffer(const std::shared_ptr<RingBuffer> &sourceBuffer,
151 const std::string &readerBufferId) {
152 if (auto bindings = getReadBindings(readerBufferId)) {
153 bindings->erase(sourceBuffer);
154 if (bindings->empty())
155 removeReadBindings(readerBufferId);
156 }
157
158 sourceBuffer->removeReadOffset(readerBufferId);
159}
160
161void
163 const std::string &ringbufferId2) {
164 JAMI_LOG("Bind ringbuffer {} to ringbuffer {}", ringbufferId1, ringbufferId2);
165
166 const auto &rb1 = getRingBuffer(ringbufferId1);
167 if (not rb1) {
168 JAMI_ERROR("No ringbuffer associated with id '{}'", ringbufferId1);
169 return;
170 }
171
172 const auto &rb2 = getRingBuffer(ringbufferId2);
173 if (not rb2) {
174 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
175 return;
176 }
177
178 std::lock_guard lk(stateLock_);
179
180 addReaderToRingBuffer(rb1, ringbufferId2);
181 addReaderToRingBuffer(rb2, ringbufferId1);
182}
183
184void
186 const std::string &sourceBufferId) {
187 /* This method is used only for active ringbuffers, if this ringbuffer does not exist,
188 * do nothing */
189 if (const auto &rb = getRingBuffer(sourceBufferId)) {
190 std::lock_guard lk(stateLock_);
191
192 // p1 est le binding de p2 (p2 lit le stream de p1)
193 addReaderToRingBuffer(rb, readerBufferId);
194 }
195}
196
197void
199 const std::string &ringbufferId2) {
200 JAMI_LOG("Unbind ringbuffers {} and {}", ringbufferId1, ringbufferId2);
201
202 const auto &rb1 = getRingBuffer(ringbufferId1);
203 if (not rb1) {
204 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId1);
205 return;
206 }
207
208 const auto &rb2 = getRingBuffer(ringbufferId2);
209 if (not rb2) {
210 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId2);
211 return;
212 }
213
214 std::lock_guard lk(stateLock_);
215
216 removeReaderFromRingBuffer(rb1, ringbufferId2);
217 removeReaderFromRingBuffer(rb2, ringbufferId1);
218}
219
220void
222 const std::string &sourceBufferId) {
223 std::lock_guard lk(stateLock_);
224
225 if (const auto &rb = getRingBuffer(sourceBufferId))
226 removeReaderFromRingBuffer(rb, readerBufferId);
227}
228
229void
231 const auto &rb = getRingBuffer(ringbufferId);
232 if (not rb) {
233 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
234 return;
235 }
236 std::lock_guard lk(stateLock_);
237 auto bindings = getReadBindings(ringbufferId);
238 if (not bindings)
239 return;
240 const auto bindings_copy = *bindings; // temporary copy
241 for (const auto &rbuf: bindings_copy) {
242 removeReaderFromRingBuffer(rb, rbuf->getId());
243 }
244}
245
246void
248 std::lock_guard lk(stateLock_);
249 const std::shared_ptr<RingBuffer> &ringBuffer = getRingBuffer(sourceBufferId);
250 const std::vector<std::string> &subscribers = ringBuffer->getSubscribers();
251 for (const auto &subscriber: subscribers) {
252 removeReaderFromRingBuffer(ringBuffer, subscriber);
253 }
254}
255
256void
258 JAMI_LOG("Unbind ringbuffer {} from all bound ringbuffers", ringbufferId);
259
260 const auto &rb = getRingBuffer(ringbufferId);
261 if (not rb) {
262 JAMI_ERROR("No ringbuffer associated to id '{}'", ringbufferId);
263 return;
264 }
265
266 std::lock_guard lk(stateLock_);
267
268 auto bindings = getReadBindings(ringbufferId);
269 if (not bindings)
270 return;
271
272 const auto bindings_copy = *bindings; // temporary copy
273 for (const auto &rbuf: bindings_copy) {
274 removeReaderFromRingBuffer(rbuf, ringbufferId);
275 removeReaderFromRingBuffer(rb, rbuf->getId());
276 }
277}
278
279std::shared_ptr<AudioFrame>
281{
282 std::lock_guard lk(stateLock_);
283
284 const auto bindings = getReadBindings(ringbufferId);
285 if (not bindings)
286 return {};
287
288 // No mixing
289 if (bindings->size() == 1)
290 return (*bindings->cbegin())->get(ringbufferId);
291
292 auto mixBuffer = std::make_shared<AudioFrame>(internalAudioFormat_);
293 auto mixed = false;
294 for (const auto& rbuf : *bindings) {
295 if (auto b = rbuf->get(ringbufferId)) {
296 mixed = true;
297 mixBuffer->mix(*b);
298
299 // voice is true if any of mixed frames has voice
300 mixBuffer->has_voice |= b->has_voice;
301 }
302 }
303
304 return mixed ? mixBuffer : nullptr;
305}
306
307bool
309 const std::chrono::microseconds& max_wait) const
310{
311 std::unique_lock<std::recursive_mutex> lk(stateLock_);
312
313 // convert to absolute time
314 const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
315
316 auto bindings = getReadBindings(ringbufferId);
317 if (not bindings)
318 return 0;
319
320 const auto bindings_copy = *bindings; // temporary copy
321 for (const auto& rbuf : bindings_copy) {
322 lk.unlock();
323 if (rbuf->waitForDataAvailable(ringbufferId, deadline) == 0)
324 return false;
325 lk.lock();
326 }
327 return true;
328}
329
330std::shared_ptr<AudioFrame>
332{
333 std::lock_guard lk(stateLock_);
334
335 auto bindings = getReadBindings(ringbufferId);
336 if (not bindings)
337 return 0;
338
339 // No mixing
340 if (bindings->size() == 1) {
341 return (*bindings->cbegin())->get(ringbufferId);
342 }
343
344 size_t availableFrames = 0;
345
346 for (const auto& rbuf : *bindings)
347 availableFrames = std::min(availableFrames, rbuf->availableForGet(ringbufferId));
348
349 if (availableFrames == 0)
350 return {};
351
352 auto buf = std::make_shared<AudioFrame>(internalAudioFormat_);
353 for (const auto& rbuf : *bindings) {
354 if (auto b = rbuf->get(ringbufferId)) {
355 buf->mix(*b);
356
357 // voice is true if any of mixed frames has voice
358 buf->has_voice |= b->has_voice;
359 }
360 }
361
362 return buf;
363}
364
365size_t
367{
368 std::lock_guard lk(stateLock_);
369
370 const auto bindings = getReadBindings(ringbufferId);
371 if (not bindings)
372 return 0;
373
374 // No mixing
375 if (bindings->size() == 1) {
376 return (*bindings->begin())->availableForGet(ringbufferId);
377 }
378
379 size_t availableSamples = std::numeric_limits<size_t>::max();
380
381 for (const auto& rbuf : *bindings) {
382 const size_t nbSamples = rbuf->availableForGet(ringbufferId);
383 if (nbSamples != 0)
385 }
386
387 return availableSamples != std::numeric_limits<size_t>::max() ? availableSamples : 0;
388}
389
390size_t
392{
393 std::lock_guard lk(stateLock_);
394
395 const auto bindings = getReadBindings(ringbufferId);
396 if (not bindings)
397 return 0;
398
399 for (const auto& rbuf : *bindings)
400 rbuf->discard(toDiscard, ringbufferId);
401
402 return toDiscard;
403}
404
405void
407{
408 std::lock_guard lk(stateLock_);
409
410 const auto bindings = getReadBindings(ringbufferId);
411 if (not bindings)
412 return;
413
414 for (const auto& rbuf : *bindings)
415 rbuf->flush(ringbufferId);
416}
417
418void
420{
421 std::lock_guard lk(stateLock_);
422
423 for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end();) {
424 if (const auto rb = item->second.lock()) {
425 rb->flushAll();
426 ++item;
427 } else {
428 // Use this version of erase to avoid using invalidated iterator
429 item = ringBufferMap_.erase(item);
430 }
431 }
432}
433
434bool
436{
437 std::lock_guard lk(stateLock_);
438 if (!id.empty()) {
439 if (auto rb = getRingBuffer(id)) {
440 return rb->isAudioMeterActive();
441 }
442 } else {
443 for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
444 if (const auto rb = item->second.lock()) {
445 if (rb->isAudioMeterActive()) {
446 return true;
447 }
448 }
449 }
450 }
451 return false;
452}
453
454void
455RingBufferPool::setAudioMeterState(const std::string& id, bool state)
456{
457 std::lock_guard lk(stateLock_);
458 if (!id.empty()) {
459 if (auto rb = getRingBuffer(id)) {
460 rb->setAudioMeterState(state);
461 }
462 } else {
463 for (auto item = ringBufferMap_.begin(); item != ringBufferMap_.end(); ++item) {
464 if (const auto rb = item->second.lock()) {
465 rb->setAudioMeterState(state);
466 }
467 }
468 }
469}
470
471} // namespace jami
std::shared_ptr< RingBuffer > createRingBuffer(const std::string &id)
Create a new ringbuffer with a default readoffset.
std::shared_ptr< RingBuffer > getRingBuffer(const std::string &id)
Obtain a shared pointer on a RingBuffer given by its ID.
size_t availableForGet(const std::string &ringbufferId) const
void unBindAllHalfDuplexIn(const std::string &sourceBufferId)
Detaches a source from all its readers.
size_t discard(size_t toDiscard, const std::string &ringbufferId)
bool waitForDataAvailable(const std::string &ringbufferId, const std::chrono::microseconds &max_wait) const
void setAudioMeterState(const std::string &id, bool state)
void bindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Attaches a reader the specified source.
std::shared_ptr< AudioFrame > getAvailableData(const std::string &ringbufferId)
void unBindAllHalfDuplexOut(const std::string &ringbufferId)
Detaches a reader from all his sources.
void setInternalSamplingRate(unsigned sr)
bool isAudioMeterActive(const std::string &id)
void bindRingBuffers(const std::string &ringbufferId1, const std::string &ringbufferId2)
Bind two RingBuffer together (full duplex).
std::shared_ptr< AudioFrame > getData(const std::string &ringbufferId)
static const char *const DEFAULT_ID
void unbindRingBuffers(const std::string &ringbufferId1, const std::string &ringbufferId2)
Unbind two RingBuffer (full duplex).
void unBindAll(const std::string &ringbufferId)
void flush(const std::string &ringbufferId)
void unBindHalfDuplexOut(const std::string &readerBufferId, const std::string &sourceBufferId)
Detaches a reader from the specified source.
void setInternalAudioFormat(AudioFormat format)
A ring buffer for mutichannel audio samples.
Definition ringbuffer.h:38
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DEBUG(formatstr,...)
Definition logger.h:226
#define JAMI_WARNING(formatstr,...)
Definition logger.h:227
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
void emitSignal(Args... args)
Definition ring_signal.h:64
static constexpr size_t SIZEBUF
Definition ring_types.h:31
Structure to hold sample rate and channel number associated with audio data.