19#include <dhtnet/ip_utils.h>
27#include <dhtnet/ice_socket.h>
42#define SOCK_NONBLOCK FIONBIO
44#define close(x) closesocket(x)
48#include <asm-generic/fcntl.h>
49#define SOCK_NONBLOCK O_NONBLOCK
// Swap the two bytes of a 16-bit value.
// The operand is first converted to uint16_t so that only its low 16 bits
// take part in the shifts: shifting a negative or large signed argument left
// is undefined behaviour before C++20. For uint16_t arguments the value and
// the (promoted) result type are unchanged.
// NOTE: `val` is expanded twice; do not pass an expression with side effects.
#define Swap2Bytes(val) \
    (((static_cast<uint16_t>(val) >> 8) & 0x00FF) | ((static_cast<uint16_t>(val) << 8) & 0xFF00))
// Swap the four bytes of a 32-bit value (byte order reversal).
// The operand is first converted to uint32_t: a narrower argument
// (uint8_t/uint16_t) would otherwise be promoted to signed int, and
// `<< 24` on it can overflow, which is undefined behaviour before C++20.
// For uint32_t arguments the value and result type are unchanged.
// NOTE: `val` is expanded four times; do not pass an expression with side effects.
#define Swap4Bytes(val) \
    (((static_cast<uint32_t>(val) >> 24) & 0x000000FF) \
     | ((static_cast<uint32_t>(val) >> 8) & 0x0000FF00) \
     | ((static_cast<uint32_t>(val) << 8) & 0x00FF0000) \
     | ((static_cast<uint32_t>(val) << 24) & 0xFF000000))
// Swap the eight bytes of a 64-bit value (byte order reversal).
// The operand is first converted to uint64_t: with a 32-bit (or narrower)
// argument the 40- and 56-bit shifts would be at least the operand's width,
// which is undefined behaviour. For uint64_t arguments the value is unchanged.
// NOTE: `val` is expanded eight times; do not pass an expression with side effects.
#define Swap8Bytes(val) \
    (((static_cast<uint64_t>(val) >> 56) & 0x00000000000000FF) \
     | ((static_cast<uint64_t>(val) >> 40) & 0x000000000000FF00) \
     | ((static_cast<uint64_t>(val) >> 24) & 0x0000000000FF0000) \
     | ((static_cast<uint64_t>(val) >> 8) & 0x00000000FF000000) \
     | ((static_cast<uint64_t>(val) << 8) & 0x000000FF00000000) \
     | ((static_cast<uint64_t>(val) << 24) & 0x0000FF0000000000) \
     | ((static_cast<uint64_t>(val) << 40) & 0x00FF000000000000) \
     | ((static_cast<uint64_t>(val) << 56) & 0xFF00000000000000))
93 throw std::runtime_error(
"Unable to set crypto on output");
100 throw std::runtime_error(
"Unable to set crypto on input");
183 JAMI_DBG(
"[%p] Creating instance using ICE sockets for comp %d and %d",
185 rtp_sock_->getCompId(),
186 rtcp_sock_->getCompId());
188 rtp_sock_->setOnRecv([
this](
uint8_t*
buf,
size_t len) {
189 std::lock_guard
l(dataBuffMutex_);
190 rtpDataBuff_.emplace_back(
buf,
buf + len);
194 rtcp_sock_->setOnRecv([
this](
uint8_t*
buf,
size_t len) {
195 std::lock_guard
l(dataBuffMutex_);
196 rtcpDataBuff_.emplace_back(
buf,
buf + len);
206 JAMI_DBG(
"[%p] Instance destroyed",
this);
212 std::unique_lock lock(rtcpInfo_mutex_);
213 return cvRtcpPacketReadyToRead_.wait_for(lock,
interval, [
this] {
214 return interrupted_
or not listRtcpRRHeader_.empty()
or not listRtcpREMBHeader_.empty();
219SocketPair::saveRtcpRRPacket(
uint8_t*
buf,
size_t len)
228 std::lock_guard lock(rtcpInfo_mutex_);
230 if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
231 listRtcpRRHeader_.pop_front();
234 listRtcpRRHeader_.emplace_back(*
header);
236 cvRtcpPacketReadyToRead_.notify_one();
240SocketPair::saveRtcpREMBPacket(
uint8_t*
buf,
size_t len)
242 if (len <
sizeof(rtcpREMBHeader))
245 auto*
header =
reinterpret_cast<rtcpREMBHeader*
>(
buf);
249 if (
header->uid != 0x424D4552)
252 std::lock_guard lock(rtcpInfo_mutex_);
254 if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
255 listRtcpREMBHeader_.pop_front();
258 listRtcpREMBHeader_.push_back(*
header);
260 cvRtcpPacketReadyToRead_.notify_one();
263std::list<rtcpRRHeader>
266 std::lock_guard lock(rtcpInfo_mutex_);
267 return std::move(listRtcpRRHeader_);
270std::list<rtcpREMBHeader>
273 std::lock_guard lock(rtcpInfo_mutex_);
274 return std::move(listRtcpREMBHeader_);
286 JAMI_WARN(
"[%p] Interrupting RTP sockets",
this);
289 rtp_sock_->setOnRecv(
nullptr);
291 rtcp_sock_->setOnRecv(
nullptr);
293 cvRtcpPacketReadyToRead_.notify_all();
299 JAMI_DBG(
"[%p] Read operations in blocking mode [%s]",
this,
block ?
"YES" :
"NO");
300 readBlockingMode_ =
block;
302 cvRtcpPacketReadyToRead_.notify_all();
314 if (rtcpHandle_ > 0
and close(rtcpHandle_))
316 if (rtpHandle_ > 0
and close(rtpHandle_))
334 rtpDestAddr_ = dhtnet::IpAddr {hostname};
336 rtcpDestAddr_ = dhtnet::IpAddr {hostname};
343 JAMI_ERR(
"[%p] Sockets creation failed",
this);
344 throw std::runtime_error(
"Sockets creation failed");
347 JAMI_WARN(
"SocketPair: local{%d,%d} / %s{%d,%d}",
361 else if (rtpDestAddr_.getFamily() ==
AF_INET6)
371 reinterpret_cast<void*
>(
this));
375SocketPair::waitForData()
378 if (rtpHandle_ >= 0) {
386 if (
not readBlockingMode_) {
407 std::unique_lock
lk(dataBuffMutex_);
408 cv_.wait(
lk, [
this] {
409 return interrupted_
or not rtpDataBuff_.empty()
or not rtcpDataBuff_.empty()
or not readBlockingMode_;
425 if (rtpHandle_ >= 0) {
428 return static_cast<int>(
recvfrom(rtpHandle_,
429 static_cast<char*
>(
buf),
437 std::unique_lock
lk(dataBuffMutex_);
438 if (
not rtpDataBuff_.empty()) {
439 auto pkt = std::move(rtpDataBuff_.front());
440 rtpDataBuff_.pop_front();
444 std::copy_n(
pkt.begin(), len,
static_cast<char*
>(
buf));
455 if (rtcpHandle_ >= 0) {
458 return static_cast<int>(
recvfrom(rtcpHandle_,
459 static_cast<char*
>(
buf),
467 std::unique_lock
lk(dataBuffMutex_);
468 if (
not rtcpDataBuff_.empty()) {
469 auto pkt = std::move(rtcpDataBuff_.front());
470 rtcpDataBuff_.pop_front();
474 std::copy_n(
pkt.begin(), len,
static_cast<char*
>(
buf));
494 auto*
header =
reinterpret_cast<rtcpRRHeader*
>(
buf);
499 lastRR_time = std::chrono::steady_clock::now();
500 saveRtcpRRPacket(
buf, len);
503 else if (
header->pt == 206)
504 saveRtcpREMBPacket(
buf, len);
506 else if (
header->pt == 200) {
509 JAMI_DBG(
"Unable to read RTCP: unknown packet type %u",
header->pt);
546 if (packetLossCallback_
and (
buf[2] << 8 |
buf[3]) != lastSeqNumIn_ + 1)
547 packetLossCallback_();
548 lastSeqNumIn_ =
buf[2] << 8 |
buf[3];
565 if (rtpHandle_ >= 0) {
583 return static_cast<int>(
592 return static_cast<int>(rtcp_sock_->send(
buf,
buf_size));
594 return static_cast<int>(rtp_sock_->send(
buf,
buf_size));
613 srtpContext_->encryptbuf,
614 sizeof(srtpContext_->encryptbuf));
620 buf = srtpContext_->encryptbuf;
625 if (
isRTCP &&
static_cast<unsigned>(
buf_size) >=
sizeof(rtcpRRHeader)) {
626 auto*
header =
reinterpret_cast<rtcpRRHeader*
>(
buf);
638 auto*
header =
reinterpret_cast<rtcpSRHeader*
>(
buf);
644 if (lastSRTS_ != 0 && lastDLSR_ != 0) {
645 if (histoLatency_.size() >= MAX_LIST_SIZE)
646 histoLatency_.pop_front();
657 }
else if (
buf[1] == 201)
669 if (
not histoLatency_.empty())
670 return histoLatency_.back();
678 rtpDelayCallback_ = std::move(
cb);
690 if (lastSendTS_ == 0.0f) {
692 lastReceiveTS_ = std::chrono::steady_clock::now();
701 std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
703 std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_).count());
704 lastReceiveTS_ = arrival_TS;
734 return srtpContext_->srtp_out.seq_largest;
735 JAMI_ERR(
"SRTP context not found.");
uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH]
SRTPProtoContext(const char *out_suite, const char *out_key, const char *in_suite, const char *in_key)
void openSockets(const char *uri, int localPort)
void setReadBlockingMode(bool blocking)
void setRtpDelayCallback(std::function< void(int, int)> cb)
int writeData(uint8_t *buf, int buf_size)
MediaIOHandle * createIOContext(const uint16_t mtu)
std::list< rtcpREMBHeader > getRtcpREMB()
std::list< rtcpRRHeader > getRtcpRR()
bool waitForRTCP(std::chrono::seconds interval)
SocketPair(const char *uri, int localPort)
void createSRTP(const char *out_suite, const char *out_params, const char *in_suite, const char *in_params)
void stopSendOp(bool state=true)
void jami_secure_memzero(void *ptr, size_t length)
static int ff_network_wait_fd(int fd)
rational< I > abs(const rational< I > &r)
static constexpr auto UDP_HEADER_SIZE
void emitSignal(Args... args)
static int udp_socket_create(int family, int port)
static constexpr auto SRTP_OVERHEAD
static constexpr unsigned MINIMUM_RTP_HEADER_SIZE
void strErr()
Thread-safe function to print the stringified contents of errno.
static constexpr int RTP_MAX_PACKET_LENGTH
static constexpr uint32_t RTCP_RR_FRACTION_MASK
static constexpr int NET_POLL_TIMEOUT
int ff_srtp_encrypt(struct SRTPContext *s, const uint8_t *in, int len, uint8_t *out, int outlen)
int ff_srtp_set_crypto(struct SRTPContext *s, const char *suite, const char *params)
#define RTP_PT_IS_RTCP(x)
int ff_srtp_decrypt(struct SRTPContext *s, uint8_t *buf, int *lenptr)
void ff_srtp_free(struct SRTPContext *s)