/* $Id$ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 # include static pj_bool_t ios_bg_support = PJ_TRUE; #endif #define PJ_ACTIVESOCK_MAX_LOOP 50 enum read_type { TYPE_NONE, TYPE_RECV, TYPE_RECV_FROM }; enum shutdown_dir { SHUT_NONE = 0, SHUT_RX = 1, SHUT_TX = 2 }; struct read_op { pj_ioqueue_op_key_t op_key; pj_uint8_t *pkt; unsigned max_size; pj_size_t size; pj_sockaddr src_addr; int src_addr_len; }; struct accept_op { pj_ioqueue_op_key_t op_key; pj_sock_t new_sock; pj_sockaddr rem_addr; int rem_addr_len; }; struct send_data { pj_uint8_t *data; pj_ssize_t len; pj_ssize_t sent; unsigned flags; }; struct pj_activesock_t { pj_ioqueue_key_t *key; pj_bool_t stream_oriented; pj_bool_t whole_data; pj_ioqueue_t *ioqueue; void *user_data; unsigned async_count; unsigned shutdown; unsigned max_loop; pj_activesock_cb cb; #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 int bg_setting; pj_sock_t sock; CFReadStreamRef readStream; #endif unsigned err_counter; pj_status_t last_err; struct send_data send_data; struct read_op *read_op; pj_uint32_t read_flags; enum read_type read_type; struct accept_op *accept_op; }; static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read); static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent); #if PJ_HAS_TCP static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t sock, pj_status_t status); static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status); #endif PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg) { pj_bzero(cfg, sizeof(*cfg)); cfg->async_cnt = 1; cfg->concurrency = -1; cfg->whole_data = PJ_TRUE; } #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 static void activesock_destroy_iphone_os_stream(pj_activesock_t *asock) { if (asock->readStream) { CFReadStreamClose(asock->readStream); CFRelease(asock->readStream); asock->readStream = NULL; } } static void activesock_create_iphone_os_stream(pj_activesock_t *asock) { if (ios_bg_support && asock->bg_setting && asock->stream_oriented) { activesock_destroy_iphone_os_stream(asock); CFStreamCreatePairWithSocket(kCFAllocatorDefault, asock->sock, &asock->readStream, NULL); if (!asock->readStream || CFReadStreamSetProperty(asock->readStream, kCFStreamNetworkServiceType, kCFStreamNetworkServiceTypeVoIP) != TRUE || CFReadStreamOpen(asock->readStream) != TRUE) { PJ_LOG(2,("", "Failed to configure TCP transport for VoIP " "usage. Usage of THIS particular TCP transport in " "background mode will not be supported.")); activesock_destroy_iphone_os_stream(asock); } } } PJ_DEF(void) pj_activesock_set_iphone_os_bg(pj_activesock_t *asock, int val) { asock->bg_setting = val; if (asock->bg_setting) activesock_create_iphone_os_stream(asock); else activesock_destroy_iphone_os_stream(asock); } PJ_DEF(void) pj_activesock_enable_iphone_os_bg(pj_bool_t val) { ios_bg_support = val; } #endif PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, pj_sock_t sock, int sock_type, const pj_activesock_cfg *opt, pj_ioqueue_t *ioqueue, const pj_activesock_cb *cb, void *user_data, pj_activesock_t **p_asock) { pj_activesock_t *asock; pj_ioqueue_callback ioq_cb; pj_status_t status; PJ_ASSERT_RETURN(pool && ioqueue && cb && p_asock, PJ_EINVAL); PJ_ASSERT_RETURN(sock!=0 && sock!=PJ_INVALID_SOCKET, PJ_EINVAL); PJ_ASSERT_RETURN(sock_type==pj_SOCK_STREAM() || sock_type==pj_SOCK_DGRAM(), PJ_EINVAL); PJ_ASSERT_RETURN(!opt || opt->async_cnt >= 1, PJ_EINVAL); asock = PJ_POOL_ZALLOC_T(pool, pj_activesock_t); asock->ioqueue = ioqueue; asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); asock->async_count = (opt? opt->async_cnt : 1); asock->whole_data = (opt? opt->whole_data : 1); asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; asock->user_data = user_data; pj_memcpy(&asock->cb, cb, sizeof(*cb)); pj_bzero(&ioq_cb, sizeof(ioq_cb)); ioq_cb.on_read_complete = &ioqueue_on_read_complete; ioq_cb.on_write_complete = &ioqueue_on_write_complete; #if PJ_HAS_TCP ioq_cb.on_connect_complete = &ioqueue_on_connect_complete; ioq_cb.on_accept_complete = &ioqueue_on_accept_complete; #endif status = pj_ioqueue_register_sock2(pool, ioqueue, sock, (opt? opt->grp_lock : NULL), asock, &ioq_cb, &asock->key); if (status != PJ_SUCCESS) { pj_activesock_close(asock); return status; } if (asock->whole_data) { /* Must disable concurrency otherwise there is a race condition */ pj_ioqueue_set_concurrency(asock->key, 0); } else if (opt && opt->concurrency >= 0) { pj_ioqueue_set_concurrency(asock->key, opt->concurrency); } #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 asock->sock = sock; asock->bg_setting = PJ_ACTIVESOCK_TCP_IPHONE_OS_BG; #endif *p_asock = asock; return PJ_SUCCESS; } PJ_DEF(pj_status_t) pj_activesock_create_udp( pj_pool_t *pool, const pj_sockaddr *addr, const pj_activesock_cfg *opt, pj_ioqueue_t *ioqueue, const pj_activesock_cb *cb, void *user_data, pj_activesock_t **p_asock, pj_sockaddr *bound_addr) { pj_sock_t sock_fd; pj_sockaddr default_addr; pj_status_t status; if (addr == NULL) { pj_sockaddr_init(pj_AF_INET(), &default_addr, NULL, 0); addr = &default_addr; } status = pj_sock_socket(addr->addr.sa_family, pj_SOCK_DGRAM(), 0, &sock_fd); if (status != PJ_SUCCESS) { return status; } status = pj_sock_bind(sock_fd, addr, pj_sockaddr_get_len(addr)); if (status != PJ_SUCCESS) { pj_sock_close(sock_fd); return status; } status = pj_activesock_create(pool, sock_fd, pj_SOCK_DGRAM(), opt, ioqueue, cb, user_data, p_asock); if (status != PJ_SUCCESS) { pj_sock_close(sock_fd); return status; } if (bound_addr) { int addr_len = sizeof(*bound_addr); status = pj_sock_getsockname(sock_fd, bound_addr, &addr_len); if (status != PJ_SUCCESS) { pj_activesock_close(*p_asock); return status; } } return PJ_SUCCESS; } PJ_DEF(pj_status_t) pj_activesock_close(pj_activesock_t *asock) { pj_ioqueue_key_t *key; pj_bool_t unregister = PJ_FALSE; PJ_ASSERT_RETURN(asock, PJ_EINVAL); asock->shutdown = SHUT_RX | SHUT_TX; /* Avoid double unregistration on the key */ key = asock->key; if (key) { pj_ioqueue_lock_key(key); unregister = (asock->key != NULL); asock->key = NULL; pj_ioqueue_unlock_key(key); } if (unregister) { pj_ioqueue_unregister(key); #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 activesock_destroy_iphone_os_stream(asock); #endif } return PJ_SUCCESS; } PJ_DEF(pj_status_t) pj_activesock_set_user_data( pj_activesock_t *asock, void *user_data) { PJ_ASSERT_RETURN(asock, PJ_EINVAL); asock->user_data = user_data; return PJ_SUCCESS; } PJ_DEF(void*) pj_activesock_get_user_data(pj_activesock_t *asock) { PJ_ASSERT_RETURN(asock, NULL); return asock->user_data; } PJ_DEF(pj_status_t) pj_activesock_start_read(pj_activesock_t *asock, pj_pool_t *pool, unsigned buff_size, pj_uint32_t flags) { void **readbuf; unsigned i; PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); readbuf = (void**) pj_pool_calloc(pool, asock->async_count, sizeof(void*)); for (i=0; iasync_count; ++i) { readbuf[i] = pj_pool_alloc(pool, buff_size); } return pj_activesock_start_read2(asock, pool, buff_size, readbuf, flags); } PJ_DEF(pj_status_t) pj_activesock_start_read2( pj_activesock_t *asock, pj_pool_t *pool, unsigned buff_size, void *readbuf[], pj_uint32_t flags) { unsigned i; pj_status_t status; PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); PJ_ASSERT_RETURN(asock->read_op == NULL, PJ_EINVALIDOP); asock->read_op = (struct read_op*) pj_pool_calloc(pool, asock->async_count, sizeof(struct read_op)); asock->read_type = TYPE_RECV; asock->read_flags = flags; for (i=0; iasync_count; ++i) { struct read_op *r = &asock->read_op[i]; pj_ssize_t size_to_read; r->pkt = (pj_uint8_t*)readbuf[i]; size_to_read = r->max_size = buff_size; status = pj_ioqueue_recv(asock->key, &r->op_key, r->pkt, &size_to_read, PJ_IOQUEUE_ALWAYS_ASYNC | flags); PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); if (status != PJ_EPENDING) return status; } return PJ_SUCCESS; } PJ_DEF(pj_status_t) pj_activesock_start_recvfrom(pj_activesock_t *asock, pj_pool_t *pool, unsigned buff_size, pj_uint32_t flags) { void **readbuf; unsigned i; PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); readbuf = (void**) pj_pool_calloc(pool, asock->async_count, sizeof(void*)); for (i=0; iasync_count; ++i) { readbuf[i] = pj_pool_alloc(pool, buff_size); } return pj_activesock_start_recvfrom2(asock, pool, buff_size, readbuf, flags); } PJ_DEF(pj_status_t) pj_activesock_start_recvfrom2( pj_activesock_t *asock, pj_pool_t *pool, unsigned buff_size, void *readbuf[], pj_uint32_t flags) { unsigned i; pj_status_t status; PJ_ASSERT_RETURN(asock && pool && buff_size, PJ_EINVAL); PJ_ASSERT_RETURN(asock->read_type == TYPE_NONE, PJ_EINVALIDOP); asock->read_op = (struct read_op*) pj_pool_calloc(pool, asock->async_count, sizeof(struct read_op)); asock->read_type = TYPE_RECV_FROM; asock->read_flags = flags; for (i=0; iasync_count; ++i) { struct read_op *r = &asock->read_op[i]; pj_ssize_t size_to_read; r->pkt = (pj_uint8_t*) readbuf[i]; size_to_read = r->max_size = buff_size; r->src_addr_len = sizeof(r->src_addr); status = pj_ioqueue_recvfrom(asock->key, &r->op_key, r->pkt, &size_to_read, PJ_IOQUEUE_ALWAYS_ASYNC | flags, &r->src_addr, &r->src_addr_len); PJ_ASSERT_RETURN(status != PJ_SUCCESS, PJ_EBUG); if (status != PJ_EPENDING) return status; } return PJ_SUCCESS; } static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) { pj_activesock_t *asock; struct read_op *r = (struct read_op*)op_key; unsigned loop = 0; pj_status_t status; asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); /* Ignore if we've been shutdown */ if (asock->shutdown & SHUT_RX) return; do { unsigned flags; if (bytes_read > 0) { /* * We've got new data. */ pj_size_t remainder; pj_bool_t ret; /* Append this new data to existing data. If socket is stream * oriented, user might have left some data in the buffer. * Otherwise if socket is datagram there will be nothing in * existing packet hence the packet will contain only the new * packet. */ r->size += bytes_read; /* Set default remainder to zero */ remainder = 0; /* And return value to TRUE */ ret = PJ_TRUE; /* Notify callback */ if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, PJ_SUCCESS, &remainder); } else if (asock->read_type == TYPE_RECV_FROM && asock->cb.on_data_recvfrom) { ret = (*asock->cb.on_data_recvfrom)(asock, r->pkt, r->size, &r->src_addr, r->src_addr_len, PJ_SUCCESS); } /* If callback returns false, we have been destroyed! */ if (!ret) return; /* Only stream oriented socket may leave data in the packet */ if (asock->stream_oriented) { r->size = remainder; } else { r->size = 0; } } else if (bytes_read <= 0 && -bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && (asock->stream_oriented || -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))) { pj_size_t remainder; pj_bool_t ret; if (bytes_read == 0) { /* For stream/connection oriented socket, this means the * connection has been closed. For datagram sockets, it means * we've received datagram with zero length. */ if (asock->stream_oriented) status = PJ_EEOF; else status = PJ_SUCCESS; } else { /* This means we've got an error. If this is stream/connection * oriented, it means connection has been closed. For datagram * sockets, it means we've got some error (e.g. EWOULDBLOCK). */ status = (pj_status_t)-bytes_read; } /* Set default remainder to zero */ remainder = 0; /* And return value to TRUE */ ret = PJ_TRUE; /* Notify callback */ if (asock->read_type == TYPE_RECV && asock->cb.on_data_read) { /* For connection oriented socket, we still need to report * the remainder data (if any) to the user to let user do * processing with the remainder data before it closes the * connection. * If there is no remainder data, set the packet to NULL. */ /* Shouldn't set the packet to NULL, as there may be active * socket user, such as SSL socket, that needs to have access * to the read buffer packet. */ //ret = (*asock->cb.on_data_read)(asock, (r->size? r->pkt:NULL), // r->size, status, &remainder); ret = (*asock->cb.on_data_read)(asock, r->pkt, r->size, status, &remainder); } else if (asock->read_type == TYPE_RECV_FROM && asock->cb.on_data_recvfrom) { /* This would always be datagram oriented hence there's * nothing in the packet. We can't be sure if there will be * anything useful in the source_addr, so just put NULL * there too. */ /* In some scenarios, status may be PJ_SUCCESS. The upper * layer application may not expect the callback to be called * with successful status and NULL data, so lets not call the * callback if the status is PJ_SUCCESS. */ if (status != PJ_SUCCESS ) { ret = (*asock->cb.on_data_recvfrom)(asock, NULL, 0, NULL, 0, status); } } /* If callback returns false, we have been destroyed! */ if (!ret) return; /* Also stop further read if we've been shutdown */ if (asock->shutdown & SHUT_RX) return; /* Only stream oriented socket may leave data in the packet */ if (asock->stream_oriented) { r->size = remainder; } else { r->size = 0; } } /* Read next data. We limit ourselves to processing max_loop immediate * data, so when the loop counter has exceeded this value, force the * read()/recvfrom() to return pending operation to allow the program * to do other jobs. */ bytes_read = r->max_size - r->size; flags = asock->read_flags; if (++loop >= asock->max_loop) flags |= PJ_IOQUEUE_ALWAYS_ASYNC; if (asock->read_type == TYPE_RECV) { status = pj_ioqueue_recv(key, op_key, r->pkt + r->size, &bytes_read, flags); } else { r->src_addr_len = sizeof(r->src_addr); status = pj_ioqueue_recvfrom(key, op_key, r->pkt + r->size, &bytes_read, flags, &r->src_addr, &r->src_addr_len); } if (status == PJ_SUCCESS) { /* Immediate data */ ; } else if (status != PJ_EPENDING && status != PJ_ECANCELLED) { /* Error */ bytes_read = -status; } else { break; } } while (1); } static pj_status_t send_remaining(pj_activesock_t *asock, pj_ioqueue_op_key_t *send_key) { struct send_data *sd = (struct send_data*)send_key->activesock_data; pj_status_t status; do { pj_ssize_t size; size = sd->len - sd->sent; status = pj_ioqueue_send(asock->key, send_key, sd->data+sd->sent, &size, sd->flags); if (status != PJ_SUCCESS) { /* Pending or error */ break; } sd->sent += size; if (sd->sent == sd->len) { /* The whole data has been sent. */ return PJ_SUCCESS; } } while (sd->sent < sd->len); return status; } PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, pj_ioqueue_op_key_t *send_key, const void *data, pj_ssize_t *size, unsigned flags) { PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); if (asock->shutdown & SHUT_TX) return PJ_EINVALIDOP; send_key->activesock_data = NULL; if (asock->whole_data) { pj_ssize_t whole; pj_status_t status; whole = *size; status = pj_ioqueue_send(asock->key, send_key, data, size, flags); if (status != PJ_SUCCESS) { /* Pending or error */ return status; } if (*size == whole) { /* The whole data has been sent. */ return PJ_SUCCESS; } /* Data was partially sent */ asock->send_data.data = (pj_uint8_t*)data; asock->send_data.len = whole; asock->send_data.sent = *size; asock->send_data.flags = flags; send_key->activesock_data = &asock->send_data; /* Try again */ status = send_remaining(asock, send_key); if (status == PJ_SUCCESS) { *size = whole; } return status; } else { return pj_ioqueue_send(asock->key, send_key, data, size, flags); } } PJ_DEF(pj_status_t) pj_activesock_sendto( pj_activesock_t *asock, pj_ioqueue_op_key_t *send_key, const void *data, pj_ssize_t *size, unsigned flags, const pj_sockaddr_t *addr, int addr_len) { PJ_ASSERT_RETURN(asock && send_key && data && size && addr && addr_len, PJ_EINVAL); if (asock->shutdown & SHUT_TX) return PJ_EINVALIDOP; return pj_ioqueue_sendto(asock->key, send_key, data, size, flags, addr, addr_len); } static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { pj_activesock_t *asock; asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); /* Ignore if we've been shutdown. This may cause data to be partially * sent even when 'wholedata' was requested if the OS only sent partial * buffer. */ if (asock->shutdown & SHUT_TX) return; if (bytes_sent > 0 && op_key->activesock_data) { /* whole_data is requested. Make sure we send all the data */ struct send_data *sd = (struct send_data*)op_key->activesock_data; sd->sent += bytes_sent; if (sd->sent == sd->len) { /* all has been sent */ bytes_sent = sd->sent; op_key->activesock_data = NULL; } else { /* send remaining data */ pj_status_t status; status = send_remaining(asock, op_key); if (status == PJ_EPENDING) return; else if (status == PJ_SUCCESS) bytes_sent = sd->sent; else bytes_sent = -status; op_key->activesock_data = NULL; } } if (asock->cb.on_data_sent) { pj_bool_t ret; ret = (*asock->cb.on_data_sent)(asock, op_key, bytes_sent); /* If callback returns false, we have been destroyed! */ if (!ret) return; } } #if PJ_HAS_TCP PJ_DEF(pj_status_t) pj_activesock_start_accept(pj_activesock_t *asock, pj_pool_t *pool) { unsigned i; PJ_ASSERT_RETURN(asock, PJ_EINVAL); PJ_ASSERT_RETURN(asock->accept_op==NULL, PJ_EINVALIDOP); /* Ignore if we've been shutdown */ if (asock->shutdown) return PJ_EINVALIDOP; asock->accept_op = (struct accept_op*) pj_pool_calloc(pool, asock->async_count, sizeof(struct accept_op)); for (i=0; iasync_count; ++i) { struct accept_op *a = &asock->accept_op[i]; pj_status_t status; do { a->new_sock = PJ_INVALID_SOCKET; a->rem_addr_len = sizeof(a->rem_addr); status = pj_ioqueue_accept(asock->key, &a->op_key, &a->new_sock, NULL, &a->rem_addr, &a->rem_addr_len); if (status == PJ_SUCCESS) { /* We've got immediate connection. Not sure if it's a good * idea to call the callback now (probably application will * not be prepared to process it), so lets just silently * close the socket. */ pj_sock_close(a->new_sock); } } while (status == PJ_SUCCESS); if (status != PJ_EPENDING) { return status; } } return PJ_SUCCESS; } static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t new_sock, pj_status_t status) { pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); struct accept_op *accept_op = (struct accept_op*) op_key; PJ_UNUSED_ARG(new_sock); /* Ignore if we've been shutdown */ if (asock->shutdown) return; do { if (status == asock->last_err && status != PJ_SUCCESS) { asock->err_counter++; if (asock->err_counter >= PJ_ACTIVESOCK_MAX_CONSECUTIVE_ACCEPT_ERROR) { PJ_LOG(3, ("", "Received %d consecutive errors: %d for the accept()" " operation, stopping further ioqueue accepts.", asock->err_counter, asock->last_err)); if ((status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) && (asock->cb.on_accept_complete2)) { (*asock->cb.on_accept_complete2)(asock, accept_op->new_sock, &accept_op->rem_addr, accept_op->rem_addr_len, PJ_ESOCKETSTOP); } return; } } else { asock->err_counter = 0; asock->last_err = status; } if (status==PJ_SUCCESS && (asock->cb.on_accept_complete2 || asock->cb.on_accept_complete)) { pj_bool_t ret; /* Notify callback */ if (asock->cb.on_accept_complete2) { ret = (*asock->cb.on_accept_complete2)(asock, accept_op->new_sock, &accept_op->rem_addr, accept_op->rem_addr_len, status); } else { ret = (*asock->cb.on_accept_complete)(asock, accept_op->new_sock, &accept_op->rem_addr, accept_op->rem_addr_len); } /* If callback returns false, we have been destroyed! */ if (!ret) return; #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 activesock_create_iphone_os_stream(asock); #endif } else if (status==PJ_SUCCESS) { /* Application doesn't handle the new socket, we need to * close it to avoid resource leak. */ pj_sock_close(accept_op->new_sock); } /* Don't start another accept() if we've been shutdown */ if (asock->shutdown) return; /* Prepare next accept() */ accept_op->new_sock = PJ_INVALID_SOCKET; accept_op->rem_addr_len = sizeof(accept_op->rem_addr); status = pj_ioqueue_accept(asock->key, op_key, &accept_op->new_sock, NULL, &accept_op->rem_addr, &accept_op->rem_addr_len); } while (status != PJ_EPENDING && status != PJ_ECANCELLED); } PJ_DEF(pj_status_t) pj_activesock_start_connect( pj_activesock_t *asock, pj_pool_t *pool, const pj_sockaddr_t *remaddr, int addr_len) { PJ_UNUSED_ARG(pool); if (asock->shutdown) return PJ_EINVALIDOP; return pj_ioqueue_connect(asock->key, remaddr, addr_len); } static void ioqueue_on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status) { pj_activesock_t *asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); /* Ignore if we've been shutdown */ if (asock->shutdown) return; if (asock->cb.on_connect_complete) { pj_bool_t ret; ret = (*asock->cb.on_connect_complete)(asock, status); if (!ret) { /* We've been destroyed */ return; } #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \ PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0 activesock_create_iphone_os_stream(asock); #endif } } #endif /* PJ_HAS_TCP */