Program Project I.
One Producer/Several Consumers
Using Threads, Shared Memory and Semaphores

Introduction.

In this assignment you are to write a program whose initial thread creates three other threads. These three threads - a producer and two consumers - are to communicate using shared memory and semaphores. The producer has a number of "tasks" to be performed. These tasks are to be given to the consumer threads. It doesn't matter which consumer gets which task. However, one and only one consumer should get each task. The nature of these tasks is described in section A) below.

The producer will insert each task, one at a time, into a shared bounded buffer. Each consumer will remove tasks, one at a time, from this buffer. A bound, bufSize, will be imposed on the buffer. That is, at most bufSize tasks can be in the buffer at a time. The total number of tasks to be done will be stored in a global variable, limit, and will generally be larger than bufSize. So the producer will have to wait if the buffer becomes full. Consumers always wait if the buffer is empty. The producer and consumer threads should continue until all tasks have been received. If n1 is the number of tasks removed by Consumer1 and n2 is the number removed by Consumer2, then n1 + n2 should be equal to limit. However, n1 will generally not be equal to n2, and both values may change each time the program is run.

As each task is removed from the buffer by a consumer, the consumer should build an item:

struct item
{
  // Public members
  int task;
  pthread_t which_thread;

  // Constructor
  item(int tsk=0, pthread_t tid=0) 
   { task = tsk; which_thread = tid; }
};

The task member should be set to the task number removed from the buffer. The which_thread member should be set to the thread id of the consumer constructing the item. Finally, the consumer should insert the item into a shared (unbounded) list called results.
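As a rough illustration of this step, the sketch below shows one way a consumer might build an item and append it to results. The semaphore resultsMutex and the function recordResult are hypothetical names, not part of the provided code. The sketch assumes resultsMutex was initialized to 1 with sem_init before any consumer starts, and it uses the default allocator for the list.

```cpp
#include <list>
#include <pthread.h>
#include <semaphore.h>

struct item
{
  int task;
  pthread_t which_thread;

  item(int tsk = 0, pthread_t tid = 0)
   { task = tsk; which_thread = tid; }
};

std::list<item> results;  // shared, unbounded results list
sem_t resultsMutex;       // hypothetical: binary semaphore guarding results

// Called by a consumer after it has removed `task` from the buffer.
void recordResult(int task)
{
  item it(task, pthread_self());  // tag the item with this consumer's id

  sem_wait(&resultsMutex);        // get exclusive use of the list
  results.push_back(it);
  sem_post(&resultsMutex);        // release it again
}
```

The item is constructed before the semaphore is acquired, so the list is held only for the single push_back.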

It is best if the initial thread waits for all three other threads to terminate. (See section D below.) The initial thread can then process the results list sequentially. It should print out the total number of items, the number of items inserted by Consumer1, the number inserted by Consumer2, and any task that was not completed.
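The sequential pass over results could look something like the sketch below. It assumes, purely for illustration, that tasks are numbered 1 through limit; the Summary struct and the summarize function are hypothetical names. Because all other threads have terminated by this point, no semaphore is needed here.

```cpp
#include <list>
#include <vector>
#include <pthread.h>

struct item
{
  int task;
  pthread_t which_thread;

  item(int tsk = 0, pthread_t tid = 0)
   { task = tsk; which_thread = tid; }
};

// Hypothetical record of what the initial thread needs to print.
struct Summary
{
  int total;                 // number of items in results
  int byConsumer1;           // items inserted by Consumer1
  int byConsumer2;           // items inserted by Consumer2
  std::vector<int> missing;  // tasks in 1..limit that never appeared
};

// Assumes limit >= 0 and tasks numbered 1..limit (an assumption).
Summary summarize(const std::list<item>& results,
                  pthread_t c1, pthread_t c2, int limit)
{
  Summary s;
  s.total = s.byConsumer1 = s.byConsumer2 = 0;
  std::vector<bool> seen(limit + 1, false);  // index 0 is unused

  for (std::list<item>::const_iterator it = results.begin();
       it != results.end(); ++it)
  {
    ++s.total;
    if (pthread_equal(it->which_thread, c1)) ++s.byConsumer1;
    else if (pthread_equal(it->which_thread, c2)) ++s.byConsumer2;
    if (it->task >= 1 && it->task <= limit) seen[it->task] = true;
  }

  for (int t = 1; t <= limit; ++t)
    if (!seen[t]) s.missing.push_back(t);

  return s;
}
```

Thread ids are compared with pthread_equal rather than ==, which keeps the sketch portable to systems where pthread_t is not an integer.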

Here is a diagram of the threads, and their interaction through the bounded buffer and the results list:

The producer can presumably terminate when it has copied the last task and one of the consumers has accepted it. However, both consumers should eventually terminate, so the producer may need to do a bit more communication to make sure of this. That is, you are allowed to modify the producer, if necessary, in order to help the consumers know when to stop.

How does a consumer know if a removed task is the last task? How does a consumer know if there are any more tasks to be done? If there are no more tasks to be done, should a consumer execute a sem_wait? If so, and the consumer blocks, who will unblock it?

The following details are discussed below:

A. Description of the problem.

A task will be represented by an integer.

The initial thread should:

The producer thread should:

You may want to print the values of the result list for debugging purposes. This feature may result in too much output for large values of limit. You should be able to turn this feature on or off using a command line argument. This will be discussed in class.
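One simple way to switch the debugging output on or off is to scan the command line for a flag. The sketch below assumes a flag spelled "-d"; that spelling and the function name debugRequested are hypothetical, and the convention discussed in class may differ.

```cpp
#include <cstring>

// Returns true if the hypothetical flag "-d" appears among the arguments.
// argv[0] is the program name, so the scan starts at index 1.
bool debugRequested(int argc, char* argv[])
{
  for (int i = 1; i < argc; ++i)
    if (std::strcmp(argv[i], "-d") == 0)
      return true;
  return false;
}
```

The initial thread could call debugRequested(argc, argv) once in main and print the results list only when it returns true.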

B. Remarks on shared memory

When using shared memory and semaphores, you will be using threads. Since the thread calls used here (pthread_t, pthread_join) are declared in pthread.h, you will need to include that header:

#include <pthread.h>

but no additional header file is needed to use shared memory with threads, since all threads within a process share the same address space and all have access to the global variables.

You should use the standard C++ list class for the results list. I have provided a BoundedBuffer class for the buffer. The shared variables are declared globally:

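#include <list>
using namespace std;

int limit;           // total number of tasks to be done

list<item> results;  // where item is as declared above

list<int> buffer;    // the shared bounded buffer

int bufSize;         // maximum number of tasks in the buffer at a time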

The program should prompt for the buffer size, bufSize, and for the number of tasks, limit. All of this should be done by the initial thread before creating the producer and two consumer threads.

You can also use a small number of other shared (global) memory variables to communicate information if you think it would be useful in solving the termination problem for the two consumer threads.
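As one possible use of such an extra shared variable (a sketch, not the required solution), the consumers could share a count of tasks not yet claimed. A consumer claims a task before it waits on the buffer, so it only ever waits for a task that the producer is guaranteed to deliver. The names tasksUnclaimed, claimMutex, initClaims and claimTask are all hypothetical.

```cpp
#include <semaphore.h>

int tasksUnclaimed;  // hypothetical: tasks no consumer has claimed yet
sem_t claimMutex;    // hypothetical: binary semaphore guarding the count

// Called once by the initial thread, before the consumers are created.
bool initClaims(int limit)
{
  tasksUnclaimed = limit;
  return sem_init(&claimMutex, 0, 1) == 0;
}

// Called by a consumer before it tries to remove a task from the buffer.
// Returns true if the consumer may go on to wait for one more task,
// false if every task has already been claimed and it should terminate.
bool claimTask()
{
  sem_wait(&claimMutex);
  bool claimed = (tasksUnclaimed > 0);
  if (claimed)
    --tasksUnclaimed;
  sem_post(&claimMutex);
  return claimed;
}
```

With this scheme each consumer's main loop runs while claimTask() returns true, and the producer needs no changes. The alternative hinted at earlier, in which the producer sends extra information to the consumers, is equally valid.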

The producer should only hold exclusive use of the buffer while inserting one item. That is, the producer should not get exclusive use and hold it while it fills up the buffer. This might cause the consumers to wait when the buffer is not empty. Each time the producer gets exclusive control of the buffer, it should only insert ONE task. To insert another task, it must first release control and then reacquire control for the next task.
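The rule above can be sketched with three semaphores, following the classic bounded-buffer pattern. This is an illustration under assumptions: it uses a plain std::list<int> rather than the provided BoundedBuffer class (whose interface is not shown here), and the names emptySlots, fullSlots, bufMutex, initBufferSemaphores, putTask and getTask are hypothetical.

```cpp
#include <list>
#include <semaphore.h>

std::list<int> buffer;  // shared buffer of task numbers
int bufSize = 4;        // bound on the buffer (normally read from the user)

sem_t emptySlots;       // counts free slots, starts at bufSize
sem_t fullSlots;        // counts tasks in the buffer, starts at 0
sem_t bufMutex;         // binary semaphore for exclusive use of buffer

bool initBufferSemaphores()
{
  return sem_init(&emptySlots, 0, bufSize) == 0
      && sem_init(&fullSlots, 0, 0) == 0
      && sem_init(&bufMutex, 0, 1) == 0;
}

// Producer side: inserts exactly ONE task per acquisition of bufMutex.
void putTask(int task)
{
  sem_wait(&emptySlots);  // wait here if the buffer is full
  sem_wait(&bufMutex);    // acquire exclusive use
  buffer.push_back(task);
  sem_post(&bufMutex);    // release before inserting anything else
  sem_post(&fullSlots);   // wake a consumer waiting for a task
}

// Consumer side: removes exactly one task.
int getTask()
{
  sem_wait(&fullSlots);   // wait here if the buffer is empty
  sem_wait(&bufMutex);
  int task = buffer.front();
  buffer.pop_front();
  sem_post(&bufMutex);
  sem_post(&emptySlots);  // tell the producer a slot is free
  return task;
}
```

Note the order of the two sem_wait calls: the counting semaphore is taken first, so a thread never blocks on a full or empty buffer while holding bufMutex.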

C. Remarks on semaphores

As noted above, when using shared memory and semaphores, you will be using threads. So you will need to include the pthread.h header. To use semaphores for threads, you will also need to include the semaphore.h header:

#include <pthread.h>
#include <semaphore.h>

You will need to use sem_init, sem_wait, and sem_post. These functions are briefly described below:

C.1 Semaphore declaration

sem_t s;  /* declaration of the data structure for a semaphore */

C.2 Semaphore initialization


int sem_init(sem_t *sem, int pshared, unsigned int value);

	sem = address of a sem_t structure
	pshared = should be 0 (can't be shared with threads in other processes)
	value = the initial value for the semaphore, must be >= 0.


Returns 0 if successful, and non-zero otherwise.
Example: To initialize the semaphore, s, declared above, with value 5:
	if (sem_init(&s, 0, 5) != 0)
	  {
	    printf("Semaphore not initialized\n");
	    exit(1);  /* non-zero exit status reports the failure */
	  }

C.3 P operation ("decrement") on semaphores.

int sem_wait(sem_t *sp); 

	sp = address of a sem_t structure

Example:

	sem_wait(&s);


C.4 V operation ("increment") on semaphores.

int sem_post(sem_t *sp); 

	sp = address of a sem_t structure

Example:

	sem_post(&s);

D. Waiting for the producer and consumers

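You should have the initial thread execute a pthread_join(th_id, 0) statement for each of the threads, where th_id is the thread id of the thread for which you are waiting. A thread id is declared as:

	pthread_t th_id;

but the type pthread_t is, on many systems, really just an unsigned integer. (POSIX treats pthread_t as an opaque type, so portable code should not rely on this.)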
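The waiting step might be sketched as follows. The worker function and runAndJoin are hypothetical stand-ins for the producer and consumer thread functions; each worker here only sets a flag so that the effect of the join is visible.

```cpp
#include <pthread.h>

// Stand-in thread function: marks its slot to show that it ran.
void* worker(void* arg)
{
  int* slot = static_cast<int*>(arg);
  *slot = 1;
  return 0;
}

// Creates three threads, then waits for each of them to terminate.
// Returns the number of threads that were actually started.
int runAndJoin(int done[3])
{
  pthread_t ids[3];
  int started = 0;

  for (; started < 3; ++started)
    if (pthread_create(&ids[started], 0, worker, &done[started]) != 0)
      break;                  // stop at the first failed creation

  for (int i = 0; i < started; ++i)
    pthread_join(ids[i], 0);  // blocks until thread i has finished

  return started;
}
```

Only after all the joins have returned is it safe for the initial thread to read data written by the other threads without a semaphore. On most systems the program must be linked with the threads library (for example with the -pthread compiler option).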

E. Initial code.

The initial code provides:

F. What to turn in.

I will run your program. You should put your source code file(s) in a subdirectory named "prog1src" of your unix login directory. You should give me (and only me) access permissions to that directory (r-x) and read permission on your source code files for project 1 (r--). Use the access control list command, setacl, to set these permissions. (See the Jan19 lecture notes and/or the man pages for setacl.) Submit a file to COL for this project with the message that your program is ready and the permissions have been changed to give me access. You may also include any additional comments you wish to provide on running your program.