This source file includes the following definitions.
- gf_term_init_scheduler
- gf_term_stop_scheduler
- mm_get_codec
- gf_term_add_codec
- gf_term_remove_codec
- gf_term_find_codec
- MM_SimulationStep_Decoder
- MM_Loop
- RunSingleDec
- gf_term_start_codec
- gf_term_stop_codec
- gf_term_set_threading
- gf_term_lock_codec
- gf_term_set_priority
- gf_term_process_step
- gf_term_process_flush
- gf_term_process_flush_video
#include <gpac/internal/terminal_dev.h>
#include "media_memory.h"
#include <gpac/internal/compositor_dev.h>
/* Forward declaration of the media manager thread entry point; it is passed
 * to gf_th_run() by gf_term_init_scheduler() before its definition appears. */
u32 MM_Loop(void *par);
/* Bit flags stored in CodecEntry.flags. */
enum
{
/* set by gf_term_start_codec(), cleared when the codec is stopped or removed */
GF_MM_CE_RUNNING= 1,
/* not referenced by the code in this file */
GF_MM_CE_HAS_ERROR = 1<<1,
/* the entry owns a dedicated decoding thread and mutex */
GF_MM_CE_THREADED = 1<<2,
/* at registration the codec was audio or reported GF_CODEC_WANTS_THREAD */
GF_MM_CE_REQ_THREAD = 1<<3,
/* set by RunSingleDec() once its decoding loop has exited */
GF_MM_CE_DEAD = 1<<4,
/* set by gf_term_remove_codec() when the manager mutex could not be taken;
 * the entry is freed later by the decoder step or at scheduler shutdown */
GF_MM_CE_DISCARDED = 1<<5,
};
/* Per-codec bookkeeping record stored in term->codecs. */
typedef struct
{
/* combination of GF_MM_CE_* bits */
u32 flags;
/* the codec this entry schedules */
GF_Codec *dec;
/* dedicated decoding thread; only created for threaded entries */
GF_Thread *thread;
/* mutex guarding decoding in the dedicated thread; only created for threaded entries */
GF_Mutex *mx;
} CodecEntry;
/*
 * Sets up the media manager for a terminal: creates the manager mutex and the
 * codec list, applies the requested threading mode and, unless the user asked
 * for GF_TERM_NO_DECODER_THREAD, spawns the manager thread running MM_Loop().
 * Always returns GF_OK.
 */
GF_Err gf_term_init_scheduler(GF_Terminal *term, u32 threading_mode)
{
	term->mm_mx = gf_mx_new("MediaManager");
	term->codecs = gf_list_new();
	term->frame_duration = 33;

	/* any other mode leaves both threading flags untouched */
	if (threading_mode == GF_TERM_THREAD_SINGLE) {
		term->flags |= GF_TERM_SINGLE_THREAD;
	} else if (threading_mode == GF_TERM_THREAD_MULTI) {
		term->flags |= GF_TERM_MULTI_THREAD;
	}

	if (!(term->user->init_flags & GF_TERM_NO_DECODER_THREAD)) {
		term->mm_thread = gf_th_new("MediaManager");
		/* MM_Loop() reads both the RUNNING flag and the priority, so they
		 * are set before the thread is launched */
		term->flags |= GF_TERM_RUNNING;
		term->priority = GF_THREAD_PRIORITY_NORMAL;
		gf_th_run(term->mm_thread, MM_Loop, term);
	}
	return GF_OK;
}
/*
 * Shuts the media manager down. When a manager thread exists it is asked to
 * stop, waited for, and any codec entries left flagged as discarded are
 * released before the thread is deleted. The codec list and the manager
 * mutex are destroyed in all cases.
 */
void gf_term_stop_scheduler(GF_Terminal *term)
{
	if (term->mm_thread) {
		u32 idx = 0;
		u32 remaining;

		/* request termination and poll until MM_Loop() reports it exited */
		term->flags &= ~GF_TERM_RUNNING;
		while (!(term->flags & GF_TERM_DEAD)) {
			gf_sleep(2);
		}

		/* purge entries whose removal was deferred */
		remaining = gf_list_count(term->codecs);
		while (idx < remaining) {
			CodecEntry *entry = gf_list_get(term->codecs, idx);
			if (entry->flags & GF_MM_CE_DISCARDED) {
				gf_free(entry);
				gf_list_rem(term->codecs, idx);
				remaining--;
			} else {
				idx++;
			}
		}
		/* every codec is expected to have been unregistered by now */
		assert(! gf_list_count(term->codecs));
		gf_th_del(term->mm_thread);
	}
	gf_list_del(term->codecs);
	gf_mx_del(term->mm_mx);
}
/*
 * Looks up the entry registered for `codec` in `list`.
 * Returns the matching CodecEntry, or NULL when the codec is not registered.
 */
static CodecEntry *mm_get_codec(GF_List *list, GF_Codec *codec)
{
	u32 pos = 0;
	CodecEntry *entry;

	for (entry = (CodecEntry *)gf_list_enum(list, &pos);
	        entry != NULL;
	        entry = (CodecEntry *)gf_list_enum(list, &pos)) {
		if (entry->dec == codec) break;
	}
	/* NULL here means the enumeration ran out without a match */
	return entry;
}
/*
 * Registers `codec` with the terminal's media manager.
 *
 * Does nothing if the codec is already registered. Otherwise a CodecEntry is
 * allocated and either given its own thread and mutex (threaded entry,
 * appended to the list) or inserted in the list among the non-threaded
 * entries according to its priority and stream type.
 *
 * The manager mutex is held for the whole operation and is released on every
 * exit path, including allocation failure.
 */
void gf_term_add_codec(GF_Terminal *term, GF_Codec *codec)
{
	u32 i, count;
	Bool threaded;
	CodecEntry *cd;
	CodecEntry *ptr, *next;
	GF_CodecCapability cap;
	assert(codec);

	GF_LOG(GF_LOG_DEBUG, GF_LOG_MEDIA, ("[Terminal] Registering codec %s\n", codec->decio ? codec->decio->module_name : "RAW"));

	gf_mx_p(term->mm_mx);

	/* already registered: nothing to do */
	cd = mm_get_codec(term->codecs, codec);
	if (cd) goto exit;

	GF_SAFEALLOC(cd, CodecEntry);
	if (!cd) {
		GF_LOG(GF_LOG_ERROR, GF_LOG_MEDIA, ("[Terminal] Failed to allocate decoder entry\n"));
		/* leave through the common exit so the manager mutex is released */
		goto exit;
	}
	cd->dec = codec;
	/* a zero priority is bumped to 1 */
	if (!cd->dec->Priority)
		cd->dec->Priority = 1;

	/* audio always requests a thread; other codecs are asked */
	if (codec->type==GF_STREAM_AUDIO) {
		threaded = 1;
	} else {
		cap.CapCode = GF_CODEC_WANTS_THREAD;
		cap.cap.valueInt = 0;
		gf_codec_get_capability(codec, &cap);
		threaded = cap.cap.valueInt;
	}
	/* remember the codec's own wish, independently of the terminal mode */
	if (threaded) cd->flags |= GF_MM_CE_REQ_THREAD;

	/* the terminal threading mode overrides the codec's wish */
	if (term->flags & GF_TERM_MULTI_THREAD) {
		if ((codec->type==GF_STREAM_AUDIO) || (codec->type==GF_STREAM_VISUAL)) threaded = 1;
	} else if (term->flags & GF_TERM_SINGLE_THREAD) {
		threaded = 0;
	}
	/* raw media codecs are never threaded */
	if (codec->flags & GF_ESM_CODEC_IS_RAW_MEDIA)
		threaded = 0;

	if (threaded) {
		cd->thread = gf_th_new(cd->dec->decio->module_name);
		cd->mx = gf_mx_new(cd->dec->decio->module_name);
		cd->flags |= GF_MM_CE_THREADED;
		gf_list_add(term->codecs, cd);
		goto exit;
	}

	/* non-threaded entry: find its slot among the other non-threaded entries */
	count = gf_list_count(term->codecs);
	for (i=0; i<count; i++) {
		ptr = (CodecEntry*)gf_list_get(term->codecs, i);
		if (ptr->flags & GF_MM_CE_THREADED) continue;

		/* entries with a higher priority stay ahead of the new one */
		if (ptr->dec->Priority > codec->Priority) continue;

		if (ptr->dec->Priority == codec->Priority) {
			/* same priority, lower stream type: insert before it */
			if (ptr->dec->type < codec->type) {
				gf_list_insert(term->codecs, cd, i);
				goto exit;
			}
			/* same priority and same stream type: insert right after it */
			if (ptr->dec->type == codec->type) {
				if (i+1==count) {
					gf_list_add(term->codecs, cd);
				} else {
					gf_list_insert(term->codecs, cd, i+1);
				}
				goto exit;
			}
			/* same priority, higher stream type, and it is the last entry */
			if (i+1 == count) {
				gf_list_add(term->codecs, cd);
				goto exit;
			}
			/* insert after it when the following entry is threaded or
			 * belongs to another priority level; otherwise keep scanning */
			next = (CodecEntry*)gf_list_get(term->codecs, i+1);
			if ((next->flags & GF_MM_CE_THREADED) || (next->dec->Priority != codec->Priority)) {
				gf_list_insert(term->codecs, cd, i+1);
				goto exit;
			}
			continue;
		}
		/* first entry with a lower priority: insert before it */
		gf_list_insert(term->codecs, cd, i);
		goto exit;
	}
	/* no slot found: append */
	gf_list_add(term->codecs, cd);

exit:
	gf_mx_v(term->mm_mx);
	return;
}
/*
 * Unregisters `codec` from the media manager.
 *
 * If the entry has its own thread, the thread is stopped (when running) and
 * both thread and mutex are deleted. The entry itself is freed immediately
 * only if the manager mutex could be acquired without blocking; otherwise it
 * is flagged GF_MM_CE_DISCARDED and left in the list for later cleanup.
 */
void gf_term_remove_codec(GF_Terminal *term, GF_Codec *codec)
{
	u32 pos = 0;
	Bool has_lock;
	CodecEntry *entry;

	GF_LOG(GF_LOG_DEBUG, GF_LOG_MEDIA, ("[Terminal] Unregistering codec %s\n", codec->decio ? codec->decio->module_name : "RAW"));

	/* non-blocking attempt: the outcome decides between free and discard */
	has_lock = gf_mx_try_lock(term->mm_mx);

	/* locate the entry; `pos` ends up one past its index */
	do {
		entry = (CodecEntry *)gf_list_enum(term->codecs, &pos);
	} while (entry && (entry->dec != codec));

	if (entry) {
		if (entry->thread) {
			if (entry->flags & GF_MM_CE_RUNNING) {
				/* ask the decoding thread to exit and wait until it has */
				entry->flags &= ~GF_MM_CE_RUNNING;
				while (!(entry->flags & GF_MM_CE_DEAD)) {
					gf_sleep(10);
				}
				entry->flags &= ~GF_MM_CE_DEAD;
			}
			gf_th_del(entry->thread);
			gf_mx_del(entry->mx);
		}
		if (!has_lock) {
			entry->flags |= GF_MM_CE_DISCARDED;
		} else {
			gf_free(entry);
			gf_list_rem(term->codecs, pos-1);
		}
	}

	if (has_lock) gf_mx_v(term->mm_mx);
	return;
}
/*
 * Tells whether `codec` currently has an entry in the terminal's codec list.
 * Returns 1 when found, 0 otherwise. The manager mutex is not taken here.
 */
Bool gf_term_find_codec(GF_Terminal *term, GF_Codec *codec)
{
	u32 pos = 0;
	Bool found = 0;
	CodecEntry *entry;

	while (!found && ((entry = (CodecEntry *)gf_list_enum(term->codecs, &pos)) != NULL)) {
		if (entry->dec == codec) found = 1;
	}
	return found;
}
/*
 * Runs one scheduling pass over the codecs decoded by the media manager
 * itself (running, non-threaded entries).
 *
 * Network services are handled first, then, under the manager mutex, the
 * codec list is walked round-robin starting at term->last_codec. Each
 * eligible codec gets a share of the remaining frame time proportional to
 * its priority.
 *
 * nb_active_decs: receives the number of codecs that were processed.
 * Returns the part of term->frame_duration left once decoding is done
 * (0 when the budget was used up).
 */
static u32 MM_SimulationStep_Decoder(GF_Terminal *term, u32 *nb_active_decs)
{
CodecEntry *ce;
GF_Err e;
u32 count, remain;
u32 time_taken, time_slice, time_left;
/* time spent in gf_term_handle_services() is recorded on the compositor */
#ifndef GF_DISABLE_LOG
term->compositor->networks_time = gf_sys_clock();
#endif
gf_term_handle_services(term);
#ifndef GF_DISABLE_LOG
term->compositor->networks_time = gf_sys_clock() - term->compositor->networks_time;
#endif
/* same bookkeeping for the decoding phase */
#ifndef GF_DISABLE_LOG
term->compositor->decoders_time = gf_sys_clock();
#endif
gf_mx_p(term->mm_mx);
count = gf_list_count(term->codecs);
time_left = term->frame_duration;
*nb_active_decs = 0;
/* resume where the previous pass stopped, wrapping if the list shrank */
if (term->last_codec >= count) term->last_codec = 0;
/* `remain` bounds the walk to one visit per entry present at entry */
remain = count;
while (remain) {
ce = (CodecEntry*)gf_list_get(term->codecs, term->last_codec);
if (!ce) break;
/* skip entries that are stopped, decoded in their own thread, or flagged force_cb_resize */
if (!(ce->flags & GF_MM_CE_RUNNING) || (ce->flags & GF_MM_CE_THREADED) || ce->dec->force_cb_resize) {
remain--;
if (!remain) break;
term->last_codec = (term->last_codec + 1) % count;
continue;
}
/* share of the remaining time, weighted by priority.
 * NOTE(review): divides by term->cumulated_priority; gf_term_start_codec()
 * raises it whenever a non-threaded codec starts running, so it is
 * presumably non-zero here - confirm */
time_slice = ce->dec->Priority * time_left / term->cumulated_priority;
/* a boosted codec gets twice its share */
if (ce->dec->PriorityBoost) time_slice *= 2;
time_taken = gf_sys_clock();
(*nb_active_decs) ++;
e = gf_codec_process(ce->dec, time_slice);
time_taken = gf_sys_clock() - time_taken;
/* NOTE(review): this guard tests GPAC_DISABLE_LOG while the other guards in
 * this function test GF_DISABLE_LOG - confirm which macro is intended */
#ifndef GPAC_DISABLE_LOG
if (e) {
GF_LOG(GF_LOG_WARNING, GF_LOG_CODEC, ("[ODM%d] Decoding Error %s\n", ce->dec->odm->OD->objectDescriptorID, gf_error_to_string(e) ));
} else {
}
#endif
/* the entry was flagged for removal by gf_term_remove_codec(): release it now */
if (ce->flags & GF_MM_CE_DISCARDED) {
gf_free(ce);
gf_list_rem(term->codecs, term->last_codec);
count--;
if (!count)
break;
} else {
/* drop the boost once the composition buffer holds at least its minimum unit count */
if (ce->dec->CB && (ce->dec->CB->UnitCount >= ce->dec->CB->Min)) ce->dec->PriorityBoost = 0;
}
/* NOTE(review): this advance also runs after a removal; if gf_list_rem()
 * shifts later items down, the item now at this index is skipped for this
 * pass - confirm */
term->last_codec = (term->last_codec + 1) % count;
remain -= 1;
/* charge the decoding time to the budget; stop once it is exhausted */
if (time_left > time_taken) {
time_left -= time_taken;
if (!remain) break;
} else {
time_left = 0;
break;
}
}
gf_mx_v(term->mm_mx);
#ifndef GF_DISABLE_LOG
term->compositor->decoders_time = gf_sys_clock() - term->compositor->decoders_time;
#endif
return time_left;
}
u32 MM_Loop(void *par)
{
GF_Terminal *term = (GF_Terminal *) par;
Bool do_scene = (term->flags & GF_TERM_NO_VISUAL_THREAD) ? 1 : 0;
Bool do_codec = (term->flags & GF_TERM_NO_DECODER_THREAD) ? 0 : 1;
Bool do_regulate = (term->user->init_flags & GF_TERM_NO_REGULATION) ? 0 : 1;
gf_th_set_priority(term->mm_thread, term->priority);
GF_LOG(GF_LOG_DEBUG, GF_LOG_CORE, ("[MediaManager] Entering thread ID %d\n", gf_th_id() ));
while (term->flags & GF_TERM_RUNNING) {
u32 nb_decs = 0;
u32 left = 0;
if (do_codec) left = MM_SimulationStep_Decoder(term, &nb_decs);
else left = term->frame_duration;
if (do_scene) {
s32 ms_until_next=0;
u32 time_taken = gf_sys_clock();
gf_sc_draw_frame(term->compositor, 0, &ms_until_next);
time_taken = gf_sys_clock() - time_taken;
if (ms_until_next< (s32) term->frame_duration/2) {
left = 0;
} else if (left>time_taken)
left -= time_taken;
else
left = 0;
}
if (do_regulate) {
if (term->bench_mode) {
gf_sleep(0);
} else {
if (left==term->frame_duration) {
gf_sleep(nb_decs ? 0 : term->frame_duration/2);
}
}
}
}
term->flags |= GF_TERM_DEAD;
return 0;
}
u32 RunSingleDec(void *ptr)
{
GF_Err e;
u64 time_taken;
CodecEntry *ce = (CodecEntry *) ptr;
GF_LOG(GF_LOG_DEBUG, GF_LOG_CORE, ("[MediaDecoder %d] Entering thread ID %d\n", ce->dec->odm->OD->objectDescriptorID, gf_th_id() ));
while (ce->flags & GF_MM_CE_RUNNING) {
time_taken = gf_sys_clock_high_res();
if (!ce->dec->force_cb_resize) {
gf_mx_p(ce->mx);
e = gf_codec_process(ce->dec, ce->dec->odm->term->frame_duration);
if (e) gf_term_message(ce->dec->odm->term, ce->dec->odm->net_service->url, "Decoding Error", e);
gf_mx_v(ce->mx);
}
time_taken = gf_sys_clock_high_res() - time_taken;
if (!ce->dec->CB || (ce->dec->CB->UnitCount == ce->dec->CB->Capacity))
ce->dec->PriorityBoost = 0;
if (ce->dec->PriorityBoost) continue;
if (time_taken<20) {
gf_sleep(1);
}
}
ce->flags |= GF_MM_CE_DEAD;
return 0;
}
/*
 * Starts (or resumes) decoding for `codec`.
 *
 * Does nothing when the codec's object has no channels or when the codec is
 * not registered with the media manager. On a fresh start (is_resume == 0)
 * the composition buffer is reset and the decoder is sent GF_CODEC_WAIT_RAP,
 * plus GF_CODEC_SHOW_SCENE for scene decoders. The codec is then set to
 * GF_ESM_CODEC_PLAY and, if its entry was not already running, either its
 * dedicated thread is launched or its priority is added to the manager's
 * cumulated priority.
 */
void gf_term_start_codec(GF_Codec *codec, Bool is_resume)
{
	GF_CodecCapability cap;
	CodecEntry *ce;
	GF_Terminal *term = codec->odm->term;

	if (!gf_list_count(codec->odm->channels)) return;
	ce = mm_get_codec(term->codecs, codec);
	if (!ce) return;

	/* threaded entries are guarded by their own mutex */
	if (ce->mx) gf_mx_p(ce->mx);

	if (!is_resume && codec->CB) gf_cm_reset(codec->CB);

	if (!is_resume) {
		cap.CapCode = GF_CODEC_WAIT_RAP;
		/* cap is passed by value: give its payload a defined value rather
		 * than handing indeterminate stack content to the decoder */
		cap.cap.valueInt = 0;
		gf_codec_set_capability(codec, cap);

		if (codec->decio && (codec->decio->InterfaceType == GF_SCENE_DECODER_INTERFACE)) {
			cap.CapCode = GF_CODEC_SHOW_SCENE;
			cap.cap.valueInt = 1;
			gf_codec_set_capability(codec, cap);
		}
	}

	gf_codec_set_status(codec, GF_ESM_CODEC_PLAY);

	if (!(ce->flags & GF_MM_CE_RUNNING)) {
		ce->flags |= GF_MM_CE_RUNNING;
		if (ce->thread) {
			gf_th_run(ce->thread, RunSingleDec, ce);
			gf_th_set_priority(ce->thread, term->priority);
		} else {
			/* decoded by the manager: account for it in the time sharing */
			term->cumulated_priority += ce->dec->Priority+1;
		}
	}

	if (ce->mx)
		gf_mx_v(ce->mx);
}
/*
 * Stops decoding for `codec`; does nothing if it is not registered.
 *
 * `reason` selects extra actions: with 0 the decoder is sent GF_CODEC_ABORT
 * (and the scene is hidden if the media object asked for removal on
 * display); with 2 the composition buffer is marked end-of-stream and an
 * audio codec keeps its current status.
 *
 * Locking: a threaded entry uses its own mutex; otherwise the manager mutex
 * is taken (blocking when the codec has a composition buffer, non-blocking
 * when it has none).
 */
void gf_term_stop_codec(GF_Codec *codec, u32 reason)
{
	GF_CodecCapability cap;
	Bool mm_locked = 0;
	CodecEntry *entry;
	GF_Terminal *term = codec->odm->term;

	entry = mm_get_codec(term->codecs, codec);
	if (!entry) return;

	if (entry->mx) {
		gf_mx_p(entry->mx);
	} else if (!codec->CB) {
		mm_locked = gf_mx_try_lock(term->mm_mx);
	} else {
		mm_locked = 1;
		gf_mx_p(term->mm_mx);
	}

	if (!reason) {
		cap.CapCode = GF_CODEC_ABORT;
		cap.cap.valueInt = 0;
		gf_codec_set_capability(codec, cap);

		if (codec->decio && codec->odm->mo && (codec->odm->mo->flags & GF_MO_DISPLAY_REMOVE) ) {
			cap.CapCode = GF_CODEC_SHOW_SCENE;
			cap.cap.valueInt = 0;
			gf_codec_set_capability(codec, cap);
			codec->odm->mo->flags &= ~GF_MO_DISPLAY_REMOVE;
		}
	}

	if (codec->type != GF_STREAM_AUDIO) {
		/* buffered streams of a dynamic scene that have not reached EOS go
		 * through the status setter; everything else is stopped directly */
		if ((codec->Status<GF_ESM_CODEC_EOS) && codec->odm && codec->odm->parentscene && codec->odm->parentscene->is_dynamic_scene && codec->CB && (codec->CB->Capacity>1)) {
			gf_codec_set_status(codec, GF_ESM_CODEC_STOP);
		} else {
			codec->Status = GF_ESM_CODEC_STOP;
		}
	} else if (reason != 2) {
		gf_codec_set_status(codec, GF_ESM_CODEC_STOP);
	}

	if ((reason==2) && codec->CB) {
		gf_cm_set_eos(codec->CB);
	}

	if (entry->flags & GF_MM_CE_RUNNING) {
		entry->flags &= ~GF_MM_CE_RUNNING;
		/* manager-decoded codecs leave the time-sharing pool */
		if (!entry->thread) {
			term->cumulated_priority -= codec->Priority+1;
		}
	}

	if (codec->CB) gf_cm_abort_buffering(codec->CB);

	if (entry->mx) {
		gf_mx_v(entry->mx);
	} else if (mm_locked) {
		gf_mx_v(term->mm_mx);
	}
}
/*
 * Changes the terminal threading mode and converts the registered codecs
 * accordingly.
 *
 * Returns immediately if the requested mode is already in effect. Otherwise,
 * under the manager mutex, every codec whose threaded state does not match
 * the new mode is stopped if needed, switched between "own thread" and
 * "decoded by the manager", and restarted if it was running.
 *
 * term->cumulated_priority is kept equal to the sum of (Priority+1) over the
 * running, manager-decoded codecs: it is only adjusted for entries that were
 * actually running.
 */
void gf_term_set_threading(GF_Terminal *term, u32 mode)
{
	u32 i;
	Bool thread_it, restart_it;
	CodecEntry *ce;

	switch (mode) {
	case GF_TERM_THREAD_SINGLE:
		if (term->flags & GF_TERM_SINGLE_THREAD) return;
		term->flags &= ~GF_TERM_MULTI_THREAD;
		term->flags |= GF_TERM_SINGLE_THREAD;
		break;
	case GF_TERM_THREAD_MULTI:
		if (term->flags & GF_TERM_MULTI_THREAD) return;
		term->flags &= ~GF_TERM_SINGLE_THREAD;
		term->flags |= GF_TERM_MULTI_THREAD;
		break;
	default:
		/* free mode: nothing to do if neither forced mode was active */
		if (!(term->flags & (GF_TERM_MULTI_THREAD | GF_TERM_SINGLE_THREAD) ) ) return;
		term->flags &= ~GF_TERM_SINGLE_THREAD;
		term->flags &= ~GF_TERM_MULTI_THREAD;
		break;
	}

	gf_mx_p(term->mm_mx);

	i=0;
	while ((ce = (CodecEntry*)gf_list_enum(term->codecs, &i))) {
		/* decide whether this codec must own a thread in the new mode */
		thread_it = 0;
		if ((mode == GF_TERM_THREAD_FREE) && (ce->flags & GF_MM_CE_REQ_THREAD)) thread_it = 1;
		else if (mode == GF_TERM_THREAD_MULTI) thread_it = 1;

		/* already in the right state */
		if (thread_it && (ce->flags & GF_MM_CE_THREADED)) continue;
		if (!thread_it && !(ce->flags & GF_MM_CE_THREADED)) continue;

		/* stop the codec, remembering whether it has to be restarted */
		restart_it = 0;
		if (ce->flags & GF_MM_CE_RUNNING) {
			restart_it = 1;
			ce->flags &= ~GF_MM_CE_RUNNING;
		}

		if (ce->flags & GF_MM_CE_THREADED) {
			/* NOTE(review): waits for GF_MM_CE_DEAD even when the entry was
			 * not running; a thread that was never started would presumably
			 * never set it - confirm this cannot happen */
			while (!(ce->flags & GF_MM_CE_DEAD)) gf_sleep(1);
			ce->flags &= ~GF_MM_CE_DEAD;
			gf_th_del(ce->thread);
			ce->thread = NULL;
			gf_mx_del(ce->mx);
			ce->mx = NULL;
			ce->flags &= ~GF_MM_CE_THREADED;
		} else if (restart_it) {
			/* only a running entry had been added to the pool, so only a
			 * running entry is taken out of it */
			term->cumulated_priority -= ce->dec->Priority+1;
		}

		if (thread_it) {
			ce->flags |= GF_MM_CE_THREADED;
			/* NOTE(review): decio is dereferenced unconditionally although
			 * other code in this file treats it as possibly NULL - confirm */
			ce->thread = gf_th_new(ce->dec->decio->module_name);
			ce->mx = gf_mx_new(ce->dec->decio->module_name);
		}

		if (restart_it) {
			ce->flags |= GF_MM_CE_RUNNING;
			if (ce->thread) {
				gf_th_run(ce->thread, RunSingleDec, ce);
				gf_th_set_priority(ce->thread, term->priority);
			} else {
				term->cumulated_priority += ce->dec->Priority+1;
			}
		}
	}
	gf_mx_v(term->mm_mx);
}
/*
 * Locks or unlocks the mutex protecting `codec`: the entry's own mutex for a
 * threaded codec, the manager mutex otherwise.
 *
 * lock:    non-zero to acquire, zero to release.
 * trylock: when acquiring, non-zero makes the attempt non-blocking.
 *
 * Returns 0 if the codec is not registered, 1 after a release, and the
 * result of the underlying mutex call after an acquire.
 */
Bool gf_term_lock_codec(GF_Codec *codec, Bool lock, Bool trylock)
{
	CodecEntry *entry;
	GF_Terminal *term = codec->odm->term;

	entry = mm_get_codec(term->codecs, codec);
	if (!entry) return 0;

	if (!lock) {
		gf_mx_v(entry->mx ? entry->mx : term->mm_mx);
		return 1;
	}

	if (entry->mx) {
		return trylock ? gf_mx_try_lock(entry->mx) : gf_mx_p(entry->mx);
	}
	return trylock ? gf_mx_try_lock(term->mm_mx) : gf_mx_p(term->mm_mx);
}
/*
 * Applies `Priority` to the media manager thread and to every dedicated
 * decoding thread, and records it as the terminal priority. Runs under the
 * manager mutex.
 */
void gf_term_set_priority(GF_Terminal *term, s32 Priority)
{
	u32 pos = 0;
	CodecEntry *entry;

	gf_mx_p(term->mm_mx);

	gf_th_set_priority(term->mm_thread, Priority);

	for (entry = (CodecEntry *)gf_list_enum(term->codecs, &pos);
	        entry;
	        entry = (CodecEntry *)gf_list_enum(term->codecs, &pos)) {
		/* only threaded entries own a thread to re-prioritise */
		if (!(entry->flags & GF_MM_CE_THREADED)) continue;
		gf_th_set_priority(entry->thread, Priority);
	}

	term->priority = Priority;
	gf_mx_v(term->mm_mx);
}
/*
 * Runs one step of the terminal from the caller's thread: a decoder pass if
 * the terminal has no decoder thread, then a frame draw if it has no
 * compositor thread.
 *
 * Returns the computed sleep time. In bench mode or when the user disabled
 * regulation the value is returned as computed without sleeping; otherwise
 * it is capped to 33 and the function sleeps that long before returning it.
 */
GF_EXPORT
u32 gf_term_process_step(GF_Terminal *term)
{
	u32 nb_decs=0;
	u32 sleep_time=0;
	u32 dec_time = 0, step_start_time = gf_sys_clock();

	if (term->flags & GF_TERM_NO_DECODER_THREAD) {
		MM_SimulationStep_Decoder(term, &nb_decs);
		dec_time = gf_sys_clock() - step_start_time;
	}

	if (term->flags & GF_TERM_NO_COMPOSITOR_THREAD) {
		/* initialised like in MM_Loop() so the value read below is always
		 * defined, whatever gf_sc_draw_frame() does with it */
		s32 ms_until_next = 0;
		gf_sc_draw_frame(term->compositor, 0, &ms_until_next);
		/* sleep until the next frame, minus what decoding already used */
		if ((ms_until_next>=0) && ((u32) ms_until_next > dec_time)) {
			sleep_time = ms_until_next - dec_time;
		}
	} else {
		/* no drawing here: sleep for what is left of the frame duration */
		if (dec_time < term->frame_duration) {
			sleep_time = term->frame_duration - dec_time;
		}
	}

	if (term->bench_mode || (term->user->init_flags & GF_TERM_NO_REGULATION)) return sleep_time;

	assert((s32) sleep_time >= 0);
	if (sleep_time>33) sleep_time = 33;
	gf_sleep(sleep_time);
	return sleep_time;
}
/*
 * Decodes and draws from the caller's thread until the compositor has
 * nothing left to draw, or for a single iteration when regulation is
 * enabled.
 *
 * Returns GF_BAD_PARAM if the terminal has a compositor thread,
 * GF_IP_UDP_TIMEOUT if more than 30000 clock units elapsed since the call
 * began while frames were still pending, and GF_OK otherwise.
 */
GF_EXPORT
GF_Err gf_term_process_flush(GF_Terminal *term)
{
	u32 pos;
	CodecEntry *entry;
	u32 elapsed, start_time = gf_sys_clock();

	if (!(term->flags & GF_TERM_NO_COMPOSITOR_THREAD) ) return GF_BAD_PARAM;

	for (;;) {
		/* no decoder thread: run services and every codec from here */
		if (term->flags & GF_TERM_NO_DECODER_THREAD) {
			gf_term_handle_services(term);
			gf_mx_p(term->mm_mx);
			for (pos = 0; (entry = (CodecEntry*)gf_list_enum(term->codecs, &pos)) != NULL; ) {
				gf_codec_process(entry->dec, 10000);
			}
			gf_mx_v(term->mm_mx);
		}

		if (!gf_sc_draw_frame(term->compositor, 1, NULL)) {
			/* nothing was drawn: work out whether anything is still pending */
			if (!term->root_scene || !term->root_scene->root_od) {
				break;
			}
			if (gf_list_count(term->media_queue) ) {
				continue;
			}
			if (gf_sc_check_audio_pending(term->compositor) ) {
				continue;
			}
			if (gf_scene_check_clocks(term->root_scene->root_od->net_service, term->root_scene, 1)) {
				break;
			}
			elapsed = gf_sys_clock() - start_time;
			if (elapsed>30000) {
				GF_LOG(GF_LOG_ERROR, GF_LOG_MEDIA, ("[Terminal] Waited more than %d ms to flush frame - aborting\n", elapsed));
				return GF_IP_UDP_TIMEOUT;
			}
		}

		/* with regulation enabled a single iteration is performed */
		if (! (term->user->init_flags & GF_TERM_NO_REGULATION)) {
			break;
		}
	}
	return GF_OK;
}
/*
 * Asks the compositor to flush its video output. Only valid when the
 * terminal has no compositor thread; returns GF_BAD_PARAM otherwise and
 * GF_OK on success.
 */
GF_EXPORT
GF_Err gf_term_process_flush_video(GF_Terminal *term)
{
	if (term->flags & GF_TERM_NO_COMPOSITOR_THREAD) {
		gf_sc_flush_video(term->compositor);
		return GF_OK;
	}
	return GF_BAD_PARAM;
}