-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathqueue-simulation.c
159 lines (135 loc) · 4.05 KB
/
queue-simulation.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
* A queue simulation using two threads - a consumer and a producer.
*
* At every interval, the producer randomly inserts a message to the queue or
* not, and at each step, the consumer randomly processes a message or not.
* One condition variable and one mutex is used. Note that if we used
* semaphores, we would need 2 of them to represent both "sleeping" states.
*
* On a full queue, the producer will be put to sleep; on an empty queue,
* consumer will be put to sleep.
*
* A randomness factor is used here to have the queue get some items in it and
* also to hit both boundary cases (empty and full queue) from time to time.
* When hitting a boundary, one of the threads actualy gets blocked on the
* condition variable.
*
* (c) jp@devnull.cz
*/
#define _XOPEN_SOURCE 700 // needed for random() on Linux
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#define DEFAULT_MAX 10
/* number of items in the queue. protected by the mutex below. */
int in_queue;
/* queue capacity */
int capacity = DEFAULT_MAX;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *
producer(void *x)
{
int msg_created;
while (1) {
poll(NULL, 0, 95); /* sleep 95 ms */
msg_created = random() % 2;
if (msg_created == 0)
continue;
pthread_mutex_lock(&mutex);
/* We can't insert a "message" when the queue is full. */
while (in_queue == capacity) {
(void) printf("Producer: queue already full, "
"putting myself to sleep.\n");
pthread_cond_wait(&cond, &mutex);
(void) printf("Producer: woken up.\n");
}
in_queue += msg_created;
/*
* If the queue was empty and we produced a "message", we must
* notify the consumer so it can start working the queue again.
* Note that the consumer actually may or may not sleep while
* the queue was empty - it may not sleep if it didn't get to
* check the queue again to find it empty.
*
* We could also signal each time we insert a message but that
* is really not necessary and would be actually wasteful.
*/
if (in_queue == 1) {
(void) printf("Producer: we inserted a message to "
"an empty queue, signalling the consumer.\n");
pthread_cond_signal(&cond);
}
pthread_mutex_unlock(&mutex);
}
/* NOTREACHED */
}
void *
consumer(void *x)
{
int msg_removed;
while (1) {
poll(NULL, 0, 100); /* sleep 100 ms */
msg_removed = random() % 2;
if (msg_removed == 0)
continue;
pthread_mutex_lock(&mutex);
/*
* We cannot get a "message" when there is none so go to
* sleep.
*/
while (in_queue == 0) {
(void) printf("Consumer: queue already empty, "
"putting myself to sleep.\n");
pthread_cond_wait(&cond, &mutex);
(void) printf("Consumer: woken up.\n");
}
int orig_in_queue = in_queue;
/*
* If the queue was full and we pulled a "message(s)" from it,
* signal the producer so that it can start producing
* messages again.
*/
in_queue -= msg_removed;
if (orig_in_queue == capacity) {
(void) printf("Consumer: queue no longer full, "
"signalling the producer.\n");
pthread_cond_signal(&cond);
}
pthread_mutex_unlock(&mutex);
}
/* NOTREACHED */
}
int
main(int argc, char **argv)
{
int i;
pthread_t t_prod, t_cons;
if (argc == 2)
capacity = atoi(argv[1]);
/* Seed the random device. */
srandom(time(NULL));
pthread_create(&t_prod, NULL, producer, NULL);
pthread_create(&t_cons, NULL, consumer, NULL);
/*
* Main thread. Periodically print the "contents" of the queue. A dot
* means an item, a space means an empty slot. Note that we do that
* under a lock to make sure we work with consistent data.
* This could be refactored into separate thread. Main thread would
* have to wait for the threads to complete via pthread_join().
*/
while (1) {
pthread_mutex_lock(&mutex);
printf(" |");
for (i = 0; i < in_queue; ++i)
putchar('.');
for (i = in_queue; i < capacity; ++i)
putchar(' ');
printf("|\n");
pthread_mutex_unlock(&mutex);
poll(NULL, 0, 85); /* sleep 85 ms */
}
/* Never reached. */
}