Ring Daemon
Loading...
Searching...
No Matches
video_receive_thread.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "libav_deps.h" // MUST BE INCLUDED FIRST
20#include "media/media_decoder.h"
21#include "socket_pair.h"
22#include "manager.h"
23#include "logger.h"
24
25extern "C" {
26#include <libavutil/display.h>
27}
28
29#include <unistd.h>
30
31namespace jami {
32namespace video {
33
34using std::string;
35
36VideoReceiveThread::VideoReceiveThread(const std::string& id, bool useSink, const std::string& sdp, uint16_t mtu)
38 , args_()
39 , id_(id)
40 , useSink_(useSink)
41 , stream_(sdp)
42 , sdpContext_(stream_.str().size(), false, &readFunction, 0, 0, this)
43 , sink_ {Manager::instance().createSinkClient(id)}
44 , mtu_(mtu)
45 , loop_(std::bind(&VideoReceiveThread::setup, this),
46 std::bind(&VideoReceiveThread::decodeFrame, this),
47 std::bind(&VideoReceiveThread::cleanup, this))
48{
49 JAMI_DBG("[%p] Instance created", this);
50}
51
53{
54 loop_.join();
55 JAMI_DBG("[%p] Instance destroyed", this);
56}
57
58void
60{
61 JAMI_DBG("[%p] Starting receiver’s loop", this);
62 loop_.start();
63}
64
65void
67{
68 if (loop_.isStopping())
69 return;
70 JAMI_DBG("[%p] Stopping receiver’s loop and waiting for the thread to exit…", this);
71 loop_.stop();
72 loop_.join();
73 JAMI_DBG("[%p] Receiver’s thread exited", this);
74}
75
76// We do this setup here instead of the constructor because we don't want the
77// main thread to block while this executes, so it happens in the video thread.
78bool
79VideoReceiveThread::setup()
80{
81 JAMI_DBG("[%p] Setting up video receiver", this);
82
83 videoDecoder_.reset(new MediaDecoder([this](const std::shared_ptr<MediaFrame>& frame) mutable {
85 {
86 std::lock_guard l(rotationMtx_);
87 if (displayMatrix_)
88 displayMatrix.reset(av_buffer_ref(displayMatrix_.get()));
89 }
90 if (displayMatrix)
92 publishFrame(std::static_pointer_cast<VideoFrame>(frame));
93 }));
94 videoDecoder_->setContextCallback([this]() {
95 if (recorderCallback_)
96 recorderCallback_(getInfo());
97 });
98 videoDecoder_->setResolutionChangedCallback([this](int width, int height) {
99 dstWidth_ = width;
100 dstHeight_ = height;
101 sink_->setFrameSize(dstWidth_, dstHeight_);
102 });
103
104 dstWidth_ = static_cast<int>(args_.width);
105 dstHeight_ = static_cast<int>(args_.height);
106
107 static const std::string SDP_FILENAME = "dummyFilename";
108 if (args_.input.empty()) {
109 args_.format = "sdp";
110 args_.input = SDP_FILENAME;
111 } else if (args_.input.substr(0, strlen("/dev/video")) == "/dev/video") {
112 // it's a v4l device if starting with /dev/video
113 // FIXME: This is not a robust way of checking if we mean to use a
114 // v4l2 device
115 args_.format = "video4linux2";
116 }
117
118 videoDecoder_->setInterruptCallback(interruptCb, this);
119
120 if (args_.input == SDP_FILENAME) {
121 // Force custom_io so the SDP demuxer will not open any UDP connections
122 // We need it to use ICE transport.
123 args_.sdp_flags = "custom_io";
124
125 if (stream_.str().empty()) {
126 JAMI_ERR("No SDP loaded");
127 return false;
128 }
129
130 videoDecoder_->setIOContext(&sdpContext_);
131 }
132
133 args_.disable_dts_probe_delay = true;
134
135 if (videoDecoder_->openInput(args_)) {
136 JAMI_ERR("Unable to open input \"%s\"", args_.input.c_str());
137 return false;
138 }
139
140 videoDecoder_->setKeyFrameRequestCb(keyFrameRequestCallback_);
141
142 if (args_.input == SDP_FILENAME) {
143 // Now replace our custom AVIOContext with one that will read packets
144 videoDecoder_->setIOContext(demuxContext_.get());
145 }
146 return true;
147}
148
149void
150VideoReceiveThread::cleanup()
151{
152 JAMI_DBG("[%p] Stopping receiver", this);
153
154 detach(sink_.get());
155 sink_->stop();
156
157 videoDecoder_.reset();
158}
159
160// This callback is used by libav internally to break out of blocking calls
161int
162VideoReceiveThread::interruptCb(void* data)
163{
164 auto* const context = static_cast<VideoReceiveThread*>(data);
165 return not context->loop_.isRunning();
166}
167
168int
169VideoReceiveThread::readFunction(void* opaque, uint8_t* buf, int buf_size)
170{
171 std::istream& is = static_cast<VideoReceiveThread*>(opaque)->stream_;
172 is.read(reinterpret_cast<char*>(buf), buf_size);
173
174 auto count = is.gcount();
175 if (count != 0)
176 return static_cast<int>(count);
177 else
178 return AVERROR_EOF;
179}
180
181void
183{
184 demuxContext_.reset(socketPair.createIOContext(mtu_));
185}
186
187void
188VideoReceiveThread::setRecorderCallback(const std::function<void(const MediaStream& ms)>& cb)
189{
190 recorderCallback_ = cb;
191 if (videoDecoder_)
192 videoDecoder_->setContextCallback([this]() {
193 if (recorderCallback_)
194 recorderCallback_(getInfo());
195 });
196}
197
198void
200{
201 if (not loop_.isRunning())
202 return;
203
204 if (not isVideoConfigured_) {
205 if (!configureVideoOutput()) {
206 JAMI_ERROR("[{:p}] Failed to configure video output", fmt::ptr(this));
207 return;
208 } else {
209 JAMI_LOG("[{:p}] Decoder configured, starting decoding", fmt::ptr(this));
210 }
211 }
212 auto status = videoDecoder_->decode();
213 if (status == MediaDemuxer::Status::EndOfFile) {
214 JAMI_LOG("[{:p}] End of file", fmt::ptr(this));
215 loop_.stop();
216 } else if (status == MediaDemuxer::Status::ReadError) {
217 JAMI_ERROR("[{:p}] Decoding error: %s", fmt::ptr(this), MediaDemuxer::getStatusStr(status));
218 } else if (status == MediaDemuxer::Status::FallBack) {
219 if (keyFrameRequestCallback_)
220 keyFrameRequestCallback_();
221 }
222}
223
224bool
225VideoReceiveThread::configureVideoOutput()
226{
227 assert(not isVideoConfigured_);
228
229 JAMI_DBG("[%p] Configuring video output", this);
230
231 if (not loop_.isRunning()) {
232 JAMI_WARN("[%p] Unable to configure video output, the loop is not running!", this);
233 return false;
234 }
235
236 if (videoDecoder_->setupVideo() < 0) {
237 JAMI_ERR("Decoder IO startup failed");
238 stopLoop();
239 return false;
240 }
241
242 // Default size from input video
243 if (dstWidth_ == 0 and dstHeight_ == 0) {
244 dstWidth_ = videoDecoder_->getWidth();
245 dstHeight_ = videoDecoder_->getHeight();
246 }
247
248 if (not sink_->start()) {
249 JAMI_ERR("RX: sink startup failed");
250 stopLoop();
251 return false;
252 }
253
254 if (useSink_)
255 startSink();
256
257 if (onSuccessfulSetup_)
258 onSuccessfulSetup_(MEDIA_VIDEO, 1);
259
260 return isVideoConfigured_ = true;
261}
262
263void
265{
266 JAMI_DBG("[%p] Stopping sink", this);
267
268 if (!loop_.isRunning())
269 return;
270
271 detach(sink_.get());
272 sink_->setFrameSize(0, 0);
273}
274
275void
277{
278 JAMI_DBG("[%p] Starting sink", this);
279
280 if (!loop_.isRunning())
281 return;
282
283 if (dstWidth_ > 0 and dstHeight_ > 0 and attach(sink_.get()))
284 sink_->setFrameSize(dstWidth_, dstHeight_);
285}
286
287int
289{
290 return dstWidth_;
291}
292
293int
295{
296 return dstHeight_;
297}
298
301{
302 if (videoDecoder_)
303 return videoDecoder_->getPixelFormat();
304 return {};
305}
306
309{
310 if (videoDecoder_)
311 return videoDecoder_->getStream("v:remote");
312 return {};
313}
314
315void
317{
319 av_display_rotation_set(reinterpret_cast<int32_t*>(displayMatrix->data), angle);
320 std::lock_guard l(rotationMtx_);
321 displayMatrix_ = std::move(displayMatrix);
322}
323
324} // namespace video
325} // namespace jami
Manager (controller) of daemon.
Definition manager.h:66
static const char * getStatusStr(Status status)
bool attach(Observer< std::shared_ptr< MediaFrame > > *o)
Definition observer.h:69
bool detach(Observer< std::shared_ptr< MediaFrame > > *o)
Definition observer.h:100
bool isStopping() const noexcept
Definition threadloop.h:52
bool isRunning() const noexcept
virtual void stop()
void setRotation(int angle)
Set angle of rotation to apply to the video by the decoder.
void setRecorderCallback(const std::function< void(const MediaStream &ms)> &cb)
void addIOContext(SocketPair &socketPair)
VideoReceiveThread(const std::string &id, bool useSink, const std::string &sdp, uint16_t mtu)
AVFrameSideData * av_frame_new_side_data_from_buf(AVFrame *frame, enum AVFrameSideDataType type, AVBufferRef *buf)
#define JAMI_ERR(...)
Definition logger.h:230
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_DBG(...)
Definition logger.h:228
#define JAMI_WARN(...)
Definition logger.h:229
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
std::unique_ptr< AVBufferRef, AVBufferRef_deleter > AVBufferPtr
Definition libav_utils.h:66
void emitSignal(Args... args)
Definition jami_signal.h:64
@ MEDIA_VIDEO
Definition media_codec.h:47
std::string format
std::string input
std::string sdp_flags