/* $Id$ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "turn.h" #include #if PJ_HAS_TCP struct accept_op { pj_ioqueue_op_key_t op_key; pj_sock_t sock; pj_sockaddr src_addr; int src_addr_len; }; struct tcp_listener { pj_turn_listener base; pj_ioqueue_key_t *key; unsigned accept_cnt; struct accept_op *accept_op; /* Array of accept_op's */ }; static void lis_on_accept_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t sock, pj_status_t status); static pj_status_t lis_destroy(pj_turn_listener *listener); static void transport_create(pj_sock_t sock, pj_turn_listener *lis, pj_sockaddr_t *src_addr, int src_addr_len); static void show_err(const char *sender, const char *title, pj_status_t status) { char errmsg[PJ_ERR_MSG_SIZE]; pj_strerror(status, errmsg, sizeof(errmsg)); PJ_LOG(4,(sender, "%s: %s", title, errmsg)); } /* * Create a new listener on the specified port. */ PJ_DEF(pj_status_t) pj_turn_listener_create_tcp(pj_turn_srv *srv, int af, const pj_str_t *bound_addr, unsigned port, unsigned concurrency_cnt, unsigned flags, pj_turn_listener **p_listener) { pj_pool_t *pool; struct tcp_listener *tcp_lis; pj_ioqueue_callback ioqueue_cb; unsigned i; pj_status_t status; /* Create structure */ pool = pj_pool_create(srv->core.pf, "tcpl%p", 1000, 1000, NULL); tcp_lis = PJ_POOL_ZALLOC_T(pool, struct tcp_listener); tcp_lis->base.pool = pool; tcp_lis->base.obj_name = pool->obj_name; tcp_lis->base.server = srv; tcp_lis->base.tp_type = PJ_TURN_TP_TCP; tcp_lis->base.sock = PJ_INVALID_SOCKET; //tcp_lis->base.sendto = &tcp_sendto; tcp_lis->base.destroy = &lis_destroy; tcp_lis->accept_cnt = concurrency_cnt; tcp_lis->base.flags = flags; /* Create socket */ status = pj_sock_socket(af, pj_SOCK_STREAM(), 0, &tcp_lis->base.sock); if (status != PJ_SUCCESS) goto on_error; /* Init bind address */ status = pj_sockaddr_init(af, &tcp_lis->base.addr, bound_addr, (pj_uint16_t)port); if (status != PJ_SUCCESS) goto on_error; /* Create info */ pj_ansi_strcpy(tcp_lis->base.info, "TCP:"); pj_sockaddr_print(&tcp_lis->base.addr, tcp_lis->base.info+4, sizeof(tcp_lis->base.info)-4, 3); /* Bind socket */ status = pj_sock_bind(tcp_lis->base.sock, &tcp_lis->base.addr, pj_sockaddr_get_len(&tcp_lis->base.addr)); if (status != PJ_SUCCESS) goto on_error; /* Listen() */ status = pj_sock_listen(tcp_lis->base.sock, 5); if (status != PJ_SUCCESS) goto on_error; /* Register to ioqueue */ pj_bzero(&ioqueue_cb, sizeof(ioqueue_cb)); ioqueue_cb.on_accept_complete = &lis_on_accept_complete; status = pj_ioqueue_register_sock(pool, srv->core.ioqueue, tcp_lis->base.sock, tcp_lis, &ioqueue_cb, &tcp_lis->key); /* Create op keys */ tcp_lis->accept_op = (struct accept_op*)pj_pool_calloc(pool, concurrency_cnt, sizeof(struct accept_op)); /* Create each accept_op and kick off read operation */ for (i=0; ikey, &tcp_lis->accept_op[i].op_key, PJ_INVALID_SOCKET, PJ_EPENDING); } /* Done */ PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s created", tcp_lis->base.info)); *p_listener = &tcp_lis->base; return PJ_SUCCESS; on_error: lis_destroy(&tcp_lis->base); return status; } /* * Destroy listener. */ static pj_status_t lis_destroy(pj_turn_listener *listener) { struct tcp_listener *tcp_lis = (struct tcp_listener *)listener; unsigned i; if (tcp_lis->key) { pj_ioqueue_unregister(tcp_lis->key); tcp_lis->key = NULL; tcp_lis->base.sock = PJ_INVALID_SOCKET; } else if (tcp_lis->base.sock != PJ_INVALID_SOCKET) { pj_sock_close(tcp_lis->base.sock); tcp_lis->base.sock = PJ_INVALID_SOCKET; } for (i=0; iaccept_cnt; ++i) { /* Nothing to do */ } if (tcp_lis->base.pool) { pj_pool_t *pool = tcp_lis->base.pool; PJ_LOG(4,(tcp_lis->base.obj_name, "Listener %s destroyed", tcp_lis->base.info)); tcp_lis->base.pool = NULL; pj_pool_release(pool); } return PJ_SUCCESS; } /* * Callback on new TCP connection. */ static void lis_on_accept_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t sock, pj_status_t status) { struct tcp_listener *tcp_lis; struct accept_op *accept_op = (struct accept_op*) op_key; tcp_lis = (struct tcp_listener*) pj_ioqueue_get_user_data(key); PJ_UNUSED_ARG(sock); do { /* Report new connection. */ if (status == PJ_SUCCESS) { char addr[PJ_INET6_ADDRSTRLEN+8]; PJ_LOG(5,(tcp_lis->base.obj_name, "Incoming TCP from %s", pj_sockaddr_print(&accept_op->src_addr, addr, sizeof(addr), 3))); transport_create(accept_op->sock, &tcp_lis->base, &accept_op->src_addr, accept_op->src_addr_len); } else if (status != PJ_EPENDING) { show_err(tcp_lis->base.obj_name, "accept()", status); } /* Prepare next accept() */ accept_op->src_addr_len = sizeof(accept_op->src_addr); status = pj_ioqueue_accept(key, op_key, &accept_op->sock, NULL, &accept_op->src_addr, &accept_op->src_addr_len); } while (status != PJ_EPENDING && status != PJ_ECANCELLED && status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)); } /****************************************************************************/ /* * Transport */ enum { TIMER_NONE, TIMER_DESTROY }; /* The delay in seconds to be applied before TCP transport is destroyed when * no allocation is referencing it. This also means the initial time to wait * after the initial TCP connection establishment to receive a valid STUN * message in the transport. */ #define SHUTDOWN_DELAY 10 struct recv_op { pj_ioqueue_op_key_t op_key; pj_turn_pkt pkt; }; struct tcp_transport { pj_turn_transport base; pj_pool_t *pool; pj_timer_entry timer; pj_turn_allocation *alloc; int ref_cnt; pj_sock_t sock; pj_ioqueue_key_t *key; struct recv_op recv_op; pj_ioqueue_op_key_t send_op; }; static void tcp_on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read); static pj_status_t tcp_sendto(pj_turn_transport *tp, const void *packet, pj_size_t size, unsigned flag, const pj_sockaddr_t *addr, int addr_len); static void tcp_destroy(struct tcp_transport *tcp); static void tcp_add_ref(pj_turn_transport *tp, pj_turn_allocation *alloc); static void tcp_dec_ref(pj_turn_transport *tp, pj_turn_allocation *alloc); static void timer_callback(pj_timer_heap_t *timer_heap, pj_timer_entry *entry); static void transport_create(pj_sock_t sock, pj_turn_listener *lis, pj_sockaddr_t *src_addr, int src_addr_len) { pj_pool_t *pool; struct tcp_transport *tcp; pj_ioqueue_callback cb; pj_status_t status; pool = pj_pool_create(lis->server->core.pf, "tcp%p", 1000, 1000, NULL); tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport); tcp->base.obj_name = pool->obj_name; tcp->base.listener = lis; tcp->base.info = lis->info; tcp->base.sendto = &tcp_sendto; tcp->base.add_ref = &tcp_add_ref; tcp->base.dec_ref = &tcp_dec_ref; tcp->pool = pool; tcp->sock = sock; pj_timer_entry_init(&tcp->timer, TIMER_NONE, tcp, &timer_callback); /* Register to ioqueue */ pj_bzero(&cb, sizeof(cb)); cb.on_read_complete = &tcp_on_read_complete; status = pj_ioqueue_register_sock(pool, lis->server->core.ioqueue, sock, tcp, &cb, &tcp->key); if (status != PJ_SUCCESS) { tcp_destroy(tcp); return; } /* Init pkt */ tcp->recv_op.pkt.pool = pj_pool_create(lis->server->core.pf, "tcpkt%p", 1000, 1000, NULL); tcp->recv_op.pkt.transport = &tcp->base; tcp->recv_op.pkt.src.tp_type = PJ_TURN_TP_TCP; tcp->recv_op.pkt.src_addr_len = src_addr_len; pj_memcpy(&tcp->recv_op.pkt.src.clt_addr, src_addr, src_addr_len); tcp_on_read_complete(tcp->key, &tcp->recv_op.op_key, -PJ_EPENDING); /* Should not access transport from now, it may have been destroyed */ } static void tcp_destroy(struct tcp_transport *tcp) { if (tcp->key) { pj_ioqueue_unregister(tcp->key); tcp->key = NULL; tcp->sock = 0; } else if (tcp->sock) { pj_sock_close(tcp->sock); tcp->sock = 0; } if (tcp->pool) { pj_pool_release(tcp->pool); } } static void timer_callback(pj_timer_heap_t *timer_heap, pj_timer_entry *entry) { struct tcp_transport *tcp = (struct tcp_transport*) entry->user_data; PJ_UNUSED_ARG(timer_heap); tcp_destroy(tcp); } static void tcp_on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) { struct tcp_transport *tcp; struct recv_op *recv_op = (struct recv_op*) op_key; pj_status_t status; tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key); do { /* Report to server or allocation, if we have allocation */ if (bytes_read > 0) { recv_op->pkt.len = bytes_read; pj_gettimeofday(&recv_op->pkt.rx_time); tcp_add_ref(&tcp->base, NULL); if (tcp->alloc) { pj_turn_allocation_on_rx_client_pkt(tcp->alloc, &recv_op->pkt); } else { pj_turn_srv_on_rx_pkt(tcp->base.listener->server, &recv_op->pkt); } pj_assert(tcp->ref_cnt > 0); tcp_dec_ref(&tcp->base, NULL); } else if (bytes_read != -PJ_EPENDING) { /* TCP connection closed/error. Notify client and then destroy * ourselves. * Note: the -PJ_EPENDING is the value passed during init. */ ++tcp->ref_cnt; if (tcp->alloc) { if (bytes_read != 0) { show_err(tcp->base.obj_name, "TCP socket error", -bytes_read); } else { PJ_LOG(5,(tcp->base.obj_name, "TCP socket closed")); } pj_turn_allocation_on_transport_closed(tcp->alloc, &tcp->base); tcp->alloc = NULL; } pj_assert(tcp->ref_cnt > 0); if (--tcp->ref_cnt == 0) { tcp_destroy(tcp); return; } } /* Reset pool */ pj_pool_reset(recv_op->pkt.pool); /* If packet is full discard it */ if (recv_op->pkt.len == sizeof(recv_op->pkt.pkt)) { PJ_LOG(4,(tcp->base.obj_name, "Buffer discarded")); recv_op->pkt.len = 0; } /* Read next packet */ bytes_read = sizeof(recv_op->pkt.pkt) - recv_op->pkt.len; status = pj_ioqueue_recv(tcp->key, op_key, recv_op->pkt.pkt + recv_op->pkt.len, &bytes_read, 0); if (status != PJ_EPENDING && status != PJ_SUCCESS) bytes_read = -status; } while (status != PJ_EPENDING && status != PJ_ECANCELLED && status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)); } static pj_status_t tcp_sendto(pj_turn_transport *tp, const void *packet, pj_size_t size, unsigned flag, const pj_sockaddr_t *addr, int addr_len) { struct tcp_transport *tcp = (struct tcp_transport*) tp; pj_ssize_t length = size; PJ_UNUSED_ARG(addr); PJ_UNUSED_ARG(addr_len); return pj_ioqueue_send(tcp->key, &tcp->send_op, packet, &length, flag); } static void tcp_add_ref(pj_turn_transport *tp, pj_turn_allocation *alloc) { struct tcp_transport *tcp = (struct tcp_transport*) tp; ++tcp->ref_cnt; if (tcp->alloc == NULL && alloc) { tcp->alloc = alloc; } /* Cancel shutdown timer if it's running */ if (tcp->timer.id != TIMER_NONE) { pj_timer_heap_cancel(tcp->base.listener->server->core.timer_heap, &tcp->timer); tcp->timer.id = TIMER_NONE; } } static void tcp_dec_ref(pj_turn_transport *tp, pj_turn_allocation *alloc) { struct tcp_transport *tcp = (struct tcp_transport*) tp; --tcp->ref_cnt; if (alloc && alloc == tcp->alloc) { tcp->alloc = NULL; } if (tcp->ref_cnt == 0 && tcp->timer.id == TIMER_NONE) { pj_time_val delay = { SHUTDOWN_DELAY, 0 }; tcp->timer.id = TIMER_DESTROY; pj_timer_heap_schedule(tcp->base.listener->server->core.timer_heap, &tcp->timer, &delay); } } #else /* PJ_HAS_TCP */ /* To avoid empty translation unit warning */ int listener_tcp_dummy = 0; #endif /* PJ_HAS_TCP */