Thread Pools Using Solaris 8 Asynchronous I/O
Introduction
This paper addresses the issue of enhancing server performance by
leveraging the concepts of thread pools and asynchronous I/O. It was
motivated by an ISV's need to port an application that uses Windows NT
I/O completion ports (IOCP) to Solaris 8.
The discussion is centered around one kind of server application,
where a large number of clients is serviced by a significantly
smaller number of server threads. It describes all the necessary APIs
and presents working example code for a server (and its matching
client) to illustrate how the APIs can be used. There are also
pointers to other Solaris APIs that could be used to attack the
problem, and a discussion of situations in which AIO might not be the
best choice.
This paper assumes that the reader is already familiar with
Solaris synchronous input/output facilities (read(),
write(), and so forth), with the network APIs that
create and manage Internet sockets, and with the POSIX threading
model ("Pthreads"). There are many excellent sources of
detailed information on these topics, including:
Advanced Programming in the UNIX Environment by W. Richard Stevens. The basics of UNIX, including input/output as well as process control and many other topics not covered in this paper.
UNIX Network Programming, Volume 1: Networking APIs by W. Richard Stevens. The essentials for writing clients and servers that communicate across the Internet.
Multithreaded Programming with Pthreads by Bil Lewis and Daniel J. Berg. How to use the POSIX APIs for multi-threaded programs.
The Usenet newsgroups comp.unix.programmer and
comp.programming.threads have pointers to more
information on all these topics, and are frequented by recognized
experts.
The Concept of Thread Pools
The designer of a server application can choose from a variety of
architectural models. Two common ones are:
The serial model,
where a single thread waits for client requests (typically over the
network). When the request comes in, the thread wakes up and handles
it.
The concurrent model, where a single thread
waits for client requests and then creates a new thread to handle
the request every time a client request comes in. The original thread
then loops back and goes back to listening for incoming requests
while the new thread handles the request.
The serial model does not handle multiple, simultaneous requests
well. If two clients make a request simultaneously, one has to wait
until the other one is serviced. Hence, this model is discarded as
being good only for extremely simplistic server applications.
Due to the limitations of the serial model, the concurrent model
is quite popular. The advantage is that the thread that is waiting
for incoming requests has very little work to do. In most cases, this
thread is sleeping most of the time. Incoming client requests are
handled expediently, because each request gets its own thread. Also,
the concurrent model scales well on multiprocessor machines.
However, the concurrent model often performs worse than developers
expect. When a large number of clients is connected to the server
simultaneously, an equally large number of threads is running on the
server concurrently. If all of these threads are runnable (not
suspended and waiting for something to happen), the kernel spends much
of its time context switching between them and comparatively little
time actually performing the service.
In such a situation, an elegant solution would be to have a thread
pool: a collection of N threads that are used to run tasks. In
this model, one of the fundamental issues is "multiplexing"
a large number of clients onto a much smaller number of worker
threads. Disparities as great as three orders of magnitude (e.g., 200
threads serving 100,000 clients) are not uncommon, and efficient
management of the dispatch problem is essential to the application's
performance.
In the thread pool model, none of the threads is bound to a
particular client or service. Every time a job needs to be done, it
is handed off to any thread from the pool. The threads are available
to run any task and they will run up to N at a time. If there are
more tasks than threads, tasks wait in a queue until a thread becomes
available before continuing.
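The model described above can be sketched with POSIX threads. This is a minimal illustration, not the pool used by the sample server later in this paper: the names pool_t, pool_start(), pool_submit(), pool_shutdown(), and counter_bump() are hypothetical, and the sizes (4 threads, 128 queue slots) are arbitrary. Tasks wait in a circular queue until one of the N workers is free to run them.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define POOL_THREADS 4     /* N worker threads (illustrative value) */
#define QUEUE_SLOTS  128   /* task queue capacity (illustrative value) */

typedef void (*task_fn)(void *);

typedef struct pool {
    task_fn fn[QUEUE_SLOTS];          /* circular queue of pending tasks */
    void *arg[QUEUE_SLOTS];
    size_t head, tail, count;
    int closed;                       /* nonzero once shutdown has begun */
    int started;                      /* number of threads actually created */
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
    pthread_t threads[POOL_THREADS];
} pool_t;

/* Worker loop: sleep until a task is queued, dequeue it, run it. */
static void *pool_worker(void *p)
{
    pool_t *pool = p;

    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->count == 0 && !pool->closed)
            pthread_cond_wait(&pool->not_empty, &pool->lock);
        if (pool->count == 0) {       /* closed and fully drained */
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        task_fn fn = pool->fn[pool->head];
        void *arg = pool->arg[pool->head];
        pool->head = (pool->head + 1) % QUEUE_SLOTS;
        pool->count--;
        pthread_mutex_unlock(&pool->lock);
        fn(arg);                      /* run the task outside the lock */
    }
}

/* Create the workers. Returns 0 on success, -1 if a thread could not start. */
int pool_start(pool_t *pool)
{
    pool->head = pool->tail = pool->count = 0;
    pool->closed = 0;
    pool->started = 0;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->not_empty, NULL);
    while (pool->started < POOL_THREADS) {
        if (pthread_create(&pool->threads[pool->started], NULL,
                           pool_worker, pool) != 0)
            return -1;
        pool->started++;
    }
    return 0;
}

/* Queue one task. Returns 0 on success, -1 if the queue is full. */
int pool_submit(pool_t *pool, task_fn fn, void *arg)
{
    int rc = -1;

    assert(fn != NULL);
    pthread_mutex_lock(&pool->lock);
    if (pool->count < QUEUE_SLOTS) {
        pool->fn[pool->tail] = fn;
        pool->arg[pool->tail] = arg;
        pool->tail = (pool->tail + 1) % QUEUE_SLOTS;
        pool->count++;
        rc = 0;
        pthread_cond_signal(&pool->not_empty);
    }
    pthread_mutex_unlock(&pool->lock);
    return rc;
}

/* Let the workers drain the queue, then join them. */
void pool_shutdown(pool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    pool->closed = 1;
    pthread_cond_broadcast(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->started; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->not_empty);
}

/* Example task: bump a shared counter that carries its own lock. */
typedef struct counter { pthread_mutex_t lock; int value; } counter_t;

void counter_bump(void *arg)
{
    counter_t *c = arg;
    pthread_mutex_lock(&c->lock);
    c->value++;
    pthread_mutex_unlock(&c->lock);
}
```

A caller would start the pool once, submit tasks as requests arrive, and shut the pool down at exit. Rejecting a task when the queue is full is one possible policy; blocking the submitter until a slot frees up is another.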
Why Asynchronous I/O?
When an application executes I/O system calls like read()
or write(), the Solaris kernel translates these calls
into I/O commands and sends them to the I/O devices (file systems,
network connections, or raw devices such as printers). A read()
suspends the program until data arrives, then places the input in a
designated area and awakens the program again; the program can begin
processing the input data as soon as the read() system
call returns. Similarly, a write() suspends the program
until the output data can be disposed of; at the completion of the
write() the program wakes up and can reuse the "written"
memory for other purposes.
In other words, the program sleeps until the I/O operation
completes, and cannot perform computation during that time. For
performance-sensitive applications (as most client/server
applications are), time spent sleeping on I/O is time lost.
Asynchronous I/O (AIO) avoids this problem. The asynchronous
aioread() and aiowrite() system calls also
initiate an input or output operation, but they do not force the
program to wait idly until the operation completes. Instead, the
program keeps on running while the input or output is in progress;
computation and I/O proceed simultaneously and in parallel.
After issuing an aioread(), the program cannot immediately
start processing data from the input buffer because the data might
not be there yet; aioread() returns as soon as the input
operation is initiated, but the operation may not yet have completed.
Similarly, when an aiowrite() returns, the program cannot
start overwriting its output buffer because the old data might not
have been drained out yet. Since the program and the I/O operations
run in parallel, the program needs a way to check on the progress of
the I/O operations and determine when they've actually finished.
The aiowait() system call suspends the program until
an I/O operation initiated by aioread() or aiowrite()
completes. When the aiowait() system call returns, the
program knows that the data exchange initiated by an aioread()
or aiowrite() has completed. This provides a synchronous method of notification.
In the context of a single file descriptor, this isn't too
exciting. Yes, it enables a program to engage in fancy
double-buffering schemes, but Solaris typically manages that sort of
thing even with synchronous read() and write().
The crucial point is that a program can have many asynchronous I/O
operations in progress simultaneously. A program can start an
aioread() on file descriptor 7, say, and while that
operation is in progress the program can start an aiowrite()
on file descriptor 9. Later, when the program has nothing better to
do, it executes an aiowait() to suspend operations until
one of the operations finishes. A trace of the action might look
something like the diagram at the right.
One important point to observe here is that the I/O operations
will not necessarily complete in the same order they were started;
when the program does an aiowait() it waits for the
completion of any aioread() or aiowrite(),
no matter when it was issued. If several I/O operations are in
progress, the program needs a way to determine which one has
completed. The following sample application shows one way of doing this.
Another thing to note is that it's perfectly all right for an I/O
to finish while the program is not waiting for it; all that happens
is that the program's next aiowait() will return
immediately, reporting the already-completed I/O instead of
suspending the program.
In the server design presented in this paper, you will see that
the computational activities in the above diagram are spread across
several threads of execution: one thread will initiate an aioread(),
for example, and an entirely different thread will perform the
aiowait() which detects the read's completion. Although
this distribution of labor is important to the server's design, it
makes no essential difference to the way the AIO operations work. As
far as Solaris is concerned, all the aioread(),
aiowrite(), and aiowait() calls are coming
from the same process, and it doesn't matter whether the process has
one thread or a thousand. A thread that does an aioread()
or aiowrite() starts its I/O operation and keeps right
on running, and a thread that does an aiowait()
suspends until some asynchronous I/O, any asynchronous
I/O, finishes.
The Sample Application
The application developed to demonstrate the use of asynchronous
I/O with thread pools is a simple multi-threaded server that handles
just one kind of request from its clients. The server is implemented
in the boss-worker model, using a pool of worker threads.
Upon initialization, the server creates a main thread that in turn:
Creates a pool of worker threads
Creates a poll thread to perform aiowait()
Creates a queue to hold work requests
Opens a connection on a specified port number and listens for incoming requests.

When a request arrives, the main/boss thread puts it on the queue. The
pool of worker threads listens to this queue for requests. When a
request comes in on the queue, a worker thread extracts the request,
examines the contents, performs the necessary function (in this test
case, aioread() or aiowrite()) and, based on the status of the
request (completed or more work needed), either puts it back on the
queue or returns data to the client. The worker threads initiate
aioread() or aiowrite() and return to the thread pool to listen for
more requests.
The poll thread performs aiowait() periodically. When
an asynchronous I/O operation completes, aiowait()
returns a pointer to the aio_result_t structure where
aioread() or aiowrite() was told to store
information about the completed operation. This aio_result_t
contains the number of bytes transferred and an error code if
applicable, but does not directly indicate which of the many I/O
operations has completed; it doesn't even say whether the
operation was input or output! It's up to the poll thread to figure
this out.
In this server, the problem is solved in a simple way: every
aioread() or aiowrite() must supply a
pointer to the aio_result_t to be used for that
operation, and by convention this aio_result_t is an
element of a larger structure containing additional information of
interest to the server:
typedef struct client_data { /* server-defined structure */
aio_result_t aiores; /* AIO result structure */
... /* other server-specific data */
} client_data_t;
Since the aio_result_t element is the first thing in
this structure, the address of the aio_result_t is also
the address of the enclosing client_data_t. All that's
needed is a cast to convert the pointer to the proper type:
aio_result_t *aioptr = aiowait(...);
client_data_t *cdtptr = (client_data_t*)aioptr;
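The pointer conversion can be tried out in isolation. In the sketch below the aio_result_t definition is a stand-in that mirrors the two fields of the Solaris structure so that the fragment compiles on any system, and the sock field and the helper name client_from_result() are hypothetical. The helper uses offsetof() rather than a bare cast, which gives the same answer while the result structure is the first member and keeps working if the member is ever moved.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the Solaris aio_result_t (normally from <sys/asynch.h>). */
typedef struct aio_result {
    int aio_return;   /* return value of read() or write() */
    int aio_errno;    /* value of errno for read() or write() */
} aio_result_t;

typedef struct client_data {
    aio_result_t aiores;   /* AIO result structure, kept first */
    int sock;              /* hypothetical server-specific data */
} client_data_t;

/* Map the pointer handed back by aiowait() to its enclosing record. */
client_data_t *client_from_result(aio_result_t *res)
{
    assert(res != NULL);
    return (client_data_t *)((char *)res - offsetof(client_data_t, aiores));
}
```

Because aiores sits at offset zero here, client_from_result(&c.aiores) and the plain cast (client_data_t *)&c.aiores both yield &c.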
Having located the client_data_t corresponding to the
completed I/O operation, the poll thread can chain it back onto the
request queue, where one of the worker threads will pick it up and
decide what to do next. Meanwhile, the poll thread loops back to the
aiowait() to wait for the next I/O completion.
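The poll thread's decision after each completion can be summarized as a small state function. The sketch below is a simplification rather than the server's code: the enum names and next_step_for() are hypothetical, and the numeric states 1, 2, and -1 follow the aioflag values used in the tpool.c listing. It also assumes that a failed read (a negative byte count) should close the connection.

```c
#include <assert.h>

/* Connection states, numbered like aioflag in the listing. */
enum conn_state { CONN_CLOSED = -1, CONN_READING = 1, CONN_WRITING = 2 };

/* What the poll thread does with a connection whose I/O just completed. */
enum next_step { STEP_QUEUE_READ, STEP_QUEUE_WRITE, STEP_CLOSE };

/*
 * state:  the connection's state when the completion was reported
 * nbytes: aio_return of the finished operation (-1 on failure)
 */
enum next_step next_step_for(int state, int nbytes)
{
    assert(nbytes >= -1);
    if (state == CONN_WRITING)
        return STEP_QUEUE_READ;    /* reply sent: wait for the next request */
    if (state == CONN_READING && nbytes > 0)
        return STEP_QUEUE_WRITE;   /* request arrived: send the reply */
    return STEP_CLOSE;             /* end of file, error, or unknown state */
}
```

Keeping the decision in one pure function makes it easy to check every combination of state and byte count without opening a socket.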
Besides aiowait(), Solaris offers other ways to detect the completion
of an asynchronous I/O operation:
AIO_INPROGRESS : aioread(3AIO) and aiowrite(3AIO) report their
results in an aio_result_t, which consists of the following:
int aio_return; /* return value of read() or write() */
int aio_errno; /* value of errno for read() or write() */
Solaris never stores AIO_INPROGRESS in aio_return itself.
Therefore, completion of aioread(3AIO) can be detected by
initializing aio_return to AIO_INPROGRESS before calling
aioread(3AIO) and watching for the value to change. However, this
notification mechanism requires the calling thread to keep checking,
in effect waiting until the AIO operation completes.
SIGIO : When an AIO operation completes, a SIGIO signal is delivered
to the process. However, when the application has a large number of
signals to handle, signals in general tend to be highly unreliable.
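The AIO_INPROGRESS technique can be sketched as follows. The aio_result_t type and the AIO_INPROGRESS value defined here are stand-ins so that the fragment compiles on any system (on Solaris both come from the system headers, and the numeric value used below is only a placeholder). The helper names aio_result_arm() and aio_result_done() are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-ins for the Solaris definitions; the value is a placeholder. */
#define AIO_INPROGRESS (-2)

typedef struct aio_result {
    int aio_return;   /* return value of read() or write() */
    int aio_errno;    /* value of errno for read() or write() */
} aio_result_t;

/* Call this before passing the structure to aioread() or aiowrite(). */
void aio_result_arm(aio_result_t *res)
{
    assert(res != NULL);
    res->aio_return = AIO_INPROGRESS;
    res->aio_errno = 0;
}

/*
 * Non-blocking check: nonzero once the sentinel has been overwritten
 * with the real result. Real code that polls a field updated
 * asynchronously should access it through a volatile-qualified pointer.
 */
int aio_result_done(const aio_result_t *res)
{
    assert(res != NULL);
    return res->aio_return != AIO_INPROGRESS;
}
```

A thread would arm the structure, start the operation, and later call aio_result_done() whenever it wants to know whether the buffer is safe to touch.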
Two sets of interfaces do asynchronous I/O in Solaris: the
aioread(3AIO) and
aiowrite(3AIO) routines; and
the POSIX-equivalent routines, aio_read(3RT)
and aio_write(3RT), which are
based on the POSIX standards for real-time extensions.
The POSIX and Solaris asynchronous I/O interfaces are
functionally identical. The real differences exist in the semantics
of using one interface or the other, and the system
libraries that must be linked in. Using the Solaris asynchronous I/O
interfaces requires linking to libaio,
whereas using the POSIX asynchronous I/O interfaces requires linking
to libposix4.
However, the POSIX interfaces are not suitable for this application,
because they do not provide good blocking-wait semantics: there is no
aiowait() equivalent in the POSIX library. They use an AIO control
block to submit I/Os, then use aio_error() / aio_suspend() to reap
them. They are better suited to files than to sockets.
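For comparison, the POSIX style of submitting one read through a control block and reaping it with aio_error() and aio_suspend() looks roughly like this. The sketch is illustrative only: posix_aio_read_once() and posix_aio_demo() are hypothetical names, the demo reads from a temporary file rather than a socket, and some systems need an extra library on the link line (libposix4 on Solaris, as noted above).

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>

/* Read up to len bytes from offset 0 of fd; returns bytes read or -1. */
ssize_t posix_aio_read_once(int fd, void *buf, size_t len)
{
    struct aiocb cb;
    const struct aiocb *const list[1] = { &cb };

    assert(buf != NULL);
    memset(&cb, 0, sizeof cb);
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = len;
    cb.aio_offset = 0;

    if (aio_read(&cb) == -1)          /* submit; returns immediately */
        return -1;

    /* The caller must name the operations it waits for: there is no
       "wait for any completion" call comparable to aiowait(). */
    while (aio_error(&cb) == EINPROGRESS)
        (void)aio_suspend(list, 1, NULL);   /* retried if interrupted */

    return aio_return(&cb);           /* final status, fetched once */
}

/* Demo: write 5 bytes to a temporary file and read them back. */
int posix_aio_demo(void)
{
    char buf[16];
    FILE *tmp = tmpfile();
    ssize_t n;

    if (tmp == NULL)
        return -1;
    fputs("hello", tmp);
    fflush(tmp);
    n = posix_aio_read_once(fileno(tmp), buf, sizeof buf);
    fclose(tmp);
    return (int)n;
}
```

The need to pass aio_suspend() an explicit list of control blocks is the practical difference from aiowait(): a poll thread would have to track every outstanding operation itself.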
Things to Watch Out For
AIO is not a cure-all. If the application uses a third party
library that implements asynchronous operations, this may not be the
best solution. This is because of the "anonymous" nature of
aiowait(), which returns the result of any
asynchronous read or write operation. And if a third party AIO
library is used in the application, this means that aiowait()
could return result sets of the third party library AIO calls as
well. Due to obvious interface disparities, this could lead to
unpredictable behavior.
In such a case, it would be better to use some other notification
technique such as signals or doors. Signals
are usually to be avoided, since they are notoriously difficult to
use reliably. It is very easy to lose signals, especially as the
number of the signals that need to be handled increases.
Doors are an IPC (inter-process communication) mechanism used for
fast, secure control transfer between local processes and threads.
A door descriptor is used to describe a procedure and optionally some
additional state associated with the procedure (e.g. a data structure
containing information about AIO completion). For more information on
doors, refer to the door_create()
and door_call() man pages.
The efficiency of the AIO approach also depends on which Solaris
version is used. Older Solaris versions implement asynchronous I/O
directly in the kernel only for certain classes of I/O devices, not
including sockets. For other devices, the aioread() or
aiowrite() actually creates a user-level thread to
execute an ordinary read() or write(), while
the calling thread keeps running. Essentially, this strategy
"degenerates" into the concurrent server model when large
numbers of socket connections are being handled.
As of Kernel Update 4 to Solaris 8 the kernel
supports asynchronous I/O for sockets without needing user-level
threads. A network server application built on the thread pool model
with asynchronous I/O will see substantial performance gains without
code changes when running on the newer Solaris versions.
Code
The code for the application discussed above can all be downloaded: mult_srv.c and tpool.c implement the server; mult_srv.h and
tpool.h are the respective header files; the client is in client.c; and the socket connections are handled in connectsock.c, connectTCP.c, passivesock.c, passiveTCP.c, and errexit.c.
mult_srv.c, tpool.c, and client.c are shown below.
mult_srv.c
/* static char sccsid [ ] = "@(#)thread_pools.html 1.7 01/03/27 15:21:40 SCCS ID";*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/errno.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/asynch.h>
#include <sys/fcntl.h>
#include <pthread.h>
#include "tpool.h"
#include <tnf/probe.h>
tpool_t srv_thread_pool;
int no_of_act_conn = 0;
pthread_mutex_t global_data_mutex =
PTHREAD_MUTEX_INITIALIZER;
/* Master socket */
int msock;
/* connections handled */
slavesocket_t clientreq[MAXSOCKS];
extern int errno;
void reaper(int);
int errexit(const char *format, ...);
int passiveTCP(const char *service, int qlen);
/*----------------------------------------------------
* main - Concurrent TCP server for ECHO service
*----------------------------------------------------
*/
int
main(int argc, char *argv[])
{
char *service = "echo"; /*service name or port*/
/*number*/
pthread_t *pollthreadid =
(pthread_t *) malloc(sizeof(pthread_t));
switch (argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
errexit("usage: TCPechod [port]\n");
}
/* start the server and worker threads */
server_init();
msock = passiveTCP(service, QLEN);
pthread_create(pollthreadid, NULL, aiopoll, (void *)
NULL);
while (1) {
if (no_of_act_conn < MAXSOCKS) {
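/* accept() expects the size of the address buffer on entry */
clientreq[no_of_act_conn].alen =
sizeof(clientreq[no_of_act_conn].fsin);
clientreq[no_of_act_conn].ssock = accept(msock,
(struct sockaddr *)
&clientreq[no_of_act_conn].fsin,
&clientreq[no_of_act_conn].alen);
if (clientreq[no_of_act_conn].ssock < 0)
continue; /* accept failed; try again */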
clientreq[no_of_act_conn].offsetp = 0;
clientreq[no_of_act_conn].aioflag = 1;
memset(clientreq[no_of_act_conn].aiobuf, 0,
BUFSIZE);
TCPechodread(&clientreq[no_of_act_conn]);
/* calling aioread from main thread */
no_of_act_conn++;
}
}
}
/*****************************
* Server Initialization *
*****************************/
void server_init() {
tpool_init(&srv_thread_pool, MAX_THREADS,
MAX_THREADS );
}
tpool.c
/* tpool.c --
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include <tpool.h>
#include <sys/signal.h>
#include <poll.h>
#include <tnf/probe.h>
/* pool synchronization */
pthread_mutex_t queue_lock =
PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t queue_not_empty =
PTHREAD_COND_INITIALIZER;
pthread_cond_t queue_not_full =
PTHREAD_COND_INITIALIZER;
pthread_cond_t queue_empty =
PTHREAD_COND_INITIALIZER;
pthread_mutex_t conn_pool_lock =
PTHREAD_MUTEX_INITIALIZER;
extern tpool_t srv_thread_pool;
extern slavesocket_t clientreq[MAXSOCKS];
extern int no_of_act_conn;
void *tpool_thread(void *);
void sigpipedo(int);
long long r_diff, w_diff;
char temp[BUFSIZE];
void tpool_init(tpool_t *tpoolp,
int num_worker_threads,
int max_queue_size)
{
int i, rtn;
tpool_t tpool;
TNF_PROBE_0_DEBUG(tpool_init, "tpool_probes", "");
for (i=0; i <MAXSOCKS; i++)
clientreq[i].aioflag = -1;
/* Initializing the pool and allocating the data structures */
/* allocate a pool data structure */
if ((tpool = (tpool_t )malloc(sizeof(struct tpool)))
== NULL)
perror("malloc"), exit(1);
/* initialize the fields */
if ((tpool->threads = (pthread_t *)
malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
perror("malloc"), exit(1);
tpool->cur_queue_size = 0;
tpool->queue_head = NULL;
tpool->queue_tail = NULL;
tpool->queue_closed = 0;
/* create threads */
for (i = 0; i != num_worker_threads; i++) {
if ((rtn = pthread_create( &(tpool->threads[i]),
NULL,
tpool_thread,
(void *)tpool)) != 0)
fprintf(stderr, "pthread_create %d",rtn), exit(1);
}
*tpoolp = tpool;
}
/* routine to add work items to the queue */
int tpool_add_work(
tpool_t tpool,
void (*routine)(),
slavesocket_t *workorder)
{
int rtn;
TNF_PROBE_0_DEBUG(tpool_add_work, "tpool_probes", "");
/* obtain a lock on the queue */
if( (rtn = pthread_mutex_lock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
/* allocate work structure */
workorder->routine = routine;
/* aio operations */
workorder->next = NULL;
/* adding an item to the queue */
if (tpool->cur_queue_size == 0) {
tpool->queue_tail = tpool->queue_head = workorder;
/* waking all workers */
if ((rtn = pthread_cond_broadcast(&queue_not_empty))
!= 0)
fprintf(stderr,"pthread_cond_broadcast %d",rtn),
exit(1);
} else {
tpool->queue_tail->next = workorder;
tpool->queue_tail = workorder;
}
tpool->cur_queue_size++;
/* unlock the queue */
if ((rtn = pthread_mutex_unlock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d" ,rtn),
exit(1);
return 1;
}
/****************************
* Worker thread process *
****************************/
void *tpool_thread(void *arg)
{
tpool_t tpool = (tpool_t)arg;
int rtn;
slavesocket_t *my_workp;
TNF_PROBE_0_DEBUG(tpool_thread, "tpool_probes", "");
for(;;) {
/* Check queue for work */
/* lock queue */
if ((rtn = pthread_mutex_lock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
/* no work on the queue? go back to sleep */
while (tpool->cur_queue_size == 0) {
/* wait on the condition queue_not_empty */
if ((rtn = pthread_cond_wait
(&queue_not_empty, &queue_lock)) != 0)
fprintf(stderr,"pthread_cond_wait %d\n",rtn),
exit(1);
}
/* Get to work, dequeue the next item */
my_workp = tpool->queue_head;
tpool->cur_queue_size--;
if (tpool->cur_queue_size == 0)
tpool->queue_head = tpool->queue_tail = NULL;
else
tpool->queue_head = my_workp->next;
if ((rtn = pthread_mutex_unlock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn),
exit(1);
/* Do this work item */
(*(my_workp->routine))(my_workp);
}
return(NULL);
}
/*******************************
* Polling function *
*******************************/
void *aiopoll(void *arg) {
int i;
int rtn;
int cc;
aio_result_t *res;
slavesocket_t *out;
hrtime_t temp_t;
struct timeval timeout = {0,0};
TNF_PROBE_0_DEBUG(aiopoll, "tpool_probes", "");
while (1) {
if (no_of_act_conn > 0) {
/* check for completed I/O; poll */
res = aiowait(&timeout);
/* (aio_result_t *)-1 is an error; NULL means nothing completed */
if (res == (aio_result_t *)-1 || res == NULL)
continue;
temp_t = gethrtime();
/* result of aiowait */
out = (slavesocket_t *) res;
/* if aioflag == 2, set aioflag = 1,*/
/*perform aioread */
if (out->aioflag == 2) {
out->aft_write = temp_t;
w_diff = out->aft_write - out->bef_write;
sprintf(temp, "%lld", w_diff);
strncat(out->aiobuf, "AFTER_WRITE",
strlen("AFTER_WRITE"));
strncat(out->aiobuf, temp, strlen(temp));
if ((rtn = pthread_mutex_lock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
out->aioflag = 1;
memset(out->aiobuf, 0, BUFSIZE);
/* adding item to work queue; calling aioread */
tpool_add_work(srv_thread_pool,
TCPechodread, out );
if ((rtn = pthread_mutex_unlock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn),
exit(1);
continue;
}
/* if aioflag == 1, call aiowrite */
if (out->aioflag == 1) {
if (out->aiores.aio_return) {
out->aft_read = temp_t;
r_diff = out->aft_read - out->bef_read;
sprintf(temp, "%lld", r_diff);
strncat(out->aiobuf, "AFTER_READ",
strlen("AFTER_READ"));
strncat(out->aiobuf, temp, strlen(temp));
if ((rtn = pthread_mutex_lock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
TNF_PROBE_0_DEBUG(in_aiopoll_calling_TCPechodwrite,
"tpool_probes", "");
/* adding item to work queue, calling aiowrite */
tpool_add_work(srv_thread_pool, TCPechodwrite, out );
if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
continue;
}
}
/* otherwise (read returned 0 bytes, or aioflag is neither 1 nor 2): */
/* close connection */
printf("closing connection %d\n", out->ssock);
if ((rtn = pthread_mutex_lock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
out->aioflag = -1;
aiocancel(&(out->aiores));
close(out->ssock);
if ((rtn = pthread_mutex_unlock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn),
exit(1);
}
}
}
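/* signal handler: does nothing, so that a SIGPIPE from a closed */
/* client connection does not terminate the server */
void sigpipedo (int sig) {
}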
void TCPechodwrite(slavesocket_t *connreq) {
int tmpoffset = connreq->aiores.aio_return;
int rtn, rtnaio;
signal(SIGPIPE, sigpipedo);
if ((rtn = pthread_mutex_lock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
connreq->aioflag = 2;
memset(temp, 0, BUFSIZE);
connreq->bef_write = gethrtime();
if( aiowrite(connreq->ssock, connreq->aiobuf, 55, 0,
SEEK_SET,
&(connreq->aiores)) == -1) {
errexit("aiowrite error\n");
}
TNF_PROBE_0_DEBUG(after_write, "tpool_probes", "");
connreq->offsetp += tmpoffset;
if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
}
void TCPechodread(slavesocket_t *connreq) {
int rtn;
signal(SIGPIPE, sigpipedo);
if ((rtn = pthread_mutex_lock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
connreq->aioflag = 1;
connreq->bef_read = gethrtime();
if ( aioread(connreq->ssock, connreq->aiobuf, BUFSIZE, 0,
SEEK_SET, &(connreq->aiores)) == -1) {
connreq->aioflag = -1;
}
if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
}
client.c
/* TCPecho.c - main, TCPecho */
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include <sys/time.h> /* struct timeval, gettimeofday() */
#define MAXTHREADS 10000
#define LINELEN 82
typedef struct threadstruct {
char *host;
char *service ;
int threadno;
pthread_t threadidt;
pthread_attr_t threadattr;
char buf[LINELEN+1];
int s, n;
int outchars, inchars;
int count;
double timetaken, mintime, avgtime, sum;
struct timeval pbef_time, paft_time;
} threadstruct_t;
struct timeval iter_bef_time, iter_aft_time;
double iter, time_stamp;
int t1=0, t2=0, t3=0, t4=0, t5=0, t6=0, t7=0,
t8=0, t9=0, t10=0, t11=0;
int a1 = 0, a2 = 0, a4 = 0, a3 = 0, a5= 0, a6 = 0;
extern int errno;
int TCPecho( char *host, char *service,
int noofthreads);
int errexit(const char *format, ...);
int connectTCP(const char *host,
const char *service);
threadstruct_t clients[MAXTHREADS];
void *doroutine(void *);
/*-----------------------------------------------
* main - TCP client for ECHO service
*-----------------------------------------------
*/
int
main(int argc, char *argv[])
{
/* defaults used when an argument is omitted (assumed values) */
char *hostt = "localhost";
char *servicet = "echo";
int noofthreads = 1;
switch (argc) {
case 1:
break;
case 4:
noofthreads = atoi(argv[3]);
/* FALL THROUGH */
case 3:
servicet = argv[2];
/* FALL THROUGH */
case 2:
hostt = argv[1];
break;
default:
fprintf(stderr,
"usage: client host port threads\n");
exit(1);
}
if (noofthreads > MAXTHREADS)
errexit("Maximum number of threads is %d\n",
MAXTHREADS);
TCPecho(hostt, servicet, noofthreads);
exit(0);
}
/*------------------------------------------------------------------------
* TCPecho - send input to ECHO service on specified host and print reply
*------------------------------------------------------------------------
*/
int
TCPecho( char *host, char *service, int noofthreads)
{
int i;
pthread_t idt;
for ( i =0; i < noofthreads; i++) {
clients[i].host = host;
clients[i].service = service;
clients[i].count = 0;
clients[i].timetaken = 0;
clients[i].threadno = i + 1;
pthread_attr_init(&clients[i].threadattr);
pthread_attr_setscope(&(clients[i].threadattr),
PTHREAD_SCOPE_SYSTEM);
/* pass the attributes prepared above, not NULL */
pthread_create(
&clients[i].threadidt, &clients[i].threadattr,
doroutine, (void *) &clients[i]);
}
for (i=0; i < noofthreads; i++) {
pthread_join(clients[i].threadidt, NULL);
}
return 0;
}
void *doroutine(void *argt) {
char *tmp1, *tmp2, *tmp3;
double r_diff;
size_t sz = 147;
threadstruct_t *arg = (threadstruct_t *) argt;
arg->s = connectTCP(arg->host, arg->service);
gettimeofday(&iter_bef_time, NULL);
while(1){
memset(arg->buf, 0, LINELEN);
strncat(arg->buf, "CLIENT", strlen("CLIENT"));
arg->outchars = strlen(arg->buf);
gettimeofday(&(arg->pbef_time), NULL);
(void) write(arg->s, arg->buf, arg->outchars);
memset(arg->buf, 0, LINELEN);
/* read it back */
arg->n = read(arg->s, &(arg->buf), 55);
if (arg->n < 0)
perror("socket read failed"), exit(1);
gettimeofday(&(arg->paft_time), NULL);
/* copy at most 16 characters of the timing field and terminate it */
tmp1 = &(arg->buf[16]);
tmp2 = (char *)malloc(17);
strncpy(tmp2, tmp1, 16);
tmp2[16] = '\0';
r_diff = atoi(tmp2);
free(tmp2);
r_diff = r_diff / 1000000000;
/*tmp1 = &(arg->buf[37]);
tmp3 = (char *)malloc(17);
strncpy(tmp3, tmp1, 16);
tmp3[16] = '0';
ar = atof(tmp3); */
/*tmp1 = &(arg->buf[11]);
tmp4 = (char *)malloc(17);
strncpy(tmp4, tmp1, 16);
tmp4[16] = '0';
bw = atof(tmp4);
tmp1 = &(arg->buf[39]);
tmp5 = (char *)malloc(17);
strncpy(tmp5, tmp1, 16);
tmp5[16] = '0';
aw = atof(tmp5);*/
/*CLIENT_READ969994591.142932AFTER_READ969994594.181865*/
/* fputs(arg->buf, stdout); */
/* elapsed seconds = whole-second difference + microsecond difference */
arg->timetaken = (arg->paft_time.tv_sec -
arg->pbef_time.tv_sec) +
(arg->paft_time.tv_usec -
arg->pbef_time.tv_usec)*0.000001;
if(arg->timetaken >=
0 && arg->timetaken <=10) t1++;
if(arg->timetaken >=
11 && arg->timetaken <=20) t2++;
if(arg->timetaken >=
21 && arg->timetaken <=30) t3++;
if(arg->timetaken >=
31 && arg->timetaken <=40) t4++;
if(arg->timetaken >= 41
&& arg->timetaken <=50) t5++;
if(arg->timetaken >=
51 && arg->timetaken <=60) t6++;
if(arg->timetaken >=
61 && arg->timetaken <=70) t7++;
if(arg->timetaken >=
71 && arg->timetaken <=80) t8++;
if(arg->timetaken >=
81 && arg->timetaken <=90) t9++;
if(arg->timetaken >=
91 && arg->timetaken <100) t10++;
if(arg->timetaken >= 100 ) t11++;
if(r_diff >= 0 && r_diff <=1) a1++;
if(r_diff >1 && r_diff <=2) a2++;
if(r_diff >2 && r_diff <=4) a3++;
if(r_diff >4 && r_diff <= 6) a4++;
if(r_diff >6 && r_diff <=10) a5++;
if(r_diff >10) a6++;
/* set the minimum time */
if (arg->count == 0) arg->mintime =
arg->timetaken;
if (arg->timetaken < arg->mintime)
arg->mintime = arg->timetaken;
arg->sum += arg->timetaken;
arg->count ++;
arg->avgtime = arg->sum/arg->count;
printf("%d\t %d\t %10.6f\t %10.10f\t %10.6f\t %10.6f\n",
arg->threadno, arg->count, arg->timetaken,
r_diff, arg->mintime,
arg->avgtime);
/*fclose(arg->fp);*/
sleep(2);
gettimeofday(&iter_aft_time, NULL);
iter = (iter_aft_time.tv_sec - iter_bef_time.tv_sec) +
(iter_aft_time.tv_usec -
iter_bef_time.tv_usec)*0.000001;
if(iter > 10) {
printf("Execution time = %10.6f", iter);
printf("Number of iterations = %d\n", arg->count);
printf("Response time ranges : \n");
printf("0-10 \t 11-20 \t 21-30 \t 31-40 \t 41-50 \n\n");
printf("%d \t %d \t %d \t %d \t %d \t \n\n",
t1, t2, t3, t4, t5);
printf("51-60 \t 61-70 \t 71-80 \t 81-90 \t 91-100 \t 100+ \n\n");
printf("%d \t %d \t %d \t %d \t %d \t %d \t \n\n",
t6, t7, t8, t9, t10, t11);
printf("Aioread time ranges : \n");
printf("0-1 \t 1-2 \t 2-4 \t 4-6 \t 6-10 \t 10+ \n");
printf("%d \t %d \t %d \t %d \t %d \t %d \n",
a1, a2, a3, a4,a5, a6);
exit(0); }
}
}
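Elapsed-time arithmetic on struct timeval values, which the client uses for its response-time figures, is easy to get wrong by a whole second when the microsecond fields are handled separately. A small helper keeps the calculation in one place. The function name timeval_elapsed() is hypothetical and is not part of the downloadable code.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/* Elapsed time from *start to *end, in seconds. */
double timeval_elapsed(const struct timeval *start,
                       const struct timeval *end)
{
    double sec, usec;

    assert(start != NULL && end != NULL);
    sec = (double)(end->tv_sec - start->tv_sec);
    /* The microsecond difference may be negative; the sum is still
       correct because it is added to the whole-second difference. */
    usec = (double)(end->tv_usec - start->tv_usec);
    return sec + usec * 0.000001;
}
```

For example, going from 10.900000 s to 12.100000 s gives 2 + (-0.8) = 1.2 seconds.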
November 2000