*
* Copyright (c) 2001, PostgreSQL Global Development Group
*
- * $Header: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v 1.5 2001/08/04 00:14:43 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v 1.6 2001/08/05 02:06:50 tgl Exp $
* ----------
*/
#include "postgres.h"
static int pgStatSock = -1;
static int pgStatPipe[2];
static struct sockaddr_in pgStatAddr;
-static int pgStatPmPipe[2];
+static int pgStatPmPipe[2] = { -1, -1 };
static int pgStatRunning = 0;
static int pgStatPid;
*/
static void pgstat_main(int real_argc, char *real_argv[]);
static void pgstat_recvbuffer(int real_argc, char *real_argv[]);
+static void pgstat_die(SIGNAL_ARGS);
static int pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
return 0;
/*
- * Create the UDP socket for receiving statistic messages
+ * Create the UDP socket for sending and receiving statistic messages
*/
if ((pgStatSock = socket(PF_INET, SOCK_DGRAM, 0)) < 0)
{
}
/*
- * Set the socket to non-blocking IO
+ * Connect the socket to its own address. This saves a few cycles
+ * by not having to respecify the target address on every send.
+ * This also provides a kernel-level check that only packets from
+ * this same address will be received.
+ */
+ if (connect(pgStatSock, (struct sockaddr *)&pgStatAddr, alen) < 0)
+ {
+ perror("PGSTAT: connect(2)");
+ close(pgStatSock);
+ pgStatSock = -1;
+ return -1;
+ }
+
+ /*
+ * Set the socket to non-blocking IO. This ensures that if the
+ * collector falls behind (despite the buffering process), statistics
+ * messages will be discarded; backends won't block waiting to send
+ * messages to the collector.
*/
if (fcntl(pgStatSock, F_SETFL, O_NONBLOCK) < 0)
{
}
/*
- * Then fork off the collector.
+ * Then fork off the collector. Remember its PID for pgstat_ispgstat.
*/
- switch(pgStatPid = (int)fork())
+ switch ((pgStatPid = (int)fork()))
{
case -1:
perror("PGSTAT: fork(2)");
return 0;
}
+ /* in postmaster child ... */
+ ClosePostmasterPorts(false);
+
pgstat_main(real_argc, real_argv);
exit(0);
}
+/* ----------
+ * pgstat_close_sockets() -
+ *
+ *	Release this process's copies of the postmaster pipe descriptors.
+ *
+ *	Meant to be called after the postmaster forks a child that is not
+ *	the statistics collector, so that the child does not keep open file
+ *	descriptors it should not hold.  Each end of pgStatPmPipe is closed
+ *	only if it is currently open (>= 0), and is then marked as closed
+ *	by storing -1, so calling this more than once is harmless.
+ * ----------
+ */
+void
+pgstat_close_sockets(void)
+{
+	int			end;
+
+	/* Index 0 is handled first, then index 1, as in a straight-line version */
+	for (end = 0; end < 2; end++)
+	{
+		if (pgStatPmPipe[end] >= 0)
+			close(pgStatPmPipe[end]);
+		pgStatPmPipe[end] = -1;
+	}
+}
+
+
/* ----------
* pgstat_beterm() -
*
{
PgStat_MsgBeterm msg;
- if (!pgstat_collect_startcollector)
+ if (!pgstat_collect_startcollector || pgStatSock < 0)
return;
msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
((PgStat_MsgHdr *)msg)->m_size = len;
- sendto(pgStatSock, msg, len, 0,
- (struct sockaddr *)&pgStatAddr, sizeof(pgStatAddr));
+ send(pgStatSock, msg, len, 0);
+ /* We deliberately ignore any error from send() */
}
/* ----------
* pgstat_main() -
*
- * The statistics collector itself.
+ * Start up the statistics collector itself. This is the body of the
+ * postmaster child process.
* ----------
*/
static void
{
PgStat_Msg msg;
fd_set rfds;
+ int readPipe;
+ int pmPipe = pgStatPmPipe[0];
int maxfd;
int nready;
- int len;
- int dlen;
+ int len = 0;
struct timeval timeout;
struct timeval next_statwrite;
bool need_statwrite;
* as well.
*/
close(pgStatPmPipe[1]);
+ pgStatPmPipe[1] = -1;
/*
- * Ignore all signals usually bound to some action in the postmaster
+ * Ignore all signals usually bound to some action in the postmaster,
+ * except for SIGCHLD --- see pgstat_recvbuffer.
*/
pqsignal(SIGHUP, SIG_IGN);
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, SIG_IGN);
pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
+ pqsignal(SIGCHLD, pgstat_die);
pqsignal(SIGTTIN, SIG_DFL);
pqsignal(SIGTTOU, SIG_DFL);
pqsignal(SIGCONT, SIG_DFL);
pqsignal(SIGWINCH, SIG_DFL);
/*
- * Start a buffering subprocess to read from the socket, so
+ * Start a buffering process to read from the socket, so
* we have a little more time to process incoming messages.
+ *
+ * NOTE: the process structure is: the postmaster is the parent of the
+ * buffer process, which in turn is the parent of the collector process.
+ * This way, the buffer can detect collector failure via SIGCHLD, whereas
+ * otherwise it wouldn't notice collector failure until it tried to write
+ * on the pipe. That would mean that after the postmaster started a new
+ * collector, we'd have two buffer processes competing to read from the
+ * UDP socket --- not good.
*/
if (pipe(pgStatPipe) < 0)
{
exit(1);
}
- switch(fork())
+ switch (fork())
{
case -1:
perror("PGSTAT: fork(2)");
exit(1);
case 0:
- close(pgStatPipe[0]);
- /* child process should die if can't pipe to parent collector */
- pqsignal(SIGPIPE, SIG_DFL);
- pgstat_recvbuffer(real_argc, real_argv);
- exit(2);
-
- default:
+ /* child becomes collector process */
close(pgStatPipe[1]);
close(pgStatSock);
break;
+
+ default:
+ /* parent becomes buffer process */
+ close(pgStatPipe[0]);
+ pgstat_recvbuffer(real_argc, real_argv);
+ exit(0);
}
+ /*
+ * In the child we can have default SIGCHLD handling (in case we
+ * want to call system() here...)
+ */
+ pqsignal(SIGCHLD, SIG_DFL);
+
/*
* Identify myself via ps
*
}
memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
+ readPipe = pgStatPipe[0];
+
/*
* Process incoming messages and handle all the reporting stuff
- * until the postmaster waves us good bye.
+ * until there are no more messages.
*/
for (;;)
{
* Setup the descriptor set for select(2)
*/
FD_ZERO(&rfds);
- FD_SET(pgStatPipe[0], &rfds);
- FD_SET(pgStatPmPipe[0], &rfds);
+ FD_SET(readPipe, &rfds);
+ FD_SET(pmPipe, &rfds);
- if (pgStatPipe[0] > pgStatPmPipe[0])
- maxfd = pgStatPipe[0];
+ if (readPipe > pmPipe)
+ maxfd = readPipe;
else
- maxfd = pgStatPmPipe[0];
+ maxfd = pmPipe;
/*
* Now wait for something to do.
(need_statwrite) ? &timeout : NULL);
if (nready < 0)
{
+ if (errno == EINTR)
+ continue;
perror("PGSTAT: select(2)");
exit(1);
}
/*
* Check if there is a new statistics message to collect.
*/
- if (FD_ISSET(pgStatPipe[0], &rfds))
+ if (FD_ISSET(readPipe, &rfds))
{
/*
- * If this is the first message after we wrote the stats
- * file the last time, setup the timeout that it'd be
- * written.
+ * We may need to issue multiple read calls in case the
+ * buffer process didn't write the message in a single write,
+ * which is possible since it dumps its buffer bytewise.
+ * In any case, we'd need two reads since we don't know the
+ * message length initially.
*/
- if (!need_statwrite)
- {
- gettimeofday(&next_statwrite, NULL);
- next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
- next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
- next_statwrite.tv_usec %= 1000000;
- need_statwrite = TRUE;
- }
+ int nread = 0;
+ int targetlen = sizeof(PgStat_MsgHdr); /* initial */
- /*
- * Read the header.
- */
- len = read(pgStatPipe[0], &msg, sizeof(PgStat_MsgHdr));
- if (len < 0)
+ while (nread < targetlen)
{
- perror("PGSTAT: read(2)");
- exit(1);
- }
- if (len == 0)
- {
- return;
- }
- if (len != sizeof(PgStat_MsgHdr))
- {
- fprintf(stderr, "PGSTAT: short read(2)");
- exit(1);
- }
-
- /*
- * And the body. We need to do it in two steps because
- * we don't know the length.
- */
- dlen = msg.msg_hdr.m_size - sizeof(PgStat_MsgHdr);
- if (dlen > 0)
- {
- len = read(pgStatPipe[0],
- ((char *)&msg) + sizeof(PgStat_MsgHdr), dlen);
+ len = read(readPipe,
+ ((char *) &msg) + nread,
+ targetlen - nread);
if (len < 0)
{
+ if (errno == EINTR)
+ continue;
perror("PGSTAT: read(2)");
exit(1);
}
- if (len == 0)
- {
- return;
- }
- if (len != dlen)
+ if (len == 0) /* EOF on the pipe! */
+ break;
+ nread += len;
+ if (nread == sizeof(PgStat_MsgHdr))
{
- fprintf(stderr, "PGSTAT: short read(2)");
- exit(1);
+ /* we have the header, compute actual msg length */
+ targetlen = msg.msg_hdr.m_size;
+ if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
+ targetlen > (int) sizeof(msg))
+ {
+ /*
+ * Bogus message length implies that we got out
+ * of sync with the buffer process somehow.
+ * Abort so that we can restart both processes.
+ */
+ fprintf(stderr, "PGSTAT: bogus message length\n");
+ exit(1);
+ }
}
}
+ /*
+ * EOF on the pipe implies that the buffer process exited.
+ * Fall out of outer loop.
+ */
+ if (len == 0)
+ break;
/*
- * Distribute the message to the specific function
- * handling it.
+ * Distribute the message to the specific function handling it.
*/
- len += sizeof(PgStat_MsgHdr);
switch (msg.msg_hdr.m_type)
{
case PGSTAT_MTYPE_DUMMY:
break;
case PGSTAT_MTYPE_BESTART:
- pgstat_recv_bestart((PgStat_MsgBestart *)&msg, len);
+ pgstat_recv_bestart((PgStat_MsgBestart *)&msg, nread);
break;
case PGSTAT_MTYPE_BETERM:
- pgstat_recv_beterm((PgStat_MsgBeterm *)&msg, len);
+ pgstat_recv_beterm((PgStat_MsgBeterm *)&msg, nread);
break;
case PGSTAT_MTYPE_TABSTAT:
- pgstat_recv_tabstat((PgStat_MsgTabstat *)&msg, len);
+ pgstat_recv_tabstat((PgStat_MsgTabstat *)&msg, nread);
break;
case PGSTAT_MTYPE_TABPURGE:
- pgstat_recv_tabpurge((PgStat_MsgTabpurge *)&msg, len);
+ pgstat_recv_tabpurge((PgStat_MsgTabpurge *)&msg, nread);
break;
case PGSTAT_MTYPE_ACTIVITY:
- pgstat_recv_activity((PgStat_MsgActivity *)&msg, len);
+ pgstat_recv_activity((PgStat_MsgActivity *)&msg, nread);
break;
case PGSTAT_MTYPE_DROPDB:
- pgstat_recv_dropdb((PgStat_MsgDropdb *)&msg, len);
+ pgstat_recv_dropdb((PgStat_MsgDropdb *)&msg, nread);
break;
case PGSTAT_MTYPE_RESETCOUNTER:
pgstat_recv_resetcounter((PgStat_MsgResetcounter *)&msg,
- len);
+ nread);
break;
default:
}
/*
- * Globally count messages and start over.
+ * Globally count messages.
*/
pgStatNumMessages++;
- continue;
+
+ /*
+ * If this is the first message after we wrote the stats
+ * file the last time, set up the timeout at which the
+ * file should be written again.
+ */
+ if (!need_statwrite)
+ {
+ gettimeofday(&next_statwrite, NULL);
+ next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
+ next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
+ next_statwrite.tv_usec %= 1000000;
+ need_statwrite = TRUE;
+ }
}
/*
- * If the postmaster pipe is ready for reading this means that
- * the kernel must have closed it because of the termination
- * of the postmaster (he never really writes to it). Give up
- * then, but save the final stats in case we want to reuse
- * them at startup in the future.
+ * Note that we do NOT check for postmaster exit inside the loop;
+ * only EOF on the buffer pipe causes us to fall out. This ensures
+ * we don't exit prematurely if there are still a few messages in
+ * the buffer or pipe at postmaster shutdown.
*/
- if (FD_ISSET(pgStatPmPipe[0], &rfds))
- {
- pgstat_write_statsfile();
- return;
- }
}
+
+ /*
+ * Okay, we saw EOF on the buffer pipe, so there are no more messages to
+ * process. If the buffer process quit because of postmaster shutdown,
+ * we want to save the final stats to reuse at next startup. But if the
+ * buffer process failed, it seems best not to (there may even now be a
+ * new collector firing up, and we don't want it to read a partially-
+ * rewritten stats file). We can tell whether the postmaster is still
+ * alive by checking to see if the postmaster pipe is still open. If it
+ * is read-ready (ie, EOF), the postmaster must have quit.
+ */
+ if (FD_ISSET(pmPipe, &rfds))
+ pgstat_write_statsfile();
}
/* ----------
* pgstat_recvbuffer() -
*
- * This is a special receive buffer started by the statistics
- * collector itself and running in a separate process. It's only
+ * This is the body of the separate buffering process. Its only
* purpose is to receive messages from the UDP socket as fast as
- * possible and forward them over a pipe into the collector
- * itself.
+ * possible and forward them over a pipe into the collector itself.
+ * If the collector is slow to absorb messages, they are buffered here.
* ----------
*/
static void
{
fd_set rfds;
fd_set wfds;
+ int writePipe = pgStatPipe[1];
+ int pmPipe = pgStatPmPipe[0];
int maxfd;
int nready;
int len;
- PgStat_Msg *msgbuffer = NULL;
- int msg_recv = 0;
- int msg_send = 0;
- int msg_have = 0;
+ int xfr;
+ int frm;
+ PgStat_Msg input_buffer;
+ char *msgbuffer;
+ int msg_send = 0; /* next send index in buffer */
+ int msg_recv = 0; /* next receive index */
+ int msg_have = 0; /* number of bytes stored */
struct sockaddr_in fromaddr;
int fromlen;
- int overflow = 0;
+ bool overflow = false;
/*
* Identify myself via ps
init_ps_display(real_argc, real_argv, "stats buffer process", "", "");
set_ps_display("");
+ /*
+ * We want to die if our child collector process does. There are two ways
+ * we might notice that it has died: receive SIGCHLD, or get a write
+ * failure on the pipe leading to the child. We can set SIGPIPE to kill
+ * us here. Our SIGCHLD handler was already set up before we forked (must
+ * do it that way, else it's a race condition).
+ */
+ pqsignal(SIGPIPE, SIG_DFL);
+ PG_SETMASK(&UnBlockSig);
+
+ /*
+ * Set the write pipe to nonblock mode, so that we cannot block when
+ * the collector falls behind.
+ */
+ if (fcntl(writePipe, F_SETFL, O_NONBLOCK) < 0)
+ {
+ perror("PGSTATBUFF: fcntl(2)");
+ exit(1);
+ }
+
/*
* Allocate the message buffer
*/
- msgbuffer = (PgStat_Msg *)malloc(sizeof(PgStat_Msg) *
- PGSTAT_RECVBUFFERSZ);
+ msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
if (msgbuffer == NULL)
{
perror("PGSTATBUFF: malloc()");
* As long as we have buffer space we add the socket
* to the read descriptor set.
*/
- if (msg_have < PGSTAT_RECVBUFFERSZ)
+ if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
{
FD_SET(pgStatSock, &rfds);
maxfd = pgStatSock;
- overflow = 0;
+ overflow = false;
}
else
{
- if (overflow == 0)
+ if (!overflow)
{
- fprintf(stderr, "PGSTAT: Warning - receive buffer full\n");
- overflow = 1;
+ fprintf(stderr, "PGSTATBUFF: Warning - receive buffer full\n");
+ overflow = true;
}
}
-
/*
* If we have messages to write out, we add the pipe
* to the write descriptor set. Otherwise, we check if
*/
if (msg_have > 0)
{
- FD_SET(pgStatPipe[1], &wfds);
- if (pgStatPipe[1] > maxfd)
- maxfd = pgStatPipe[1];
+ FD_SET(writePipe, &wfds);
+ if (writePipe > maxfd)
+ maxfd = writePipe;
}
else
{
- FD_SET(pgStatPmPipe[0], &rfds);
- if (pgStatPmPipe[0] > maxfd)
- maxfd = pgStatPmPipe[0];
+ FD_SET(pmPipe, &rfds);
+ if (pmPipe > maxfd)
+ maxfd = pmPipe;
}
-
/*
* Wait for some work to do.
*/
nready = select(maxfd + 1, &rfds, &wfds, NULL, NULL);
if (nready < 0)
{
+ if (errno == EINTR)
+ continue;
perror("PGSTATBUFF: select(2)");
exit(1);
}
{
fromlen = sizeof(fromaddr);
len = recvfrom(pgStatSock,
- &msgbuffer[msg_recv], sizeof(PgStat_Msg), 0,
- (struct sockaddr *)&fromaddr, &fromlen);
+ &input_buffer, sizeof(PgStat_Msg), 0,
+ (struct sockaddr *) &fromaddr, &fromlen);
if (len < 0)
{
perror("PGSTATBUFF: recvfrom(2)");
/*
* The received length must match the length in the header
*/
- if (msgbuffer[msg_recv].msg_hdr.m_size != len)
+ if (input_buffer.msg_hdr.m_size != len)
continue;
/*
* The source address of the packet must be our own socket.
* This ensures that only real hackers or our own backends
- * tell us something.
+ * tell us something. (This should be redundant with a
+ * kernel-level check due to having used connect(), but
+ * let's do it anyway.)
*/
if (fromaddr.sin_addr.s_addr != pgStatAddr.sin_addr.s_addr)
continue;
continue;
/*
- * O.K. - we accept this message.
+ * O.K. - we accept this message. Copy it to the circular
+ * msgbuffer.
*/
- msg_have++;
- msg_recv++;
- if (msg_recv == PGSTAT_RECVBUFFERSZ)
- msg_recv = 0;
+ frm = 0;
+ while (len > 0)
+ {
+ xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
+ if (xfr > len)
+ xfr = len;
+ Assert(xfr > 0);
+ memcpy(msgbuffer + msg_recv,
+ ((char *) &input_buffer) + frm,
+ xfr);
+ msg_recv += xfr;
+ if (msg_recv == PGSTAT_RECVBUFFERSZ)
+ msg_recv = 0;
+ msg_have += xfr;
+ frm += xfr;
+ len -= xfr;
+ }
}
/*
- * If the collector is ready to receive, write a buffered
- * message into his pipe.
+ * If the collector is ready to receive, write some data into his
+ * pipe. We may or may not be able to write all that we have.
+ *
+ * NOTE: if what we have is less than PIPE_BUF bytes but more than
+ * the space available in the pipe buffer, most kernels will refuse
+ * to write any of it, and will return EAGAIN. This means we will
+ * busy-loop until the situation changes (either because the collector
+ * caught up, or because more data arrives so that we have more than
+ * PIPE_BUF bytes buffered). This is not good, but is there any way
+ * around it? We have no way to tell when the collector has
+ * caught up...
*/
- if (FD_ISSET(pgStatPipe[1], &wfds))
+ if (FD_ISSET(writePipe, &wfds))
{
- len = write(pgStatPipe[1], &msgbuffer[msg_send],
- msgbuffer[msg_send].msg_hdr.m_size);
+ xfr = PGSTAT_RECVBUFFERSZ - msg_send;
+ if (xfr > msg_have)
+ xfr = msg_have;
+ Assert(xfr > 0);
+ len = write(writePipe, msgbuffer + msg_send, xfr);
if (len < 0)
{
+ if (errno == EINTR || errno == EAGAIN)
+ continue; /* not enough space in pipe */
perror("PGSTATBUFF: write(2)");
exit(1);
}
- if (len != msgbuffer[msg_send].msg_hdr.m_size)
- {
- fprintf(stderr, "PGSTATBUFF: short write(2)");
- exit(1);
- }
-
- msg_have--;
- msg_send++;
+ /* NB: len < xfr is okay */
+ msg_send += len;
if (msg_send == PGSTAT_RECVBUFFERSZ)
msg_send = 0;
+ msg_have -= len;
}
/*
* Make sure we forwarded all messages before we check for
* Postmaster termination.
*/
- if (FD_ISSET(pgStatSock, &rfds) || FD_ISSET(pgStatPipe[1], &wfds))
+ if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
continue;
/*
* the kernel must have closed it on exit() (the postmaster
* never really writes to it). So we've done our job.
*/
- if (FD_ISSET(pgStatPmPipe[0], &rfds))
+ if (FD_ISSET(pmPipe, &rfds))
exit(0);
}
}
+/* ----------
+ * pgstat_die() -
+ *
+ *	Signal handler that terminates the current process immediately
+ *	with exit status 1.  pgstat_main() installs it for SIGCHLD before
+ *	forking, so that the buffer process dies when its child collector
+ *	process does.
+ *
+ *	We call _exit() rather than exit(): exit() flushes stdio buffers and
+ *	runs atexit callbacks, none of which is async-signal-safe.  If the
+ *	signal arrived while the interrupted code was inside stdio or
+ *	malloc, calling exit() here could deadlock or corrupt state instead
+ *	of terminating.  The exit status seen by the parent is unchanged.
+ * ----------
+ */
+static void
+pgstat_die(SIGNAL_ARGS)
+{
+	_exit(1);
+}
+
/* ----------
* pgstat_add_backend() -
*
- * Support function to keep our backen list up to date.
+ * Support function to keep our backend list up to date.
* ----------
*/
static int
/*
* If the database is marked for destroy, this is a delayed
- * UDP packet and not worth beeing counted.
+ * UDP packet and not worth being counted.
*/
if (dbentry->destroy > 0)
return;