<para>
Collects all the input values, including nulls, into an array.
</para></entry>
- <entry>No</entry>
+ <entry>Yes</entry>
</row>
<row>
dimension. (The inputs must all have the same dimensionality, and
cannot be empty or null.)
</para></entry>
- <entry>No</entry>
+ <entry>Yes</entry>
</row>
<row>
after the first is preceded by the
corresponding <parameter>delimiter</parameter> (if it's not null).
</para></entry>
- <entry>No</entry>
+ <entry>Yes</entry>
</row>
<row>
* functions; if not, we can't serialize partial-aggregation
* results.
*/
- else if (transinfo->aggtranstype == INTERNALOID &&
- (!OidIsValid(transinfo->serialfn_oid) ||
- !OidIsValid(transinfo->deserialfn_oid)))
- root->hasNonSerialAggs = true;
+ else if (transinfo->aggtranstype == INTERNALOID)
+ {
+
+ if (!OidIsValid(transinfo->serialfn_oid) ||
+ !OidIsValid(transinfo->deserialfn_oid))
+ root->hasNonSerialAggs = true;
+
+ /*
+ * array_agg_serialize and array_agg_deserialize make use
+ * of the aggregate non-byval input type's send and
+ * receive functions. There's a chance that the type
+ * being aggregated has one or both of these functions
+ * missing. In this case we must not allow the
+ * aggregate's serial and deserial functions to be used.
+ * It would be nice not to have to special case this and
+ * instead provide some sort of supporting function within
+ * the aggregate to do this, but for now, that seems like
+ * overkill for this one case.
+ */
+ if ((transinfo->serialfn_oid == F_ARRAY_AGG_SERIALIZE ||
+ transinfo->deserialfn_oid == F_ARRAY_AGG_DESERIALIZE) &&
+ !agg_args_support_sendreceive(aggref))
+ root->hasNonSerialAggs = true;
+ }
}
}
agginfo->transno = transno;
*/
#include "postgres.h"
+#include "access/htup_details.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_type.h"
#include "rewrite/rewriteManip.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
-
+#include "utils/syscache.h"
typedef struct
{
return aggtranstype;
}
+/*
+ * agg_args_support_sendreceive
+ * Returns true if every non-byval argument type of aggref has both a
+ * send and a receive function.
+ *
+ * Each entry of aggref->args is treated as a TargetEntry, and the type of
+ * its expression is looked up in the TYPEOID syscache; a failed lookup
+ * raises an ERROR. Pass-by-value types are accepted without looking at
+ * typsend/typreceive. Returns false as soon as a non-byval type lacking
+ * either function is found.
+ */
+bool
+agg_args_support_sendreceive(Aggref *aggref)
+{
+ ListCell *lc;
+
+ foreach(lc, aggref->args)
+ {
+ HeapTuple typeTuple;
+ Form_pg_type pt;
+ TargetEntry *tle = (TargetEntry *) lfirst(lc);
+ Oid type = exprType((Node *) tle->expr);
+
+ typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
+ if (!HeapTupleIsValid(typeTuple))
+ elog(ERROR, "cache lookup failed for type %u", type);
+
+ pt = (Form_pg_type) GETSTRUCT(typeTuple);
+
+ if (!pt->typbyval &&
+ (!OidIsValid(pt->typsend) || !OidIsValid(pt->typreceive)))
+ {
+ ReleaseSysCache(typeTuple);
+ return false;
+ }
+ ReleaseSysCache(typeTuple);
+ }
+ return true;
+}
+
/*
* Create an expression tree for the transition function of an aggregate.
* This is needed so that polymorphic functions can be used within an
#include "postgres.h"
#include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
#include "common/int.h"
+#include "port/pg_bitutils.h"
#include "utils/array.h"
+#include "utils/datum.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
+/*
+ * SerialIOData
+ * Used for caching element-type data in array_agg_serialize
+ *
+ * typsend is the looked-up send function of the element type. The struct
+ * is stored in the calling FmgrInfo's fn_extra so that the catalog lookup
+ * is not repeated on later calls.
+ */
+typedef struct SerialIOData
+{
+ FmgrInfo typsend;
+} SerialIOData;
+
+/*
+ * DeserialIOData
+ * Used for caching element-type data in array_agg_deserialize
+ *
+ * typreceive is the looked-up receive function of the element type and
+ * typioparam is the matching I/O parameter OID passed to it. The struct
+ * is stored in the calling FmgrInfo's fn_extra so that the catalog lookup
+ * is not repeated on later calls.
+ */
+typedef struct DeserialIOData
+{
+ FmgrInfo typreceive;
+ Oid typioparam;
+} DeserialIOData;
static Datum array_position_common(FunctionCallInfo fcinfo);
PG_RETURN_POINTER(state);
}
+/*
+ * array_agg_combine
+ * Aggregate combine function for array_agg(anynonarray).
+ *
+ * Appends the elements of state2 to state1 and returns state1. Either
+ * input may be NULL: when state2 is NULL, state1 is returned as-is (NULL
+ * if both are NULL); when only state1 is NULL, a copy of state2 is built
+ * in the aggregate memory context. state2 is only read, never modified.
+ */
+Datum
+array_agg_combine(PG_FUNCTION_ARGS)
+{
+ ArrayBuildState *state1;
+ ArrayBuildState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ {
+ /*
+ * NULL state2 is easy, just return state1, which we know is already
+ * in the agg_context
+ */
+ if (state1 == NULL)
+ PG_RETURN_NULL();
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state1 == NULL)
+ {
+ /*
+ * We must copy state2's data into the agg_context. The new state is
+ * given the same allocated length (alen) as state2, so every element
+ * of state2 is known to fit.
+ */
+ state1 = initArrayResultWithSize(state2->element_type, agg_context,
+ false, state2->alen);
+
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ for (int i = 0; i < state2->nelems; i++)
+ {
+ if (!state2->dnulls[i])
+ state1->dvalues[i] = datumCopy(state2->dvalues[i],
+ state1->typbyval,
+ state1->typlen);
+ else
+ state1->dvalues[i] = (Datum) 0;
+ }
+
+ MemoryContextSwitchTo(old_context);
+
+ memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems);
+
+ state1->nelems = state2->nelems;
+
+ PG_RETURN_POINTER(state1);
+ }
+ else if (state2->nelems > 0)
+ {
+ /* We only need to combine the two states if state2 has any elements */
+ int reqsize = state1->nelems + state2->nelems;
+ MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+ Assert(state1->element_type == state2->element_type);
+
+ /* Enlarge state1 arrays if needed */
+ if (state1->alen < reqsize)
+ {
+ /* Use a power of 2 size rather than allocating just reqsize */
+ state1->alen = pg_nextpower2_32(reqsize);
+ state1->dvalues = (Datum *) repalloc(state1->dvalues,
+ state1->alen * sizeof(Datum));
+ state1->dnulls = (bool *) repalloc(state1->dnulls,
+ state1->alen * sizeof(bool));
+ }
+
+ /*
+ * Copy in the state2 elements to the end of the state1 arrays. The
+ * datum copies are made while state1->mcontext is the current memory
+ * context.
+ */
+ for (int i = 0; i < state2->nelems; i++)
+ {
+ if (!state2->dnulls[i])
+ state1->dvalues[i + state1->nelems] =
+ datumCopy(state2->dvalues[i],
+ state1->typbyval,
+ state1->typlen);
+ else
+ state1->dvalues[i + state1->nelems] = (Datum) 0;
+ }
+
+ memcpy(&state1->dnulls[state1->nelems], state2->dnulls,
+ sizeof(bool) * state2->nelems);
+
+ state1->nelems = reqsize;
+
+ MemoryContextSwitchTo(oldContext);
+ }
+
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_serialize
+ * Serialize ArrayBuildState into bytea.
+ *
+ * Fields are written in this order: element_type (int32), nelems (int64),
+ * typlen (int16), typbyval (byte), typalign (byte), the dnulls array, and
+ * finally the element values. alen, mcontext and private_cxt are not
+ * sent. array_agg_deserialize must read the fields in the same order.
+ */
+Datum
+array_agg_serialize(PG_FUNCTION_ARGS)
+{
+ ArrayBuildState *state;
+ StringInfoData buf;
+ bytea *result;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ state = (ArrayBuildState *) PG_GETARG_POINTER(0);
+
+ pq_begintypsend(&buf);
+
+ /*
+ * element_type. Putting this first is more convenient in deserialization
+ */
+ pq_sendint32(&buf, state->element_type);
+
+ /*
+ * nelems -- send first so we know how large to make the dvalues and
+ * dnulls array during deserialization.
+ */
+ pq_sendint64(&buf, state->nelems);
+
+ /* alen can be decided during deserialization */
+
+ /* typlen */
+ pq_sendint16(&buf, state->typlen);
+
+ /* typbyval */
+ pq_sendbyte(&buf, state->typbyval);
+
+ /* typalign */
+ pq_sendbyte(&buf, state->typalign);
+
+ /* dnulls: one bool per element, copied as raw bytes */
+ pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems);
+
+ /*
+ * dvalues. By agreement with array_agg_deserialize, when the element
+ * type is byval, we just transmit the Datum array as-is, including any
+ * null elements. For by-ref types, we must invoke the element type's
+ * send function, and we skip null elements (which is why the nulls flags
+ * must be sent first).
+ */
+ if (state->typbyval)
+ pq_sendbytes(&buf, (char *) state->dvalues,
+ sizeof(Datum) * state->nelems);
+ else
+ {
+ SerialIOData *iodata;
+ int i;
+
+ /* Avoid repeat catalog lookups for typsend function */
+ iodata = (SerialIOData *) fcinfo->flinfo->fn_extra;
+ if (iodata == NULL)
+ {
+ Oid typsend;
+ bool typisvarlena;
+
+ iodata = (SerialIOData *)
+ MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+ sizeof(SerialIOData));
+ getTypeBinaryOutputInfo(state->element_type, &typsend,
+ &typisvarlena);
+ fmgr_info_cxt(typsend, &iodata->typsend,
+ fcinfo->flinfo->fn_mcxt);
+ fcinfo->flinfo->fn_extra = (void *) iodata;
+ }
+
+ for (i = 0; i < state->nelems; i++)
+ {
+ bytea *outputbytes;
+
+ if (state->dnulls[i])
+ continue;
+ outputbytes = SendFunctionCall(&iodata->typsend,
+ state->dvalues[i]);
+ pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
+ pq_sendbytes(&buf, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * array_agg_deserialize
+ * Rebuild an ArrayBuildState from the bytea written by
+ * array_agg_serialize.
+ *
+ * The new state is allocated in CurrentMemoryContext, sized for exactly
+ * the transmitted number of elements. Fields are read in the order the
+ * serialize function wrote them. For by-reference element types, each
+ * non-null element is handed to the element type's receive function; the
+ * lookup of that function is cached in fn_extra.
+ */
+Datum
+array_agg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate;
+ ArrayBuildState *result;
+ StringInfoData buf;
+ Oid element_type;
+ int64 nelems;
+ const char *temp;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ sstate = PG_GETARG_BYTEA_PP(0);
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard recv-function infrastructure.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf,
+ VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+ /* element_type */
+ element_type = pq_getmsgint(&buf, 4);
+
+ /* nelems */
+ nelems = pq_getmsgint64(&buf);
+
+ /* Create output ArrayBuildState with the needed number of elements */
+ result = initArrayResultWithSize(element_type, CurrentMemoryContext,
+ false, nelems);
+ result->nelems = nelems;
+
+ /* typlen */
+ result->typlen = pq_getmsgint(&buf, 2);
+
+ /* typbyval */
+ result->typbyval = pq_getmsgbyte(&buf);
+
+ /* typalign */
+ result->typalign = pq_getmsgbyte(&buf);
+
+ /* dnulls: one bool per element, copied as raw bytes */
+ temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems);
+ memcpy(result->dnulls, temp, sizeof(bool) * nelems);
+
+ /* dvalues --- see comment in array_agg_serialize */
+ if (result->typbyval)
+ {
+ temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems);
+ memcpy(result->dvalues, temp, sizeof(Datum) * nelems);
+ }
+ else
+ {
+ DeserialIOData *iodata;
+
+ /* Avoid repeat catalog lookups for typreceive function */
+ iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra;
+ if (iodata == NULL)
+ {
+ Oid typreceive;
+
+ iodata = (DeserialIOData *)
+ MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+ sizeof(DeserialIOData));
+ getTypeBinaryInputInfo(element_type, &typreceive,
+ &iodata->typioparam);
+ fmgr_info_cxt(typreceive, &iodata->typreceive,
+ fcinfo->flinfo->fn_mcxt);
+ fcinfo->flinfo->fn_extra = (void *) iodata;
+ }
+
+ for (int i = 0; i < nelems; i++)
+ {
+ int itemlen;
+ StringInfoData elem_buf;
+ char csave;
+
+ if (result->dnulls[i])
+ {
+ result->dvalues[i] = (Datum) 0;
+ continue;
+ }
+
+ itemlen = pq_getmsgint(&buf, 4);
+ if (itemlen < 0 || itemlen > (buf.len - buf.cursor))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("insufficient data left in message")));
+
+ /*
+ * Rather than copying data around, we just set up a phony
+ * StringInfo pointing to the correct portion of the input buffer.
+ * We assume we can scribble on the input buffer so as to maintain
+ * the convention that StringInfos have a trailing null. The byte
+ * we overwrite is saved in csave and put back once the receive
+ * function has returned.
+ */
+ elem_buf.data = &buf.data[buf.cursor];
+ elem_buf.maxlen = itemlen + 1;
+ elem_buf.len = itemlen;
+ elem_buf.cursor = 0;
+
+ buf.cursor += itemlen;
+
+ csave = buf.data[buf.cursor];
+ buf.data[buf.cursor] = '\0';
+
+ /* Now call the element's receiveproc */
+ result->dvalues[i] = ReceiveFunctionCall(&iodata->typreceive,
+ &elem_buf,
+ iodata->typioparam,
+ -1);
+
+ buf.data[buf.cursor] = csave;
+ }
+ }
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
Datum
array_agg_finalfn(PG_FUNCTION_ARGS)
{
PG_RETURN_POINTER(state);
}
+/*
+ * array_agg_array_combine
+ * Aggregate combine function for array_agg(anyarray).
+ *
+ * Appends the items of state2 to state1 and returns state1. Either input
+ * may be NULL: when state2 is NULL, state1 is returned as-is (NULL if
+ * both are NULL); when only state1 is NULL, a copy of state2 is built in
+ * the aggregate memory context. When both are present, the two states
+ * must agree on ndims and on every dimension and lower bound except the
+ * first, otherwise an error is raised. state2 is only read, never
+ * modified.
+ */
+Datum
+array_agg_array_combine(PG_FUNCTION_ARGS)
+{
+ ArrayBuildStateArr *state1;
+ ArrayBuildStateArr *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ {
+ /*
+ * NULL state2 is easy, just return state1, which we know is already
+ * in the agg_context
+ */
+ if (state1 == NULL)
+ PG_RETURN_NULL();
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state1 == NULL)
+ {
+ /* We must copy state2's data into the agg_context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = initArrayResultArr(state2->array_type, InvalidOid,
+ agg_context, false);
+
+ state1->abytes = state2->abytes;
+ state1->data = (char *) palloc(state1->abytes);
+
+ if (state2->nullbitmap)
+ {
+ int size = (state2->aitems + 7) / 8;
+
+ state1->nullbitmap = (bits8 *) palloc(size);
+ memcpy(state1->nullbitmap, state2->nullbitmap, size);
+ }
+
+ memcpy(state1->data, state2->data, state2->nbytes);
+ state1->nbytes = state2->nbytes;
+ state1->aitems = state2->aitems;
+ state1->nitems = state2->nitems;
+ state1->ndims = state2->ndims;
+ memcpy(state1->dims, state2->dims, sizeof(state2->dims));
+ memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs));
+ state1->array_type = state2->array_type;
+ state1->element_type = state2->element_type;
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ /* We only need to combine the two states if state2 has any items */
+ else if (state2->nitems > 0)
+ {
+ MemoryContext oldContext;
+ int reqsize = state1->nbytes + state2->nbytes;
+ int i;
+
+ /*
+ * Check the states are compatible with each other. Ensure we use the
+ * same error messages that are listed in accumArrayResultArr so that
+ * the same error is shown as would have been if we'd not used the
+ * combine function for the aggregation.
+ */
+ if (state1->ndims != state2->ndims)
+ ereport(ERROR,
+ (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+ errmsg("cannot accumulate arrays of different dimensionality")));
+
+ /* Check dimensions match ignoring the first dimension. */
+ for (i = 1; i < state1->ndims; i++)
+ {
+ if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i])
+ ereport(ERROR,
+ (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+ errmsg("cannot accumulate arrays of different dimensionality")));
+ }
+
+
+ oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+ /*
+ * If there's not enough space in state1 then we'll need to reallocate
+ * more.
+ */
+ if (state1->abytes < reqsize)
+ {
+ /* use a power of 2 size rather than allocating just reqsize */
+ state1->abytes = pg_nextpower2_32(reqsize);
+ state1->data = (char *) repalloc(state1->data, state1->abytes);
+ }
+
+ if (state2->nullbitmap)
+ {
+ int newnitems = state1->nitems + state2->nitems;
+
+ if (state1->nullbitmap == NULL)
+ {
+ /*
+ * First input with nulls; we must retrospectively handle any
+ * previous inputs by marking all their items non-null.
+ */
+ state1->aitems = pg_nextpower2_32(Max(256, newnitems + 1));
+ state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8);
+ array_bitmap_copy(state1->nullbitmap, 0,
+ NULL, 0,
+ state1->nitems);
+ }
+ else if (newnitems > state1->aitems)
+ {
+ int newaitems = state1->aitems + state2->aitems;
+
+ state1->aitems = pg_nextpower2_32(newaitems);
+ state1->nullbitmap = (bits8 *)
+ repalloc(state1->nullbitmap, (state1->aitems + 7) / 8);
+ }
+ array_bitmap_copy(state1->nullbitmap, state1->nitems,
+ state2->nullbitmap, 0,
+ state2->nitems);
+ }
+
+ memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes);
+ state1->nbytes += state2->nbytes;
+ state1->nitems += state2->nitems;
+
+ state1->dims[0] += state2->dims[0];
+ /* remaining dims already match, per test above */
+
+ Assert(state1->array_type == state2->array_type);
+ Assert(state1->element_type == state2->element_type);
+
+ MemoryContextSwitchTo(oldContext);
+ }
+
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_array_serialize
+ * Serialize ArrayBuildStateArr into bytea.
+ *
+ * Fields are written in this order: element_type, array_type, nbytes, the
+ * nbytes of data, abytes, aitems, the null bitmap (only when one exists),
+ * nitems, ndims, and the whole dims and lbs arrays as raw bytes.
+ * array_agg_array_deserialize must read the fields in the same order.
+ */
+Datum
+array_agg_array_serialize(PG_FUNCTION_ARGS)
+{
+ ArrayBuildStateArr *state;
+ StringInfoData buf;
+ bytea *result;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+
+ pq_begintypsend(&buf);
+
+ /*
+ * element_type. Putting this first is more convenient in deserialization
+ * so that we can init the new state sooner.
+ */
+ pq_sendint32(&buf, state->element_type);
+
+ /* array_type */
+ pq_sendint32(&buf, state->array_type);
+
+ /* nbytes */
+ pq_sendint32(&buf, state->nbytes);
+
+ /* data */
+ pq_sendbytes(&buf, state->data, state->nbytes);
+
+ /* abytes */
+ pq_sendint32(&buf, state->abytes);
+
+ /* aitems */
+ pq_sendint32(&buf, state->aitems);
+
+ /*
+ * nullbitmap. Only sent when present; the deserialize function decides
+ * whether to expect it by testing aitems > 0, hence the assertion.
+ */
+ if (state->nullbitmap)
+ {
+ Assert(state->aitems > 0);
+ pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8);
+ }
+
+ /* nitems */
+ pq_sendint32(&buf, state->nitems);
+
+ /* ndims */
+ pq_sendint32(&buf, state->ndims);
+
+ /* dims: XXX should we just send ndims elements? */
+ pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims));
+
+ /* lbs */
+ pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs));
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * array_agg_array_deserialize
+ * Rebuild an ArrayBuildStateArr from the bytea written by
+ * array_agg_array_serialize.
+ *
+ * The new state is allocated in CurrentMemoryContext. Fields are read in
+ * the order the serialize function wrote them. The data buffer is sized
+ * locally (a power of 2, at least 1024 bytes, large enough for nbytes),
+ * and result->abytes always describes that local allocation; the sender's
+ * abytes is read only to stay in step with the message layout.
+ */
+Datum
+array_agg_array_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate;
+ ArrayBuildStateArr *result;
+ StringInfoData buf;
+ Oid element_type;
+ Oid array_type;
+ int nbytes;
+ const char *temp;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ sstate = PG_GETARG_BYTEA_PP(0);
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard recv-function infrastructure.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf,
+ VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+ /* element_type */
+ element_type = pq_getmsgint(&buf, 4);
+
+ /* array_type */
+ array_type = pq_getmsgint(&buf, 4);
+
+ /* nbytes */
+ nbytes = pq_getmsgint(&buf, 4);
+
+ result = initArrayResultArr(array_type, element_type,
+ CurrentMemoryContext, false);
+
+ /* Size the data buffer ourselves: a power of 2 that can hold nbytes */
+ result->abytes = 1024;
+ while (result->abytes < nbytes)
+ result->abytes *= 2;
+
+ result->data = (char *) palloc(result->abytes);
+
+ /* data */
+ temp = pq_getmsgbytes(&buf, nbytes);
+ memcpy(result->data, temp, nbytes);
+ result->nbytes = nbytes;
+
+ /*
+ * abytes. The sender's allocation size need not match the buffer we
+ * allocated above, so storing it would make result->abytes misdescribe
+ * result->data. Consume the field to keep the cursor in step with
+ * array_agg_array_serialize, but keep the locally computed value.
+ */
+ (void) pq_getmsgint(&buf, 4);
+
+ /* aitems: might be 0 */
+ result->aitems = pq_getmsgint(&buf, 4);
+
+ /* nullbitmap: present in the message only when aitems > 0 */
+ if (result->aitems > 0)
+ {
+ int size = (result->aitems + 7) / 8;
+
+ result->nullbitmap = (bits8 *) palloc(size);
+ temp = pq_getmsgbytes(&buf, size);
+ memcpy(result->nullbitmap, temp, size);
+ }
+ else
+ result->nullbitmap = NULL;
+
+ /* nitems */
+ result->nitems = pq_getmsgint(&buf, 4);
+
+ /* ndims */
+ result->ndims = pq_getmsgint(&buf, 4);
+
+ /* dims */
+ temp = pq_getmsgbytes(&buf, sizeof(result->dims));
+ memcpy(result->dims, temp, sizeof(result->dims));
+
+ /* lbs */
+ temp = pq_getmsgbytes(&buf, sizeof(result->lbs));
+ memcpy(result->lbs, temp, sizeof(result->lbs));
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
Datum
array_agg_array_finalfn(PG_FUNCTION_ARGS)
{
*/
ArrayBuildState *
initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
+{
+ /*
+ * When using a subcontext, we can afford to start with a somewhat larger
+ * initial array size. Without subcontexts, we'd better hope that most of
+ * the states stay small ...
+ */
+ return initArrayResultWithSize(element_type, rcontext, subcontext,
+ subcontext ? 64 : 8);
+}
+
+/*
+ * initArrayResultWithSize
+ * As initArrayResult, but allow the initial size of the allocated arrays
+ * to be specified.
+ */
+ArrayBuildState *
+initArrayResultWithSize(Oid element_type, MemoryContext rcontext,
+ bool subcontext, int initsize)
{
ArrayBuildState *astate;
MemoryContext arr_context = rcontext;
MemoryContextAlloc(arr_context, sizeof(ArrayBuildState));
astate->mcontext = arr_context;
astate->private_cxt = subcontext;
- astate->alen = (subcontext ? 64 : 8); /* arbitrary starting array size */
+ astate->alen = initsize;
astate->dvalues = (Datum *)
MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum));
astate->dnulls = (bool *)
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
- /* Append the value unless null. */
+ /* Append the value unless null, preceding it with the delimiter. */
if (!PG_ARGISNULL(1))
{
bytea *value = PG_GETARG_BYTEA_PP(1);
+ bool isfirst = false;
- /* On the first time through, we ignore the delimiter. */
+ /*
+ * You might think we can just throw away the first delimiter, however
+ * we must keep it as we may be a parallel worker doing partial
+ * aggregation building a state to send to the main process. We need
+ * to keep the delimiter of every aggregation so that the combine
+ * function can properly join up the strings of two separately
+ * partially aggregated results. The first delimiter is only stripped
+ * off in the final function. To know how much to strip off the front
+ * of the string, we store the length of the first delimiter in the
+ * StringInfo's cursor field, which we don't otherwise need here.
+ */
if (state == NULL)
+ {
state = makeStringAggState(fcinfo);
- else if (!PG_ARGISNULL(2))
+ isfirst = true;
+ }
+
+ if (!PG_ARGISNULL(2))
{
bytea *delim = PG_GETARG_BYTEA_PP(2);
- appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim));
+ appendBinaryStringInfo(state, VARDATA_ANY(delim),
+ VARSIZE_ANY_EXHDR(delim));
+ if (isfirst)
+ state->cursor = VARSIZE_ANY_EXHDR(delim);
}
- appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
+ appendBinaryStringInfo(state, VARDATA_ANY(value),
+ VARSIZE_ANY_EXHDR(value));
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
- PG_RETURN_POINTER(state);
+ if (state)
+ PG_RETURN_POINTER(state);
+ PG_RETURN_NULL();
}
Datum
if (state != NULL)
{
+ /* As per comment in transfn, strip data before the cursor position */
bytea *result;
+ int strippedlen = state->len - state->cursor;
- result = (bytea *) palloc(state->len + VARHDRSZ);
- SET_VARSIZE(result, state->len + VARHDRSZ);
- memcpy(VARDATA(result), state->data, state->len);
+ result = (bytea *) palloc(strippedlen + VARHDRSZ);
+ SET_VARSIZE(result, strippedlen + VARHDRSZ);
+ memcpy(VARDATA(result), &state->data[state->cursor], strippedlen);
PG_RETURN_BYTEA_P(result);
}
else
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
- /* Append the value unless null. */
+ /* Append the value unless null, preceding it with the delimiter. */
if (!PG_ARGISNULL(1))
{
- /* On the first time through, we ignore the delimiter. */
+ text *value = PG_GETARG_TEXT_PP(1);
+ bool isfirst = false;
+
+ /*
+ * You might think we can just throw away the first delimiter, however
+ * we must keep it as we may be a parallel worker doing partial
+ * aggregation building a state to send to the main process. We need
+ * to keep the delimiter of every aggregation so that the combine
+ * function can properly join up the strings of two separately
+ * partially aggregated results. The first delimiter is only stripped
+ * off in the final function. To know how much to strip off the front
+ * of the string, we store the length of the first delimiter in the
+ * StringInfo's cursor field, which we don't otherwise need here.
+ */
if (state == NULL)
+ {
state = makeStringAggState(fcinfo);
- else if (!PG_ARGISNULL(2))
- appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */
+ isfirst = true;
+ }
- appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */
+ if (!PG_ARGISNULL(2))
+ {
+ text *delim = PG_GETARG_TEXT_PP(2);
+
+ appendStringInfoText(state, delim);
+ if (isfirst)
+ state->cursor = VARSIZE_ANY_EXHDR(delim);
+ }
+
+ appendStringInfoText(state, value);
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
- PG_RETURN_POINTER(state);
+ if (state)
+ PG_RETURN_POINTER(state);
+ PG_RETURN_NULL();
+}
+
+/*
+ * string_agg_combine
+ * Aggregate combine function for string_agg(text) and string_agg(bytea)
+ *
+ * Appends state2's bytes to state1 and returns state1. Either input may
+ * be NULL: when state2 is NULL, state1 is returned as-is (NULL if both
+ * are NULL); when only state1 is NULL, a new state holding a copy of
+ * state2's data and cursor is built while agg_context is the current
+ * memory context. When both are present, only the data is appended and
+ * state1's cursor is left untouched.
+ */
+Datum
+string_agg_combine(PG_FUNCTION_ARGS)
+{
+ StringInfo state1;
+ StringInfo state2;
+ MemoryContext agg_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ {
+ /*
+ * NULL state2 is easy, just return state1, which we know is already
+ * in the agg_context
+ */
+ if (state1 == NULL)
+ PG_RETURN_NULL();
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state1 == NULL)
+ {
+ /* We must copy state2's data into the agg_context */
+ MemoryContext old_context;
+
+ old_context = MemoryContextSwitchTo(agg_context);
+ state1 = makeStringAggState(fcinfo);
+ appendBinaryStringInfo(state1, state2->data, state2->len);
+ state1->cursor = state2->cursor;
+ MemoryContextSwitchTo(old_context);
+ }
+ else if (state2->len > 0)
+ {
+ /* Combine ... state1->cursor does not change in this case */
+ appendBinaryStringInfo(state1, state2->data, state2->len);
+ }
+
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * string_agg_serialize
+ * Aggregate serialize function for string_agg(text) and string_agg(bytea)
+ *
+ * The output is the state's cursor as a 4-byte integer followed by the
+ * state's data bytes. No explicit length is written for the data; it
+ * occupies the rest of the message.
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_serialize(PG_FUNCTION_ARGS)
+{
+ StringInfo state;
+ StringInfoData buf;
+ bytea *result;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ state = (StringInfo) PG_GETARG_POINTER(0);
+
+ pq_begintypsend(&buf);
+
+ /* cursor */
+ pq_sendint32(&buf, state->cursor);
+
+ /* data */
+ pq_sendbytes(&buf, state->data, state->len);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * string_agg_deserialize
+ * Aggregate deserial function for string_agg(text) and string_agg(bytea)
+ *
+ * Reads the 4-byte cursor written by string_agg_serialize, then treats
+ * every remaining byte of the input as the state's data.
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate;
+ StringInfo result;
+ StringInfoData buf;
+ char *data;
+ int datalen;
+
+ /* cannot be called directly because of internal-type argument */
+ Assert(AggCheckCallContext(fcinfo, NULL));
+
+ sstate = PG_GETARG_BYTEA_PP(0);
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard recv-function infrastructure.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf,
+ VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+ result = makeStringAggState(fcinfo);
+
+ /* cursor */
+ result->cursor = pq_getmsgint(&buf, 4);
+
+ /* data: everything after the 4-byte cursor */
+ datalen = VARSIZE_ANY_EXHDR(sstate) - 4;
+ data = (char *) pq_getmsgbytes(&buf, datalen);
+ appendBinaryStringInfo(result, data, datalen);
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
}
Datum
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
if (state != NULL)
- PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
+ {
+ /* As per comment in transfn, strip data before the cursor position */
+ PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
+ state->len - state->cursor));
+ }
else
PG_RETURN_NULL();
}
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202301201
+#define CATALOG_VERSION_NO 202301231
#endif
# array
{ aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn',
- aggfinalfn => 'array_agg_finalfn', aggfinalextra => 't',
- aggtranstype => 'internal' },
+ aggcombinefn => 'array_agg_combine', aggserialfn => 'array_agg_serialize',
+ aggdeserialfn => 'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn',
+ aggfinalextra => 't', aggtranstype => 'internal' },
{ aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn',
+ aggcombinefn => 'array_agg_array_combine',
+ aggserialfn => 'array_agg_array_serialize',
+ aggdeserialfn => 'array_agg_array_deserialize',
aggfinalfn => 'array_agg_array_finalfn', aggfinalextra => 't',
aggtranstype => 'internal' },
# text
{ aggfnoid => 'string_agg(text,text)', aggtransfn => 'string_agg_transfn',
+ aggcombinefn => 'string_agg_combine', aggserialfn => 'string_agg_serialize',
+ aggdeserialfn => 'string_agg_deserialize',
aggfinalfn => 'string_agg_finalfn', aggtranstype => 'internal' },
# bytea
{ aggfnoid => 'string_agg(bytea,bytea)',
aggtransfn => 'bytea_string_agg_transfn',
+ aggcombinefn => 'string_agg_combine',
+ aggserialfn => 'string_agg_serialize',
+ aggdeserialfn => 'string_agg_deserialize',
aggfinalfn => 'bytea_string_agg_finalfn', aggtranstype => 'internal' },
# range
{ oid => '2333', descr => 'aggregate transition function',
proname => 'array_agg_transfn', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal anynonarray', prosrc => 'array_agg_transfn' },
+{ oid => '9328', descr => 'aggregate combine function',
+ proname => 'array_agg_combine', proisstrict => 'f', prorettype => 'internal',
+ proargtypes => 'internal internal', prosrc => 'array_agg_combine' },
+{ oid => '9329', descr => 'aggregate serial function',
+ proname => 'array_agg_serialize', prorettype => 'bytea',
+ proargtypes => 'internal', prosrc => 'array_agg_serialize' },
+{ oid => '9330', descr => 'aggregate deserial function',
+ proname => 'array_agg_deserialize', prorettype => 'internal',
+ proargtypes => 'bytea internal', prosrc => 'array_agg_deserialize' },
{ oid => '2334', descr => 'aggregate final function',
proname => 'array_agg_finalfn', proisstrict => 'f', prorettype => 'anyarray',
proargtypes => 'internal anynonarray', prosrc => 'array_agg_finalfn' },
proname => 'array_agg_array_transfn', proisstrict => 'f',
prorettype => 'internal', proargtypes => 'internal anyarray',
prosrc => 'array_agg_array_transfn' },
+{ oid => '9331', descr => 'aggregate combine function',
+ proname => 'array_agg_array_combine', proisstrict => 'f', prorettype => 'internal',
+ proargtypes => 'internal internal', prosrc => 'array_agg_array_combine' },
+{ oid => '9332', descr => 'aggregate serial function',
+ proname => 'array_agg_array_serialize', prorettype => 'bytea',
+ proargtypes => 'internal', prosrc => 'array_agg_array_serialize' },
+{ oid => '9333', descr => 'aggregate deserial function',
+ proname => 'array_agg_array_deserialize', prorettype => 'internal',
+ proargtypes => 'bytea internal', prosrc => 'array_agg_array_deserialize' },
{ oid => '4052', descr => 'aggregate final function',
proname => 'array_agg_array_finalfn', proisstrict => 'f',
prorettype => 'anyarray', proargtypes => 'internal anyarray',
{ oid => '3535', descr => 'aggregate transition function',
proname => 'string_agg_transfn', proisstrict => 'f', prorettype => 'internal',
proargtypes => 'internal text text', prosrc => 'string_agg_transfn' },
+{ oid => '9334', descr => 'aggregate combine function',
+ proname => 'string_agg_combine', proisstrict => 'f', prorettype => 'internal',
+ proargtypes => 'internal internal', prosrc => 'string_agg_combine' },
+{ oid => '9335', descr => 'aggregate serial function',
+ proname => 'string_agg_serialize', prorettype => 'bytea',
+ proargtypes => 'internal', prosrc => 'string_agg_serialize' },
+{ oid => '9336', descr => 'aggregate deserial function',
+ proname => 'string_agg_deserialize', prorettype => 'internal',
+ proargtypes => 'bytea internal', prosrc => 'string_agg_deserialize' },
{ oid => '3536', descr => 'aggregate final function',
proname => 'string_agg_finalfn', proisstrict => 'f', prorettype => 'text',
proargtypes => 'internal', prosrc => 'string_agg_finalfn' },
Oid *inputTypes,
int numArguments);
+extern bool agg_args_support_sendreceive(Aggref *aggref);
+
extern void build_aggregate_transfn_expr(Oid *agg_input_types,
int agg_num_inputs,
int agg_num_direct_inputs,
extern ArrayBuildState *initArrayResult(Oid element_type,
MemoryContext rcontext, bool subcontext);
+extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
+ MemoryContext rcontext,
+ bool subcontext, int initsize);
extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
Datum dvalue, bool disnull,
Oid element_type,
(1 row)
drop table bytea_test_table;
+-- Test parallel string_agg and array_agg
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+set bytea_output = 'escape';
+-- Pin the worker limit; the EXPLAIN output below expects 2 planned workers.
+set max_parallel_workers_per_gather = 2;
+-- create a view as we otherwise have to repeat this query a few times.
+create view v_pagg_test AS
+select
+ y,
+ min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+ min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+ min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+ min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+ select
+ y,
+ unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+ unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+ unnest(a1.a) AS a,
+ unnest(a1.aa) AS aa
+ from (
+ select
+ y,
+ string_agg(x::text, ',') AS t,
+ string_agg(x::text::bytea, ',') AS b,
+ array_agg(x) AS a,
+ array_agg(ARRAY[x]) AS aa
+ from pagg_test
+ group by y
+ ) a1
+) a2
+group by y;
+-- Ensure results are correct.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500
+ 1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250
+ 2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500
+ 3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250
+ 4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500
+ 5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250
+ 6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500
+ 7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250
+ 8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500
+ 9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250
+(10 rows)
+
+-- Ensure parallel aggregation is actually being used.
+explain (costs off) select * from v_pagg_test order by y;
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------------
+ GroupAggregate
+ Group Key: pagg_test.y
+ -> Sort
+ Sort Key: pagg_test.y, (((unnest(regexp_split_to_array((string_agg((pagg_test.x)::text, ','::text)), ','::text))))::integer)
+ -> Result
+ -> ProjectSet
+ -> Finalize HashAggregate
+ Group Key: pagg_test.y
+ -> Gather
+ Workers Planned: 2
+ -> Partial HashAggregate
+ Group Key: pagg_test.y
+ -> Parallel Seq Scan on pagg_test
+(13 rows)
+
+set max_parallel_workers_per_gather = 0;
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500
+ 1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250
+ 2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500
+ 3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250
+ 4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500
+ 5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250
+ 6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500
+ 7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250
+ 8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500
+ 9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250
+(10 rows)
+
+-- Clean up
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+drop view v_pagg_test;
+drop table pagg_test;
-- FILTER tests
select min(unique1) filter (where unique1 > 100) from tenk1;
min
drop table bytea_test_table;
+-- Test parallel string_agg and array_agg
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+set bytea_output = 'escape';
+-- Pin the worker limit; the EXPLAIN output below expects 2 planned workers.
+set max_parallel_workers_per_gather = 2;
+
+-- create a view as we otherwise have to repeat this query a few times.
+create view v_pagg_test AS
+select
+ y,
+ min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+ min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+ min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+ min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+ select
+ y,
+ unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+ unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+ unnest(a1.a) AS a,
+ unnest(a1.aa) AS aa
+ from (
+ select
+ y,
+ string_agg(x::text, ',') AS t,
+ string_agg(x::text::bytea, ',') AS b,
+ array_agg(x) AS a,
+ array_agg(ARRAY[x]) AS aa
+ from pagg_test
+ group by y
+ ) a1
+) a2
+group by y;
+
+-- Ensure results are correct.
+select * from v_pagg_test order by y;
+
+-- Ensure parallel aggregation is actually being used.
+explain (costs off) select * from v_pagg_test order by y;
+
+set max_parallel_workers_per_gather = 0;
+
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+
+-- Clean up
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+
+drop view v_pagg_test;
+drop table pagg_test;
+
-- FILTER tests
select min(unique1) filter (where unique1 > 100) from tenk1;