*** a/doc/src/sgml/plperl.sgml
--- b/doc/src/sgml/plperl.sgml
***************
*** 191,196 **** select returns_array();
--- 191,231 ----
+ Perl passes PostgreSQL arrays as array
+ references when plperl.convert_array_arguments variable
+ is set to true. By default it's set to false, allowing Perl code written
+ for PostgreSQL versions below 9.1 to run
+ unmodified:
+
+
+ SET plperl.convert_array_arguments = true;
+ CREATE OR REPLACE FUNCTION concat_array_elements(text[]) RETURNS TEXT AS $$
+ my $arg = shift;
+ my $result = "";
+ return undef if (ref $arg ne 'ARRAY');
+ foreach (@$arg) {
+ $result.=$_;
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+
+ SELECT concat_array_elements(ARRAY['PL','/','Perl']);
+
+ The SET command in the example above enables
+ conversion of arrays for the current session only. For permanent effect you
+ can set plperl.convert_array_arguments to true in
+ postgresql.conf file.
+
+
+
+
+ Multi-dimensional arrays are represented as references to
+ lower-dimensional arrays of references in a way common to every Perl
+ programmer.
+
+
+
+
Composite-type arguments are passed to the function as references
to hashes. The keys of the hash are the attribute names of the
composite type. Here is an example:
*** a/src/pl/plperl/GNUmakefile
--- b/src/pl/plperl/GNUmakefile
***************
*** 41,47 **** PERLCHUNKS = plc_perlboot.pl plc_trusted.pl
SHLIB_LINK = $(perl_embed_ldflags)
REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-language=plperl --load-language=plperlu
! REGRESS = plperl plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu
# if Perl can support two interpreters in one backend,
# test plperl-and-plperlu cases
ifneq ($(PERL),)
--- 41,47 ----
SHLIB_LINK = $(perl_embed_ldflags)
REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-language=plperl --load-language=plperlu
! REGRESS = plperl plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array
# if Perl can support two interpreters in one backend,
# test plperl-and-plperlu cases
ifneq ($(PERL),)
*** /dev/null
--- b/src/pl/plperl/expected/plperl_array.out
***************
*** 0 ****
--- 1,191 ----
+ CREATE OR REPLACE FUNCTION plperl_sum_array(INTEGER[]) RETURNS INTEGER AS $$
+ my $array_arg = shift;
+ my $result = 0;
+ my @arrays;
+
+ if (ref $array_arg eq "ARRAY") {
+ push @arrays, @$array_arg;
+
+ while (@arrays > 0) {
+ my $el = shift @arrays;
+ if (ref $el eq "ARRAY") {
+ push @arrays, @$el;
+ } else {
+ $result += $el;
+ }
+ }
+ } else {
+ $result = 0;
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+ set plperl.convert_array_arguments = true;
+ select plperl_sum_array('{1,2,NULL}');
+ plperl_sum_array
+ ------------------
+ 3
+ (1 row)
+
+ select plperl_sum_array('{}');
+ plperl_sum_array
+ ------------------
+ 0
+ (1 row)
+
+ select plperl_sum_array('{{1,2,3}, {4,5,6}}');
+ plperl_sum_array
+ ------------------
+ 21
+ (1 row)
+
+ select plperl_sum_array('{{{1,2,3}, {4,5,6}}, {{7,8,9}, {10,11,12}}}');
+ plperl_sum_array
+ ------------------
+ 78
+ (1 row)
+
+ select plperl_sum_array('{{{1,2,3}, {4,5,6,7}}, {{7,8,9}, {10, 11, 12}}}');
+ ERROR: multidimensional arrays must have array expressions with matching dimensions
+ LINE 1: select plperl_sum_array('{{{1,2,3}, {4,5,6,7}}, {{7,8,9}, {1...
+ ^
+ CREATE OR REPLACE FUNCTION plperl_concat(TEXT[]) RETURNS TEXT AS $$
+ my $array_arg = shift;
+ my $result = "";
+ my @arrays;
+
+ if (ref $array_arg eq "ARRAY") {
+ push @arrays, @$array_arg;
+ while (@arrays > 0) {
+ my $el = shift @arrays;
+ if (ref $el eq "ARRAY") {
+ push @arrays, @$el;
+ } else {
+ $result .= $el;
+ }
+ }
+ }
+ else {
+ $result = $array_arg;
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+ select plperl_concat('{"NULL","NULL","NULL''"}');
+ plperl_concat
+ ---------------
+ NULLNULLNULL'
+ (1 row)
+
+ select plperl_concat('{{NULL,NULL,NULL}}');
+ plperl_concat
+ ---------------
+
+ (1 row)
+
+ select plperl_concat('{"hello"," ","world!"}');
+ plperl_concat
+ ---------------
+ hello world!
+ (1 row)
+
+ -- OLD style --
+ reset plperl.convert_array_arguments;
+ select plperl_concat('{"hello"," ","world!"}');
+ plperl_concat
+ --------------------
+ {hello," ",world!}
+ (1 row)
+
+ set plperl.convert_array_arguments = true;
+ -- array of rows --
+ CREATE TYPE foo AS (bar INTEGER, baz TEXT);
+ CREATE OR REPLACE FUNCTION plperl_array_of_rows(foo[]) RETURNS TEXT AS $$
+ my $array_arg = shift;
+ my $result = "";
+
+ die "not an array reference" unless (ref $array_arg eq "ARRAY");
+ foreach my $row_ref (@$array_arg) {
+ die "not a hash reference" unless (ref $row_ref eq "HASH");
+ $result .= $row_ref->{bar}." items of ".$row_ref->{baz}.";";
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+ select plperl_array_of_rows(ARRAY[ ROW(2, 'coffee'), ROW(0, 'sugar')]::foo[]);
+ plperl_array_of_rows
+ -------------------------------------
+ 2 items of coffee;0 items of sugar;
+ (1 row)
+
+ -- composite type containing arrays
+ CREATE TYPE rowfoo AS (bar INTEGER, baz INTEGER[]);
+ CREATE OR REPLACE FUNCTION plperl_sum_row_elements(rowfoo) RETURNS TEXT AS $$
+ my $row_ref = shift;
+ my $result;
+
+ if (ref $row_ref ne 'HASH') {
+ $result = 0;
+ }
+ else {
+ $result = $row_ref->{bar};
+ die "not an array reference".ref ($row_ref->{baz})
+ unless (ref $row_ref->{baz} eq 'ARRAY');
+ # process a single-dimensional array
+ foreach my $elem (@{$row_ref->{baz}}) {
+ $result += $elem unless ref $elem;
+ }
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+ select plperl_sum_row_elements(ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo);
+ plperl_sum_row_elements
+ -------------------------
+ 55
+ (1 row)
+
+ -- composite type containing array of another composite type, which, in order,
+ -- contains an array of integers.
+ CREATE TYPE rowbar AS (foo rowfoo[]);
+ CREATE OR REPLACE FUNCTION plperl_sum_array_of_rows(rowbar) RETURNS TEXT AS $$
+ my $rowfoo_ref = shift;
+ my $result = 0;
+
+ if (ref $rowfoo_ref eq 'HASH') {
+ my $row_array_ref = $rowfoo_ref->{foo};
+ if (ref $row_array_ref eq 'ARRAY') {
+ foreach my $row_ref (@{$row_array_ref}) {
+ if (ref $row_ref eq 'HASH') {
+ $result += $row_ref->{bar};
+ die "not an array reference".ref ($row_ref->{baz})
+ unless (ref $row_ref->{baz} eq 'ARRAY');
+ foreach my $elem (@{$row_ref->{baz}}) {
+ $result += $elem unless ref $elem;
+ }
+ }
+ else {
+ die "element baz is not a reference to a rowfoo";
+ }
+ }
+ } else {
+ die "not a reference to an array of rowfoo elements"
+ }
+ } else {
+ die "not a reference to type rowbar";
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+ select plperl_sum_array_of_rows(ROW(ARRAY[ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo,
+ ROW(11, ARRAY[12,13,14,15,16,17,18,19,20])::rowfoo])::rowbar);
+ plperl_sum_array_of_rows
+ --------------------------
+ 210
+ (1 row)
+
+ -- check arrays as out parameters
+ CREATE OR REPLACE FUNCTION plperl_arrays_out(OUT INTEGER[]) AS $$
+ return [[1,2,3],[4,5,6]];
+ $$ LANGUAGE plperl;
+ select plperl_arrays_out();
+ plperl_arrays_out
+ -------------------
+ {{1,2,3},{4,5,6}}
+ (1 row)
+
*** a/src/pl/plperl/plperl.c
--- b/src/pl/plperl/plperl.c
***************
*** 108,113 **** typedef struct plperl_proc_desc
--- 108,114 ----
int nargs;
FmgrInfo arg_out_func[FUNC_MAX_ARGS];
bool arg_is_rowtype[FUNC_MAX_ARGS];
+ bool arg_is_array[FUNC_MAX_ARGS];
SV *reference;
} plperl_proc_desc;
***************
*** 177,182 **** typedef struct plperl_query_entry
--- 178,196 ----
} plperl_query_entry;
/**********************************************************************
+ * Information for PostgreSQL - Perl array conversion.
+ **********************************************************************/
+ typedef struct plperl_array_info
+ {
+ int ndims;
+ bool typiscomposite;
+ Datum *elements;
+ bool *nulls;
+ int *nelems;
+ FmgrInfo proc;
+ } plperl_array_info;
+
+ /**********************************************************************
* Global data
**********************************************************************/
***************
*** 191,196 **** static bool plperl_use_strict = false;
--- 205,211 ----
static char *plperl_on_init = NULL;
static char *plperl_on_plperl_init = NULL;
static char *plperl_on_plperlu_init = NULL;
+ static bool plperl_convert_array_arguments = false;
static bool plperl_ending = false;
static OP *(*pp_require_orig) (pTHX) = NULL;
***************
*** 227,238 **** static SV **hv_store_string(HV *hv, const char *key, SV *val);
--- 242,256 ----
static SV **hv_fetch_string(HV *hv, const char *key);
static void plperl_create_sub(plperl_proc_desc *desc, char *s, Oid fn_oid);
static SV *plperl_call_perl_func(plperl_proc_desc *desc, FunctionCallInfo fcinfo);
+ static SV *plperl_ref_from_pg_array(Datum arg);
static void plperl_compile_callback(void *arg);
static void plperl_exec_callback(void *arg);
static void plperl_inline_callback(void *arg);
static char *strip_trailing_ws(const char *msg);
static OP *pp_require_safe(pTHX);
static void activate_interpreter(plperl_interp_desc *interp_desc);
+ static SV * split_array(plperl_array_info *info, int a, int z, int nest);
+ static SV * make_array_ref(plperl_array_info *info, int a, int z);
#ifdef WIN32
static char *setlocale_perl(int category, char *locale);
***************
*** 314,319 **** _PG_init(void)
--- 332,344 ----
false,
PGC_USERSET, 0,
NULL, NULL);
+ DefineCustomBoolVariable("plperl.convert_array_arguments",
+ gettext_noop("If true, convert postgres array input parameters to perl arrays, leave them as strings otherwise."),
+ NULL,
+ &plperl_convert_array_arguments,
+ false,
+ PGC_USERSET, 0,
+ NULL, NULL);
/*
* plperl.on_init is marked PGC_SIGHUP to support the idea that it might
***************
*** 1523,1528 **** plperl_call_perl_func(plperl_proc_desc *desc, FunctionCallInfo fcinfo)
--- 1548,1561 ----
PUSHs(sv_2mortal(hashref));
ReleaseTupleDesc(tupdesc);
}
+ else if (desc->arg_is_array[i] && plperl_convert_array_arguments)
+ {
+ /* Get perl array reference from the postgresql array */
+ SV *arrayref;
+ arrayref = plperl_ref_from_pg_array(fcinfo->arg[i]);
+
+ XPUSHs(sv_2mortal(arrayref));
+ }
else
{
char *tmp;
***************
*** 2132,2137 **** compile_plperl_function(Oid fn_oid, bool is_trigger)
--- 2165,2176 ----
perm_fmgr_info(typeStruct->typoutput,
&(prodesc->arg_out_func[i]));
}
+
+ /* Set flag for array attributes */
+ if (typeStruct->typlen == -1 && typeStruct->typelem)
+ prodesc->arg_is_array[i] = true;
+ else
+ prodesc->arg_is_array[i] = false;
ReleaseSysCache(typeTup);
}
***************
*** 2202,2211 **** plperl_hash_from_tuple(HeapTuple tuple, TupleDesc tupdesc)
{
Datum attr;
bool isnull;
char *attname;
char *outputstr;
Oid typoutput;
! bool typisvarlena;
if (tupdesc->attrs[i]->attisdropped)
continue;
--- 2241,2252 ----
{
Datum attr;
bool isnull;
+ bool elementisarray;
char *attname;
char *outputstr;
Oid typoutput;
! HeapTuple typeTup;
! Form_pg_type typeStruct;
if (tupdesc->attrs[i]->attisdropped)
continue;
***************
*** 2219,2234 **** plperl_hash_from_tuple(HeapTuple tuple, TupleDesc tupdesc)
hv_store_string(hv, attname, newSV(0));
continue;
}
! /* XXX should have a way to cache these lookups */
! getTypeOutputInfo(tupdesc->attrs[i]->atttypid,
! &typoutput, &typisvarlena);
!
! outputstr = OidOutputFunctionCall(typoutput, attr);
!
! hv_store_string(hv, attname, newSVstring(outputstr));
!
! pfree(outputstr);
}
return newRV_noinc((SV *) hv);
--- 2260,2297 ----
hv_store_string(hv, attname, newSV(0));
continue;
}
+
+ /* Check whether the attribute is an array */
+ typeTup = SearchSysCache1(TYPEOID,
+ ObjectIdGetDatum(tupdesc->attrs[i]->atttypid));
+
+ if (!HeapTupleIsValid(typeTup))
+ elog(ERROR, "cache lookup failed for type %u",
+ tupdesc->attrs[i]->atttypid);
+
+ typeStruct = (Form_pg_type) GETSTRUCT(typeTup);
+
+ typoutput = typeStruct->typoutput;
+
+ /* Detect whether the member of a composite type is an array*/
+ if (typeStruct->typlen == -1 && typeStruct->typelem)
+ elementisarray = true;
+ else
+ elementisarray = false;
! ReleaseSysCache(typeTup);
!
! if (!elementisarray)
! {
! outputstr = OidOutputFunctionCall(typoutput, attr);
! hv_store_string(hv, attname, newSVstring(outputstr));
! pfree(outputstr);
! }
! else
! {
! SV *arrayref = plperl_ref_from_pg_array(attr);
! hv_store_string(hv, attname, arrayref);
! }
}
return newRV_noinc((SV *) hv);
***************
*** 3282,3284 **** setlocale_perl(int category, char *locale)
--- 3345,3484 ----
}
#endif
+
+ /*
+ * Convert PostgreSQL array datum to a perl array reference.
+ */
+ static SV *
+ plperl_ref_from_pg_array(Datum arg)
+ {
+ ArrayType *ar = DatumGetArrayTypeP(arg);
+ Oid elementtype = ARR_ELEMTYPE(ar);
+ int16 typlen;
+ bool typbyval;
+ char typalign,
+ typdelim;
+ Oid typioparam;
+ Oid typoutputfunc;
+ int i,
+ nitems,
+ *dims;
+ Form_pg_type typeStruct;
+ HeapTuple typeTuple;
+ plperl_array_info *info;
+
+ info = palloc(sizeof(plperl_array_info));
+
+ /* get element type information, including output coversion function */
+ get_type_io_data(elementtype, IOFunc_output,
+ &typlen, &typbyval, &typalign,
+ &typdelim, &typioparam, &typoutputfunc);
+
+ perm_fmgr_info(typoutputfunc, &info->proc);
+
+ typeTuple = SearchSysCache(TYPEOID,
+ ObjectIdGetDatum(elementtype),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(typeTuple))
+ elog(ERROR, "cache lookup failed for type %u",
+ elementtype);
+
+ typeStruct = (Form_pg_type) GETSTRUCT(typeTuple);
+ info->typiscomposite = (typeStruct->typtype == 'c');
+
+ ReleaseSysCache(typeTuple);
+
+ /* Get the number and bounds of array dimensions */
+ info->ndims = ARR_NDIM(ar);
+ dims = ARR_DIMS(ar);
+
+ deconstruct_array(ar, elementtype, typlen, typbyval,
+ typalign, &info->elements, &info->nulls,
+ &nitems);
+
+ /* Get total number of elements in each dimension */
+ info->nelems = palloc(sizeof(int) * info->ndims);
+ info->nelems[0] = nitems;
+ for (i = 1; i < info->ndims; i++)
+ info->nelems[i] = info->nelems[i-1]/dims[i-1];
+
+ return split_array(info, 0, nitems, 0);
+ }
+
+ /* Recursively form array references from splices of the initial array */
+ static SV *
+ split_array(plperl_array_info *info, int a, int z, int nest)
+ {
+ int i;
+ AV *result;
+
+ elog(DEBUG3, "split_array [%d, %d), nest = %d", a, z, nest);
+
+ /*
+ * Base case, return a reference to a single-dimensional array
+ */
+ if (nest >= info->ndims - 1)
+ return make_array_ref(info, a, z);
+
+ result = newAV();
+ for (i = a; i < z; i+=info->nelems[nest + 1])
+ {
+ /*
+ * Recursively form references to arrays of lower dimensions */
+ SV *ref = split_array(info, i, i+info->nelems[nest+1], nest + 1);
+ av_push(result, ref);
+ }
+ return newRV_noinc((SV *)result);
+ }
+
+ /*
+ * Create a Perl reference from a one-dimensional C array, converting
+ * composite type elements to hash references.
+ */
+ static SV *
+ make_array_ref(plperl_array_info *info, int a, int z)
+ {
+ int i;
+ AV *result = newAV();
+
+ for (i = a; i < z; i++ )
+ {
+ if (info->nulls[i])
+ av_push(result, newSV(0));
+ else
+ {
+ Datum itemvalue = info->elements[i];
+
+ /* Handle composite type elements */
+ if (info->typiscomposite)
+ {
+ HeapTupleHeader td;
+ Oid tupType;
+ int32 tupTypmod;
+ TupleDesc tupdesc;
+ HeapTupleData tmptup;
+ SV *hashref;
+
+ td = DatumGetHeapTupleHeader(itemvalue);
+ /* Extract rowtype info and find a tupdesc */
+ tupType = HeapTupleHeaderGetTypeId(td);
+ tupTypmod = HeapTupleHeaderGetTypMod(td);
+ tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
+ /* Build a temporary HeapTuple control structure */
+ tmptup.t_len = HeapTupleHeaderGetDatumLength(td);
+ tmptup.t_data = td;
+
+ hashref = plperl_hash_from_tuple(&tmptup, tupdesc);
+ av_push(result, hashref);
+ ReleaseTupleDesc(tupdesc);
+ }
+ else
+ {
+ char *val = OutputFunctionCall(&info->proc, itemvalue);
+ av_push(result, newSVstring(val));
+ }
+ }
+ }
+ elog(DEBUG3, "creating array reference from [%d, %d)", a, z);
+ return newRV_noinc((SV *) result);
+ }
*** /dev/null
--- b/src/pl/plperl/sql/plperl_array.sql
***************
*** 0 ****
--- 1,144 ----
+ CREATE OR REPLACE FUNCTION plperl_sum_array(INTEGER[]) RETURNS INTEGER AS $$
+ my $array_arg = shift;
+ my $result = 0;
+ my @arrays;
+
+ if (ref $array_arg eq "ARRAY") {
+ push @arrays, @$array_arg;
+
+ while (@arrays > 0) {
+ my $el = shift @arrays;
+ if (ref $el eq "ARRAY") {
+ push @arrays, @$el;
+ } else {
+ $result += $el;
+ }
+ }
+ } else {
+ $result = 0;
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+
+ set plperl.convert_array_arguments = true;
+
+ select plperl_sum_array('{1,2,NULL}');
+ select plperl_sum_array('{}');
+ select plperl_sum_array('{{1,2,3}, {4,5,6}}');
+ select plperl_sum_array('{{{1,2,3}, {4,5,6}}, {{7,8,9}, {10,11,12}}}');
+ select plperl_sum_array('{{{1,2,3}, {4,5,6,7}}, {{7,8,9}, {10, 11, 12}}}');
+
+ CREATE OR REPLACE FUNCTION plperl_concat(TEXT[]) RETURNS TEXT AS $$
+ my $array_arg = shift;
+ my $result = "";
+ my @arrays;
+
+ if (ref $array_arg eq "ARRAY") {
+ push @arrays, @$array_arg;
+ while (@arrays > 0) {
+ my $el = shift @arrays;
+ if (ref $el eq "ARRAY") {
+ push @arrays, @$el;
+ } else {
+ $result .= $el;
+ }
+ }
+ }
+ else {
+ $result = $array_arg;
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+
+ select plperl_concat('{"NULL","NULL","NULL''"}');
+ select plperl_concat('{{NULL,NULL,NULL}}');
+ select plperl_concat('{"hello"," ","world!"}');
+
+ -- OLD style --
+ reset plperl.convert_array_arguments;
+ select plperl_concat('{"hello"," ","world!"}');
+
+ set plperl.convert_array_arguments = true;
+
+ -- array of rows --
+ CREATE TYPE foo AS (bar INTEGER, baz TEXT);
+ CREATE OR REPLACE FUNCTION plperl_array_of_rows(foo[]) RETURNS TEXT AS $$
+ my $array_arg = shift;
+ my $result = "";
+
+ die "not an array reference" unless (ref $array_arg eq "ARRAY");
+ foreach my $row_ref (@$array_arg) {
+ die "not a hash reference" unless (ref $row_ref eq "HASH");
+ $result .= $row_ref->{bar}." items of ".$row_ref->{baz}.";";
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+
+ select plperl_array_of_rows(ARRAY[ ROW(2, 'coffee'), ROW(0, 'sugar')]::foo[]);
+
+ -- composite type containing arrays
+ CREATE TYPE rowfoo AS (bar INTEGER, baz INTEGER[]);
+
+ CREATE OR REPLACE FUNCTION plperl_sum_row_elements(rowfoo) RETURNS TEXT AS $$
+ my $row_ref = shift;
+ my $result;
+
+ if (ref $row_ref ne 'HASH') {
+ $result = 0;
+ }
+ else {
+ $result = $row_ref->{bar};
+ die "not an array reference".ref ($row_ref->{baz})
+ unless (ref $row_ref->{baz} eq 'ARRAY');
+ # process a single-dimensional array
+ foreach my $elem (@{$row_ref->{baz}}) {
+ $result += $elem unless ref $elem;
+ }
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+
+ select plperl_sum_row_elements(ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo);
+
+ -- composite type containing array of another composite type, which, in order,
+ -- contains an array of integers.
+ CREATE TYPE rowbar AS (foo rowfoo[]);
+
+ CREATE OR REPLACE FUNCTION plperl_sum_array_of_rows(rowbar) RETURNS TEXT AS $$
+ my $rowfoo_ref = shift;
+ my $result = 0;
+
+ if (ref $rowfoo_ref eq 'HASH') {
+ my $row_array_ref = $rowfoo_ref->{foo};
+ if (ref $row_array_ref eq 'ARRAY') {
+ foreach my $row_ref (@{$row_array_ref}) {
+ if (ref $row_ref eq 'HASH') {
+ $result += $row_ref->{bar};
+ die "not an array reference".ref ($row_ref->{baz})
+ unless (ref $row_ref->{baz} eq 'ARRAY');
+ foreach my $elem (@{$row_ref->{baz}}) {
+ $result += $elem unless ref $elem;
+ }
+ }
+ else {
+ die "element baz is not a reference to a rowfoo";
+ }
+ }
+ } else {
+ die "not a reference to an array of rowfoo elements"
+ }
+ } else {
+ die "not a reference to type rowbar";
+ }
+ return $result;
+ $$ LANGUAGE plperl;
+
+ select plperl_sum_array_of_rows(ROW(ARRAY[ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo,
+ ROW(11, ARRAY[12,13,14,15,16,17,18,19,20])::rowfoo])::rowbar);
+
+ -- check arrays as out parameters
+ CREATE OR REPLACE FUNCTION plperl_arrays_out(OUT INTEGER[]) AS $$
+ return [[1,2,3],[4,5,6]];
+ $$ LANGUAGE plperl;
+
+ select plperl_arrays_out();