summaryrefslogtreecommitdiff
path: root/src/safe_fifo.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/safe_fifo.c')
-rw-r--r--src/safe_fifo.c152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/safe_fifo.c b/src/safe_fifo.c
new file mode 100644
index 0000000..cdd634f
--- /dev/null
+++ b/src/safe_fifo.c
@@ -0,0 +1,152 @@
+#include "common.h"
+
+#include "safe_fifo.h"
+#include "thread.h"
+#include "ref.h"
+
+#include <string.h>
+
+struct safe_fifo_t
+{
+ REF_DECLARE();
+ thread_lock_t* lock;
+ thread_cond_t* empty;
+ size_t pos, fill, size;
+ void** data;
+};
+
+safe_fifo_t* safe_fifo_new(void)
+{
+ safe_fifo_t* fifo = calloc(1, sizeof(safe_fifo_t));
+ if (fifo)
+ {
+ if (!REF_INIT(fifo))
+ {
+ free(fifo);
+ return NULL;
+ }
+ fifo->lock = thread_lock_new();
+ fifo->empty = thread_cond_new();
+ if (!fifo->lock || !fifo->empty)
+ {
+ thread_lock_free(fifo->lock);
+ thread_cond_free(fifo->empty);
+ free(fifo);
+ return NULL;
+ }
+ }
+ return fifo;
+}
+
+void safe_fifo_ref(safe_fifo_t* fifo)
+{
+ assert(fifo);
+ REF_INC(fifo);
+}
+
+void safe_fifo_unref(safe_fifo_t* fifo)
+{
+ if (fifo && REF_DEC(fifo))
+ {
+ thread_cond_free(fifo->empty);
+ thread_lock_free(fifo->lock);
+ free(fifo->data);
+ REF_FREE(fifo);
+ free(fifo);
+ }
+}
+
+void safe_fifo_push(safe_fifo_t* fifo, void* item)
+{
+ assert(fifo && item);
+ thread_lock_lock(fifo->lock);
+ do
+ {
+ if (fifo->pos > 0)
+ {
+ fifo->fill -= fifo->pos;
+ memmove(fifo->data, fifo->data + fifo->pos,
+ fifo->fill * sizeof(void*));
+ fifo->pos = 0;
+ }
+ if (fifo->fill == fifo->size)
+ {
+ size_t ns = MAX(4, fifo->size * 2);
+ void** tmp = realloc(fifo->data, ns * sizeof(void*));
+ if (!tmp)
+ {
+ break;
+ }
+ fifo->size = ns;
+ fifo->data = tmp;
+ }
+ fifo->data[fifo->fill++] = item;
+ } while (false);
+ thread_cond_signal(fifo->empty);
+ thread_lock_unlock(fifo->lock);
+}
+
+void* safe_fifo_pop(safe_fifo_t* fifo)
+{
+ void* ret;
+ assert(fifo);
+ thread_lock_lock(fifo->lock);
+ for (;;)
+ {
+ if (fifo->fill > 0)
+ {
+ ret = fifo->data[fifo->pos++];
+ if (fifo->pos == fifo->fill)
+ {
+ fifo->pos = fifo->fill = 0;
+ }
+ break;
+ }
+ thread_cond_wait(fifo->empty, fifo->lock);
+ }
+ thread_lock_unlock(fifo->lock);
+ return ret;
+}
+
+void* safe_fifo_trypop(safe_fifo_t* fifo)
+{
+ void* ret = NULL;
+ assert(fifo);
+ thread_lock_lock(fifo->lock);
+ if (fifo->fill > 0)
+ {
+ ret = fifo->data[fifo->pos++];
+ if (fifo->pos == fifo->fill)
+ {
+ fifo->pos = fifo->fill = 0;
+ }
+ }
+ thread_lock_unlock(fifo->lock);
+ return ret;
+}
+
+void* safe_fifo_timedpop(safe_fifo_t* fifo, const struct timespec* abstime)
+{
+ void* ret;
+ assert(fifo);
+ thread_lock_lock(fifo->lock);
+ for (;;)
+ {
+ if (fifo->fill > 0)
+ {
+ ret = fifo->data[fifo->pos++];
+ if (fifo->pos == fifo->fill)
+ {
+ fifo->pos = fifo->fill = 0;
+ }
+ break;
+ }
+ if (!thread_cond_timedwait(fifo->empty, fifo->lock, abstime))
+ {
+ ret = NULL;
+ break;
+ }
+ }
+ thread_lock_unlock(fifo->lock);
+ return ret;
+}