Use the kqueue(2) framework instead of the poll(2) system call to wait

for replies and timeouts in the connectionless (UDP) RPC client
code. Based on similar changes from FreeBSD in rev 1.15.

Reviewed by Christos Zoulas.
This commit is contained in:
rpaulo 2005-10-13 23:40:08 +00:00
parent a64b99061e
commit b5e9a2e4c2
1 changed file with 97 additions and 137 deletions

View File

@ -1,4 +1,4 @@
/* $NetBSD: clnt_dg.c,v 1.17 2005/09/10 09:04:11 jmmv Exp $ */
/* $NetBSD: clnt_dg.c,v 1.18 2005/10/13 23:40:08 rpaulo Exp $ */
/*
* Sun RPC is a product of Sun Microsystems, Inc. and is provided for
@ -39,7 +39,7 @@
#if 0
static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
#else
__RCSID("$NetBSD: clnt_dg.c,v 1.17 2005/09/10 09:04:11 jmmv Exp $");
__RCSID("$NetBSD: clnt_dg.c,v 1.18 2005/10/13 23:40:08 rpaulo Exp $");
#endif
#endif
@ -49,7 +49,7 @@ __RCSID("$NetBSD: clnt_dg.c,v 1.17 2005/09/10 09:04:11 jmmv Exp $");
#include "namespace.h"
#include "reentrant.h"
#include <sys/poll.h>
#include <sys/event.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
@ -135,7 +135,8 @@ struct cu_data {
u_int cu_sendsz; /* send size */
char *cu_outbuf;
u_int cu_recvsz; /* recv size */
struct pollfd pfdp;
struct kevent cu_kin;
int cu_kq;
char cu_inbuf[1];
};
@ -283,8 +284,8 @@ clnt_dg_create(fd, svcaddr, program, version, sendsz, recvsz)
cl->cl_auth = authnone_create();
cl->cl_tp = NULL;
cl->cl_netid = NULL;
cu->pfdp.fd = cu->cu_fd;
cu->pfdp.events = POLLIN | POLLPRI | POLLRDNORM | POLLRDBAND;
cu->cu_kq = -1;
EV_SET(&cu->cu_kin, cu->cu_fd, EVFILT_READ, EV_ADD, 0, 0, 0);
return (cl);
err1:
warnx(mem_err_clnt_dg);
@ -314,21 +315,20 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
size_t outlen;
struct rpc_msg reply_msg;
XDR reply_xdrs;
struct timeval time_waited;
bool_t ok;
int nrefreshes = 2; /* number of times to refresh cred */
struct timeval timeout;
struct timeval retransmit_time;
struct timeval startime, curtime;
int firsttimeout = 1;
struct timeval next_sendtime, starttime, time_waited, tv;
struct kevent kv;
#ifdef _REENTRANT
sigset_t mask, *maskp = &mask;
#else
sigset_t *maskp = NULL;
sigset_t mask;
#endif
sigset_t newmask;
ssize_t recvlen = 0;
struct timespec ts;
size_t kin_len;
int n;
_DIAGASSERT(cl != NULL);
@ -349,7 +349,18 @@ clnt_dg_call(cl, proc, xargs, argsp, xresults, resultsp, utimeout)
time_waited.tv_sec = 0;
time_waited.tv_usec = 0;
retransmit_time = cu->cu_wait;
retransmit_time = next_sendtime = cu->cu_wait;
gettimeofday(&starttime, NULL);
/* Clean up in case the last call ended in a longjmp(3) call. */
if (cu->cu_kq >= 0)
(void)close(cu->cu_kq);
if ((cu->cu_kq = kqueue()) < 0) {
cu->cu_error.re_errno = errno;
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
}
kin_len = 1;
call_again:
xdrs = &(cu->cu_outxdrs);
@ -362,8 +373,8 @@ call_again:
if ((! XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
(! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
(! (*xargs)(xdrs, argsp))) {
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTENCODEARGS);
cu->cu_error.re_status = RPC_CANTENCODEARGS;
goto out;
}
outlen = (size_t)XDR_GETPOS(xdrs);
@ -372,16 +383,16 @@ send_again:
(struct sockaddr *)(void *)&cu->cu_raddr, (socklen_t)cu->cu_rlen)
!= outlen) {
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTSEND);
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
}
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_TIMEDOUT);
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
/*
* sub-optimal code appears here because we have
@ -394,139 +405,82 @@ send_again:
for (;;) {
TIMEVAL_TO_TIMESPEC(&retransmit_time, &ts);
switch (pollts(&cu->pfdp, 1, &ts, maskp)) {
case 0:
time_waited.tv_sec += retransmit_time.tv_sec;
time_waited.tv_usec += retransmit_time.tv_usec;
while (time_waited.tv_usec >= 1000000) {
time_waited.tv_sec++;
time_waited.tv_usec -= 1000000;
}
/* update retransmit_time */
if (retransmit_time.tv_sec < RPC_MAX_BACKOFF) {
retransmit_time.tv_usec *= 2;
retransmit_time.tv_sec *= 2;
while (retransmit_time.tv_usec >= 1000000) {
retransmit_time.tv_sec++;
retransmit_time.tv_usec -= 1000000;
}
}
/* Decide how long to wait. */
if (timercmp(&next_sendtime, &timeout, <))
timersub(&next_sendtime, &time_waited, &tv);
else
timersub(&timeout, &time_waited, &tv);
if (tv.tv_sec < 0 || tv.tv_usec < 0)
tv.tv_sec = tv.tv_usec = 0;
TIMEVAL_TO_TIMESPEC(&tv, &ts);
if ((time_waited.tv_sec < timeout.tv_sec) ||
((time_waited.tv_sec == timeout.tv_sec) &&
(time_waited.tv_usec < timeout.tv_usec)))
goto send_again;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_TIMEDOUT);
n = kevent(cu->cu_kq, &cu->cu_kin, kin_len, &kv, 1, &ts);
/* We don't need to register the event again. */
kin_len = 0;
case -1:
if (errno == EBADF) {
if (n == 1) {
if (kv.flags & EV_ERROR) {
cu->cu_error.re_errno = (int)kv.data;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
/* We have some data now */
do {
recvlen = recvfrom(cu->cu_fd, cu->cu_inbuf,
cu->cu_recvsz, 0, NULL, NULL);
} while (recvlen < 0 && errno == EINTR);
if (recvlen < 0 && errno != EWOULDBLOCK) {
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTRECV);
}
if (errno != EINTR) {
errno = 0; /* reset it */
continue;
}
/* interrupted by another signal, update time_waited */
if (firsttimeout) {
/*
* Could have done gettimeofday before clnt_call
* but that means 1 more system call per each
* clnt_call, so do it after first time out
*/
if (gettimeofday(&startime,
(struct timezone *) NULL) == -1) {
errno = 0;
continue;
}
firsttimeout = 0;
errno = 0;
continue;
};
if (gettimeofday(&curtime,
(struct timezone *) NULL) == -1) {
errno = 0;
continue;
};
time_waited.tv_sec += curtime.tv_sec - startime.tv_sec;
time_waited.tv_usec += curtime.tv_usec -
startime.tv_usec;
while (time_waited.tv_usec < 0) {
time_waited.tv_sec--;
time_waited.tv_usec += 1000000;
};
while (time_waited.tv_usec >= 1000000) {
time_waited.tv_sec++;
time_waited.tv_usec -= 1000000;
}
startime.tv_sec = curtime.tv_sec;
startime.tv_usec = curtime.tv_usec;
if ((time_waited.tv_sec > timeout.tv_sec) ||
((time_waited.tv_sec == timeout.tv_sec) &&
(time_waited.tv_usec > timeout.tv_usec))) {
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_TIMEDOUT);
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
if (recvlen >= sizeof(uint32_t) &&
(*((uint32_t *)(void *)(cu->cu_inbuf)) ==
*((uint32_t *)(void *)(cu->cu_outbuf)))) {
/* We now assume we have the proper reply. */
break;
}
}
if (n == -1) {
#ifdef _REENTRANT
if (errno == EINTR) {
sigset_t rmask;
if (sigpending(&rmask) == -1) {
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return cu->cu_error.re_status =
cu->cu_error.re_status =
RPC_SYSTEMERROR;
goto out;
}
(void)sigsuspend(&rmask);
}
} else
#endif
errno = 0; /* reset it */
continue;
};
if (cu->pfdp.revents & POLLNVAL || (cu->pfdp.revents == 0)) {
cu->cu_error.re_status = RPC_CANTRECV;
/*
* Note: we're faking errno here because we
* previously would have expected pollts() to
* return -1 with errno EBADF. Poll(BA_OS)
* returns 0 and sets the POLLNVAL revents flag
* instead.
*/
cu->cu_error.re_errno = errno = EBADF;
release_fd_lock(cu->cu_fd, mask);
return (-1);
}
/* We have some data now */
do {
if (errno == EINTR) {
/*
* Must make sure errno was not already
* EINTR in case recvfrom() returns -1.
*/
errno = 0;
{
cu->cu_error.re_errno = errno;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
recvlen = recvfrom(cu->cu_fd, cu->cu_inbuf,
(socklen_t)cu->cu_recvsz, 0, NULL, NULL);
} while (recvlen < 0 && errno == EINTR);
if (recvlen < 0) {
if (errno == EWOULDBLOCK)
continue;
cu->cu_error.re_errno = errno;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status = RPC_CANTRECV);
}
if (recvlen < sizeof (u_int32_t))
continue;
/* see if reply transaction id matches sent id */
if (*((u_int32_t *)(void *)(cu->cu_inbuf)) !=
*((u_int32_t *)(void *)(cu->cu_outbuf)))
continue;
/* we now assume we have the proper reply */
break;
gettimeofday(&tv, NULL);
timersub(&tv, &starttime, &time_waited);
/* Check for timeout. */
if (timercmp(&time_waited, &timeout, >)) {
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
/* Retransmit if necessary. */
if (timercmp(&time_waited, &next_sendtime, >)) {
/* update retransmit_time */
if (retransmit_time.tv_sec < RPC_MAX_BACKOFF)
timeradd(&retransmit_time, &retransmit_time,
&retransmit_time);
timeradd(&next_sendtime, &retransmit_time,
&next_sendtime);
goto send_again;
}
}
/*
@ -571,6 +525,10 @@ send_again:
cu->cu_error.re_status = RPC_CANTDECODERES;
}
out:
if (cu->cu_kq >= 0)
(void)close(cu->cu_kq);
cu->cu_kq = -1;
release_fd_lock(cu->cu_fd, mask);
return (cu->cu_error.re_status);
}
@ -793,6 +751,8 @@ clnt_dg_destroy(cl)
cond_wait(&dg_cv[cu_fd], &clnt_fd_lock);
if (cu->cu_closeit)
(void) close(cu_fd);
if (cu->cu_kq >= 0)
(void)close(cu->cu_kq);
XDR_DESTROY(&(cu->cu_outxdrs));
mem_free(cu, (sizeof (*cu) + cu->cu_sendsz + cu->cu_recvsz));
if (cl->cl_netid && cl->cl_netid[0])