/* $Id$ */ /* * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) * Copyright (C) 2003-2008 Benny Prijono * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* memcpy() */ #define THIS_FILE "stream.c" #define ERRLEVEL 1 #define LOGERR_(expr) PJ_PERROR(4,expr); #define TRC_(expr) PJ_LOG(5,expr) #define BYTES_PER_SAMPLE 2 /* Limit the number of synthetic audio samples that are generated by PLC. * Normally PLC should have it's own means to limit the number of * synthetic frames, so we need to set this to a reasonably large value * just as precaution */ #define MAX_PLC_MSEC PJMEDIA_MAX_PLC_DURATION_MSEC /* Tracing jitter buffer operations in a stream session to a CSV file. * The trace will contain JB operation timestamp, frame info, RTP info, and * the JB state right after the operation. */ #define TRACE_JB 0 /* Enable/disable trace. */ #define TRACE_JB_PATH_PREFIX "" /* Optional path/prefix for the CSV filename. 
*/

#if TRACE_JB
/* NOTE(review): the #include below carries no header name in the text under
 * review; it has to be restored before TRACE_JB can be switched on.
 */
#   include
/* Handle value used to mark "no trace file". */
#   define TRACE_JB_INVALID_FD      ((pj_oshandle_t)-1)
/* Non-zero when stream 's' holds a trace handle other than the marker. */
#   define TRACE_JB_OPENED(s)       (s->trace_jb_fd != TRACE_JB_INVALID_FD)
#endif

#ifndef PJMEDIA_STREAM_SIZE
#   define PJMEDIA_STREAM_SIZE      1000
#endif

#ifndef PJMEDIA_STREAM_INC
#   define PJMEDIA_STREAM_INC       1000
#endif

/* Number of DTMF E bit transmissions */
#define DTMF_EBIT_RETRANSMIT_CNT    3

/* Number of send errors before the error report is repeated. */
#define SEND_ERR_COUNT_TO_REPORT    50

/**
 * Media channel.
 * One direction (encoding or decoding) of a stream, together with its own
 * RTP session and packet buffer.
 */
struct pjmedia_channel
{
    pjmedia_stream         *stream;         /**< Parent stream.             */
    pjmedia_dir             dir;            /**< Channel direction.         */
    unsigned                pt;             /**< Payload type.              */
    pj_bool_t               paused;         /**< Paused?.                   */
    unsigned                out_pkt_size;   /**< Size of output buffer.     */
    void                   *out_pkt;        /**< Output buffer.             */
    pjmedia_rtp_session     rtp;            /**< RTP session.               */
};


/**
 * One queued outgoing DTMF digit (an entry of pjmedia_stream.tx_dtmf_buf).
 */
struct dtmf
{
    int             event;      /**< Event code; create_dtmf_payload() uses
                                     it as the index into digitmap[] and
                                     as the RTP event value.                */
    pj_uint32_t     duration;   /**< Duration sent so far, in timestamp
                                     units; zero until the first packet.    */
    int             ebit_cnt;   /**< # of E bit transmissions               */
};

/**
 * This structure describes media stream.
 * A media stream is bidirectional media transmission between two endpoints.
 * It consists of two channels, i.e. encoding and decoding channels.
 * A media stream corresponds to a single "m=" line in a SDP session
 * description.
 */
struct pjmedia_stream
{
    pjmedia_endpt           *endpt;         /**< Media endpoint.            */
    pjmedia_codec_mgr       *codec_mgr;     /**< Codec manager instance.    */
    pjmedia_stream_info      si;            /**< Creation parameter.        */
    pjmedia_port             port;          /**< Port interface.            */
    pjmedia_channel         *enc;           /**< Encoding channel.          */
    pjmedia_channel         *dec;           /**< Decoding channel.          */

    pj_pool_t               *own_pool;      /**< Only created if not given  */

    pjmedia_dir              dir;           /**< Stream direction.          */
    void                    *user_data;     /**< User data.                 */
    pj_str_t                 cname;         /**< SDES CNAME                 */

    pjmedia_transport       *transport;     /**< Stream transport.          */

    pjmedia_codec           *codec;         /**< Codec instance being used. */
    pjmedia_codec_param      codec_param;   /**< Codec param.               */
    pj_int16_t              *enc_buf;       /**< Encoding buffer, when enc's
                                                 ptime is different than dec.
                                                 Otherwise it's NULL.
                                            */

    /* put_frame_imp() uses this as the sample count of the silence frame
     * it feeds to the encoder when the incoming frame has no buffer.
     */
    unsigned                 enc_samples_per_pkt;
    unsigned                 enc_buf_size;  /**< Encoding buffer size, in
                                                 samples.                   */
    unsigned                 enc_buf_pos;   /**< First position in buf.     */
    unsigned                 enc_buf_count; /**< Number of samples in the
                                                 encoding buffer.           */

    pj_int16_t              *dec_buf;       /**< Decoding buffer.           */
    unsigned                 dec_buf_size;  /**< Decoding buffer size, in
                                                 samples.                   */
    unsigned                 dec_buf_pos;   /**< First position in buf.     */
    unsigned                 dec_buf_count; /**< Number of samples in the
                                                 decoding buffer.           */

    pj_uint16_t              dec_ptime;     /**< Decoder frame ptime in ms. */
    pj_bool_t                detect_ptime_change;
                                            /**< Detect decode ptime change */

    unsigned                 plc_cnt;       /**< # of consecutive PLC frames*/
    unsigned                 max_plc_cnt;   /**< Max # of PLC frames        */

    unsigned                 vad_enabled;   /**< VAD enabled in param.      */
    unsigned                 frame_size;    /**< Size of encoded base frame.*/
    pj_bool_t                is_streaming;  /**< Currently streaming?. This
                                                 is used to put RTP marker
                                                 bit.                       */
    pj_uint32_t              ts_vad_disabled;/**< TS when VAD was disabled. */
    pj_uint32_t              tx_duration;   /**< TX duration in timestamp.  */

    /* Held by get_frame()/get_frame_ext() while reading the jitter buffer
     * and by create_dtmf_payload() while removing a digit from tx_dtmf_buf.
     */
    pj_mutex_t              *jb_mutex;
    pjmedia_jbuf            *jb;            /**< Jitter buffer.             */
    char                     jb_last_frm;   /**< Last frame type from jb    */
    unsigned                 jb_last_frm_cnt;/**< Last JB frame type counter*/

    pjmedia_rtcp_session     rtcp;          /**< RTCP for incoming RTP.     */

    pj_uint32_t              rtcp_last_tx;  /**< RTCP tx time in timestamp  */
    pj_uint32_t              rtcp_interval; /**< Interval, in timestamp.    */
    pj_bool_t                initial_rr;    /**< Initial RTCP RR sent       */
    pj_bool_t                rtcp_sdes_bye_disabled;/**< Send RTCP SDES/BYE?*/
    void                    *out_rtcp_pkt;  /**< Outgoing RTCP packet.      */
    unsigned                 out_rtcp_pkt_size;
                                            /**< Outgoing RTCP packet size. */

    /* RFC 2833 DTMF transmission queue: */
    unsigned                 dtmf_duration; /**< DTMF duration(in timestamp)*/
    int                      tx_event_pt;   /**< Outgoing pt for dtmf.      */
    int                      tx_dtmf_count; /**< # of digits in tx dtmf buf.*/
    struct dtmf              tx_dtmf_buf[32];/**< Outgoing dtmf queue.      */

    /* Incoming DTMF: */
    int                      rx_event_pt;   /**< Incoming pt for dtmf.      */
    int                      last_dtmf;     /**< Current digit, or -1.      */
    pj_uint32_t              last_dtmf_dur; /**< Start ts for cur digit.    */
    unsigned                 rx_dtmf_count; /**< # of digits in dtmf rx buf.*/
    char                     rx_dtmf_buf[32];/**< Incoming DTMF buffer.     */

    /* DTMF callback */
    void                    (*dtmf_cb)(pjmedia_stream*, void*, int);
    void                     *dtmf_cb_user_data;

#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0)
    /* Enable support to handle codecs with inconsistent clock rate
     * between clock rate in SDP/RTP & the clock rate that is actually used.
     * This happens for example with G.722 and MPEG audio codecs.
     */
    pj_bool_t                has_g722_mpeg_bug;
                                            /**< Flag to specify whether
                                                 normalization process
                                                 is needed                  */
    unsigned                 rtp_tx_ts_len_per_pkt;
                                            /**< Normalized ts length per packet
                                                 transmitted according to
                                                 'erroneous' definition     */
    unsigned                 rtp_rx_ts_len_per_frame;
                                            /**< Normalized ts length per frame
                                                 received according to
                                                 'erroneous' definition     */
    unsigned                 rtp_rx_last_cnt;/**< Nb of frames in last pkt  */
    unsigned                 rtp_rx_check_cnt;
                                            /**< Counter of remote timestamp
                                                 checking */
#endif

#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
    pj_uint32_t              rtcp_xr_last_tx;  /**< RTCP XR tx time
                                                    in timestamp.           */
    pj_uint32_t              rtcp_xr_interval; /**< Interval, in timestamp. */
    pj_sockaddr              rtcp_xr_dest;     /**< Additional remote RTCP XR
                                                    dest. If sin_family is
                                                    zero, it will be ignored*/
    unsigned                 rtcp_xr_dest_len; /**< Length of RTCP XR dest
                                                    address                 */
#endif

#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0
    pj_bool_t                use_ka;           /**< Stream keep-alive with non-
                                                    codec-VAD mechanism is
                                                    enabled?                */
    pj_timestamp             last_frm_ts_sent; /**< Timestamp of last sending
                                                    packet                  */
#endif

    pj_sockaddr              rem_rtp_addr;     /**< Remote RTP address      */
    unsigned                 rem_rtp_flag;     /**< Indicator flag about
                                                    packet from this addr.
                                                    0=no pkt, 1=good ssrc,
                                                    2=bad ssrc pkts         */
    unsigned                 rtp_src_cnt;      /**< How many pkt from
                                                    this addr.              */

#if TRACE_JB
    pj_oshandle_t            trace_jb_fd;      /**< Jitter tracing file handle.*/
    char                    *trace_jb_buf;     /**< Jitter tracing buffer.
*/ #endif pj_uint32_t rtp_rx_last_ts; /**< Last received RTP timestamp */ pj_uint32_t rtp_tx_err_cnt; /**< The number of RTP send() error */ pj_uint32_t rtcp_tx_err_cnt; /**< The number of RTCP send() error */ /* RTCP Feedback */ pj_bool_t send_rtcp_fb_nack; /**< Send NACK? */ pjmedia_rtcp_fb_nack rtcp_fb_nack; /**< TX NACK state. */ int rtcp_fb_nack_cap_idx; /**< RX NACK cap idx. */ }; /* RFC 2833 digit */ static const char digitmap[17] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#', 'A', 'B', 'C', 'D', 'R'}; /* Zero audio frame samples */ static pj_int16_t zero_frame[2 * 30 * 16000 / 1000]; static void on_rx_rtcp( void *data, void *pkt, pj_ssize_t bytes_read); static pj_status_t send_rtcp(pjmedia_stream *stream, pj_bool_t with_sdes, pj_bool_t with_bye, pj_bool_t with_xr, pj_bool_t with_fb); #if TRACE_JB PJ_INLINE(int) trace_jb_print_timestamp(char **buf, pj_ssize_t len) { pj_time_val now; pj_parsed_time ptime; char *p = *buf; if (len < 14) return -1; pj_gettimeofday(&now); pj_time_decode(&now, &ptime); p += pj_utoa_pad(ptime.hour, p, 2, '0'); *p++ = ':'; p += pj_utoa_pad(ptime.min, p, 2, '0'); *p++ = ':'; p += pj_utoa_pad(ptime.sec, p, 2, '0'); *p++ = '.'; p += pj_utoa_pad(ptime.msec, p, 3, '0'); *p++ = ','; *buf = p; return 0; } PJ_INLINE(int) trace_jb_print_state(pjmedia_stream *stream, char **buf, pj_ssize_t len) { char *p = *buf; char *endp = *buf + len; pjmedia_jb_state state; pjmedia_jbuf_get_state(stream->jb, &state); len = pj_ansi_snprintf(p, endp-p, "%d, %d, %d", state.size, state.burst, state.prefetch); if ((len < 0) || (len >= endp-p)) return -1; p += len; *buf = p; return 0; } static void trace_jb_get(pjmedia_stream *stream, pjmedia_jb_frame_type ft, pj_size_t fsize) { char *p = stream->trace_jb_buf; char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE; pj_ssize_t len = 0; const char* ft_st; if (!TRACE_JB_OPENED(stream)) return; /* Print timestamp. 
*/ if (trace_jb_print_timestamp(&p, endp-p)) goto on_insuff_buffer; /* Print frame type and size */ switch(ft) { case PJMEDIA_JB_MISSING_FRAME: ft_st = "missing"; break; case PJMEDIA_JB_NORMAL_FRAME: ft_st = "normal"; break; case PJMEDIA_JB_ZERO_PREFETCH_FRAME: ft_st = "prefetch"; break; case PJMEDIA_JB_ZERO_EMPTY_FRAME: ft_st = "empty"; break; default: ft_st = "unknown"; break; } /* Print operation, size, frame count, frame type */ len = pj_ansi_snprintf(p, endp-p, "GET,%d,1,%s,,,,", fsize, ft_st); if ((len < 0) || (len >= endp-p)) goto on_insuff_buffer; p += len; /* Print JB state */ if (trace_jb_print_state(stream, &p, endp-p)) goto on_insuff_buffer; /* Print end of line */ if (endp-p < 2) goto on_insuff_buffer; *p++ = '\n'; /* Write and flush */ len = p - stream->trace_jb_buf; pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); pj_file_flush(stream->trace_jb_fd); return; on_insuff_buffer: pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!"); } static void trace_jb_put(pjmedia_stream *stream, const pjmedia_rtp_hdr *hdr, unsigned payloadlen, unsigned frame_cnt) { char *p = stream->trace_jb_buf; char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE; pj_ssize_t len = 0; if (!TRACE_JB_OPENED(stream)) return; /* Print timestamp. 
*/ if (trace_jb_print_timestamp(&p, endp-p)) goto on_insuff_buffer; /* Print operation, size, frame count, RTP info */ len = pj_ansi_snprintf(p, endp-p, "PUT,%d,%d,,%d,%d,%d,", payloadlen, frame_cnt, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), hdr->m); if ((len < 0) || (len >= endp-p)) goto on_insuff_buffer; p += len; /* Print JB state */ if (trace_jb_print_state(stream, &p, endp-p)) goto on_insuff_buffer; /* Print end of line */ if (endp-p < 2) goto on_insuff_buffer; *p++ = '\n'; /* Write and flush */ len = p - stream->trace_jb_buf; pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); pj_file_flush(stream->trace_jb_fd); return; on_insuff_buffer: pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!"); } #endif /* TRACE_JB */ #if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 /* * Send keep-alive packet using non-codec frame. */ static void send_keep_alive_packet(pjmedia_stream *stream) { #if PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_EMPTY_RTP /* Keep-alive packet is empty RTP */ pj_status_t status; void *pkt; int pkt_len; TRC_((stream->port.info.name.ptr, "Sending keep-alive (RTCP and empty RTP)")); /* Send RTP */ status = pjmedia_rtp_encode_rtp( &stream->enc->rtp, stream->enc->pt, 0, 1, 0, (const void**)&pkt, &pkt_len); pj_assert(status == PJ_SUCCESS); pj_memcpy(stream->enc->out_pkt, pkt, pkt_len); pjmedia_transport_send_rtp(stream->transport, stream->enc->out_pkt, pkt_len); /* Send RTCP */ send_rtcp(stream, PJ_TRUE, PJ_FALSE, PJ_FALSE, PJ_FALSE); /* Update stats in case the stream is paused */ stream->rtcp.stat.rtp_tx_last_seq = pj_ntohs(stream->enc->rtp.out_hdr.seq); #elif PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_USER /* Keep-alive packet is defined in PJMEDIA_STREAM_KA_USER_PKT */ int pkt_len; const pj_str_t str_ka = PJMEDIA_STREAM_KA_USER_PKT; TRC_((stream->port.info.name.ptr, "Sending keep-alive (custom RTP/RTCP packets)")); /* Send to RTP port */ pj_memcpy(stream->enc->out_pkt, str_ka.ptr, str_ka.slen); pkt_len = 
str_ka.slen; pjmedia_transport_send_rtp(stream->transport, stream->enc->out_pkt, pkt_len); /* Send to RTCP port */ pjmedia_transport_send_rtcp(stream->transport, stream->enc->out_pkt, pkt_len); #else PJ_UNUSED_ARG(stream); #endif } #endif /* defined(PJMEDIA_STREAM_ENABLE_KA) */ /* * play_callback() * * This callback is called by sound device's player thread when it * needs to feed the player with some frames. */ static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame) { pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; pjmedia_channel *channel = stream->dec; unsigned samples_count, samples_per_frame, samples_required; pj_int16_t *p_out_samp; pj_status_t status; /* Return no frame is channel is paused */ if (channel->paused) { frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; } /* Repeat get frame from the jitter buffer and decode the frame * until we have enough frames according to codec's ptime. */ /* Lock jitter buffer mutex first */ pj_mutex_lock( stream->jb_mutex ); samples_required = PJMEDIA_PIA_SPF(&stream->port.info); samples_per_frame = stream->dec_ptime * stream->codec_param.info.clock_rate * stream->codec_param.info.channel_cnt / 1000; p_out_samp = (pj_int16_t*) frame->buf; for (samples_count=0; samples_count < samples_required;) { char frame_type; pj_size_t frame_size = channel->out_pkt_size; pj_uint32_t bit_info; if (stream->dec_buf && stream->dec_buf_pos < stream->dec_buf_count) { unsigned nsamples_req = samples_required - samples_count; unsigned nsamples_avail = stream->dec_buf_count - stream->dec_buf_pos; unsigned nsamples_copy = PJ_MIN(nsamples_req, nsamples_avail); pjmedia_copy_samples(p_out_samp + samples_count, stream->dec_buf + stream->dec_buf_pos, nsamples_copy); samples_count += nsamples_copy; stream->dec_buf_pos += nsamples_copy; continue; } /* Get frame from jitter buffer. 
*/ pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size, &frame_type, &bit_info); #if TRACE_JB trace_jb_get(stream, frame_type, frame_size); #endif if (frame_type == PJMEDIA_JB_MISSING_FRAME) { /* Activate PLC */ if (stream->codec->op->recover && stream->codec_param.setting.plc && stream->plc_cnt < stream->max_plc_cnt) { pjmedia_frame frame_out; frame_out.buf = p_out_samp + samples_count; frame_out.size = frame->size - samples_count*2; status = pjmedia_codec_recover(stream->codec, (unsigned)frame_out.size, &frame_out); ++stream->plc_cnt; } else { status = -1; } if (status != PJ_SUCCESS) { /* Either PLC failed or PLC not supported/enabled */ pjmedia_zero_samples(p_out_samp + samples_count, samples_required - samples_count); } if (frame_type != stream->jb_last_frm) { /* Report changing frame type event */ PJ_LOG(5,(stream->port.info.name.ptr, "Frame lost%s!", (status == PJ_SUCCESS? ", recovered":""))); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } samples_count += samples_per_frame; } else if (frame_type == PJMEDIA_JB_ZERO_EMPTY_FRAME) { const char *with_plc = ""; /* Jitter buffer is empty. If this is the first "empty" state, * activate PLC to smoothen the fade-out, otherwise zero * the frame. */ //Using this "if" will only invoke PLC for the first packet //lost and not the subsequent ones. 
//if (frame_type != stream->jb_last_frm) { if (1) { /* Activate PLC to smoothen the missing frame */ if (stream->codec->op->recover && stream->codec_param.setting.plc && stream->plc_cnt < stream->max_plc_cnt) { pjmedia_frame frame_out; do { frame_out.buf = p_out_samp + samples_count; frame_out.size = frame->size - samples_count*2; status = pjmedia_codec_recover(stream->codec, (unsigned)frame_out.size, &frame_out); if (status != PJ_SUCCESS) break; samples_count += samples_per_frame; ++stream->plc_cnt; } while (samples_count < samples_required && stream->plc_cnt < stream->max_plc_cnt); with_plc = ", plc invoked"; } } if (samples_count < samples_required) { pjmedia_zero_samples(p_out_samp + samples_count, samples_required - samples_count); samples_count = samples_required; } if (stream->jb_last_frm != frame_type) { pjmedia_jb_state jb_state; /* Report changing frame type event */ pjmedia_jbuf_get_state(stream->jb, &jb_state); PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer empty (prefetch=%d)%s", jb_state.prefetch, with_plc)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } break; } else if (frame_type != PJMEDIA_JB_NORMAL_FRAME) { const char *with_plc = ""; /* It can only be PJMEDIA_JB_ZERO_PREFETCH frame */ pj_assert(frame_type == PJMEDIA_JB_ZERO_PREFETCH_FRAME); /* Always activate PLC when it's available.. 
*/ if (stream->codec->op->recover && stream->codec_param.setting.plc && stream->plc_cnt < stream->max_plc_cnt) { pjmedia_frame frame_out; do { frame_out.buf = p_out_samp + samples_count; frame_out.size = frame->size - samples_count*2; status = pjmedia_codec_recover(stream->codec, (unsigned)frame_out.size, &frame_out); if (status != PJ_SUCCESS) break; samples_count += samples_per_frame; ++stream->plc_cnt; } while (samples_count < samples_required && stream->plc_cnt < stream->max_plc_cnt); with_plc = ", plc invoked"; } if (samples_count < samples_required) { pjmedia_zero_samples(p_out_samp + samples_count, samples_required - samples_count); samples_count = samples_required; } if (stream->jb_last_frm != frame_type) { pjmedia_jb_state jb_state; /* Report changing frame type event */ pjmedia_jbuf_get_state(stream->jb, &jb_state); PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer is bufferring (prefetch=%d)%s", jb_state.prefetch, with_plc)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } break; } else { /* Got "NORMAL" frame from jitter buffer */ pjmedia_frame frame_in, frame_out; pj_bool_t use_dec_buf = PJ_FALSE; stream->plc_cnt = 0; /* Decode */ frame_in.buf = channel->out_pkt; frame_in.size = frame_size; frame_in.bit_info = bit_info; frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; /* ignored */ frame_out.buf = p_out_samp + samples_count; frame_out.size = frame->size - samples_count*BYTES_PER_SAMPLE; if (stream->dec_buf && bit_info * sizeof(pj_int16_t) > frame_out.size) { stream->dec_buf_pos = 0; stream->dec_buf_count = bit_info; use_dec_buf = PJ_TRUE; frame_out.buf = stream->dec_buf; frame_out.size = stream->dec_buf_size; } status = pjmedia_codec_decode( stream->codec, &frame_in, (unsigned)frame_out.size, &frame_out); if (status != 0) { LOGERR_((port->info.name.ptr, status, "codec decode() error")); if (use_dec_buf) { pjmedia_zero_samples(p_out_samp + samples_count, samples_per_frame); } else { 
pjmedia_zero_samples(stream->dec_buf, stream->dec_buf_count); } } else if (use_dec_buf) { stream->dec_buf_count = frame_out.size / sizeof(pj_int16_t); } if (stream->jb_last_frm != frame_type) { /* Report changing frame type event */ PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer starts returning normal frames " "(after %d empty/lost)", stream->jb_last_frm_cnt, stream->jb_last_frm)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } if (!use_dec_buf) samples_count += samples_per_frame; } } /* Unlock jitter buffer mutex. */ pj_mutex_unlock( stream->jb_mutex ); /* Return PJMEDIA_FRAME_TYPE_NONE if we have no frames at all * (it can happen when jitter buffer returns PJMEDIA_JB_ZERO_EMPTY_FRAME). */ if (samples_count == 0) { frame->type = PJMEDIA_FRAME_TYPE_NONE; frame->size = 0; } else { frame->type = PJMEDIA_FRAME_TYPE_AUDIO; frame->size = samples_count * BYTES_PER_SAMPLE; frame->timestamp.u64 = 0; } return PJ_SUCCESS; } /* The other version of get_frame callback used when stream port format * is non linear PCM. */ static pj_status_t get_frame_ext( pjmedia_port *port, pjmedia_frame *frame) { pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; pjmedia_channel *channel = stream->dec; pjmedia_frame_ext *f = (pjmedia_frame_ext*)frame; unsigned samples_per_frame, samples_required; pj_status_t status; /* Return no frame if channel is paused */ if (channel->paused) { frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; } /* Repeat get frame from the jitter buffer and decode the frame * until we have enough frames according to codec's ptime. 
*/ samples_required = PJMEDIA_PIA_SPF(&stream->port.info); samples_per_frame = stream->codec_param.info.frm_ptime * stream->codec_param.info.clock_rate * stream->codec_param.info.channel_cnt / 1000; pj_bzero(f, sizeof(pjmedia_frame_ext)); f->base.type = PJMEDIA_FRAME_TYPE_EXTENDED; while (f->samples_cnt < samples_required) { char frame_type; pj_size_t frame_size = channel->out_pkt_size; pj_uint32_t bit_info; /* Lock jitter buffer mutex first */ pj_mutex_lock( stream->jb_mutex ); /* Get frame from jitter buffer. */ pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size, &frame_type, &bit_info); #if TRACE_JB trace_jb_get(stream, frame_type, frame_size); #endif /* Unlock jitter buffer mutex. */ pj_mutex_unlock( stream->jb_mutex ); if (frame_type == PJMEDIA_JB_NORMAL_FRAME) { /* Got "NORMAL" frame from jitter buffer */ pjmedia_frame frame_in; /* Decode */ frame_in.buf = channel->out_pkt; frame_in.size = frame_size; frame_in.bit_info = bit_info; frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; status = pjmedia_codec_decode( stream->codec, &frame_in, 0, frame); if (status != PJ_SUCCESS) { LOGERR_((port->info.name.ptr, status, "codec decode() error")); pjmedia_frame_ext_append_subframe(f, NULL, 0, (pj_uint16_t)samples_per_frame); } if (stream->jb_last_frm != frame_type) { /* Report changing frame type event */ PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer starts returning normal frames " "(after %d empty/lost)", stream->jb_last_frm_cnt, stream->jb_last_frm)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } } else { /* Try to generate frame by invoking PLC (when any) */ status = PJ_SUCCESS; if (stream->codec->op->recover) { status = pjmedia_codec_recover(stream->codec, 0, frame); } /* No PLC or PLC failed */ if (!stream->codec->op->recover || status != PJ_SUCCESS) { pjmedia_frame_ext_append_subframe(f, NULL, 0, (pj_uint16_t)samples_per_frame); } if (frame_type == PJMEDIA_JB_MISSING_FRAME) { if (frame_type != 
stream->jb_last_frm) { /* Report changing frame type event */ PJ_LOG(5,(stream->port.info.name.ptr, "Frame lost!")); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } } else if (frame_type == PJMEDIA_JB_ZERO_EMPTY_FRAME) { if (frame_type != stream->jb_last_frm) { pjmedia_jb_state jb_state; /* Report changing frame type event */ pjmedia_jbuf_get_state(stream->jb, &jb_state); PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer empty (prefetch=%d)", jb_state.prefetch)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } } else { /* It can only be PJMEDIA_JB_ZERO_PREFETCH frame */ pj_assert(frame_type == PJMEDIA_JB_ZERO_PREFETCH_FRAME); if (stream->jb_last_frm != frame_type) { pjmedia_jb_state jb_state; /* Report changing frame type event */ pjmedia_jbuf_get_state(stream->jb, &jb_state); PJ_LOG(5,(stream->port.info.name.ptr, "Jitter buffer is bufferring (prefetch=%d)", jb_state.prefetch)); stream->jb_last_frm = frame_type; stream->jb_last_frm_cnt = 1; } else { stream->jb_last_frm_cnt++; } } } } return PJ_SUCCESS; } /* * Transmit DTMF */ static void create_dtmf_payload(pjmedia_stream *stream, struct pjmedia_frame *frame_out, int forced_last, int *first, int *last) { pjmedia_rtp_dtmf_event *event; struct dtmf *digit = &stream->tx_dtmf_buf[0]; pj_assert(sizeof(pjmedia_rtp_dtmf_event) == 4); *first = *last = 0; event = (pjmedia_rtp_dtmf_event*) frame_out->buf; if (digit->duration == 0) { PJ_LOG(5,(stream->port.info.name.ptr, "Sending DTMF digit id %c", digitmap[digit->event])); *first = 1; } digit->duration += stream->rtp_tx_ts_len_per_pkt; if (digit->duration >= stream->dtmf_duration) digit->duration = stream->dtmf_duration; event->event = (pj_uint8_t)digit->event; event->e_vol = 10; event->duration = pj_htons((pj_uint16_t)digit->duration); if (forced_last) { digit->duration = stream->dtmf_duration; } if (digit->duration >= stream->dtmf_duration) { event->e_vol |= 0x80; 
if (++digit->ebit_cnt >= DTMF_EBIT_RETRANSMIT_CNT) { *last = 1; /* Prepare next digit. */ pj_mutex_lock(stream->jb_mutex); pj_array_erase(stream->tx_dtmf_buf, sizeof(stream->tx_dtmf_buf[0]), stream->tx_dtmf_count, 0); --stream->tx_dtmf_count; pj_mutex_unlock(stream->jb_mutex); } } frame_out->size = 4; } static pj_status_t build_rtcp_fb(pjmedia_stream *stream, void *buf, pj_size_t *length) { pj_status_t status; /* Generic NACK */ if (stream->send_rtcp_fb_nack && stream->rtcp_fb_nack.pid >= 0) { status = pjmedia_rtcp_fb_build_nack(&stream->rtcp, buf, length, 1, &stream->rtcp_fb_nack); if (status != PJ_SUCCESS) return status; /* Reset Packet ID */ stream->rtcp_fb_nack.pid = -1; } return PJ_SUCCESS; } /** * Publish transport error event. */ static void publish_tp_event(pjmedia_event_type event_type, pj_status_t status, pj_bool_t is_rtp, pjmedia_dir dir, pjmedia_stream *stream) { pjmedia_event ev; pj_timestamp ts_now; pj_get_timestamp(&ts_now); pj_bzero(&ev.data.med_tp_err, sizeof(ev.data.med_tp_err)); /* Publish event. 
*/
    pjmedia_event_init(&ev, event_type,
                       &ts_now, stream);
    ev.data.med_tp_err.type = PJMEDIA_TYPE_AUDIO;
    ev.data.med_tp_err.is_rtp = is_rtp;
    ev.data.med_tp_err.dir = dir;
    ev.data.med_tp_err.status = status;

    pjmedia_event_publish(NULL, stream, &ev, 0);
}


/*
 * Build and send one RTCP packet for the stream.
 *
 * The SR/RR report is always built. When any optional section is requested
 * the report is copied into stream->out_rtcp_pkt and the sections are
 * appended there in this order: SDES, feedback, XR, BYE. A section whose
 * builder fails (or, for XR, does not fit) is logged and left out; the
 * rest of the packet is still sent.
 *
 * stream     The stream.
 * with_sdes  Append SDES carrying the stream's CNAME.
 * with_bye   Append BYE.
 * with_xr    Append XR (forced off when RTCP XR support is compiled out).
 *            The XR block is also sent on its own to rtcp_xr_dest when
 *            rtcp_xr_dest_len is non-zero.
 * with_fb    Append RTCP feedback; this also forces with_sdes on.
 *
 * Returns the status of pjmedia_transport_send_rtcp().
 */
static pj_status_t send_rtcp(pjmedia_stream *stream,
                             pj_bool_t with_sdes,
                             pj_bool_t with_bye,
                             pj_bool_t with_xr,
                             pj_bool_t with_fb)
{
    void *sr_rr_pkt;
    pj_uint8_t *pkt;
    int len, max_len;
    pj_status_t status;

    /* Build RTCP RR/SR packet */
    pjmedia_rtcp_build_rtcp(&stream->rtcp, &sr_rr_pkt, &len);

#if !defined(PJMEDIA_HAS_RTCP_XR) || (PJMEDIA_HAS_RTCP_XR == 0)
    with_xr = PJ_FALSE;
#endif

    /* With extra sections the report is copied to the stream's own RTCP
     * buffer so they can be appended; otherwise the report is sent from
     * where it was built.
     */
    if (with_sdes || with_bye || with_xr || with_fb) {
        pkt = (pj_uint8_t*) stream->out_rtcp_pkt;
        pj_memcpy(pkt, sr_rr_pkt, len);
        max_len = stream->out_rtcp_pkt_size;
    } else {
        pkt = (pj_uint8_t*)sr_rr_pkt;
        max_len = len;
    }

    /* RTCP FB must be sent in compound (i.e: with RR/SR and SDES) */
    if (with_fb)
        with_sdes = PJ_TRUE;

    /* Build RTCP SDES packet */
    if (with_sdes) {
        pjmedia_rtcp_sdes sdes;
        pj_size_t sdes_len;

        pj_bzero(&sdes, sizeof(sdes));
        sdes.cname = stream->cname;
        sdes_len = max_len - len;
        status = pjmedia_rtcp_build_rtcp_sdes(&stream->rtcp, pkt+len,
                                              &sdes_len, &sdes);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(4,(stream->port.info.name.ptr, status,
                                     "Error generating RTCP SDES"));
        } else {
            len += (int)sdes_len;
        }
    }

    /* Build RTCP feedback packet */
    if (with_fb) {
        pj_size_t fb_len = max_len - len;
        status = build_rtcp_fb(stream, pkt+len, &fb_len);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(4,(stream->port.info.name.ptr, status,
                                     "Error generating RTCP FB"));
        } else {
            len += (int)fb_len;
        }
    }

    /* Build RTCP XR packet */
#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
    if (with_xr) {
        int i;
        pjmedia_jb_state jb_state;
        void *xr_pkt;
        int xr_len;

        /* Update RTCP XR with current JB states */
        pjmedia_jbuf_get_state(stream->jb, &jb_state);

        i = jb_state.avg_delay;
        status = pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
                                             PJMEDIA_RTCP_XR_INFO_JB_NOM, i);
        pj_assert(status == PJ_SUCCESS);

        i = jb_state.max_delay;
        status = pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session,
                                             PJMEDIA_RTCP_XR_INFO_JB_MAX, i);
        pj_assert(status == PJ_SUCCESS);

        pjmedia_rtcp_build_rtcp_xr(&stream->rtcp.xr_session, 0,
                                   &xr_pkt, &xr_len);

        if (xr_len + len <= max_len) {
            pj_memcpy(pkt+len, xr_pkt, xr_len);
            len += xr_len;

            /* Send the RTCP XR to third-party destination if specified */
            if (stream->rtcp_xr_dest_len) {
                pjmedia_transport_send_rtcp2(stream->transport,
                                             &stream->rtcp_xr_dest,
                                             stream->rtcp_xr_dest_len,
                                             xr_pkt, xr_len);
            }

        } else {
            PJ_PERROR(4,(stream->port.info.name.ptr, PJ_ETOOBIG,
                         "Error generating RTCP-XR"));
        }
    }
#endif

    /* Build RTCP BYE packet */
    if (with_bye) {
        pj_size_t bye_len;

        bye_len = max_len - len;
        status = pjmedia_rtcp_build_rtcp_bye(&stream->rtcp, pkt+len,
                                             &bye_len, NULL);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(4,(stream->port.info.name.ptr, status,
                                     "Error generating RTCP BYE"));
        } else {
            len += (int)bye_len;
        }
    }

    /* Send! */
    status = pjmedia_transport_send_rtcp(stream->transport, pkt, len);
    if (status != PJ_SUCCESS) {
        /* Log only the first failure of a run; the counter restarts after
         * SEND_ERR_COUNT_TO_REPORT further failures so that a persistent
         * error gets reported again.
         */
        if (stream->rtcp_tx_err_cnt++ == 0) {
            LOGERR_((stream->port.info.name.ptr, status,
                     "Error sending RTCP"));
        }
        if (stream->rtcp_tx_err_cnt > SEND_ERR_COUNT_TO_REPORT) {
            stream->rtcp_tx_err_cnt = 0;
        }
    }

    return status;
}

/**
 * check_tx_rtcp()
 *
 * This function can be called by either put_frame() or get_frame(),
 * to transmit periodic RTCP SR/RR report.
 *
 * The first call only records the timestamp. After that a report is sent
 * whenever at least rtcp_interval timestamp units have elapsed since the
 * last successful transmission.
 */
static void check_tx_rtcp(pjmedia_stream *stream, pj_uint32_t timestamp)
{
    /* Note that timestamp may represent local or remote timestamp,
     * depending on whether this function is called from put_frame()
     * or get_frame().
*/

    if (stream->rtcp_last_tx == 0) {

        /* First call: only remember when the interval starts */
        stream->rtcp_last_tx = timestamp;

    } else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) {
        pj_bool_t with_xr = PJ_FALSE;
        pj_status_t status;

#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0)
        /* XR has its own interval, tracked the same way as the SR/RR one */
        if (stream->rtcp.xr_enabled) {
            if (stream->rtcp_xr_last_tx == 0) {
                stream->rtcp_xr_last_tx = timestamp;
            } else if (timestamp - stream->rtcp_xr_last_tx >=
                       stream->rtcp_xr_interval)
            {
                with_xr = PJ_TRUE;

                /* Update last tx RTCP XR */
                stream->rtcp_xr_last_tx = timestamp;
            }
        }
#endif

        status = send_rtcp(stream, !stream->rtcp_sdes_bye_disabled, PJ_FALSE,
                           with_xr, PJ_FALSE);

        /* The interval restarts only after a successful send, so a failed
         * report is attempted again on the next call.
         */
        if (status == PJ_SUCCESS) {
            stream->rtcp_last_tx = timestamp;
        }
    }
}


/**
 * Rebuffer the frame when encoder and decoder has different ptime
 * (such as when different iLBC modes are used by local and remote)
 *
 * The incoming samples are appended to stream->enc_buf. On return the
 * frame either points at one encoder packet's worth of samples inside
 * enc_buf (type PJMEDIA_FRAME_TYPE_AUDIO), or has type
 * PJMEDIA_FRAME_TYPE_NONE when not enough samples have accumulated yet.
 * Samples handed out by one call are removed from the buffer at the start
 * of the next call.
 *
 * A frame that is not PJMEDIA_FRAME_TYPE_AUDIO contributes no samples; an
 * audio frame with a NULL buffer contributes zeroed samples.
 */
static void rebuffer(pjmedia_stream *stream,
                     pjmedia_frame *frame)
{
    /* How many samples are needed */
    unsigned count;

    /* Normalize frame */
    if (frame->type != PJMEDIA_FRAME_TYPE_AUDIO)
        frame->size = 0;

    /* Remove used frame from the buffer. */
    if (stream->enc_buf_pos) {
        if (stream->enc_buf_count) {
            /* enc_buf_count is in samples, hence << 1 for the byte count */
            pj_memmove(stream->enc_buf,
                       stream->enc_buf + stream->enc_buf_pos,
                       (stream->enc_buf_count << 1));
        }
        stream->enc_buf_pos = 0;
    }

    /* Make sure we have space to store the new frame.
     * NOTE(review): this is checked by assertion only; confirm that
     * callers cannot exceed enc_buf_size when assertions are compiled out.
     */
    pj_assert(stream->enc_buf_count + (frame->size >> 1) <
                stream->enc_buf_size);

    /* Append new frame to the buffer */
    if (frame->size) {
        /* Handle case when there is no port transmitting to this port */
        if (frame->buf) {
            pj_memcpy(stream->enc_buf + stream->enc_buf_count,
                      frame->buf, frame->size);
        } else {
            pj_bzero(stream->enc_buf + stream->enc_buf_count, frame->size);
        }
        stream->enc_buf_count += ((unsigned)frame->size >> 1);
    }

    /* How many samples are needed */
    count = stream->codec_param.info.enc_ptime *
            PJMEDIA_PIA_SRATE(&stream->port.info) / 1000;

    /* See if we have enough samples */
    if (stream->enc_buf_count >= count) {

        frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
        frame->buf = stream->enc_buf;
        frame->size = (count << 1);

        /* Mark these samples as used; they are dropped on the next call */
        stream->enc_buf_pos = count;
        stream->enc_buf_count -= count;

    } else {

        /* We don't have enough samples */
        frame->type = PJMEDIA_FRAME_TYPE_NONE;
    }

}


/**
 * put_frame_imp()
 */
static pj_status_t put_frame_imp( pjmedia_port *port,
                                  pjmedia_frame *frame )
{
    pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata;
    pjmedia_channel *channel = stream->enc;
    pj_status_t status = 0;
    pjmedia_frame frame_out;
    unsigned ts_len, rtp_ts_len;
    void *rtphdr;
    int rtphdrlen;
    int inc_timestamp = 0;


#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0
    /* If the interval since last sending packet is greater than
     * PJMEDIA_STREAM_KA_INTERVAL, send keep-alive packet.
*/ if (stream->use_ka) { pj_uint32_t dtx_duration; dtx_duration = pj_timestamp_diff32(&stream->last_frm_ts_sent, &frame->timestamp); if (dtx_duration > PJMEDIA_STREAM_KA_INTERVAL * PJMEDIA_PIA_SRATE(&stream->port.info)) { send_keep_alive_packet(stream); stream->last_frm_ts_sent = frame->timestamp; } } #endif /* Don't do anything if stream is paused */ if (channel->paused) { stream->enc_buf_pos = stream->enc_buf_count = 0; return PJ_SUCCESS; } /* Number of samples in the frame */ if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO) ts_len = ((unsigned)frame->size >> 1) / stream->codec_param.info.channel_cnt; else if (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED) ts_len = PJMEDIA_PIA_SPF(&stream->port.info) / PJMEDIA_PIA_CCNT(&stream->port.info); else ts_len = 0; /* Increment transmit duration */ stream->tx_duration += ts_len; #if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) /* Handle special case for audio codec with RTP timestamp inconsistence * e.g: G722, MPEG audio. */ if (stream->has_g722_mpeg_bug) rtp_ts_len = stream->rtp_tx_ts_len_per_pkt; else rtp_ts_len = ts_len; #else rtp_ts_len = ts_len; #endif /* Init frame_out buffer. */ frame_out.buf = ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr); frame_out.size = 0; /* If we have DTMF digits in the queue, transmit the digits. * Otherwise encode the PCM buffer. */ if (stream->tx_dtmf_count) { int first=0, last=0; create_dtmf_payload(stream, &frame_out, 0, &first, &last); /* Encapsulate into RTP packet. Note that: * - RTP marker should be set on the beginning of a new event * - RTP timestamp is constant for the same packet. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, first, (int)frame_out.size, (first ? rtp_ts_len : 0), (const void**)&rtphdr, &rtphdrlen); if (last) { /* This is the last packet for the event. * Increment the RTP timestamp of the RTP session, for next * RTP packets. 
*/ inc_timestamp = stream->dtmf_duration + ((DTMF_EBIT_RETRANSMIT_CNT-1) * stream->rtp_tx_ts_len_per_pkt) - rtp_ts_len; } /* * Special treatment for FRAME_TYPE_AUDIO but with frame->buf==NULL. * This happens when stream input is disconnected from the bridge. * In this case we periodically transmit RTP frame to keep NAT binding * open, by giving zero PCM frame to the codec. * * This was originally done in http://trac.pjsip.org/repos/ticket/56, * but then disabled in http://trac.pjsip.org/repos/ticket/439, but * now it's enabled again. */ } else if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO && frame->buf == NULL && stream->port.info.fmt.id == PJMEDIA_FORMAT_L16 && (stream->dir & PJMEDIA_DIR_ENCODING) && stream->enc_samples_per_pkt < PJ_ARRAY_SIZE(zero_frame)) { pjmedia_frame silence_frame; pj_bzero(&silence_frame, sizeof(silence_frame)); silence_frame.buf = zero_frame; silence_frame.size = stream->enc_samples_per_pkt * 2; silence_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; silence_frame.timestamp.u32.lo = pj_ntohl(stream->enc->rtp.out_hdr.ts); /* Encode! */ status = pjmedia_codec_encode( stream->codec, &silence_frame, channel->out_pkt_size - sizeof(pjmedia_rtp_hdr), &frame_out); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "Codec encode() error")); return status; } /* Encapsulate. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, channel->pt, 0, (int)frame_out.size, rtp_ts_len, (const void**)&rtphdr, &rtphdrlen); /* Encode audio frame */ } else if ((frame->type == PJMEDIA_FRAME_TYPE_AUDIO && frame->buf != NULL) || (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED)) { /* Encode! */ status = pjmedia_codec_encode( stream->codec, frame, channel->out_pkt_size - sizeof(pjmedia_rtp_hdr), &frame_out); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "Codec encode() error")); return status; } /* Encapsulate. 
*/ status = pjmedia_rtp_encode_rtp( &channel->rtp, channel->pt, 0, (int)frame_out.size, rtp_ts_len, (const void**)&rtphdr, &rtphdrlen); } else { /* Just update RTP session's timestamp. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, 0, 0, 0, rtp_ts_len, (const void**)&rtphdr, &rtphdrlen); } if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "RTP encode_rtp() error")); return status; } /* Check if now is the time to transmit RTCP SR/RR report. * We only do this when stream direction is not "decoding only", because * when it is, check_tx_rtcp() will be handled by get_frame(). */ if (stream->dir != PJMEDIA_DIR_DECODING) { check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts)); } /* Do nothing if we have nothing to transmit */ if (frame_out.size == 0) { if (stream->is_streaming) { PJ_LOG(5,(stream->port.info.name.ptr,"Starting silence")); stream->is_streaming = PJ_FALSE; } return PJ_SUCCESS; } /* Copy RTP header to the beginning of packet */ pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); /* Special case for DTMF: timestamp remains constant for * the same event, and is only updated after a complete event * has been transmitted. */ if (inc_timestamp) { pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, 0, 0, inc_timestamp, NULL, NULL); } /* Set RTP marker bit if currently not streaming */ if (stream->is_streaming == PJ_FALSE) { pjmedia_rtp_hdr *rtp = (pjmedia_rtp_hdr*) channel->out_pkt; rtp->m = 1; PJ_LOG(5,(stream->port.info.name.ptr,"Start talksprut..")); } stream->is_streaming = PJ_TRUE; /* Send the RTP packet to the transport. 
*/ status = pjmedia_transport_send_rtp(stream->transport, channel->out_pkt, frame_out.size + sizeof(pjmedia_rtp_hdr)); if (status != PJ_SUCCESS) { if (stream->rtp_tx_err_cnt++ == 0) { LOGERR_((stream->port.info.name.ptr, status, "Error sending RTP")); } if (stream->rtp_tx_err_cnt > SEND_ERR_COUNT_TO_REPORT) { stream->rtp_tx_err_cnt = 0; } return PJ_SUCCESS; } /* Update stat */ pjmedia_rtcp_tx_rtp(&stream->rtcp, (unsigned)frame_out.size); stream->rtcp.stat.rtp_tx_last_ts = pj_ntohl(stream->enc->rtp.out_hdr.ts); stream->rtcp.stat.rtp_tx_last_seq = pj_ntohs(stream->enc->rtp.out_hdr.seq); #if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 /* Update timestamp of last sending packet. */ stream->last_frm_ts_sent = frame->timestamp; #endif return PJ_SUCCESS; } /** * put_frame() * * This callback is called by upstream component when it has PCM frame * to transmit. This function encodes the PCM frame, pack it into * RTP packet, and transmit to peer. */ static pj_status_t put_frame( pjmedia_port *port, pjmedia_frame *frame ) { pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; pjmedia_frame tmp_zero_frame; unsigned samples_per_frame; samples_per_frame = stream->enc_samples_per_pkt; /* http://www.pjsip.org/trac/ticket/56: * when input is PJMEDIA_FRAME_TYPE_NONE, feed zero PCM frame * instead so that encoder can decide whether or not to transmit * silence frame. */ if (frame->type == PJMEDIA_FRAME_TYPE_NONE) { pj_memcpy(&tmp_zero_frame, frame, sizeof(pjmedia_frame)); frame = &tmp_zero_frame; tmp_zero_frame.buf = NULL; tmp_zero_frame.size = samples_per_frame * 2; tmp_zero_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; } #if 0 // This is no longer needed because each TYPE_NONE frame will // be converted into zero frame above /* If VAD is temporarily disabled during creation, feed zero PCM frame * to the codec. 
*/ if (stream->vad_enabled != stream->codec_param.setting.vad && stream->vad_enabled != 0 && frame->type == PJMEDIA_FRAME_TYPE_NONE && samples_per_frame <= ZERO_PCM_MAX_SIZE) { pj_memcpy(&tmp_in_frame, frame, sizeof(pjmedia_frame)); frame = &tmp_in_frame; tmp_in_frame.buf = NULL; tmp_in_frame.size = samples_per_frame * 2; tmp_in_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; } #endif /* If VAD is temporarily disabled during creation, enable it * after transmitting for VAD_SUSPEND_SEC seconds. */ if (stream->vad_enabled != stream->codec_param.setting.vad && (stream->tx_duration - stream->ts_vad_disabled) > PJMEDIA_PIA_SRATE(&stream->port.info) * PJMEDIA_STREAM_VAD_SUSPEND_MSEC / 1000) { stream->codec_param.setting.vad = stream->vad_enabled; pjmedia_codec_modify(stream->codec, &stream->codec_param); PJ_LOG(4,(stream->port.info.name.ptr,"VAD re-enabled")); } /* If encoder has different ptime than decoder, then the frame must * be passed through the encoding buffer via rebuffer() function. */ if (stream->enc_buf != NULL) { pjmedia_frame tmp_rebuffer_frame; pj_status_t status = PJ_SUCCESS; /* Copy original frame to temporary frame since we need * to modify it. */ pj_memcpy(&tmp_rebuffer_frame, frame, sizeof(pjmedia_frame)); /* Loop while we have full frame in enc_buffer */ for (;;) { pj_status_t st; /* Run rebuffer() */ rebuffer(stream, &tmp_rebuffer_frame); /* Process this frame */ st = put_frame_imp(port, &tmp_rebuffer_frame); if (st != PJ_SUCCESS) status = st; /* If we still have full frame in the buffer, re-run * rebuffer() with NULL frame. 
*/ if (stream->enc_buf_count >= stream->enc_samples_per_pkt) { tmp_rebuffer_frame.type = PJMEDIA_FRAME_TYPE_NONE; } else { /* Otherwise break */ break; } } return status; } else { return put_frame_imp(port, frame); } } #if 0 static void dump_bin(const char *buf, unsigned len) { unsigned i; PJ_LOG(3,(THIS_FILE, "begin dump")); for (i=0; ilast_dtmf != -1 && event->event == stream->last_dtmf && pj_ntohs(event->duration) >= stream->last_dtmf_dur) { /* Yes, this is the same event. */ stream->last_dtmf_dur = pj_ntohs(event->duration); return; } /* Ignore unknown event. */ #if defined(PJMEDIA_HAS_DTMF_FLASH) && PJMEDIA_HAS_DTMF_FLASH!= 0 if (event->event > 16) { #else if (event->event > 15) { #endif PJ_LOG(5,(stream->port.info.name.ptr, "Ignored RTP pkt with bad DTMF event %d", event->event)); return; } /* New event! */ PJ_LOG(5,(stream->port.info.name.ptr, "Received DTMF digit %c, vol=%d", digitmap[event->event], (event->e_vol & 0x3F))); stream->last_dtmf = event->event; stream->last_dtmf_dur = pj_ntohs(event->duration); /* If DTMF callback is installed, call the callback, otherwise keep * the DTMF digits in the buffer. */ if (stream->dtmf_cb) { stream->dtmf_cb(stream, stream->dtmf_cb_user_data, digitmap[event->event]); } else { /* By convention, we use jitter buffer's mutex to access shared * DTMF variables. */ pj_mutex_lock(stream->jb_mutex); if (stream->rx_dtmf_count >= PJ_ARRAY_SIZE(stream->rx_dtmf_buf)) { /* DTMF digits overflow. Discard the oldest digit. */ pj_array_erase(stream->rx_dtmf_buf, sizeof(stream->rx_dtmf_buf[0]), stream->rx_dtmf_count, 0); --stream->rx_dtmf_count; } stream->rx_dtmf_buf[stream->rx_dtmf_count++] = digitmap[event->event]; pj_mutex_unlock(stream->jb_mutex); } } /* * This callback is called by stream transport on receipt of packets * in the RTP socket. 
*/ static void on_rx_rtp( pjmedia_tp_cb_param *param) { pjmedia_stream *stream = (pjmedia_stream*) param->user_data; void *pkt = param->pkt; pj_ssize_t bytes_read = param->size; pjmedia_channel *channel = stream->dec; const pjmedia_rtp_hdr *hdr; const void *payload; unsigned payloadlen; pjmedia_rtp_status seq_st; pj_bool_t check_pt; pj_status_t status; pj_bool_t pkt_discarded = PJ_FALSE; /* Check for errors */ if (bytes_read < 0) { status = (pj_status_t)-bytes_read; if (status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { return; } LOGERR_((stream->port.info.name.ptr, status, "Unable to receive RTP packet")); if (status == PJ_ESOCKETSTOP) { /* Publish receive error event. */ publish_tp_event(PJMEDIA_EVENT_MEDIA_TP_ERR, status, PJ_TRUE, PJMEDIA_DIR_DECODING, stream); } return; } /* Ignore keep-alive packets */ if (bytes_read < (pj_ssize_t) sizeof(pjmedia_rtp_hdr)) return; /* Update RTP and RTCP session. */ status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, (int)bytes_read, &hdr, &payload, &payloadlen); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "RTP decode error")); stream->rtcp.stat.rx.discard++; return; } /* Check if multiplexing is allowed and the payload indicates RTCP. */ if (stream->si.rtcp_mux && hdr->pt >= 64 && hdr->pt <= 95) { on_rx_rtcp(stream, pkt, bytes_read); return; } /* Ignore the packet if decoder is paused */ pj_bzero(&seq_st, sizeof(seq_st)); if (channel->paused) goto on_return; /* Update RTP session (also checks if RTP session can accept * the incoming packet. 
*/ check_pt = (hdr->pt != stream->rx_event_pt) && PJMEDIA_STREAM_CHECK_RTP_PT; pjmedia_rtp_session_update2(&channel->rtp, hdr, &seq_st, check_pt); #if !PJMEDIA_STREAM_CHECK_RTP_PT if (!check_pt && hdr->pt != channel->rtp.out_pt && hdr->pt != stream->rx_event_pt) { seq_st.status.flag.badpt = 1; } #endif if (seq_st.status.value) { TRC_ ((stream->port.info.name.ptr, "RTP status: badpt=%d, badssrc=%d, dup=%d, " "outorder=%d, probation=%d, restart=%d", seq_st.status.flag.badpt, seq_st.status.flag.badssrc, seq_st.status.flag.dup, seq_st.status.flag.outorder, seq_st.status.flag.probation, seq_st.status.flag.restart)); if (seq_st.status.flag.badpt) { PJ_LOG(4,(stream->port.info.name.ptr, "Bad RTP pt %d (expecting %d)", hdr->pt, channel->rtp.out_pt)); } if (!stream->si.has_rem_ssrc && seq_st.status.flag.badssrc) { PJ_LOG(4,(stream->port.info.name.ptr, "Changed RTP peer SSRC %d (previously %d)", channel->rtp.peer_ssrc, stream->rtcp.peer_ssrc)); stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc; } } /* Skip bad RTP packet */ if (seq_st.status.flag.bad) { pkt_discarded = PJ_TRUE; goto on_return; } /* Ignore if payloadlen is zero */ if (payloadlen == 0) { pkt_discarded = PJ_TRUE; goto on_return; } /* Handle incoming DTMF. */ if (hdr->pt == stream->rx_event_pt) { /* Ignore out-of-order packet as it will be detected as new * digit. Also ignore duplicate packet as it serves no use. */ if (seq_st.status.flag.outorder || seq_st.status.flag.dup) { goto on_return; } handle_incoming_dtmf(stream, payload, payloadlen); goto on_return; } /* See if source address of RTP packet is different than the * configured address, and check if we need to tell the * media transport to switch RTP remote address. */ if (param->src_addr) { pj_bool_t badssrc = (stream->si.has_rem_ssrc && seq_st.status.flag.badssrc); if (pj_sockaddr_cmp(&stream->rem_rtp_addr, param->src_addr) == 0) { /* We're still receiving from rem_rtp_addr. */ stream->rtp_src_cnt = 0; stream->rem_rtp_flag = badssrc? 
2: 1; } else { stream->rtp_src_cnt++; if (stream->rtp_src_cnt < PJMEDIA_RTP_NAT_PROBATION_CNT) { if (stream->rem_rtp_flag == 1 || (stream->rem_rtp_flag == 2 && badssrc)) { /* Only discard if: * - we have ever received packet with good ssrc from * remote address (rem_rtp_addr), or * - we have ever received packet with bad ssrc from * remote address and this packet also has bad ssrc. */ pkt_discarded = PJ_TRUE; goto on_return; } if (stream->si.has_rem_ssrc && !seq_st.status.flag.badssrc && stream->rem_rtp_flag != 1) { /* Immediately switch if we receive packet with the * correct ssrc AND we never receive packets with * good ssrc from rem_rtp_addr. */ param->rem_switch = PJ_TRUE; } } else { /* Switch. We no longer receive packets from rem_rtp_addr. */ param->rem_switch = PJ_TRUE; } if (param->rem_switch) { /* Set remote RTP address to source address */ pj_sockaddr_cp(&stream->rem_rtp_addr, param->src_addr); /* Reset counter and flag */ stream->rtp_src_cnt = 0; stream->rem_rtp_flag = badssrc? 2: 1; /* Update RTCP peer ssrc */ stream->rtcp.peer_ssrc = pj_ntohl(hdr->ssrc); } } } /* Put "good" packet to jitter buffer, or reset the jitter buffer * when RTP session is restarted. */ pj_mutex_lock( stream->jb_mutex ); if (seq_st.status.flag.restart) { status = pjmedia_jbuf_reset(stream->jb); PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); } else { /* * Packets may contain more than one frames, while the jitter * buffer can only take one frame per "put" operation. So we need * to ask the codec to "parse" the payload into multiple frames. */ enum { MAX = 16 }; pj_timestamp ts; unsigned i, count = MAX; unsigned ts_span; pjmedia_frame frames[MAX]; /* Get the timestamp of the first sample */ ts.u64 = pj_ntohl(hdr->ts); /* Parse the payload. 
*/ status = pjmedia_codec_parse(stream->codec, (void*)payload, payloadlen, &ts, &count, frames); if (status != PJ_SUCCESS) { LOGERR_((stream->port.info.name.ptr, status, "Codec parse() error")); count = 0; } else if (stream->detect_ptime_change && frames[0].bit_info > 0xFFFF) { unsigned dec_ptime; PJ_LOG(4, (stream->port.info.name.ptr, "codec decode " "ptime change detected")); frames[0].bit_info &= 0xFFFF; dec_ptime = frames[0].bit_info * 1000 / stream->codec_param.info.clock_rate; stream->rtp_rx_ts_len_per_frame= stream->rtp_rx_ts_len_per_frame * dec_ptime / stream->dec_ptime; stream->dec_ptime = (pj_uint16_t)dec_ptime; pjmedia_jbuf_set_ptime(stream->jb, stream->dec_ptime); /* Reset jitter buffer after ptime changed */ pjmedia_jbuf_reset(stream->jb); } #if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) /* This code is used to learn the samples per frame value that is put * by remote endpoint, for codecs with inconsistent clock rate such * as G.722 or MPEG audio. We need to learn the samples per frame * value as it is used as divider when inserting frames into the * jitter buffer. */ if (stream->has_g722_mpeg_bug) { if (stream->rtp_rx_check_cnt) { /* Make sure the detection performed only on two consecutive * packets with valid RTP sequence and no wrapped timestamp. */ if (seq_st.diff == 1 && stream->rtp_rx_last_ts && ts.u64 > stream->rtp_rx_last_ts && stream->rtp_rx_last_cnt > 0) { unsigned peer_frm_ts_diff; unsigned frm_ts_span; /* Calculate actual frame timestamp span */ frm_ts_span = PJMEDIA_PIA_SPF(&stream->port.info) / stream->codec_param.setting.frm_per_pkt/ PJMEDIA_PIA_CCNT(&stream->port.info); /* Get remote frame timestamp span */ peer_frm_ts_diff = ((pj_uint32_t)ts.u64-stream->rtp_rx_last_ts) / stream->rtp_rx_last_cnt; /* Possibilities remote's samples per frame for G.722 * are only (frm_ts_span) and (frm_ts_span/2), this * validation is needed to avoid wrong decision because * of silence frames. 
*/ if (stream->codec_param.info.pt == PJMEDIA_RTP_PT_G722 && (peer_frm_ts_diff == frm_ts_span || peer_frm_ts_diff == (frm_ts_span>>1))) { if (peer_frm_ts_diff < stream->rtp_rx_ts_len_per_frame) { stream->rtp_rx_ts_len_per_frame = peer_frm_ts_diff; /* Done, stop the check immediately */ stream->rtp_rx_check_cnt = 1; } if (--stream->rtp_rx_check_cnt == 0) { PJ_LOG(4, (THIS_FILE, "G722 codec used, remote" " samples per frame detected = %d", stream->rtp_rx_ts_len_per_frame)); /* Reset jitter buffer once detection done */ pjmedia_jbuf_reset(stream->jb); } } } stream->rtp_rx_last_ts = (pj_uint32_t)ts.u64; stream->rtp_rx_last_cnt = count; } ts_span = stream->rtp_rx_ts_len_per_frame; /* Adjust the timestamp of the parsed frames */ for (i=0; idec_ptime * stream->codec_param.info.clock_rate / 1000; } #else ts_span = stream->dec_ptime * stream->codec_param.info.clock_rate / 1000; #endif /* Put each frame to jitter buffer. */ for (i=0; ijb, frames[i].buf, frames[i].size, frames[i].bit_info, ext_seq, &discarded); if (discarded) pkt_discarded = PJ_TRUE; } #if TRACE_JB trace_jb_put(stream, hdr, payloadlen, count); #endif } pj_mutex_unlock( stream->jb_mutex ); /* Check if now is the time to transmit RTCP SR/RR report. 
* We only do this when stream direction is "decoding only" or * if the encoder is paused, * because otherwise check_tx_rtcp() will be handled by put_frame() */ if (stream->dir == PJMEDIA_DIR_DECODING || stream->enc->paused) { check_tx_rtcp(stream, pj_ntohl(hdr->ts)); } if (status != 0) { LOGERR_((stream->port.info.name.ptr, status, "Jitter buffer put() error")); pkt_discarded = PJ_TRUE; goto on_return; } on_return: /* Update RTCP session */ if (stream->rtcp.peer_ssrc == 0) stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc; pjmedia_rtcp_rx_rtp2(&stream->rtcp, pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), payloadlen, pkt_discarded); /* RTCP-FB generic NACK */ if (stream->rtcp.received >= 10 && seq_st.diff > 1 && stream->send_rtcp_fb_nack && pj_ntohs(hdr->seq) >= seq_st.diff) { int i; pj_bzero(&stream->rtcp_fb_nack, sizeof(stream->rtcp_fb_nack)); stream->rtcp_fb_nack.pid = pj_ntohs(hdr->seq) - seq_st.diff + 1; for (i = 0; i < (seq_st.diff - 1); ++i) { stream->rtcp_fb_nack.blp <<= 1; stream->rtcp_fb_nack.blp |= 1; } /* Send it immediately */ status = send_rtcp(stream, PJ_TRUE, PJ_FALSE, PJ_FALSE, PJ_TRUE); if (status != PJ_SUCCESS) { PJ_PERROR(4,(stream->port.info.name.ptr, status, "Error sending RTCP FB generic NACK")); } else { stream->initial_rr = PJ_TRUE; } } /* Send RTCP RR and SDES after we receive some RTP packets */ if (stream->rtcp.received >= 10 && !stream->initial_rr) { status = send_rtcp(stream, !stream->rtcp_sdes_bye_disabled, PJ_FALSE, PJ_FALSE, PJ_FALSE); if (status != PJ_SUCCESS) { PJ_PERROR(4,(stream->port.info.name.ptr, status, "Error sending initial RTCP RR")); } else { stream->initial_rr = PJ_TRUE; } } } /* * This callback is called by stream transport on receipt of packets * in the RTCP socket. 
*/ static void on_rx_rtcp( void *data, void *pkt, pj_ssize_t bytes_read) { pjmedia_stream *stream = (pjmedia_stream*) data; pj_status_t status; /* Check for errors */ if (bytes_read < 0) { status = (pj_status_t)-bytes_read; if (status == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { return; } LOGERR_((stream->port.info.name.ptr, status, "Unable to receive RTCP packet")); if (status == PJ_ESOCKETSTOP) { /* Publish receive error event. */ publish_tp_event(PJMEDIA_EVENT_MEDIA_TP_ERR, status, PJ_FALSE, PJMEDIA_DIR_DECODING, stream); } return; } pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read); } /* * Create media channel. */ static pj_status_t create_channel( pj_pool_t *pool, pjmedia_stream *stream, pjmedia_dir dir, unsigned pt, const pjmedia_stream_info *param, pjmedia_channel **p_channel) { pjmedia_channel *channel; pj_status_t status; /* Allocate memory for channel descriptor */ channel = PJ_POOL_ZALLOC_T(pool, pjmedia_channel); PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM); /* Init channel info. */ channel->stream = stream; channel->dir = dir; channel->paused = 1; channel->pt = pt; /* Allocate buffer for outgoing packet. */ if (param->type == PJMEDIA_TYPE_AUDIO) { unsigned max_rx_based_size; unsigned max_bps_based_size; /* out_pkt buffer is used for sending and receiving, so lets calculate * its size based on both. For receiving, we have stream->frame_size, * which is used in configuring jitter buffer frame length. * For sending, it is based on codec max_bps info. 
*/ max_rx_based_size = stream->frame_size; max_bps_based_size = stream->codec_param.info.max_bps * PJMEDIA_MAX_FRAME_DURATION_MS / 8 / 1000; channel->out_pkt_size = PJ_MAX(max_rx_based_size, max_bps_based_size); /* Also include RTP header size (for sending) */ channel->out_pkt_size += sizeof(pjmedia_rtp_hdr); if (channel->out_pkt_size > PJMEDIA_MAX_MTU - PJMEDIA_STREAM_RESV_PAYLOAD_LEN) { channel->out_pkt_size = PJMEDIA_MAX_MTU - PJMEDIA_STREAM_RESV_PAYLOAD_LEN; } } else { return PJ_ENOTSUP; } channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size); PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM); /* Create RTP and RTCP sessions: */ { pjmedia_rtp_session_setting settings; settings.flags = (pj_uint8_t)((param->rtp_seq_ts_set << 2) | (param->has_rem_ssrc << 4) | 3); settings.default_pt = pt; settings.sender_ssrc = param->ssrc; settings.peer_ssrc = param->rem_ssrc; settings.seq = param->rtp_seq; settings.ts = param->rtp_ts; status = pjmedia_rtp_session_init2(&channel->rtp, settings); } if (status != PJ_SUCCESS) return status; /* Done. */ *p_channel = channel; return PJ_SUCCESS; } /* * Handle events. */ static pj_status_t stream_event_cb(pjmedia_event *event, void *user_data) { pjmedia_stream *stream = (pjmedia_stream*)user_data; /* Set RTCP FB capability in the event */ if (event->type==PJMEDIA_EVENT_RX_RTCP_FB && event->epub==&stream->rtcp) { pjmedia_event_rx_rtcp_fb_data *data = (pjmedia_event_rx_rtcp_fb_data*) &event->data.rx_rtcp_fb; /* Application not configured to listen to NACK, discard this event */ if (stream->rtcp_fb_nack_cap_idx < 0) return PJ_SUCCESS; data->cap = stream->si.loc_rtcp_fb.caps[stream->rtcp_fb_nack_cap_idx]; } /* Republish events */ return pjmedia_event_publish(NULL, stream, event, PJMEDIA_EVENT_PUBLISH_POST_EVENT); } /* * Create media stream. 
*/ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, pj_pool_t *pool, const pjmedia_stream_info *info, pjmedia_transport *tp, void *user_data, pjmedia_stream **p_stream) { enum { M = 32 }; pjmedia_stream *stream; pj_str_t name; unsigned jb_init, jb_max, jb_min_pre, jb_max_pre; pjmedia_audio_format_detail *afd; pj_pool_t *own_pool = NULL; char *p; pj_status_t status; pjmedia_transport_attach_param att_param; PJ_ASSERT_RETURN(endpt && info && p_stream, PJ_EINVAL); if (pool == NULL) { own_pool = pjmedia_endpt_create_pool( endpt, "strm%p", PJMEDIA_STREAM_SIZE, PJMEDIA_STREAM_INC); PJ_ASSERT_RETURN(own_pool != NULL, PJ_ENOMEM); pool = own_pool; } /* Allocate the media stream: */ stream = PJ_POOL_ZALLOC_T(pool, pjmedia_stream); PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM); stream->own_pool = own_pool; /* Duplicate stream info */ pj_memcpy(&stream->si, info, sizeof(*info)); pj_strdup(pool, &stream->si.fmt.encoding_name, &info->fmt.encoding_name); if (info->param) stream->si.param = pjmedia_codec_param_clone(pool, info->param); pjmedia_rtcp_fb_info_dup(pool, &stream->si.loc_rtcp_fb, &info->loc_rtcp_fb); pjmedia_rtcp_fb_info_dup(pool, &stream->si.rem_rtcp_fb, &info->rem_rtcp_fb); /* Init stream/port name */ name.ptr = (char*) pj_pool_alloc(pool, M); name.slen = pj_ansi_snprintf(name.ptr, M, "strm%p", stream); /* Init some port-info. Some parts of the info will be set later * once we have more info about the codec. */ pjmedia_port_info_init(&stream->port.info, &name, PJMEDIA_SIG_PORT_STREAM, info->fmt.clock_rate, info->fmt.channel_cnt, 16, 80); afd = pjmedia_format_get_audio_format_detail(&stream->port.info.fmt, 1); /* Init port. 
*/ //No longer there in 2.0 //pj_strdup(pool, &stream->port.info.encoding_name, &info->fmt.encoding_name); afd->clock_rate = info->fmt.clock_rate; afd->channel_count = info->fmt.channel_cnt; stream->port.port_data.pdata = stream; /* Init stream: */ stream->endpt = endpt; stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); stream->dir = info->dir; stream->user_data = user_data; stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL-500 + (pj_rand()%1000)) * info->fmt.clock_rate / 1000; stream->rtcp_sdes_bye_disabled = info->rtcp_sdes_bye_disabled; stream->tx_event_pt = info->tx_event_pt ? info->tx_event_pt : -1; stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1; stream->last_dtmf = -1; stream->jb_last_frm = PJMEDIA_JB_NORMAL_FRAME; stream->rtcp_fb_nack.pid = -1; #if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 stream->use_ka = info->use_ka; #endif stream->cname = info->cname; if (stream->cname.slen == 0) { /* Build random RTCP CNAME. CNAME has user@host format */ stream->cname.ptr = p = (char*) pj_pool_alloc(pool, 20); pj_create_random_string(p, 5); p += 5; *p++ = '@'; *p++ = 'p'; *p++ = 'j'; pj_create_random_string(p, 6); p += 6; *p++ = '.'; *p++ = 'o'; *p++ = 'r'; *p++ = 'g'; stream->cname.slen = p - stream->cname.ptr; } /* Create mutex to protect jitter buffer: */ status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); if (status != PJ_SUCCESS) goto err_cleanup; /* Create and initialize codec: */ status = pjmedia_codec_mgr_alloc_codec( stream->codec_mgr, &info->fmt, &stream->codec); if (status != PJ_SUCCESS) goto err_cleanup; /* Get codec param: */ if (info->param) stream->codec_param = *stream->si.param; else { status = pjmedia_codec_mgr_get_default_param(stream->codec_mgr, &info->fmt, &stream->codec_param); if (status != PJ_SUCCESS) goto err_cleanup; } /* Check for invalid max_bps. 
*/ if (stream->codec_param.info.max_bps < stream->codec_param.info.avg_bps) stream->codec_param.info.max_bps = stream->codec_param.info.avg_bps; /* Check for invalid frame per packet. */ if (stream->codec_param.setting.frm_per_pkt < 1) stream->codec_param.setting.frm_per_pkt = 1; /* Init the codec. */ status = pjmedia_codec_init(stream->codec, pool); if (status != PJ_SUCCESS) goto err_cleanup; /* Open the codec. */ /* The clock rate for Opus codec is not static, * it's negotiated in the SDP. */ if (!pj_stricmp2(&info->fmt.encoding_name, "opus")) { stream->codec_param.info.clock_rate = info->fmt.clock_rate; stream->codec_param.info.channel_cnt = info->fmt.channel_cnt; /* Allocate decoding buffer as Opus can send a packet duration of * up to 120 ms. */ stream->dec_buf_size = stream->codec_param.info.clock_rate * 120 / 1000; stream->dec_buf = (pj_int16_t*)pj_pool_alloc(pool, stream->dec_buf_size * sizeof(pj_int16_t)); } status = pjmedia_codec_open(stream->codec, &stream->codec_param); if (status != PJ_SUCCESS) goto err_cleanup; /* Set additional info and callbacks. 
*/ stream->dec_ptime = stream->codec_param.info.frm_ptime; afd->bits_per_sample = 16; afd->frame_time_usec = stream->codec_param.info.frm_ptime * stream->codec_param.setting.frm_per_pkt * 1000; stream->port.info.fmt.id = stream->codec_param.info.fmt_id; if (stream->codec_param.info.fmt_id == PJMEDIA_FORMAT_L16) { /* Raw format */ afd->avg_bps = afd->max_bps = afd->clock_rate * afd->channel_count * afd->bits_per_sample; stream->port.put_frame = &put_frame; stream->port.get_frame = &get_frame; } else { /* Encoded format */ afd->avg_bps = stream->codec_param.info.avg_bps; afd->max_bps = stream->codec_param.info.max_bps; /* Not applicable for 2.0 if ((stream->codec_param.info.max_bps * stream->codec_param.info.frm_ptime * stream->codec_param.setting.frm_per_pkt) % 8000 != 0) { ++stream->port.info.bytes_per_frame; } stream->port.info.format.bitrate = stream->codec_param.info.avg_bps; stream->port.info.format.vad = (stream->codec_param.setting.vad != 0); */ stream->port.put_frame = &put_frame; stream->port.get_frame = &get_frame_ext; } /* If encoder and decoder's ptime are asymmetric, then we need to * create buffer on the encoder side. This could happen for example * with iLBC */ if (stream->codec_param.info.enc_ptime!=0 && stream->codec_param.info.enc_ptime!=stream->codec_param.info.frm_ptime) { unsigned ptime; stream->enc_samples_per_pkt = stream->codec_param.info.enc_ptime * stream->codec_param.info.channel_cnt * afd->clock_rate / 1000; /* Set buffer size as twice the largest ptime value between * stream's ptime, encoder ptime, or decoder ptime. 
*/ ptime = afd->frame_time_usec / 1000; if (stream->codec_param.info.enc_ptime > ptime) ptime = stream->codec_param.info.enc_ptime; if (stream->codec_param.info.frm_ptime > ptime) ptime = stream->codec_param.info.frm_ptime; ptime <<= 1; /* Allocate buffer */ stream->enc_buf_size = afd->clock_rate * ptime / 1000; stream->enc_buf = (pj_int16_t*) pj_pool_alloc(pool, stream->enc_buf_size * 2); } else { stream->enc_samples_per_pkt = PJMEDIA_AFD_SPF(afd); } /* Initially disable the VAD in the stream, to help traverse NAT better */ stream->vad_enabled = stream->codec_param.setting.vad; if (PJMEDIA_STREAM_VAD_SUSPEND_MSEC > 0 && stream->vad_enabled) { stream->codec_param.setting.vad = 0; stream->ts_vad_disabled = 0; pjmedia_codec_modify(stream->codec, &stream->codec_param); PJ_LOG(4,(stream->port.info.name.ptr,"VAD temporarily disabled")); } /* Get the frame size */ if (stream->codec_param.info.max_rx_frame_size > 0) { stream->frame_size = stream->codec_param.info.max_rx_frame_size; } else { stream->frame_size = stream->codec_param.info.max_bps * stream->codec_param.info.frm_ptime / 8 / 1000; if ((stream->codec_param.info.max_bps * stream->codec_param.info.frm_ptime) % 8000 != 0) { ++stream->frame_size; } } /* How many consecutive PLC frames can be generated */ stream->max_plc_cnt = (MAX_PLC_MSEC+stream->codec_param.info.frm_ptime-1)/ stream->codec_param.info.frm_ptime; /* Disable PLC until a "NORMAL" frame is gotten from the jitter buffer. 
*/ stream->plc_cnt = stream->max_plc_cnt; #if defined(PJMEDIA_DTMF_DURATION_MSEC) && (PJMEDIA_DTMF_DURATION_MSEC > 0) stream->dtmf_duration = PJMEDIA_DTMF_DURATION_MSEC * afd->clock_rate / 1000; #else stream->dtmf_duration = PJMEDIA_DTMF_DURATION; #endif #if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) stream->rtp_rx_check_cnt = 50; stream->has_g722_mpeg_bug = PJ_FALSE; stream->rtp_rx_last_ts = 0; stream->rtp_rx_last_cnt = 0; stream->rtp_tx_ts_len_per_pkt = stream->enc_samples_per_pkt / stream->codec_param.info.channel_cnt; stream->rtp_rx_ts_len_per_frame = PJMEDIA_AFD_SPF(afd) / stream->codec_param.setting.frm_per_pkt / stream->codec_param.info.channel_cnt; if (info->fmt.pt == PJMEDIA_RTP_PT_G722) { stream->has_g722_mpeg_bug = PJ_TRUE; /* RTP clock rate = 1/2 real clock rate */ stream->rtp_tx_ts_len_per_pkt >>= 1; #if defined(PJMEDIA_DTMF_DURATION_MSEC) && (PJMEDIA_DTMF_DURATION_MSEC > 0) stream->dtmf_duration >>= 1; #endif } else if (!pj_stricmp2(&info->fmt.encoding_name, "opus")) { unsigned opus_ts_modifier = 48000 / afd->clock_rate; stream->rtp_rx_check_cnt = 0; stream->has_g722_mpeg_bug = PJ_TRUE; stream->rtp_tx_ts_len_per_pkt *= opus_ts_modifier; stream->rtp_rx_ts_len_per_frame *= opus_ts_modifier; stream->detect_ptime_change = PJ_TRUE; #if defined(PJMEDIA_DTMF_DURATION_MSEC) && (PJMEDIA_DTMF_DURATION_MSEC > 0) stream->dtmf_duration *= opus_ts_modifier; #endif } #endif /* Init jitter buffer parameters: */ if (info->jb_max >= stream->codec_param.info.frm_ptime) jb_max = (info->jb_max + stream->codec_param.info.frm_ptime - 1) / stream->codec_param.info.frm_ptime; else jb_max = 500 / stream->codec_param.info.frm_ptime; if (info->jb_min_pre >= stream->codec_param.info.frm_ptime) jb_min_pre = info->jb_min_pre / stream->codec_param.info.frm_ptime; else //jb_min_pre = 60 / stream->codec_param.info.frm_ptime; jb_min_pre = 1; if (info->jb_max_pre >= stream->codec_param.info.frm_ptime) jb_max_pre = info->jb_max_pre / 
stream->codec_param.info.frm_ptime; else //jb_max_pre = 240 / stream->codec_param.info.frm_ptime; jb_max_pre = jb_max * 4 / 5; if (info->jb_init >= stream->codec_param.info.frm_ptime) jb_init = info->jb_init / stream->codec_param.info.frm_ptime; else //jb_init = (jb_min_pre + jb_max_pre) / 2; jb_init = 0; /* Create jitter buffer */ status = pjmedia_jbuf_create(pool, &stream->port.info.name, stream->frame_size, stream->codec_param.info.frm_ptime, jb_max, &stream->jb); if (status != PJ_SUCCESS) goto err_cleanup; /* Set up jitter buffer */ pjmedia_jbuf_set_adaptive( stream->jb, jb_init, jb_min_pre, jb_max_pre); /* Create decoder channel: */ status = create_channel( pool, stream, PJMEDIA_DIR_DECODING, info->rx_pt, info, &stream->dec); if (status != PJ_SUCCESS) goto err_cleanup; /* Create encoder channel: */ status = create_channel( pool, stream, PJMEDIA_DIR_ENCODING, info->tx_pt, info, &stream->enc); if (status != PJ_SUCCESS) goto err_cleanup; /* Init RTCP session: */ { pjmedia_rtcp_session_setting rtcp_setting; pjmedia_rtcp_session_setting_default(&rtcp_setting); rtcp_setting.name = stream->port.info.name.ptr; rtcp_setting.ssrc = info->ssrc; rtcp_setting.rtp_ts_base = pj_ntohl(stream->enc->rtp.out_hdr.ts); rtcp_setting.clock_rate = info->fmt.clock_rate; rtcp_setting.samples_per_frame = PJMEDIA_AFD_SPF(afd); #if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) /* Special case for G.722 */ if (info->fmt.pt == PJMEDIA_RTP_PT_G722) { rtcp_setting.clock_rate = 8000; rtcp_setting.samples_per_frame = 160; } #endif pjmedia_rtcp_init2(&stream->rtcp, &rtcp_setting); if (info->rtp_seq_ts_set) { stream->rtcp.stat.rtp_tx_last_seq = info->rtp_seq; stream->rtcp.stat.rtp_tx_last_ts = info->rtp_ts; } /* Subscribe to RTCP events */ pjmedia_event_subscribe(NULL, &stream_event_cb, stream, &stream->rtcp); } /* Allocate outgoing RTCP buffer, should be enough to hold SR/RR, SDES, * BYE, and XR. 
*/ stream->out_rtcp_pkt_size = sizeof(pjmedia_rtcp_sr_pkt) + sizeof(pjmedia_rtcp_common) + (4 + (unsigned)stream->cname.slen) + 32; #if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) if (info->rtcp_xr_enabled) { stream->out_rtcp_pkt_size += sizeof(pjmedia_rtcp_xr_pkt); } #endif if (stream->out_rtcp_pkt_size > PJMEDIA_MAX_MTU) stream->out_rtcp_pkt_size = PJMEDIA_MAX_MTU; stream->out_rtcp_pkt = pj_pool_alloc(pool, stream->out_rtcp_pkt_size); pj_bzero(&att_param, sizeof(att_param)); att_param.stream = stream; att_param.media_type = PJMEDIA_TYPE_AUDIO; att_param.user_data = stream; pj_sockaddr_cp(&att_param.rem_addr, &info->rem_addr); pj_sockaddr_cp(&stream->rem_rtp_addr, &info->rem_addr); if (stream->si.rtcp_mux) { pj_sockaddr_cp(&att_param.rem_rtcp, &info->rem_addr); } else if (pj_sockaddr_has_addr(&info->rem_rtcp.addr)) { pj_sockaddr_cp(&att_param.rem_rtcp, &info->rem_rtcp); } att_param.addr_len = pj_sockaddr_get_len(&info->rem_addr); att_param.rtp_cb2 = &on_rx_rtp; att_param.rtcp_cb = &on_rx_rtcp; /* Only attach transport when stream is ready. 
*/ status = pjmedia_transport_attach2(tp, &att_param); if (status != PJ_SUCCESS) goto err_cleanup; stream->transport = tp; #if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) /* Enable RTCP XR and update stream info/config to RTCP XR */ if (info->rtcp_xr_enabled) { int i; pjmedia_rtcp_enable_xr(&stream->rtcp, PJ_TRUE); /* Set RTCP XR TX interval */ if (info->rtcp_xr_interval != 0) stream->rtcp_xr_interval = info->rtcp_xr_interval; else stream->rtcp_xr_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) * info->fmt.clock_rate / 1000; /* Additional third-party RTCP XR destination */ if (info->rtcp_xr_dest.addr.sa_family != 0) { stream->rtcp_xr_dest_len = pj_sockaddr_get_len(&info->rtcp_xr_dest); pj_memcpy(&stream->rtcp_xr_dest, &info->rtcp_xr_dest, stream->rtcp_xr_dest_len); } /* jitter buffer adaptive info */ i = PJMEDIA_RTCP_XR_JB_ADAPTIVE; pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, PJMEDIA_RTCP_XR_INFO_CONF_JBA, i); /* Jitter buffer aggressiveness info (estimated) */ i = 7; pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, PJMEDIA_RTCP_XR_INFO_CONF_JBR, i); /* Jitter buffer absolute maximum delay */ i = jb_max * stream->codec_param.info.frm_ptime; pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, PJMEDIA_RTCP_XR_INFO_JB_ABS_MAX, i); /* PLC info */ if (stream->codec_param.setting.plc == 0) i = PJMEDIA_RTCP_XR_PLC_DIS; else #if PJMEDIA_WSOLA_IMP==PJMEDIA_WSOLA_IMP_WSOLA i = PJMEDIA_RTCP_XR_PLC_ENH; #else i = PJMEDIA_RTCP_XR_PLC_DIS; #endif pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, PJMEDIA_RTCP_XR_INFO_CONF_PLC, i); } #endif /* Check if we should send RTCP-FB */ if (stream->si.rem_rtcp_fb.cap_count) { pjmedia_rtcp_fb_info *rfi = &stream->si.rem_rtcp_fb; unsigned i; for (i = 0; i < rfi->cap_count; ++i) { if (rfi->caps[i].type == PJMEDIA_RTCP_FB_NACK && rfi->caps[i].param.slen == 0) { stream->send_rtcp_fb_nack = PJ_TRUE; PJ_LOG(4,(stream->port.info.name.ptr, "Send RTCP-FB generic NACK")); break; } } } /* Check if we should 
process incoming RTCP-FB */ stream->rtcp_fb_nack_cap_idx = -1; if (stream->si.loc_rtcp_fb.cap_count) { pjmedia_rtcp_fb_info *lfi = &stream->si.loc_rtcp_fb; unsigned i; for (i = 0; i < lfi->cap_count; ++i) { if (lfi->caps[i].type == PJMEDIA_RTCP_FB_NACK && lfi->caps[i].param.slen == 0) { stream->rtcp_fb_nack_cap_idx = i; PJ_LOG(4,(stream->port.info.name.ptr, "Receive RTCP-FB generic NACK")); break; } } } /* Update the stream info's codec param */ stream->si.param = &stream->codec_param; /* Send RTCP SDES */ if (!stream->rtcp_sdes_bye_disabled) { pjmedia_stream_send_rtcp_sdes(stream); } #if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 /* NAT hole punching by sending KA packet via RTP transport. */ if (stream->use_ka) send_keep_alive_packet(stream); #endif #if TRACE_JB { char trace_name[PJ_MAXPATH]; pj_ssize_t len; pj_ansi_snprintf(trace_name, sizeof(trace_name), TRACE_JB_PATH_PREFIX "%s.csv", stream->port.info.name.ptr); status = pj_file_open(pool, trace_name, PJ_O_WRONLY, &stream->trace_jb_fd); if (status != PJ_SUCCESS) { stream->trace_jb_fd = TRACE_JB_INVALID_FD; PJ_PERROR(3,(THIS_FILE, status, "Failed creating RTP trace file '%s'", trace_name)); } else { stream->trace_jb_buf = (char*)pj_pool_alloc(pool, PJ_LOG_MAX_SIZE); /* Print column header */ len = pj_ansi_snprintf(stream->trace_jb_buf, PJ_LOG_MAX_SIZE, "Time, Operation, Size, Frame Count, " "Frame type, RTP Seq, RTP TS, RTP M, " "JB size, JB burst level, JB prefetch\n"); if (len < 1 || len >= PJ_LOG_MAX_SIZE) len = PJ_LOG_MAX_SIZE-1; pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); pj_file_flush(stream->trace_jb_fd); } } #endif /* Success! */ *p_stream = stream; PJ_LOG(5,(THIS_FILE, "Stream %s created", stream->port.info.name.ptr)); return PJ_SUCCESS; err_cleanup: pjmedia_stream_destroy(stream); return status; } /* * Destroy stream. 
*/ PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream ) { pj_status_t status; PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); /* Send RTCP BYE (also SDES & XR) */ if (stream->transport && !stream->rtcp_sdes_bye_disabled) { send_rtcp(stream, PJ_TRUE, PJ_TRUE, PJ_TRUE, PJ_FALSE); } /* If we're in the middle of transmitting DTMF digit, send one last * RFC 2833 RTP packet with 'End' flag set. */ if (stream->tx_dtmf_count && stream->tx_dtmf_buf[0].duration != 0) { pjmedia_frame frame_out; pjmedia_channel *channel = stream->enc; int first=0, last=0; void *rtphdr; int rtphdrlen; pj_bzero(&frame_out, sizeof(frame_out)); frame_out.buf = ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr); frame_out.size = 0; create_dtmf_payload(stream, &frame_out, 1, &first, &last); /* Encapsulate into RTP packet. Note that: * - RTP marker should be set on the beginning of a new event * - RTP timestamp is constant for the same packet. */ status = pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, first, (int)frame_out.size, 0, (const void**)&rtphdr, &rtphdrlen); if (status == PJ_SUCCESS) { /* Copy RTP header to the beginning of packet */ pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); /* Send the RTP packet to the transport. */ status = pjmedia_transport_send_rtp(stream->transport, channel->out_pkt, frame_out.size + sizeof(pjmedia_rtp_hdr)); } if (status != PJ_SUCCESS) { PJ_PERROR(4,(stream->port.info.name.ptr, status, "Error sending RTP/DTMF end packet")); } } /* Unsubscribe from RTCP session events */ pjmedia_event_unsubscribe(NULL, &stream_event_cb, stream, &stream->rtcp); /* Detach from transport * MUST NOT hold stream mutex while detaching from transport, as * it may cause deadlock. See ticket #460 for the details. */ if (stream->transport) { pjmedia_transport_detach(stream->transport, stream); stream->transport = NULL; } /* This function may be called when stream is partly initialized. 
*/ if (stream->jb_mutex) pj_mutex_lock(stream->jb_mutex); /* Free codec. */ if (stream->codec) { pjmedia_codec_close(stream->codec); pjmedia_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); stream->codec = NULL; } /* Free mutex */ if (stream->jb_mutex) { pj_mutex_unlock(stream->jb_mutex); pj_mutex_destroy(stream->jb_mutex); stream->jb_mutex = NULL; } /* Destroy jitter buffer */ if (stream->jb) pjmedia_jbuf_destroy(stream->jb); #if TRACE_JB if (TRACE_JB_OPENED(stream)) { pj_file_close(stream->trace_jb_fd); stream->trace_jb_fd = TRACE_JB_INVALID_FD; } #endif pj_pool_safe_release(&stream->own_pool); return PJ_SUCCESS; } /* * Get the last frame frame type retreived from the jitter buffer. */ PJ_DEF(char) pjmedia_stream_get_last_jb_frame_type(pjmedia_stream *stream) { return stream->jb_last_frm; } /* * Get the port interface. */ PJ_DEF(pj_status_t) pjmedia_stream_get_port( pjmedia_stream *stream, pjmedia_port **p_port ) { *p_port = &stream->port; return PJ_SUCCESS; } /* * Get the transport object */ PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st) { return st->transport; } /* * Start stream. 
*/ PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) { PJ_ASSERT_RETURN(stream && stream->enc && stream->dec, PJ_EINVALIDOP); if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) { stream->enc->paused = 0; //pjmedia_snd_stream_start(stream->enc->snd_stream); PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream started")); } else { PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused")); } if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) { stream->dec->paused = 0; //pjmedia_snd_stream_start(stream->dec->snd_stream); PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream started")); } else { PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused")); } return PJ_SUCCESS; } PJ_DEF(pj_status_t) pjmedia_stream_get_info( const pjmedia_stream *stream, pjmedia_stream_info *info) { PJ_ASSERT_RETURN(stream && info, PJ_EINVAL); pj_memcpy(info, &stream->si, sizeof(pjmedia_stream_info)); return PJ_SUCCESS; } /* * Get stream statistics. */ PJ_DEF(pj_status_t) pjmedia_stream_get_stat( const pjmedia_stream *stream, pjmedia_rtcp_stat *stat) { PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); pj_memcpy(stat, &stream->rtcp.stat, sizeof(pjmedia_rtcp_stat)); return PJ_SUCCESS; } /* * Reset the stream statistics in the middle of a stream session. */ PJ_DEF(pj_status_t) pjmedia_stream_reset_stat(pjmedia_stream *stream) { PJ_ASSERT_RETURN(stream, PJ_EINVAL); pjmedia_rtcp_init_stat(&stream->rtcp.stat); return PJ_SUCCESS; } #if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) /* * Get stream extended statistics. */ PJ_DEF(pj_status_t) pjmedia_stream_get_stat_xr( const pjmedia_stream *stream, pjmedia_rtcp_xr_stat *stat) { PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); if (stream->rtcp.xr_enabled) { pj_memcpy(stat, &stream->rtcp.xr_session.stat, sizeof(pjmedia_rtcp_xr_stat)); return PJ_SUCCESS; } return PJ_ENOTFOUND; } #endif /* * Get jitter buffer state. 
*/ PJ_DEF(pj_status_t) pjmedia_stream_get_stat_jbuf(const pjmedia_stream *stream, pjmedia_jb_state *state) { PJ_ASSERT_RETURN(stream && state, PJ_EINVAL); return pjmedia_jbuf_get_state(stream->jb, state); } /* * Pause stream. */ PJ_DEF(pj_status_t) pjmedia_stream_pause( pjmedia_stream *stream, pjmedia_dir dir) { PJ_ASSERT_RETURN(stream, PJ_EINVAL); if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { stream->enc->paused = 1; PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused")); } if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { stream->dec->paused = 1; /* Also reset jitter buffer */ pj_mutex_lock( stream->jb_mutex ); pjmedia_jbuf_reset(stream->jb); pj_mutex_unlock( stream->jb_mutex ); PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused")); } return PJ_SUCCESS; } /* * Resume stream */ PJ_DEF(pj_status_t) pjmedia_stream_resume( pjmedia_stream *stream, pjmedia_dir dir) { PJ_ASSERT_RETURN(stream, PJ_EINVAL); if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { stream->enc->paused = 0; PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream resumed")); } if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { stream->dec->paused = 0; PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream resumed")); } return PJ_SUCCESS; } /* * Dial DTMF */ PJ_DEF(pj_status_t) pjmedia_stream_dial_dtmf( pjmedia_stream *stream, const pj_str_t *digit_char) { pj_status_t status = PJ_SUCCESS; /* By convention we use jitter buffer mutex to access DTMF * queue. */ PJ_ASSERT_RETURN(stream && digit_char, PJ_EINVAL); /* Check that remote can receive DTMF events. */ if (stream->tx_event_pt < 0) { return PJMEDIA_RTP_EREMNORFC2833; } pj_mutex_lock(stream->jb_mutex); if (stream->tx_dtmf_count+digit_char->slen >= (long)PJ_ARRAY_SIZE(stream->tx_dtmf_buf)) { status = PJ_ETOOMANY; } else { int i; /* convert ASCII digits into payload type first, to make sure * that all digits are valid. 
*/ for (i=0; islen; ++i) { unsigned pt; int dig = pj_tolower(digit_char->ptr[i]); if (dig >= '0' && dig <= '9') { pt = dig - '0'; } else if (dig >= 'a' && dig <= 'd') { pt = dig - 'a' + 12; } else if (dig == '*') { pt = 10; } else if (dig == '#') { pt = 11; } #if defined(PJMEDIA_HAS_DTMF_FLASH) && PJMEDIA_HAS_DTMF_FLASH!= 0 else if (dig == 'r') { pt = 16; } #endif else { status = PJMEDIA_RTP_EINDTMF; break; } stream->tx_dtmf_buf[stream->tx_dtmf_count+i].event = pt; stream->tx_dtmf_buf[stream->tx_dtmf_count+i].duration = 0; stream->tx_dtmf_buf[stream->tx_dtmf_count+i].ebit_cnt = 0; } if (status != PJ_SUCCESS) goto on_return; /* Increment digit count only if all digits are valid. */ stream->tx_dtmf_count += (int)digit_char->slen; } on_return: pj_mutex_unlock(stream->jb_mutex); return status; } /* * See if we have DTMF digits in the rx buffer. */ PJ_DEF(pj_bool_t) pjmedia_stream_check_dtmf(pjmedia_stream *stream) { return stream->rx_dtmf_count != 0; } /* * Retrieve incoming DTMF digits from the stream's DTMF buffer. */ PJ_DEF(pj_status_t) pjmedia_stream_get_dtmf( pjmedia_stream *stream, char *digits, unsigned *size) { PJ_ASSERT_RETURN(stream && digits && size, PJ_EINVAL); /* By convention, we use jitter buffer's mutex to access DTMF * digits resources. */ pj_mutex_lock(stream->jb_mutex); if (stream->rx_dtmf_count < *size) *size = stream->rx_dtmf_count; if (*size) { pj_memcpy(digits, stream->rx_dtmf_buf, *size); stream->rx_dtmf_count -= *size; if (stream->rx_dtmf_count) { pj_memmove(stream->rx_dtmf_buf, &stream->rx_dtmf_buf[*size], stream->rx_dtmf_count); } } pj_mutex_unlock(stream->jb_mutex); return PJ_SUCCESS; } /* * Set callback to be called upon receiving DTMF digits. */ PJ_DEF(pj_status_t) pjmedia_stream_set_dtmf_callback(pjmedia_stream *stream, void (*cb)(pjmedia_stream*, void *user_data, int digit), void *user_data) { PJ_ASSERT_RETURN(stream, PJ_EINVAL); /* By convention, we use jitter buffer's mutex to access DTMF * digits resources. 
*/ pj_mutex_lock(stream->jb_mutex); stream->dtmf_cb = cb; stream->dtmf_cb_user_data = user_data; pj_mutex_unlock(stream->jb_mutex); return PJ_SUCCESS; } /* * Send RTCP SDES. */ PJ_DEF(pj_status_t) pjmedia_stream_send_rtcp_sdes( pjmedia_stream *stream ) { PJ_ASSERT_RETURN(stream, PJ_EINVAL); return send_rtcp(stream, PJ_TRUE, PJ_FALSE, PJ_FALSE, PJ_FALSE); } /* * Send RTCP BYE. */ PJ_DEF(pj_status_t) pjmedia_stream_send_rtcp_bye( pjmedia_stream *stream ) { PJ_ASSERT_RETURN(stream, PJ_EINVAL); if (stream->enc && stream->transport) { return send_rtcp(stream, PJ_TRUE, PJ_TRUE, PJ_FALSE, PJ_FALSE); } return PJ_SUCCESS; } /** * Get RTP session information from stream. */ PJ_DEF(pj_status_t) pjmedia_stream_get_rtp_session_info(pjmedia_stream *stream, pjmedia_stream_rtp_sess_info *session_info) { session_info->rx_rtp = &stream->dec->rtp; session_info->tx_rtp = &stream->enc->rtp; session_info->rtcp = &stream->rtcp; return PJ_SUCCESS; }