Ring Daemon
Loading...
Searching...
No Matches
ringbuffer.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "ringbuffer.h"
19#include "logger.h"
20#include "client/jami_signal.h"
21#include "media_buffer.h"
22#include "libav_deps.h"
23
24#include <chrono>
25#include <cstdlib>
26#include <cstring>
27#include <algorithm>
28
29namespace jami {
30
31static constexpr const int RMS_SIGNAL_INTERVAL = 5;
32
33RingBuffer::RingBuffer(const std::string& rbuf_id, AudioFormat format)
34 : id(rbuf_id)
35 , endPos_(0)
36 , format_(format)
37 , lock_()
38 , not_empty_()
39 , readoffsets_()
40 , resizer_(format_, static_cast<int>(format_.sample_rate) / 50, [this](std::shared_ptr<AudioFrame>&& frame) {
41 putToBuffer(std::move(frame));
42 })
43{
44 JAMI_LOG("Create new RingBuffer {}", id);
45}
46
48{
49 JAMI_LOG("Destroy RingBuffer {}", id);
50}
51
52void
54{
55 storeReadOffset(endPos_, ringbufferId);
56}
57
// NOTE(review): the declarator line (listing line 59) is missing from this
// extract; presumably this is the "flush every reader" member — confirm the
// name against the class declaration.
// Moves every registered read offset to the current write position, so no
// reader has pending data afterwards.
58void
60{
61 for (auto& offset : readoffsets_)
62 offset.second.offset = endPos_;
63}
64
65std::vector<std::string>
67{
68 std::vector<std::string> subscribers;
69 for (const auto& offset : readoffsets_) {
70 subscribers.push_back(offset.first);
71 }
72 return subscribers;
73}
74
75size_t
77{
78 const size_t buffer_size = buffer_.size();
79 if (buffer_size == 0)
80 return 0;
81 const size_t startPos = getSmallestReadOffset();
82 return (endPos_ + buffer_size - startPos) % buffer_size;
83}
84
85size_t
86RingBuffer::getLength(const std::string& ringbufferId) const
87{
88 const size_t buffer_size = buffer_.size();
89 if (buffer_size == 0)
90 return 0;
91 return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size;
92}
93
94void
96{
97 JAMI_DBG("Start=%zu; End=%zu; BufferSize=%zu", getSmallestReadOffset(), endPos_, buffer_.size());
98}
99
100size_t
101RingBuffer::getReadOffset(const std::string& ringbufferId) const
102{
103 auto iter = readoffsets_.find(ringbufferId);
104 return (iter != readoffsets_.end()) ? iter->second.offset : 0;
105}
106
107size_t
108RingBuffer::getSmallestReadOffset() const
109{
110 if (hasNoReadOffsets())
111 return 0;
112 size_t smallest = buffer_.size();
113 for (auto const& iter : readoffsets_)
114 smallest = std::min(smallest, iter.second.offset);
115 return smallest;
116}
117
118void
119RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId)
120{
121 ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId);
122
123 if (iter != readoffsets_.end())
124 iter->second.offset = offset;
125 else
126 JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId);
127}
128
129void
131{
132 std::lock_guard l(lock_);
133 if (!hasThisReadOffset(ringbufferId))
134 readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}});
135}
136
137void
139{
140 std::lock_guard l(lock_);
141 auto iter = readoffsets_.find(ringbufferId);
142 if (iter != readoffsets_.end())
143 readoffsets_.erase(iter);
144}
145
146bool
147RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const
148{
149 return readoffsets_.find(ringbufferId) != readoffsets_.end();
150}
151
152bool
153RingBuffer::hasNoReadOffsets() const
154{
155 return readoffsets_.empty();
156}
157
158//
159// For the writer only:
160//
161
162void
163RingBuffer::put(std::shared_ptr<AudioFrame>&& data)
164{
165 std::lock_guard l(writeLock_);
166 resizer_.enqueue(resampler_.resample(std::move(data), format_));
167}
168
169// This one puts some data inside the ring buffer.
// Store one frame at the write position, advance the write position with
// wrap-around, optionally accumulate its RMS level, hand the frame to every
// reader callback and wake all waiters. The whole body runs under lock_.
// Does nothing when the buffer has no slots.
170void
171RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data)
172{
173 std::lock_guard l(lock_);
174 const size_t buffer_size = buffer_.size();
175 if (buffer_size == 0)
176 return;
177
// NOTE(review): if putLength() returns a value taken modulo buffer_size, len
// can never be 0 and the discard(1) branch is dead — confirm.
178 size_t len = buffer_size - putLength();
179 if (len == 0)
180 discard(1);
181
182 size_t pos = endPos_;
183
// The slot takes ownership of the frame; newBuf refers to the stored pointer.
184 buffer_[pos] = std::move(data);
185 const auto& newBuf = buffer_[pos];
186 pos = (pos + 1) % buffer_size;
187
188 endPos_ = pos;
189
// RMS metering: sum calcRMS() over RMS_SIGNAL_INTERVAL frames, then reset.
190 if (rmsSignal_) {
191 ++rmsFrameCount_;
192 rmsLevel_ += newBuf->calcRMS();
193 if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) {
// NOTE(review): listing line 194 is missing from this extract; presumably the
// accumulated level is reported here before being reset — confirm upstream.
195 rmsLevel_ = 0;
196 rmsFrameCount_ = 0;
197 }
198 }
199
// Callbacks are invoked while lock_ is still held.
200 for (auto& offset : readoffsets_) {
201 if (offset.second.callback)
202 offset.second.callback(newBuf);
203 }
204
205 not_empty_.notify_all();
206}
207
208//
209// For the reader only:
210//
211
212size_t
214{
215 // Used space
216 return getLength(ringbufferId);
217}
218
219std::shared_ptr<AudioFrame>
220RingBuffer::get(const std::string& ringbufferId)
221{
222 std::lock_guard l(lock_);
223
224 auto offset = readoffsets_.find(ringbufferId);
225 if (offset == readoffsets_.end())
226 return {};
227
228 const size_t buffer_size = buffer_.size();
229 if (buffer_size == 0)
230 return {};
231
232 size_t startPos = offset->second.offset;
233 size_t len = (endPos_ + buffer_size - startPos) % buffer_size;
234 if (len == 0)
235 return {};
236
237 auto ret = buffer_[startPos];
238 offset->second.offset = (startPos + 1) % buffer_size;
239 return ret;
240}
241
242size_t
244{
245 std::unique_lock l(lock_);
246
247 if (buffer_.empty())
248 return 0;
249 if (readoffsets_.find(ringbufferId) == readoffsets_.end())
250 return 0;
251
252 size_t getl = 0;
253 auto check = [=, &getl] {
254 // Re-find read_ptr: it may be destroyed during the wait
255 const size_t buffer_size = buffer_.size();
256 const auto read_ptr = readoffsets_.find(ringbufferId);
257 if (buffer_size == 0 || read_ptr == readoffsets_.end())
258 return true;
259 getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size;
260 return getl != 0;
261 };
262
263 if (deadline == time_point::max()) {
264 // no timeout provided, wait as long as necessary
265 not_empty_.wait(l, check);
266 } else {
267 not_empty_.wait_until(l, deadline, check);
268 }
269
270 return getl;
271}
272
273size_t
274RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId)
275{
276 std::lock_guard l(lock_);
277
278 const size_t buffer_size = buffer_.size();
279 if (buffer_size == 0)
280 return 0;
281
282 auto offset = readoffsets_.find(ringbufferId);
283 if (offset == readoffsets_.end())
284 return 0;
285
286 size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size;
287 toDiscard = std::min(toDiscard, len);
288
289 offset->second.offset = (offset->second.offset + toDiscard) % buffer_size;
290 return toDiscard;
291}
292
293size_t
295{
296 const size_t buffer_size = buffer_.size();
297 if (buffer_size == 0)
298 return 0;
299
300 for (auto& r : readoffsets_) {
301 size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size;
302 if (dst < toDiscard)
303 r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size;
304 }
305 return toDiscard;
306}
307
308} // namespace jami
void enqueue(std::shared_ptr< AudioFrame > &&frame)
Write @frame's data to the queue.
int resample(const AVFrame *input, AVFrame *output)
Resample a frame.
void debug()
Debug function print mEnd, mStart, mBufferSize.
RingBuffer(const std::string &id, AudioFormat format=AudioFormat::MONO())
Constructor.
clock::time_point time_point
Definition ringbuffer.h:40
void removeReadOffset(const std::string &ringbufferId)
Remove a readoffset for this ringbuffer.
void flush(const std::string &ringbufferId)
Discard all pending data for this read offset by moving it to the current write position.
void createReadOffset(const std::string &ringbufferId)
Add a new readoffset for this ringbuffer.
size_t availableForGet(const std::string &ringbufferId) const
Get how many frames are available in the buffer for this reader to read.
std::shared_ptr< AudioFrame > get(const std::string &ringbufferId)
Get data in the ring buffer.
std::vector< std::string > getSubscribers()
Return the list of subscribers (Ring buffers Id that are reading this ring buffer).
size_t getLength(const std::string &ringbufferId) const
size_t discard(size_t toDiscard, const std::string &ringbufferId)
Discard data from the buffer.
void put(std::shared_ptr< AudioFrame > &&data)
Write data in the ring buffer.
size_t putLength() const
Total length of the ring buffer which is available for "putting".
size_t waitForDataAvailable(const std::string &ringbufferId, const time_point &deadline=time_point::max()) const
Blocks until data is available for the given reader, or until the deadline has passed.
~RingBuffer()
Destructor.
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DBG(...)
Definition logger.h:228
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
void emitSignal(Args... args)
Definition jami_signal.h:64
static constexpr const int RMS_SIGNAL_INTERVAL
Structure to hold sample rate and channel number associated with audio data.