Support frontend-backend protocol communication using a shm_mq.
authorRobert Haas <rhaas@postgresql.org>
Fri, 31 Oct 2014 16:02:40 +0000 (12:02 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 31 Oct 2014 16:02:40 +0000 (12:02 -0400)
A background worker can use pq_redirect_to_shm_mq() to direct protocol
that would normally be sent to the frontend to a shm_mq so that another
process may read them.

The receiving process may use pq_parse_errornotice() to parse an
ErrorResponse or NoticeResponse from the background worker and, if
it wishes, ThrowErrorData() to propagate the error (with or without
further modification).

Patch by me.  Review by Andres Freund.

src/backend/libpq/Makefile
src/backend/libpq/pqcomm.c
src/backend/libpq/pqmq.c [new file with mode: 0644]
src/backend/utils/adt/numutils.c
src/backend/utils/error/elog.c
src/include/libpq/libpq.h
src/include/libpq/pqmq.h [new file with mode: 0644]
src/include/utils/builtins.h
src/include/utils/elog.h

index 8be0572b5b76789ada91f395ee24a544a27461de..09410c4bb19d3e36c79f22d5e836a2ecc32933e3 100644 (file)
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 # be-fsstubs is here for historical reasons, probably belongs elsewhere
 
 OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \
-       pqformat.o pqsignal.o
+       pqformat.o pqmq.o pqsignal.o
 
 ifeq ($(with_openssl),yes)
 OBJS += be-secure-openssl.o
index 605d8913b16d2487c0f4f6c4a9d4baf6cc2635b8..dcbb704c6a59b3abc65e197e5c6cbe38f5c7ec40 100644 (file)
 int            Unix_socket_permissions;
 char      *Unix_socket_group;
 
-
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
 
@@ -134,16 +133,38 @@ static bool DoingCopyOut;
 
 
 /* Internal functions */
-static void pq_close(int code, Datum arg);
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int socket_flush(void);
+static int socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
 static int internal_putbytes(const char *s, size_t len);
 static int internal_flush(void);
-static void pq_set_nonblocking(bool nonblocking);
+static void socket_set_nonblocking(bool nonblocking);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
 static int Setup_AF_UNIX(char *sock_path);
 #endif   /* HAVE_UNIX_SOCKETS */
 
+PQcommMethods PQcommSocketMethods;
+
+static PQcommMethods PqCommSocketMethods = {
+   socket_comm_reset,
+   socket_flush,
+   socket_flush_if_writable,
+   socket_is_send_pending,
+   socket_putmessage,
+   socket_putmessage_noblock,
+   socket_startcopyout,
+   socket_endcopyout
+};
+
 
 /* --------------------------------
  *     pq_init - initialize libpq at backend startup
@@ -152,24 +173,25 @@ static int    Setup_AF_UNIX(char *sock_path);
 void
 pq_init(void)
 {
+   PqCommMethods = &PqCommSocketMethods;
    PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
    PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
    PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
    PqCommBusy = false;
    DoingCopyOut = false;
-   on_proc_exit(pq_close, 0);
+   on_proc_exit(socket_close, 0);
 }
 
 /* --------------------------------
- *     pq_comm_reset - reset libpq during error recovery
+ *     socket_comm_reset - reset libpq during error recovery
  *
  * This is called from error recovery at the outer idle loop.  It's
  * just to get us out of trouble if we somehow manage to elog() from
  * inside a pqcomm.c routine (which ideally will never happen, but...)
  * --------------------------------
  */
-void
-pq_comm_reset(void)
+static void
+socket_comm_reset(void)
 {
    /* Do not throw away pending data, but do reset the busy flag */
    PqCommBusy = false;
@@ -178,14 +200,14 @@ pq_comm_reset(void)
 }
 
 /* --------------------------------
- *     pq_close - shutdown libpq at backend exit
+ *     socket_close - shutdown libpq at backend exit
  *
  * Note: in a standalone backend MyProcPort will be null,
  * don't crash during exit...
  * --------------------------------
  */
 static void
-pq_close(int code, Datum arg)
+socket_close(int code, Datum arg)
 {
    if (MyProcPort != NULL)
    {
@@ -783,15 +805,20 @@ TouchSocketFiles(void)
  */
 
 /* --------------------------------
- *           pq_set_nonblocking - set socket blocking/non-blocking
+ *           socket_set_nonblocking - set socket blocking/non-blocking
  *
  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
  * blocking otherwise.
  * --------------------------------
  */
 static void
-pq_set_nonblocking(bool nonblocking)
+socket_set_nonblocking(bool nonblocking)
 {
+   if (MyProcPort == NULL)
+       ereport(ERROR,
+               (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                errmsg("there is no client connection")));
+
    if (MyProcPort->noblock == nonblocking)
        return;
 
@@ -844,7 +871,7 @@ pq_recvbuf(void)
    }
 
    /* Ensure that we're in blocking mode */
-   pq_set_nonblocking(false);
+   socket_set_nonblocking(false);
 
    /* Can fill buffer from PqRecvLength and upwards */
    for (;;)
@@ -935,7 +962,7 @@ pq_getbyte_if_available(unsigned char *c)
    }
 
    /* Put the socket into non-blocking mode */
-   pq_set_nonblocking(true);
+   socket_set_nonblocking(true);
 
    r = secure_read(MyProcPort, c, 1);
    if (r < 0)
@@ -1194,7 +1221,7 @@ internal_putbytes(const char *s, size_t len)
        /* If buffer is full, then flush it out */
        if (PqSendPointer >= PqSendBufferSize)
        {
-           pq_set_nonblocking(false);
+           socket_set_nonblocking(false);
            if (internal_flush())
                return EOF;
        }
@@ -1210,13 +1237,13 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
- *     pq_flush        - flush pending output
+ *     socket_flush        - flush pending output
  *
  *     returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_flush(void)
+static int
+socket_flush(void)
 {
    int         res;
 
@@ -1224,7 +1251,7 @@ pq_flush(void)
    if (PqCommBusy)
        return 0;
    PqCommBusy = true;
-   pq_set_nonblocking(false);
+   socket_set_nonblocking(false);
    res = internal_flush();
    PqCommBusy = false;
    return res;
@@ -1310,8 +1337,8 @@ internal_flush(void)
  * Returns 0 if OK, or EOF if trouble.
  * --------------------------------
  */
-int
-pq_flush_if_writable(void)
+static int
+socket_flush_if_writable(void)
 {
    int         res;
 
@@ -1324,7 +1351,7 @@ pq_flush_if_writable(void)
        return 0;
 
    /* Temporarily put the socket into non-blocking mode */
-   pq_set_nonblocking(true);
+   socket_set_nonblocking(true);
 
    PqCommBusy = true;
    res = internal_flush();
@@ -1333,11 +1360,11 @@ pq_flush_if_writable(void)
 }
 
 /* --------------------------------
- *     pq_is_send_pending  - is there any pending data in the output buffer?
+ * socket_is_send_pending  - is there any pending data in the output buffer?
  * --------------------------------
  */
-bool
-pq_is_send_pending(void)
+static bool
+socket_is_send_pending(void)
 {
    return (PqSendStart < PqSendPointer);
 }
@@ -1351,7 +1378,7 @@ pq_is_send_pending(void)
 
 
 /* --------------------------------
- *     pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *     socket_putmessage - send a normal message (suppressed in COPY OUT mode)
  *
  *     If msgtype is not '\0', it is a message type code to place before
  *     the message body.  If msgtype is '\0', then the message has no type
@@ -1375,8 +1402,8 @@ pq_is_send_pending(void)
  *     returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
 {
    if (DoingCopyOut || PqCommBusy)
        return 0;
@@ -1408,8 +1435,8 @@ fail:
  *     If the output buffer is too small to hold the message, the buffer
  *     is enlarged.
  */
-void
-pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
 {
    int res     PG_USED_FOR_ASSERTS_ONLY;
    int         required;
@@ -1431,18 +1458,18 @@ pq_putmessage_noblock(char msgtype, const char *s, size_t len)
 
 
 /* --------------------------------
- *     pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ *     socket_startcopyout - inform libpq that an old-style COPY OUT transfer
  *         is beginning
  * --------------------------------
  */
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
 {
    DoingCopyOut = true;
 }
 
 /* --------------------------------
- *     pq_endcopyout   - end an old-style COPY OUT transfer
+ *     socket_endcopyout   - end an old-style COPY OUT transfer
  *
  *     If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  *     and must send a terminator line.  Since a partial data line might have
@@ -1451,8 +1478,8 @@ pq_startcopyout(void)
  *     not allow binary transfers, so a textual terminator is always correct.
  * --------------------------------
  */
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
 {
    if (!DoingCopyOut)
        return;
@@ -1462,7 +1489,6 @@ pq_endcopyout(bool errorAbort)
    DoingCopyOut = false;
 }
 
-
 /*
  * Support for TCP Keepalive parameters
  */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
new file mode 100644 (file)
index 0000000..6e6b429
--- /dev/null
@@ -0,0 +1,261 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.c
+ *   Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/libpq/pqmq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+
+static shm_mq *pq_mq;
+static shm_mq_handle *pq_mq_handle;
+static bool pq_mq_busy = false;
+
+static void mq_comm_reset(void);
+static int mq_flush(void);
+static int mq_flush_if_writable(void);
+static bool mq_is_send_pending(void);
+static int mq_putmessage(char msgtype, const char *s, size_t len);
+static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void mq_startcopyout(void);
+static void mq_endcopyout(bool errorAbort);
+
+static PQcommMethods PqCommMqMethods = {
+   mq_comm_reset,
+   mq_flush,
+   mq_flush_if_writable,
+   mq_is_send_pending,
+   mq_putmessage,
+   mq_putmessage_noblock,
+   mq_startcopyout,
+   mq_endcopyout
+};
+
+/*
+ * Arrange to redirect frontend/backend protocol messages to a shared-memory
+ * message queue.
+ */
+void
+pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+{
+   PqCommMethods = &PqCommMqMethods;
+   pq_mq = mq;
+   pq_mq_handle = mqh;
+   whereToSendOutput = DestRemote;
+   FrontendProtocol = PG_PROTOCOL_LATEST;
+}
+
+static void
+mq_comm_reset(void)
+{
+   /* Nothing to do. */
+}
+
+static int
+mq_flush(void)
+{
+   /* Nothing to do. */
+   return 0;
+}
+
+static int
+mq_flush_if_writable(void)
+{
+   /* Nothing to do. */
+   return 0;
+}
+
+static bool
+mq_is_send_pending(void)
+{
+   /* There's never anything pending. */
+   return 0;
+}
+
+/*
+ * Transmit a libpq protocol message to the shared memory message queue
+ * selected via pq_mq_handle.  We don't include a length word, because the
+ * receiver will know the length of the message from shm_mq_receive().
+ */
+static int
+mq_putmessage(char msgtype, const char *s, size_t len)
+{
+   shm_mq_iovec    iov[2];
+   shm_mq_result   result;
+
+   /*
+    * If we're sending a message, and we have to wait because the
+    * queue is full, and then we get interrupted, and that interrupt
+    * results in trying to send another message, we respond by detaching
+    * the queue.  There's no way to return to the original context, but
+    * even if there were, just queueing the message would amount to
+    * indefinitely postponing the response to the interrupt.  So we do
+    * this instead.
+    */
+   if (pq_mq_busy)
+   {
+       if (pq_mq != NULL)
+           shm_mq_detach(pq_mq);
+       pq_mq = NULL;
+       return EOF;
+   }
+
+   pq_mq_busy = true;
+
+   iov[0].data = &msgtype;
+   iov[0].len = 1;
+   iov[1].data = s;
+   iov[1].len = len;
+
+   Assert(pq_mq_handle != NULL);
+   result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+   pq_mq_busy = false;
+
+   Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
+   if (result != SHM_MQ_SUCCESS)
+       return EOF;
+   return 0;
+}
+
+static void
+mq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+   /*
+    * While the shm_mq machinery does support sending a message in
+    * non-blocking mode, there's currently no way to try sending beginning
+    * to send the message that doesn't also commit us to completing the
+    * transmission.  This could be improved in the future, but for now
+    * we don't need it.
+    */
+   elog(ERROR, "not currently supported");
+}
+
+static void
+mq_startcopyout(void)
+{
+   /* Nothing to do. */
+}
+
+static void
+mq_endcopyout(bool errorAbort)
+{
+   /* Nothing to do. */
+}
+
+/*
+ * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
+ * structure with the results.
+ */
+void
+pq_parse_errornotice(StringInfo msg, ErrorData *edata)
+{
+   /* Initialize edata with reasonable defaults. */
+   MemSet(edata, 0, sizeof(ErrorData));
+   edata->elevel = ERROR;
+   edata->assoc_context = CurrentMemoryContext;
+
+   /* Loop over fields and extract each one. */
+   for (;;)
+   {
+       char    code = pq_getmsgbyte(msg);
+       const char *value;
+
+       if (code == '\0')
+       {
+           pq_getmsgend(msg);
+           break;
+       }
+       value = pq_getmsgstring(msg);
+
+       switch (code)
+       {
+           case PG_DIAG_SEVERITY:
+               if (strcmp(value, "DEBUG") == 0)
+                   edata->elevel = DEBUG1; /* or some other DEBUG level */
+               else if (strcmp(value, "LOG") == 0)
+                   edata->elevel = LOG;    /* can't be COMMERROR */
+               else if (strcmp(value, "INFO") == 0)
+                   edata->elevel = INFO;
+               else if (strcmp(value, "NOTICE") == 0)
+                   edata->elevel = NOTICE;
+               else if (strcmp(value, "WARNING") == 0)
+                   edata->elevel = WARNING;
+               else if (strcmp(value, "ERROR") == 0)
+                   edata->elevel = ERROR;
+               else if (strcmp(value, "FATAL") == 0)
+                   edata->elevel = FATAL;
+               else if (strcmp(value, "PANIC") == 0)
+                   edata->elevel = PANIC;
+               else
+                   elog(ERROR, "unknown error severity");
+               break;
+           case PG_DIAG_SQLSTATE:
+               if (strlen(value) != 5)
+                   elog(ERROR, "malformed sql state");
+               edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
+                                                 value[3], value[4]);
+               break;
+           case PG_DIAG_MESSAGE_PRIMARY:
+               edata->message = pstrdup(value);
+               break;
+           case PG_DIAG_MESSAGE_DETAIL:
+               edata->detail = pstrdup(value);
+               break;
+           case PG_DIAG_MESSAGE_HINT:
+               edata->hint = pstrdup(value);
+               break;
+           case PG_DIAG_STATEMENT_POSITION:
+               edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
+               break;
+           case PG_DIAG_INTERNAL_POSITION:
+               edata->internalpos = pg_atoi(value, sizeof(int), '\0');
+               break;
+           case PG_DIAG_INTERNAL_QUERY:
+               edata->internalquery = pstrdup(value);
+               break;
+           case PG_DIAG_CONTEXT:
+               edata->context = pstrdup(value);
+               break;
+           case PG_DIAG_SCHEMA_NAME:
+               edata->schema_name = pstrdup(value);
+               break;
+           case PG_DIAG_TABLE_NAME:
+               edata->table_name = pstrdup(value);
+               break;
+           case PG_DIAG_COLUMN_NAME:
+               edata->column_name = pstrdup(value);
+               break;
+           case PG_DIAG_DATATYPE_NAME:
+               edata->datatype_name = pstrdup(value);
+               break;
+           case PG_DIAG_CONSTRAINT_NAME:
+               edata->constraint_name = pstrdup(value);
+               break;
+           case PG_DIAG_SOURCE_FILE:
+               edata->filename = pstrdup(value);
+               break;
+           case PG_DIAG_SOURCE_LINE:
+               edata->lineno = pg_atoi(value, sizeof(int), '\0');
+               break;
+           case PG_DIAG_SOURCE_FUNCTION:
+               edata->funcname = pstrdup(value);
+               break;
+           default:
+               elog(ERROR, "unknown error field: %d", (int) code);
+               break;
+       }
+   }
+}
index ca5a8a576cb7b1385ec8eec0f7df3f1a5074b308..1d133634e1123a177257cadce18d7de879d63cc9 100644 (file)
@@ -34,7 +34,7 @@
  * overflow.
  */
 int32
-pg_atoi(char *s, int size, int c)
+pg_atoi(const char *s, int size, int c)
 {
    long        l;
    char       *badp;
index 32a9663366d9e1479e6108bba8631f5e22cca184..231646481916c13a44889833044681f418c80110 100644 (file)
@@ -1576,6 +1576,57 @@ FlushErrorState(void)
    MemoryContextResetAndDeleteChildren(ErrorContext);
 }
 
+/*
+ * ThrowErrorData --- report an error described by an ErrorData structure
+ *
+ * This is intended to be used to re-report errors originally thrown by
+ * background worker processes and then propagated (with or without
+ * modification) to the backend responsible for them.
+ */
+void
+ThrowErrorData(ErrorData *edata)
+{
+   ErrorData *newedata;
+   MemoryContext   oldcontext;
+
+   if (!errstart(edata->elevel, edata->filename, edata->lineno,
+                 edata->funcname, NULL))
+       return;
+
+   newedata = &errordata[errordata_stack_depth];
+   oldcontext = MemoryContextSwitchTo(edata->assoc_context);
+
+   /* Copy the supplied fields to the error stack. */
+   if (edata->sqlerrcode > 0)
+       newedata->sqlerrcode = edata->sqlerrcode;
+   if (edata->message)
+       newedata->message = pstrdup(edata->message);
+   if (edata->detail)
+       newedata->detail = pstrdup(edata->detail);
+   if (edata->detail_log)
+       newedata->detail_log = pstrdup(edata->detail_log);
+   if (edata->hint)
+       newedata->hint = pstrdup(edata->hint);
+   if (edata->context)
+       newedata->context = pstrdup(edata->context);
+   if (edata->schema_name)
+       newedata->schema_name = pstrdup(edata->schema_name);
+   if (edata->table_name)
+       newedata->table_name = pstrdup(edata->table_name);
+   if (edata->column_name)
+       newedata->column_name = pstrdup(edata->column_name);
+   if (edata->datatype_name)
+       newedata->datatype_name = pstrdup(edata->datatype_name);
+   if (edata->constraint_name)
+       newedata->constraint_name = pstrdup(edata->constraint_name);
+   if (edata->internalquery)
+       newedata->internalquery = pstrdup(edata->internalquery);
+
+   MemoryContextSwitchTo(oldcontext);
+
+   errfinish(0);
+}
+
 /*
  * ReThrowError --- re-throw a previously copied error
  *
index 5da9d8d4f52b9ade54765c819fd80b8e8dd385ae..409f3d7786e27d88265ea03da98594615356c170 100644 (file)
@@ -37,6 +37,31 @@ typedef struct
    }           u;
 } PQArgBlock;
 
+typedef struct
+{
+   void (*comm_reset)(void);
+   int (*flush)(void);
+   int (*flush_if_writable)(void);
+   bool (*is_send_pending)(void);
+   int (*putmessage)(char msgtype, const char *s, size_t len);
+   void (*putmessage_noblock)(char msgtype, const char *s, size_t len);
+   void (*startcopyout)(void);
+   void (*endcopyout)(bool errorAbort);
+} PQcommMethods;
+
+PQcommMethods *PqCommMethods;
+
+#define pq_comm_reset()    (PqCommMethods->comm_reset())
+#define pq_flush() (PqCommMethods->flush())
+#define pq_flush_if_writable() (PqCommMethods->flush_if_writable())
+#define pq_is_send_pending() (PqCommMethods->is_send_pending())
+#define pq_putmessage(msgtype, s, len) \
+   (PqCommMethods->putmessage(msgtype, s, len))
+#define pq_putmessage_noblock(msgtype, s, len) \
+   (PqCommMethods->putmessage(msgtype, s, len))
+#define pq_startcopyout() (PqCommMethods->startcopyout())
+#define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort))
+
 /*
  * External functions.
  */
@@ -51,7 +76,6 @@ extern int    StreamConnection(pgsocket server_fd, Port *port);
 extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void pq_init(void);
-extern void pq_comm_reset(void);
 extern int pq_getbytes(char *s, size_t len);
 extern int pq_getstring(StringInfo s);
 extern int pq_getmessage(StringInfo s, int maxlen);
@@ -59,13 +83,6 @@ extern int   pq_getbyte(void);
 extern int pq_peekbyte(void);
 extern int pq_getbyte_if_available(unsigned char *c);
 extern int pq_putbytes(const char *s, size_t len);
-extern int pq_flush(void);
-extern int pq_flush_if_writable(void);
-extern bool pq_is_send_pending(void);
-extern int pq_putmessage(char msgtype, const char *s, size_t len);
-extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
-extern void pq_startcopyout(void);
-extern void pq_endcopyout(bool errorAbort);
 
 /*
  * prototypes for functions in be-secure.c
@@ -75,6 +92,9 @@ extern char *ssl_key_file;
 extern char *ssl_ca_file;
 extern char *ssl_crl_file;
 
+extern int (*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+extern int  (*pq_flush_hook)(void);
+
 extern int secure_initialize(void);
 extern bool secure_loaded_verify_locations(void);
 extern void secure_destroy(void);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
new file mode 100644 (file)
index 0000000..6bb24d9
--- /dev/null
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.h
+ *   Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/pqmq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PQMQ_H
+#define PQMQ_H
+
+#include "storage/shm_mq.h"
+
+extern void    pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+
+extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
+
+#endif   /* PQMQ_H */
index fb1b4a42ddb68f05de1ed89ab873d6fa5a42bc4e..4e74d85d22895b1fa0eed3b10aed87bc4ff98d8c 100644 (file)
@@ -285,7 +285,7 @@ extern Datum current_schema(PG_FUNCTION_ARGS);
 extern Datum current_schemas(PG_FUNCTION_ARGS);
 
 /* numutils.c */
-extern int32 pg_atoi(char *s, int size, int c);
+extern int32 pg_atoi(const char *s, int size, int c);
 extern void pg_itoa(int16 i, char *a);
 extern void pg_ltoa(int32 l, char *a);
 extern void pg_lltoa(int64 ll, char *a);
index 92073be0ca54b1f3a938f617855992dac43ecb79..87438b86445480a8179b03ac770b0bfa839fc7e1 100644 (file)
@@ -415,6 +415,7 @@ extern ErrorData *CopyErrorData(void);
 extern void FreeErrorData(ErrorData *edata);
 extern void FlushErrorState(void);
 extern void ReThrowError(ErrorData *edata) __attribute__((noreturn));
+extern void ThrowErrorData(ErrorData *edata);
 extern void pg_re_throw(void) __attribute__((noreturn));
 
 extern char *GetErrorContextStack(void);