diff options
Diffstat (limited to 'src/safe_fifo.c')
| -rw-r--r-- | src/safe_fifo.c | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/src/safe_fifo.c b/src/safe_fifo.c new file mode 100644 index 0000000..cdd634f --- /dev/null +++ b/src/safe_fifo.c @@ -0,0 +1,152 @@ +#include "common.h" + +#include "safe_fifo.h" +#include "thread.h" +#include "ref.h" + +#include <string.h> + +struct safe_fifo_t +{ + REF_DECLARE(); + thread_lock_t* lock; + thread_cond_t* empty; + size_t pos, fill, size; + void** data; +}; + +safe_fifo_t* safe_fifo_new(void) +{ + safe_fifo_t* fifo = calloc(1, sizeof(safe_fifo_t)); + if (fifo) + { + if (!REF_INIT(fifo)) + { + free(fifo); + return NULL; + } + fifo->lock = thread_lock_new(); + fifo->empty = thread_cond_new(); + if (!fifo->lock || !fifo->empty) + { + thread_lock_free(fifo->lock); + thread_cond_free(fifo->empty); + free(fifo); + return NULL; + } + } + return fifo; +} + +void safe_fifo_ref(safe_fifo_t* fifo) +{ + assert(fifo); + REF_INC(fifo); +} + +void safe_fifo_unref(safe_fifo_t* fifo) +{ + if (fifo && REF_DEC(fifo)) + { + thread_cond_free(fifo->empty); + thread_lock_free(fifo->lock); + free(fifo->data); + REF_FREE(fifo); + free(fifo); + } +} + +void safe_fifo_push(safe_fifo_t* fifo, void* item) +{ + assert(fifo && item); + thread_lock_lock(fifo->lock); + do + { + if (fifo->pos > 0) + { + fifo->fill -= fifo->pos; + memmove(fifo->data, fifo->data + fifo->pos, + fifo->fill * sizeof(void*)); + fifo->pos = 0; + } + if (fifo->fill == fifo->size) + { + size_t ns = MAX(4, fifo->size * 2); + void** tmp = realloc(fifo->data, ns * sizeof(void*)); + if (!tmp) + { + break; + } + fifo->size = ns; + fifo->data = tmp; + } + fifo->data[fifo->fill++] = item; + } while (false); + thread_cond_signal(fifo->empty); + thread_lock_unlock(fifo->lock); +} + +void* safe_fifo_pop(safe_fifo_t* fifo) +{ + void* ret; + assert(fifo); + thread_lock_lock(fifo->lock); + for (;;) + { + if (fifo->fill > 0) + { + ret = fifo->data[fifo->pos++]; + if (fifo->pos == fifo->fill) + { + fifo->pos = fifo->fill = 0; + } + break; + } + thread_cond_wait(fifo->empty, fifo->lock); + } + thread_lock_unlock(fifo->lock); + return ret; +} + +void* safe_fifo_trypop(safe_fifo_t* fifo) +{ + void* ret = NULL; + assert(fifo); + thread_lock_lock(fifo->lock); + if (fifo->fill > 0) + { + ret = fifo->data[fifo->pos++]; + if (fifo->pos == fifo->fill) + { + fifo->pos = fifo->fill = 0; + } + } + thread_lock_unlock(fifo->lock); + return ret; +} + +void* safe_fifo_timedpop(safe_fifo_t* fifo, const struct timespec* abstime) +{ + void* ret; + assert(fifo); + thread_lock_lock(fifo->lock); + for (;;) + { + if (fifo->fill > 0) + { + ret = fifo->data[fifo->pos++]; + if (fifo->pos == fifo->fill) + { + fifo->pos = fifo->fill = 0; + } + break; + } + if (!thread_cond_timedwait(fifo->empty, fifo->lock, abstime)) + { + ret = NULL; + break; + } + } + thread_lock_unlock(fifo->lock); + return ret; +} |
