# be-fsstubs is here for historical reasons, probably belongs elsewhere
OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \
- pqformat.o pqsignal.o
+ pqformat.o pqmq.o pqsignal.o
ifeq ($(with_openssl),yes)
OBJS += be-secure-openssl.o
int Unix_socket_permissions;
char *Unix_socket_group;
-
/* Where the Unix socket files are (list of palloc'd strings) */
static List *sock_paths = NIL;
/* Internal functions */
-static void pq_close(int code, Datum arg);
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int socket_flush(void);
+static int socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
static int internal_putbytes(const char *s, size_t len);
static int internal_flush(void);
-static void pq_set_nonblocking(bool nonblocking);
+static void socket_set_nonblocking(bool nonblocking);
#ifdef HAVE_UNIX_SOCKETS
static int Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
static int Setup_AF_UNIX(char *sock_path);
#endif /* HAVE_UNIX_SOCKETS */
+PQcommMethods PQcommSocketMethods;
+
+static PQcommMethods PqCommSocketMethods = {
+ socket_comm_reset,
+ socket_flush,
+ socket_flush_if_writable,
+ socket_is_send_pending,
+ socket_putmessage,
+ socket_putmessage_noblock,
+ socket_startcopyout,
+ socket_endcopyout
+};
+
/* --------------------------------
* pq_init - initialize libpq at backend startup
void
pq_init(void)
{
+ PqCommMethods = &PqCommSocketMethods;
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
DoingCopyOut = false;
- on_proc_exit(pq_close, 0);
+ on_proc_exit(socket_close, 0);
}
/* --------------------------------
- * pq_comm_reset - reset libpq during error recovery
+ * socket_comm_reset - reset libpq during error recovery
*
* This is called from error recovery at the outer idle loop. It's
* just to get us out of trouble if we somehow manage to elog() from
* inside a pqcomm.c routine (which ideally will never happen, but...)
* --------------------------------
*/
-void
-pq_comm_reset(void)
+static void
+socket_comm_reset(void)
{
/* Do not throw away pending data, but do reset the busy flag */
PqCommBusy = false;
}
/* --------------------------------
- * pq_close - shutdown libpq at backend exit
+ * socket_close - shutdown libpq at backend exit
*
* Note: in a standalone backend MyProcPort will be null,
* don't crash during exit...
* --------------------------------
*/
static void
-pq_close(int code, Datum arg)
+socket_close(int code, Datum arg)
{
if (MyProcPort != NULL)
{
*/
/* --------------------------------
- * pq_set_nonblocking - set socket blocking/non-blocking
+ * socket_set_nonblocking - set socket blocking/non-blocking
*
* Sets the socket non-blocking if nonblocking is TRUE, or sets it
* blocking otherwise.
* --------------------------------
*/
static void
-pq_set_nonblocking(bool nonblocking)
+socket_set_nonblocking(bool nonblocking)
{
+ if (MyProcPort == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+ errmsg("there is no client connection")));
+
if (MyProcPort->noblock == nonblocking)
return;
}
/* Ensure that we're in blocking mode */
- pq_set_nonblocking(false);
+ socket_set_nonblocking(false);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
}
/* Put the socket into non-blocking mode */
- pq_set_nonblocking(true);
+ socket_set_nonblocking(true);
r = secure_read(MyProcPort, c, 1);
if (r < 0)
/* If buffer is full, then flush it out */
if (PqSendPointer >= PqSendBufferSize)
{
- pq_set_nonblocking(false);
+ socket_set_nonblocking(false);
if (internal_flush())
return EOF;
}
}
/* --------------------------------
- * pq_flush - flush pending output
+ * socket_flush - flush pending output
*
* returns 0 if OK, EOF if trouble
* --------------------------------
*/
-int
-pq_flush(void)
+static int
+socket_flush(void)
{
int res;
if (PqCommBusy)
return 0;
PqCommBusy = true;
- pq_set_nonblocking(false);
+ socket_set_nonblocking(false);
res = internal_flush();
PqCommBusy = false;
return res;
* Returns 0 if OK, or EOF if trouble.
* --------------------------------
*/
-int
-pq_flush_if_writable(void)
+static int
+socket_flush_if_writable(void)
{
int res;
return 0;
/* Temporarily put the socket into non-blocking mode */
- pq_set_nonblocking(true);
+ socket_set_nonblocking(true);
PqCommBusy = true;
res = internal_flush();
}
/* --------------------------------
- * pq_is_send_pending - is there any pending data in the output buffer?
+ * socket_is_send_pending - is there any pending data in the output buffer?
* --------------------------------
*/
-bool
-pq_is_send_pending(void)
+static bool
+socket_is_send_pending(void)
{
return (PqSendStart < PqSendPointer);
}
/* --------------------------------
- * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
+ * socket_putmessage - send a normal message (suppressed in COPY OUT mode)
*
* If msgtype is not '\0', it is a message type code to place before
* the message body. If msgtype is '\0', then the message has no type
* returns 0 if OK, EOF if trouble
* --------------------------------
*/
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
{
if (DoingCopyOut || PqCommBusy)
return 0;
* If the output buffer is too small to hold the message, the buffer
* is enlarged.
*/
-void
-pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
{
int res PG_USED_FOR_ASSERTS_ONLY;
int required;
/* --------------------------------
- * pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ * socket_startcopyout - inform libpq that an old-style COPY OUT transfer
* is beginning
* --------------------------------
*/
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
{
DoingCopyOut = true;
}
/* --------------------------------
- * pq_endcopyout - end an old-style COPY OUT transfer
+ * socket_endcopyout - end an old-style COPY OUT transfer
*
* If errorAbort is indicated, we are aborting a COPY OUT due to an error,
* and must send a terminator line. Since a partial data line might have
* not allow binary transfers, so a textual terminator is always correct.
* --------------------------------
*/
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
{
if (!DoingCopyOut)
return;
DoingCopyOut = false;
}
-
/*
* Support for TCP Keepalive parameters
*/
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.c
+ * Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/libpq/pqmq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+
+static shm_mq *pq_mq;
+static shm_mq_handle *pq_mq_handle;
+static bool pq_mq_busy = false;
+
+static void mq_comm_reset(void);
+static int mq_flush(void);
+static int mq_flush_if_writable(void);
+static bool mq_is_send_pending(void);
+static int mq_putmessage(char msgtype, const char *s, size_t len);
+static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void mq_startcopyout(void);
+static void mq_endcopyout(bool errorAbort);
+
+static PQcommMethods PqCommMqMethods = {
+ mq_comm_reset,
+ mq_flush,
+ mq_flush_if_writable,
+ mq_is_send_pending,
+ mq_putmessage,
+ mq_putmessage_noblock,
+ mq_startcopyout,
+ mq_endcopyout
+};
+
+/*
+ * Arrange to redirect frontend/backend protocol messages to a shared-memory
+ * message queue.
+ */
+void
+pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+{
+ PqCommMethods = &PqCommMqMethods;
+ pq_mq = mq;
+ pq_mq_handle = mqh;
+ whereToSendOutput = DestRemote;
+ FrontendProtocol = PG_PROTOCOL_LATEST;
+}
+
+static void
+mq_comm_reset(void)
+{
+ /* Nothing to do. */
+}
+
+static int
+mq_flush(void)
+{
+ /* Nothing to do. */
+ return 0;
+}
+
+static int
+mq_flush_if_writable(void)
+{
+ /* Nothing to do. */
+ return 0;
+}
+
+static bool
+mq_is_send_pending(void)
+{
+ /* There's never anything pending. */
+ return 0;
+}
+
+/*
+ * Transmit a libpq protocol message to the shared memory message queue
+ * selected via pq_mq_handle. We don't include a length word, because the
+ * receiver will know the length of the message from shm_mq_receive().
+ */
+static int
+mq_putmessage(char msgtype, const char *s, size_t len)
+{
+ shm_mq_iovec iov[2];
+ shm_mq_result result;
+
+ /*
+ * If we're sending a message, and we have to wait because the
+ * queue is full, and then we get interrupted, and that interrupt
+ * results in trying to send another message, we respond by detaching
+ * the queue. There's no way to return to the original context, but
+ * even if there were, just queueing the message would amount to
+ * indefinitely postponing the response to the interrupt. So we do
+ * this instead.
+ */
+ if (pq_mq_busy)
+ {
+ if (pq_mq != NULL)
+ shm_mq_detach(pq_mq);
+ pq_mq = NULL;
+ return EOF;
+ }
+
+ pq_mq_busy = true;
+
+ iov[0].data = &msgtype;
+ iov[0].len = 1;
+ iov[1].data = s;
+ iov[1].len = len;
+
+ Assert(pq_mq_handle != NULL);
+ result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+ pq_mq_busy = false;
+
+ Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
+ if (result != SHM_MQ_SUCCESS)
+ return EOF;
+ return 0;
+}
+
+static void
+mq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+ /*
+ * While the shm_mq machinery does support sending a message in
+ * non-blocking mode, there's currently no way to try sending beginning
+ * to send the message that doesn't also commit us to completing the
+ * transmission. This could be improved in the future, but for now
+ * we don't need it.
+ */
+ elog(ERROR, "not currently supported");
+}
+
+static void
+mq_startcopyout(void)
+{
+ /* Nothing to do. */
+}
+
+static void
+mq_endcopyout(bool errorAbort)
+{
+ /* Nothing to do. */
+}
+
+/*
+ * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
+ * structure with the results.
+ */
+void
+pq_parse_errornotice(StringInfo msg, ErrorData *edata)
+{
+ /* Initialize edata with reasonable defaults. */
+ MemSet(edata, 0, sizeof(ErrorData));
+ edata->elevel = ERROR;
+ edata->assoc_context = CurrentMemoryContext;
+
+ /* Loop over fields and extract each one. */
+ for (;;)
+ {
+ char code = pq_getmsgbyte(msg);
+ const char *value;
+
+ if (code == '\0')
+ {
+ pq_getmsgend(msg);
+ break;
+ }
+ value = pq_getmsgstring(msg);
+
+ switch (code)
+ {
+ case PG_DIAG_SEVERITY:
+ if (strcmp(value, "DEBUG") == 0)
+ edata->elevel = DEBUG1; /* or some other DEBUG level */
+ else if (strcmp(value, "LOG") == 0)
+ edata->elevel = LOG; /* can't be COMMERROR */
+ else if (strcmp(value, "INFO") == 0)
+ edata->elevel = INFO;
+ else if (strcmp(value, "NOTICE") == 0)
+ edata->elevel = NOTICE;
+ else if (strcmp(value, "WARNING") == 0)
+ edata->elevel = WARNING;
+ else if (strcmp(value, "ERROR") == 0)
+ edata->elevel = ERROR;
+ else if (strcmp(value, "FATAL") == 0)
+ edata->elevel = FATAL;
+ else if (strcmp(value, "PANIC") == 0)
+ edata->elevel = PANIC;
+ else
+ elog(ERROR, "unknown error severity");
+ break;
+ case PG_DIAG_SQLSTATE:
+ if (strlen(value) != 5)
+ elog(ERROR, "malformed sql state");
+ edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
+ value[3], value[4]);
+ break;
+ case PG_DIAG_MESSAGE_PRIMARY:
+ edata->message = pstrdup(value);
+ break;
+ case PG_DIAG_MESSAGE_DETAIL:
+ edata->detail = pstrdup(value);
+ break;
+ case PG_DIAG_MESSAGE_HINT:
+ edata->hint = pstrdup(value);
+ break;
+ case PG_DIAG_STATEMENT_POSITION:
+ edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
+ break;
+ case PG_DIAG_INTERNAL_POSITION:
+ edata->internalpos = pg_atoi(value, sizeof(int), '\0');
+ break;
+ case PG_DIAG_INTERNAL_QUERY:
+ edata->internalquery = pstrdup(value);
+ break;
+ case PG_DIAG_CONTEXT:
+ edata->context = pstrdup(value);
+ break;
+ case PG_DIAG_SCHEMA_NAME:
+ edata->schema_name = pstrdup(value);
+ break;
+ case PG_DIAG_TABLE_NAME:
+ edata->table_name = pstrdup(value);
+ break;
+ case PG_DIAG_COLUMN_NAME:
+ edata->column_name = pstrdup(value);
+ break;
+ case PG_DIAG_DATATYPE_NAME:
+ edata->datatype_name = pstrdup(value);
+ break;
+ case PG_DIAG_CONSTRAINT_NAME:
+ edata->constraint_name = pstrdup(value);
+ break;
+ case PG_DIAG_SOURCE_FILE:
+ edata->filename = pstrdup(value);
+ break;
+ case PG_DIAG_SOURCE_LINE:
+ edata->lineno = pg_atoi(value, sizeof(int), '\0');
+ break;
+ case PG_DIAG_SOURCE_FUNCTION:
+ edata->funcname = pstrdup(value);
+ break;
+ default:
+ elog(ERROR, "unknown error field: %d", (int) code);
+ break;
+ }
+ }
+}
* overflow.
*/
int32
-pg_atoi(char *s, int size, int c)
+pg_atoi(const char *s, int size, int c)
{
long l;
char *badp;
MemoryContextResetAndDeleteChildren(ErrorContext);
}
+/*
+ * ThrowErrorData --- report an error described by an ErrorData structure
+ *
+ * This is intended to be used to re-report errors originally thrown by
+ * background worker processes and then propagated (with or without
+ * modification) to the backend responsible for them.
+ */
+void
+ThrowErrorData(ErrorData *edata)
+{
+ ErrorData *newedata;
+ MemoryContext oldcontext;
+
+ if (!errstart(edata->elevel, edata->filename, edata->lineno,
+ edata->funcname, NULL))
+ return;
+
+ newedata = &errordata[errordata_stack_depth];
+ oldcontext = MemoryContextSwitchTo(edata->assoc_context);
+
+ /* Copy the supplied fields to the error stack. */
+ if (edata->sqlerrcode > 0)
+ newedata->sqlerrcode = edata->sqlerrcode;
+ if (edata->message)
+ newedata->message = pstrdup(edata->message);
+ if (edata->detail)
+ newedata->detail = pstrdup(edata->detail);
+ if (edata->detail_log)
+ newedata->detail_log = pstrdup(edata->detail_log);
+ if (edata->hint)
+ newedata->hint = pstrdup(edata->hint);
+ if (edata->context)
+ newedata->context = pstrdup(edata->context);
+ if (edata->schema_name)
+ newedata->schema_name = pstrdup(edata->schema_name);
+ if (edata->table_name)
+ newedata->table_name = pstrdup(edata->table_name);
+ if (edata->column_name)
+ newedata->column_name = pstrdup(edata->column_name);
+ if (edata->datatype_name)
+ newedata->datatype_name = pstrdup(edata->datatype_name);
+ if (edata->constraint_name)
+ newedata->constraint_name = pstrdup(edata->constraint_name);
+ if (edata->internalquery)
+ newedata->internalquery = pstrdup(edata->internalquery);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ errfinish(0);
+}
+
/*
* ReThrowError --- re-throw a previously copied error
*
} u;
} PQArgBlock;
+typedef struct
+{
+ void (*comm_reset)(void);
+ int (*flush)(void);
+ int (*flush_if_writable)(void);
+ bool (*is_send_pending)(void);
+ int (*putmessage)(char msgtype, const char *s, size_t len);
+ void (*putmessage_noblock)(char msgtype, const char *s, size_t len);
+ void (*startcopyout)(void);
+ void (*endcopyout)(bool errorAbort);
+} PQcommMethods;
+
+PQcommMethods *PqCommMethods;
+
+#define pq_comm_reset() (PqCommMethods->comm_reset())
+#define pq_flush() (PqCommMethods->flush())
+#define pq_flush_if_writable() (PqCommMethods->flush_if_writable())
+#define pq_is_send_pending() (PqCommMethods->is_send_pending())
+#define pq_putmessage(msgtype, s, len) \
+ (PqCommMethods->putmessage(msgtype, s, len))
+#define pq_putmessage_noblock(msgtype, s, len) \
+ (PqCommMethods->putmessage(msgtype, s, len))
+#define pq_startcopyout() (PqCommMethods->startcopyout())
+#define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort))
+
/*
* External functions.
*/
extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void pq_init(void);
-extern void pq_comm_reset(void);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern int pq_getmessage(StringInfo s, int maxlen);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
-extern int pq_flush(void);
-extern int pq_flush_if_writable(void);
-extern bool pq_is_send_pending(void);
-extern int pq_putmessage(char msgtype, const char *s, size_t len);
-extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
-extern void pq_startcopyout(void);
-extern void pq_endcopyout(bool errorAbort);
/*
* prototypes for functions in be-secure.c
extern char *ssl_ca_file;
extern char *ssl_crl_file;
+extern int (*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+extern int (*pq_flush_hook)(void);
+
extern int secure_initialize(void);
extern bool secure_loaded_verify_locations(void);
extern void secure_destroy(void);
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.h
+ * Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/pqmq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PQMQ_H
+#define PQMQ_H
+
+#include "storage/shm_mq.h"
+
+extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+
+extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
+
+#endif /* PQMQ_H */
extern Datum current_schemas(PG_FUNCTION_ARGS);
/* numutils.c */
-extern int32 pg_atoi(char *s, int size, int c);
+extern int32 pg_atoi(const char *s, int size, int c);
extern void pg_itoa(int16 i, char *a);
extern void pg_ltoa(int32 l, char *a);
extern void pg_lltoa(int64 ll, char *a);
extern void FreeErrorData(ErrorData *edata);
extern void FlushErrorState(void);
extern void ReThrowError(ErrorData *edata) __attribute__((noreturn));
+extern void ThrowErrorData(ErrorData *edata);
extern void pg_re_throw(void) __attribute__((noreturn));
extern char *GetErrorContextStack(void);