Ring Daemon
Loading...
Searching...
No Matches
channeled_transport.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2026 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17
18#include "channeled_transport.h"
19
20#include "logger.h"
21#include <dhtnet/multiplexed_socket.h>
23
24#include <pjsip/sip_transport.h>
25#include <pjsip/sip_endpoint.h>
26#include <pj/compat/socket.h>
27#include <pj/lock.h>
28
29namespace jami {
30namespace tls {
31
// ChanneledSIPTransport constructor. Per the member index of this listing the
// parameters are: pjsip_endpoint* endpt, the dhtnet::ChannelSocket shared_ptr
// `socket`, and an onShutdownCb&& `cb`.
//
// Fills in the pjsip_transport stored in trData_.base from the socket's local
// and remote addresses, installs the send/shutdown/destroy callbacks, prepares
// rdata_ for incoming packets and registers the transport with the endpoint's
// transport manager.
//
// Throws std::runtime_error when the local or remote address is invalid, or
// when creating the atomic, creating the mutex or registering the transport
// does not return PJ_SUCCESS.
//
// NOTE(review): the embedded listing numbers skip 32, 34, 50, 79, 95-96, 99
// and 121, so several original lines are absent from this extract.
33 const std::shared_ptr<dhtnet::ChannelSocket>& socket,
35 : socket_(socket)
36 , shutdownCb_(std::move(cb))
37 , trData_()
38 , pool_ {nullptr}
39 , rxPool_(nullptr)
40{
41 local_ = socket->getLocalAddress();
42 remote_ = socket->getRemoteAddress();
// Use the IPv6 TLS transport type when the local address is IPv6.
43 int tp_type = local_.isIpv6() ? PJSIP_TRANSPORT_TLS6 : PJSIP_TRANSPORT_TLS;
44
45 JAMI_LOG("ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
46
47 // Init memory
48 trData_.self = this; // up-link for PJSIP callbacks
49
// NOTE(review): pool_ is nullptr after the initializer list yet is used below;
// its allocation is presumably on the missing line 50 - confirm.
51
52 auto& base = trData_.base;
// Only the embedded pjsip_transport is zeroed; trData_.self set above is kept.
53 std::memset(&base, 0, sizeof(base));
54
55 pj_ansi_snprintf(base.obj_name, PJ_MAX_OBJ_NAME, "chan%p", &base);
56 base.endpt = endpt;
57 base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
58 base.pool = pool_.get();
59
60 if (pj_atomic_create(pool_.get(), 0, &base.ref_cnt) != PJ_SUCCESS)
61 throw std::runtime_error("Unable to create PJSIP atomic.");
62
63 if (pj_lock_create_recursive_mutex(pool_.get(), "chan", &base.lock) != PJ_SUCCESS)
64 throw std::runtime_error("Unable to create PJSIP mutex.");
65
// Both socket addresses must be valid before they are copied into the transport.
66 if (not local_) {
67 JAMI_ERROR("Invalid local address");
68 throw std::runtime_error("Invalid local address");
69 }
70 if (not remote_) {
71 JAMI_ERROR("Invalid remote address");
72 throw std::runtime_error("Invalid remote address");
73 }
74
// The transport key is the remote address plus the transport type chosen above.
75 pj_sockaddr_cp(&base.key.rem_addr, remote_.pjPtr());
76 base.key.type = tp_type;
77 auto reg_type = static_cast<pjsip_transport_type_e>(tp_type);
78 base.type_name = const_cast<char*>(pjsip_transport_get_type_name(reg_type));
// base.info is allocated from pool_ and holds "<type name> to <remote address>".
80 base.info = static_cast<char*>(pj_pool_alloc(pool_.get(), sip_utils::TRANSPORT_INFO_LENGTH));
81
82 auto remote_addr = remote_.toString();
83 pj_ansi_snprintf(base.info, sip_utils::TRANSPORT_INFO_LENGTH, "%s to %s", base.type_name, remote_addr.c_str());
84 base.addr_len = remote_.getLength();
85 base.dir = PJSIP_TP_DIR_NONE;
86
87 // Set initial local address
88 pj_sockaddr_cp(&base.local_addr, local_.pjPtr());
89
90 sip_utils::sockaddr_to_host_port(pool_.get(), &base.local_name, &base.local_addr);
91 sip_utils::sockaddr_to_host_port(pool_.get(), &base.remote_name, remote_.pjPtr());
92
93 // Init transport callbacks
// Each callback receives the pjsip_transport*, reinterprets it as TransportData
// and follows `self` back to the owning object.
// NOTE(review): this presumably relies on `base` being the first member of
// TransportData - confirm against the header.
94 base.send_msg = [](pjsip_transport* transport,
97 int addr_len,
98 void* token,
100 auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(reinterpret_cast<TransportData*>(transport)->self);
101 return this_->send(tdata, rem_addr, addr_len, token, callback);
102 };
// Shutdown request: log it, shut the channel socket down if it is still set,
// and always report PJ_SUCCESS.
103 base.do_shutdown = [](pjsip_transport* transport) -> pj_status_t {
104 auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(reinterpret_cast<TransportData*>(transport)->self);
105 JAMI_LOG("ChanneledSIPTransport@{} tr={} rc={:d}: shutdown",
106 fmt::ptr(this_),
107 fmt::ptr(transport),
108 pj_atomic_get(transport->ref_cnt));
109 if (this_->socket_)
110 this_->socket_->shutdown();
111 return PJ_SUCCESS;
112 };
// Destroy request: deletes the owning ChanneledSIPTransport object.
113 base.destroy = [](pjsip_transport* transport) -> pj_status_t {
114 auto* this_ = reinterpret_cast<ChanneledSIPTransport*>(reinterpret_cast<TransportData*>(transport)->self);
115 delete this_;
116 return PJ_SUCCESS;
117 };
118
119 // Init rdata_
// NOTE(review): rxPool_ is nullptr after the initializer list; its allocation is
// presumably on the missing line 121 - confirm.
120 std::memset(&rdata_, 0, sizeof(pjsip_rx_data));
122 rdata_.tp_info.pool = rxPool_.get();
123 rdata_.tp_info.transport = &base;
124 rdata_.tp_info.tp_data = this;
125 rdata_.tp_info.op_key.rdata = &rdata_;
126 pj_ioqueue_op_key_init(&rdata_.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t));
// Source address, printable name and port are filled here from the remote address.
127 rdata_.pkt_info.src_addr = base.key.rem_addr;
128 rdata_.pkt_info.src_addr_len = sizeof(rdata_.pkt_info.src_addr);
129 auto rem_addr = &base.key.rem_addr;
130 pj_sockaddr_print(rem_addr, rdata_.pkt_info.src_name, sizeof(rdata_.pkt_info.src_name), 0);
131 rdata_.pkt_info.src_port = pj_sockaddr_get_port(rem_addr);
132
133 // Register callbacks
134 if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
135 throw std::runtime_error("Unable to register PJSIP transport.");
136}
137
138void
140{
141 // Link to Channel Socket
142 socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
143 pj_gettimeofday(&rdata_.pkt_info.timestamp);
144 size_t remaining {len};
145 while (remaining) {
146 // Build rdata
147 size_t added = std::min(remaining, (size_t) PJSIP_MAX_PKT_LEN - (size_t) rdata_.pkt_info.len);
148 std::copy_n(buf, added, rdata_.pkt_info.packet + rdata_.pkt_info.len);
149 rdata_.pkt_info.len += added;
150 buf += added;
151 remaining -= added;
152
153 // Consume packet
154 auto eaten = pjsip_tpmgr_receive_packet(trData_.base.tpmgr, &rdata_);
155 if (eaten == rdata_.pkt_info.len) {
156 rdata_.pkt_info.len = 0;
157 } else if (eaten > 0) {
158 memmove(rdata_.pkt_info.packet, rdata_.pkt_info.packet + eaten, eaten);
159 rdata_.pkt_info.len -= eaten;
160 }
161 pj_pool_reset(rdata_.tp_info.pool);
162 }
163 return len;
164 });
165 socket_->onShutdown([this](const std::error_code& ec) {
166 disconnected_ = true;
167 if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
168 // JAMI_LOG("[SIPS] process disconnect event");
170 std::memset(&state_info, 0, sizeof(state_info));
171 state_info.status = ec ? PJ_STATUS_FROM_OS(ec.value()) : PJ_SUCCESS;
172 (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
173 }
174 shutdownCb_();
175 });
176}
177
// Destructor body. The declarator on original line 178 is missing from this
// listing; the log message at the end identifies it as ~ChanneledSIPTransport.
// Replaces the socket callbacks with no-ops, shuts the socket down and releases
// it, then destroys the lock and the reference counter of the transport base.
179{
180 auto base = getTransportBase();
181
182 // Reset the callbacks held by the ChannelSocket so they cannot be invoked after destruction.
183 // ChanneledSIPTransport is managed by pjsip, so no weak_ptr is available to guard them.
184 socket_->setOnRecv([](const uint8_t*, size_t len) { return len; });
185 socket_->onShutdown([](const std::error_code&) {});
186 // Stop low-level transport first
187 socket_->shutdown();
188 socket_.reset();
189
190 // If the delete was not triggered by pjsip_transport_destroy (happens if the object was not given to pjsip)
// NOTE(review): original line 192, the statement controlled by this `if`, is
// missing from this listing.
191 if (not base->is_shutdown and not base->is_destroying)
193
194 pj_lock_destroy(base->lock);
195 pj_atomic_destroy(base->ref_cnt);
196 JAMI_LOG("~ChanneledSIPTransport@{} tr={}", fmt::ptr(this), fmt::ptr(&trData_.base));
197}
198
// Send the serialized buffer of `tdata` through the channel socket.
// Returns PJSIP_EPENDINGTX (via PJ_ASSERT_RETURN) when tdata already has an
// operation key attached, PJ_SUCCESS when the socket write reports no error,
// and PJ_EINVAL when the write fails or socket_ is not set.
// NOTE(review): the return type, the parameter list and original lines 204 and
// 210 (parts of the argument checks) are missing from this listing.
200ChanneledSIPTransport::send(
202{
203 // Sanity check
205
206 // Check that there's no pending operation associated with the tdata
207 PJ_ASSERT_RETURN(tdata->op_key.tdata == nullptr, PJSIP_EPENDINGTX);
208
209 // Check the address is supported
211 PJ_EINVAL);
212
213 // Try to send the message synchronously through the channel socket
// The payload is the byte range [buf.start, buf.cur) of the tx data.
214 std::size_t size = tdata->buf.cur - tdata->buf.start;
215 if (socket_) {
216 std::error_code ec;
217 socket_->write(reinterpret_cast<const uint8_t*>(tdata->buf.start), size, ec);
218 if (!ec)
219 return PJ_SUCCESS;
220 }
// Reached when socket_ is not set or the write reported an error.
221 return PJ_EINVAL;
222}
223
224} // namespace tls
225} // namespace jami
ChanneledSIPTransport(pjsip_endpoint *endpt, const std::shared_ptr< dhtnet::ChannelSocket > &socket, onShutdownCb &&cb)
void start()
Connects the callbacks of the channeled socket. Must be called once the channel is ready to be used.
pjsip_transport * getTransportBase() override
#define JAMI_ERROR(formatstr,...)
Definition logger.h:243
#define JAMI_LOG(formatstr,...)
Definition logger.h:237
static PoolPtr smart_alloc_pool(pjsip_endpoint *endpt, const char *const name, pj_size_t initial, pj_size_t inc)
Definition sip_utils.h:177
static constexpr int POOL_TP_INIT
Definition sip_utils.h:187
static constexpr int TRANSPORT_INFO_LENGTH
Definition sip_utils.h:189
static constexpr int POOL_TP_INC
Definition sip_utils.h:188
void sockaddr_to_host_port(pj_pool_t *pool, pjsip_host_port *host_port, const pj_sockaddr *addr)
void emitSignal(Args... args)
Definition jami_signal.h:64
std::function< void(void)> onShutdownCb