This source file includes the following definitions.
- dc_message_queue_init
- dc_message_queue_put
- dc_message_queue_get
- dc_message_queue_flush
- dc_message_queue_free
#include "message_queue.h"
/*
 * Put a message queue into its initial, empty state.
 *
 * mq: queue storage owned by the caller; its previous contents are
 *     overwritten without being released.
 *
 * The whole structure is zeroed, the node list is marked empty, and a
 * fresh mutex and semaphore are created for it. The results of
 * gf_mx_new() / gf_sema_new() are stored as returned and not checked.
 */
void dc_message_queue_init(MessageQueue *mq)
{
	/* Start from a fully cleared structure. */
	memset(mq, 0, sizeof(*mq));

	/* Empty singly linked list: no head, no tail, zero nodes. */
	mq->first_node = mq->last_node = NULL;
	mq->nb_nodes = 0;

	/* Synchronisation objects used by put/get/flush. */
	mq->mutex = gf_mx_new("MessageQueue Mutex");
	/* NOTE(review): arguments are presumably (max count, initial count) -
	 * confirm against the GPAC semaphore API. */
	mq->sem = gf_sema_new(1000, 0);
}
/*
 * Append a copy of a message to the tail of the queue.
 *
 * mq:   queue to append to.
 * data: bytes to enqueue; `size` bytes are copied into a private buffer,
 *       so the caller keeps ownership of `data`.
 * size: number of bytes to copy from `data`.
 *
 * The node and its payload buffer are allocated with gf_malloc(). If either
 * allocation fails the message is dropped and the queue is left untouched;
 * the function returns void, so the failure cannot be reported to the caller.
 * After linking the node, the queue semaphore is notified once.
 */
void dc_message_queue_put(MessageQueue *mq, void *data, u32 size)
{
	MessageQueueNode *mqn = (MessageQueueNode*)gf_malloc(sizeof(MessageQueueNode));
	if (!mqn)
		return;

	mqn->data = gf_malloc(size);
	if (size) {
		/* A NULL buffer is only an error when there is something to copy. */
		if (!mqn->data) {
			gf_free(mqn);
			return;
		}
		memcpy(mqn->data, data, size);
	}
	mqn->size = size;
	mqn->next = NULL;

	gf_mx_p(mq->mutex);

	/* Link at the tail; an empty queue also gets its head set. */
	if (!mq->last_node)
		mq->first_node = mqn;
	else
		mq->last_node->next = mqn;

	mq->last_node = mqn;
	mq->nb_nodes++;

	/* Signal the semaphore that dc_message_queue_get() waits on. */
	gf_sema_notify(mq->sem, 1);
	gf_mx_v(mq->mutex);
}
/*
 * Remove the oldest message from the queue and copy it to the caller.
 *
 * mq:   queue to read from.
 * data: destination buffer. The full stored payload is copied into it and
 *       no capacity is passed in, so the caller must provide a buffer large
 *       enough for any message that may be queued.
 *
 * If the queue is empty, the function releases the lock and waits once on
 * the queue semaphore with a timeout value of 10000 (milliseconds,
 * presumably - confirm against gf_sema_wait_for()).
 *
 * Returns the payload size in bytes on success, or -1 when the wait was not
 * signalled or the queue is still empty after the wait.
 */
int dc_message_queue_get(MessageQueue *mq, void * data)
{
	MessageQueueNode *head;
	int nb_bytes;

	gf_mx_p(mq->mutex);
	head = mq->first_node;

	if (head == NULL) {
		int signalled;

		/* Do not hold the lock while blocking, so producers can enqueue. */
		gf_mx_v(mq->mutex);
		signalled = gf_sema_wait_for(mq->sem, 10000);
		gf_mx_p(mq->mutex);

		/* Re-read the head: it may have changed while the lock was released. */
		head = mq->first_node;
		if (head == NULL || !signalled) {
			gf_mx_v(mq->mutex);
			return -1;
		}
	}

	/* Unlink the head; the tail is cleared too when the list becomes empty. */
	mq->first_node = head->next;
	if (mq->first_node == NULL)
		mq->last_node = NULL;
	mq->nb_nodes--;

	/* Hand the payload to the caller, then release the node and its buffer. */
	memcpy(data, head->data, head->size);
	nb_bytes = (int)head->size;
	gf_free(head->data);
	gf_free(head);

	gf_mx_v(mq->mutex);
	return nb_bytes;
}
/*
 * Discard every message currently stored in the queue.
 *
 * mq: queue to empty.
 *
 * Each node and the payload buffer it owns are released, and the list is
 * reset to the empty state. The queue's mutex and semaphore are kept, so the
 * queue remains usable afterwards. The semaphore is notified once before the
 * lock is released (presumably to wake a consumer blocked in
 * dc_message_queue_get() - confirm with callers).
 */
void dc_message_queue_flush(MessageQueue *mq)
{
	MessageQueueNode *mqn, *mqn1;

	gf_mx_p(mq->mutex);

	for (mqn = mq->first_node; mqn != NULL; mqn = mqn1) {
		/* Save the successor before the node is released. */
		mqn1 = mqn->next;
		/* The payload is a separate allocation owned by the node. */
		gf_free(mqn->data);
		gf_free(mqn);
	}

	mq->last_node = NULL;
	mq->first_node = NULL;
	mq->nb_nodes = 0;

	gf_sema_notify(mq->sem, 1);
	gf_mx_v(mq->mutex);
}
/*
 * Release everything owned by a message queue.
 *
 * mq: queue to tear down. The MessageQueue structure itself is not freed;
 *     it stays owned by the caller.
 *
 * Pending messages are flushed first, then the mutex and the semaphore are
 * destroyed. Both handles are cleared afterwards so the structure does not
 * keep pointers to destroyed objects.
 */
void dc_message_queue_free(MessageQueue *mq)
{
	/* Flush while the mutex and semaphore still exist: it uses both. */
	dc_message_queue_flush(mq);

	gf_mx_del(mq->mutex);
	mq->mutex = NULL;

	gf_sema_del(mq->sem);
	mq->sem = NULL;
}