19#include <dhtnet/ip_utils.h>
28#include <dhtnet/ice_socket.h>
46#define SOCK_NONBLOCK FIONBIO
48#define close(x) closesocket(x)
52#include <asm-generic/fcntl.h>
53#define SOCK_NONBLOCK O_NONBLOCK
// Swap the two bytes of a 16-bit value (byte-order conversion).
// The operand is converted to uint16_t before shifting, so signed, negative
// or wider arguments can no longer cause an overflowing left shift; only the
// low 16 bits of the argument are used. The result has type int and always
// lies in [0, 0xFFFF].
// NOTE: `val` is evaluated twice - do not pass expressions with side effects.
#define Swap2Bytes(val) \
    (((static_cast<uint16_t>(val) >> 8) & 0x00FF) | ((static_cast<uint16_t>(val) << 8) & 0xFF00))
// Swap the four bytes of a 32-bit value (byte-order conversion).
// The operand is converted to uint32_t before shifting, so signed or narrower
// arguments (which would otherwise be shifted as a promoted `int`) cannot
// overflow; only the low 32 bits of the argument are used. The result is an
// unsigned 32-bit value.
// NOTE: `val` is evaluated four times - do not pass expressions with side effects.
#define Swap4Bytes(val) \
    (((static_cast<uint32_t>(val) >> 24) & 0x000000FFu) | ((static_cast<uint32_t>(val) >> 8) & 0x0000FF00u) \
     | ((static_cast<uint32_t>(val) << 8) & 0x00FF0000u) | ((static_cast<uint32_t>(val) << 24) & 0xFF000000u))
// Swap the eight bytes of a 64-bit value (byte-order conversion).
// The operand is converted to uint64_t before shifting: with a 32-bit (or
// narrower) argument the shifts by 40 and 56 bits would otherwise exceed the
// operand width, which is undefined behaviour. The result is a uint64_t.
// NOTE: `val` is evaluated eight times - do not pass expressions with side effects.
#define Swap8Bytes(val) \
    (static_cast<uint64_t>( \
        ((static_cast<uint64_t>(val) >> 56) & 0x00000000000000FFULL) \
        | ((static_cast<uint64_t>(val) >> 40) & 0x000000000000FF00ULL) \
        | ((static_cast<uint64_t>(val) >> 24) & 0x0000000000FF0000ULL) \
        | ((static_cast<uint64_t>(val) >> 8) & 0x00000000FF000000ULL) \
        | ((static_cast<uint64_t>(val) << 8) & 0x000000FF00000000ULL) \
        | ((static_cast<uint64_t>(val) << 24) & 0x0000FF0000000000ULL) \
        | ((static_cast<uint64_t>(val) << 40) & 0x00FF000000000000ULL) \
        | ((static_cast<uint64_t>(val) << 56) & 0xFF00000000000000ULL)))
100 throw std::runtime_error(
"Unable to set crypto on output");
107 throw std::runtime_error(
"Unable to set crypto on input");
190 JAMI_DBG(
"[%p] Creating instance using ICE sockets for comp %d and %d",
192 rtp_sock_->getCompId(),
193 rtcp_sock_->getCompId());
195 rtp_sock_->setOnRecv([
this](
uint8_t*
buf,
size_t len) {
196 std::lock_guard
l(dataBuffMutex_);
197 rtpDataBuff_.emplace_back(
buf,
buf + len);
201 rtcp_sock_->setOnRecv([
this](
uint8_t*
buf,
size_t len) {
202 std::lock_guard
l(dataBuffMutex_);
203 rtcpDataBuff_.emplace_back(
buf,
buf + len);
213 JAMI_DBG(
"[%p] Instance destroyed",
this);
219 std::unique_lock lock(rtcpInfo_mutex_);
220 return cvRtcpPacketReadyToRead_.wait_for(lock,
interval, [
this] {
221 return interrupted_
or not listRtcpRRHeader_.empty()
or not listRtcpREMBHeader_.empty();
226SocketPair::saveRtcpRRPacket(
uint8_t*
buf,
size_t len)
232 if (header->pt != 201)
235 std::lock_guard lock(rtcpInfo_mutex_);
237 if (listRtcpRRHeader_.size() >= MAX_LIST_SIZE) {
238 listRtcpRRHeader_.pop_front();
241 listRtcpRRHeader_.emplace_back(*header);
243 cvRtcpPacketReadyToRead_.notify_one();
247SocketPair::saveRtcpREMBPacket(
uint8_t*
buf,
size_t len)
249 if (len <
sizeof(rtcpREMBHeader))
252 auto header =
reinterpret_cast<rtcpREMBHeader*
>(
buf);
253 if (header->pt != 206)
256 if (header->uid != 0x424D4552)
259 std::lock_guard lock(rtcpInfo_mutex_);
261 if (listRtcpREMBHeader_.size() >= MAX_LIST_SIZE) {
262 listRtcpREMBHeader_.pop_front();
265 listRtcpREMBHeader_.push_back(*header);
267 cvRtcpPacketReadyToRead_.notify_one();
270std::list<rtcpRRHeader>
273 std::lock_guard lock(rtcpInfo_mutex_);
274 return std::move(listRtcpRRHeader_);
277std::list<rtcpREMBHeader>
280 std::lock_guard lock(rtcpInfo_mutex_);
281 return std::move(listRtcpREMBHeader_);
296 JAMI_WARN(
"[%p] Interrupting RTP sockets",
this);
299 rtp_sock_->setOnRecv(
nullptr);
301 rtcp_sock_->setOnRecv(
nullptr);
303 cvRtcpPacketReadyToRead_.notify_all();
309 JAMI_DBG(
"[%p] Read operations in blocking mode [%s]",
this,
block ?
"YES" :
"NO");
310 readBlockingMode_ =
block;
312 cvRtcpPacketReadyToRead_.notify_all();
324 if (rtcpHandle_ > 0
and close(rtcpHandle_))
326 if (rtpHandle_ > 0
and close(rtpHandle_))
344 rtpDestAddr_ = dhtnet::IpAddr {hostname};
346 rtcpDestAddr_ = dhtnet::IpAddr {hostname};
353 JAMI_ERR(
"[%p] Sockets creation failed",
this);
354 throw std::runtime_error(
"Sockets creation failed");
357 JAMI_WARN(
"SocketPair: local{%d,%d} / %s{%d,%d}",
371 else if (rtpDestAddr_.getFamily() ==
AF_INET6)
385 reinterpret_cast<void*
>(
this));
389SocketPair::waitForData()
392 if (rtpHandle_ >= 0) {
400 if (
not readBlockingMode_) {
421 std::unique_lock
lk(dataBuffMutex_);
422 cv_.wait(
lk, [
this] {
423 return interrupted_
or not rtpDataBuff_.empty()
or not rtcpDataBuff_.empty()
424 or not readBlockingMode_;
440 if (rtpHandle_ >= 0) {
444 static_cast<char*
>(
buf),
452 std::unique_lock
lk(dataBuffMutex_);
453 if (
not rtpDataBuff_.empty()) {
454 auto pkt = std::move(rtpDataBuff_.front());
455 rtpDataBuff_.pop_front();
459 std::copy_n(
pkt.begin(), len,
static_cast<char*
>(
buf));
470 if (rtcpHandle_ >= 0) {
474 static_cast<char*
>(
buf),
482 std::unique_lock
lk(dataBuffMutex_);
483 if (
not rtcpDataBuff_.empty()) {
484 auto pkt = std::move(rtcpDataBuff_.front());
485 rtcpDataBuff_.pop_front();
489 std::copy_n(
pkt.begin(), len,
static_cast<char*
>(
buf));
509 auto header =
reinterpret_cast<rtcpRRHeader*
>(
buf);
511 if (header->pt == 201) {
514 lastRR_time = std::chrono::steady_clock::now();
515 saveRtcpRRPacket(
buf, len);
518 else if (header->pt == 206)
519 saveRtcpREMBPacket(
buf, len);
521 else if (header->pt == 200) {
524 JAMI_DBG(
"Unable to read RTCP: unknown packet type %u", header->pt);
561 if (packetLossCallback_
and (
buf[2] << 8 |
buf[3]) != lastSeqNumIn_ + 1)
562 packetLossCallback_();
563 lastSeqNumIn_ =
buf[2] << 8 |
buf[3];
580 if (rtpHandle_ >= 0) {
599 reinterpret_cast<const char*
>(
buf),
632 srtpContext_->encryptbuf,
633 sizeof(srtpContext_->encryptbuf));
639 buf = srtpContext_->encryptbuf;
644 if (
isRTCP &&
static_cast<unsigned>(
buf_size) >=
sizeof(rtcpRRHeader)) {
645 auto header =
reinterpret_cast<rtcpRRHeader*
>(
buf);
646 rtcpPacketLoss_ = (header->pt == 201
658 auto header =
reinterpret_cast<rtcpSRHeader*
>(
buf);
664 if (lastSRTS_ != 0 && lastDLSR_ != 0) {
665 if (histoLatency_.size() >= MAX_LIST_SIZE)
666 histoLatency_.pop_front();
677 }
else if (
buf[1] == 201)
689 if (
not histoLatency_.empty())
690 return histoLatency_.back();
698 rtpDelayCallback_ = std::move(
cb);
710 if (
not lastSendTS_) {
712 lastReceiveTS_ = std::chrono::steady_clock::now();
721 std::chrono::steady_clock::time_point arrival_TS = std::chrono::steady_clock::now();
722 auto deltaR = std::chrono::duration_cast<std::chrono::milliseconds>(arrival_TS - lastReceiveTS_)
724 lastReceiveTS_ = arrival_TS;
754 return srtpContext_->srtp_out.seq_largest;
755 JAMI_ERR(
"SRTP context not found.");
uint8_t encryptbuf[RTP_MAX_PACKET_LENGTH]
SRTPProtoContext(const char *out_suite, const char *out_key, const char *in_suite, const char *in_key)
void openSockets(const char *uri, int localPort)
void setReadBlockingMode(bool blocking)
void setRtpDelayCallback(std::function< void(int, int)> cb)
int writeData(uint8_t *buf, int buf_size)
MediaIOHandle * createIOContext(const uint16_t mtu)
std::list< rtcpREMBHeader > getRtcpREMB()
std::list< rtcpRRHeader > getRtcpRR()
bool waitForRTCP(std::chrono::seconds interval)
SocketPair(const char *uri, int localPort)
void createSRTP(const char *out_suite, const char *out_params, const char *in_suite, const char *in_params)
void stopSendOp(bool state=true)
void ring_secure_memzero(void *ptr, size_t length)
static int ff_network_wait_fd(int fd)
rational< I > abs(const rational< I > &r)
static constexpr auto UDP_HEADER_SIZE
void emitSignal(Args... args)
static int udp_socket_create(int family, int port)
static constexpr auto SRTP_OVERHEAD
static constexpr unsigned MINIMUM_RTP_HEADER_SIZE
void strErr()
Thread-safe function to print the stringified contents of errno.
static constexpr int RTP_MAX_PACKET_LENGTH
static constexpr uint32_t RTCP_RR_FRACTION_MASK
static constexpr int NET_POLL_TIMEOUT
int ff_srtp_encrypt(struct SRTPContext *s, const uint8_t *in, int len, uint8_t *out, int outlen)
int ff_srtp_set_crypto(struct SRTPContext *s, const char *suite, const char *params)
#define RTP_PT_IS_RTCP(x)
int ff_srtp_decrypt(struct SRTPContext *s, uint8_t *buf, int *lenptr)
void ff_srtp_free(struct SRTPContext *s)