Потокобезопасная ФИФО в С99


Я начал небольшой проект на выходные, чтобы попробовать и узнать, как многопоточный с чистого C99 и POSIX-нитей. Проект состоит из трех потоков, ввода, обработки и вывода, которые общаются друг с другом через FIFO-очередей. Именно у нас IN->FIFO->PROC->FIFO->OUT. Я только что получил реализации FIFO на работу, быть потокобезопасным, и я действительно хотел некоторые отзывы о том, что хорошо/плохо и что можно улучшить.

ФИФО.ч

#ifndef INC_05_ENC_DEC_FIFO_H
#define INC_05_ENC_DEC_FIFO_H
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include <jemalloc/jemalloc.h>

struct list_node {
    struct list_node *prev;
    struct list_node *next;
    uint8_t *data;
};

typedef struct FIFO {
    struct list_node *first;
    struct list_node *last;

    size_t count;
    pthread_mutex_t *mutex;

    size_t (*count_mutex)(struct FIFO*);
    void (*enqueue)(struct FIFO*, uint8_t*);
    uint8_t* (*dequeue)(struct FIFO*);
    void (*free)(struct FIFO**, bool);
} fifo_t;

struct thread_bus {
    fifo_t *input;
    fifo_t *output;
    bool kill;
};

fifo_t *fifo_init();
void fifo_enqueue(fifo_t *queue, uint8_t *data);
uint8_t *fifo_dequeue(fifo_t *queue);
void fifo_free(fifo_t **queue, bool free_data);
size_t fifo_count(fifo_t *queue);

#endif //INC_05_ENC_DEC_FIFO_H

ФИФО.с

#include "fifo.h"

/*
 * Initializes a FIFO queue
 */
fifo_t *fifo_init() {
    fifo_t *queue = calloc(1, sizeof(fifo_t)); // Allocate struct
    queue->last = NULL;
    queue->first = NULL;
    queue->mutex = calloc(1, sizeof(pthread_mutex_t)); // Allocate mutex
    pthread_mutex_init(queue->mutex, NULL); // Initialize the mutex, no attributes needed
    queue->count = 0; // To allow for O(1) counting

    // Function pointers
    queue->count_mutex = fifo_count;
    queue->enqueue = fifo_enqueue;
    queue->dequeue = fifo_dequeue;
    queue->free = fifo_free;

    return queue;
}

/*
 * Enqueues into FIFO
 */
void fifo_enqueue(fifo_t *queue, uint8_t *data) {
    pthread_mutex_lock(queue->mutex); // Lock
    // Allocate the node
    struct list_node *new = calloc(1, sizeof(struct list_node));
    new->data = data; // Link data

    // If we are enqueuing on an empty list, set first and last to be the singleton node
    if (queue->first == NULL) {
        queue->first = queue->last = new;
        ++queue->count;
        pthread_mutex_unlock(queue->mutex);
        return;
    }

    // Attach node
    queue->last->next = new;
    new->prev = queue->last;

    queue->last = new;
    ++queue->count; // Increment element count

    pthread_mutex_unlock(queue->mutex); // Unlock
}

/*
 * Dequeue from FIFO
 */
uint8_t *fifo_dequeue(fifo_t *queue) {
    pthread_mutex_lock(queue->mutex); // Lock
    // Dequeueing on an empty FIFO returns NULL
    if (queue->first == NULL) {
        pthread_mutex_unlock(queue->mutex);
        return NULL;
    }

    // Detach node
    struct list_node *first = queue->first;
    queue->first = first->next;
    first->prev = NULL;

    // Save data pointer, free node
    uint8_t *data = first->data;
    free(first);

    --queue->count; // Decrement element count
    pthread_mutex_unlock(queue->mutex); // Unlock

    return data;
}

/*
 * Free FIFO
 */
void fifo_free(fifo_t **queue, bool free_data) {
    pthread_mutex_lock((*queue)->mutex); // Lock

    // Iterate over FIFO, freeing nodes
    struct list_node *index;
    while ((index = (*queue)->first) != NULL) {
        (*queue)->first = (*queue)->first->next;
        // Optional data freeing
        if (free_data) {
            free(index->data);
            index->data = NULL;
        }
        free(index);
    }
    // Clean mutex
    pthread_mutex_unlock((*queue)->mutex);
    pthread_mutex_destroy(((*queue)->mutex));
    free((*queue)->mutex);
    // Free structure
    free(*queue);
    *queue = NULL;
}

size_t fifo_count(fifo_t *queue) {
    pthread_mutex_lock(queue->mutex); // Lock
    size_t res = queue->count; // Save count
    pthread_mutex_unlock(queue->mutex); // Unlock
    return res;
}

size_t fifo_debug_count(fifo_t *queue) {
    pthread_mutex_lock(queue->mutex); // Lock
    size_t count = 0;
    struct list_node *index = queue->first;
    while (index != NULL) {
        ++count;
        index = index->prev;
    }
    pthread_mutex_unlock(queue->mutex);
    return count;
}


Комментарии