/*------------------------------------------------------------------------ | | File: prog1sample.cpp | | Compile: | | g++ prog1sample.cpp -lpthread -lrt -o prog1sample | | | Description: A program using two additional threads: | | 1. Initial thread executing main. Prompts the user to input an | integer value for 'limit'. It then prompts the user to enter the | size for a bounded buffer and creates the buffer. | | The initial thread then creates some semaphores, mutex, | slot_avail, and item_avail, and two other two threads | (suspended) - a producer thread and a consumer thread. | | The initial thread finally starts the two suspended threads and | waits for them to finish. | | When the suspended threads have finished, the initial thread | prints the contents of the results list. | | 2. Producer thread. This thread "produces" the tasks 1, 2, ..., | limit. As it "produces" each task, it tries to insert the | integer task into the bounded buffer. | | 3. Consumer thread. This thread tries to remove a task (integer) | from the bounded buffer. For each integer task removed, it | enters the item pair (id, task) into the results list (at the | end), where id is the consumer thread's thread id. | | | 4. Debugging output should be produced by providing 1 or more | command line arguments: | | m - Initial thread(main) prints initialization messages. | p - producer prints "producer" for each task | c - consumer prints "consumer" for each task | i - initial thread prints each result item | | Examples | prog1sample p c | or prog1sample pc | | | should print "producer" and "consumer" as each thread processes a | task. | | prog1sample mi | | should print the initialization messages from the initial thread | and also print each item from the result list. | | | | Created: | Author: | Modifications: | +-------------------------------------------------------------------------*/ #include #include #include #include #include #include // Needed if semaphores, etc are used. #include // Note: The std C++ headers have no .h #include #include using namespace std; struct item { int value; pthread_t which_thread; item(int v = 0, pthread_t tid = 0) : value(v), which_thread(tid) {} }; void* consumer(void *); /* A function for consumer threads to execute */ void* producer(void *); /* A function for producer thread to execute */ // Shared global objects: int limit; // Number of tasks for producer to produce // The bounded buffer list buffer; unsigned int bsize; // Result list indicating which consumer thread got which task list results; // Semaphores sem_t goSem; sem_t item_avail, slot_avail; sem_t mutex; sem_t printMutex; int taskcnt[2]; // taskcnt[i] is how many tasks consumer i processed bool *task_finished; // Boolean array. Was each task consumed? char argstr[256]; void printhelp() { cout << "\nUsage: prog1sample [ -h | Print_options ]\n" "\n" "where \n" " -h print this message and exit\n" " Print_options A string of one or more of the following\n" " print option characters:\n\n" " m - main thread prints initialization messages\n" " p - producer prints \"producer\" for each task\n" " c - consumer prints \"consumer\" for each task\n" " i - initial thread (main) prints each result item" << endl; } void InitDebug(int argc, char * argv[]) { argstr[0] = '\0'; for(int i = 1; i < argc; i++) { if ( strcmp(argv[i], "-h") == 0 ) { printhelp(); exit(0); } strcat(argstr, argv[i]); } } bool DebugEnabled(char ch) { for(int i = 0; i < (int) strlen(argstr); i++) { if ( ch == argstr[i]) return true; } return false; } void DEBUG(char ch, const string& s) { if ( DebugEnabled(ch) ) { sem_wait(&printMutex); cout << s << endl; sem_post(&printMutex); } } int main(int argc, char *argv[]) { stringstream sstr; // For conversions to strings pthread_t producer_id; // To hold the thread id of the producer pthread_t consumer_id[2]; // To hold thread id's of 2 consumers pthread_t init = pthread_self(); // Initialize debug printing InitDebug(argc, argv); cout.setf(ios::unitbuf); cout << "This hp machine has " << pthread_num_processors_np() << " processors." << endl; cout << "main begins" << endl; cerr << "Enter limit: "; cin >> limit; // Allocate the boolean array now that we know // how many tasks there are. task_finished = new bool[limit+1]; cerr << "Enter bounded buffer size: "; cin >> bsize; sstr.clear(); sstr.str(""); sstr << "\nmain Thread (id = " << init << ") begins."; DEBUG('m', sstr.str()); // YOU WRITE (more) CODE HERE. // Create and initialize the semaphore(s) if ( sem_init(&goSem, 0, 0) != 0) { printf("Unable to create goSem semaphore.\n"); exit(0); } if ( sem_init(&printMutex, 0, 1) != 0) { printf("Unable to create printMutex semaphore.\n"); exit(0); } // YOU WRITE (more) CODE HERE. // Create the consumer and producer threads ( they block themselves ) if (pthread_create(&consumer_id[0], 0, consumer, (void *) 0) != 0) { cout << "\nFirst consumer thread create failed." << endl; exit(0); } else { sstr.clear(); sstr.str(""); sstr << "\nFirst consumer thread id = " << consumer_id[0] << " created."; DEBUG('m', sstr.str()); } if (pthread_create(&producer_id, 0, producer, (void *) 0) != 0) { cout << "\nProducer thread create failed" << endl; exit(0); } else { sstr.clear(); sstr.str(""); sstr << "\nProducer thread id = " << producer_id << " created!"; DEBUG('m', sstr.str()); } // Now start the threads running sstr.clear(); sstr.str(""); sstr << "\nInitial thread (id = " << init << ") " << "unblocking producer/consumer threads!"; DEBUG('m', sstr.str()); sem_post(&goSem); sem_post(&goSem); // Wait for the threads to finish sstr.clear(); sstr.str(""); sstr << "\nInitial thread (id = " << init << ") " << "waiting for producer/consumer threads..."; DEBUG('m', sstr.str()); pthread_join(producer_id, 0); pthread_join(consumer_id[0], 0); sstr.clear(); sstr.str(""); sstr << "\nInitial thread (id = " << init << ") ends after the producer (id = " << producer_id << ") and consumer (id = " << consumer_id[0] << ") threads finish."; cout << sstr.str() << endl; cout << "\nFinal Contents of results queue:" << endl; // YOU WRITE CODE HERE. // You should insert code here that counts how many tasks completed // by (each) consumer and mark each completed task. // If debug flag 'i' is on, then also print each item in the results list. // YOU WRITE CODE HERE. // Next you should check to see if each task was marked. // Print the unmarked tasks as "not completed." // Also print the total number of tasks not completed; // this might be 0 if all the tasks WERE completed.) return EXIT_SUCCESS; } void* consumer(void* p) { pthread_t me = pthread_self(); stringstream sstr; sem_wait(&goSem); sstr << "\nConsumer Thread begins( id = " << me << ", function = f)\n"; DEBUG('m', sstr.str()); // YOU WRITE CODE HERE // Code for the consumer to get items from buffer and // insert into the results list goes here. // Final message before consumer terminates. sstr.clear(); sstr.str(""); sstr << "\nConsumer Thread (id = " << me << ") finishes."; DEBUG('m', sstr.str()); return 0; } void* producer(void* p) { pthread_t me = pthread_self(); stringstream sstr; sem_wait(&goSem); sstr << "\nProducer thread begins( id = " << me << ", function = g)\n"; DEBUG('m', sstr.str()); // YOU WRITE CODE HERE. // Code for the producer to insert limit items // 1,2,...,limit into the bounded buffer goes here. // Final message before the producer terminates. sstr.clear(); sstr.str(""); sstr << "\nProducer Thread (id = " << me << ") finishing"; DEBUG('m', sstr.str()); return 0; }