diff --git a/components/bt/common/osi/thread.c b/components/bt/common/osi/thread.c index c26fc998c2..fa35e87b86 100644 --- a/components/bt/common/osi/thread.c +++ b/components/bt/common/osi/thread.c @@ -19,16 +19,27 @@ #include #include "osi/allocator.h" -#include "osi/fixed_queue.h" +#include "freertos/FreeRTOS.h" +#include "freertos/queue.h" #include "osi/semaphore.h" #include "osi/thread.h" +struct work_item { + osi_thread_func_t func; + void *context; +}; + +struct work_queue { + QueueHandle_t queue; + size_t capacity; +}; + struct osi_thread { void *thread_handle; /*!< Store the thread object */ int thread_id; /*!< May for some OS, such as Linux */ bool stop; uint8_t work_queue_num; /*!< Work queue number */ - fixed_queue_t **work_queues; /*!< Point to queue array, and the priority inverse array index */ + struct work_queue **work_queues; /*!< Point to queue array, and the priority inverse array index */ osi_sem_t work_sem; osi_sem_t stop_sem; }; @@ -39,13 +50,90 @@ struct osi_thread_start_arg { int error; }; -typedef struct { - osi_thread_func_t func; - void *context; -} work_item_t; - static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100; +static struct work_queue *osi_work_queue_create(size_t capacity) +{ + if (capacity == 0) { + return NULL; + } + + struct work_queue *wq = (struct work_queue *)osi_malloc(sizeof(struct work_queue)); + if (wq != NULL) { + wq->queue = xQueueCreate(capacity, sizeof(struct work_item)); + if (wq->queue != 0) { + wq->capacity = capacity; + return wq; + } else { + osi_free(wq); + } + } + + return NULL; +} + +static void osi_work_queue_delete(struct work_queue *wq) +{ + if (wq != NULL) { + if (wq->queue != 0) { + vQueueDelete(wq->queue); + } + wq->queue = 0; + wq->capacity = 0; + osi_free(wq); + } + return; +} + +static bool osi_thead_work_queue_get(struct work_queue *wq, struct work_item *item) +{ + assert (wq != NULL); + assert (wq->queue != 0); + assert (item != NULL); + + if (pdTRUE == xQueueReceive(wq->queue, item, 0)) { + return true; + } else { + return false; + } +} + +static bool osi_thead_work_queue_put(struct work_queue *wq, const struct work_item *item, uint32_t timeout) +{ + assert (wq != NULL); + assert (wq->queue != 0); + assert (item != NULL); + + bool ret = true; + if (timeout == OSI_SEM_MAX_TIMEOUT) { + if (xQueueSend(wq->queue, item, portMAX_DELAY) != pdTRUE) { + ret = false; + } + } else { + if (xQueueSend(wq->queue, item, timeout / portTICK_PERIOD_MS) != pdTRUE) { + ret = false; + } + } + + return ret; +} + +static size_t osi_thead_work_queue_len(struct work_queue *wq) +{ + assert (wq != NULL); + assert (wq->queue != 0); + assert (wq->capacity != 0); + + size_t available_spaces = (size_t)uxQueueSpacesAvailable(wq->queue); + + if (available_spaces <= wq->capacity) { + return wq->capacity - available_spaces; + } else { + assert (0); + } + return 0; +} + static void osi_thread_run(void *arg) { struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg; @@ -62,11 +150,10 @@ static void osi_thread_run(void *arg) break; } + struct work_item item; while (!thread->stop && idx < thread->work_queue_num) { - work_item_t *item = fixed_queue_dequeue(thread->work_queues[idx], 0); - if (item) { - item->func(item->context); - osi_free(item); + if (osi_thead_work_queue_get(thread->work_queues[idx], &item) == true) { + item.func(item.context); idx = 0; continue; } else { @@ -125,13 +212,13 @@ osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priorit thread->stop = false; thread->work_queue_num = work_queue_num; - thread->work_queues = (fixed_queue_t **)osi_malloc(sizeof(fixed_queue_t *) * work_queue_num); + thread->work_queues = (struct work_queue **)osi_malloc(sizeof(struct work_queue *) * work_queue_num); if (thread->work_queues == NULL) { goto _err; } for (int i = 0; i < thread->work_queue_num; i++) { - thread->work_queues[i] = fixed_queue_new(DEFAULT_WORK_QUEUE_CAPACITY); + thread->work_queues[i] = osi_work_queue_create(DEFAULT_WORK_QUEUE_CAPACITY); if (thread->work_queues[i] == NULL) { goto _err; } @@ -175,12 +262,14 @@ _err: for (int i = 0; i < thread->work_queue_num; i++) { if (thread->work_queues[i]) { - fixed_queue_free(thread->work_queues[i], osi_free_func); + osi_work_queue_delete(thread->work_queues[i]); } + thread->work_queues[i] = NULL; } if (thread->work_queues) { osi_free(thread->work_queues); + thread->work_queues = NULL; } if (thread->work_sem) { @@ -206,12 +295,14 @@ void osi_thread_free(osi_thread_t *thread) for (int i = 0; i < thread->work_queue_num; i++) { if (thread->work_queues[i]) { - fixed_queue_free(thread->work_queues[i], osi_free_func); + osi_work_queue_delete(thread->work_queues[i]); + thread->work_queues[i] = NULL; } } if (thread->work_queues) { osi_free(thread->work_queues); + thread->work_queues = NULL; } if (thread->work_sem) { @@ -235,15 +326,12 @@ bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context return false; } - work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t)); - if (item == NULL) { - return false; - } - item->func = func; - item->context = context; + struct work_item item; - if (fixed_queue_enqueue(thread->work_queues[queue_idx], item, timeout) == false) { - osi_free(item); + item.func = func; + item.context = context; + + if (osi_thead_work_queue_put(thread->work_queues[queue_idx], &item, timeout) == false) { return false; } @@ -273,5 +361,5 @@ int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx) return -1; } - return fixed_queue_length(thread->work_queues[wq_idx]); + return (int)(osi_thead_work_queue_len(thread->work_queues[wq_idx])); }