*** a/doc/src/sgml/plperl.sgml --- b/doc/src/sgml/plperl.sgml *************** *** 191,196 **** select returns_array(); --- 191,231 ---- + PL/Perl passes PostgreSQL array arguments as Perl array + references when the plperl.convert_array_arguments variable + is set to true. It is false by default, so that Perl code written + for PostgreSQL versions before 9.1 runs + unmodified: + + + SET plperl.convert_array_arguments = true; + CREATE OR REPLACE FUNCTION concat_array_elements(text[]) RETURNS TEXT AS $$ + my $arg = shift; + my $result = ""; + return undef if (ref $arg ne 'ARRAY'); + foreach (@$arg) { + $result.=$_; + } + return $result; + $$ LANGUAGE plperl; + + SELECT concat_array_elements(ARRAY['PL','/','Perl']); + + The SET command in the example above enables + array conversion for the current session only. To make it permanent, + set plperl.convert_array_arguments to true in + the postgresql.conf file. + + + + + Multidimensional arrays are represented as references to arrays of + references to lower-dimensional arrays, the usual Perl idiom for nested + arrays. + + + + Composite-type arguments are passed to the function as references to hashes. The keys of the hash are the attribute names of the composite type. Here is an example: *** a/src/pl/plperl/GNUmakefile --- b/src/pl/plperl/GNUmakefile *************** *** 41,47 **** PERLCHUNKS = plc_perlboot.pl plc_trusted.pl SHLIB_LINK = $(perl_embed_ldflags) REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-language=plperl --load-language=plperlu ! REGRESS = plperl plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu # if Perl can support two interpreters in one backend, # test plperl-and-plperlu cases ifneq ($(PERL),) --- 41,47 ---- SHLIB_LINK = $(perl_embed_ldflags) REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-language=plperl --load-language=plperlu ! 
REGRESS = plperl plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array # if Perl can support two interpreters in one backend, # test plperl-and-plperlu cases ifneq ($(PERL),) *** /dev/null --- b/src/pl/plperl/expected/plperl_array.out *************** *** 0 **** --- 1,191 ---- + CREATE OR REPLACE FUNCTION plperl_sum_array(INTEGER[]) RETURNS INTEGER AS $$ + my $array_arg = shift; + my $result = 0; + my @arrays; + + if (ref $array_arg eq "ARRAY") { + push @arrays, @$array_arg; + + while (@arrays > 0) { + my $el = shift @arrays; + if (ref $el eq "ARRAY") { + push @arrays, @$el; + } else { + $result += $el; + } + } + } else { + $result = 0; + } + return $result; + $$ LANGUAGE plperl; + set plperl.convert_array_arguments = true; + select plperl_sum_array('{1,2,NULL}'); + plperl_sum_array + ------------------ + 3 + (1 row) + + select plperl_sum_array('{}'); + plperl_sum_array + ------------------ + 0 + (1 row) + + select plperl_sum_array('{{1,2,3}, {4,5,6}}'); + plperl_sum_array + ------------------ + 21 + (1 row) + + select plperl_sum_array('{{{1,2,3}, {4,5,6}}, {{7,8,9}, {10,11,12}}}'); + plperl_sum_array + ------------------ + 78 + (1 row) + + select plperl_sum_array('{{{1,2,3}, {4,5,6,7}}, {{7,8,9}, {10, 11, 12}}}'); + ERROR: multidimensional arrays must have array expressions with matching dimensions + LINE 1: select plperl_sum_array('{{{1,2,3}, {4,5,6,7}}, {{7,8,9}, {1... 
+ ^ + CREATE OR REPLACE FUNCTION plperl_concat(TEXT[]) RETURNS TEXT AS $$ + my $array_arg = shift; + my $result = ""; + my @arrays; + + if (ref $array_arg eq "ARRAY") { + push @arrays, @$array_arg; + while (@arrays > 0) { + my $el = shift @arrays; + if (ref $el eq "ARRAY") { + push @arrays, @$el; + } else { + $result .= $el; + } + } + } + else { + $result = $array_arg; + } + return $result; + $$ LANGUAGE plperl; + select plperl_concat('{"NULL","NULL","NULL''"}'); + plperl_concat + --------------- + NULLNULLNULL' + (1 row) + + select plperl_concat('{{NULL,NULL,NULL}}'); + plperl_concat + --------------- + + (1 row) + + select plperl_concat('{"hello"," ","world!"}'); + plperl_concat + --------------- + hello world! + (1 row) + + -- OLD style -- + reset plperl.convert_array_arguments; + select plperl_concat('{"hello"," ","world!"}'); + plperl_concat + -------------------- + {hello," ",world!} + (1 row) + + set plperl.convert_array_arguments = true; + -- array of rows -- + CREATE TYPE foo AS (bar INTEGER, baz TEXT); + CREATE OR REPLACE FUNCTION plperl_array_of_rows(foo[]) RETURNS TEXT AS $$ + my $array_arg = shift; + my $result = ""; + + die "not an array reference" unless (ref $array_arg eq "ARRAY"); + foreach my $row_ref (@$array_arg) { + die "not a hash reference" unless (ref $row_ref eq "HASH"); + $result .= $row_ref->{bar}." 
items of ".$row_ref->{baz}.";"; + } + return $result; + $$ LANGUAGE plperl; + select plperl_array_of_rows(ARRAY[ ROW(2, 'coffee'), ROW(0, 'sugar')]::foo[]); + plperl_array_of_rows + ------------------------------------- + 2 items of coffee;0 items of sugar; + (1 row) + + -- composite type containing arrays + CREATE TYPE rowfoo AS (bar INTEGER, baz INTEGER[]); + CREATE OR REPLACE FUNCTION plperl_sum_row_elements(rowfoo) RETURNS TEXT AS $$ + my $row_ref = shift; + my $result; + + if (ref $row_ref ne 'HASH') { + $result = 0; + } + else { + $result = $row_ref->{bar}; + die "not an array reference".ref ($row_ref->{baz}) + unless (ref $row_ref->{baz} eq 'ARRAY'); + # process a single-dimensional array + foreach my $elem (@{$row_ref->{baz}}) { + $result += $elem unless ref $elem; + } + } + return $result; + $$ LANGUAGE plperl; + select plperl_sum_row_elements(ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo); + plperl_sum_row_elements + ------------------------- + 55 + (1 row) + + -- composite type containing array of another composite type, which, in order, + -- contains an array of integers. 
+ CREATE TYPE rowbar AS (foo rowfoo[]); + CREATE OR REPLACE FUNCTION plperl_sum_array_of_rows(rowbar) RETURNS TEXT AS $$ + my $rowfoo_ref = shift; + my $result = 0; + + if (ref $rowfoo_ref eq 'HASH') { + my $row_array_ref = $rowfoo_ref->{foo}; + if (ref $row_array_ref eq 'ARRAY') { + foreach my $row_ref (@{$row_array_ref}) { + if (ref $row_ref eq 'HASH') { + $result += $row_ref->{bar}; + die "not an array reference".ref ($row_ref->{baz}) + unless (ref $row_ref->{baz} eq 'ARRAY'); + foreach my $elem (@{$row_ref->{baz}}) { + $result += $elem unless ref $elem; + } + } + else { + die "element baz is not a reference to a rowfoo"; + } + } + } else { + die "not a reference to an array of rowfoo elements" + } + } else { + die "not a reference to type rowbar"; + } + return $result; + $$ LANGUAGE plperl; + select plperl_sum_array_of_rows(ROW(ARRAY[ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo, + ROW(11, ARRAY[12,13,14,15,16,17,18,19,20])::rowfoo])::rowbar); + plperl_sum_array_of_rows + -------------------------- + 210 + (1 row) + + -- check arrays as out parameters + CREATE OR REPLACE FUNCTION plperl_arrays_out(OUT INTEGER[]) AS $$ + return [[1,2,3],[4,5,6]]; + $$ LANGUAGE plperl; + select plperl_arrays_out(); + plperl_arrays_out + ------------------- + {{1,2,3},{4,5,6}} + (1 row) + *** a/src/pl/plperl/plperl.c --- b/src/pl/plperl/plperl.c *************** *** 108,113 **** typedef struct plperl_proc_desc --- 108,114 ---- int nargs; FmgrInfo arg_out_func[FUNC_MAX_ARGS]; bool arg_is_rowtype[FUNC_MAX_ARGS]; + bool arg_is_array[FUNC_MAX_ARGS]; SV *reference; } plperl_proc_desc; *************** *** 177,182 **** typedef struct plperl_query_entry --- 178,196 ---- } plperl_query_entry; /********************************************************************** + * Information for PostgreSQL - Perl array conversion. 
+ **********************************************************************/ + typedef struct plperl_array_info + { + int ndims; /* number of array dimensions (ARR_NDIM) */ + bool typiscomposite; /* true if the element type is a composite type */ + Datum *elements; /* flattened element values from deconstruct_array() */ + bool *nulls; /* per-element null flags, parallel to elements */ + int *nelems; /* nelems[d]: elements covered by one sub-array at depth d */ + FmgrInfo proc; /* output function of the element type */ + } plperl_array_info; + + /********************************************************************** * Global data **********************************************************************/ *************** *** 191,196 **** static bool plperl_use_strict = false; --- 205,211 ---- static char *plperl_on_init = NULL; static char *plperl_on_plperl_init = NULL; static char *plperl_on_plperlu_init = NULL; + static bool plperl_convert_array_arguments = false; /* GUC plperl.convert_array_arguments */ static bool plperl_ending = false; static OP *(*pp_require_orig) (pTHX) = NULL; *************** *** 227,238 **** static SV **hv_store_string(HV *hv, const char *key, SV *val); --- 242,256 ---- static SV **hv_fetch_string(HV *hv, const char *key); static void plperl_create_sub(plperl_proc_desc *desc, char *s, Oid fn_oid); static SV *plperl_call_perl_func(plperl_proc_desc *desc, FunctionCallInfo fcinfo); + static SV *plperl_ref_from_pg_array(Datum arg); static void plperl_compile_callback(void *arg); static void plperl_exec_callback(void *arg); static void plperl_inline_callback(void *arg); static char *strip_trailing_ws(const char *msg); static OP *pp_require_safe(pTHX); static void activate_interpreter(plperl_interp_desc *interp_desc); + static SV * split_array(plperl_array_info *info, int a, int z, int nest); + static SV * make_array_ref(plperl_array_info *info, int a, int z); #ifdef WIN32 static char *setlocale_perl(int category, char *locale); *************** *** 314,319 **** _PG_init(void) --- 332,344 ---- false, PGC_USERSET, 0, NULL, NULL); + DefineCustomBoolVariable("plperl.convert_array_arguments", + gettext_noop("If true, convert postgres array input parameters to perl arrays, leave them as strings otherwise."), + NULL, + &plperl_convert_array_arguments, + false, + PGC_USERSET, 0, + NULL, NULL); /* * 
plperl.on_init is marked PGC_SIGHUP to support the idea that it might *************** *** 1523,1528 **** plperl_call_perl_func(plperl_proc_desc *desc, FunctionCallInfo fcinfo) --- 1548,1561 ---- PUSHs(sv_2mortal(hashref)); ReleaseTupleDesc(tupdesc); } + else if (desc->arg_is_array[i] && plperl_convert_array_arguments) + { + /* Conversion enabled: pass the array as a mortal Perl array reference rather than as a string */ + SV *arrayref; + arrayref = plperl_ref_from_pg_array(fcinfo->arg[i]); + + XPUSHs(sv_2mortal(arrayref)); + } else { char *tmp; *************** *** 2132,2137 **** compile_plperl_function(Oid fn_oid, bool is_trigger) --- 2165,2176 ---- perm_fmgr_info(typeStruct->typoutput, &(prodesc->arg_out_func[i])); } + + /* Flag array arguments: a typlen of -1 together with a nonzero typelem */ + if (typeStruct->typlen == -1 && typeStruct->typelem) + prodesc->arg_is_array[i] = true; + else + prodesc->arg_is_array[i] = false; ReleaseSysCache(typeTup); } *************** *** 2202,2211 **** plperl_hash_from_tuple(HeapTuple tuple, TupleDesc tupdesc) { Datum attr; bool isnull; char *attname; char *outputstr; Oid typoutput; ! bool typisvarlena; if (tupdesc->attrs[i]->attisdropped) continue; --- 2241,2252 ---- { Datum attr; bool isnull; + bool elementisarray; /* attribute type has typlen -1 and a typelem */ char *attname; char *outputstr; Oid typoutput; ! HeapTuple typeTup; ! Form_pg_type typeStruct; if (tupdesc->attrs[i]->attisdropped) continue; *************** *** 2219,2234 **** plperl_hash_from_tuple(HeapTuple tuple, TupleDesc tupdesc) hv_store_string(hv, attname, newSV(0)); continue; } ! /* XXX should have a way to cache these lookups */ ! getTypeOutputInfo(tupdesc->attrs[i]->atttypid, ! &typoutput, &typisvarlena); ! ! outputstr = OidOutputFunctionCall(typoutput, attr); ! ! hv_store_string(hv, attname, newSVstring(outputstr)); ! ! 
pfree(outputstr); } return newRV_noinc((SV *) hv); --- 2260,2297 ---- hv_store_string(hv, attname, newSV(0)); continue; } + + /* Check whether the attribute is an array */ + typeTup = SearchSysCache1(TYPEOID, + ObjectIdGetDatum(tupdesc->attrs[i]->atttypid)); + + if (!HeapTupleIsValid(typeTup)) + elog(ERROR, "cache lookup failed for type %u", + tupdesc->attrs[i]->atttypid); + + typeStruct = (Form_pg_type) GETSTRUCT(typeTup); + + typoutput = typeStruct->typoutput; + + /* Detect whether the member of a composite type is an array */ + if (typeStruct->typlen == -1 && typeStruct->typelem) + elementisarray = true; + else + elementisarray = false; ! ReleaseSysCache(typeTup); ! /* Honor plperl.convert_array_arguments here too: array members stay text unless it is on */ ! if (!elementisarray || !plperl_convert_array_arguments) ! { ! outputstr = OidOutputFunctionCall(typoutput, attr); ! hv_store_string(hv, attname, newSVstring(outputstr)); ! pfree(outputstr); ! } ! else ! { ! SV *arrayref = plperl_ref_from_pg_array(attr); ! hv_store_string(hv, attname, arrayref); ! } } return newRV_noinc((SV *) hv); *************** *** 3282,3284 **** setlocale_perl(int category, char *locale) --- 3345,3484 ---- } #endif + + /* + * Convert PostgreSQL array datum to a perl array reference. 
+  *
+  * Multi-dimensional arrays become nested array references.  NULL elements
+  * become undef, composite elements become hash references, and all other
+  * elements are passed as the text produced by the type's output function.
+  */
+ static SV *
+ plperl_ref_from_pg_array(Datum arg)
+ {
+     ArrayType  *ar = DatumGetArrayTypeP(arg);
+     Oid         elementtype = ARR_ELEMTYPE(ar);
+     int16       typlen;
+     bool        typbyval;
+     char        typalign, typdelim;
+     Oid         typioparam;
+     Oid         typoutputfunc;
+     int        *dims;
+     int         i, nitems;
+     plperl_array_info *info;
+ 
+     info = palloc(sizeof(plperl_array_info));
+ 
+     /* get element type information, including output conversion function */
+     get_type_io_data(elementtype, IOFunc_output,
+                      &typlen, &typbyval, &typalign,
+                      &typdelim, &typioparam, &typoutputfunc);
+ 
+     /* info is per-call state, so do a plain (non-permanent) fmgr_info() */
+     fmgr_info(typoutputfunc, &info->proc);
+ 
+     /* The type was already looked up above; only its typtype is needed */
+     info->typiscomposite = (get_typtype(elementtype) == TYPTYPE_COMPOSITE);
+ 
+     /* Get the number and bounds of array dimensions */
+     info->ndims = ARR_NDIM(ar);
+     dims = ARR_DIMS(ar);
+ 
+     /*
+      * An empty array has no dimensions: return an empty Perl array instead
+      * of storing nelems[0] into a zero-length allocation below.
+      */
+     if (info->ndims == 0)
+         return newRV_noinc((SV *) newAV());
+ 
+     deconstruct_array(ar, elementtype, typlen, typbyval,
+                       typalign, &info->elements, &info->nulls,
+                       &nitems);
+ 
+     /* Get total number of elements in each dimension */
+     info->nelems = palloc(sizeof(int) * info->ndims);
+     info->nelems[0] = nitems;
+     for (i = 1; i < info->ndims; i++)
+         info->nelems[i] = info->nelems[i - 1] / dims[i - 1];
+ 
+     return split_array(info, 0, nitems, 0);
+ }
+ 
+ /*
+  * Recursively form array references from slices of the flattened array:
+  * [a, z) is the element range of this sub-array, nest its depth (from 0).
+  */
+ static SV *
+ split_array(plperl_array_info *info, int a, int z, int nest)
+ {
+     int         i;
+     AV         *result;
+ 
+     elog(DEBUG3, "split_array [%d, %d), nest = %d", a, z, nest);
+ 
+     /* Base case, return a reference to a single-dimensional array */
+     if (nest >= info->ndims - 1)
+         return make_array_ref(info, a, z);
+ 
+     result = newAV();
+     for (i = a; i < z; i += info->nelems[nest + 1])
+     {
+         /* Recursively form references to arrays of lower dimensions */
+         SV         *ref = split_array(info, i, i + info->nelems[nest + 1], nest + 1);
+         av_push(result, ref);
+     }
+     return newRV_noinc((SV *) result);
+ }
+ 
+ /*
+  * Build a Perl array reference from elements [a, z) of the flattened array,
+  * turning NULLs into undef and composite elements into hash references.
+  */
+ static SV *
+ make_array_ref(plperl_array_info *info, int a, int z)
+ {
+     int         i;
+     AV         *result = newAV();
+ 
+     for (i = a; i < z; i++)
+     {
+         if (info->nulls[i])
+             av_push(result, newSV(0));
+         else
+         {
+             Datum       itemvalue = info->elements[i];
+ 
+             /* Handle composite type elements */
+             if (info->typiscomposite)
+             {
+                 HeapTupleHeader td;
+                 Oid         tupType;
+                 int32       tupTypmod;
+                 TupleDesc   tupdesc;
+                 HeapTupleData tmptup;
+                 SV         *hashref;
+ 
+                 td = DatumGetHeapTupleHeader(itemvalue);
+                 /* Extract rowtype info and find a tupdesc */
+                 tupType = HeapTupleHeaderGetTypeId(td);
+                 tupTypmod = HeapTupleHeaderGetTypMod(td);
+                 tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
+                 /* Build a temporary HeapTuple control structure */
+                 tmptup.t_len = HeapTupleHeaderGetDatumLength(td);
+                 tmptup.t_data = td;
+ 
+                 hashref = plperl_hash_from_tuple(&tmptup, tupdesc);
+                 av_push(result, hashref);
+                 ReleaseTupleDesc(tupdesc);
+             }
+             else
+             {
+                 char       *val = OutputFunctionCall(&info->proc, itemvalue);
+                 av_push(result, newSVstring(val));
+                 pfree(val);     /* same cleanup as plperl_hash_from_tuple() */
+             }
+         }
+     }
+     elog(DEBUG3, "creating array reference from [%d, %d)", a, z);
+     return newRV_noinc((SV *) result);
+ }
*** /dev/null --- b/src/pl/plperl/sql/plperl_array.sql *************** *** 0 **** --- 1,144 ---- + CREATE OR REPLACE FUNCTION plperl_sum_array(INTEGER[]) RETURNS INTEGER AS $$ + my $array_arg = shift; + my $result = 0; + my @arrays; + + if (ref $array_arg eq "ARRAY") { + push @arrays, @$array_arg; + + while (@arrays > 0) { + my $el = shift @arrays; + if (ref $el eq "ARRAY") { + push @arrays, @$el; + } else { + $result += $el; + } + } + } else { + $result = 0; + } + return $result; + $$ LANGUAGE plperl; + + set plperl.convert_array_arguments = true; + + select plperl_sum_array('{1,2,NULL}'); + 
select plperl_sum_array('{}'); + select plperl_sum_array('{{1,2,3}, {4,5,6}}'); + select plperl_sum_array('{{{1,2,3}, {4,5,6}}, {{7,8,9}, {10,11,12}}}'); + select plperl_sum_array('{{{1,2,3}, {4,5,6,7}}, {{7,8,9}, {10, 11, 12}}}'); + + CREATE OR REPLACE FUNCTION plperl_concat(TEXT[]) RETURNS TEXT AS $$ + my $array_arg = shift; + my $result = ""; + my @arrays; + + if (ref $array_arg eq "ARRAY") { + push @arrays, @$array_arg; + while (@arrays > 0) { + my $el = shift @arrays; + if (ref $el eq "ARRAY") { + push @arrays, @$el; + } else { + $result .= $el; + } + } + } + else { + $result = $array_arg; + } + return $result; + $$ LANGUAGE plperl; + + select plperl_concat('{"NULL","NULL","NULL''"}'); + select plperl_concat('{{NULL,NULL,NULL}}'); + select plperl_concat('{"hello"," ","world!"}'); + + -- OLD style -- + reset plperl.convert_array_arguments; + select plperl_concat('{"hello"," ","world!"}'); + + set plperl.convert_array_arguments = true; + + -- array of rows -- + CREATE TYPE foo AS (bar INTEGER, baz TEXT); + CREATE OR REPLACE FUNCTION plperl_array_of_rows(foo[]) RETURNS TEXT AS $$ + my $array_arg = shift; + my $result = ""; + + die "not an array reference" unless (ref $array_arg eq "ARRAY"); + foreach my $row_ref (@$array_arg) { + die "not a hash reference" unless (ref $row_ref eq "HASH"); + $result .= $row_ref->{bar}." 
items of ".$row_ref->{baz}.";"; + } + return $result; + $$ LANGUAGE plperl; + + select plperl_array_of_rows(ARRAY[ ROW(2, 'coffee'), ROW(0, 'sugar')]::foo[]); + + -- composite type containing arrays + CREATE TYPE rowfoo AS (bar INTEGER, baz INTEGER[]); + + CREATE OR REPLACE FUNCTION plperl_sum_row_elements(rowfoo) RETURNS TEXT AS $$ + my $row_ref = shift; + my $result; + + if (ref $row_ref ne 'HASH') { + $result = 0; + } + else { + $result = $row_ref->{bar}; + die "not an array reference".ref ($row_ref->{baz}) + unless (ref $row_ref->{baz} eq 'ARRAY'); + # process a single-dimensional array + foreach my $elem (@{$row_ref->{baz}}) { + $result += $elem unless ref $elem; + } + } + return $result; + $$ LANGUAGE plperl; + + select plperl_sum_row_elements(ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo); + + -- composite type containing array of another composite type, which, in order, + -- contains an array of integers. + CREATE TYPE rowbar AS (foo rowfoo[]); + + CREATE OR REPLACE FUNCTION plperl_sum_array_of_rows(rowbar) RETURNS TEXT AS $$ + my $rowfoo_ref = shift; + my $result = 0; + + if (ref $rowfoo_ref eq 'HASH') { + my $row_array_ref = $rowfoo_ref->{foo}; + if (ref $row_array_ref eq 'ARRAY') { + foreach my $row_ref (@{$row_array_ref}) { + if (ref $row_ref eq 'HASH') { + $result += $row_ref->{bar}; + die "not an array reference".ref ($row_ref->{baz}) + unless (ref $row_ref->{baz} eq 'ARRAY'); + foreach my $elem (@{$row_ref->{baz}}) { + $result += $elem unless ref $elem; + } + } + else { + die "element baz is not a reference to a rowfoo"; + } + } + } else { + die "not a reference to an array of rowfoo elements" + } + } else { + die "not a reference to type rowbar"; + } + return $result; + $$ LANGUAGE plperl; + + select plperl_sum_array_of_rows(ROW(ARRAY[ROW(1, ARRAY[2,3,4,5,6,7,8,9,10])::rowfoo, + ROW(11, ARRAY[12,13,14,15,16,17,18,19,20])::rowfoo])::rowbar); + + -- check arrays as out parameters + CREATE OR REPLACE FUNCTION plperl_arrays_out(OUT INTEGER[]) AS $$ + return 
[[1,2,3],[4,5,6]]; + $$ LANGUAGE plperl; + + select plperl_arrays_out();