/* $Id$ */ /* * Copyright (C) 2011-2011 Teluu Inc. (http://www.teluu.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #define THIS_FILE "event.c" #define MAX_EVENTS 16 typedef struct esub esub; struct esub { PJ_DECL_LIST_MEMBER(esub); pjmedia_event_cb *cb; void *user_data; void *epub; }; typedef struct event_queue { pjmedia_event events[MAX_EVENTS]; /**< array of events. */ int head, tail; pj_bool_t is_full; } event_queue; struct pjmedia_event_mgr { pj_pool_t *pool; pj_thread_t *thread; /**< worker thread. */ pj_bool_t is_quitting; pj_sem_t *sem; pj_mutex_t *mutex; event_queue ev_queue; event_queue *pub_ev_queue; /**< publish() event queue. */ esub esub_list; /**< list of subscribers. */ esub free_esub_list; /**< list of subscribers. */ esub *th_next_sub, /**< worker thread's next sub. */ *pub_next_sub; /**< publish() next sub. */ }; static pjmedia_event_mgr *event_manager_instance; static pj_status_t event_queue_add_event(event_queue* ev_queue, pjmedia_event *event) { if (ev_queue->is_full) { char ev_name[5]; /* This event will be ignored. */ PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] " "due to full queue.", pjmedia_fourcc_name(event->type, ev_name), event->epub)); return PJ_ETOOMANY; } pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event)); ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS; if (ev_queue->tail == ev_queue->head) ev_queue->is_full = PJ_TRUE; return PJ_SUCCESS; } static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr, event_queue *ev_queue, esub **next_sub, pj_bool_t rls_lock) { pj_status_t err = PJ_SUCCESS; esub * sub = mgr->esub_list.next; pjmedia_event *ev = &ev_queue->events[ev_queue->head]; while (sub != &mgr->esub_list) { *next_sub = sub->next; /* Check if the subscriber is interested in * receiving the event from the publisher. */ if (sub->epub == ev->epub || !sub->epub) { pjmedia_event_cb *cb = sub->cb; void *user_data = sub->user_data; pj_status_t status; if (rls_lock) pj_mutex_unlock(mgr->mutex); status = (*cb)(ev, user_data); if (status != PJ_SUCCESS && err == PJ_SUCCESS) err = status; if (rls_lock) pj_mutex_lock(mgr->mutex); } sub = *next_sub; } *next_sub = NULL; ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS; ev_queue->is_full = PJ_FALSE; return err; } /* Event worker thread function. */ static int event_worker_thread(void *arg) { pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg; while (1) { /* Wait until there is an event. */ pj_sem_wait(mgr->sem); if (mgr->is_quitting) break; pj_mutex_lock(mgr->mutex); event_mgr_distribute_events(mgr, &mgr->ev_queue, &mgr->th_next_sub, PJ_TRUE); pj_mutex_unlock(mgr->mutex); } return 0; } PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool, unsigned options, pjmedia_event_mgr **p_mgr) { pjmedia_event_mgr *mgr; pj_status_t status; mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr); mgr->pool = pj_pool_create(pool->factory, "evt mgr", 500, 500, NULL); pj_list_init(&mgr->esub_list); pj_list_init(&mgr->free_esub_list); if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) { status = pj_sem_create(mgr->pool, "ev_sem", 0, MAX_EVENTS + 1, &mgr->sem); if (status != PJ_SUCCESS) return status; status = pj_thread_create(mgr->pool, "ev_thread", &event_worker_thread, mgr, 0, 0, &mgr->thread); if (status != PJ_SUCCESS) { pjmedia_event_mgr_destroy(mgr); return status; } } status = pj_mutex_create_recursive(mgr->pool, "ev_mutex", &mgr->mutex); if (status != PJ_SUCCESS) { pjmedia_event_mgr_destroy(mgr); return status; } if (!event_manager_instance) event_manager_instance = mgr; if (p_mgr) *p_mgr = mgr; return PJ_SUCCESS; } PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void) { return event_manager_instance; } PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr) { event_manager_instance = mgr; } PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr) { if (!mgr) mgr = pjmedia_event_mgr_instance(); PJ_ASSERT_ON_FAIL(mgr != NULL, return); if (mgr->thread) { mgr->is_quitting = PJ_TRUE; pj_sem_post(mgr->sem); pj_thread_join(mgr->thread); } if (mgr->sem) { pj_sem_destroy(mgr->sem); mgr->sem = NULL; } if (mgr->mutex) { pj_mutex_destroy(mgr->mutex); mgr->mutex = NULL; } if (mgr->pool) pj_pool_release(mgr->pool); if (event_manager_instance == mgr) event_manager_instance = NULL; } PJ_DEF(void) pjmedia_event_init( pjmedia_event *event, pjmedia_event_type type, const pj_timestamp *ts, const void *src) { pj_bzero(event, sizeof(*event)); event->type = type; if (ts) event->timestamp.u64 = ts->u64; event->epub = event->src = src; } PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr, pjmedia_event_cb *cb, void *user_data, void *epub) { esub *sub; PJ_ASSERT_RETURN(cb, PJ_EINVAL); if (!mgr) mgr = pjmedia_event_mgr_instance(); PJ_ASSERT_RETURN(mgr, PJ_EINVAL); pj_mutex_lock(mgr->mutex); /* Check whether callback function with the same user data is already * subscribed to the publisher. This is to prevent the callback function * receiving the same event from the same publisher more than once. */ sub = mgr->esub_list.next; while (sub != &mgr->esub_list) { esub *next = sub->next; if (sub->cb == cb && sub->user_data == user_data && sub->epub == epub) { pj_mutex_unlock(mgr->mutex); return PJ_SUCCESS; } sub = next; } if (mgr->free_esub_list.next != &mgr->free_esub_list) { sub = mgr->free_esub_list.next; pj_list_erase(sub); } else sub = PJ_POOL_ZALLOC_T(mgr->pool, esub); sub->cb = cb; sub->user_data = user_data; sub->epub = epub; pj_list_push_back(&mgr->esub_list, sub); pj_mutex_unlock(mgr->mutex); return PJ_SUCCESS; } PJ_DEF(pj_status_t) pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr, pjmedia_event_cb *cb, void *user_data, void *epub) { esub *sub; PJ_ASSERT_RETURN(cb, PJ_EINVAL); if (!mgr) mgr = pjmedia_event_mgr_instance(); PJ_ASSERT_RETURN(mgr, PJ_EINVAL); pj_mutex_lock(mgr->mutex); sub = mgr->esub_list.next; while (sub != &mgr->esub_list) { esub *next = sub->next; if (sub->cb == cb && (sub->user_data == user_data || !user_data) && (sub->epub == epub || !epub)) { /* If the worker thread or pjmedia_event_publish() API is * in the process of distributing events, make sure that * its pointer to the next subscriber stays valid. */ if (mgr->th_next_sub == sub) mgr->th_next_sub = sub->next; if (mgr->pub_next_sub == sub) mgr->pub_next_sub = sub->next; pj_list_erase(sub); pj_list_push_back(&mgr->free_esub_list, sub); if (user_data && epub) break; } sub = next; } pj_mutex_unlock(mgr->mutex); return PJ_SUCCESS; } PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr, void *epub, pjmedia_event *event, pjmedia_event_publish_flag flag) { pj_status_t err = PJ_SUCCESS; PJ_ASSERT_RETURN(epub && event, PJ_EINVAL); if (!mgr) mgr = pjmedia_event_mgr_instance(); PJ_ASSERT_RETURN(mgr, PJ_EINVAL); event->epub = epub; pj_mutex_lock(mgr->mutex); if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) { if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS) pj_sem_post(mgr->sem); } else { /* For nested pjmedia_event_publish() calls, i.e. calling publish() * inside the subscriber's callback, the function will only add * the event to the event queue of the first publish() call. It * is the first publish() call that will be responsible to * distribute the events. */ if (mgr->pub_ev_queue) { event_queue_add_event(mgr->pub_ev_queue, event); } else { static event_queue ev_queue; pj_status_t status; ev_queue.head = ev_queue.tail = 0; ev_queue.is_full = PJ_FALSE; mgr->pub_ev_queue = &ev_queue; event_queue_add_event(mgr->pub_ev_queue, event); do { status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue, &mgr->pub_next_sub, PJ_FALSE); if (status != PJ_SUCCESS && err == PJ_SUCCESS) err = status; } while(ev_queue.head != ev_queue.tail || ev_queue.is_full); mgr->pub_ev_queue = NULL; } } pj_mutex_unlock(mgr->mutex); return err; }