Ring Daemon 16.0.0
Loading...
Searching...
No Matches
ringbuffer.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "ringbuffer.h"
19#include "logger.h"
20#include "client/ring_signal.h"
21#include "media_buffer.h"
22#include "libav_deps.h"
23
24#include <chrono>
25#include <cstdlib>
26#include <cstring>
27#include <algorithm>
28
29namespace jami {
30
// corresponds to 160 ms (about 5 rtp packets)
// NOTE(review): no line visible in this listing references this constant.
static constexpr size_t MIN_BUFFER_SIZE = 1024;

// Number of frames over which putToBuffer() accumulates the RMS level
// before it resets its accumulator and frame counter.
static constexpr int RMS_SIGNAL_INTERVAL = 5;
35
36RingBuffer::RingBuffer(const std::string& rbuf_id, size_t /*size*/, AudioFormat format)
37 : id(rbuf_id)
38 , endPos_(0)
39 , format_(format)
40 , lock_()
41 , not_empty_()
42 , readoffsets_()
43 , resizer_(format_, format_.sample_rate / 50, [this](std::shared_ptr<AudioFrame>&& frame) {
44 putToBuffer(std::move(frame));
45 })
46{
47 JAMI_LOG("Create new RingBuffer {}", id);
48}
49
51{
52 JAMI_LOG("Destroy RingBuffer {}", id);
53}
54
55void
57{
58 storeReadOffset(endPos_, ringbufferId);
59}
60
// NOTE(review): listing line 62, which carried this function's qualified name
// and parameter list, is missing from this extract — presumably a RingBuffer
// member taking no arguments; confirm against the original file.
// Moves every registered read offset to the current write position (endPos_).
61void
63{
64 for (auto& offset : readoffsets_)
65 offset.second.offset = endPos_;
66}
67
68std::vector<std::string> RingBuffer::getSubscribers() {
69 std::vector<std::string> subscribers;
70 for (const auto &offset: readoffsets_) {
71 subscribers.push_back(offset.first);
72 }
73 return subscribers;
74}
75
76size_t
78{
79 const size_t buffer_size = buffer_.size();
80 if (buffer_size == 0)
81 return 0;
82 const size_t startPos = getSmallestReadOffset();
83 return (endPos_ + buffer_size - startPos) % buffer_size;
84}
85
86size_t
87RingBuffer::getLength(const std::string& ringbufferId) const
88{
89 const size_t buffer_size = buffer_.size();
90 if (buffer_size == 0)
91 return 0;
92 return (endPos_ + buffer_size - getReadOffset(ringbufferId)) % buffer_size;
93}
94
95void
97{
98 JAMI_DBG("Start=%zu; End=%zu; BufferSize=%zu", getSmallestReadOffset(), endPos_, buffer_.size());
99}
100
101size_t
102RingBuffer::getReadOffset(const std::string& ringbufferId) const
103{
104 auto iter = readoffsets_.find(ringbufferId);
105 return (iter != readoffsets_.end()) ? iter->second.offset : 0;
106}
107
108size_t
109RingBuffer::getSmallestReadOffset() const
110{
111 if (hasNoReadOffsets())
112 return 0;
113 size_t smallest = buffer_.size();
114 for (auto const& iter : readoffsets_)
115 smallest = std::min(smallest, iter.second.offset);
116 return smallest;
117}
118
119void
120RingBuffer::storeReadOffset(size_t offset, const std::string& ringbufferId)
121{
122 ReadOffsetMap::iterator iter = readoffsets_.find(ringbufferId);
123
124 if (iter != readoffsets_.end())
125 iter->second.offset = offset;
126 else
127 JAMI_ERROR("RingBuffer::storeReadOffset() failed: unknown ringbuffer '{}'", ringbufferId);
128}
129
130void
132{
133 std::lock_guard l(lock_);
134 if (!hasThisReadOffset(ringbufferId))
135 readoffsets_.emplace(ringbufferId, ReadOffset {endPos_, {}});
136}
137
138void
140{
141 std::lock_guard l(lock_);
142 auto iter = readoffsets_.find(ringbufferId);
143 if (iter != readoffsets_.end())
144 readoffsets_.erase(iter);
145}
146
147bool
148RingBuffer::hasThisReadOffset(const std::string& ringbufferId) const
149{
150 return readoffsets_.find(ringbufferId) != readoffsets_.end();
151}
152
153bool
154RingBuffer::hasNoReadOffsets() const
155{
156 return readoffsets_.empty();
157}
158
159//
160// For the writer only:
161//
162
163void
164RingBuffer::put(std::shared_ptr<AudioFrame>&& data)
165{
166 std::lock_guard l(writeLock_);
167 resizer_.enqueue(resampler_.resample(std::move(data), format_));
168}
169
// Stores one frame at the write position and advances that position by one
// slot (modulo the buffer size), all while holding lock_. Does nothing when
// the buffer has no slots.
170// This one puts some data inside the ring buffer.
171void
172RingBuffer::putToBuffer(std::shared_ptr<AudioFrame>&& data)
173{
174 std::lock_guard l(lock_);
175 const size_t buffer_size = buffer_.size();
176 if (buffer_size == 0)
177 return;
178
// putLength() is measured from the smallest read offset; when it equals the
// buffer size, the one-argument discard() is asked to drop one slot first.
179 size_t len = buffer_size - putLength();
180 if (len == 0)
181 discard(1);
182
183 size_t pos = endPos_;
184
185 buffer_[pos] = std::move(data);
// `data` has been moved from; newBuf refers to the slot that now owns the frame.
186 const auto& newBuf = buffer_[pos];
187 pos = (pos + 1) % buffer_size;
188
189 endPos_ = pos;
190
// When rmsSignal_ is set, accumulate the frame's RMS and reset the
// accumulator every RMS_SIGNAL_INTERVAL frames.
191 if (rmsSignal_) {
192 ++rmsFrameCount_;
193 rmsLevel_ += newBuf->calcRMS();
194 if (rmsFrameCount_ == RMS_SIGNAL_INTERVAL) {
// NOTE(review): listing line 195 is missing from this extract — presumably the
// statement that reports the accumulated level before it is reset; confirm
// against the original file.
196 rmsLevel_ = 0;
197 rmsFrameCount_ = 0;
198 }
199 }
200
// Hand the new frame to every reader that has a callback set.
201 for (auto& offset : readoffsets_) {
202 if (offset.second.callback)
203 offset.second.callback(newBuf);
204 }
205
// Wake every thread waiting on not_empty_.
206 not_empty_.notify_all();
207}
208
209//
210// For the reader only:
211//
212
213size_t
215{
216 // Used space
217 return getLength(ringbufferId);
218}
219
220std::shared_ptr<AudioFrame>
221RingBuffer::get(const std::string& ringbufferId)
222{
223 std::lock_guard l(lock_);
224
225 auto offset = readoffsets_.find(ringbufferId);
226 if (offset == readoffsets_.end())
227 return {};
228
229 const size_t buffer_size = buffer_.size();
230 if (buffer_size == 0)
231 return {};
232
233 size_t startPos = offset->second.offset;
234 size_t len = (endPos_ + buffer_size - startPos) % buffer_size;
235 if (len == 0)
236 return {};
237
238 auto ret = buffer_[startPos];
239 offset->second.offset = (startPos + 1) % buffer_size;
240 return ret;
241}
242
243size_t
245{
246 std::unique_lock l(lock_);
247
248 if (buffer_.empty())
249 return 0;
250 if (readoffsets_.find(ringbufferId) == readoffsets_.end())
251 return 0;
252
253 size_t getl = 0;
254 auto check = [=, &getl] {
255 // Re-find read_ptr: it may be destroyed during the wait
256 const size_t buffer_size = buffer_.size();
257 const auto read_ptr = readoffsets_.find(ringbufferId);
258 if (buffer_size == 0 || read_ptr == readoffsets_.end())
259 return true;
260 getl = (endPos_ + buffer_size - read_ptr->second.offset) % buffer_size;
261 return getl != 0;
262 };
263
264 if (deadline == time_point::max()) {
265 // no timeout provided, wait as long as necessary
266 not_empty_.wait(l, check);
267 } else {
268 not_empty_.wait_until(l, deadline, check);
269 }
270
271 return getl;
272}
273
274size_t
275RingBuffer::discard(size_t toDiscard, const std::string& ringbufferId)
276{
277 std::lock_guard l(lock_);
278
279 const size_t buffer_size = buffer_.size();
280 if (buffer_size == 0)
281 return 0;
282
283 auto offset = readoffsets_.find(ringbufferId);
284 if (offset == readoffsets_.end())
285 return 0;
286
287 size_t len = (endPos_ + buffer_size - offset->second.offset) % buffer_size;
288 toDiscard = std::min(toDiscard, len);
289
290 offset->second.offset = (offset->second.offset + toDiscard) % buffer_size;
291 return toDiscard;
292}
293
294size_t
296{
297 const size_t buffer_size = buffer_.size();
298 if (buffer_size == 0)
299 return 0;
300
301 for (auto& r : readoffsets_) {
302 size_t dst = (r.second.offset + buffer_size - endPos_) % buffer_size;
303 if (dst < toDiscard)
304 r.second.offset = (r.second.offset + toDiscard - dst) % buffer_size;
305 }
306 return toDiscard;
307}
308
309} // namespace jami
void enqueue(std::shared_ptr< AudioFrame > &&frame)
Write @frame's data to the queue.
int resample(const AVFrame *input, AVFrame *output)
Resample a frame.
void debug()
Debug function: logs the smallest read offset (Start), the write position (End) and the buffer size.
clock::time_point time_point
Definition ringbuffer.h:41
void removeReadOffset(const std::string &ringbufferId)
Remove a readoffset for this ringbuffer.
void flush(const std::string &ringbufferId)
Reset the counters to 0 for this read offset.
void createReadOffset(const std::string &ringbufferId)
Add a new readoffset for this ringbuffer.
size_t availableForGet(const std::string &ringbufferId) const
Returns how many frames are available in the buffer for the given reader to read.
std::shared_ptr< AudioFrame > get(const std::string &ringbufferId)
Get data in the ring buffer.
std::vector< std::string > getSubscribers()
Return the list of subscribers (Ring buffers Id that are reading this ring buffer).
size_t getLength(const std::string &ringbufferId) const
size_t discard(size_t toDiscard, const std::string &ringbufferId)
Discard data from the buffer.
void put(std::shared_ptr< AudioFrame > &&data)
Write data in the ring buffer.
size_t putLength() const
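Length of the ring buffer currently in use, measured from the smallest read offset to the write position; the writer subtracts it from the buffer size to get the space available for "putting".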
size_t waitForDataAvailable(const std::string &ringbufferId, const time_point &deadline=time_point::max()) const
Blocks until at least one frame is available for the given reader, the reader is removed, or the deadline has passed.
~RingBuffer()
Destructor.
RingBuffer(const std::string &id, size_t size, AudioFormat format=AudioFormat::MONO())
Constructor.
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DBG(...)
Definition logger.h:216
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
void emitSignal(Args... args)
Definition ring_signal.h:64
static constexpr const int RMS_SIGNAL_INTERVAL
static const size_t MIN_BUFFER_SIZE
Structure to hold sample rate and channel number associated with audio data.