Thread Pools Using Solaris 8 Asynchronous I/O
Thread Pools Using Solaris 8 Asynchronous I/O    

Introduction

This paper addresses the issue of enhancing server performance by leveraging the concepts of thread pools and asynchronous I/O. It was motivated by an ISV's need to port an NT-compliant-IOCP-using application to Solaris 8.

The discussion is centered around one kind of server application, where a large number of clients is serviced by a significantly smaller number of server threads. It describes all the necessary APIs and presents working example code for a server (and its matching client) to illustrate how the APIs can be used. There are also pointers to other Solaris APIs that could be used to attack the problem, and a discussion of situations in which AIO might not be the best choice.

This paper assumes that the reader is already familiar with Solaris synchronous input/output facilities (read(), write(), and so forth), with the network APIs that create and manage Internet sockets, and with the POSIX threading model ("Pthreads"). There are many excellent sources of detailed information on these topics, including

  • Advanced Programming in the UNIX Environment by W. Richard  Stevens. The basics of UNIX, including input/output as well as process control and many other topics not covered in this  paper.

  • UNIX Network Programming, Volume 1: Networking APIs by  W. Richard Stevens. The essentials for writing clients and  servers that communicate across the Internet.

  • Multithreaded Programming with Pthreads by Bil Lewis and Daniel J Berg. How to  use the POSIX APIs for multi-threaded programs.

The Usenet newsgroups comp.unix.programmer and comp.programming.threads have pointers to more information on all these topics, and are frequented by recognized experts.

The Concept of Thread Pools

The designer of a server application can choose from a variety of architectural models. Two common ones are:

  • The serial model, where a single thread waits for client requests (typically over the network). When the request comes in, the thread wakes up and handles it.

  • The concurrent model, where a single thread waits for client requests and then creates a new thread to handle the request every time a client request comes in. The original thread then loops back and goes back to listening for incoming requests while the new thread handles the request.

The serial model does not handle multiple, simultaneous requests well. If two clients make a request simultaneously, one has to wait until the other one is serviced. Hence, this model is discarded as being good only for extremely simplistic server applications.

Due to the limitations of the serial model, the concurrent model is quite popular. The advantage is that the thread that is waiting for incoming requests has very little work to do. In most cases, this thread is sleeping most of the time. Incoming client requests are handled expediently, because each request gets its own thread. Also, the concurrent model scales well on multiprocessor machines.

However, it was noticed that the performance of the concurrent model was not as high as developers liked it to be. When there is a large number of clients connected to the server simultaneously, it implies that there is an equally large number of threads running on the  server concurrently. Since all the threads are runnable (not suspended and waiting for something to happen), the kernel spends most of its time context switching between these threads and very little time actually performing the service.

In such a situation, an elegant solution would be to have a thread pool: a collection of N threads that are used to run tasks. In this model, one of the fundamental issues is  "multiplexing" a large number of clients onto a much smaller number of worker threads. Disparities as great as three orders of magnitude (e.g., 200 threads serving 100,000 clients) are not uncommon, and efficient management of the dispatch problem is essential to the application's performance.

In the thread pool model, none of the threads is bound to a particular client or service. Every time a job needs to be done, it is handed off to any thread from the pool. The threads are available to run any task and they will run up to N at a time. If there are more tasks than threads, tasks wait in a queue until a thread becomes available before continuing.

Why Asynchronous I/O?

When an application executes I/O system calls like read() or write(), the Solaris kernel translates these calls into I/O commands and sends them to the I/O devices ( file systems, network connections or raw devices such as printers, etc.). A read() suspends the program until data arrives, then places the input in a designated area and awakens the program again; the program can begin processing the input data as soon as the read() system call returns. Similarly, a write() suspends the program until the output data can be disposed of; at the completion of the write() the program wakes up and can reuse the "written" memory for other purposes.

In other words, the program sleeps until the I/O operation completes, and cannot perform computation during that time. For performance sensitive applications (as, in most cases, client/server applications are) it is not good that an application sleeps over I/O.

Asynchronous I/O (AIO) avoids this problem. The asynchronous aioread() and aiowrite() system calls also initiate an input or output operation, but they do not force the  program to wait idly until the operation completes. Instead, the program keeps on running while the input or output is in progress; computation and I/O proceed simultaneously and in parallel.

Asynchronous I/O diagramAfter issuing an aioread(), the program cannot immediately start processing data from the input buffer because the data might not be there yet; aioread() returns as soon as the input operation is initiated, but the operation may not yet have completed. Similarly, when an aiowrite() returns, the program cannot start overwriting its output buffer because the old data might not have been drained out yet. Since the program and the I/O operations run in parallel, the program needs a way to check on the progress of the I/O operations and determine when they've actually finished.

The aiowait() system call suspends the program until an I/O operation initiated by aioread() or aiowrite() completes. When the aiowait() system call returns, the program knows that the data exchange initiated by an aioread() or aiowrite() has completed. This provides a synchronous method of notification.

In the context of a single file descriptor, this isn't too exciting. Yes, it enables a program to engage in fancy double-buffering schemes, but Solaris typically manages that sort of thing even with synchronous read() and write(). The crucial point is that a program can have many asynchronous I/O operations in progress simultaneously. A program can start an aioread() on file descriptor 7, say, and while that operation is in progress the program can start an aiowrite() on file descriptor 9. Later, when the program has nothing better to do, it executes an aiowait() to suspend operations until one of the operations finishes. A trace of the action might look something like the diagram at the right.

One important point to observe here is that the I/O operations will not necessarily complete in the same order they were started; when the program does an aiowait() it waits for the completion of any aioread() or aiowrite(), no matter when it was issued. If several I/O operations are in progress, the program needs a way to determine which one has completed. The following sample application shows one way of doing this.

Another thing to note is that it's perfectly alright for an I/O to finish while the program is not waiting for it; all that happens is that the program's next aiowait() will return immediately, reporting the already-completed I/O instead of suspending the program.

In the server design presented in this paper, you will see that the computational activities in the above diagram are spread across several threads of execution: one thread will initiate an aioread(), for example, and an entirely different thread will perform the aiowait() which detects the read's completion. Although this distribution of labor is important to the server's design, it makes no essential difference to the way the AIO operations work. As far as Solaris is concerned, all the aioread(), aiowrite(), and aiowait() calls are coming from the same process, and it doesn't matter whether the process has one thread or a thousand. A thread that does an aioread() or aiowrite() starts its I/O operation and keeps right on running, and a thread that does an aiowait() suspends until some asynchronous I/O, any asynchronous I/O, finishes.

The Sample Application

The application developed to demonstrate the use of asynchronous I/O with thread pools is a simple multi-threaded server that handles just one kind of request from its clients. The server is implemented in the boss-worker model, using a pool of worker threads.

Upon initialization, the server

  • Creates a main thread that in turn

  • Creates a pool of worker threads

  • Creates a poll thread to perform aiowait()

  • Creates a queue to hold work requests

  • Opens a connection on a specified port number and listens for incoming requests.



When a request arrives, the main/boss thread puts it on the queue. The pool of worker threads listens to this queue for requests. When a request comes in on the queue, a worker thread extracts the request, examines the contents, performs necessary function (in this test case, aioread() or aiowrite()) and based on the status of request, (completed or more work needed ), either puts it back on the queue or returns data to the client. The worker threads initiate aioread() or aiowrite() and return to the thread pool to listen for more requests.

The poll thread performs aiowait() periodically. When an asynchronous I/O operation completes, aiowait() returns a pointer to the aio_result_t structure where aioread() or aiowrite() was told to store information about the completed operation. This aio_result_t contains the number of bytes transferred and an error code if applicable, but does not directly indicate which of the many I/O operations has completed; it doesn't even say whether the operation was input or output! It's up to the poll thread to figure this out.

In this server, the problem is solved in a simple way: every aioread() or aiowrite() must supply a pointer to the aio_result_t to be used for that operation, and by convention this aio_result_t is an element of a larger structure containing additional information of interest to the server:

        typedef struct client_data {    /* server-defined structure */
            aio_result_t aiores;        /* AIO result structure */
            ...                         /* other server-specific data */
        } client_data_t;

Since the aio_result_t element is the first thing in this structure, the address of the aio_result_t is also the address of the enclosing client_data_t. All that's needed is a cast to convert the pointer to the proper type:

        aio_result_t *aioptr = aiowait(...);
      client_data_t *cdtptr = (client_data_t*)aioptr;

Having located the client_data_t corresponding to the completed I/O operation, the poll thread can chain it back onto the request queue, where one of the worker threads will pick it up and decide what to do next. Meanwhile, the poll thread loops back to the aiowait() to wait for the next I/O completion.

Among other things....

  • The notification mechanism used in this application is aiowait(). However, there are other notification mechanisms.

  • AIO_INPROGRESS : aioread(3AIO)/ aiowrite(3AIO) return an aio_result_t, which consists of the following:

    int aio_return; /* return value of read( ) or write( ) */ int aio_errno; /* value of errno for read( ) or write( ) */

    Solaris does not use AIO_INPROGRESS for aio_return. Therefore, completion of aioread(3AIO) can be detected by initializing aio_return to AIO_INPROGRESS before calling aioread(3AIO). However, this notification mechanism requires the calling thread to block until it is notified of AIO completion.

  • SIGIO : A completed AIO operation returns a SIGIO signal to the process. However, when there is a large number of signals to be handled in the application, signals, in general, tend to be highly unreliable.

  • Two sets of interfaces do asynchronous I/O in Solaris: the aioread(3AIO) and aiowrite(3AIO) routines; and the POSIX-equivalent routines, aio_read(3RT) and aio_write(3RT), which are based on the POSIX standards for real-time extensions.

    The POSIX and Solaris asynchronous I/O interfaces are functionally identical. The real differences exist in the semantics of using one interface or the other, and the system libraries that must be linked in. Using the Solaris asynchronous I/O interfaces requires linking to libaio, whereas using the POSIX asynchronous I/O interfaces requires linking to libposix4.

However, the POSIX implementations are not suitable for this application, as they do not give good blocking wait semantics - there is no aiowait() equivalent in the POSIX library. They use an AIO control block to submit I/Os, then use aio_error / aio_suspend to reap them. They are better for files than for sockets.

Things to Watch Out For

AIO is not a cure-all. If the application uses a third party library that implements asynchronous operations, this may not be the best solution. This is because of the "anonymous" nature of aiowait(), which returns the result of any asynchronous read or write operation. And if a third party AIO library is used in the application, this means that aiowait() could return result sets of the third party library AIO calls as well. Due to obvious interface disparities, this could lead to unpredictable behavior.

In such a case, it would be better to use some other notification technique such as signals or doors. Signals are usually to be avoided, since they are notoriously difficult to use reliably. It is very easy to lose signals, especially as the number of the signals that need to be handled increases.

Doors are an IPC (inter-process communication) mechanism used for fast, secure control transfer between local processes and threads. A door descriptor is used to describe a procedure and optionally some additional state associated with the procedure (e.g. a data structure containing information about AIO completion). For more information on doors, refer to the door_create() and door_call() man pages.

The efficiency of the AIO approach also depends on which Solaris version is used. Older Solaris versions implement asynchronous I/O directly in the kernel only for certain classes of I/O devices, not including sockets. For other devices, the aioread() or aiowrite() actually creates a user-level thread to execute an ordinary read() or write(), while the calling thread keeps running. Essentially, this strategy "degenerates" into the concurrent server model when large numbers of socket connections are being handled.

As of Kernel Update 4 to Solaris 8 the kernel supports asynchronous I/O for sockets without needing user-level threads. A network server application built on the thread pool model with asynchronous I/O will see substantial performance gains without code changes when running on the newer Solaris versions.

Code

The code for the application discussed above can all be downloaded: mult_srv.c and tpool.c implement the server, mult_srv.h and tpool.h are the respective header files,the client is done in client.c, and the socket connections are handled in connectsock.c, connectTCP.c, passivesock.c, passiveTCP.c, and errexit.c.
mult_srv.c , tpool.c, and client.c are shown below.



mult_srv.c

/* static char sccsid [ ] = "@(#)thread_pools.html	1.7 01/03/27 15:21:40 SCCS ID";*/ 

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/errno.h>
#include <netinet/in.h>
#include <sys/types.h>

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/asynch.h>
#include <sys/fcntl.h>
#include <sys/asynch.h>
#include <pthread.h>
 
#include "tpool.h"

#include <tnf/probe.h>

tpool_t srv_thread_pool;
int     no_of_act_conn = 0;

pthread_mutex_t global_data_mutex = 
PTHREAD_MUTEX_INITIALIZER;

/* Master socket */

int msock;

/* connections handled */
slavesocket_t clientreq[MAXSOCKS];

extern int	errno;

void	reaper(int);
int	errexit(const char *format, ...);
int	passiveTCP(const char *service, int qlen);

/*----------------------------------------------------
 * main - Concurrent TCP server for ECHO service
 *----------------------------------------------------
 */
int
main(int argc, char *argv[])
{
  char *service = "echo"; /*service name or port*/ 
                          /*number*/
 
  pthread_t *pollthreadid = 
  (pthread_t *) malloc(sizeof(pthread_t));


  switch (argc) {
  case 1:
    break;
  case 2:
    service = argv[1];
    break;
  default:
    errexit("usage: TCPechod [port]n");
  }

  /* start the server and workder threads */

  server_init();
  msock = passiveTCP(service, QLEN);
 
  pthread_create(pollthreadid, NULL, aiopoll, (void *) 
  NULL);  
	
while (1) {
  if (no_of_act_conn < MAXSOCKS) {	

    clientreq[no_of_act_conn].ssock = accept(msock, 
    (struct sockaddr *)
    &clientreq[no_of_act_conn].fsin, 
    &clientreq[no_of_act_conn].alen);
 
    clientreq[no_of_act_conn].offsetp = 0;

    clientreq[no_of_act_conn].aioflag = 1;
		
    memset(clientreq[no_of_act_conn].aiobuf, 0, 
    BUFSIZE);

    TCPechodread(&clientreq[no_of_act_conn]); 
    /* calling aioread from main thread */

    no_of_act_conn++;
   }
  }
}


/*****************************
 * Server Initialization     *
 *****************************/

void server_init() {

  tpool_init(&srv_thread_pool, MAX_THREADS, 
  MAX_THREADS );
}



tpool.c

/* tpool.c -- 
 * 
 */

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>

#include <pthread.h>
#include <tpool.h>
#include <sys/signal.h>
#include <poll.h>

#include <tnf/probe.h>


	/* pool synchronization */
  pthread_mutex_t     queue_lock       = 
    PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t      queue_not_empty  = 
    PTHREAD_COND_INITIALIZER;
  pthread_cond_t      queue_not_full   = 
    PTHREAD_COND_INITIALIZER;
  pthread_cond_t      queue_empty      = 
    PTHREAD_COND_INITIALIZER;
  pthread_mutex_t     conn_pool_lock   = 
    PTHREAD_MUTEX_INITIALIZER;

extern tpool_t srv_thread_pool;
extern slavesocket_t clientreq[MAXSOCKS];
extern int no_of_act_conn;
void *tpool_thread(void *);

void sigpipedo(int);
long long r_diff, w_diff;
char temp[BUFSIZE];
void tpool_init(tpool_t   *tpoolp,
    int       num_worker_threads, 
    int       max_queue_size)
{
  int i, rtn;
  tpool_t tpool;

  TNF_PROBE_0_DEBUG(tpool_init, "tpool_probes", "");  


for (i=0; i <MAXSOCKS; i++) 
  clientreq[i].aioflag = -1;

 /* Initializing the pool and allocating the data structures */

  /* allocate a pool data structure */ 

  if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) 
    == NULL)
    perror("malloc"), exit(1);

  /* initialize the fields */

  if ((tpool->threads = (pthread_t *)
    malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
    perror("malloc"), exit(1);

  tpool->cur_queue_size = 0;
  tpool->queue_head = NULL; 
  tpool->queue_tail = NULL;
  tpool->queue_closed = 0;  

  /* create threads */
  for (i = 0; i != num_worker_threads; i++) {
    if ((rtn = pthread_create( &(tpool->threads[i]),
			      NULL,
			      tpool_thread,
			      (void *)tpool)) != 0)
      fprintf(stderr, "pthread_create %d",rtn), exit(1);
  }

  *tpoolp = tpool;
}

/* routine to add work items to the queue */

int tpool_add_work(
		   tpool_t          tpool,
		   void             (*routine)(),
		   slavesocket_t      *workorder)
{
  int rtn;

 TNF_PROBE_0_DEBUG(tpool_add_work, "tpool_probes", "");

  /* obtain a lock on the queue */

  if( (rtn = pthread_mutex_lock(&queue_lock)) != 0)
    fprintf(stderr,"pthread_mutex_lock %d",rtn), 
    exit(1);


  /* allocate work structure */
  
   workorder->routine = routine;
  

  /* aio operations */
  
   workorder->next = NULL;

/* adding an item to the queue */

  if (tpool->cur_queue_size == 0) {
    tpool->queue_tail = tpool->queue_head = workorder;

/* waking all workers */


  if ((rtn = pthread_cond_broadcast(&queue_not_empty)) 
    != 0)
      fprintf(stderr,"pthread_cond_broadcast %d",rtn), 
        exit(1);
  } else {
    tpool->queue_tail->next = workorder;
    tpool->queue_tail = workorder;
  }


  tpool->cur_queue_size++; 

/*  unlock the queue */
  if ((rtn = pthread_mutex_unlock(&queue_lock)) != 0)
    fprintf(stderr,"pthread_mutex_unlock %d" ,rtn), 
      exit(1);

  return 1;
}


/****************************
 *  Worker thread process   *
 ****************************/

void *tpool_thread(void *arg)
{
  tpool_t tpool = (tpool_t)arg; 
  int rtn;
  slavesocket_t	*my_workp;

  TNF_PROBE_0_DEBUG(tpool_thread, "tpool_probes", "");
	
  for(;;) {

    /* Check queue for work */ 

          /* lock queue */

    if ((rtn = pthread_mutex_lock(&queue_lock)) != 0)
      fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);

         /* no work on the queue? go back to sleep */

    while (tpool->cur_queue_size == 0)  {
 
        /* wait on the condition that queue_not _empty */
    
      if ((rtn = pthread_cond_wait
        (&queue_not_empty, &queue_lock)) != 0)
	fprintf(stderr,"pthread_cond_wait %dn",rtn),
          exit(1);

    }

    /* Get to work, dequeue the next item */ 

    my_workp = tpool->queue_head;
    tpool->cur_queue_size--;
    if (tpool->cur_queue_size == 0)
      tpool->queue_head = tpool->queue_tail = NULL;
    else
      tpool->queue_head = my_workp->next;
 
    if ((rtn = pthread_mutex_unlock(&queue_lock)) != 0)
      fprintf(stderr,"pthread_mutex_unlock %d",rtn), 
        exit(1);

    /* Do this work item */

    (*(my_workp->routine))(my_workp);

  } 
  return(NULL);            
}


/*******************************
 * Polling function            *
 *******************************/
 
void *aiopoll(void *arg) {
  int i;
  int rtn;
  int cc;
  aio_result_t *res;
  slavesocket_t *out;
  hrtime_t temp_t;
  struct timeval timeout = {0,0};
   
  TNF_PROBE_0_DEBUG(aiopoll, "tpool_probes", "");
  while (1) {
    if (no_of_act_conn > 0) {

      /* check for completed I/O; poll */
      if ( (res = aiowait(&timeout)) == -1 ) {
        continue;	
      }
		
      temp_t = gethrtime();
		
      if (res == 0) continue;

	/* result of aiowait */
      out = (slavesocket_t *) res;

      /* if aioflag == 2, set aioflag = 1,*/  
      /*perform aioread */

		
      if (out->aioflag == 2) {
		
        out->aft_write = temp_t;
	w_diff = out->aft_write - out->bef_write;
	
	sprintf(temp, "%lld", w_diff);
	strncat(out->aiobuf, "AFTER_WRITE", 
          strlen("AFTER_WRITE"));
	strncat(out->aiobuf, temp, strlen(temp));

        if ((rtn = pthread_mutex_lock(&conn_pool_lock)) 
          != 0)
        fprintf(stderr,"pthread_mutex_lock %d",rtn), 
          exit(1);
		    
	out->aioflag = 1;
		    
	memset(out->aiobuf, 0, BUFSIZE);

	/* ading item to work queue; calling aioread */
        tpool_add_work(srv_thread_pool, 
          TCPechodread, out ); 

        if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) 
          != 0)
	  fprintf(stderr,"pthread_mutex_lock %d",rtn), 
            exit(1);
	continue; 
      }  

      /* if aioflag == 1, call aiowrite */
    if (out->aioflag == 1) {
      if (out->aiores.aio_return) {
        out->aft_read = temp_t;
	r_diff = out->aft_read - out->bef_read;

	sprintf(temp, "%lld", r_diff);
	strncat(out->aiobuf, "AFTER_READ", 
          strlen("AFTER_READ"));
	strncat(out->aiobuf, temp, strlen(temp));

      if ((rtn = pthread_mutex_lock(&conn_pool_lock)) 
        != 0)
        fprintf(stderr,"pthread_mutex_lock %d",rtn), 
          exit(1);
      TNF_PROBE_0_DEBUG(in_aiopoll_calling_TCPechodwrite, 
        "tpool_probes", "");

      /* adding item to work queue, calling aiowrite */
      tpool_add_work(srv_thread_pool, TCPechodwrite, out );  

      if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
        fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1); 
      continue;
      }
}
	
  /* if (aioflag != 1 || aioflag != 2); close connection */
      printf("closing connection %dn", out->ssock);
        if ((rtn = pthread_mutex_lock(&conn_pool_lock)) 
          != 0)
	  fprintf(stderr,"pthread_mutex_lock %d",rtn), 
            exit(1);
	  out->aioflag = -1;
	  aiocancel(&(out->aiores));
          close(out->ssock);
            if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) 
              != 0)
              fprintf(stderr,"pthread_mutex_lock %d",rtn), 
                exit(1);
	       
      }
   }
}
 
/* signal handler */
void sigpipedo (int sig) {

};


void TCPechodwrite(slavesocket_t *connreq) {

  int tmpoffset = connreq->aiores.aio_return;
  int rtn, rtnaio;

  signal(SIGPIPE, sigpipedo); 

  if ((rtn = pthread_mutex_lock(&conn_pool_lock)) != 0)
    fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
 
      connreq->aioflag = 2;

	
      memset(temp, 0, BUFSIZE);
	
      connreq->bef_write = gethrtime();
	
      if( aiowrite(connreq->ssock, connreq->aiobuf, 55, 0, 
        SEEK_SET,  
      &(connreq->aiores)) == -1) {
      errexit("aiowrite errorn");
      }


      TNF_PROBE_0_DEBUG(after_write, "tpool_probes", "");

      connreq->offsetp += tmpoffset;

    if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
      fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
}
 


void TCPechodread(slavesocket_t *connreq) {

  int rtn;
  
    signal(SIGPIPE, sigpipedo);

    if ((rtn = pthread_mutex_lock(&conn_pool_lock)) != 0)
      fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);

  	connreq->aioflag = 1;
	
	connreq->bef_read = gethrtime();
	
 if ( aioread(connreq->ssock, connreq->aiobuf, BUFSIZE, 0, 
 SEEK_SET, &(connreq->aiores)) == -1) {
        connreq->aioflag = -1;
     }

    if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
      fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
}



client.c

/* TCPecho.c - main, TCPecho */

#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>

#define MAXTHREADS  10000 
#define LINELEN     82

typedef struct  threadstruct {
  char         *host;
  char         *service ;
  int           threadno;
  pthread_t     threadidt;
  pthread_attr_t threadattr;
  char	        buf[LINELEN+1];	
  int	        s, n;			
  int	        outchars, inchars;	
  int           count;
  double        timetaken, mintime, avgtime, sum;
  struct timeval pbef_time, paft_time; 
 } threadstruct_t;

struct timeval iter_bef_time, iter_aft_time;
double iter, time_stamp;
int t1=0, t2=0, t3=0, t4=0, t5=0, t6=0, t7=0, 
  t8=0, t9=0, t10=0, t11=0;
int a1 = 0, a2 = 0, a4 = 0, a3 = 0, a5= 0, a6 = 0;
extern int	errno;
int	TCPecho( char *host, char *service, 
  int noofthreads);
int	errexit(const char *format, ...);
int	connectTCP(const char *host, 
  const char *service);

threadstruct_t clients[MAXTHREADS];

void *doroutine(void *);

/*-----------------------------------------------
 * main - TCP client for ECHO service
 *-----------------------------------------------
 */
int
main(int argc, char *argv[])
{
  char  *hostt, *servicet;

  int noofthreads;
  switch (argc) {
  case 1:
    hostt = "localhost";
    break;
  case 4:
    noofthreads = atoi(argv[3]);
  case 3:
    servicet = argv[2];
    /* FALL THROUGH */
  case 2:
    hostt = argv[1];
    break;
  default:
    fprintf(stderr, 
      "usage: client host port threadsn");
    exit(1);
  }


  if (noofthreads > MAXTHREADS)
    errexit("Maximum number of threads is %dn", 
      MAXTHREADS);

  TCPecho(hostt, servicet, noofthreads);
  exit(0);
}

/*------------------------------------------------------------------------
 * TCPecho - send input to ECHO service on specified host and print reply
 *------------------------------------------------------------------------
 */
int
TCPecho( char *host,  char *service, int noofthreads)
{

  int i;
  pthread_t idt;

  for ( i =0; i < noofthreads; i++) {
    clients[i].host = host;
    clients[i].service = service;
    clients[i].count = 0;
    clients[i].timetaken = 0;
    clients[i].threadno = i + 1;
    pthread_attr_init(&clients[i].threadattr);
    pthread_attr_setscope(&(clients[i].threadattr), 
    PTHREAD_SCOPE_SYSTEM);
    pthread_create(
    &clients[i].threadidt, NULL, doroutine,  
    (void *) &clients[i]);
      }

        for (i=0; i < noofthreads; i++) {
	  pthread_join(clients[i].threadidt, NULL);
      }
}


void *doroutine(void *argt) {

  char *tmp1, *tmp2, *tmp3;
  double r_diff;

  size_t sz = 147;
 
  threadstruct_t  *arg = (threadstruct_t *) argt;

  arg->s = connectTCP(arg->host, arg->service);

  gettimeofday(&iter_bef_time, NULL);
	
  while(1){
		
  memset(arg->buf, 0, LINELEN);
  strncat(arg->buf, "CLIENT", strlen("CLIENT"));
  arg->outchars = strlen(arg->buf); 
	
  gettimeofday(&(arg->pbef_time), NULL);

    (void) write(arg->s, arg->buf, arg->outchars);

    memset(arg->buf, 0, LINELEN);

    /* read it back */
    arg->n = read(arg->s, &(arg->buf), 55); 

    if (arg->n < 0)
    perror("socket read failedn"), exit(1);

    gettimeofday(&(arg->paft_time), NULL); 

    tmp1 = &(arg->buf[16]);		 
    tmp2 = (char *)malloc(17);
    strncpy(tmp2, tmp1, strlen(tmp1));
    tmp2[strlen(tmp2)-1] = '0';
	
    r_diff = atoi(tmp2);
    r_diff = r_diff / 1000000000; 

    /*tmp1 = &(arg->buf[37]);
    tmp3 = (char *)malloc(17);
    strncpy(tmp3, tmp1, 16);
    tmp3[16] = '0';
	
    ar = atof(tmp3); */
	
    /*tmp1 = &(arg->buf[11]);		 
    tmp4 = (char *)malloc(17);
    strncpy(tmp4, tmp1, 16);
    tmp4[16] = '0';
	
    bw = atof(tmp4);

    tmp1 = &(arg->buf[39]);
    tmp5 = (char *)malloc(17);
    strncpy(tmp5, tmp1, 16);
    tmp5[16] = '0';
	
    aw = atof(tmp5);*/		
/*CLIENT_READ969994591.142932AFTER_READ969994594.181865*/

    /*	fputs(arg->buf, stdout); */
	
   arg->timetaken = (arg->paft_time.tv_sec - arg->
   pbef_time.tv_sec) + 
     (1 - arg->pbef_time.tv_usec*0.000001) + 
   arg->paft_time.tv_usec*0.000001;
		
   if(arg->timetaken >= 
     0 && arg->timetaken <=10) t1++;
   if(arg->timetaken >= 
     11 && arg->timetaken <=20) t2++;
   if(arg->timetaken >= 
     21 && arg->timetaken <=30) t3++;
   if(arg->timetaken >= 
     31 && arg->timetaken <=40) t4++;
   if(arg->timetaken >= 41 
     && arg->timetaken <=50) t5++;
   if(arg->timetaken >= 
     51 && arg->timetaken <=60) t6++;
   if(arg->timetaken >= 
     61 && arg->timetaken <=70) t7++;
   if(arg->timetaken >= 
     71 && arg->timetaken <=80) t8++;
   if(arg->timetaken >= 
     81 && arg->timetaken <=90) t9++;
   if(arg->timetaken >= 
     91 && arg->timetaken <100) t10++;
   if(arg->timetaken >= 100 ) t11++;

   if(r_diff >= 0 && r_diff <=1) a1++;
   if(r_diff >1  && r_diff <=2) a2++;
   if(r_diff >2 && r_diff <=4) a3++;
   if(r_diff >4 && r_diff <= 6) a4++;
   if(r_diff >6 && r_diff <=10) a5++;
   if(r_diff >10) a6++;

   /* set the minimum time  */
   if (arg->count == 0)  arg->mintime = 
     arg->timetaken;
   if (arg->timetaken < arg->mintime) 
     arg->mintime = arg->timetaken;
   arg->sum += arg->timetaken;
   arg->count ++;
   arg->avgtime = arg->timetaken/arg->count;

   printf("%dt  %dt %10.6ft %10.10ft %10.6ft %10.6fn", 
   arg->threadno, arg->count, arg->timetaken, 
     r_diff, arg->mintime, 
   arg->avgtime);

   /*fclose(arg->fp);*/
   sleep(2);
	
   gettimeofday(&iter_aft_time, NULL);
	
   iter = (iter_aft_time.tv_sec - iter_bef_time.tv_sec) +
   (1 - iter_bef_time.tv_usec*0.000001) + 
   iter_aft_time.tv_usec*0.000001;
	
   if(iter > 10) {

   printf("Execution time = %10.6f", iter);
   printf("Number of iterations = %dn", arg->count);
   printf("Response time ranges : n");
   printf("0-10 t 11-20 t 21-30 t 31-40 t 41-50 
   + nn");
   printf("%d t  %d t  %d t  %d t  %d t  nn", 
     t1, t2, t3, t4, t5);
   printf("51-60 t 61-70 t 71-80 t 81-90 t 91-100 
   + t 100+ nn");
   printf("%d t  %d t  %d t  %d t  %d t  %d t  nn", 
     t6, t7, t8, t9, t10,               t11);
   printf("Aioread time ranges : n");
   printf("0-1 t 1-2 t 2-4 t 4-6 t 6-10 t 10+ n");
   printf("%d t %d t %d t %d t %d t %d n", 
     a1, a2, a3, a4,a5, a6);
   exit(0); }

  }
}

November 2000

Rate and Review Tell us what you think of the content of this page. Excellent   Good   Fair   Poor   Comments:
If you would like a reply to your comment, please submit your email address:
Note: We may not respond to all submitted comments.
Close    To Top
  • Prev Article-OS:
  • Next Article-OS:
  • Now: Tutorial for Web and Software Design > OS > Solaris > OS Content
    Photoshop Tutorial
     

    Special Effect

      3D Effect
      Photoshop Articles
    Programming Tutorial
     

    C/C++ Tutorial

      Visual Basic
      C# Tutorial
    Database Tutorial
     

    MySQL Tutorial

      MS SQL Tutorial
      Oracle Tutorial
    Geek Tutorial
     

    Blogging Tutorial

      RSS Tutorial
      Podcasting Tutorial
    Graphic Design Tutorial
      Coreldraw Tutorial
      Illustrator Tutorial
      3D Tutorials
    Webmaster Articles
     

    Domain Service

      Web Hosting
      Site Promotion
    Java Tutorial/ Articles
     

    Java Servlets

      JavaEE Tutorial
     

    JavaBeans Tutorial

    XML Tutorial/ Articles
     

    XML Style

      AJAX Tutorial
      XML Mobile
    Flash Tutorial/ Articles
     

    Flash Video

      Action Script
      Flash Articles
    OS Tutorial/ Articles
      Linux Tutorial
      Symbian Tutorial
      MacOS Tutorial
    Personal Tech
      Hardware Tutorial
      Software Tutorial
      Online Auction