/*
 * Thread-safe FIFO of opaque pointers.
 *
 * Items live in a growable array; the live range is data[pos .. fill).
 * All access to the indices and the array happens under `lock`; consumers
 * that find the queue empty sleep on the `empty` condition.
 */
#include "common.h"
#include "safe_fifo.h"
#include "thread.h"
#include "ref.h"

/* Standard headers for assert(), calloc()/realloc()/free() and memmove(). */
#include <assert.h>
#include <stdlib.h>
#include <string.h>

struct safe_fifo_t {
    REF_DECLARE();
    thread_lock_t* lock;   /* guards every field below */
    thread_cond_t* empty;  /* signalled by safe_fifo_push() */
    size_t pos;            /* index of the oldest queued item */
    size_t fill;           /* index one past the newest queued item */
    size_t size;           /* number of slots allocated in data */
    void** data;           /* item storage, NULL until the first push */
};

/*
 * Removes and returns the oldest item.
 *
 * The caller must hold fifo->lock and must have checked fifo->fill > 0.
 * When the last item is taken both indices are reset to 0 so the next push
 * starts at the front of the array without needing to compact.
 */
static void* fifo_take_locked(safe_fifo_t* fifo)
{
    void* item = fifo->data[fifo->pos++];

    if (fifo->pos == fifo->fill) {
        fifo->pos = fifo->fill = 0;
    }
    return item;
}

/*
 * Allocates an empty FIFO.
 *
 * Returns the new FIFO, or NULL if the allocation, the reference-count
 * initialisation, or the creation of the lock or condition fails.
 */
safe_fifo_t* safe_fifo_new(void)
{
    safe_fifo_t* fifo = calloc(1, sizeof *fifo);

    if (!fifo) {
        return NULL;
    }
    if (!REF_INIT(fifo)) {
        free(fifo);
        return NULL;
    }

    fifo->lock = thread_lock_new();
    fifo->empty = thread_cond_new();
    if (!fifo->lock || !fifo->empty) {
        /* Either handle may be NULL here; this relies on the free functions
         * accepting NULL -- TODO confirm against thread.h. */
        thread_lock_free(fifo->lock);
        thread_cond_free(fifo->empty);
        /* REF_INIT succeeded above, so undo it the same way
         * safe_fifo_unref() does before releasing the struct. */
        REF_FREE(fifo);
        free(fifo);
        return NULL;
    }
    return fifo;
}

/* Takes an additional reference on `fifo`, which must not be NULL. */
void safe_fifo_ref(safe_fifo_t* fifo)
{
    assert(fifo);
    REF_INC(fifo);
}

/*
 * Drops one reference; a NULL `fifo` is ignored.
 *
 * When REF_DEC reports a non-zero result the FIFO is destroyed. Only the
 * pointer array is freed -- items still queued are not touched.
 */
void safe_fifo_unref(safe_fifo_t* fifo)
{
    if (fifo && REF_DEC(fifo)) {
        thread_cond_free(fifo->empty);
        thread_lock_free(fifo->lock);
        free(fifo->data);
        REF_FREE(fifo);
        free(fifo);
    }
}

/*
 * Appends `item` to the FIFO and signals the `empty` condition.
 *
 * If the array is full and cannot be grown the item is dropped silently;
 * the void return type offers no way to report that to the caller.
 */
void safe_fifo_push(safe_fifo_t* fifo, void* item)
{
    assert(fifo);
    thread_lock_lock(fifo->lock);

    /* Slide the live range down to index 0 to reclaim consumed slots. */
    if (fifo->pos > 0) {
        fifo->fill -= fifo->pos;
        memmove(fifo->data, fifo->data + fifo->pos,
                fifo->fill * sizeof(void*));
        fifo->pos = 0;
    }

    /* Full (or never allocated): double the capacity, minimum 4 slots.
     * On failure the existing array is left untouched. */
    if (fifo->fill == fifo->size) {
        size_t ns = MAX(4, fifo->size * 2);
        void** tmp = realloc(fifo->data, ns * sizeof(void*));

        if (tmp) {
            fifo->data = tmp;
            fifo->size = ns;
        }
    }

    /* Still full only when the realloc above failed. */
    if (fifo->fill < fifo->size) {
        fifo->data[fifo->fill++] = item;
    }

    /* Signalled unconditionally; waiters re-check `fill` after waking, so a
     * wake-up following a dropped item just sends them back to waiting. */
    thread_cond_signal(fifo->empty);
    thread_lock_unlock(fifo->lock);
}

/*
 * Removes and returns the oldest item, waiting on the `empty` condition
 * for as long as the FIFO has nothing queued.
 */
void* safe_fifo_pop(safe_fifo_t* fifo)
{
    void* ret;

    assert(fifo);
    thread_lock_lock(fifo->lock);
    /* Loop rather than wait once: the predicate is re-checked on every
     * wake-up before an item is taken. */
    while (fifo->fill == 0) {
        thread_cond_wait(fifo->empty, fifo->lock);
    }
    ret = fifo_take_locked(fifo);
    thread_lock_unlock(fifo->lock);
    return ret;
}

/*
 * Removes and returns the oldest item without waiting.
 *
 * Returns NULL when the FIFO is empty; a queued NULL item is therefore
 * indistinguishable from "nothing available".
 */
void* safe_fifo_trypop(safe_fifo_t* fifo)
{
    void* ret = NULL;

    assert(fifo);
    thread_lock_lock(fifo->lock);
    if (fifo->fill > 0) {
        ret = fifo_take_locked(fifo);
    }
    thread_lock_unlock(fifo->lock);
    return ret;
}

/*
 * Removes and returns the oldest item, waiting via thread_cond_timedwait()
 * while the FIFO is empty.
 *
 * `abstime` is passed through unchanged to thread_cond_timedwait();
 * presumably an absolute deadline, as the name suggests -- confirm in
 * thread.h. Returns NULL as soon as thread_cond_timedwait() yields a
 * zero/false result (presumably timeout or failure) while the FIFO is
 * still empty.
 */
void* safe_fifo_timedpop(safe_fifo_t* fifo, const struct timespec* abstime)
{
    void* ret = NULL;

    assert(fifo);
    thread_lock_lock(fifo->lock);
    for (;;) {
        if (fifo->fill > 0) {
            ret = fifo_take_locked(fifo);
            break;
        }
        if (!thread_cond_timedwait(fifo->empty, fifo->lock, abstime)) {
            break;
        }
    }
    thread_lock_unlock(fifo->lock);
    return ret;
}