The system function to create a POSIX thread is:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*),
void *restrict arg);
Example:
#include <pthread.h>

void *server(void *arg);      /* forward declaration of the thread's initial function */

void start(int n, void *arg) {
  pthread_t thread;
  int i;
  for (i = 0; i < n; i++) {
    pthread_create(&thread,   // thread ID
        0,                    // pointer to thread attribute struct (0 => defaults)
        server,               // the thread's initial function
        arg);                 // the argument to be passed to the thread's function
  }
}

void *server(void *arg)
{
  // thread body
  return 0;                   // must return a pointer (void *)
}
typedef struct {
  int first;
  int second;
} fd_pair_t;

void *incoming(void *arg);    /* thread functions, defined elsewhere */
void *outgoing(void *arg);

void rlogind(int r_in, int r_out, int l_in, int l_out)
{
  pthread_t in_thread, out_thread;
  fd_pair_t in = {r_in, l_in};
  fd_pair_t out = {r_out, l_out};
  pthread_create(&in_thread, 0, incoming, (void *) &in);
  pthread_create(&out_thread, 0, outgoing, (void *) &out);
}
Four approaches to passing arguments to a thread:
- Copy all the arguments to the thread's stack: not supported!
  A POSIX thread's initial function takes only one argument.
- Pass a pointer to local storage containing the arguments:
  OK only if we are certain this storage does not go out of
  scope until the thread is finished with it.
- Pass a pointer to static or global storage containing the arguments:
  Works only if just one thread at a time is using the storage.
- Pass a pointer to dynamically allocated storage containing the arguments:
  Works provided the storage is released (with free()) when, and only
  when, the thread is finished with it (see the sketch below).
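A minimal sketch of the fourth approach, assuming the fd_pair_t type defined above is in scope (handle_fds and start_handler are hypothetical names; the thread function frees its argument when it is done with it):

#include <pthread.h>
#include <stdlib.h>

void *handle_fds(void *arg)
{
  fd_pair_t *fds = arg;
  /* ... use fds->first and fds->second ... */
  free(fds);                  /* released when, and only when, the thread is finished */
  return 0;
}

void start_handler(int in_fd, int out_fd)
{
  pthread_t t;
  fd_pair_t *p = malloc(sizeof(fd_pair_t));
  p->first = in_fd;
  p->second = out_fd;
  pthread_create(&t, 0, handle_fds, p);
}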
The function
pthread_join(thread, ptr)
provides a way to wait for a thread to terminate and get
its exit/return value.
- The first argument is the thread ID of the thread to wait for.
- The second argument (if not 0) indicates where the return value
  should be stored (see the sketch below).
- The return value of pthread_join is 0 if successful, or else an
  error number.
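A minimal self-contained sketch of collecting a thread's return value (this variant of server just returns a number; passing a small integer through the void * return value via a cast to long is one common convention):

#include <pthread.h>
#include <stdio.h>

void *server(void *arg)
{
  return (void *)(long)42;            /* value collected by pthread_join */
}

int main(void)
{
  pthread_t thread;
  void *result;

  pthread_create(&thread, 0, server, 0);
  pthread_join(thread, &result);      /* wait for server and fetch its return value */
  printf("thread returned %ld\n", (long)result);
  return 0;
}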
void rlogind(int r_in, int r_out, int l_in, int l_out)
{
pthread_t in_thread, out_thread;
fd_pair_t in = {r_in, l_in};
fd_pair_t out = {r_out, l_out};
pthread_create(&in_thread, 0, incoming, (void *) &in);
pthread_create(&out_thread, 0, outgoing, (void *) &out);
pthread_join(in_thread, 0);
pthread_join(out_thread, 0);
}
A return from main causes exit to be called.
Calling exit, implicitly or explicitly, terminates the
process and all of its threads.
If a thread whose initial function is not main returns,
or calls pthread_exit(n), only that thread terminates.
This can result in zombie threads if no thread in the
process has yet called pthread_join on them.
Note that thread IDs in general may be reused. It is
considered unsafe to call pthread_join more than once
with the same thread ID. This may or may not be detected.
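A minimal sketch of the two ways to avoid zombie threads: join each thread exactly once, or detach it so its resources are reclaimed automatically when it terminates (worker is a hypothetical thread function):

#include <pthread.h>

void *worker(void *arg)
{
  // thread body
  return 0;
}

int main(void)
{
  pthread_t t1, t2;

  pthread_create(&t1, 0, worker, 0);
  pthread_join(t1, 0);       /* joined exactly once: t1 is fully reclaimed */

  pthread_create(&t2, 0, worker, 0);
  pthread_detach(t2);        /* t2 is reclaimed when it terminates; it must
                                not be joined */

  pthread_exit(0);           /* terminate just this thread, so t2 can finish
                                even though main is done */
}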
Using 0 for the thread attribute argument of pthread_create
will result in a default value for the stack size for that
thread.
Example:
Suppose we are creating 1024 threads, each with a default stack
size of 8 MB (the default in some versions of POSIX threads on
Linux).
This would require 8 GB of address space.
But a 32-bit machine has only 4 GB of address space.
So to get 1024 threads, each one would have to use a smaller
stack size.
Example: request a 20 MB stack:
pthread_t thread;
pthread_attr_t thr_attr;
pthread_attr_init(&thr_attr);
pthread_attr_setstacksize(&thr_attr, 20 * 1024 * 1024);
...
pthread_create(&thread, &thr_attr, start, arg);
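The same attribute object can be passed to several pthread_create calls, and pthread_attr_destroy(&thr_attr) can be used to release it once it is no longer needed.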
Consider two threads that might each execute this statement, where the
initial value of x is 5 and y is 10:
x = x + y;
Assembler instructions could be:
movl x, %ebx
addl y, %ebx
movl %ebx, x
What can the final value of x be after the two threads execute? Is it
always 25? (If both threads load x before either stores it back, each
computes 15, and the final value is 15 rather than 25.)
// Shared by both threads
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
int x = 5;
int y = 10;
Now the common statement executed by both threads is protected
by the mutex:
...
pthread_mutex_lock(&m);
x = x + y;
pthread_mutex_unlock(&m);
...
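Putting this together, a minimal self-contained sketch (adder is a hypothetical thread function; with the mutex the two updates are serialized, so the final value is always 25):

#include <pthread.h>
#include <stdio.h>

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
int x = 5;
int y = 10;

void *adder(void *arg)
{
  pthread_mutex_lock(&m);
  x = x + y;                 /* critical section */
  pthread_mutex_unlock(&m);
  return 0;
}

int main(void)
{
  pthread_t t1, t2;

  pthread_create(&t1, 0, adder, 0);
  pthread_create(&t2, 0, adder, 0);
  pthread_join(t1, 0);
  pthread_join(t2, 0);
  printf("x = %d\n", x);     /* prints x = 25 */
  return 0;
}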
Suppose two (or more) threads use 2 objects that can only be used by
one thread at a time (serially reusable). Each thread
might use just one of the objects initially and then try to get
the second one as well:
void thr1() void thr2()
{ {
pthread_mutex_lock(&m1); pthread_mutex_lock(&m2);
/* use object 1 */ /* use object 2 */
... ...
pthread_mutex_lock(&m2);          pthread_mutex_lock(&m1);
/* use objects 1 and 2 */ /* use objects 1 and 2 */
... ...
pthread_mutex_unlock(&m2); pthread_mutex_unlock(&m1);
pthread_mutex_unlock(&m1); pthread_mutex_unlock(&m2);
} }
Can result in deadlock! How? (If thr1 acquires m1 and thr2 acquires
m2 before either acquires its second mutex, each waits forever for
the mutex the other holds.)
We could specify a protocol for how the objects must be locked.
E.g., if a thread wants to use both objects at the same time,
it must lock object 1 first and then object 2.
Suppose thread 1 has object 1, but is not using object 2.
If thread 2 wants to use both objects, it shouldn't lock object
2 even though it is free. It should instead try to lock object 1
(and will block until it is released).
If there are more than 2 objects, extending this to a total
ordering on the objects also prevents deadlock (a sketch follows).
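A minimal sketch of the ordering protocol (use_both is a hypothetical helper; every thread that needs both objects acquires the mutexes in the same fixed order, so the circular wait above cannot arise):

#include <pthread.h>

pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;   /* protects object 1 */
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;   /* protects object 2 */

void use_both(void)
{
  pthread_mutex_lock(&m1);   /* always lock object 1 first ... */
  pthread_mutex_lock(&m2);   /* ... and only then object 2 */
  /* use objects 1 and 2 */
  pthread_mutex_unlock(&m2);
  pthread_mutex_unlock(&m1);
}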
Mutual exclusion is important, but doesn't handle all
problems with concurrently executing threads.
In some cases a thread must wait at a point in its code until
another thread has changed some state or has reached a particular
point in its own code.
How? Mutexes aren't generally useful for this.
As an abstract data type, a semaphore consists of:
- an integer value (private)
- a list of waiting threads (private)
- sem_init(&s, 0, v) (public): initialize the integer value to v (>= 0)
- sem_wait(&s) (public): try to decrement the integer,
  possibly causing the calling thread to block
- sem_post(&s) (public): increment the integer, possibly
  unblocking one of the waiting threads
These functions must be atomic. That is, if two or more
threads call any of these functions concurrently, the calls
execute one after the other in some order; execution is never
interleaved so that only part of one call has run in one thread
before another thread starts executing one of the functions.
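POSIX provides these operations through <semaphore.h>; a minimal sketch of declaring and initializing one (setup is a hypothetical initialization function; the middle argument of sem_init is 0 when the semaphore is shared only among the threads of a single process):

#include <semaphore.h>

sem_t s;

void setup(void)
{
  sem_init(&s, 0, 0);        /* initial value 0; middle argument 0 => shared
                                only among the threads of this process */
}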
There are several ways to implement the semaphore abstract data
type.
Looking at one possible way can help elaborate the
missing details from the abstract description.
- The integer can be initialized to any value >= 0.
- sem_wait:
- first decrement the integer
- if the value is now < 0, the calling thread is
blocked and placed on the wait list for this semaphore
- return
- sem_post:
- first increment the integer
- if the value is now <= 0, the wait list is
not empty. Unblock one of the threads (presumably
first in, first out)
- return
Suppose thread 1 must wait at position A in its code until
thread 2 has arrived at position B in its code:
thread 1 thread 2
. .
. .
. .
. .
A: .
. .
. .
. B:
This isn't a mutual exclusion problem.
Use a semaphore s with initial value 0!
// shared
sem_t s;
sem_init(&s, 0, 0);
Then insert semaphore calls in the code of the two threads:
thread 1 thread 2
. .
. .
. .
. .
A: sem_wait(&s);              .
. .
. .
. B: sem_post(&s);
What if thread 2 gets to B before thread 1 reaches A? (Then the post
at B has already raised the semaphore's value to 1, so thread 1 does
not block when it reaches A.)
What if thread 1 gets to A before thread 2 reaches B? (Then thread 1
blocks at A until thread 2 posts at B.)
The producer/consumer problem with bounded buffer:
- producer thread repeatedly inserts items in a queue
(bounded buffer) which can hold at most N items
- consumer thread repeatedly removes items from the queue
Requirements
- Mutual exclusion is required for remove and
insert
- Synchronization is required for the producer: must wait if
the buffer is full.
- Synchronization is required for the consumer: must wait if
the buffer is empty.
We need 2 semaphores, one for each synchronization
requirement.
We need 1 mutex.
Assume the buffer size is N items.
// shared
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
sem_t full_slot;
sem_t empty_slot;
sem_init(&full_slot, 0, 0);
sem_init(&empty_slot, 0, N);
void *producer(void *) void *consumer(void *)
{ {
while(1) while(1)
{ {
// produce item sem_wait(&full_slot);
... pthread_mutex_lock(&m);
sem_wait(&empty_slot); item = remove();
pthread_mutex_lock(&m);            pthread_mutex_unlock(&m);
insert(item);                      sem_post(&empty_slot);
pthread_mutex_unlock(&m); // use item
sem_post(&full_slot); ...
} }
} }
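The insert() and remove() operations above are not shown; a minimal circular-buffer sketch, assuming items are ints and using hypothetical names (the removal function is called remove_item here only to avoid clashing with the standard library's remove):

#define N 20                   /* buffer capacity (assumed value) */

int buf[N];
int in = 0;                    /* index of the next free slot */
int out = 0;                   /* index of the next item to remove */

void insert(int item)          /* called with the mutex held */
{
  buf[in] = item;
  in = (in + 1) % N;
}

int remove_item(void)          /* called with the mutex held */
{
  int item = buf[out];
  out = (out + 1) % N;
  return item;
}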
If these are done at user level, the sections protected by a mutex,
and the code executed between semaphore operations, are written by
the user and can take an arbitrarily long time.
It is possible to implement mutexes and the like with "busy" waiting,
looping while testing shared integer variables, but this is wasteful
of processor cycles.
How can the semaphore operations be made atomic and
efficient?
Answer: make these operations system calls. The operating system can
change the state of a thread to blocked and not schedule it on a
processor until its state is changed back to unblocked.
Mutexes and semaphores can solve many problems, but for some
problems the solution can involve a rather complex use of these
tools.
POSIX provides yet another tool: condition variables, which for
some kinds of problems provide a clearer solution than semaphores.
An example helps understand the semantics of condition variables.
The idea is to wait on some condition - here called the
guard.
But evaluating the guard must be done exclusively by only one
thread!
Code using a condition variable, cond_var, would look something
like this:
pthread_mutex_lock(&mutex);
while(!guard)
{
pthread_cond_wait(&cond_var, &mutex);
}
statement 1;
...
statement n;
pthread_mutex_unlock(&mutex);
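The pattern assumes the mutex, the condition variable, and the guard state are shared by all the threads involved; minimal declarations might be (guard here is a hypothetical flag standing in for whatever condition the code actually tests):

// shared
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
int guard = 0;                 /* the state the waiting threads test */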
pthread_cond_wait(&cond_var, &mutex);
- The mutex must already be locked by the thread calling
  pthread_cond_wait.
- The call unlocks the mutex.
- It blocks the calling thread, putting it on the wait queue for
  the condition variable.
Now assume at least one thread is on the wait queue and another
thread modifies the guard condition.
After that thread is through with the guard it can call
pthread_cond_broadcast
and unlock the mutex.
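In code, the thread changing the guard might look like this (again, guard stands in for the actual shared state):

pthread_mutex_lock(&mutex);
guard = 1;                          /* modify the state the waiters test */
pthread_cond_broadcast(&cond_var);  /* wake up all the waiting threads */
pthread_mutex_unlock(&mutex);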
As threads waiting in the call to pthread_cond_wait are woken up,
they implicitly reacquire the mutex (as if calling pthread_mutex_lock)
before returning from pthread_cond_wait.
These threads must then check the guard again, as it might have been
changed by another of the threads that were awakened.
pthread_mutex_lock(&mutex);
while(!guard)
{
pthread_cond_wait(&cond_var, &mutex);
}
statement 1;
...
statement n;
pthread_mutex_unlock(&mutex);
If the first thread released from pthread_cond_wait always makes
the guard false again from within its statement sequence, it is
pointless to wake up all the other waiting threads, since they would
just reacquire the mutex, find the guard false, and wait again.
An alternative to pthread_cond_broadcast is
pthread_cond_signal(&cond_var)
which wakes up just one thread (typically the first) on the
condition variable's wait queue.
Among a group of readers and writers of a data structure, mutual
exclusion is required only when the data structure is being modified:
any number of readers may access it concurrently, but a writer needs
exclusive access.
// shared
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t readerQ = PTHREAD_COND_INITIALIZER;
pthread_cond_t writerQ = PTHREAD_COND_INITIALIZER;
int readers = 0;
int writers = 0;
The reader and writer code:
void reader() void writer()
{ {
pthread_mutex_lock(&m); pthread_mutex_lock(&m);
while(!(writers == 0)) { while(!((readers == 0) && (writers == 0))) {
pthread_cond_wait(&readerQ, &m); pthread_cond_wait(&writerQ, &m);
} }
readers++; writers++;
pthread_mutex_unlock(&m); pthread_mutex_unlock(&m);
// read // write
pthread_mutex_lock(&m); pthread_mutex_lock(&m);
if (--readers == 0) { writers--;
pthread_cond_signal(&writerQ); pthread_cond_signal(&writerQ);
} pthread_cond_broadcast(&readerQ);
pthread_mutex_unlock(&m); pthread_mutex_unlock(&m);
} }
Note: This "solution" can cause writers to starve.
It isn't too hard to devise a solution, again with condition
variables, that avoids this problem.
A key idea is to keep a count of waiting writers and another
variable, active_writer, indicating whether a writer is active.
Can you modify the solution above to provide this solution?