https://www.wikiwand.com/en/Producer%E2%80%93consumer_problem
Code
As the title, I made some chaotic C code.
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
/* output level flags */
#define INFO
#define DEBUG
#define CONSUMER_NUM (8)
#define PRODUCER_NUM (2)
#define ITEM_NUM (256)
#define CONSUMER_LOOP (ITEM_NUM / CONSUMER_NUM)
#define PRODUCER_LOOP (ITEM_NUM / PRODUCER_NUM)
#define BUFFER_SIZE (8)
typedef struct buffer
{
pthread_mutex_t buffer_lock; /* lock protecting the buffer struct */
pthread_cond_t fill; /* buffer has at least one item in it */
pthread_cond_t empty; /* buffer has at least one empty slot */
int space[BUFFER_SIZE]; /* circular buffer space limited by BUFFER_SIZE */
int fill_offset; /* used by producer */
int use_offset; /* used by consumer */
int count; /* item number now in buffer */
} buffer_t;
/* store thread IDs */
typedef struct consumers
{
pthread_t thread_id[CONSUMER_NUM];
} consumers_t;
typedef struct producers
{
pthread_t thread_id[PRODUCER_NUM];
} producers_t;
static inline bool buffer_empty(buffer_t *buf)
{
return (buf->count == 0);
}
static inline bool buffer_full(buffer_t *buf)
{
return (buf->count == BUFFER_SIZE);
}
buffer_t *init_buffer(buffer_t *buf)
{
pthread_mutex_init(&(buf->buffer_lock), NULL);
pthread_cond_init(&(buf->fill), NULL);
pthread_cond_init(&(buf->empty), NULL);
buf->fill_offset = 0;
buf->use_offset = 0;
buf->count = 0;
#ifdef INFO
printf("[INFO]: Initialized buffer with %d slots\n", BUFFER_SIZE);
#endif
return buf;
}
void put(buffer_t *buf, int value)
{
int fill = buf->fill_offset;
buf->space[fill] = value;
buf->fill_offset = (fill + 1) % BUFFER_SIZE;
buf->count++;
#ifdef INFO
printf("[INFO]: Put %d into buffer\n", value);
#endif
}
int get(buffer_t *buf)
{
int use = buf->use_offset;
int value = buf->space[use];
buf->use_offset = (use + 1) % BUFFER_SIZE;
buf->count--;
#ifdef INFO
printf("[INFO]: Get %d from buffer\n", value);
#endif
return value;
}
void *producer(void *args)
{
buffer_t *buf = (buffer_t *)args;
for (int i = 0; i < PRODUCER_LOOP; i++)
{
pthread_mutex_lock(&(buf->buffer_lock));
while (buffer_full(buf))
{
pthread_cond_wait(&(buf->empty), &(buf->buffer_lock));
}
put(buf, i);
pthread_cond_signal(&(buf->fill));
pthread_mutex_unlock(&(buf->buffer_lock));
}
return NULL;
}
void *consumer(void *args)
{
buffer_t *buf = (buffer_t *)args;
for (int i = 0; i < CONSUMER_LOOP; i++)
{
pthread_mutex_lock(&(buf->buffer_lock));
while (buffer_empty(buf))
{
pthread_cond_wait(&(buf->fill), &(buf->buffer_lock));
}
int value = get(buf);
pthread_cond_signal(&(buf->empty));
pthread_mutex_unlock(&(buf->buffer_lock));
}
return NULL;
}
void create_consumer(buffer_t *buf, consumers_t *csm)
{
for (int i = 0; i < CONSUMER_NUM; i++)
{
pthread_create(&(csm->thread_id[i]), NULL, consumer, buf);
}
}
void join_consumer(consumers_t *csm)
{
for (int i = 0; i < CONSUMER_NUM; i++)
{
pthread_join(csm->thread_id[i], NULL);
}
}
void create_producer(buffer_t *buf, producers_t *prod)
{
for (int i = 0; i < PRODUCER_NUM; i++)
{
pthread_create(&(prod->thread_id[i]), NULL, producer, buf);
}
}
void join_producer(producers_t *prod)
{
for (int i = 0; i < PRODUCER_NUM; i++)
{
pthread_join(prod->thread_id[i], NULL);
}
}
int main(void)
{
buffer_t *buf = (buffer_t *)malloc(sizeof(buffer_t));
if (buf == NULL)
{
#ifdef DEBUG
printf("[ERROR]: Failed to allocate memory space for buffer. Fatal error, exit.\n");
#endif
return -1;
}
buf = init_buffer(buf);
/* thread IDs*/
consumers_t *csm = (consumers_t *)malloc(sizeof(consumers_t));
producers_t *prod = (producers_t *)malloc(sizeof(producers_t));
if (csm == NULL || prod == NULL)
{
#ifdef DEBUG
printf("[ERROR]: Failed to allocate memory space for thread IDs. Fatal error, exit.\n");
#endif
return -1;
}
/* real workload */
create_producer(buf, prod);
create_consumer(buf, csm);
join_producer(prod);
join_consumer(csm);
if (buf)
free(buf);
return 0;
}