/* $Id$ */
/*
 * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
/* NOTE(review): the twelve #include directives below have lost their
 * header names in this copy of the file. They must be restored from the
 * upstream source before this file can be compiled.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/* Only declare the API if PJ_HAS_TCP is true */
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0


#define THIS_FILE	"sip_transport_tcp.c"

/* Upper bound applied to the async_cnt argument of the listener. */
#define MAX_ASYNC_CNT	16
/* Initial size and increment of the listener's memory pool. */
#define POOL_LIS_INIT	512
#define POOL_LIS_INC	512
/* Initial size and increment of each transport's memory pool. */
#define POOL_TP_INIT	512
#define POOL_TP_INC	512

struct tcp_listener;
struct tcp_transport;


/*
 * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
 * SIP transport factory).
 *
 * The factory member must stay first: this file casts a pjsip_tpfactory
 * pointer directly to a struct tcp_listener pointer.
 */
struct tcp_listener
{
    pjsip_tpfactory	     factory;	    /* Base factory ("parent").	    */
    pj_bool_t		     is_registered; /* Registered to tpmgr?	    */
    pjsip_endpoint	    *endpt;	    /* Endpoint that owns the pool. */
    pjsip_tpmgr		    *tpmgr;	    /* Transport manager.	    */
    pj_activesock_t	    *asock;	    /* Active socket used to accept */
};


/*
 * This structure is used to keep delayed transmit operation in a list.
 * A delayed transmission occurs when application sends tx_data when
 * the TCP connect/establishment is still in progress. These delayed
 * transmission will be "flushed" once the socket is connected (either
 * successfully or with errors).
 */
struct delayed_tdata
{
    PJ_DECL_LIST_MEMBER(struct delayed_tdata);
    pjsip_tx_data_op_key    *tdata_op_key;  /* Op key of the queued tdata.  */
};


/*
 * This structure describes the TCP transport, and it is a descendant of
 * pjsip_transport.
*/ struct tcp_transport { pjsip_transport base; pj_bool_t is_server; /* Do not save listener instance in the transport, because * listener might be destroyed during transport's lifetime. * See http://trac.pjsip.org/repos/ticket/491 struct tcp_listener *listener; */ pj_bool_t is_registered; pj_bool_t is_closing; pj_status_t close_reason; pj_sock_t sock; pj_activesock_t *asock; pj_bool_t has_pending_connect; /* Keep-alive timer. */ pj_timer_entry ka_timer; pj_time_val last_activity; pjsip_tx_data_op_key ka_op_key; pj_str_t ka_pkt; /* TCP transport can only have one rdata! * Otherwise chunks of incoming PDU may be received on different * buffer. */ pjsip_rx_data rdata; /* Pending transmission list. */ struct delayed_tdata delayed_list; }; /**************************************************************************** * PROTOTYPES */ /* This callback is called when pending accept() operation completes. */ static pj_bool_t on_accept_complete(pj_activesock_t *asock, pj_sock_t newsock, const pj_sockaddr_t *src_addr, int src_addr_len); /* This callback is called by transport manager to destroy listener */ static pj_status_t lis_destroy(pjsip_tpfactory *factory); /* This callback is called by transport manager to create transport */ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, pjsip_tpmgr *mgr, pjsip_endpoint *endpt, const pj_sockaddr *rem_addr, int addr_len, pjsip_transport **transport); /* Common function to create and initialize transport */ static pj_status_t tcp_create(struct tcp_listener *listener, pj_pool_t *pool, pj_sock_t sock, pj_bool_t is_server, const pj_sockaddr_in *local, const pj_sockaddr_in *remote, struct tcp_transport **p_tcp); static void tcp_perror(const char *sender, const char *title, pj_status_t status) { char errmsg[PJ_ERR_MSG_SIZE]; pj_strerror(status, errmsg, sizeof(errmsg)); PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status)); } static void sockaddr_to_host_port( pj_pool_t *pool, pjsip_host_port *host_port, const 
pj_sockaddr_in *addr ) { host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 2); host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); host_port->port = pj_sockaddr_get_port(addr); } /**************************************************************************** * The TCP listener/transport factory. */ /* * This is the public API to create, initialize, register, and start the * TCP listener. */ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, const pj_sockaddr_in *local, const pjsip_host_port *a_name, unsigned async_cnt, pjsip_tpfactory **p_factory) { pj_pool_t *pool; pj_sock_t sock = PJ_INVALID_SOCKET; struct tcp_listener *listener; pj_activesock_cfg asock_cfg; pj_activesock_cb listener_cb; pj_sockaddr_in *listener_addr; int addr_len; pj_status_t status; /* Sanity check */ PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL); /* Verify that address given in a_name (if any) is valid */ if (a_name && a_name->host.slen) { pj_sockaddr_in tmp; status = pj_sockaddr_in_init(&tmp, &a_name->host, (pj_uint16_t)a_name->port); if (status != PJ_SUCCESS || tmp.sin_addr.s_addr == PJ_INADDR_ANY || tmp.sin_addr.s_addr == PJ_INADDR_NONE) { /* Invalid address */ return PJ_EINVAL; } } pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT, POOL_LIS_INC); PJ_ASSERT_RETURN(pool, PJ_ENOMEM); listener = PJ_POOL_ZALLOC_T(pool, struct tcp_listener); listener->factory.pool = pool; listener->factory.type = PJSIP_TRANSPORT_TCP; listener->factory.type_name = "tcp"; listener->factory.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); pj_ansi_strcpy(listener->factory.obj_name, "tcplis"); status = pj_lock_create_recursive_mutex(pool, "tcplis", &listener->factory.lock); if (status != PJ_SUCCESS) goto on_error; /* Create and bind socket */ status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock); if (status != PJ_SUCCESS) goto on_error; listener_addr = 
(pj_sockaddr_in*)&listener->factory.local_addr; if (local) { pj_memcpy(listener_addr, local, sizeof(pj_sockaddr_in)); } else { pj_sockaddr_in_init(listener_addr, NULL, 0); } status = pj_sock_bind(sock, listener_addr, sizeof(pj_sockaddr_in)); if (status != PJ_SUCCESS) goto on_error; /* Retrieve the bound address */ addr_len = sizeof(pj_sockaddr_in); status = pj_sock_getsockname(sock, listener_addr, &addr_len); if (status != PJ_SUCCESS) goto on_error; /* If published host/IP is specified, then use that address as the * listener advertised address. */ if (a_name && a_name->host.slen) { /* Copy the address */ listener->factory.addr_name = *a_name; pj_strdup(listener->factory.pool, &listener->factory.addr_name.host, &a_name->host); listener->factory.addr_name.port = a_name->port; } else { /* No published address is given, use the bound address */ /* If the address returns 0.0.0.0, use the default * interface address as the transport's address. */ if (listener_addr->sin_addr.s_addr == 0) { pj_sockaddr hostip; status = pj_gethostip(pj_AF_INET(), &hostip); if (status != PJ_SUCCESS) goto on_error; listener_addr->sin_addr.s_addr = hostip.ipv4.sin_addr.s_addr; } /* Save the address name */ sockaddr_to_host_port(listener->factory.pool, &listener->factory.addr_name, listener_addr); } /* If port is zero, get the bound port */ if (listener->factory.addr_name.port == 0) { listener->factory.addr_name.port = pj_ntohs(listener_addr->sin_port); } pj_ansi_snprintf(listener->factory.obj_name, sizeof(listener->factory.obj_name), "tcplis:%d", listener->factory.addr_name.port); /* Start listening to the address */ status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG); if (status != PJ_SUCCESS) goto on_error; /* Create active socket */ if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; pj_activesock_cfg_default(&asock_cfg); asock_cfg.async_cnt = async_cnt; pj_bzero(&listener_cb, sizeof(listener_cb)); listener_cb.on_accept_complete = &on_accept_complete; status = 
pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, pjsip_endpt_get_ioqueue(endpt), &listener_cb, listener, &listener->asock); /* Register to transport manager */ listener->endpt = endpt; listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); listener->factory.create_transport = lis_create_transport; listener->factory.destroy = lis_destroy; listener->is_registered = PJ_TRUE; status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, &listener->factory); if (status != PJ_SUCCESS) { listener->is_registered = PJ_FALSE; goto on_error; } /* Start pending accept() operations */ status = pj_activesock_start_accept(listener->asock, pool); if (status != PJ_SUCCESS) goto on_error; PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener ready for incoming connections at %.*s:%d", (int)listener->factory.addr_name.host.slen, listener->factory.addr_name.host.ptr, listener->factory.addr_name.port)); /* Return the pointer to user */ if (p_factory) *p_factory = &listener->factory; return PJ_SUCCESS; on_error: if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET) pj_sock_close(sock); lis_destroy(&listener->factory); return status; } /* * This is the public API to create, initialize, register, and start the * TCP listener. 
*/ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, const pj_sockaddr_in *local, unsigned async_cnt, pjsip_tpfactory **p_factory) { return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory); } /* This callback is called by transport manager to destroy listener */ static pj_status_t lis_destroy(pjsip_tpfactory *factory) { struct tcp_listener *listener = (struct tcp_listener *)factory; if (listener->is_registered) { pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory); listener->is_registered = PJ_FALSE; } if (listener->asock) { pj_activesock_close(listener->asock); listener->asock = NULL; } if (listener->factory.lock) { pj_lock_destroy(listener->factory.lock); listener->factory.lock = NULL; } if (listener->factory.pool) { pj_pool_t *pool = listener->factory.pool; PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener destroyed")); listener->factory.pool = NULL; pj_pool_release(pool); } return PJ_SUCCESS; } /***************************************************************************/ /* * TCP Transport */ /* * Prototypes. 
 */
/* Called by transport manager to send message */
static pj_status_t tcp_send_msg(pjsip_transport *transport,
				pjsip_tx_data *tdata,
				const pj_sockaddr_t *rem_addr,
				int addr_len,
				void *token,
				pjsip_transport_callback callback);

/* Called by transport manager to shutdown */
static pj_status_t tcp_shutdown(pjsip_transport *transport);

/* Called by transport manager to destroy transport */
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);

/* Utility to destroy transport */
static pj_status_t tcp_destroy(pjsip_transport *transport,
			       pj_status_t reason);

/* Callback on incoming data */
static pj_bool_t on_data_read(pj_activesock_t *asock,
			      void *data,
			      pj_size_t size,
			      pj_status_t status,
			      pj_size_t *remainder);

/* Callback when packet is sent */
static pj_bool_t on_data_sent(pj_activesock_t *asock,
			      pj_ioqueue_op_key_t *send_key,
			      pj_ssize_t sent);

/* Callback when connect completes */
static pj_bool_t on_connect_complete(pj_activesock_t *asock,
				     pj_status_t status);

/* TCP keep-alive timer callback */
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);

/*
 * Common function to create TCP transport, called when pending accept() and
 * pending connect() complete.
 *
 * listener   Listener supplying the endpoint and transport manager.
 * pool       Pool for the transport, or NULL to create a new one from the
 *            listener's endpoint.
 * sock       Socket for the transport; must be a valid socket.
 * is_server  Stored in the transport and used to choose the object name
 *            prefix ("tcps" or "tcpc").
 * local      Local address copied into the transport.
 * remote     Remote address copied into the transport key.
 * p_tcp      Receives the created transport on success.
 *
 * Returns PJ_SUCCESS or the status of the step that failed. After a
 * failure past the initial argument/pool checks, the partially built
 * transport is torn down with tcp_destroy(), which also closes sock.
 */
static pj_status_t tcp_create( struct tcp_listener *listener,
			       pj_pool_t *pool,
			       pj_sock_t sock, pj_bool_t is_server,
			       const pj_sockaddr_in *local,
			       const pj_sockaddr_in *remote,
			       struct tcp_transport **p_tcp)
{
    struct tcp_transport *tcp;
    pj_ioqueue_t *ioqueue;
    pj_activesock_cfg asock_cfg;
    pj_activesock_cb tcp_callback;
    const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
    pj_status_t status;


    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);


    if (pool == NULL) {
	pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
				       POOL_TP_INIT, POOL_TP_INC);
	PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
    }

    /*
     * Create and initialize basic transport structure.
     */
    tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
    tcp->is_server = is_server;
    tcp->sock = sock;
    /*tcp->listener = listener;*/
    pj_list_init(&tcp->delayed_list);
    tcp->base.pool = pool;

    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
		     (is_server ? "tcps%p" :"tcpc%p"), tcp);

    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    tcp->base.key.type = PJSIP_TRANSPORT_TCP;
    pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in));
    tcp->base.type_name = "tcp";
    tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);

    /* Human readable description, limited to 64 bytes. */
    tcp->base.info = (char*) pj_pool_alloc(pool, 64);
    pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d",
		     pj_inet_ntoa(remote->sin_addr),
		     (int)pj_ntohs(remote->sin_port));

    tcp->base.addr_len = sizeof(pj_sockaddr_in);
    pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in));
    sockaddr_to_host_port(pool, &tcp->base.local_name, local);
    sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);

    tcp->base.endpt = listener->endpt;
    tcp->base.tpmgr = listener->tpmgr;
    tcp->base.send_msg = &tcp_send_msg;
    tcp->base.do_shutdown = &tcp_shutdown;
    tcp->base.destroy = &tcp_destroy_transport;


    /* Create active socket */
    pj_activesock_cfg_default(&asock_cfg);
    asock_cfg.async_cnt = 1;

    pj_bzero(&tcp_callback, sizeof(tcp_callback));
    tcp_callback.on_data_read = &on_data_read;
    tcp_callback.on_data_sent = &on_data_sent;
    tcp_callback.on_connect_complete = &on_connect_complete;

    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
    status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
				  ioqueue, &tcp_callback, tcp, &tcp->asock);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    /* Register transport to transport manager */
    status = pjsip_transport_register(listener->tpmgr, &tcp->base);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    tcp->is_registered = PJ_TRUE;

    /* Initialize keep-alive timer */
    tcp->ka_timer.user_data = (void*)tcp;
    tcp->ka_timer.cb = &tcp_keep_alive_timer;
    pj_ioqueue_op_key_init(&tcp->ka_op_key.key, sizeof(pj_ioqueue_op_key_t));
    pj_strdup(tcp->base.pool, &tcp->ka_pkt, &ka_pkt);

    /* Done setting up basic transport. */
    *p_tcp = tcp;

    PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
	      (tcp->is_server ? "server" : "client")));

    return PJ_SUCCESS;

on_error:
    /* is_registered is still false on every path that reaches here, so
     * tcp_destroy() performs the teardown directly.
     */
    tcp_destroy(&tcp->base, status);
    return status;
}


/* Flush all delayed transmission once the socket is connected.
 *
 * Each queued entry is removed from the delayed list and handed to
 * pj_activesock_send() while holding the transport lock. When the send
 * does not return PJ_EPENDING, on_data_sent() is invoked directly with
 * the resulting size.
 */
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
{
    pj_lock_acquire(tcp->base.lock);
    while (!pj_list_empty(&tcp->delayed_list)) {
	struct delayed_tdata *pending_tx;
	pjsip_tx_data *tdata;
	pj_ioqueue_op_key_t *op_key;
	pj_ssize_t size;
	pj_status_t status;

	pending_tx = tcp->delayed_list.next;
	pj_list_erase(pending_tx);

	tdata = pending_tx->tdata_op_key->tdata;
	op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;

	/* send! */
	size = tdata->buf.cur - tdata->buf.start;
	status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,
				    &size, 0);
	/* NOTE(review): size is reported as-is even when status is an
	 * error other than PJ_EPENDING - confirm that the send call
	 * updates size on failure.
	 */
	if (status != PJ_EPENDING) {
	    on_data_sent(tcp->asock, op_key, size);
	}
    }
    pj_lock_release(tcp->base.lock);
}


/* Called by transport manager to destroy transport */
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
{
    struct tcp_transport *tcp = (struct tcp_transport*)transport;

    /* Transport would have been unregistered by now since this callback
     * is called by transport manager.
     */
    tcp->is_registered = PJ_FALSE;

    return tcp_destroy(transport, tcp->close_reason);
}


/* Destroy TCP transport.
 *
 * transport  The transport to destroy.
 * reason     Status recorded as the close reason if none is set yet; it
 *            is also reported (negated) to pending senders and logged.
 *
 * While the transport is still registered this only calls
 * pjsip_transport_destroy(), which re-enters this function through the
 * destroy callback to perform the actual teardown. Always returns
 * PJ_SUCCESS.
 */
static pj_status_t tcp_destroy(pjsip_transport *transport,
			       pj_status_t reason)
{
    struct tcp_transport *tcp = (struct tcp_transport*)transport;

    if (tcp->close_reason == 0)
	tcp->close_reason = reason;

    if (tcp->is_registered) {
	tcp->is_registered = PJ_FALSE;
	pjsip_transport_destroy(transport);

	/* pjsip_transport_destroy will recursively call this function
	 * again.
	 */
	return PJ_SUCCESS;
    }

    /* Mark transport as closing */
    tcp->is_closing = PJ_TRUE;

    /* Stop keep-alive timer. */
    if (tcp->ka_timer.id) {
	pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer);
	tcp->ka_timer.id = PJ_FALSE;
    }

    /* Cancel all delayed transmits */
    while (!pj_list_empty(&tcp->delayed_list)) {
	struct delayed_tdata *pending_tx;
	pj_ioqueue_op_key_t *op_key;

	pending_tx = tcp->delayed_list.next;
	pj_list_erase(pending_tx);

	op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;

	/* Report the failure to the sender as a negative status. */
	on_data_sent(tcp->asock, op_key, -reason);
    }

    /* Release the receive-data pool, if reading was ever started. */
    if (tcp->rdata.tp_info.pool) {
	pj_pool_release(tcp->rdata.tp_info.pool);
	tcp->rdata.tp_info.pool = NULL;
    }

    /* Close the active socket if one exists, else the raw socket.
     * NOTE(review): presumably closing the active socket also closes the
     * underlying socket, since tcp->sock is simply invalidated - confirm.
     */
    if (tcp->asock) {
	pj_activesock_close(tcp->asock);
	tcp->asock = NULL;
	tcp->sock = PJ_INVALID_SOCKET;
    } else if (tcp->sock != PJ_INVALID_SOCKET) {
	pj_sock_close(tcp->sock);
	tcp->sock = PJ_INVALID_SOCKET;
    }

    if (tcp->base.lock) {
	pj_lock_destroy(tcp->base.lock);
	tcp->base.lock = NULL;
    }

    if (tcp->base.ref_cnt) {
	pj_atomic_destroy(tcp->base.ref_cnt);
	tcp->base.ref_cnt = NULL;
    }

    /* Release the transport pool last: tcp itself is allocated from it. */
    if (tcp->base.pool) {
	pj_pool_t *pool;

	if (reason != PJ_SUCCESS) {
	    char errmsg[PJ_ERR_MSG_SIZE];

	    pj_strerror(reason, errmsg, sizeof(errmsg));
	    PJ_LOG(4,(tcp->base.obj_name,
		      "TCP transport destroyed with reason %d: %s",
		      reason, errmsg));

	} else {

	    PJ_LOG(4,(tcp->base.obj_name,
		      "TCP transport destroyed normally"));

	}

	pool = tcp->base.pool;
	tcp->base.pool = NULL;
	pj_pool_release(pool);
    }

    return PJ_SUCCESS;
}


/*
 * This utility function creates receive data buffers and starts
 * asynchronous recv() operations from the socket. It is called after
 * accept() or connect() operation complete.
*/ static pj_status_t tcp_start_read(struct tcp_transport *tcp) { pj_pool_t *pool; pj_ssize_t size; pj_sockaddr_in *rem_addr; void *readbuf[1]; pj_status_t status; /* Init rdata */ pool = pjsip_endpt_create_pool(tcp->base.endpt, "rtd%p", PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC); if (!pool) { tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM); return PJ_ENOMEM; } tcp->rdata.tp_info.pool = pool; tcp->rdata.tp_info.transport = &tcp->base; tcp->rdata.tp_info.tp_data = tcp; tcp->rdata.tp_info.op_key.rdata = &tcp->rdata; pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t)); tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr; tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in); rem_addr = (pj_sockaddr_in*) &tcp->base.key.rem_addr; pj_ansi_strcpy(tcp->rdata.pkt_info.src_name, pj_inet_ntoa(rem_addr->sin_addr)); tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port); size = sizeof(tcp->rdata.pkt_info.packet); readbuf[0] = tcp->rdata.pkt_info.packet; status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size, readbuf, 0); if (status != PJ_SUCCESS && status != PJ_EPENDING) { PJ_LOG(4, (tcp->base.obj_name, "pj_activesock_start_read() error, status=%d", status)); return status; } return PJ_SUCCESS; } /* This callback is called by transport manager for the TCP factory * to create outgoing transport to the specified destination. 
*/ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, pjsip_tpmgr *mgr, pjsip_endpoint *endpt, const pj_sockaddr *rem_addr, int addr_len, pjsip_transport **p_transport) { struct tcp_listener *listener; struct tcp_transport *tcp; pj_sock_t sock; pj_sockaddr_in local_addr; pj_status_t status; /* Sanity checks */ PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr && addr_len && p_transport, PJ_EINVAL); /* Check that address is a sockaddr_in */ PJ_ASSERT_RETURN(rem_addr->addr.sa_family == pj_AF_INET() && addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL); listener = (struct tcp_listener*)factory; /* Create socket */ status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock); if (status != PJ_SUCCESS) return status; /* Bind to any port */ status = pj_sock_bind_in(sock, 0, 0); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } /* Get the local port */ addr_len = sizeof(pj_sockaddr_in); status = pj_sock_getsockname(sock, &local_addr, &addr_len); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } /* Initially set the address from the listener's address */ local_addr.sin_addr.s_addr = ((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr; /* Create the transport descriptor */ status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr, (pj_sockaddr_in*)rem_addr, &tcp); if (status != PJ_SUCCESS) return status; /* Start asynchronous connect() operation */ tcp->has_pending_connect = PJ_TRUE; status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr, sizeof(pj_sockaddr_in)); if (status == PJ_SUCCESS) { on_connect_complete(tcp->asock, PJ_SUCCESS); } else if (status != PJ_EPENDING) { tcp_destroy(&tcp->base, status); return status; } if (tcp->has_pending_connect) { /* Update (again) local address, just in case local address currently * set is different now that asynchronous connect() is started. 
*/ addr_len = sizeof(pj_sockaddr_in); if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) { pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; /* Some systems (like old Win32 perhaps) may not set local address * properly before socket is fully connected. */ if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr && local_addr.sin_addr.s_addr != 0) { tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr; tp_addr->sin_port = local_addr.sin_port; sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, &local_addr); } } PJ_LOG(4,(tcp->base.obj_name, "TCP transport %.*s:%d is connecting to %.*s:%d...", (int)tcp->base.local_name.host.slen, tcp->base.local_name.host.ptr, tcp->base.local_name.port, (int)tcp->base.remote_name.host.slen, tcp->base.remote_name.host.ptr, tcp->base.remote_name.port)); } /* Done */ *p_transport = &tcp->base; return PJ_SUCCESS; } /* * This callback is called by active socket when pending accept() operation * has completed. */ static pj_bool_t on_accept_complete(pj_activesock_t *asock, pj_sock_t sock, const pj_sockaddr_t *src_addr, int src_addr_len) { struct tcp_listener *listener; struct tcp_transport *tcp; char addr[PJ_INET6_ADDRSTRLEN+10]; pj_status_t status; PJ_UNUSED_ARG(src_addr_len); listener = (struct tcp_listener*) pj_activesock_get_user_data(asock); PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE); PJ_LOG(4,(listener->factory.obj_name, "TCP listener %.*s:%d: got incoming TCP connection " "from %s, sock=%d", (int)listener->factory.addr_name.host.slen, listener->factory.addr_name.host.ptr, listener->factory.addr_name.port, pj_sockaddr_print(src_addr, addr, sizeof(addr), 3), sock)); /* * Incoming connection! * Create TCP transport for the new socket. 
*/ status = tcp_create( listener, NULL, sock, PJ_TRUE, (const pj_sockaddr_in*)&listener->factory.local_addr, (const pj_sockaddr_in*)src_addr, &tcp); if (status == PJ_SUCCESS) { status = tcp_start_read(tcp); if (status != PJ_SUCCESS) { PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); tcp_destroy(&tcp->base, status); } else { /* Start keep-alive timer */ if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0}; pjsip_endpt_schedule_timer(listener->endpt, &tcp->ka_timer, &delay); tcp->ka_timer.id = PJ_TRUE; pj_gettimeofday(&tcp->last_activity); } } } return PJ_TRUE; } /* * Callback from ioqueue when packet is sent. */ static pj_bool_t on_data_sent(pj_activesock_t *asock, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { struct tcp_transport *tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; /* Note that op_key may be the op_key from keep-alive, thus * it will not have tdata etc. */ tdata_op_key->tdata = NULL; if (tdata_op_key->callback) { /* * Notify sip_transport.c that packet has been sent. */ if (bytes_sent == 0) bytes_sent = -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent); /* Mark last activity time */ pj_gettimeofday(&tcp->last_activity); } /* Check for error/closure */ if (bytes_sent <= 0) { pj_status_t status; PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", bytes_sent)); status = (bytes_sent == 0) ? 
PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) : -bytes_sent; if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return PJ_FALSE; } return PJ_TRUE; } /* * This callback is called by transport manager to send SIP message */ static pj_status_t tcp_send_msg(pjsip_transport *transport, pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, int addr_len, void *token, pjsip_transport_callback callback) { struct tcp_transport *tcp = (struct tcp_transport*)transport; pj_ssize_t size; pj_bool_t delayed = PJ_FALSE; pj_status_t status = PJ_SUCCESS; /* Sanity check */ PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); /* Check that there's no pending operation associated with the tdata */ PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); /* Check the address is supported */ PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL); /* Init op key. */ tdata->op_key.tdata = tdata; tdata->op_key.token = token; tdata->op_key.callback = callback; /* If asynchronous connect() has not completed yet, just put the * transmit data in the pending transmission list since we can not * use the socket yet. */ if (tcp->has_pending_connect) { /* * Looks like connect() is still in progress. Check again (this time * with holding the lock) to be sure. */ pj_lock_acquire(tcp->base.lock); if (tcp->has_pending_connect) { struct delayed_tdata *delayed_tdata; /* * connect() is still in progress. Put the transmit data to * the delayed list. */ delayed_tdata = PJ_POOL_ALLOC_T(tdata->pool, struct delayed_tdata); delayed_tdata->tdata_op_key = &tdata->op_key; pj_list_push_back(&tcp->delayed_list, delayed_tdata); status = PJ_EPENDING; /* Prevent pj_ioqueue_send() to be called below */ delayed = PJ_TRUE; } pj_lock_release(tcp->base.lock); } if (!delayed) { /* * Transport is ready to go. Send the packet to ioqueue to be * sent asynchronously. 
*/ size = tdata->buf.cur - tdata->buf.start; status = pj_activesock_send(tcp->asock, (pj_ioqueue_op_key_t*)&tdata->op_key, tdata->buf.start, &size, 0); if (status != PJ_EPENDING) { /* Not pending (could be immediate success or error) */ tdata->op_key.tdata = NULL; /* Shutdown transport on closure/errors */ if (size <= 0) { PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", size)); if (status == PJ_SUCCESS) status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); } } } return status; } /* * This callback is called by transport manager to shutdown transport. */ static pj_status_t tcp_shutdown(pjsip_transport *transport) { struct tcp_transport *tcp = (struct tcp_transport*)transport; /* Stop keep-alive timer. */ if (tcp->ka_timer.id) { pjsip_endpt_cancel_timer(tcp->base.endpt, &tcp->ka_timer); tcp->ka_timer.id = PJ_FALSE; } return PJ_SUCCESS; } /* * Callback from ioqueue that an incoming data is received from the socket. */ static pj_bool_t on_data_read(pj_activesock_t *asock, void *data, pj_size_t size, pj_status_t status, pj_size_t *remainder) { enum { MAX_IMMEDIATE_PACKET = 10 }; struct tcp_transport *tcp; pjsip_rx_data *rdata; PJ_UNUSED_ARG(data); tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); rdata = &tcp->rdata; /* Don't do anything if transport is closing. */ if (tcp->is_closing) { tcp->is_closing++; return PJ_FALSE; } /* Houston, we have packet! Report the packet to transport manager * to be parsed. */ if (status == PJ_SUCCESS) { pj_size_t size_eaten; /* Mark this as an activity */ pj_gettimeofday(&tcp->last_activity); pj_assert((void*)rdata->pkt_info.packet == data); /* Init pkt_info part. */ rdata->pkt_info.len = size; rdata->pkt_info.zero = 0; pj_gettimeofday(&rdata->pkt_info.timestamp); /* Report to transport manager. * The transport manager will tell us how many bytes of the packet * have been processed (as valid SIP message). 
*/ size_eaten = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata); pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); /* Move unprocessed data to the front of the buffer */ *remainder = size - size_eaten; if (*remainder > 0 && *remainder != size) { pj_memmove(rdata->pkt_info.packet, rdata->pkt_info.packet + size_eaten, *remainder); } } else { /* Transport is closed */ PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return PJ_FALSE; } /* Reset pool. */ pj_pool_reset(rdata->tp_info.pool); return PJ_TRUE; } /* * Callback from ioqueue when asynchronous connect() operation completes. */ static pj_bool_t on_connect_complete(pj_activesock_t *asock, pj_status_t status) { struct tcp_transport *tcp; pj_sockaddr_in addr; int addrlen; tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); /* Mark that pending connect() operation has completed. */ tcp->has_pending_connect = PJ_FALSE; /* Check connect() status */ if (status != PJ_SUCCESS) { tcp_perror(tcp->base.obj_name, "TCP connect() error", status); /* Cancel all delayed transmits */ while (!pj_list_empty(&tcp->delayed_list)) { struct delayed_tdata *pending_tx; pj_ioqueue_op_key_t *op_key; pending_tx = tcp->delayed_list.next; pj_list_erase(pending_tx); op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; on_data_sent(tcp->asock, op_key, -status); } /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. 
*/ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return PJ_FALSE; } PJ_LOG(4,(tcp->base.obj_name, "TCP transport %.*s:%d is connected to %.*s:%d", (int)tcp->base.local_name.host.slen, tcp->base.local_name.host.ptr, tcp->base.local_name.port, (int)tcp->base.remote_name.host.slen, tcp->base.remote_name.host.ptr, tcp->base.remote_name.port)); /* Update (again) local address, just in case local address currently * set is different now that the socket is connected (could happen * on some systems, like old Win32 probably?). */ addrlen = sizeof(pj_sockaddr_in); if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) { pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; if (tp_addr->sin_addr.s_addr != addr.sin_addr.s_addr) { tp_addr->sin_addr.s_addr = addr.sin_addr.s_addr; tp_addr->sin_port = addr.sin_port; sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, tp_addr); } } /* Start pending read */ status = tcp_start_read(tcp); if (status != PJ_SUCCESS) { /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. 
*/ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return PJ_FALSE; } /* Flush all pending send operations */ tcp_flush_pending_tx(tcp); /* Start keep-alive timer */ if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { pj_time_val delay = { PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0 }; pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, &delay); tcp->ka_timer.id = PJ_TRUE; pj_gettimeofday(&tcp->last_activity); } return PJ_TRUE; } /* Transport keep-alive timer callback */ static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e) { struct tcp_transport *tcp = (struct tcp_transport*) e->user_data; pj_time_val delay; pj_time_val now; pj_ssize_t size; pj_status_t status; PJ_UNUSED_ARG(th); tcp->ka_timer.id = PJ_TRUE; pj_gettimeofday(&now); PJ_TIME_VAL_SUB(now, tcp->last_activity); if (now.sec > 0 && now.sec < PJSIP_TCP_KEEP_ALIVE_INTERVAL) { /* There has been activity, so don't send keep-alive */ delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL - now.sec; delay.msec = 0; pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, &delay); tcp->ka_timer.id = PJ_TRUE; return; } PJ_LOG(5,(tcp->base.obj_name, "Sending %d byte(s) keep-alive to %.*s:%d", (int)tcp->ka_pkt.slen, (int)tcp->base.remote_name.host.slen, tcp->base.remote_name.host.ptr, tcp->base.remote_name.port)); /* Send the data */ size = tcp->ka_pkt.slen; status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key, tcp->ka_pkt.ptr, &size, 0); if (status != PJ_SUCCESS && status != PJ_EPENDING) { tcp_perror(tcp->base.obj_name, "Error sending keep-alive packet", status); pjsip_transport_shutdown(&tcp->base); return; } /* Register next keep-alive */ delay.sec = PJSIP_TCP_KEEP_ALIVE_INTERVAL; delay.msec = 0; pjsip_endpt_schedule_timer(tcp->base.endpt, &tcp->ka_timer, &delay); tcp->ka_timer.id = PJ_TRUE; } #endif /* PJ_HAS_TCP */