recover semaphore in fixed_queue to control the queue capacity

This commit is contained in:
wangmengyang
2016-10-10 20:37:26 +08:00
parent 833eb88679
commit 0b30c22162
3 changed files with 23 additions and 26 deletions

View File

@@ -21,13 +21,14 @@
#include "fixed_queue.h" #include "fixed_queue.h"
#include "list.h" #include "list.h"
#include "osi.h" #include "osi.h"
#include "osi_arch.h"
#include "bt_trace.h" #include "bt_trace.h"
typedef struct fixed_queue_t { typedef struct fixed_queue_t {
list_t *list; list_t *list;
//semaphore_t *enqueue_sem; osi_sem_t enqueue_sem;
//semaphore_t *dequeue_sem; osi_sem_t dequeue_sem;
pthread_mutex_t lock; pthread_mutex_t lock;
size_t capacity; size_t capacity;
@@ -53,15 +54,15 @@ fixed_queue_t *fixed_queue_new(size_t capacity) {
if (!ret->list) if (!ret->list)
goto error; goto error;
/*
ret->enqueue_sem = semaphore_new(capacity); osi_sem_new(&ret->enqueue_sem, capacity, capacity);
if (!ret->enqueue_sem) if (!ret->enqueue_sem)
goto error; goto error;
ret->dequeue_sem = semaphore_new(0); osi_sem_new(&ret->dequeue_sem, capacity, 0);
if (!ret->dequeue_sem) if (!ret->dequeue_sem)
goto error; goto error;
*/
return ret; return ret;
error:; error:;
@@ -81,8 +82,8 @@ void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
free_cb(list_node(node)); free_cb(list_node(node));
list_free(queue->list); list_free(queue->list);
// semaphore_free(queue->enqueue_sem); osi_sem_free(&queue->enqueue_sem);
// semaphore_free(queue->dequeue_sem); osi_sem_free(&queue->dequeue_sem);
pthread_mutex_destroy(&queue->lock); pthread_mutex_destroy(&queue->lock);
osi_free(queue); osi_free(queue);
} }
@@ -108,28 +109,28 @@ void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
assert(queue != NULL); assert(queue != NULL);
assert(data != NULL); assert(data != NULL);
// semaphore_wait(queue->enqueue_sem); osi_sem_wait(&queue->enqueue_sem, 0);
pthread_mutex_lock(&queue->lock); pthread_mutex_lock(&queue->lock);
list_append(queue->list, data); list_append(queue->list, data);
pthread_mutex_unlock(&queue->lock); pthread_mutex_unlock(&queue->lock);
// semaphore_post(queue->dequeue_sem); osi_sem_signal(&queue->dequeue_sem);
} }
void *fixed_queue_dequeue(fixed_queue_t *queue) { void *fixed_queue_dequeue(fixed_queue_t *queue) {
void *ret = NULL; void *ret = NULL;
assert(queue != NULL); assert(queue != NULL);
// semaphore_wait(queue->dequeue_sem); osi_sem_wait(&queue->dequeue_sem, 0);
pthread_mutex_lock(&queue->lock); pthread_mutex_lock(&queue->lock);
ret = list_front(queue->list); ret = list_front(queue->list);
list_remove(queue->list, ret); list_remove(queue->list, ret);
pthread_mutex_unlock(&queue->lock); pthread_mutex_unlock(&queue->lock);
// semaphore_post(queue->enqueue_sem); osi_sem_signal(&queue->enqueue_sem);
return ret; return ret;
} }

View File

@@ -26,7 +26,7 @@ void osi_mutex_unlock(osi_mutex_t *pxMutex);
void osi_mutex_free(osi_mutex_t *pxMutex); void osi_mutex_free(osi_mutex_t *pxMutex);
int osi_sem_new(osi_sem_t *sem, uint8_t count); int osi_sem_new(osi_sem_t *sem, uint32_t max_count, uint32_t init_count);
void osi_sem_signal(osi_sem_t *sem); void osi_sem_signal(osi_sem_t *sem);

View File

@@ -84,22 +84,18 @@ osi_mutex_free(osi_mutex_t *pxMutex)
} }
/*-----------------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------------*/
// Creates and returns a new semaphore. The "count" argument specifies // Creates and returns a new semaphore. The "init_count" argument specifies
// the initial state of the semaphore. TBD finish and test // the initial state of the semaphore, "max_count" specifies the maximum value
// that can be reached.
int int
osi_sem_new(osi_sem_t *sem, uint8_t count) osi_sem_new(osi_sem_t *sem, uint32_t max_count, uint32_t init_count)
{ {
int xReturn = -1; int xReturn = -1;
vSemaphoreCreateBinary(*sem); if (sem) {
*sem = xSemaphoreCreateCounting(max_count, init_count);
if ((*sem) != NULL) { if ((*sem) != NULL) {
if (count == 0) { // Means it can't be taken
xSemaphoreTake(*sem, 1);
}
xReturn = 0; xReturn = 0;
} else { }
; // TBD need assert
} }
return xReturn; return xReturn;