diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index bb70cba..d14829d 100644
--- a/src/backend/utils/adt/array_userfuncs.c
+++ b/src/backend/utils/adt/array_userfuncs.c
@@ -13,8 +13,10 @@
 #include "postgres.h"
 
 #include "catalog/pg_type.h"
 #include "common/int.h"
+#include "libpq/pqformat.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/datum.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
@@ -499,5 +501,321 @@ array_agg_transfn(PG_FUNCTION_ARGS)
 }
 
+/*
+ * array_agg_combine
+ *		Combine two ArrayBuildStates produced by array_agg_transfn.
+ *
+ * state2 may live in a short-lived memory context (for instance when it
+ * was produced by array_agg_deserialize), so every element kept in state1
+ * is copied into state1's memory context.  Null elements carry no valid
+ * Datum and must never be passed to datumCopy().
+ */
+Datum
+array_agg_combine(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *state1;
+	ArrayBuildState *state2;
+	MemoryContext agg_context;
+	MemoryContext old_context;
+	int			i;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/* report "no state at all" as SQL NULL, not as a NULL pointer */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		/* Copy state2, element by element, into the agg_context */
+		old_context = MemoryContextSwitchTo(agg_context);
+
+		state1 = initArrayResultWithSize(state2->element_type, agg_context,
+										 false, state2->alen);
+
+		for (i = 0; i < state2->nelems; i++)
+		{
+			if (state2->dnulls[i])
+				state1->dvalues[i] = (Datum) 0;
+			else
+				state1->dvalues[i] = datumCopy(state2->dvalues[i],
+											   state1->typbyval,
+											   state1->typlen);
+		}
+
+		memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems);
+		state1->nelems = state2->nelems;
+
+		MemoryContextSwitchTo(old_context);
+
+		PG_RETURN_POINTER(state1);
+	}
+	else if (state2->nelems > 0)
+	{
+		/* We only need to combine the two states if state2 has any elements */
+		int			reqsize = state1->nelems + state2->nelems;
+		MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+		Assert(state1->element_type == state2->element_type);
+
+		/*
+		 * If there's not enough space in state1 then we'll need to reallocate
+		 * more.  Use a power of 2 size rather than allocating just reqsize,
+		 * starting from at least 8 so that the doubling always terminates.
+		 */
+		if (state1->alen < reqsize)
+		{
+			int			newalen = Max(state1->alen, 8);
+
+			while (newalen < reqsize)
+				newalen *= 2;
+			state1->alen = newalen;
+
+			state1->dvalues = (Datum *) repalloc(state1->dvalues,
+												 state1->alen * sizeof(Datum));
+			state1->dnulls = (bool *) repalloc(state1->dnulls,
+											   state1->alen * sizeof(bool));
+		}
+
+		/* Copy in the state2 elements to the end of the state1 arrays */
+		for (i = 0; i < state2->nelems; i++)
+		{
+			if (state2->dnulls[i])
+				state1->dvalues[state1->nelems + i] = (Datum) 0;
+			else
+				state1->dvalues[state1->nelems + i] =
+					datumCopy(state2->dvalues[i],
+							  state1->typbyval, state1->typlen);
+		}
+
+		memcpy(&state1->dnulls[state1->nelems], state2->dnulls,
+			   sizeof(bool) * state2->nelems);
+
+		state1->nelems = reqsize;
+
+		MemoryContextSwitchTo(oldContext);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_serialize
+ *		Serialize ArrayBuildState into bytea.
+ *
+ * The null flags are sent before the values so that array_agg_deserialize
+ * knows which elements have no value on the wire.
+ */
+Datum
+array_agg_serialize(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (ArrayBuildState *) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * element_type. Putting this first is more convenient in deserialization
+	 */
+	pq_sendint32(&buf, state->element_type);
+
+	/*
+	 * nelems -- send first so we know how large to make the dvalues and
+	 * dnulls array during deserialization.
+	 */
+	pq_sendint64(&buf, state->nelems);
+
+	/* alen can be decided during deserialization */
+
+	/* typlen */
+	pq_sendint16(&buf, state->typlen);
+
+	/* typbyval */
+	pq_sendbyte(&buf, state->typbyval);
+
+	/* typalign */
+	pq_sendbyte(&buf, state->typalign);
+
+	/* dnulls */
+	pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems);
+
+	/*
+	 * dvalues -- this is very simple when the value type is byval, we can
+	 * simply just send the Datums over, however, for non-byval types we must
+	 * send each non-null element through the type's send function.  Null
+	 * elements have no valid Datum, so nothing is sent for them.
+	 */
+	if (state->typbyval)
+		pq_sendbytes(&buf, (char *) state->dvalues, sizeof(Datum) * state->nelems);
+	else
+	{
+		Oid			typsend;
+		bool		typisvarlena;
+		bytea	   *outputbytes;
+		int			i;
+
+		/* XXX is there anywhere to cache this to save calling each time? */
+		getTypeBinaryOutputInfo(state->element_type, &typsend, &typisvarlena);
+
+		for (i = 0; i < state->nelems; i++)
+		{
+			if (state->dnulls[i])
+				continue;
+
+			outputbytes = OidSendFunctionCall(typsend, state->dvalues[i]);
+			pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
+			pq_sendbytes(&buf, VARDATA(outputbytes),
+						 VARSIZE(outputbytes) - VARHDRSZ);
+			pfree(outputbytes);
+		}
+	}
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * array_agg_deserialize
+ *		Deserialize bytea produced by array_agg_serialize into a new
+ *		ArrayBuildState allocated in the current memory context.
+ */
+Datum
+array_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	ArrayBuildState *result;
+	StringInfoData buf;
+	Oid			element_type;
+	int64		nelems;
+	const char *temp;
+
+	if (!AggCheckCallContext(fcinfo, NULL))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	/* element_type */
+	element_type = pq_getmsgint(&buf, 4);
+
+	/* nelems */
+	nelems = pq_getmsgint64(&buf);
+
+	/*
+	 * Never allocate fewer than 8 slots: the arrays are later grown by
+	 * doubling alen (see array_agg_combine), which never makes progress
+	 * from zero.
+	 */
+	result = initArrayResultWithSize(element_type, CurrentMemoryContext,
+									 false, (int) Max(nelems, 8));
+
+	result->nelems = nelems;
+
+	/* typlen */
+	result->typlen = (int16) pq_getmsgint(&buf, 2);
+
+	/* typbyval */
+	result->typbyval = pq_getmsgbyte(&buf);
+
+	/* typalign */
+	result->typalign = pq_getmsgbyte(&buf);
+
+	/* dnulls */
+	temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems);
+	memcpy(result->dnulls, temp, sizeof(bool) * nelems);
+
+	/*
+	 * dvalues -- this is very simple when the value type is byval, we can
+	 * simply just get all the Datums at once, however, for non-byval types we
+	 * must receive each non-null element separately.
+	 */
+	if (result->typbyval)
+	{
+		temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems);
+		memcpy(result->dvalues, temp, sizeof(Datum) * nelems);
+	}
+	else
+	{
+		Oid			typreceive;
+		Oid			typioparam;
+		int			i;
+
+		getTypeBinaryInputInfo(element_type, &typreceive, &typioparam);
+
+		for (i = 0; i < nelems; i++)
+		{
+			StringInfoData elem_buf;
+			int			itemlen;
+			char		csave;
+
+			if (result->dnulls[i])
+			{
+				result->dvalues[i] = (Datum) 0;
+				continue;
+			}
+
+			itemlen = pq_getmsgint(&buf, 4);
+			if (itemlen < 0 || itemlen > buf.len - buf.cursor)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+						 errmsg("insufficient data left in message")));
+
+			/*
+			 * Point elem_buf at the element's bytes inside buf, and
+			 * temporarily NUL-terminate them as receive functions expect
+			 * (this mirrors how array_recv reads its elements).
+			 */
+			elem_buf.data = &buf.data[buf.cursor];
+			elem_buf.maxlen = itemlen + 1;
+			elem_buf.len = itemlen;
+			elem_buf.cursor = 0;
+
+			buf.cursor += itemlen;
+
+			csave = buf.data[buf.cursor];
+			buf.data[buf.cursor] = '\0';
+
+			result->dvalues[i] = OidReceiveFunctionCall(typreceive, &elem_buf,
+														typioparam, -1);
+
+			buf.data[buf.cursor] = csave;
+
+			/* the receive function must consume exactly the element's bytes */
+			if (elem_buf.cursor != itemlen)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+						 errmsg("improper binary format in array element %d",
+								i + 1)));
+		}
+	}
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 Datum
 array_agg_finalfn(PG_FUNCTION_ARGS)
 {
@@ -578,5 +896,319 @@ array_agg_array_transfn(PG_FUNCTION_ARGS)
 }
 
+/*
+ * array_agg_array_combine
+ *		Combine two ArrayBuildStateArrs produced by array_agg_array_transfn.
+ *
+ * As with array_agg_combine, state2 may live in a short-lived memory
+ * context, so its contents are copied into state1's memory context.
+ */
+Datum
+array_agg_array_combine(PG_FUNCTION_ARGS)
+{
+	ArrayBuildStateArr *state1;
+	ArrayBuildStateArr *state2;
+	MemoryContext agg_context;
+	MemoryContext old_context;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/* report "no state at all" as SQL NULL, not as a NULL pointer */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	/* Manually copy all fields from state2 to state1 */
+	if (state1 == NULL)
+	{
+		old_context = MemoryContextSwitchTo(agg_context);
+
+		state1 = initArrayResultArr(state2->array_type, InvalidOid,
+									agg_context, false);
+
+		state1->abytes = state2->abytes;
+		state1->data = (char *) palloc(state1->abytes);
+		memcpy(state1->data, state2->data, state2->nbytes);
+		state1->nbytes = state2->nbytes;
+
+		if (state2->nullbitmap)
+		{
+			int			size = (state2->aitems + 7) / 8;
+
+			state1->nullbitmap = (bits8 *) palloc(size);
+			memcpy(state1->nullbitmap, state2->nullbitmap, size);
+		}
+		state1->aitems = state2->aitems;
+		state1->nitems = state2->nitems;
+
+		state1->ndims = state2->ndims;
+		memcpy(state1->dims, state2->dims, sizeof(state2->dims));
+		memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs));
+		state1->array_type = state2->array_type;
+		state1->element_type = state2->element_type;
+
+		MemoryContextSwitchTo(old_context);
+
+		PG_RETURN_POINTER(state1);
+	}
+	else if (state2->nitems > 0)
+	{
+		/* We only need to combine the two states if state2 has any items */
+		MemoryContext oldContext;
+		int			reqsize = state1->nbytes + state2->nbytes;
+		int			i;
+
+		/*
+		 * Check the states are compatible with each other.  Ensure we use the
+		 * same error messages that are listed in accumArrayResultArr so that
+		 * the same error is shown as would have been if we'd not used the
+		 * combine function for the aggregation.
+		 */
+		if (state1->ndims != state2->ndims)
+			ereport(ERROR,
+					(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+					 errmsg("cannot accumulate arrays of different dimensionality")));
+
+		/* Check dimensions match ignoring the first dimension. */
+		for (i = 1; i < state1->ndims; i++)
+		{
+			if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i])
+				ereport(ERROR,
+						(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+						 errmsg("cannot accumulate arrays of different dimensionality")));
+		}
+
+		oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+		/*
+		 * If there's not enough space in state1 then we'll need to reallocate
+		 * more.  Use a power of 2 size rather than allocating just reqsize,
+		 * starting from at least 1024 so that the doubling always terminates.
+		 */
+		if (state1->abytes < reqsize)
+		{
+			int			newabytes = Max(state1->abytes, 1024);
+
+			while (newabytes < reqsize)
+				newabytes *= 2;
+			state1->abytes = newabytes;
+
+			state1->data = (char *) repalloc(state1->data, state1->abytes);
+		}
+
+		/*
+		 * A null bitmap is needed as soon as either side has one.  When only
+		 * state1 has one, state2's items must still be marked non-null in
+		 * it; passing a NULL source bitmap to array_bitmap_copy does that.
+		 */
+		if (state1->nullbitmap != NULL || state2->nullbitmap != NULL)
+		{
+			int			newnitems = state1->nitems + state2->nitems;
+
+			if (state1->nullbitmap == NULL)
+			{
+				/*
+				 * First input with nulls; we must retrospectively handle any
+				 * previous inputs by marking all their items non-null.
+				 */
+				state1->aitems = 256;
+				while (state1->aitems <= newnitems)
+					state1->aitems *= 2;
+				state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8);
+				array_bitmap_copy(state1->nullbitmap, 0,
+								  NULL, 0,
+								  state1->nitems);
+			}
+			else if (newnitems > state1->aitems)
+			{
+				while (state1->aitems < newnitems)
+					state1->aitems *= 2;
+				state1->nullbitmap = (bits8 *)
+					repalloc(state1->nullbitmap, (state1->aitems + 7) / 8);
+			}
+			array_bitmap_copy(state1->nullbitmap, state1->nitems,
+							  state2->nullbitmap, 0,
+							  state2->nitems);
+		}
+
+		memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes);
+		state1->nbytes += state2->nbytes;
+		state1->nitems += state2->nitems;
+
+		state1->dims[0] += state2->dims[0];
+		/* remaining dims already match, per test above */
+
+		Assert(state1->array_type == state2->array_type);
+		Assert(state1->element_type == state2->element_type);
+
+		MemoryContextSwitchTo(oldContext);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_array_serialize
+ *		Serialize ArrayBuildStateArr into bytea.
+ *
+ * abytes is not sent; array_agg_array_deserialize sizes its own buffer.
+ * aitems is sent as zero when there is no null bitmap, so a nonzero value
+ * always means that (aitems + 7) / 8 bytes of bitmap follow.
+ */
+Datum
+array_agg_array_serialize(PG_FUNCTION_ARGS)
+{
+	ArrayBuildStateArr *state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * element_type. Putting this first is more convenient in deserialization
+	 * so that we can init the new state sooner.
+	 */
+	pq_sendint32(&buf, state->element_type);
+
+	/* array_type */
+	pq_sendint32(&buf, state->array_type);
+
+	/* nbytes */
+	pq_sendint32(&buf, state->nbytes);
+
+	/* data */
+	pq_sendbytes(&buf, state->data, state->nbytes);
+
+	/* aitems, then the null bitmap if there is one */
+	if (state->nullbitmap)
+	{
+		Assert(state->aitems > 0);
+		pq_sendint32(&buf, state->aitems);
+		pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8);
+	}
+	else
+		pq_sendint32(&buf, 0);
+
+	/* nitems */
+	pq_sendint32(&buf, state->nitems);
+
+	/* ndims */
+	pq_sendint32(&buf, state->ndims);
+
+	/* dims */
+	pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims));
+
+	/* lbs */
+	pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs));
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * array_agg_array_deserialize
+ *		Deserialize bytea produced by array_agg_array_serialize into a new
+ *		ArrayBuildStateArr allocated in the current memory context.
+ */
+Datum
+array_agg_array_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	ArrayBuildStateArr *result;
+	StringInfoData buf;
+	Oid			element_type;
+	Oid			array_type;
+	int			nbytes;
+	const char *temp;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	/* element_type */
+	element_type = pq_getmsgint(&buf, 4);
+
+	/* array_type */
+	array_type = pq_getmsgint(&buf, 4);
+
+	/* nbytes */
+	nbytes = pq_getmsgint(&buf, 4);
+
+	result = initArrayResultArr(array_type, element_type,
+								CurrentMemoryContext, false);
+
+	/*
+	 * abytes must describe the buffer actually allocated here, so size it
+	 * locally rather than trusting the sender's allocation size (which is
+	 * not transmitted at all).
+	 */
+	result->abytes = 1024;
+	while (result->abytes < nbytes)
+		result->abytes *= 2;
+
+	result->data = (char *) palloc(result->abytes);
+
+	/* data */
+	temp = pq_getmsgbytes(&buf, nbytes);
+	memcpy(result->data, temp, nbytes);
+	result->nbytes = nbytes;
+
+	/* aitems: 0 means there is no null bitmap */
+	result->aitems = pq_getmsgint(&buf, 4);
+
+	/* nullbitmap */
+	if (result->aitems > 0)
+	{
+		int			size = (result->aitems + 7) / 8;
+
+		result->nullbitmap = (bits8 *) palloc(size);
+		temp = pq_getmsgbytes(&buf, size);
+		memcpy(result->nullbitmap, temp, size);
+	}
+	else
+		result->nullbitmap = NULL;
+
+	/* nitems */
+	result->nitems = pq_getmsgint(&buf, 4);
+
+	/* ndims */
+	result->ndims = pq_getmsgint(&buf, 4);
+
+	/* dims */
+	temp = pq_getmsgbytes(&buf, sizeof(result->dims));
+	memcpy(result->dims, temp, sizeof(result->dims));
+
+	/* lbs */
+	temp = pq_getmsgbytes(&buf, sizeof(result->lbs));
+	memcpy(result->lbs, temp, sizeof(result->lbs));
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 Datum
 array_agg_array_finalfn(PG_FUNCTION_ARGS)
 {
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index ac21241..97e80c0 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -5002,6 +5002,19 @@ array_insert_slice(ArrayType *destArray,
 ArrayBuildState *
 initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
 {
+	return initArrayResultWithSize(element_type, rcontext, subcontext,
+								   subcontext ? 64 : 8);
+}
+
+/*
+ * initArrayResultWithSize
+ *		As initArrayResult, but allow the initial size of the allocated arrays
+ *		to be specified.
+ */
+ArrayBuildState *
+initArrayResultWithSize(Oid element_type, MemoryContext rcontext,
+						bool subcontext, int initsize)
+{
 	ArrayBuildState *astate;
 	MemoryContext arr_context = rcontext;
 
@@ -5015,7 +5028,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
 		MemoryContextAlloc(arr_context, sizeof(ArrayBuildState));
 	astate->mcontext = arr_context;
 	astate->private_cxt = subcontext;
-	astate->alen = (subcontext ? 64 : 8);	/* arbitrary starting array size */
+	astate->alen = Max(initsize, 1);	/* grown by doubling, so must be > 0 */
 	astate->dvalues = (Datum *)
 		MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum));
 	astate->dnulls = (bool *)
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index a84e845..229bfde 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -459,13 +459,29 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
 	{
 		bytea	   *value = PG_GETARG_BYTEA_PP(1);
 
-		/* On the first time through, we ignore the delimiter. */
+		/*
+		 * We always append the delimiter text before the value text, even
+		 * with the first aggregated item. The reason for this is that if
+		 * this state needs to be combined with another state using the
+		 * aggregate's combine function, then we need to have the delimiter
+		 * for the first aggregated item. The first delimiter will be
+		 * stripped off in the final function anyway. We use a little cheat
+		 * here and store the position of the actual first value (after the
+		 * delimiter) using the StringInfo's cursor variable. This relies on
+		 * the cursor not being used for any other purpose.
+		 */
 		if (state == NULL)
+		{
 			state = makeStringAggState(fcinfo);
-		else if (!PG_ARGISNULL(2))
+
+			if (!PG_ARGISNULL(2))
+				state->cursor = VARSIZE_ANY_EXHDR(PG_GETARG_BYTEA_PP(2));
+		}
+
+		if (!PG_ARGISNULL(2))
 		{
 			bytea	   *delim = PG_GETARG_BYTEA_PP(2);
 
 			appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim));
 		}
 
@@ -480,5 +496,84 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
 }
 
+/*
+ * bytea_string_agg_serialize
+ *		Serialize the StringInfo state of string_agg(bytea) into bytea.
+ *
+ * The cursor (the length of the leading delimiter that the final function
+ * strips) is sent first, followed by the raw accumulated bytes.
+ */
+Datum
+bytea_string_agg_serialize(PG_FUNCTION_ARGS)
+{
+	StringInfo	state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (StringInfo) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * cursor -- we must send this position so we can properly skip past the
+	 * delimiter for the first aggregated string
+	 */
+	pq_sendint32(&buf, state->cursor);
+
+	/* data */
+	pq_sendbytes(&buf, state->data, state->len);
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * bytea_string_agg_deserialize
+ *		Deserialize bytea produced by bytea_string_agg_serialize.
+ *
+ * The new state is built in the current memory context; the combine
+ * function copies state2's contents into its own state, so this state
+ * does not need to live in the aggregate context.
+ */
+Datum
+bytea_string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	StringInfo	result;
+	StringInfoData buf;
+	int			datalen;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	result = makeStringInfo();
+
+	/* cursor */
+	result->cursor = pq_getmsgint(&buf, 4);
+
+	/* data: everything after the cursor */
+	datalen = buf.len - buf.cursor;
+	appendBinaryStringInfo(result, pq_getmsgbytes(&buf, datalen), datalen);
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 Datum
 bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
 {
@@ -493,9 +588,10 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
 	{
 		bytea	   *result;
 
-		result = (bytea *) palloc(state->len + VARHDRSZ);
-		SET_VARSIZE(result, state->len + VARHDRSZ);
-		memcpy(VARDATA(result), state->data, state->len);
+		result = (bytea *) palloc(state->len - state->cursor + VARHDRSZ);
+		SET_VARSIZE(result, state->len - state->cursor + VARHDRSZ);
+		memcpy(VARDATA(result), &state->data[state->cursor],
+			   state->len - state->cursor);
 		PG_RETURN_BYTEA_P(result);
 	}
 	else
@@ -4704,10 +4800,26 @@ string_agg_transfn(PG_FUNCTION_ARGS)
 	/* Append the value unless null. */
 	if (!PG_ARGISNULL(1))
 	{
-		/* On the first time through, we ignore the delimiter. */
+		/*
+		 * We always append the delimiter text before the value text, even
+		 * with the first aggregated item. The reason for this is that if
+		 * this state needs to be combined with another state using the
+		 * aggregate's combine function, then we need to have the delimiter
+		 * for the first aggregated item. The first delimiter will be
+		 * stripped off in the final function anyway. We use a little cheat
+		 * here and store the position of the actual first value (after the
+		 * delimiter) using the StringInfo's cursor variable. This relies on
+		 * the cursor not being used for any other purpose.
+		 */
 		if (state == NULL)
+		{
 			state = makeStringAggState(fcinfo);
-		else if (!PG_ARGISNULL(2))
+
+			if (!PG_ARGISNULL(2))
+				state->cursor = VARSIZE_ANY_EXHDR(PG_GETARG_TEXT_PP(2));
+		}
+
+		if (!PG_ARGISNULL(2))
 			appendStringInfoText(state, PG_GETARG_TEXT_PP(2));	/* delimiter */
 
 		appendStringInfoText(state, PG_GETARG_TEXT_PP(1));	/* value */
@@ -4720,6 +4832,138 @@ string_agg_transfn(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(state);
 }
 
+/*
+ * string_agg_combine
+ *		Aggregate combine function for string_agg(text) and string_agg(bytea)
+ *
+ * Each state keeps the delimiter that preceded its first value (see
+ * string_agg_transfn), so state2 can simply be appended to state1.
+ */
+Datum
+string_agg_combine(PG_FUNCTION_ARGS)
+{
+	StringInfo	state1;
+	StringInfo	state2;
+	MemoryContext agg_context;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/* report "no state at all" as SQL NULL, not as a NULL pointer */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		MemoryContext old_context;
+
+		/* state2 may be short-lived, so copy it into the agg_context */
+		old_context = MemoryContextSwitchTo(agg_context);
+		state1 = makeStringAggState(fcinfo);
+
+		/* The cursor marks the start of the actual data */
+		state1->cursor = state2->cursor;
+
+		appendBinaryStringInfo(state1, state2->data, state2->len);
+
+		MemoryContextSwitchTo(old_context);
+	}
+	else if (state2->len > 0)
+		appendBinaryStringInfo(state1, state2->data, state2->len);
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * string_agg_serialize
+ *		Serialize the StringInfo state of string_agg(text) into bytea.
+ *
+ * The accumulated data is sent as raw bytes.  pq_sendtext would convert it
+ * to the client encoding, which can fail or change its length and so
+ * invalidate the cursor; nothing here is ever seen by the client.
+ */
+Datum
+string_agg_serialize(PG_FUNCTION_ARGS)
+{
+	StringInfo	state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (StringInfo) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * cursor -- we must send this position so we can properly skip past the
+	 * delimiter for the first aggregated string
+	 */
+	pq_sendint32(&buf, state->cursor);
+
+	/* data */
+	pq_sendbytes(&buf, state->data, state->len);
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * string_agg_deserialize
+ *		Deserialize bytea produced by string_agg_serialize.
+ *
+ * The new state is built in the current memory context; the combine
+ * function copies state2's contents into its own state, so this state
+ * does not need to live in the aggregate context.
+ */
+Datum
+string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	StringInfo	result;
+	StringInfoData buf;
+	int			datalen;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	result = makeStringInfo();
+
+	/* cursor */
+	result->cursor = pq_getmsgint(&buf, 4);
+
+	/*
+	 * data: appended with appendBinaryStringInfo so that len and maxlen stay
+	 * consistent with the buffer that actually holds it.
+	 */
+	datalen = buf.len - buf.cursor;
+	appendBinaryStringInfo(result, pq_getmsgbytes(&buf, datalen), datalen);
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 Datum
 string_agg_finalfn(PG_FUNCTION_ARGS)
 {
@@ -4730,8 +4974,13 @@ string_agg_finalfn(PG_FUNCTION_ARGS)
 
 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
+	/*
+	 * Return the text removing the first delimiter text as marked by the
+	 * cursor.
+	 */
 	if (state != NULL)
-		PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
+		PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
+												  state->len - state->cursor));
 	else
 		PG_RETURN_NULL();
 }
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 13f1bce..d6410e8 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -300,14 +300,14 @@ DATA(insert ( 2243 n 0 bitor - bitor - - - - - f f r r 0 1560 0 0
 DATA(insert ( 2901 n 0 xmlconcat2 - - - - - - - f f r r 0 142 0 0 0 _null_ _null_ ));
 
 /* array */
-DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
-DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn array_agg_combine array_agg_serialize array_agg_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn array_agg_array_combine array_agg_array_serialize array_agg_array_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
 
 /* text */
-DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn string_agg_combine string_agg_serialize string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
 
 /* bytea */
-DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
+DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn string_agg_combine bytea_string_agg_serialize bytea_string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
 
 /* json */
 DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c969375..05ad4bc 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -924,12 +924,24 @@ DATA(insert OID = 3168 ( array_replace PGNSP PGUID 12 1 0 0 0 f f f f f f i
 DESCR("replace any occurrences of an element in an array");
 DATA(insert OID = 2333 ( array_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2281 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3419 ( array_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3420 ( array_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3421 ( array_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 2334 ( array_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2277 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 2335 ( array_agg PGNSP PGUID 12 1 0 0 0 t f f f f f i s 1 0 2277 "2776" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
 DESCR("concatenate aggregate input into an array");
 DATA(insert OID = 4051 ( array_agg_array_transfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2281 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3451 ( array_agg_array_combine PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3452 ( array_agg_array_serialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3453 ( array_agg_array_deserialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 4052 ( array_agg_array_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2277 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 4053 ( array_agg PGNSP PGUID 12 1 0 0 0 t f f f f f i s 1 0 2277 "2277" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
@@ -2690,12 +2702,22 @@ DESCR("aggregate final function");
 
 DATA(insert OID = 3535 ( string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 3 0 2281 "2281 25 25" _null_ _null_ _null_ _null_ _null_ string_agg_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3422 ( string_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ string_agg_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3423 ( string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3424 ( string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ string_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 3536 ( string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 1 0 25 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3538 ( string_agg PGNSP PGUID 12 1 0 0 0 t f f f f f i s 2 0 25 "25 25" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
 DESCR("concatenate aggregate input into a string");
 DATA(insert OID = 3543 ( bytea_string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 3 0 2281 "2281 17 17" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3449 ( bytea_string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3450 ( bytea_string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 3544 ( bytea_string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ bytea_string_agg_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3545 ( string_agg PGNSP PGUID 12 1 0 0 0 t f f f f f i s 2 0 17 "17 17" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
diff --git a/src/include/utils/array.h b/src/include/utils/array.h
index cc19879..7550735 100644
--- a/src/include/utils/array.h
+++ b/src/include/utils/array.h
@@ -393,6 +393,9 @@ extern bool array_contains_nulls(ArrayType *array);
 
 extern ArrayBuildState *initArrayResult(Oid element_type,
 				MemoryContext rcontext, bool subcontext);
+extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
+						MemoryContext rcontext,
+						bool subcontext, int initsize);
 extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
 				 Datum dvalue, bool disnull,
 				 Oid element_type,