/* $Id$ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #define THIS_FILE "sip_transport_udp.c" /** * These are the target values for socket send and receive buffer sizes, * respectively. They will be applied to UDP socket with setsockopt(). * When transport failed to set these size, it will decrease it until * sufficiently large number has been successfully set. * * The buffer size is important, especially in WinXP/2000 machines. * Basicly the lower the size, the more packets will be lost (dropped?) * when we're sending (receiving?) packets in large volumes. * * The figure here is taken based on my experiment on WinXP/2000 machine, * and with this value, the rate of dropped packet is about 8% when * sending 1800 requests simultaneously (percentage taken as average * after 50K requests or so). * * More experiments are needed probably. */ /* 2010/01/14 * Too many people complained about seeing "Error setting SNDBUF" log, * so lets just remove this. People who want to have SNDBUF set can * still do so by declaring these two macros in config_site.h */ #ifndef PJSIP_UDP_SO_SNDBUF_SIZE /*# define PJSIP_UDP_SO_SNDBUF_SIZE (24*1024*1024)*/ # define PJSIP_UDP_SO_SNDBUF_SIZE 0 #endif #ifndef PJSIP_UDP_SO_RCVBUF_SIZE /*# define PJSIP_UDP_SO_RCVBUF_SIZE (24*1024*1024)*/ # define PJSIP_UDP_SO_RCVBUF_SIZE 0 #endif /* Struct udp_transport "inherits" struct pjsip_transport */ struct udp_transport { pjsip_transport base; pj_sock_t sock; pj_ioqueue_key_t *key; int rdata_cnt; pjsip_rx_data **rdata; int is_closing; pj_bool_t is_paused; int read_loop_spin; /* Group lock to be used by UDP transport and ioqueue key */ pj_grp_lock_t *grp_lock; }; /* * Initialize transport's receive buffer from the specified pool. */ static void init_rdata(struct udp_transport *tp, unsigned rdata_index, pj_pool_t *pool, pjsip_rx_data **p_rdata) { pjsip_rx_data *rdata; /* Reset pool. */ //note: already done by caller //pj_pool_reset(pool); rdata = PJ_POOL_ZALLOC_T(pool, pjsip_rx_data); /* Init tp_info part. */ rdata->tp_info.pool = pool; rdata->tp_info.transport = &tp->base; rdata->tp_info.tp_data = (void*)(pj_ssize_t)rdata_index; rdata->tp_info.op_key.rdata = rdata; pj_ioqueue_op_key_init(&rdata->tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t)); tp->rdata[rdata_index] = rdata; if (p_rdata) *p_rdata = rdata; } /* * udp_on_read_complete() * * This is callback notification from ioqueue that a pending recvfrom() * operation has completed. */ static void udp_on_read_complete( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) { /* See https://trac.pjsip.org/repos/ticket/1197 */ enum { MAX_IMMEDIATE_PACKET = 50 }; pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; pjsip_rx_data *rdata = rdata_op_key->rdata; struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport; int i; pj_status_t status; ++tp->read_loop_spin; /* Don't do anything if transport is closing. */ if (tp->is_closing) { tp->is_closing++; goto on_return; } /* Don't do anything if transport is being paused. */ if (tp->is_paused) goto on_return; if (-bytes_read == PJ_ESOCKETSTOP) { --tp->read_loop_spin; /* Try to recover by restarting the transport. */ PJ_LOG(4,(tp->base.obj_name, "Restarting SIP UDP transport")); status = pjsip_udp_transport_restart2( &tp->base, PJSIP_UDP_TRANSPORT_DESTROY_SOCKET, PJ_INVALID_SOCKET, &tp->base.local_addr, &tp->base.local_name); if (status != PJ_SUCCESS) { PJ_PERROR(1,(THIS_FILE, status, "Error restarting SIP UDP transport")); } return; } /* * The idea of the loop is to process immediate data received by * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to * complete asynchronously, to allow other sockets to get their data. */ for (i=0;; ++i) { enum { MIN_SIZE = 32 }; pj_uint32_t flags; /* Report the packet to transport manager. Only do so if packet size * is relatively big enough for a SIP packet. */ if (bytes_read > MIN_SIZE) { pj_ssize_t size_eaten; const pj_sockaddr *src_addr = &rdata->pkt_info.src_addr; /* Init pkt_info part. */ rdata->pkt_info.len = bytes_read; rdata->pkt_info.zero = 0; pj_gettimeofday(&rdata->pkt_info.timestamp); pj_sockaddr_print(src_addr, rdata->pkt_info.src_name, sizeof(rdata->pkt_info.src_name), 0); rdata->pkt_info.src_port = pj_sockaddr_get_port(src_addr); size_eaten = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata); if (size_eaten < 0) { pj_assert(!"It shouldn't happen!"); size_eaten = rdata->pkt_info.len; } /* Since this is UDP, the whole buffer is the message. */ rdata->pkt_info.len = 0; } else if (bytes_read <= MIN_SIZE) { /* TODO: */ } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { /* Report error to endpoint. */ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, (pj_status_t)-bytes_read, "Warning: pj_ioqueue_recvfrom()" " callback error")); } if (i >= MAX_IMMEDIATE_PACKET) { /* Force ioqueue_recvfrom() to return PJ_EPENDING */ flags = PJ_IOQUEUE_ALWAYS_ASYNC; } else { flags = 0; } /* Reset pool. * Need to copy rdata fields to temp variable because they will * be invalid after pj_pool_reset(). */ { pj_pool_t *rdata_pool = rdata->tp_info.pool; struct udp_transport *rdata_tp ; unsigned rdata_index; rdata_tp = (struct udp_transport*)rdata->tp_info.transport; rdata_index = (unsigned)(unsigned long)(pj_ssize_t) rdata->tp_info.tp_data; pj_pool_reset(rdata_pool); init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata); /* Change some vars to point to new location after * pool reset. */ op_key = &rdata->tp_info.op_key.op_key; } /* Only read next packet if transport is not being paused. This * check handles the case where transport is paused while endpoint * is still processing a SIP message. */ if (tp->is_paused) break; /* Read next packet. */ bytes_read = sizeof(rdata->pkt_info.packet); rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr); status = pj_ioqueue_recvfrom(key, op_key, rdata->pkt_info.packet, &bytes_read, flags, &rdata->pkt_info.src_addr, &rdata->pkt_info.src_addr_len); if (status == PJ_SUCCESS) { /* Continue loop. */ pj_assert(i < MAX_IMMEDIATE_PACKET); } else if (status == PJ_EPENDING) { break; } else if (status == PJ_ECANCELLED) { /* Socket is closing, quit loop */ break; } else { if (i < MAX_IMMEDIATE_PACKET) { /* Report error to endpoint if this is not EWOULDBLOCK error.*/ if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, status, "Warning: pj_ioqueue_recvfrom")); } /* Continue loop. */ bytes_read = 0; } else { /* This is fatal error. * Ioqueue operation will stop for this transport! */ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, status, "FATAL: pj_ioqueue_recvfrom() error, " "UDP transport stopping! Error")); break; } } } on_return: --tp->read_loop_spin; } /* * udp_on_write_complete() * * This is callback notification from ioqueue that a pending sendto() * operation has completed. */ static void udp_on_write_complete( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { struct udp_transport *tp = (struct udp_transport*) pj_ioqueue_get_user_data(key); pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; tdata_op_key->tdata = NULL; if (-bytes_sent == PJ_ESOCKETSTOP) { pj_status_t status; /* Try to recover by restarting the transport. */ PJ_LOG(4,(tp->base.obj_name, "Restarting SIP UDP transport")); status = pjsip_udp_transport_restart2( &tp->base, PJSIP_UDP_TRANSPORT_DESTROY_SOCKET, PJ_INVALID_SOCKET, &tp->base.local_addr, &tp->base.local_name); if (status != PJ_SUCCESS) { PJ_PERROR(1,(THIS_FILE, status, "Error restarting SIP UDP transport")); } return; } if (tdata_op_key->callback) { tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); } } /* * udp_send_msg() * * This function is called by transport manager (by transport->send_msg()) * to send outgoing message. */ static pj_status_t udp_send_msg( pjsip_transport *transport, pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, int addr_len, void *token, pjsip_transport_callback callback) { struct udp_transport *tp = (struct udp_transport*)transport; pj_ssize_t size; pj_status_t status; PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); /* Return error if transport is paused */ if (tp->is_paused) return PJSIP_ETPNOTAVAIL; /* Init op key. */ tdata->op_key.tdata = tdata; tdata->op_key.token = token; tdata->op_key.callback = callback; /* Send to ioqueue! */ size = tdata->buf.cur - tdata->buf.start; status = pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key, tdata->buf.start, &size, 0, rem_addr, addr_len); if (status != PJ_EPENDING) { if (status == PJ_ESOCKETSTOP) { /* Try to recover by restarting the transport. */ PJ_LOG(4,(tp->base.obj_name, "Restarting SIP UDP transport")); status = pjsip_udp_transport_restart2( &tp->base, PJSIP_UDP_TRANSPORT_DESTROY_SOCKET, PJ_INVALID_SOCKET, &tp->base.local_addr, &tp->base.local_name); if (status != PJ_SUCCESS) { PJ_PERROR(1,(THIS_FILE, status, "Error restarting SIP UDP transport")); } } tdata->op_key.tdata = NULL; } return status; } /* Clean up UDP resources */ static void udp_on_destroy(void *arg) { struct udp_transport *tp = (struct udp_transport*)arg; int i; /* Destroy rdata */ for (i=0; irdata_cnt; ++i) { pj_pool_release(tp->rdata[i]->tp_info.pool); } /* Destroy reference counter. */ if (tp->base.ref_cnt) pj_atomic_destroy(tp->base.ref_cnt); /* Destroy lock */ if (tp->base.lock) pj_lock_destroy(tp->base.lock); PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport destroyed")); /* Destroy pool. */ pjsip_endpt_release_pool(tp->base.endpt, tp->base.pool); } /* * udp_destroy() * * This function is called by transport manager (by transport->destroy()). */ static pj_status_t udp_destroy( pjsip_transport *transport ) { struct udp_transport *tp = (struct udp_transport*)transport; int i; /* Mark this transport as closing. */ tp->is_closing = 1; /* Cancel all pending operations. */ /* blp: NO NO NO... * No need to post queued completion as we poll the ioqueue until * we've got events anyway. Posting completion will only cause * callback to be called twice with IOCP: one for the post completion * and another one for closing the socket. * for (i=0; irdata_cnt; ++i) { pj_ioqueue_post_completion(tp->key, &tp->rdata[i]->tp_info.op_key.op_key, -1); } */ /* Unregister from ioqueue. */ if (tp->key) { pj_ioqueue_unregister(tp->key); tp->key = NULL; } else { /* Close socket. */ if (tp->sock && tp->sock != PJ_INVALID_SOCKET) { pj_sock_close(tp->sock); tp->sock = PJ_INVALID_SOCKET; } } /* Must poll ioqueue because IOCP calls the callback when socket * is closed. We poll the ioqueue until all pending callbacks * have been called. */ for (i=0; i<50 && tp->is_closing < 1+tp->rdata_cnt; ++i) { int cnt; pj_time_val timeout = {0, 1}; cnt = pj_ioqueue_poll(pjsip_endpt_get_ioqueue(transport->endpt), &timeout); if (cnt == 0) break; } /* When creating this transport, reference count was incremented to flag * this transport as permanent so it will not be destroyed by transport * manager whenever idle. Application may or may not have cleared the * flag (by calling pjsip_transport_dec_ref()), so in case it has not, * let's do it now, so this transport can be destroyed. */ if (pj_atomic_get(tp->base.ref_cnt) > 0) pjsip_transport_dec_ref(&tp->base); /* Destroy transport */ if (tp->grp_lock) { pj_grp_lock_t *grp_lock = tp->grp_lock; tp->grp_lock = NULL; pj_grp_lock_dec_ref(grp_lock); /* Transport may have been deleted at this point */ } else { udp_on_destroy(tp); } return PJ_SUCCESS; } /* * udp_shutdown() * * Start graceful UDP shutdown. */ static pj_status_t udp_shutdown(pjsip_transport *transport) { return pjsip_transport_dec_ref(transport); } /* Create socket */ static pj_status_t create_socket(int af, const pj_sockaddr_t *local_a, int addr_len, pj_sock_t *p_sock) { pj_sock_t sock; pj_sockaddr_in tmp_addr; pj_sockaddr_in6 tmp_addr6; pj_status_t status; status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &sock); if (status != PJ_SUCCESS) return status; if (local_a == NULL) { if (af == pj_AF_INET6()) { pj_bzero(&tmp_addr6, sizeof(tmp_addr6)); tmp_addr6.sin6_family = (pj_uint16_t)af; local_a = &tmp_addr6; addr_len = sizeof(tmp_addr6); } else { pj_sockaddr_in_init(&tmp_addr, NULL, 0); local_a = &tmp_addr; addr_len = sizeof(tmp_addr); } } status = pj_sock_bind(sock, local_a, addr_len); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } *p_sock = sock; return PJ_SUCCESS; } /* Generate transport's published address */ static pj_status_t get_published_name(pj_sock_t sock, char hostbuf[], int hostbufsz, pjsip_host_port *bound_name) { pj_sockaddr tmp_addr; int addr_len; pj_status_t status; addr_len = sizeof(tmp_addr); status = pj_sock_getsockname(sock, &tmp_addr, &addr_len); if (status != PJ_SUCCESS) return status; bound_name->host.ptr = hostbuf; if (tmp_addr.addr.sa_family == pj_AF_INET()) { bound_name->port = pj_ntohs(tmp_addr.ipv4.sin_port); /* If bound address specifies "0.0.0.0", get the IP address * of local hostname. */ if (tmp_addr.ipv4.sin_addr.s_addr == PJ_INADDR_ANY) { pj_sockaddr hostip; status = pj_gethostip(pj_AF_INET(), &hostip); if (status != PJ_SUCCESS) return status; status = pj_inet_ntop(pj_AF_INET(), &hostip.ipv4.sin_addr, hostbuf, hostbufsz); } else { /* Otherwise use bound address. */ status = pj_inet_ntop(pj_AF_INET(), &tmp_addr.ipv4.sin_addr, hostbuf, hostbufsz); } } else { /* If bound address specifies "INADDR_ANY" (IPv6), get the * IP address of local hostname */ pj_uint32_t loop6[4] = { 0, 0, 0, 0}; bound_name->port = pj_ntohs(tmp_addr.ipv6.sin6_port); if (pj_memcmp(&tmp_addr.ipv6.sin6_addr, loop6, sizeof(loop6))==0) { status = pj_gethostip(tmp_addr.addr.sa_family, &tmp_addr); if (status != PJ_SUCCESS) return status; } status = pj_inet_ntop(tmp_addr.addr.sa_family, pj_sockaddr_get_addr(&tmp_addr), hostbuf, hostbufsz); } if (status == PJ_SUCCESS) { bound_name->host.slen = pj_ansi_strlen(hostbuf); } return status; } /* Set the published address of the transport */ static void udp_set_pub_name(struct udp_transport *tp, const pjsip_host_port *a_name) { enum { INFO_LEN = 80 }; char local_addr[PJ_INET6_ADDRSTRLEN+10]; char pub_addr[PJ_INET6_ADDRSTRLEN+10]; pj_assert(a_name->host.slen != 0); if (pj_strcmp(&tp->base.local_name.host, &a_name->host) == 0 && tp->base.local_name.port == a_name->port) { return; } pj_strdup_with_null(tp->base.pool, &tp->base.local_name.host, &a_name->host); tp->base.local_name.port = a_name->port; /* Update transport info. */ if (tp->base.info == NULL) { tp->base.info = (char*) pj_pool_alloc(tp->base.pool, INFO_LEN); } pj_sockaddr_print(&tp->base.local_addr, local_addr, sizeof(local_addr), 3); pj_addr_str_print(&tp->base.local_name.host, tp->base.local_name.port, pub_addr, sizeof(pub_addr), 1), pj_ansi_snprintf( tp->base.info, INFO_LEN, "udp %s [published as %s]", local_addr, pub_addr); } /* Set the socket handle of the transport */ static void udp_set_socket(struct udp_transport *tp, pj_sock_t sock, const pjsip_host_port *a_name) { #if PJSIP_UDP_SO_RCVBUF_SIZE || PJSIP_UDP_SO_SNDBUF_SIZE long sobuf_size; pj_status_t status; #endif /* Adjust socket rcvbuf size */ #if PJSIP_UDP_SO_RCVBUF_SIZE sobuf_size = PJSIP_UDP_SO_RCVBUF_SIZE; status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_RCVBUF(), &sobuf_size, sizeof(sobuf_size)); if (status != PJ_SUCCESS) { PJ_PERROR(4,(THIS_FILE, status, "Error setting SO_RCVBUF")); } #endif /* Adjust socket sndbuf size */ #if PJSIP_UDP_SO_SNDBUF_SIZE sobuf_size = PJSIP_UDP_SO_SNDBUF_SIZE; status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_SNDBUF(), &sobuf_size, sizeof(sobuf_size)); if (status != PJ_SUCCESS) { PJ_PERROR(4,(THIS_FILE, status, "Error setting SO_SNDBUF")); } #endif /* Set the socket. */ tp->sock = sock; /* Init address name (published address) */ udp_set_pub_name(tp, a_name); } /* Register socket to ioqueue */ static pj_status_t register_to_ioqueue(struct udp_transport *tp) { pj_ioqueue_t *ioqueue; pj_ioqueue_callback ioqueue_cb; pj_status_t status; /* Ignore if already registered */ if (tp->key != NULL) return PJ_SUCCESS; /* Create group lock if not yet (don't need to do so on UDP restart) */ if (!tp->grp_lock) { status = pj_grp_lock_create(tp->base.pool, NULL, &tp->grp_lock); if (status != PJ_SUCCESS) return status; pj_grp_lock_add_ref(tp->grp_lock); pj_grp_lock_add_handler(tp->grp_lock, tp->base.pool, tp, &udp_on_destroy); tp->base.grp_lock = tp->grp_lock; } /* Register to ioqueue. */ ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt); pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); ioqueue_cb.on_read_complete = &udp_on_read_complete; ioqueue_cb.on_write_complete = &udp_on_write_complete; return pj_ioqueue_register_sock2(tp->base.pool, ioqueue, tp->sock, tp->grp_lock, tp, &ioqueue_cb, &tp->key); } /* Start ioqueue asynchronous reading to all rdata */ static pj_status_t start_async_read(struct udp_transport *tp) { int i; pj_status_t status; /* Start reading the ioqueue. */ for (i=0; irdata_cnt; ++i) { pj_ssize_t size; size = sizeof(tp->rdata[i]->pkt_info.packet); tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr); status = pj_ioqueue_recvfrom(tp->key, &tp->rdata[i]->tp_info.op_key.op_key, tp->rdata[i]->pkt_info.packet, &size, PJ_IOQUEUE_ALWAYS_ASYNC, &tp->rdata[i]->pkt_info.src_addr, &tp->rdata[i]->pkt_info.src_addr_len); if (status == PJ_SUCCESS) { pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!"); udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key, size); } else if (status != PJ_EPENDING) { /* Error! */ return status; } } return PJ_SUCCESS; } /* * pjsip_udp_transport_attach() * * Attach UDP socket and start transport. */ static pj_status_t transport_attach( pjsip_endpoint *endpt, pjsip_transport_type_e type, pj_sock_t sock, const pjsip_host_port *a_name, unsigned async_cnt, pjsip_transport **p_transport) { pj_pool_t *pool; struct udp_transport *tp; const char *format, *ipv6_quoteb = "", *ipv6_quotee = ""; unsigned i; pj_status_t status; PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0, PJ_EINVAL); /* Object name. */ if (type & PJSIP_TRANSPORT_IPV6) { pj_in6_addr dummy6; format = "udpv6%p"; /* We don't need to add quote if the transport type is IPv6, but * actually translated to IPv4. */ if (pj_inet_pton(pj_AF_INET6(), &a_name->host, &dummy6)==PJ_SUCCESS) { ipv6_quoteb = "["; ipv6_quotee = "]"; } } else { format = "udp%p"; } /* Create pool. */ pool = pjsip_endpt_create_pool(endpt, format, PJSIP_POOL_LEN_TRANSPORT, PJSIP_POOL_INC_TRANSPORT); if (!pool) return PJ_ENOMEM; /* Create the UDP transport object. */ tp = PJ_POOL_ZALLOC_T(pool, struct udp_transport); /* Save pool. */ tp->base.pool = pool; pj_memcpy(tp->base.obj_name, pool->obj_name, PJ_MAX_OBJ_NAME); /* Init reference counter. */ status = pj_atomic_create(pool, 0, &tp->base.ref_cnt); if (status != PJ_SUCCESS) goto on_error; /* Init lock. */ status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &tp->base.lock); if (status != PJ_SUCCESS) goto on_error; /* Set type. */ tp->base.key.type = type; /* Remote address is left zero (except the family) */ tp->base.key.rem_addr.addr.sa_family = (pj_uint16_t) ((type & PJSIP_TRANSPORT_IPV6) ? pj_AF_INET6() : pj_AF_INET()); /* Type name. */ tp->base.type_name = "UDP"; /* Transport flag */ tp->base.flag = pjsip_transport_get_flag_from_type(type); /* Length of addressess. */ tp->base.addr_len = sizeof(tp->base.local_addr); /* Init local address. */ status = pj_sock_getsockname(sock, &tp->base.local_addr, &tp->base.addr_len); if (status != PJ_SUCCESS) goto on_error; /* Init remote name. */ if (type == PJSIP_TRANSPORT_UDP) tp->base.remote_name.host = pj_str("0.0.0.0"); else tp->base.remote_name.host = pj_str("::0"); tp->base.remote_name.port = 0; /* Init direction */ tp->base.dir = PJSIP_TP_DIR_NONE; /* Set endpoint. */ tp->base.endpt = endpt; /* Transport manager and timer will be initialized by tpmgr */ /* Attach socket and assign name. */ udp_set_socket(tp, sock, a_name); /* Register to ioqueue */ status = register_to_ioqueue(tp); if (status != PJ_SUCCESS) goto on_error; /* Set functions. */ tp->base.send_msg = &udp_send_msg; tp->base.do_shutdown = &udp_shutdown; tp->base.destroy = &udp_destroy; /* Register to transport manager. */ tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt); status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp); if (status != PJ_SUCCESS) goto on_error; /* This is a permanent transport, so we initialize the ref count * to one so that transport manager won't destroy this transport * when there's no user! */ pjsip_transport_add_ref(&tp->base); /* Create rdata and put it in the array. */ tp->rdata_cnt = 0; tp->rdata = (pjsip_rx_data**) pj_pool_calloc(tp->base.pool, async_cnt, sizeof(pjsip_rx_data*)); for (i=0; ibase.ref_cnt, 0); pjsip_transport_destroy(&tp->base); return PJ_ENOMEM; } init_rdata(tp, i, rdata_pool, NULL); tp->rdata_cnt++; } /* Start reading the ioqueue. */ status = start_async_read(tp); if (status != PJ_SUCCESS) { pjsip_transport_destroy(&tp->base); return status; } /* Done. */ if (p_transport) *p_transport = &tp->base; PJ_LOG(4,(tp->base.obj_name, "SIP %s started, published address is %s%.*s%s:%d", pjsip_transport_get_type_desc((pjsip_transport_type_e)tp->base.key.type), ipv6_quoteb, (int)tp->base.local_name.host.slen, tp->base.local_name.host.ptr, ipv6_quotee, tp->base.local_name.port)); return PJ_SUCCESS; on_error: udp_destroy((pjsip_transport*)tp); return status; } PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt, pj_sock_t sock, const pjsip_host_port *a_name, unsigned async_cnt, pjsip_transport **p_transport) { return transport_attach(endpt, PJSIP_TRANSPORT_UDP, sock, a_name, async_cnt, p_transport); } PJ_DEF(pj_status_t) pjsip_udp_transport_attach2( pjsip_endpoint *endpt, pjsip_transport_type_e type, pj_sock_t sock, const pjsip_host_port *a_name, unsigned async_cnt, pjsip_transport **p_transport) { return transport_attach(endpt, type, sock, a_name, async_cnt, p_transport); } /* * Initialize pjsip_udp_transport_cfg structure with default values. */ PJ_DEF(void) pjsip_udp_transport_cfg_default(pjsip_udp_transport_cfg *cfg, int af) { pj_bzero(cfg, sizeof(*cfg)); cfg->af = af; pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0); cfg->async_cnt = 1; } /* * pjsip_udp_transport_start2() * * Create a UDP socket in the specified address and start a transport. */ PJ_DEF(pj_status_t) pjsip_udp_transport_start2( pjsip_endpoint *endpt, const pjsip_udp_transport_cfg *cfg, pjsip_transport **p_transport) { pj_sock_t sock; pj_status_t status; pjsip_host_port addr_name; char addr_buf[PJ_INET6_ADDRSTRLEN]; pjsip_transport_type_e transport_type; pj_uint16_t af; int addr_len; PJ_ASSERT_RETURN(endpt && cfg && cfg->async_cnt, PJ_EINVAL); if (cfg->bind_addr.addr.sa_family == pj_AF_INET()) { af = pj_AF_INET(); transport_type = PJSIP_TRANSPORT_UDP; addr_len = sizeof(pj_sockaddr_in); } else { af = pj_AF_INET6(); transport_type = PJSIP_TRANSPORT_UDP6; addr_len = sizeof(pj_sockaddr_in6); } status = create_socket(af, &cfg->bind_addr, addr_len, &sock); if (status != PJ_SUCCESS) return status; /* Apply QoS, if specified */ pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params, 2, THIS_FILE, "SIP UDP transport"); /* Apply sockopt, if specified */ if (cfg->sockopt_params.cnt) pj_sock_setsockopt_params(sock, &cfg->sockopt_params); if (cfg->addr_name.host.slen == 0) { /* Address name is not specified. * Build a name based on bound address. */ status = get_published_name(sock, addr_buf, sizeof(addr_buf), &addr_name); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } } else { addr_name = cfg->addr_name; } return pjsip_udp_transport_attach2(endpt, transport_type, sock, &addr_name, cfg->async_cnt, p_transport); } /* * pjsip_udp_transport_start() * * Create a UDP socket in the specified address and start a transport. */ PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt, const pj_sockaddr_in *local_a, const pjsip_host_port *a_name, unsigned async_cnt, pjsip_transport **p_transport) { pjsip_udp_transport_cfg cfg; pjsip_udp_transport_cfg_default(&cfg, pj_AF_INET()); if (local_a) pj_sockaddr_cp(&cfg.bind_addr, local_a); if (a_name) cfg.addr_name = *a_name; cfg.async_cnt = async_cnt; return pjsip_udp_transport_start2(endpt, &cfg, p_transport); } /* * pjsip_udp_transport_start() * * Create a UDP socket in the specified address and start a transport. */ PJ_DEF(pj_status_t) pjsip_udp_transport_start6(pjsip_endpoint *endpt, const pj_sockaddr_in6 *local_a, const pjsip_host_port *a_name, unsigned async_cnt, pjsip_transport **p_transport) { pjsip_udp_transport_cfg cfg; pjsip_udp_transport_cfg_default(&cfg, pj_AF_INET6()); if (local_a) pj_sockaddr_cp(&cfg.bind_addr, local_a); if (a_name) cfg.addr_name = *a_name; cfg.async_cnt = async_cnt; return pjsip_udp_transport_start2(endpt, &cfg, p_transport); } /* * Retrieve the internal socket handle used by the UDP transport. */ PJ_DEF(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport) { struct udp_transport *tp; PJ_ASSERT_RETURN(transport != NULL, PJ_INVALID_SOCKET); tp = (struct udp_transport*) transport; return tp->sock; } /* * Temporarily pause or shutdown the transport. */ PJ_DEF(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport, unsigned option) { struct udp_transport *tp; unsigned i; PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL); /* Flag must be specified */ PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL); tp = (struct udp_transport*) transport; /* Transport must not have been paused */ PJ_ASSERT_RETURN(tp->is_paused==0, PJ_EINVALIDOP); /* Set transport to paused first, so that when the read callback is * called by pj_ioqueue_post_completion() it will not try to * re-register the rdata. */ tp->is_paused = PJ_TRUE; /* Cancel the ioqueue operation. */ for (i=0; i<(unsigned)tp->rdata_cnt; ++i) { pj_ioqueue_post_completion(tp->key, &tp->rdata[i]->tp_info.op_key.op_key, -1); } /* Destroy the socket? */ if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) { if (tp->key) { /* This implicitly closes the socket */ pj_ioqueue_unregister(tp->key); tp->key = NULL; } else { /* Close socket. */ if (tp->sock && tp->sock != PJ_INVALID_SOCKET) { pj_sock_close(tp->sock); tp->sock = PJ_INVALID_SOCKET; } } tp->sock = PJ_INVALID_SOCKET; } PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport paused")); return PJ_SUCCESS; } /* * Restart transport. * * If option is KEEP_SOCKET, just re-activate ioqueue operation. * * If option is DESTROY_SOCKET: * - if socket is specified, replace. * - if socket is not specified, create and replace. */ PJ_DEF(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport, unsigned option, pj_sock_t sock, const pj_sockaddr_in *local, const pjsip_host_port *a_name) { return pjsip_udp_transport_restart2(transport, option, sock, (pj_sockaddr*)local, a_name); } PJ_DEF(pj_status_t) pjsip_udp_transport_restart2(pjsip_transport *transport, unsigned option, pj_sock_t sock, const pj_sockaddr *local, const pjsip_host_port *a_name) { struct udp_transport *tp; pj_status_t status; char addr[PJ_INET6_ADDRSTRLEN+10]; int i; PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL); /* Flag must be specified */ PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL); tp = (struct udp_transport*) transport; /* Pause the transport first, so that any active read loop spin will * quit as soon as possible. */ tp->is_paused = PJ_TRUE; if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) { char addr_buf[PJ_INET6_ADDRSTRLEN]; pjsip_host_port bound_name; /* Request to recreate transport */ /* Destroy existing socket, if any. */ if (tp->key) { /* This implicitly closes the socket */ pj_ioqueue_unregister(tp->key); tp->key = NULL; } else { /* Close socket. */ if (tp->sock && tp->sock != PJ_INVALID_SOCKET) { pj_sock_close(tp->sock); tp->sock = PJ_INVALID_SOCKET; } } tp->sock = PJ_INVALID_SOCKET; /* Create the socket if it's not specified */ if (sock == PJ_INVALID_SOCKET) { status = create_socket(local?local->addr.sa_family:pj_AF_UNSPEC(), local, local?pj_sockaddr_get_len(local):0, &sock); if (status != PJ_SUCCESS) return status; } /* If transport published name is not specified, calculate it * from the bound address. */ if (a_name == NULL) { status = get_published_name(sock, addr_buf, sizeof(addr_buf), &bound_name); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } a_name = &bound_name; } /* Init local address. */ status = pj_sock_getsockname(sock, &tp->base.local_addr, &tp->base.addr_len); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } /* Assign the socket and published address to transport. */ udp_set_socket(tp, sock, a_name); } else { /* For KEEP_SOCKET, transport must have been paused before */ PJ_ASSERT_RETURN(tp->is_paused, PJ_EINVALIDOP); /* If address name is specified, update it */ if (a_name != NULL) udp_set_pub_name(tp, a_name); } /* Make sure all udp_on_read_complete() loop spin are stopped */ do { pj_thread_sleep(1); } while (tp->read_loop_spin); /* Re-register new or existing socket to ioqueue. */ status = register_to_ioqueue(tp); if (status != PJ_SUCCESS) { return status; } /* Re-init op_key. */ for (i = 0; i < tp->rdata_cnt; ++i) { pj_ioqueue_op_key_init(&tp->rdata[i]->tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t)); } /* Restart async read operation. */ status = start_async_read(tp); if (status != PJ_SUCCESS) return status; /* Everything has been set up */ tp->is_paused = PJ_FALSE; PJ_LOG(4, (tp->base.obj_name, "SIP UDP transport restarted, published address is %s", pj_addr_str_print(&tp->base.local_name.host, tp->base.local_name.port, addr, sizeof(addr), 1))); return PJ_SUCCESS; }