/* $Id$ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include "test.h" static pj_ioqueue_key_t *key; static pj_atomic_t *total_bytes; static pj_bool_t thread_quit_flag; struct op_key { pj_ioqueue_op_key_t op_key_; struct op_key *peer; char *buffer; pj_size_t size; int is_pending; pj_status_t last_err; pj_sockaddr_in addr; int addrlen; }; static void on_read_complete(pj_ioqueue_key_t *ioq_key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_received) { pj_status_t rc; struct op_key *recv_rec = (struct op_key *)op_key; for (;;) { struct op_key *send_rec = recv_rec->peer; recv_rec->is_pending = 0; if (bytes_received < 0) { if (-bytes_received != recv_rec->last_err) { recv_rec->last_err = (pj_status_t)-bytes_received; app_perror("...error receiving data", recv_rec->last_err); } } else if (bytes_received == 0) { /* note: previous error, or write callback */ } else { pj_atomic_add(total_bytes, (pj_atomic_value_t)bytes_received); if (!send_rec->is_pending) { pj_ssize_t sent = bytes_received; pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received); pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen); send_rec->addrlen = recv_rec->addrlen; rc = pj_ioqueue_sendto(ioq_key, &send_rec->op_key_, send_rec->buffer, &sent, 0, &send_rec->addr, send_rec->addrlen); send_rec->is_pending = (rc==PJ_EPENDING); if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) { app_perror("...send error(1)", rc); } } } if (!send_rec->is_pending) { bytes_received = recv_rec->size; rc = pj_ioqueue_recvfrom(ioq_key, &recv_rec->op_key_, recv_rec->buffer, &bytes_received, 0, &recv_rec->addr, &recv_rec->addrlen); recv_rec->is_pending = (rc==PJ_EPENDING); if (rc == PJ_SUCCESS) { /* fall through next loop. */ } else if (rc == PJ_EPENDING) { /* quit callback. */ break; } else { /* error */ app_perror("...recv error", rc); recv_rec->last_err = rc; bytes_received = 0; /* fall through next loop. */ } } else { /* recv will be done when write completion callback is called. */ break; } } } static void on_write_complete(pj_ioqueue_key_t *ioq_key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { struct op_key *send_rec = (struct op_key*)op_key; if (bytes_sent <= 0) { pj_status_t rc = (pj_status_t)-bytes_sent; if (rc != send_rec->last_err) { send_rec->last_err = rc; app_perror("...send error(2)", rc); } } send_rec->is_pending = 0; on_read_complete(ioq_key, &send_rec->peer->op_key_, 0); } static int worker_thread(void *arg) { pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg; struct op_key read_op, write_op; char recv_buf[512], send_buf[512]; pj_ssize_t length; pj_status_t rc; read_op.peer = &write_op; read_op.is_pending = 0; read_op.last_err = 0; read_op.buffer = recv_buf; read_op.size = sizeof(recv_buf); read_op.addrlen = sizeof(read_op.addr); write_op.peer = &read_op; write_op.is_pending = 0; write_op.last_err = 0; write_op.buffer = send_buf; write_op.size = sizeof(send_buf); length = sizeof(recv_buf); rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0, &read_op.addr, &read_op.addrlen); if (rc == PJ_SUCCESS) { read_op.is_pending = 1; on_read_complete(key, &read_op.op_key_, length); } while (!thread_quit_flag) { pj_time_val timeout; timeout.sec = 0; timeout.msec = 10; rc = pj_ioqueue_poll(ioqueue, &timeout); } return 0; } int udp_echo_srv_ioqueue(void) { pj_pool_t *pool; pj_sock_t sock; pj_ioqueue_t *ioqueue; pj_ioqueue_callback callback; int i; pj_thread_t *thread[ECHO_SERVER_MAX_THREADS]; pj_status_t rc; pj_bzero(&callback, sizeof(callback)); callback.on_read_complete = &on_read_complete; callback.on_write_complete = &on_write_complete; pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); if (!pool) return -10; rc = pj_ioqueue_create(pool, 2, &ioqueue); if (rc != PJ_SUCCESS) { app_perror("...pj_ioqueue_create error", rc); return -20; } rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, ECHO_SERVER_START_PORT, &sock); if (rc != PJ_SUCCESS) { app_perror("...app_socket error", rc); return -30; } rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL, &callback, &key); if (rc != PJ_SUCCESS) { app_perror("...error registering socket", rc); return -40; } rc = pj_atomic_create(pool, 0, &total_bytes); if (rc != PJ_SUCCESS) { app_perror("...error creating atomic variable", rc); return -45; } for (i=0; i