
Chapter 3 - Synchronizing Pthreads

Pthreads Programming
Bradford Nichols, Dick Buttlar and Jacqueline Proulx Farrell
 Copyright © 1996 O'Reilly & Associates, Inc.

Thread Pools
We designed our ATM server according to the boss/worker model for multithreaded programs. The boss creates worker threads on demand. When it receives a request, the boss creates a new worker thread to service that request and that request alone. When the worker completes this request, it exits. This might be ideal if we got a nickel for each thread we created, but it can slow our server in a couple of different ways:
 We don't reuse idle threads to handle new requests. Rather, we create—and destroy—a thread for each request we receive. Consequently, our server spends a lot of time in the Pthreads library.
 We've added to each request's processing time (a request's latency, to use a term from an engineering design spec) the time it takes to create a thread. No wonder our ATM customers keep tapping the Enter button and scowling at the camera!
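The second point is easy to check on your own system. The sketch below creates and joins one thread for each of n do-nothing requests, the way a thread-per-request server would, and reports the average cost per request. The names empty_request and mean_create_join_ns are our own, not part of the ATM server. Results depend heavily on platform and load, so treat any number it prints as indicative only.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <time.h>

/* A request handler that does no work, so the timing below measures
   little more than thread creation and teardown. */
static void *empty_request(void *arg)
{
    return arg;
}

/* Average nanoseconds to create and join one thread, over n requests
   handled thread-per-request style; returns -1.0 if a call fails. */
static double mean_create_join_ns(int n)
{
    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < n; i++) {
        pthread_t t;
        if (pthread_create(&t, NULL, empty_request, NULL) != 0)
            return -1.0;
        if (pthread_join(t, NULL) != 0)
            return -1.0;
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed_ns = (double)(end.tv_sec - start.tv_sec) * 1e9
                      + (double)(end.tv_nsec - start.tv_nsec);
    return elapsed_ns / n;
}
```

Printing mean_create_join_ns(1000) from a small main gives a rough idea of the overhead a thread-per-request server adds to each request before any real work begins.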
We'll address these performance snags by redesigning our server to use a thread pool, a very common and very important design technique. In a server that uses a thread pool, the boss thread creates a fixed number of worker threads up front. Like their boss, these worker threads survive for the duration of the program. When the boss receives a new request, it places it on a queue. Workers remove requests from the queue and process them. When a worker completes a request, it simply removes another one from the queue.
Figure 3-4 shows the components of a thread pool.
Figure 3-4: Thread pool components
The focal point of a thread pool is the request queue. Each request describes a unit of work. (This description might be the name of a routine; it might be just a flag.) Worker threads continually monitor the queue for new work requests; the boss thread places new requests on the queue.
A thread pool has some basic characteristics:
 Number of worker threads. This limits the number of requests that can be in progress at the same time.
 Request queue size. This limits the number of requests that can be waiting for service.
 Behavior when all workers are occupied and the request queue is full. Some requesters may want to block until their requests can be queued and only then resume execution. Others may prefer immediate notification that the pool is full. (For instance, network-based applications typically depend on a status value to avoid "dropping requests on the floor" when the server is overloaded.)
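The third characteristic, blocking versus immediate notification, maps naturally onto a counting semaphore whose value is the number of free queue slots. The sketch below swaps in POSIX unnamed semaphores (sem_init, sem_wait, sem_trywait, sem_post) in place of the mutex and condition variables our pool uses; admission_t and its functions are hypothetical names. Some systems, macOS among them, do not implement unnamed semaphores, so this is a Linux-style sketch.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Hypothetical admission control for a request queue of fixed size:
   the semaphore counts the free slots in the queue. */
typedef struct {
    sem_t free_slots;
} admission_t;

static int admission_init(admission_t *a, unsigned int max_queue_size)
{
    /* pshared = 0: shared only among threads of this process */
    return sem_init(&a->free_slots, 0, max_queue_size);
}

/* Claim a queue slot. With block != 0, wait until a slot is free;
   otherwise return -1 with errno set to EAGAIN if the queue is full. */
static int admission_enter(admission_t *a, int block)
{
    if (!block)
        return sem_trywait(&a->free_slots);
    while (sem_wait(&a->free_slots) == -1) {
        if (errno != EINTR)   /* retry only after a signal interruption */
            return -1;
    }
    return 0;
}

/* Give a slot back once a worker has removed the request from the queue. */
static int admission_leave(admission_t *a)
{
    return sem_post(&a->free_slots);
}

static void admission_destroy(admission_t *a)
{
    sem_destroy(&a->free_slots);
}
```

A network server would call admission_enter(&a, 0) and, on -1, send a "busy" status back to the client instead of silently dropping the request.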
An ATM Server Example That Uses a Thread Pool
We'll start on a version of our ATM server that uses a thread pool by adding some definitions to its header file, as shown in Example 3-22.
Example 3-22: Interface to a Thread Pool (tpool.h)
#include <pthread.h>

typedef struct tpool_work {
         void (*routine)(void *);    /* routine a worker will run */
         void *arg;
         struct tpool_work *next;
} tpool_work_t;
typedef struct tpool {
         /* pool characteristics */
         int num_threads;
         int max_queue_size;
         int do_not_block_when_full;
         /* pool state */
         pthread_t *threads;
         int cur_queue_size;
         tpool_work_t *queue_head;
         tpool_work_t *queue_tail;
         pthread_mutex_t queue_lock;
         pthread_cond_t  queue_not_empty;
         pthread_cond_t  queue_not_full;
         pthread_cond_t  queue_empty;
         int queue_closed;
         int shutdown;
} *tpool_t;
void tpool_init(tpool_t *tpoolp,
           int num_worker_threads,
           int max_queue_size,
           int do_not_block_when_full);
int tpool_add_work(tpool_t tpool,
           void *routine,      /* a void (*)(void *) routine, passed as void * */
           void *arg);
int tpool_destroy(tpool_t tpool, int finish);
We've defined three routines that manipulate a thread pool and two new data types. The routines are tpool_init, tpool_add_work, and tpool_destroy.
 The tpool_work_t type represents a single request on the request queue. It includes a pointer to the routine that should be executed by the worker that selects the request, a pointer to this routine's single argument (if any), and a pointer to the next request on the queue. When an external thread (such as the boss) calls tpool_add_work, a new request is added to the tail of the queue. When a worker comes along looking for something to do, it removes a request from the queue's head.
 The tpool_t type is a pointer to a structure that records the characteristics and state of a single thread pool. It contains pointers to the head and tail of the request queue. Because the queue is a shared data structure that may be accessed by all worker threads (as well as any thread that exists outside of the pool and calls tpool_add_work), we'll need to add some synchronization. We'll do so by incorporating a mutex (queue_lock) and three condition variables (queue_not_empty, queue_not_full, and queue_empty) in the tpool_t structure.
 When a worker looks at the queue and finds it empty, it sleeps on the queue_not_empty condition variable. When a caller in tpool_add_work adds an item to an empty queue, it wakes the sleeping workers by broadcasting on the queue_not_empty condition. Depending on the pool's do_not_block_when_full characteristic, a thread calling tpool_add_work on a full queue can wait on the queue_not_full condition variable. When a worker makes room on the queue by removing a request, it broadcasts on the queue_not_full condition variable, thus letting the threads waiting in tpool_add_work continue.
 Finally, the tpool_t structure defines shutdown and queue_closed flags. Our tpool_destroy routine uses these flags to shut down the thread pool. The queue_closed flag is used in combination with the queue_empty condition variable to support a delayed shutdown. The delayed shutdown allows the currently queued work to complete.
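Stripped of its locking, the request queue is an ordinary singly linked FIFO: add at the tail, remove at the head, and reset both pointers when the last request leaves. The single-threaded sketch below mirrors the tpool_work_t layout using hypothetical names (work_t, work_queue_t, queue_put, queue_take); in the pool, both operations run with queue_lock held.

```c
#include <assert.h>
#include <stdlib.h>

/* Same shape as tpool_work_t: routine, argument, next pointer. */
typedef struct work {
    void (*routine)(void *);
    void *arg;
    struct work *next;
} work_t;

typedef struct {
    work_t *head;   /* workers remove requests here */
    work_t *tail;   /* new requests are appended here */
    int size;
} work_queue_t;

/* Append a request at the tail; returns 0, or -1 if malloc fails. */
static int queue_put(work_queue_t *q, void (*routine)(void *), void *arg)
{
    work_t *w = malloc(sizeof *w);
    if (w == NULL)
        return -1;
    w->routine = routine;
    w->arg = arg;
    w->next = NULL;
    if (q->size == 0) {
        q->head = q->tail = w;
    } else {
        q->tail->next = w;
        q->tail = w;
    }
    q->size++;
    return 0;
}

/* Remove the request at the head, or return NULL if the queue is empty.
   The caller runs w->routine(w->arg) and then frees w. */
static work_t *queue_take(work_queue_t *q)
{
    work_t *w = q->head;
    if (w == NULL)
        return NULL;
    q->head = w->next;
    if (--q->size == 0)
        q->tail = NULL;
    return w;
}

/* Example routine that records execution order: it appends its
   argument as a decimal digit to total. */
static int total = 0;
static void append_digit(void *arg)
{
    total = total * 10 + *(int *)arg;
}
```

Queuing append_digit with arguments 1, 2, and 3, then taking and running each request until queue_take returns NULL, leaves total equal to 123, confirming first-in, first-out order.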
Initializing a thread pool
The tpool_init routine, shown in Example 3-23, initializes a thread pool. The routine sets the basic characteristics of the thread pool by copying into the tpool_t structure the values of its three input parameters (num_worker_threads, max_queue_size, and do_not_block_when_full). It also initializes the thread pool's state.
Example 3-23: The Thread Pool Initialization Routine (tpool.c)
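#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tpool.h"

void *tpool_thread(void *arg);   /* worker start routine (Example 3-24) */

void tpool_init(tpool_t   *tpoolp,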
                int       num_worker_threads,
                int       max_queue_size,
                int       do_not_block_when_full)
{
   int i, rtn;
   tpool_t tpool;
   /* allocate a pool data structure */
   if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)
     perror("malloc"), exit(-1);
   /* initialize the fields */
   tpool->num_threads = num_worker_threads;
   tpool->max_queue_size = max_queue_size;
   tpool->do_not_block_when_full = do_not_block_when_full;
   if ((tpool->threads =
         (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads))
           == NULL)
     perror("malloc"), exit(-1);
   tpool->cur_queue_size = 0;
   tpool->queue_head = NULL;
   tpool->queue_tail = NULL;
   tpool->queue_closed = 0;
   tpool->shutdown = 0;
   /* Pthreads calls report errors through their return value, not errno */
   if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)
        fprintf(stderr,"pthread_mutex_init %s\n",strerror(rtn)), exit(-1);
   if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)
        fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(-1);
   if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)
        fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(-1);
   if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)
        fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(-1);
   /* create threads; each worker receives a pointer to the pool */
   for (i = 0; i != num_worker_threads; i++) {
        if ((rtn = pthread_create( &(tpool->threads[i]),
                        NULL,
                        tpool_thread,
                        (void *)tpool)) != 0)
           fprintf(stderr,"pthread_create %s\n",strerror(rtn)), exit(-1);
   }
   *tpoolp = tpool;
}
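One detail in tpool_init is worth isolating: malloc reports failure through errno, so perror suits it, but Pthreads calls return an error number instead, which is why the routine passes rtn to strerror. A small helper keeps that convention in one place; check_rtn and format_pthread_error are hypothetical names, not part of the book's code.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Format "what: reason" for an error number returned by a Pthreads call. */
static int format_pthread_error(char *buf, size_t len, const char *what, int rtn)
{
    return snprintf(buf, len, "%s: %s", what, strerror(rtn));
}

/* Report a nonzero Pthreads return code on stderr and hand it back,
   leaving the caller free to exit or to clean up. */
static int check_rtn(int rtn, const char *what)
{
    if (rtn != 0) {
        char msg[128];
        format_pthread_error(msg, sizeof msg, what, rtn);
        fprintf(stderr, "%s\n", msg);
    }
    return rtn;
}
```

With it, each fprintf/exit pair in tpool_init could become if (check_rtn(pthread_cond_init(&(tpool->queue_empty), NULL), "pthread_cond_init") != 0) exit(-1);.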
Checking for work
In Example 3-23, the tpool_init routine creates all worker threads, starting each one in the tpool_thread routine. The tpool_thread routine, in Example 3-24, contains the logic each worker uses to check the queue for work and take appropriate action depending upon whether or not a request is available. It takes a single argument—a pointer to the tpool_t structure for the pool to which the thread belongs.
Example 3-24: The Thread Pool Thread (tpool.c)
void *tpool_thread(void *arg)
{
   tpool_t tpool = (tpool_t)arg;   /* the pool this worker belongs to */
   tpool_work_t *my_workp;
   for (;;) {
            pthread_mutex_lock(&(tpool->queue_lock));
            /* sleep until there is work or the pool is shutting down */
            while ( (tpool->cur_queue_size == 0) && (!tpool->shutdown)) {
                      pthread_cond_wait(&(tpool->queue_not_empty),
                      &(tpool->queue_lock));
            }
            if (tpool->shutdown) {
                      pthread_mutex_unlock(&(tpool->queue_lock));
                      pthread_exit(NULL);
            }
            /* remove the request at the head of the queue */
            my_workp = tpool->queue_head;
            tpool->cur_queue_size--;
            if (tpool->cur_queue_size == 0)
                      tpool->queue_head = tpool->queue_tail = NULL;
            else
                      tpool->queue_head = my_workp->next;
            /* queue was full: wake threads blocked in tpool_add_work */
            if ((!tpool->do_not_block_when_full) &&
              (tpool->cur_queue_size == (tpool->max_queue_size - 1)))
                      pthread_cond_broadcast(&(tpool->queue_not_full));
            /* queue is now empty: let a delayed shutdown proceed */
            if (tpool->cur_queue_size == 0)
                      pthread_cond_signal(&(tpool->queue_empty));
            pthread_mutex_unlock(&(tpool->queue_lock));
            /* run the request outside the lock, then discard it */
            (*(my_workp->routine))(my_workp->arg);
            free(my_workp);
   }
}
The body of the routine is a loop in which the worker checks the request queue. If the queue is empty, the worker sleeps on the queue_not_empty condition variable. It can be awakened either by a shutdown request from tpool_destroy or by a work item placed on its request queue. When awakened by a shutdown request, the worker exits. When awakened by a work request, however, it rechecks the queue, removes the request from the queue's head, releases the queue lock, and executes the routine specified in the request (passing it the associated argument). If the worker finds that the queue was full before it removed the node and knows that threads may be blocked waiting to add to the queue (because the pool's do_not_block_when_full characteristic is not set), it broadcasts on the queue_not_full condition. Likewise, if this thread empties the queue, it signals queue_empty to allow a delayed shutdown to proceed.
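The wakeup logic can be exercised on its own. In the sketch below, which uses hypothetical names throughout, a counter stands in for the request queue and "processing" a request is just an increment. Four workers run the same loop as tpool_thread: sleep on queue_not_empty while there is nothing to do, exit once the shutdown flag is set, and otherwise take a request. The caller waits on queue_empty before setting the flag, much as tpool_destroy does when finish is 1, and then broadcasts so that every idle worker sees the flag.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_WORKERS 4

/* Stripped-down pool state: a counter stands in for the request queue. */
static pthread_mutex_t queue_lock      = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  queue_not_empty = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  queue_empty     = PTHREAD_COND_INITIALIZER;
static int cur_queue_size = 0;
static int shutdown_flag  = 0;
static int served         = 0;   /* requests handled so far */

static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&queue_lock);
        /* recheck after every wakeup: another worker may have taken
           the request, and condition waits can wake spuriously */
        while (cur_queue_size == 0 && !shutdown_flag)
            pthread_cond_wait(&queue_not_empty, &queue_lock);
        if (shutdown_flag) {
            pthread_mutex_unlock(&queue_lock);
            return NULL;
        }
        cur_queue_size--;
        served++;                        /* "process" the request */
        if (cur_queue_size == 0)
            pthread_cond_signal(&queue_empty);
        pthread_mutex_unlock(&queue_lock);
    }
}

/* Post n requests, wait for the queue to drain, then shut the workers
   down. Uses static state, so call it only once per process. */
static int run_pool(int n)
{
    pthread_t t[NUM_WORKERS];
    for (int i = 0; i < NUM_WORKERS; i++)
        if (pthread_create(&t[i], NULL, worker, NULL) != 0)
            return -1;                   /* sketch: no cleanup on failure */
    for (int i = 0; i < n; i++) {
        pthread_mutex_lock(&queue_lock);
        cur_queue_size++;
        pthread_cond_signal(&queue_not_empty);
        pthread_mutex_unlock(&queue_lock);
    }
    pthread_mutex_lock(&queue_lock);
    while (cur_queue_size != 0)
        pthread_cond_wait(&queue_empty, &queue_lock);
    shutdown_flag = 1;
    /* broadcast, not signal: every idle worker must see the flag */
    pthread_cond_broadcast(&queue_not_empty);
    pthread_mutex_unlock(&queue_lock);
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(t[i], NULL);
    return served;
}
```

Replacing the broadcast with pthread_cond_signal would wake only one idle worker, and run_pool would typically hang in pthread_join waiting for the others.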
Adding work
In Example 3-25, the tpool_add_work routine adds work requests to the queue.
Example 3-25: Adding Work to a Thread Pool (tpool.c)
int tpool_add_work(tpool_t tpool, void *routine, void *arg)
{
        tpool_work_t *workp;
        pthread_mutex_lock(&tpool->queue_lock);
        if ((tpool->cur_queue_size == tpool->max_queue_size) &&
                                  tpool->do_not_block_when_full) {
                  pthread_mutex_unlock(&tpool->queue_lock);
                  return -1;
                 }
        while ((tpool->cur_queue_size == tpool->max_queue_size) &&
                                  (!(tpool->shutdown || tpool->queue_closed))) {
                  pthread_cond_wait(&tpool->queue_not_full, &tpool->queue_lock);
                 }
        if (tpool->shutdown || tpool->queue_closed) {
                  pthread_mutex_unlock(&tpool->queue_lock);
                  return -1;
        }
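        /* allocate work structure */
        if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
                  pthread_mutex_unlock(&tpool->queue_lock);
                  return -1;
        }
        /* POSIX requires a function pointer to survive conversion to void * */
        workp->routine = (void (*)(void *))routine;
        workp->arg = arg;
        workp->next = NULL;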
        if (tpool->cur_queue_size == 0) {
                  tpool->queue_tail = tpool->queue_head = workp;
                  pthread_cond_broadcast(&tpool->queue_not_empty);
        } else {
                  (tpool->queue_tail)->next = workp;
                  tpool->queue_tail = workp;
        }
        tpool->cur_queue_size++;
        pthread_mutex_unlock(&tpool->queue_lock);
        return 1;
}
The tpool_add_work routine checks the do_not_block_when_full flag and examines the current size of the request queue. If the queue is full, the routine either returns -1 to its caller immediately or suspends itself on the queue_not_full condition, depending on the value of the pool's do_not_block_when_full flag. In the latter case, the tpool_add_work routine resumes when the condition is signaled; it queues the request and returns 1 to its caller, unless the pool has been closed or shut down in the meantime, in which case it returns -1. When it adds a request to an empty queue, it broadcasts on queue_not_empty to wake the idle workers.
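The add/remove handshake is easier to see in a fixed-capacity ring buffer. This sketch (bqueue_t, bq_put, bq_take, and take_one are hypothetical names) stores ints instead of work requests and omits the shutdown and queue_closed checks, but bq_put follows the same two-step policy as tpool_add_work: fail at once when the queue is full and non-blocking, otherwise wait on not_full. Unlike our pool, it uses pthread_cond_signal rather than broadcast, which suffices here because each removal frees exactly one slot and each addition fills one.

```c
#include <assert.h>
#include <pthread.h>

#define QCAP 3

/* A fixed-capacity ring buffer of ints, synchronized like the pool's queue. */
typedef struct {
    int items[QCAP];
    int head, count;
    int do_not_block_when_full;
    pthread_mutex_t lock;
    pthread_cond_t not_full, not_empty;
} bqueue_t;

static void bq_init(bqueue_t *q, int do_not_block_when_full)
{
    q->head = q->count = 0;
    q->do_not_block_when_full = do_not_block_when_full;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

/* Returns 0 once queued, or -1 if full and the queue does not block. */
static int bq_put(bqueue_t *q, int v)
{
    pthread_mutex_lock(&q->lock);
    if (q->count == QCAP && q->do_not_block_when_full) {
        pthread_mutex_unlock(&q->lock);
        return -1;
    }
    while (q->count == QCAP)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % QCAP] = v;   /* append at the tail */
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
    return 0;
}

/* Blocks until an item is available, then removes it from the head. */
static int bq_take(bqueue_t *q)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    int v = q->items[q->head];
    q->head = (q->head + 1) % QCAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return v;
}

/* Thread start routine for the usage example: removes one item. */
static void *take_one(void *arg)
{
    bq_take((bqueue_t *)arg);
    return NULL;
}
```

With do_not_block_when_full set to 1, a fourth bq_put on the full three-slot queue returns -1; with it set to 0, the same call sleeps until another thread, such as one running take_one, calls bq_take.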
Deleting a thread pool
The final routine in our thread pool interface, tpool_destroy (Example 3-26), deallocates a thread pool. It first sets the queue_closed flag so that tpool_add_work accepts no new requests and, if its finish argument is 1, waits on queue_empty until the workers have drained the queue. It then sets the shutdown flag in the tpool_t structure to indicate to workers (and threads calling tpool_add_work) that the pool is being deactivated. Worker threads exit when they find this flag set, and tpool_add_work returns -1 to its caller.
Example 3-26: Deleting a Thread Pool (tpool.c)
int tpool_destroy(tpool_t    tpool,
                  int        finish)
{
   int          i,rtn;
   tpool_work_t *cur_nodep;
   if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
         fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(-1);
   /* Is a shutdown already in progress? */
   if (tpool->queue_closed || tpool->shutdown) {
      if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
         fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(-1);
      return 0;
   }
   tpool->queue_closed = 1;
   /* If the finish flag is set, wait for workers to drain queue */
   if (finish == 1) {
     while (tpool->cur_queue_size != 0) {
        if ((rtn = pthread_cond_wait(&(tpool->queue_empty),
                          &(tpool->queue_lock))) != 0)
         fprintf(stderr,"pthread_cond_wait %d",rtn), exit(-1);
     }
   }
   tpool->shutdown = 1;
   if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
         fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(-1);
   /* Wake up any workers so they recheck shutdown flag */
   if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
         fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(-1);
   if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
         fprintf(stderr,"pthread_cond_broadcast %d",rtn), exit(-1);
   /* Wait for workers to exit */
   for(i=0; i < tpool->num_threads; i++) {
        if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)
            fprintf(stderr,"pthread_join %d",rtn), exit(-1);
   }
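   /* Now free pool structures, including any requests left on the queue */
   free(tpool->threads);
   while(tpool->queue_head != NULL) {
     cur_nodep = tpool->queue_head;                 /* node to free */
     tpool->queue_head = tpool->queue_head->next;   /* advance first */
     free(cur_nodep);
   }
   pthread_mutex_destroy(&(tpool->queue_lock));
   pthread_cond_destroy(&(tpool->queue_not_empty));
   pthread_cond_destroy(&(tpool->queue_not_full));
   pthread_cond_destroy(&(tpool->queue_empty));
   free(tpool);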
   return 0;
}
The tpool_destroy routine ensures that all threads are awake to see the shutdown flag by broadcasting on both the queue_not_empty and queue_not_full conditions. Even so, some workers may be busy completing their current requests, and it may be some time before they learn that a shutdown has begun. To avoid interfering with in-progress requests, tpool_destroy waits for all worker threads to exit by calling pthread_join for each thread. When all workers have departed, tpool_destroy frees the pool's data structures, including any requests still left on the queue.
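Freeing the leftover requests is the one place in tpool_destroy where the order of operations matters: the next pointer must be read before the node that holds it is released. The usual pattern in isolation, with a hypothetical node_t type and a make_list helper that exists only to build test data:

```c
#include <assert.h>
#include <stdlib.h>

typedef struct node {
    struct node *next;
} node_t;

/* Free every node of a singly linked list and return how many were freed.
   The head advances before the old head is released, because reading
   cur->next after free(cur) would touch freed memory. */
static int free_list(node_t *head)
{
    int n = 0;
    while (head != NULL) {
        node_t *cur = head;
        head = head->next;   /* advance first ... */
        free(cur);           /* ... then release the old head */
        n++;
    }
    return n;
}

/* Build a list of n nodes; returns NULL (after cleaning up) if malloc fails. */
static node_t *make_list(int n)
{
    node_t *head = NULL;
    for (int i = 0; i < n; i++) {
        node_t *p = malloc(sizeof *p);
        if (p == NULL) {
            free_list(head);
            return NULL;
        }
        p->next = head;
        head = p;
    }
    return head;
}
```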
The current edition of our tpool_destroy routine is not without its surprises. When it is called with finish set to 0, it sets the shutdown flag right away, so only those requests that are currently in progress are completed; any requests still in the request queue are lost when the thread pool is deallocated. Calling it with finish set to 1 avoids this: the routine disallows additions to the queue and waits for the queue to empty before deactivating the thread pool. Further enhancements are possible; for instance, tpool_destroy could speed shutdown by canceling workers rather than waiting for them to check the shutdown flag.
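Canceling a worker that is blocked in pthread_cond_wait has one trap worth seeing first. pthread_cond_wait is a cancellation point, and the mutex is reacquired before any cleanup handler runs, so a worker canceled while waiting would exit still holding queue_lock unless a cleanup handler releases it. A minimal sketch, with hypothetical names and a queue that never receives work:

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t queue_lock      = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  queue_not_empty = PTHREAD_COND_INITIALIZER;
static int cur_queue_size = 0;     /* never incremented: the worker stays idle */

/* Cleanup handler: runs if the worker is canceled inside the wait,
   at which point it holds queue_lock again. */
static void unlock_queue(void *mutex)
{
    pthread_mutex_unlock((pthread_mutex_t *)mutex);
}

static void *idle_worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&queue_lock);
    pthread_cleanup_push(unlock_queue, &queue_lock);
    while (cur_queue_size == 0)
        pthread_cond_wait(&queue_not_empty, &queue_lock);  /* cancellation point */
    pthread_cleanup_pop(1);        /* pops the handler and runs it (unlocks) */
    return NULL;
}

/* Start an idle worker, cancel it, and return its exit status. */
static void *cancel_idle_worker(void)
{
    pthread_t t;
    void *status = NULL;
    if (pthread_create(&t, NULL, idle_worker, NULL) != 0)
        return NULL;
    pthread_cancel(t);
    pthread_join(t, &status);
    return status;
}
```

pthread_join reports PTHREAD_CANCELED as the worker's status, and queue_lock is free afterward. Without the push/pop pair, the canceled thread would exit still owning the mutex, and any later attempt to lock it would block.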
We'll leave the particulars of these enhancements to your imagination. In the meantime, we must move on to our next chapter, Managing Pthreads, in which we'll focus a bit more on some of the Pthreads features we've already introduced (such as attribute objects and keys) and add cancellation and scheduling capabilities to our multithreaded ATM server.
Adapting the atm_server_init and main routines
In Example 3-27, we'll make some quick changes to our atm_server_init so that it:
 Uses a new global thread pool structure (tpool_t) instead of our thread information structure (thread_info_t).
 Initializes the thread pool by passing tpool_init the number of worker threads (ATM_MAX_THREADS) and the maximum request queue size (ATM_MAX_QUEUE).
Example 3-27: Using the Thread Pool from the atm_server_init Routine (atm_svr_tpool.c)
#define ATM_MAX_THREADS 10
#define ATM_MAX_QUEUE 10
tpool_t atm_thread_pool;
void atm_server_init(int argc, char **argv)
{
  /* Process input arguments */
  .
  .
  .
  tpool_init(&atm_thread_pool, ATM_MAX_THREADS, ATM_MAX_QUEUE, 0);
  /*  Initialize database and communications */
  .
  .
  .
}
Now, we simply need to change the main routine of our ATM server so that it:
 Calls tpool_add_work for each new request instead of calling pthread_create directly to create a new thread.
 Calls tpool_destroy to synchronize shutdown of the threads and to release resources. There's no need for the thread exit notification we used in the previous examples.
Example 3-28 implements these changes.
Example 3-28: Using the Thread Pool from the main Routine (atm_svr_tpool.c)
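extern int
main(int argc, char **argv)
{
  workorder_t *workorderp;
  int  trans_id;
  atm_server_init(argc, argv);
  for(;;) {
    /*** Wait for a request ***/
    if ((workorderp = (workorder_t *)malloc(sizeof(workorder_t))) == NULL)
      perror("malloc"), exit(-1);
    server_comm_get_request(&workorderp->conn, workorderp->req_buf);
    /*** Is it a shutdown request? ***/
    sscanf(workorderp->req_buf, "%d", &trans_id);
    if (trans_id == SHUTDOWN) {
      char resp_buf[COMM_BUF_SIZE];
      /* let queued and in-progress requests finish, then stop the workers */
      tpool_destroy(atm_thread_pool, 1);
      /* process it here with main() thread */
      if (shutdown_req(workorderp->req_buf, resp_buf)) {
        server_comm_send_response(workorderp->conn, resp_buf);
        free(workorderp);
        break;
      }
      /* shutdown refused: the pool is gone, so rebuild it before
         handing this request to a worker below */
      tpool_init(&atm_thread_pool, ATM_MAX_THREADS, ATM_MAX_QUEUE, 0);
    }
    /*** Use a thread from the pool to process this request ***/
    /* process_request: the request-handling routine used by the
       earlier versions of this server */
    tpool_add_work(atm_thread_pool, process_request, (void *)workorderp);
  }
  server_comm_shutdown();
  return 0;
}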
