Ring Daemon 16.0.0
Loading...
Searching...
No Matches
channeled_transport.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "channeled_transport.h"
19
20#include "logger.h"
21#include <dhtnet/multiplexed_socket.h>
23
24#include <pjsip/sip_transport.h>
25#include <pjsip/sip_endpoint.h>
26#include <pj/compat/socket.h>
27#include <pj/lock.h>
28
29namespace jami {
30namespace tls {
31
33 const std::shared_ptr<dhtnet::ChannelSocket>& socket,
35 : socket_(socket)
36 , shutdownCb_(std::move(cb))
37 , trData_()
38 , pool_ {nullptr, pj_pool_release}
39 , rxPool_(nullptr, pj_pool_release)
40{
41 local_ = socket->getLocalAddress();
42 remote_ = socket->getRemoteAddress();
43 int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
44
45 JAMI_LOG("ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
46
47 // Init memory
48 trData_.self = this; // up-link for PJSIP callbacks
49
51 "channeled.pool",
54
55 auto& base = trData_.base;
56 std::memset(&base, 0, sizeof(base));
57
58 pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
59 base.endpt = endpt;
60 base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
61 base.pool = pool_.get();
62
63 if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
64 throw std::runtime_error("Unable to create PJSIP atomic.");
65
66 if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
67 throw std::runtime_error("Unable to create PJSIP mutex.");
68
69 if (not local_) {
70 JAMI_ERROR("Invalid local address");
71 throw std::runtime_error("Invalid local address");
72 }
73 if (not remote_) {
74 JAMI_ERROR("Invalid remote address");
75 throw std::runtime_error("Invalid remote address");
76 }
77
78 pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
79 base.key.type = tp_type;
80 auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
81 base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
83 base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
84
85 auto remote_addr = remote_.toString();
86 pj_ansi_snprintf(base.info,
88 "%s to %s",
89 base.type_name,
90 remote_addr.c_str());
91 base.addr_len = remote_.getLength();
92 base.dir = PJSIP_TP_DIR_NONE;
93
94 // Set initial local address
95 pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
96
97 sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
98 sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
99
100 // Init transport callbacks
101 base.send_msg = [](pjsip_transport* transport,
103 const pj_sockaddr_t* rem_addr,
104 int addr_len,
105 void* token,
107 auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
108 reinterpret_cast<TransportData*>(transport)->self);
109 return this_->send(tdata, rem_addr, addr_len, token, callback);
110 };
111 base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
112 auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
113 reinterpret_cast<TransportData*>(transport)->self);
114 JAMI_LOG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
115 fmt::ptr(this_),
116 fmt::ptr(transport),
117 pj_atomic_get(transport->ref_cnt));
118 if (this_->socket_)
119 this_->socket_->shutdown();
120 return PJ_SUCCESS;
121 };
122 base.destroy = [](pjsip_transport* transport) -> pj_status_t {
123 auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(
124 reinterpret_cast<TransportData*>(transport)->self);
125 delete this_;
126 return PJ_SUCCESS;
127 };
128
129 // Init rdata_
130 std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
132 "channeled.rxPool",
135 rdata_.tp_info.pool = rxPool_.get();
136 rdata_.tp_info.transport = &base;
137 rdata_.tp_info.tp_data = this;
138 rdata_.tp_info.op_key.rdata = &rdata_;
139 pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
140 rdata_.pkt_info.src_addr = base.key.rem_addr;
141 rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
142 auto rem_addr = &base.key.rem_addr;
143 pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
144 rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
145
146 // Register callbacks
147 if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
148 throw std::runtime_error("Unable to register PJSIP transport.");
149}
150
151void
153{
154 // Link to Channel Socket
155 socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
156 pj_gettimeofday(&rdata_.pkt_info.timestamp);
157 size_t remaining {len};
158 while (remaining) {
159 // Build rdata
160 size_t added = std::min(remaining,
161 (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
162 std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
163 rdata_.pkt_info.len += added;
164 buf += added;
165 remaining -= added;
166
167 // Consume packet
168 auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
169 if (eaten == rdata_.pkt_info.len) {
170 rdata_.pkt_info.len = 0;
171 } else if (eaten > 0) {
172 memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
173 rdata_.pkt_info.len -= eaten;
174 }
175 pj_pool_reset(rdata_.tp_info.pool);
176 }
177 return len;
178 });
179 socket_->onShutdown([this] {
180 disconnected_ = true;
181 if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
182 //JAMI_LOG("[SIPS] process disconnect event");
184 std::memset(&state_info, 0, sizeof(state_info));
185 state_info.status = PJ_SUCCESS;
186 (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
187 }
188 shutdownCb_();
189 });
190}
191
193{
194 auto base = getTransportBase();
195
196 // Here, we reset callbacks in ChannelSocket to avoid to call it after destruction
197 // ChanneledSIPTransport is managed by pjsip, so we don't have any weak_ptr available
198 socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
199 socket_->onShutdown([]() {});
200 // Stop low-level transport first
201 socket_->shutdown();
202 socket_.reset();
203
204 // If delete not trigged by pjsip_transport_destroy (happen if objet not given to pjsip)
205 if (not base->is_shutdown and not base->is_destroying)
207
208 pj_lock_destroy(base->lock);
209 pj_atomic_destroy(base->ref_cnt);
210 JAMI_LOG("~ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
211}
212
214ChanneledSIPTransport::send(pjsip_tx_data* tdata,
215 const pj_sockaddr_t* rem_addr,
216 int addr_len,
217 void*,
219{
220 // Sanity check
222
223 // Check that there's no pending operation associated with the tdata
224 PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
225
226 // Check the address is supported
228 and (addr_len == sizeof(pj_sockaddr_in)
229 or addr_len == sizeof(pj_sockaddr_in6)),
230 PJ_EINVAL);
231
232 // Check in we are able to send it in synchronous way first
233 std::size_t size = tdata->buf.cur - tdata->buf.start;
234 if (socket_) {
235 std::error_code ec;
236 socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
237 if (!ec)
238 return PJ_SUCCESS;
239 }
240 return PJ_EINVAL;
241}
242
243} // namespace tls
244} // namespace jami
ChanneledSIPTransport(pjsip_endpoint *endpt, const std::shared_ptr< dhtnet::ChannelSocket > &socket, onShutdownCb &&cb)
void start()
Connect callbacks for channeled socket, must be done when the channel is ready to be used.
pjsip_transport * getTransportBase() override
#define JAMI_ERROR(formatstr,...)
Definition logger.h:228
#define JAMI_LOG(formatstr,...)
Definition logger.h:225
static std::unique_ptr< pj_pool_t, decltype(pj_pool_release)& > smart_alloc_pool(pjsip_endpoint *endpt, const char *const name, pj_size_t initial, pj_size_t inc)
Definition sip_utils.h:173
static constexpr int POOL_TP_INIT
Definition sip_utils.h:183
static constexpr int TRANSPORT_INFO_LENGTH
Definition sip_utils.h:185
static constexpr int POOL_TP_INC
Definition sip_utils.h:184
void sockaddr_to_host_port(pj_pool_t *pool, pjsip_host_port *host_port, const pj_sockaddr *addr)
void emitSignal(Args... args)
Definition ring_signal.h:64
std::function< void(void)> onShutdownCb