Ring Daemon 16.0.0
Loading...
Searching...
No Matches
video_receive_thread.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "libav_deps.h" // MUST BE INCLUDED FIRST
20#include "media/media_decoder.h"
21#include "socket_pair.h"
22#include "manager.h"
23#include "client/videomanager.h"
24#include "sinkclient.h"
25#include "logger.h"
26
27extern "C" {
28#include <libavutil/display.h>
29}
30
31#include <unistd.h>
32#include <map>
33
34namespace jami {
35namespace video {
36
37using std::string;
38
40 bool useSink,
41 const std::string& sdp,
44 , args_()
45 , id_(id)
46 , useSink_(useSink)
47 , stream_(sdp)
48 , sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
49 , sink_ {Manager::instance().createSinkClient(id)}
50 , mtu_(mtu)
51 , loop_(std::bind(&VideoReceiveThread::setup, this),
52 std::bind(&VideoReceiveThread::decodeFrame, this),
53 std::bind(&VideoReceiveThread::cleanup, this))
54{
55 JAMI_DBG("[%p] Instance created", this);
56}
57
59{
60 loop_.join();
61 JAMI_DBG("[%p] Instance destroyed", this);
62}
63
64void
66{
67 JAMI_DBG("[%p] Starting receiver’s loop", this);
68 loop_.start();
69}
70
71void
73{
74 if (loop_.isStopping())
75 return;
76 JAMI_DBG("[%p] Stopping receiver’s loop and waiting for the thread to exit…", this);
77 loop_.stop();
78 loop_.join();
79 JAMI_DBG("[%p] Receiver’s thread exited", this);
80}
81
82// We do this setup here instead of the constructor because we don't want the
83// main thread to block while this executes, so it happens in the video thread.
84bool
85VideoReceiveThread::setup()
86{
87 JAMI_DBG("[%p] Setting up video receiver", this);
88
89 videoDecoder_.reset(new MediaDecoder([this](const std::shared_ptr<MediaFrame>& frame) mutable {
91 {
92 std::lock_guard l(rotationMtx_);
93 if (displayMatrix_)
94 displayMatrix.reset(av_buffer_ref(displayMatrix_.get()));
95 }
96 if (displayMatrix)
99 displayMatrix.release());
100 publishFrame(std::static_pointer_cast<VideoFrame>(frame));
101 }));
102 videoDecoder_->setContextCallback([this]() {
103 if (recorderCallback_)
104 recorderCallback_(getInfo());
105 });
106 videoDecoder_->setResolutionChangedCallback([this](int width, int height) {
107 dstWidth_ = width;
108 dstHeight_ = height;
109 sink_->setFrameSize(dstWidth_, dstHeight_);
110 });
111
112 dstWidth_ = args_.width;
113 dstHeight_ = args_.height;
114
115 static const std::string SDP_FILENAME = "dummyFilename";
116 if (args_.input.empty()) {
117 args_.format = "sdp";
118 args_.input = SDP_FILENAME;
119 } else if (args_.input.substr(0, strlen("/dev/video")) == "/dev/video") {
120 // it's a v4l device if starting with /dev/video
121 // FIXME: This is not a robust way of checking if we mean to use a
122 // v4l2 device
123 args_.format = "video4linux2";
124 }
125
126 videoDecoder_->setInterruptCallback(interruptCb, this);
127
128 if (args_.input == SDP_FILENAME) {
129 // Force custom_io so the SDP demuxer will not open any UDP connections
130 // We need it to use ICE transport.
131 args_.sdp_flags = "custom_io";
132
133 if (stream_.str().empty()) {
134 JAMI_ERR("No SDP loaded");
135 return false;
136 }
137
138 videoDecoder_->setIOContext(&sdpContext_);
139 }
140
141 args_.disable_dts_probe_delay = true;
142
143 if (videoDecoder_->openInput(args_)) {
144 JAMI_ERR("Unable to open input \"%s\"", args_.input.c_str());
145 return false;
146 }
147
148 if (args_.input == SDP_FILENAME) {
149 // Now replace our custom AVIOContext with one that will read packets
150 videoDecoder_->setIOContext(demuxContext_.get());
151 }
152 return true;
153}
154
155void
156VideoReceiveThread::cleanup()
157{
158 JAMI_DBG("[%p] Stopping receiver", this);
159
160 detach(sink_.get());
161 sink_->stop();
162
163 videoDecoder_.reset();
164}
165
166// This callback is used by libav internally to break out of blocking calls
167int
168VideoReceiveThread::interruptCb(void* data)
169{
170 const auto context = static_cast<VideoReceiveThread*>(data);
171 return not context->loop_.isRunning();
172}
173
174int
175VideoReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
176{
177 std::istream& is = static_cast<VideoReceiveThread*>(opaque)->stream_;
178 is.read(reinterpret_cast<char*>(buf), buf_size);
179
180 auto count = is.gcount();
181 if (count != 0)
182 return count;
183 else
184 return AVERROR_EOF;
185}
186
187void
189{
190 demuxContext_.reset(socketPair.createIOContext(mtu_));
191}
192
193void
195 const std::function<void(const MediaStream& ms)>& cb)
196{
197 recorderCallback_ = cb;
198 if (videoDecoder_)
199 videoDecoder_->setContextCallback([this]() {
200 if (recorderCallback_)
201 recorderCallback_(getInfo());
202 });
203}
204
205void
207{
208 if (not loop_.isRunning())
209 return;
210
211 if (not isVideoConfigured_) {
212 if (!configureVideoOutput()) {
213 JAMI_ERROR("[{:p}] Failed to configure video output", fmt::ptr(this));
214 return;
215 } else {
216 JAMI_LOG("[{:p}] Decoder configured, starting decoding", fmt::ptr(this));
217 }
218 }
219 auto status = videoDecoder_->decode();
220 if (status == MediaDemuxer::Status::EndOfFile) {
221 JAMI_LOG("[{:p}] End of file", fmt::ptr(this));
222 loop_.stop();
223 }
224 else if (status == MediaDemuxer::Status::ReadError) {
225 JAMI_ERROR("[{:p}] Decoding error: %s", fmt::ptr(this), MediaDemuxer::getStatusStr(status));
226 }
227 else if (status == MediaDemuxer::Status::FallBack) {
228 if (keyFrameRequestCallback_)
229 keyFrameRequestCallback_();
230 }
231}
232
233bool
234VideoReceiveThread::configureVideoOutput()
235{
236 assert(not isVideoConfigured_);
237
238 JAMI_DBG("[%p] Configuring video output", this);
239
240 if (not loop_.isRunning()) {
241 JAMI_WARN("[%p] Unable to configure video output, the loop is not running!", this);
242 return false;
243 }
244
245 if (videoDecoder_->setupVideo() < 0) {
246 JAMI_ERR("Decoder IO startup failed");
247 stopLoop();
248 return false;
249 }
250
251 // Default size from input video
252 if (dstWidth_ == 0 and dstHeight_ == 0) {
253 dstWidth_ = videoDecoder_->getWidth();
254 dstHeight_ = videoDecoder_->getHeight();
255 }
256
257 if (not sink_->start()) {
258 JAMI_ERR("RX: sink startup failed");
259 stopLoop();
260 return false;
261 }
262
263 if (useSink_)
264 startSink();
265
266 if (onSuccessfulSetup_)
267 onSuccessfulSetup_(MEDIA_VIDEO, 1);
268
269 return isVideoConfigured_ = true;
270}
271
272void
274{
275 JAMI_DBG("[%p] Stopping sink", this);
276
277 if (!loop_.isRunning())
278 return;
279
280 detach(sink_.get());
281 sink_->setFrameSize(0, 0);
282}
283
284void
286{
287 JAMI_DBG("[%p] Starting sink", this);
288
289 if (!loop_.isRunning())
290 return;
291
292 if (dstWidth_ > 0 and dstHeight_ > 0 and attach(sink_.get()))
293 sink_->setFrameSize(dstWidth_, dstHeight_);
294}
295
296int
298{
299 return dstWidth_;
300}
301
302int
304{
305 return dstHeight_;
306}
307
310{
311 if (videoDecoder_)
312 return videoDecoder_->getPixelFormat();
313 return {};
314}
315
318{
319 if (videoDecoder_)
320 return videoDecoder_->getStream("v:remote");
321 return {};
322}
323
324void
326{
328 av_display_rotation_set(reinterpret_cast<int32_t*>(displayMatrix->data), angle);
329 std::lock_guard l(rotationMtx_);
330 displayMatrix_ = std::move(displayMatrix);
331}
332
333} // namespace video
334} // namespace jami
Manager (controller) of daemon.
Definition manager.h:67
static const char * getStatusStr(Status status)
bool attach(Observer< std::shared_ptr< MediaFrame > > *o)
Definition observer.h:70
bool detach(Observer< std::shared_ptr< MediaFrame > > *o)
Definition observer.h:101
bool isStopping() const noexcept
Definition threadloop.h:52
bool isRunning() const noexcept
virtual void stop()
void setRotation(int angle)
Set angle of rotation to apply to the video by the decoder.
void setRecorderCallback(const std::function< void(const MediaStream &ms)> &cb)
void addIOContext(SocketPair &socketPair)
VideoReceiveThread(const std::string &id, bool useSink, const std::string &sdp, uint16_t mtu)
AVFrameSideData * av_frame_new_side_data_from_buf(AVFrame *frame, enum AVFrameSideDataType type, AVBufferRef *buf)
#define JAMI_ERR(...)
Definition logger.h:218
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_DBG(...)
Definition logger.h:216
#define JAMI_WARN(...)
Definition logger.h:217
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
std::unique_ptr< AVBufferRef, AVBufferRef_deleter > AVBufferPtr
Definition libav_utils.h:62
void emitSignal(Args... args)
Definition ring_signal.h:64
@ MEDIA_VIDEO
Definition media_codec.h:48
std::string format
std::string input
std::string sdp_flags