This source file includes following definitions.
- RP_ConfirmChannelConnect
- RP_InitStream
- RP_DeleteStream
- rtp_sl_packet_cbk
- RP_NewSatipStream
- RP_NewStream
- RP_ProcessRTP
- RP_ProcessRTCP
- RP_DataOnTCP
- SendTCPData
- RP_ReadStream
#include "rtp_in.h"
#include <gpac/internal/ietf_dev.h>
#include <gpac/internal/terminal_dev.h>
#ifndef GPAC_DISABLE_STREAMING
void RP_ConfirmChannelConnect(RTPStream *ch, GF_Err e)
{
GF_NetworkCommand com;
if (!ch->channel) return;
gf_service_connect_ack(ch->owner->service, ch->channel, e);
if (e != GF_OK || !ch->rtp_ch) return;
memset(&com, 0, sizeof(GF_NetworkCommand));
com.command_type = GF_NET_CHAN_RECONFIG;
com.base.on_channel = ch->channel;
gf_rtp_depacketizer_get_slconfig(ch->depacketizer, &com.cfg.sl_config);
gf_service_command(ch->owner->service, &com, GF_OK);
if (ch->depacketizer->flags & GF_RTP_HAS_ISMACRYP) {
memset(&com, 0, sizeof(GF_NetworkCommand));
com.base.on_channel = ch->channel;
com.command_type = GF_NET_CHAN_DRM_CFG;
com.drm_cfg.scheme_type = ch->depacketizer->isma_scheme;
com.drm_cfg.scheme_version = 1;
com.drm_cfg.scheme_uri = NULL;
com.drm_cfg.kms_uri = ch->depacketizer->key;
gf_service_command(ch->owner->service, &com, GF_OK);
}
}
GF_Err RP_InitStream(RTPStream *ch, Bool ResetOnly)
{
gf_rtp_depacketizer_reset(ch->depacketizer, !ResetOnly);
if (!ResetOnly) {
const char *ip_ifce = NULL;
u32 reorder_size = 0;
if (!ch->owner->transport_mode) {
const char *sOpt = gf_modules_get_option((GF_BaseInterface *) gf_service_get_interface(ch->owner->service), "Streaming", "ReorderSize");
if (sOpt) reorder_size = atoi(sOpt);
else reorder_size = 10;
ip_ifce = gf_modules_get_option((GF_BaseInterface *) gf_service_get_interface(ch->owner->service), "Network", "DefaultMCastInterface");
if (!ip_ifce) {
const char *mob_on = gf_modules_get_option((GF_BaseInterface *) gf_service_get_interface(ch->owner->service), "Network", "MobileIPEnabled");
if (mob_on && !strcmp(mob_on, "yes")) {
ip_ifce = gf_modules_get_option((GF_BaseInterface *) gf_service_get_interface(ch->owner->service), "Network", "MobileIP");
ch->flags |= RTP_MOBILEIP;
}
}
}
return gf_rtp_initialize(ch->rtp_ch, RTP_BUFFER_SIZE, GF_FALSE, 0, reorder_size, 200, (char *)ip_ifce);
}
gf_rtp_reset_buffers(ch->rtp_ch);
return GF_OK;
}
void RP_DeleteStream(RTPStream *ch)
{
if (ch->rtsp) {
if (ch->status == RTP_Running) {
RP_Teardown(ch->rtsp, ch);
ch->status = RTP_Disconnected;
}
RP_RemoveStream(ch->owner, ch);
} else {
RP_FindChannel(ch->owner, ch->channel, 0, NULL, GF_TRUE);
}
if (ch->depacketizer) gf_rtp_depacketizer_del(ch->depacketizer);
if (ch->rtp_ch) gf_rtp_del(ch->rtp_ch);
if (ch->control) gf_free(ch->control);
if (ch->session_id) gf_free(ch->session_id);
if (ch->satip_m2ts_ifce) {
if (ch->satip_m2ts_service_connected) {
ch->satip_m2ts_ifce->CloseService(ch->satip_m2ts_ifce);
}
gf_modules_close_interface((GF_BaseInterface *)ch->satip_m2ts_ifce);
}
gf_free(ch);
}
static void rtp_sl_packet_cbk(void *udta, char *payload, u32 size, GF_SLHeader *hdr, GF_Err e)
{
u64 cts, dts;
RTPStream *ch = (RTPStream *)udta;
if (!ch->rtcp_init) {
if (!ch->rtcp_check_start) {
ch->rtcp_check_start = gf_sys_clock();
return;
}
else if (gf_sys_clock() - ch->rtcp_check_start <= RTCP_DEFAULT_TIMEOUT_MS) {
return;
}
GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Timeout for RTCP: no SR recevied after %d ms - forcing playback, sync may be broken\n", RTCP_DEFAULT_TIMEOUT_MS));
ch->rtcp_init = GF_TRUE;
}
cts = hdr->compositionTimeStamp;
dts = hdr->decodingTimeStamp;
if (hdr->compositionTimeStamp < ch->ts_offset) {
hdr->compositionTimeStamp = hdr->decodingTimeStamp = 0;
hdr->seekFlag = 1;
} else {
hdr->seekFlag = 0;
hdr->compositionTimeStamp -= ch->ts_offset;
if (hdr->decodingTimeStamp) {
hdr->decodingTimeStamp -= ch->ts_offset;
}
}
if (ch->rtp_ch->packet_loss) e = GF_REMOTE_SERVICE_ERROR;
if (ch->owner->first_packet_drop && (hdr->packetSequenceNumber >= ch->owner->first_packet_drop) ) {
if ( (hdr->packetSequenceNumber - ch->owner->first_packet_drop) % ch->owner->frequency_drop)
gf_service_send_packet(ch->owner->service, ch->channel, payload, size, hdr, e);
} else {
gf_service_send_packet(ch->owner->service, ch->channel, payload, size, hdr, e);
}
hdr->compositionTimeStamp = cts;
hdr->decodingTimeStamp = dts;
}
RTPStream *RP_NewSatipStream(RTPClient *rtp, const char *server_ip)
{
char *ctrl;
GF_RTPMap map;
GF_RTSPTransport trans;
RTPStream *tmp;
GF_SAFEALLOC(tmp, RTPStream);
if (!tmp) return NULL;
tmp->owner = rtp;
tmp->rtp_ch = gf_rtp_new();
tmp->control = gf_strdup("*");
memset(&trans, 0, sizeof(GF_RTSPTransport));
trans.Profile = "RTP/AVP";
trans.source = gf_strdup(server_ip);
trans.IsUnicast = GF_TRUE;
trans.client_port_first = 0;
trans.client_port_last = 0;
trans.port_first = 0;
trans.port_last = 0;
if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
RP_DeleteStream(tmp);
return NULL;
}
memset(&map, 0, sizeof(GF_RTPMap));
map.PayloadType = 33;
map.ClockRate = 90000;
gf_rtp_setup_payload(tmp->rtp_ch, &map);
ctrl = (char *)gf_modules_get_option((GF_BaseInterface *)gf_service_get_interface(rtp->service), "Streaming", "DisableRTCP");
if (!ctrl || stricmp(ctrl, "yes")) tmp->flags |= RTP_ENABLE_RTCP;
ctrl = (char *)gf_modules_get_option((GF_BaseInterface *)gf_service_get_interface(rtp->service), "Streaming", "NATKeepAlive");
if (ctrl) {
gf_rtp_enable_nat_keepalive(tmp->rtp_ch, atoi(ctrl));
} else {
gf_rtp_enable_nat_keepalive(tmp->rtp_ch, 30000);
}
tmp->range_start = 0;
tmp->range_end = 0;
tmp->satip_m2ts_ifce = (GF_InputService*)gf_modules_load_interface_by_name(rtp->service->term->user->modules, "GPAC MPEG-2 TS Reader", GF_NET_CLIENT_INTERFACE);
if (!tmp->satip_m2ts_ifce) {
GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[SAT>IP] Couldn't load the M2TS demuxer.\n"));
RP_DeleteStream(tmp);
return NULL;
}
tmp->satip_m2ts_ifce->proxy_udta = rtp;
return tmp;
}
RTPStream *RP_NewStream(RTPClient *rtp, GF_SDPMedia *media, GF_SDPInfo *sdp, RTPStream *input_stream)
{
GF_RTSPRange *range;
RTPStream *tmp;
GF_RTPMap *map;
u32 i, ESID, ODID, ssrc, rtp_seq, rtp_time;
Bool force_bcast = GF_FALSE;
Double Start, End;
Float CurrentTime;
u16 rvc_predef = 0;
char *rvc_config_att = NULL;
u32 s_port_first, s_port_last;
GF_X_Attribute *att;
Bool is_migration = GF_FALSE;
char *ctrl;
GF_SDPConnection *conn;
GF_RTSPTransport trans;
u32 mid, prev_stream, base_stream;
Start = 0.0;
End = -1.0;
CurrentTime = 0.0f;
ODID = 0;
ESID = 0;
ctrl = NULL;
range = NULL;
s_port_first = s_port_last = 0;
ssrc = rtp_seq = rtp_time = 0;
mid = prev_stream = base_stream = 0;
i=0;
while ((att = (GF_X_Attribute*)gf_list_enum(media->Attributes, &i))) {
if (!stricmp(att->Name, "control")) ctrl = att->Value;
else if (!stricmp(att->Name, "gpac-broadcast")) force_bcast = GF_TRUE;
else if (!stricmp(att->Name, "mpeg4-esid") && att->Value) ESID = atoi(att->Value);
else if (!stricmp(att->Name, "mpeg4-odid") && att->Value) ODID = atoi(att->Value);
else if (!stricmp(att->Name, "range") && !range) range = gf_rtsp_range_parse(att->Value);
else if (!stricmp(att->Name, "x-stream-state") ) {
sscanf(att->Value, "server-port=%u-%u;ssrc=%X;npt=%g;seq=%u;rtptime=%u",
&s_port_first, &s_port_last, &ssrc, &CurrentTime, &rtp_seq, &rtp_time);
is_migration = GF_TRUE;
}
else if (!stricmp(att->Name, "x-server-port") ) {
sscanf(att->Value, "%u-%u", &s_port_first, &s_port_last);
} else if (!stricmp(att->Name, "rvc-config-predef")) {
rvc_predef = atoi(att->Value);
} else if (!stricmp(att->Name, "rvc-config")) {
rvc_config_att = att->Value;
} else if (!stricmp(att->Name, "mid")) {
sscanf(att->Value, "L%d", &mid);
} else if (!stricmp(att->Name, "depend")) {
char buf[3000];
memset(buf, 0, 3000);
sscanf(att->Value, "%*d lay L%d %*s %s", &base_stream, buf);
if (!strlen(buf))
sscanf(att->Value, "%*d lay %s", buf);
sscanf(buf, "L%d", &prev_stream);
}
}
if (range) {
Start = range->start;
End = range->end;
gf_rtsp_range_del(range);
}
conn = sdp->c_connection;
if (conn && (!conn->host || !strcmp(conn->host, "0.0.0.0"))) conn = NULL;
if (!conn) conn = (GF_SDPConnection*)gf_list_get(media->Connections, 0);
if (conn && (!conn->host || !strcmp(conn->host, "0.0.0.0"))) conn = NULL;
if (!conn) {
if (!sdp->o_net_type || !sdp->o_add_type || strcmp(sdp->o_net_type, "IN")) return NULL;
if (strcmp(sdp->o_add_type, "IP4") && strcmp(sdp->o_add_type, "IP6")) return NULL;
} else {
if (strcmp(conn->net_type, "IN")) return NULL;
if (strcmp(conn->add_type, "IP4") && strcmp(conn->add_type, "IP6")) return NULL;
}
if (strcmp(media->Profile, "RTP/AVP") && strcmp(media->Profile, "RTP/AVP/TCP")
&& strcmp(media->Profile, "RTP/SAVP") && strcmp(media->Profile, "RTP/SAVP/TCP")
) return NULL;
if (media->fmt_list || (gf_list_count(media->RTPMaps) > 1)) return NULL;
map = (GF_RTPMap*)gf_list_get(media->RTPMaps, 0);
if (input_stream) {
ESID = input_stream->ES_ID;
if (!ctrl) ctrl = input_stream->control;
tmp = input_stream;
} else {
tmp = RP_FindChannel(rtp, NULL, ESID, NULL, GF_FALSE);
if (tmp) return NULL;
GF_SAFEALLOC(tmp, RTPStream);
if (!tmp) return NULL;
tmp->owner = rtp;
}
tmp->rtp_ch = gf_rtp_new();
if (ctrl) tmp->control = gf_strdup(ctrl);
tmp->ES_ID = ESID;
tmp->OD_ID = ODID;
tmp->mid = mid;
tmp->prev_stream = prev_stream;
tmp->base_stream = base_stream;
memset(&trans, 0, sizeof(GF_RTSPTransport));
trans.Profile = media->Profile;
trans.source = conn ? conn->host : sdp->o_address;
trans.IsUnicast = gf_sk_is_multicast_address(trans.source) ? GF_FALSE : GF_TRUE;
if (!trans.IsUnicast) {
trans.port_first = media->PortNumber;
trans.port_last = media->PortNumber + 1;
trans.TTL = conn ? conn->TTL : 0;
} else {
trans.client_port_first = media->PortNumber;
trans.client_port_last = media->PortNumber + 1;
trans.port_first = s_port_first ? s_port_first : trans.client_port_first;
trans.port_last = s_port_last ? s_port_last : trans.client_port_last;
}
if (gf_rtp_setup_transport(tmp->rtp_ch, &trans, NULL) != GF_OK) {
RP_DeleteStream(tmp);
return NULL;
}
tmp->depacketizer = gf_rtp_depacketizer_new(media, rtp_sl_packet_cbk, tmp);
if (!tmp->depacketizer) {
RP_DeleteStream(tmp);
return NULL;
}
gf_rtp_setup_payload(tmp->rtp_ch, map);
ctrl = (char *) gf_modules_get_option((GF_BaseInterface *) gf_service_get_interface(rtp->service), "Streaming", "DisableRTCP");
if (!ctrl || stricmp(ctrl, "yes")) tmp->flags |= RTP_ENABLE_RTCP;
ctrl = (char *) gf_modules_get_option((GF_BaseInterface *) gf_service_get_interface(rtp->service), "Streaming", "NATKeepAlive");
if (ctrl) gf_rtp_enable_nat_keepalive(tmp->rtp_ch, atoi(ctrl));
tmp->range_start = Start;
tmp->range_end = End;
if (End != -1.0) tmp->flags |= RTP_HAS_RANGE;
if (force_bcast) tmp->flags |= RTP_FORCE_BROADCAST;
if (is_migration) {
tmp->current_start = (Double) CurrentTime;
tmp->check_rtp_time = RTP_SET_TIME_RTP;
gf_rtp_set_info_rtp(tmp->rtp_ch, rtp_seq, rtp_time, ssrc);
tmp->status = RTP_SessionResume;
}
if (rvc_predef) {
tmp->depacketizer->sl_map.rvc_predef = rvc_predef ;
} else if (rvc_config_att) {
char *rvc_data=NULL;
u32 rvc_size;
Bool is_gz = GF_FALSE;
if (!strncmp(rvc_config_att, "data:application/rvc-config+xml", 32) && strstr(rvc_config_att, "base64") ) {
char *data = strchr(rvc_config_att, ',');
if (data) {
rvc_size = (u32) strlen(data) * 3 / 4 + 1;
rvc_data = (char*)gf_malloc(sizeof(char) * rvc_size);
rvc_size = gf_base64_decode(data, (u32) strlen(data), rvc_data, rvc_size);
rvc_data[rvc_size] = 0;
}
if (!strncmp(rvc_config_att, "data:application/rvc-config+xml+gz", 35)) is_gz = GF_TRUE;
} else if (!strnicmp(rvc_config_att, "http://", 7) || !strnicmp(rvc_config_att, "https://", 8) ) {
char *mime;
if (gf_dm_get_file_memory(rvc_config_att, &rvc_data, &rvc_size, &mime) == GF_OK) {
if (mime && strstr(mime, "+gz")) is_gz = GF_TRUE;
if (mime) gf_free(mime);
}
}
if (rvc_data) {
if (is_gz) {
#ifdef GPAC_DISABLE_ZLIB
fprintf(stderr, "Error: no zlib support - RVC not supported in RTP\n");
return NULL;
#endif
gf_gz_decompress_payload(rvc_data, rvc_size, &tmp->depacketizer->sl_map.rvc_config, &tmp->depacketizer->sl_map.rvc_config_size);
gf_free(rvc_data);
} else {
tmp->depacketizer->sl_map.rvc_config = rvc_data;
tmp->depacketizer->sl_map.rvc_config_size = rvc_size;
}
}
}
return tmp;
}
static void RP_ProcessRTP(RTPStream *ch, char *pck, u32 size)
{
GF_NetworkCommand com;
GF_Err e;
GF_RTPHeader hdr;
u32 PayloadStart;
ch->rtp_bytes += size;
e = gf_rtp_decode_rtp(ch->rtp_ch, pck, size, &hdr, &PayloadStart);
if (e || (PayloadStart >= size)) {
return;
}
if (ch->check_rtp_time ) {
Double ch_time;
if (ch->rtp_ch->rtp_time
&& (ch->rtp_ch->rtp_first_SN > hdr.SequenceNumber)
&& (ch->rtp_ch->rtp_time < hdr.TimeStamp)
) {
GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] Rejecting too early packet (TS %d vs signaled rtp time %d - diff %d ms)\n",
hdr.TimeStamp, ch->rtp_ch->rtp_time, ((hdr.TimeStamp - ch->rtp_ch->rtp_time)*1000) / ch->rtp_ch->TimeScale));
return;
}
ch_time = gf_rtp_get_current_time(ch->rtp_ch);
if (ch->check_rtp_time == RTP_SET_TIME_RTP) {
memset(&com, 0, sizeof(com));
com.command_type = GF_NET_CHAN_MAP_TIME;
com.base.on_channel = ch->channel;
if (ch->rtsp) {
com.map_time.media_time = ch->current_start + ch_time;
} else {
com.map_time.media_time = 0;
}
com.map_time.timestamp = hdr.TimeStamp;
com.map_time.reset_buffers = GF_FALSE;
gf_service_command(ch->owner->service, &com, GF_OK);
GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTP] Mapping RTP Time seq %d TS %d Media Time %g - rtp info seq %d TS %d\n",
hdr.SequenceNumber, hdr.TimeStamp, com.map_time.media_time, ch->rtp_ch->rtp_first_SN, ch->rtp_ch->rtp_time
));
if (ch->rtsp) ch->rtcp_init = GF_TRUE;
}
else if (ch_time <= 0.021) {
return;
}
ch->check_rtp_time = RTP_SET_TIME_NONE;
}
gf_rtp_depacketizer_process(ch->depacketizer, &hdr, pck + PayloadStart, size - PayloadStart);
if ((ch->flags & RTP_HAS_RANGE) && !(ch->flags & RTP_EOS) ) {
Double ts = (Double) ((u32) ch->depacketizer->sl_hdr.compositionTimeStamp - hdr.TimeStamp);
ts /= gf_rtp_get_clockrate(ch->rtp_ch);
if (ABSDIFF(ch->range_end, (ts + ch->current_start + gf_rtp_get_current_time(ch->rtp_ch)) ) < 0.2) {
ch->flags |= RTP_EOS;
ch->stat_stop_time = gf_sys_clock();
gf_service_send_packet(ch->owner->service, ch->channel, NULL, 0, NULL, GF_EOS);
}
}
}
void RP_ProcessRTCP(RTPStream *ch, char *pck, u32 size)
{
Bool has_sr;
GF_Err e;
if (ch->status == RTP_Connected) return;
ch->rtcp_bytes += size;
e = gf_rtp_decode_rtcp(ch->rtp_ch, pck, size, &has_sr);
if (e<0) return;
if (!ch->rtcp_init && has_sr) {
Double ntp_clock;
ntp_clock = ch->rtp_ch->last_SR_NTP_sec;
ntp_clock += ((Double)ch->rtp_ch->last_SR_NTP_frac)/0xFFFFFFFF;
if (!ch->owner->last_ntp) {
ch->owner->last_ntp = ntp_clock;
}
if (ntp_clock >= ch->owner->last_ntp) {
ntp_clock -= ch->owner->last_ntp;
} else {
ntp_clock = 0;
}
ch->ts_offset = ch->rtp_ch->last_SR_rtp_time;
ch->ts_offset -= (s64) (ntp_clock * ch->rtp_ch->TimeScale);
GF_LOG(GF_LOG_INFO, GF_LOG_RTP, ("[RTCP] At %d Using Sender Report to map RTP TS %d to NTP clock %g - new TS offset "LLD" \n",
gf_sys_clock(), ch->rtp_ch->last_SR_rtp_time, ntp_clock, ch->ts_offset
));
ch->rtcp_init = GF_TRUE;
ch->check_rtp_time = RTP_SET_TIME_NONE;
}
if (e == GF_EOS) {
ch->flags |= RTP_EOS;
ch->stat_stop_time = gf_sys_clock();
gf_service_send_packet(ch->owner->service, ch->channel, NULL, 0, NULL, GF_EOS);
}
}
GF_Err RP_DataOnTCP(GF_RTSPSession *sess, void *cbk, char *buffer, u32 bufferSize, Bool IsRTCP)
{
RTPStream *ch = (RTPStream *) cbk;
if (!ch) return GF_OK;
if (IsRTCP) {
RP_ProcessRTCP(ch, buffer, bufferSize);
} else {
RP_ProcessRTP(ch, buffer, bufferSize);
}
return GF_OK;
}
static GF_Err SendTCPData(void *par, char *pck, u32 pck_size)
{
return GF_OK;
}
void RP_ReadStream(RTPStream *ch)
{
u32 size, tot_size;
if (!ch->rtp_ch) return;
tot_size = 0;
while (1) {
size = gf_rtp_read_rtcp(ch->rtp_ch, ch->buffer, RTP_BUFFER_SIZE);
if (!size) break;
tot_size += size;
RP_ProcessRTCP(ch, ch->buffer, size);
}
while (1) {
size = gf_rtp_read_rtp(ch->rtp_ch, ch->buffer, RTP_BUFFER_SIZE);
if (!size) break;
tot_size += size;
RP_ProcessRTP(ch, ch->buffer, size);
}
if (ch->flags & RTP_ENABLE_RTCP) gf_rtp_send_rtcp_report(ch->rtp_ch, SendTCPData, ch);
if (tot_size) ch->owner->udp_time_out = 0;
if (ch->owner->udp_time_out) {
if (!ch->last_udp_time) {
ch->last_udp_time = gf_sys_clock();
} else if (ch->rtp_ch->net_info.IsUnicast && !(ch->flags & RTP_MOBILEIP) ) {
u32 diff = gf_sys_clock() - ch->last_udp_time;
if (diff >= ch->owner->udp_time_out) {
char szMessage[1024];
GF_LOG(GF_LOG_WARNING, GF_LOG_RTP, ("[RTP] UDP Timeout after %d ms\n", diff));
sprintf(szMessage, "No data received in %d ms", diff);
RP_SendMessage(ch->owner->service, GF_IP_UDP_TIMEOUT, szMessage);
ch->status = RTP_Unavailable;
}
}
}
}
#endif