? part_diff.txt Index: src/backend/catalog/Makefile =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/catalog/Makefile,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.10.1 diff -c -r1.1.1.2 -r1.1.1.2.10.1 *** src/backend/catalog/Makefile 1 Dec 2008 09:37:46 -0000 1.1.1.2 --- src/backend/catalog/Makefile 10 Feb 2009 13:18:07 -0000 1.1.1.2.10.1 *************** *** 27,33 **** POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_proc.h pg_type.h pg_attribute.h pg_class.h pg_autovacuum.h \ ! pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \ pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \ --- 27,33 ---- POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_proc.h pg_type.h pg_attribute.h pg_class.h pg_autovacuum.h \ ! pg_attrdef.h pg_constraint.h pg_inherits.h pg_partition.h pg_index.h pg_operator.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \ pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \ Index: src/backend/catalog/pg_operator.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/catalog/pg_operator.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.1 diff -c -r1.1.1.1 -r1.1.1.1.10.1 *** src/backend/catalog/pg_operator.c 1 Dec 2008 09:36:26 -0000 1.1.1.1 --- src/backend/catalog/pg_operator.c 19 Feb 2009 11:51:17 -0000 1.1.1.1.10.1 *************** *** 35,41 **** #include "utils/syscache.h" ! static Oid OperatorGet(const char *operatorName, Oid operatorNamespace, Oid leftObjectId, Oid rightObjectId, --- 35,41 ---- #include "utils/syscache.h" ! 
Oid OperatorGet(const char *operatorName, Oid operatorNamespace, Oid leftObjectId, Oid rightObjectId, *************** *** 127,133 **** * * *defined is set TRUE if defined (not a shell) */ ! static Oid OperatorGet(const char *operatorName, Oid operatorNamespace, Oid leftObjectId, --- 127,133 ---- * * *defined is set TRUE if defined (not a shell) */ ! Oid OperatorGet(const char *operatorName, Oid operatorNamespace, Oid leftObjectId, Index: src/backend/commands/tablecmds.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/commands/tablecmds.c,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.8.22 diff -c -r1.1.1.2 -r1.1.1.2.8.22 *** src/backend/commands/tablecmds.c 1 Dec 2008 09:37:46 -0000 1.1.1.2 --- src/backend/commands/tablecmds.c 19 Mar 2009 12:34:56 -0000 1.1.1.2.8.22 *************** *** 49,54 **** --- 49,55 ---- #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" + #include "nodes/pg_list.h" #include "optimizer/clauses.h" #include "optimizer/plancat.h" #include "optimizer/prep.h" *************** *** 76,82 **** --- 77,96 ---- #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" + #include "tcop/tcopprot.h" + #include "utils/numeric.h" + /* For partitions */ + #include "catalog/pg_partition.h" + #include "catalog/pg_operator.h" + #include "postgres.h" + #include "executor/spi.h" + #include "commands/trigger.h" + #include "utils/rel.h" + + extern Datum trigf(PG_FUNCTION_ARGS); + + PG_FUNCTION_INFO_V1(trigf); /* * ON COMMIT action list *************** *** 97,102 **** --- 111,123 ---- SubTransactionId deleting_subid; } OnCommitItem; + + typedef struct PartitionRowInfo + { + ItemPointerData tid; + Oid partRelOid; + }PartitionRowInfo; + static List *on_commits = NIL; *************** *** 114,130 **** * a pass determined by subcommand type. */ ! #define AT_PASS_DROP 0 /* DROP (all flavors) */ ! 
#define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */ ! #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */ ! #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */ ! #define AT_PASS_COL_ATTRS 4 /* set other column attributes */ /* We could support a RENAME COLUMN pass here, but not currently used */ ! #define AT_PASS_ADD_COL 5 /* ADD COLUMN */ ! #define AT_PASS_ADD_INDEX 6 /* ADD indexes */ ! #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */ ! #define AT_PASS_MISC 8 /* other stuff */ ! #define AT_NUM_PASSES 9 typedef struct AlteredTableInfo { --- 135,156 ---- * a pass determined by subcommand type. */ ! #define AT_PASS_DROP 0 /* DROP (all flavors) */ ! #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */ ! #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */ ! #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */ ! #define AT_PASS_COL_ATTRS 4 /* set other column attributes */ /* We could support a RENAME COLUMN pass here, but not currently used */ ! #define AT_PASS_ADD_COL 5 /* ADD COLUMN */ ! #define AT_PASS_ADD_INDEX 6 /* ADD indexes */ ! #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */ ! #define AT_PASS_ADD_PARTITION 8 /* ADD partition to an existing table. */ ! #define AT_PASS_DROP_PARTITION_NAME 9 /* DROP partition(By name) of an existing table */ ! #define AT_PASS_DROP_PARTITION_RANGE 10 /* DROP partition(By range) of an existing table */ ! #define AT_PASS_UPDATE_PARTITION 11 /* UPDATE range partition. */ ! #define AT_PASS_SPLIT_PARTITION 12 /* SPLIT range partition. */ ! #define AT_PASS_MISC 13 /* other stuff */ ! #define AT_NUM_PASSES 14 typedef struct AlteredTableInfo { *************** *** 322,329 **** static void ATExecDropInherit(Relation rel, RangeVar *parent); static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, ForkNumber forkNum, bool istemp); ! ! /* ---------------------------------------------------------------- * DefineRelation * Creates a new relation. 
--- 348,371 ----
  static void ATExecDropInherit(Relation rel, RangeVar *parent);
  static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
  				   ForkNumber forkNum, bool istemp);
! static void CreatePartitionTrigger(CreateStmt *stmt, Oid parentRelOid, Oid *partitionRelOids);
! static void MutateColumnRefs(Node *node, char *reference);
! Oid OperatorGet(const char *operatorName,
! 			Oid operatorNamespace,
! 			Oid leftObjectId,
! 			Oid rightObjectId,
! 			bool *defined);
! static Oid ATExecAddPartition(AlteredTableInfo *tab, Relation rel,
! 				Partition *part); /* Adding a partition to an existing table. */
! static void ATExecDropPartitionByName(AlteredTableInfo *tab, Relation rel,
! 				Partition *part); /* Deleting a partition of an existing table by name. */
! static void ATExecDropPartitionByRange(AlteredTableInfo *tab, Relation rel,
! 				Partition *part); /* Deleting a partition of an existing table by range. */
! static void ATExecUpdatePartition(AlteredTableInfo *tab, Relation rel,
! 				UpdatePartitionStmt *part); /* Updating (extending) the range of an existing partition. */
! static void ATExecSplitPartition(AlteredTableInfo *tab, Relation rel,
! 				SplitPartitionStmt *part); /* Splitting a partition of an existing table. */
! Oid get_relevent_partition(HeapTuple tuple, Relation rel);
  /* ----------------------------------------------------------------
   *		DefineRelation
   *				Creates a new relation.
***************
*** 2170,2175 ****
--- 2212,2297 ----
  						stmt, RelationGetRelationName(rel))));
  }
+ 
+ /*
+  * GetPartitionsCount : Count the pg_partition rows of the given parent table
+  * whose keyorder is 1 (presumably one such row exists per partition).
+ */
+ int GetPartitionsCount(Oid parentOid)
+ {
+ 	Relation	catalog;
+ 	SysScanDesc scan;
+ 	ScanKeyData key;
+ 	HeapTuple	row;
+ 	int			count = 0;
+ 
+ 	/* Restrict the scan to pg_partition rows whose parentrelid is parentOid. */
+ 	ScanKeyInit(&key,
+ 				Anum_pg_partition_parentrelid,
+ 				BTEqualStrategyNumber, F_OIDEQ,
+ 				ObjectIdGetDatum(parentOid));
+ 
+ 	catalog = heap_open(PartitionRelationId, AccessShareLock);
+ 	scan = systable_beginscan(catalog, PartitionParentIndexId, true,
+ 							  SnapshotNow, 1, &key);
+ 
+ 	/* Tally only the matching rows that carry keyorder 1. */
+ 	for (row = systable_getnext(scan);
+ 		 HeapTupleIsValid(row);
+ 		 row = systable_getnext(scan))
+ 	{
+ 		Form_pg_partition entry = (Form_pg_partition) GETSTRUCT(row);
+ 
+ 		if (entry->keyorder == 1)
+ 			count += 1;
+ 	}
+ 
+ 	systable_endscan(scan);
+ 	heap_close(catalog, AccessShareLock);
+ 
+ 	return count;
+ }
+ 
+ /*
+  * GetPartitionsCountAtt : Count the pg_partition rows of the given parent table
+  * whose partition key column (partkey) equals the given attribute number.
+ */
+ int GetPartitionsCountAtt(Oid parentOid, Oid attNum)
+ {
+ 	Relation	catalog;
+ 	SysScanDesc scan;
+ 	ScanKeyData key;
+ 	HeapTuple	row;
+ 	int			matches = 0;
+ 
+ 	/* Restrict the scan to pg_partition rows whose parentrelid is parentOid. */
+ 	ScanKeyInit(&key,
+ 				Anum_pg_partition_parentrelid,
+ 				BTEqualStrategyNumber, F_OIDEQ,
+ 				ObjectIdGetDatum(parentOid));
+ 
+ 	catalog = heap_open(PartitionRelationId, AccessShareLock);
+ 	scan = systable_beginscan(catalog, PartitionParentIndexId, true,
+ 							  SnapshotNow, 1, &key);
+ 
+ 	/* Tally the matching rows whose partition key is attNum. */
+ 	for (;;)
+ 	{
+ 		Form_pg_partition entry;
+ 
+ 		row = systable_getnext(scan);
+ 		if (!HeapTupleIsValid(row))
+ 			break;
+ 
+ 		entry = (Form_pg_partition) GETSTRUCT(row);
+ 		if (entry->partkey == attNum)
+ 			matches += 1;
+ 	}
+ 
+ 	systable_endscan(scan);
+ 	heap_close(catalog, AccessShareLock);
+ 
+ 	return matches;
+ }
+ 
+ 
  /*
   * AlterTable
   *		Execute ALTER TABLE, which can be a list of subcommands
***************
*** 2328,2333 ****
--- 2450,2459 ----
  	 */
  	switch (cmd->subtype)
  	{
+ 		case AT_AddPartition:	/* Add partition to an existing table. */
+ 			ATSimplePermissions(rel, false);
+ 			pass = AT_PASS_ADD_PARTITION;
+ 			break;
  		case AT_AddColumn:		/* ADD COLUMN */
  			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
***************
*** 2347,2352 ****
--- 2473,2494 ----
  			/* No command-specific prep needed */
  			pass = cmd->def ? 
AT_PASS_ADD_CONSTR : AT_PASS_DROP; break; + case AT_DropPartitionByName: /* Drop partition by Name.*/ + ATSimplePermissions(rel, false); + pass = AT_PASS_DROP_PARTITION_NAME; + break; + case AT_DropPartitionByRange: /* Drop partition by Range*/ + ATSimplePermissions(rel, false); + pass = AT_PASS_DROP_PARTITION_RANGE; + break; + case AT_UpdatePartition: /* */ + ATSimplePermissions(rel, false); + pass = AT_PASS_UPDATE_PARTITION; + break; + case AT_SplitPartition: /* */ + ATSimplePermissions(rel, false); + pass = AT_PASS_SPLIT_PARTITION; + break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(rel, false); ATSimpleRecursion(wqueue, rel, cmd, recurse); *************** *** 2554,2565 **** --- 2696,2722 ---- { switch (cmd->subtype) { + case AT_AddPartition: /* Add partition to an existing table. */ + ATExecAddPartition(tab, rel, (Partition *)cmd->def); + break; case AT_AddColumn: /* ADD COLUMN */ ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def); break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ ATExecColumnDefault(rel, cmd->name, cmd->def); break; + case AT_DropPartitionByName: /* DROP partition(by name) of an existing table.*/ + ATExecDropPartitionByName(tab, rel, (Partition *)cmd->def); + break; + case AT_DropPartitionByRange: /* DROP partition(by range) of an existing table.*/ + ATExecDropPartitionByRange(tab, rel, (Partition *)cmd->def); + break; + case AT_UpdatePartition: /* Update Range partition.*/ + ATExecUpdatePartition(tab, rel, (UpdatePartitionStmt *)cmd->def); + break; + case AT_SplitPartition: /* Update Range partition.*/ + ATExecSplitPartition(tab, rel, (SplitPartitionStmt *)cmd->def); + break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATExecDropNotNull(rel, cmd->name); break; *************** *** 3443,3448 **** --- 3600,4910 ---- } } + /* + * IsValidRange : Check if the given range has values of valid types and + * min value is less than max value. 
+ */
+ /*
+  * Each bound cell is transformed, constant-folded and coerced to the type of
+  * partitionColumn in parentRelOid, and the resulting node is stored back into
+  * the cell.  If coercion leaves a conversion FuncExpr, that function is
+  * executed and the cell is replaced by a genuine Const holding the result, so
+  * callers can read the datum via ((Const *) lfirst(cell))->constvalue.
+  *
+  * tableName may be NULL; it is only used in error messages.
+  *
+  * Raises ERROR if a bound is missing, a bound cannot be reduced to a
+  * constant, '<' is not defined for the column type, or min is not less than
+  * max.  Returns true otherwise.
+  */
+ static bool
+ IsValidRange(ListCell *minIndex, ListCell *maxIndex, Oid parentRelOid, char *partitionColumn, char *tableName)
+ {
+ 	ListCell   *bounds[2];
+ 	Datum		values[2];
+ 	Oid			typ_oid;
+ 	int16		len;
+ 	bool		typbyval;
+ 	Oid			keycmp_oid;
+ 	bool		isdef;
+ 	bool		lt_max;
+ 	int			i;
+ 
+ 	/* Get the attribute type. */
+ 	typ_oid = get_atttype(parentRelOid, get_attnum(parentRelOid, partitionColumn));
+ 
+ 	/* Get the attribute length*/
+ 	get_typlenbyval(typ_oid, &len, &typbyval);
+ 
+ 	if (minIndex == NULL || maxIndex == NULL)
+ 	{
+ 		/* tableName is optional: never hand a NULL pointer to %s. */
+ 		if (tableName)
+ 			elog(ERROR, "Please specify the values for '%s' attribute of '%s' partition.", partitionColumn, tableName);
+ 		else
+ 			elog(ERROR, "Please specify the values for '%s' attribute.", partitionColumn);
+ 	}
+ 
+ 	bounds[0] = minIndex;
+ 	bounds[1] = maxIndex;
+ 
+ 	for (i = 0; i < 2; i++)
+ 	{
+ 		Node	   *expr;
+ 
+ 		values[i] = (Datum) 0;
+ 
+ 		/* Transform the expression from "A_Expr" type to "OpExpr" type. This is needed
+ 		   as expression evaluation does not happen at parser level. */
+ 		expr = transformExpr(NULL, (Node *) lfirst(bounds[i]));
+ 
+ 		/* Solve the expression. */
+ 		expr = eval_const_expressions(NULL, expr);
+ 
+ 		/* Convert the value to column's datatype. */
+ 		expr = coerce_to_specific_type(NULL, expr, typ_oid, partitionColumn);
+ 
+ 		if (IsA(expr, FuncExpr))
+ 		{
+ 			/* A conversion function is left over: execute it on its constant argument. */
+ 			FuncExpr   *conv = (FuncExpr *) expr;
+ 			Node	   *arg = (conv->args != NIL) ? (Node *) linitial(conv->args) : NULL;
+ 
+ 			if (arg == NULL || !IsA(arg, Const))
+ 				elog(ERROR, "Value for '%s' attribute could not be reduced to a constant.", partitionColumn);
+ 
+ 			values[i] = OidFunctionCall1(conv->funcid, ((Const *) arg)->constvalue);
+ 
+ 			/*
+ 			 * Store the datum back as a real Const.  Writing constvalue through
+ 			 * a Const cast of the FuncExpr would overwrite FuncExpr fields.
+ 			 */
+ 			expr = (Node *) makeConst(typ_oid, -1, len, values[i], false, typbyval);
+ 		}
+ 		else if (IsA(expr, Const))
+ 		{
+ 			values[i] = ((Const *) expr)->constvalue;
+ 		}
+ 		else
+ 		{
+ 			elog(ERROR, "Value for '%s' attribute could not be reduced to a constant.", partitionColumn);
+ 		}
+ 
+ 		lfirst(bounds[i]) = expr;
+ 	}
+ 
+ 	keycmp_oid = OperatorGet("<", PG_CATALOG_NAMESPACE , typ_oid, typ_oid, &isdef);
+ 
+ 	if (!isdef)
+ 		elog(ERROR, "'<' operator is not defined for this type.");
+ 
+ 	lt_max = DatumGetBool(OidFunctionCall2(get_opcode(keycmp_oid), values[0], values[1]));
+ 
+ 	if (!lt_max)
+ 	{
+ 		if(tableName)
+ 		{
+ 			elog(ERROR, "First value should be less then second value for '%s' attribute of partition '%s'.",
+ 					partitionColumn, tableName);
+ 		}
+ 		else
+ 		{
+ 			elog(ERROR, "First value should be less then second value for '%s' attribute.",
+ 					partitionColumn);
+ 		}
+ 	}
+ 
+ 	return lt_max;
+ }
+ 
+ static bool
+ WriteRangePartitionToCatalog(Datum min_datum,
+ 		Datum max_datum,
+ 		int16 len,
+ 		bool typbyval,
+ 		Oid parentRelOid,
+ 		Oid partRelOid,
+ 		int attNum,
+ 		Oid typ_oid,
+ 		int keyOrder)
+ {
+ 
+ 	bytea *min_ba, *max_ba;
+ 	Datum values[Natts_pg_partition];
+ 	bool nulls [Natts_pg_partition];
+ 	int i;
+ 
+ 	/* initialize nulls and values */
+ 	for (i = 0; i < Natts_pg_partition; i++)
+ 	{
+ 		nulls[i] = false;
+ 		values[i] = (Datum) NULL;
+ 	}
+ 
+ 	/* There are 3 cases here
+ 	 * 1. types with length = -1
+ 	 * 2. types with fixed length but passed by value (len < 4)
+ 	 * 3. 
types with fixed length but *not* passed by value (len > 4)
+ 	 */
+ 	if(len > 0)	/* fixed-length type: copy the raw bytes into freshly allocated byteas */
+ 	{
+ 		min_ba = (bytea *) palloc(len+1+VARHDRSZ);	/* one extra byte for the '\0' written below */
+ 		max_ba = (bytea *) palloc(len+1+VARHDRSZ);
+ 
+ 		if(typbyval)
+ 		{
+ 			memcpy(VARDATA(min_ba), &min_datum, len);	/* by value: copy the first len bytes of the Datum itself */
+ 			memcpy(VARDATA(max_ba), &max_datum, len);
+ 		}
+ 		else
+ 		{
+ 			memcpy(VARDATA(min_ba), (char *)min_datum, len);	/* by reference: the Datum is a pointer to len bytes */
+ 			memcpy(VARDATA(max_ba), (char *)max_datum, len);
+ 		}
+ 
+ 		SET_VARSIZE(min_ba, len+VARHDRSZ);	/* recorded size excludes the trailing '\0' */
+ 		VARDATA(min_ba)[len] = '\0';
+ 		values[Anum_pg_partition_minval -1]= (Datum)min_ba ;
+ 
+ 		SET_VARSIZE(max_ba, len+VARHDRSZ);
+ 		VARDATA(max_ba)[len] = '\0';
+ 		values[Anum_pg_partition_maxval -1]=(Datum)max_ba;
+ 	}
+ 	else	/* len <= 0: the datums are stored as passed in */
+ 	{
+ 		values[Anum_pg_partition_minval -1]=min_datum;
+ 		values[Anum_pg_partition_maxval -1]=max_datum;
+ 	}
+ 
+ 	values[Anum_pg_partition_parentrelid -1]= ObjectIdGetDatum(parentRelOid);
+ 	values[Anum_pg_partition_partrelid -1] = ObjectIdGetDatum(partRelOid);
+ 	values[Anum_pg_partition_parttype -1] = Int8GetDatum(PART_RANGE);
+ 	values[Anum_pg_partition_partkey -1] = ObjectIdGetDatum(attNum);
+ 	values[Anum_pg_partition_listval -1] = Int8GetDatum(NULL);
+ 	nulls[Anum_pg_partition_listval -1] = true;	/* listval is stored as NULL for a range partition row */
+ 	values[Anum_pg_partition_hashval -1] = Int8GetDatum(NULL);
+ 	nulls[Anum_pg_partition_hashval -1] = true;	/* hashval is stored as NULL for a range partition row */
+ 	values[Anum_pg_partition_keytype -1] = ObjectIdGetDatum(typ_oid);
+ 	values[Anum_pg_partition_keyorder -1] = Int8GetDatum(keyOrder);
+ 
+ 	Relation r = heap_open(PartitionRelationId, RowExclusiveLock);	/* NOTE(review): declarations after statements; confirm the project's C dialect allows this */
+ 	TupleDesc tupDesc = r->rd_att;
+ 	HeapTuple tup = heap_form_tuple(tupDesc, values, nulls);
+ 	simple_heap_insert(r, tup);
+ 	CatalogUpdateIndexes(r, tup);
+ 	heap_close(r, RowExclusiveLock);
+ 
+ 	return true;	/* the only return value in this function */
+ }
+ 
+ static char*
+ DisplayDatum(Datum datum, Oid type)
+ {	/* Returns the C string produced by calling the output function of 'type' on 'datum'. */
+ 	Oid typeOut;
+ 	bool isVariableLength;	/* filled in by getTypeOutputInfo, not used afterwards */
+ 
+ 	getTypeOutputInfo(type, &typeOut, &isVariableLength);
+ 	return (DatumGetCString(OidFunctionCall1(typeOut, datum)));
+ }
+ 
+ static void
+ AddCheckConstraint(Oid typ_oid, Datum min_datum, Datum max_datum, char 
*tableName, char *partitionColumn) + { + char *expr_string; + StringInfo func_constraint = makeStringInfo(); + Oid typoutput; + bool isVariableLength; + List *query_list; + Node *parsetree; + + + resetStringInfo(func_constraint); + + getTypeOutputInfo(typ_oid, &typoutput, &isVariableLength); + expr_string = DatumGetCString(OidFunctionCall1(typoutput, min_datum)); + + /* Form the check constraint for child table. */ + appendStringInfo(func_constraint, "ALTER TABLE %s ADD CHECK ( %s >= '%s' ", + tableName, + partitionColumn, + expr_string); + + expr_string = DatumGetCString(OidFunctionCall1(typoutput, max_datum)); + appendStringInfo(func_constraint, " AND %s < '%s' ); ", + partitionColumn, + expr_string); + + /* Parse the above SQL string and use this parsetree to create check constraint on child tables. */ + query_list = pg_parse_query(func_constraint->data); + parsetree = (Node *)lfirst(list_head(query_list)); + AlterTable(( AlterTableStmt * )parsetree); + + } + + static Datum + GetDatum(Relation rel, Oid type, HeapTuple tuple, int column) + { + Datum datum; + int16 len; + bool typeByVal; + bool isNull; + + /* Get len and typbyval from pg_type */ + get_typlenbyval(type, &len, &typeByVal); + + /* Get min attribute from catalog */ + datum = heap_getattr (tuple, column, rel->rd_att, &isNull); + + /* Three cases + * a. datatypes with typbyval true and fix length. + * b. datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if (typeByVal) + { + memcpy(&datum, VARDATA_ANY(datum), len); + } + else if (len != -1) + { + datum = (Datum)VARDATA_ANY(datum); + } + + return datum; + } + + static Datum + EvalExpr(ListCell *node, Oid type, char *colName) + { + Node *expr; + Datum datum; + + /* Transform the expression from "A_Expr" type to "OpExpr" type. This is needed + as expression evaluation does not happen at parser level. */ + expr = transformExpr(NULL, (Node *)lfirst(node)); + + /* Solve the expression. 
*/ + lfirst(node) = eval_const_expressions(NULL, expr); + + /* Convert the value to column's datatype. */ + lfirst(node) = coerce_to_specific_type(NULL, lfirst(node), type, colName); + + /* If it's a FuncExpr then execute the conversion function. */ + if IsA((Node *)lfirst(node), FuncExpr) + { + datum = OidFunctionCall1(((FuncExpr *)lfirst(node))->funcid, + ((Const *)lfirst(list_head(((FuncExpr *)lfirst(node))->args)))->constvalue); + /* Store the datums back. */ + ((Const *)lfirst(node))->constvalue = datum; + } + else + { + datum = ((Const *)lfirst(node))->constvalue; + } + + return datum; + } + + static bool + OperateDatums(char *operatorName, Oid type, Datum datum1, Datum datum2) + { + bool isDef; + bool result; + Oid operator; + Oid opcode; + + operator = OperatorGet(operatorName, PG_CATALOG_NAMESPACE , type, type, &isDef); + + if(isDef == FALSE) + elog(ERROR, "'%s' operator is not defined for this type.", operatorName); + + opcode = get_opcode(operator); + result = DatumGetBool(OidFunctionCall2(opcode, datum1, datum2)); + + return result; + } + + static void + ATExecSplitPartition(AlteredTableInfo *tab, Relation rel, + SplitPartitionStmt *part) + { + Oid relId; + ScanKeyData skey; + Relation pg_partrel; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + Datum minDatum; + Datum maxDatum; + ListCell *node; + char *columnName; + Partition *partition2 = NULL; + Partition *partition1 = NULL; + Const *cnst; + int16 len; + bool typeByVal; + StringInfo query = makeStringInfo(); + List *queryList; + Node *parseTree; + Oid backup; + + /* Get the relId from the tablename. 
*/ + if(list_head(part->partNames)) + { + relId = RelnameGetRelid(((RangeVar *)linitial(part->partNames))->relname); + if (relId == InvalidOid) + elog(ERROR, "Error : Could not find valid Rel-Id for '%s' in current search path.", ((RangeVar *)linitial(part->partNames))->relname); + } + else + { + elog(ERROR, "Please specify valid partition to be updated."); + } + + ScanKeyInit(&skey, + Anum_pg_partition_partrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(relId)); + + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + /* Instead of pg_part Use heap_getattr for accessing bytea coluns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_RANGE) + { + /* Fetch the min and max value for given attribute of partition. */ + minDatum = GetDatum(pg_partrel, pg_part->keytype, pg_parttup, Anum_pg_partition_minval); + maxDatum = GetDatum(pg_partrel, pg_part->keytype, pg_parttup, Anum_pg_partition_maxval); + + if(pg_part->keyorder == 1) + { + partition1 = makeNode(Partition); + partition1->partName = makeNode(RangeVar); + partition1->partitionCheck = makeNode(Constraint); + + partition2 = makeNode(Partition); + partition2->partName = makeNode(RangeVar); + partition2->partitionCheck = makeNode(Constraint); + + partition1->partName = (RangeVar *)lsecond(part->partNames); + partition1->partitionCheck->max_value = part->splitValues; + + partition2->partName = (RangeVar *)lthird(part->partNames); + partition2->partitionCheck->min_value = part->splitValues; + + node = list_head(part->splitValues); + } + + /* Get the column name. 
*/ + columnName = get_attname (relId, pg_part->partkey); + + EvalExpr(node, pg_part->keytype, columnName); + DisplayDatum(((Const *)lfirst(node))->constvalue, pg_part->keytype); + + if((OperateDatums(">", pg_part->keytype, ((Const *)lfirst(node))->constvalue, minDatum) == false) || + (OperateDatums("<", pg_part->keytype, ((Const *)lfirst(node))->constvalue, maxDatum) == false)) + { + elog(ERROR, "Split-point is not valid. Value for '%s' attribute should be in-between and excluding '%s' and '%s'.", columnName, + DisplayDatum(minDatum, pg_part->keytype), DisplayDatum(maxDatum, pg_part->keytype)); + } + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typeByVal); + + /* Form the const node out of datums. */ + cnst = makeConst(pg_part->keytype, + -1, + len, + maxDatum, + false, + typeByVal); + + cnst->location = -1; + + partition2->partitionCheck->max_value = lappend(partition2->partitionCheck->max_value, cnst); + + /* Form the const node out of datums. */ + cnst = makeConst(pg_part->keytype, + -1, + len, + minDatum, + false, + typeByVal); + + cnst->location = -1; + + partition1->partitionCheck->min_value = lappend(partition1->partitionCheck->min_value, cnst); + + node = lnext(node); + + /* Delete the entry for current table.*/ + simple_heap_delete(pg_partrel, &pg_parttup->t_self); + CatalogUpdateIndexes(pg_partrel, pg_parttup); + } + } + + systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + + ATExecAddPartition(tab, rel, partition1); + backup = ATExecAddPartition(tab, rel, partition2); + + if((partition1 != NULL) && (partition2 != NULL)) + { + Snapshot snap; + Relation tmpRel; + HeapScanDesc scanDesc; + HeapTuple tuple; + Oid relation_id; + Relation child_table_relation; + + snap = GetActiveSnapshot(); + tmpRel = heap_open(relId, RowExclusiveLock); + scanDesc = heap_beginscan(tmpRel, snap, 0, NULL); + + while (HeapTupleIsValid(tuple = heap_getnext(scanDesc, ForwardScanDirection))) + { + + relation_id = 
get_relevent_partition(tuple, rel); + + if (relation_id == InvalidOid) + relation_id = backup; + + child_table_relation = RelationIdGetRelation(relation_id); + + if (child_table_relation != NULL) + { + + ResultRelInfo *resultRelInfo; + TupleTableSlot *slot; + EState *estate= CreateExecutorState(); + + resultRelInfo = makeNode(ResultRelInfo); + resultRelInfo->ri_RangeTableIndex = 1; + resultRelInfo->ri_RelationDesc = child_table_relation; + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + /* Set up a tuple slot too */ + slot = MakeSingleTupleTableSlot(rel->rd_att); + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + + ExecConstraints(resultRelInfo, slot, estate); + + heap_insert(child_table_relation, tuple, GetCurrentCommandId(true), 0, NULL); + RelationClose(child_table_relation); + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState (estate); + } + else + { + elog(ERROR, "Could not migrate row."); + } + + } + + heap_endscan(scanDesc); + heap_close(tmpRel, RowExclusiveLock); + + /* clear the buffer. 
*/ + resetStringInfo(query); + + appendStringInfo(query, "DROP TABLE %s CASCADE;", ((RangeVar *)linitial(part->partNames))->relname); + queryList = pg_parse_query(query->data); + parseTree = (Node *)lfirst(list_head(queryList)); + RemoveRelations((DropStmt *)parseTree); + } + } + + static void + ATExecUpdatePartition(AlteredTableInfo *tab, Relation rel, + UpdatePartitionStmt *part) + { + Oid relId; + ScanKeyData skey; + Relation pg_partrel; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + ListCell *minIndexPrev, *minIndexAfter; + ListCell *maxIndexPrev, *maxIndexAfter; + char *partitionColumn; + Oid keycmp_oid; + bool isdef; + Oid keycmp_proc; + bool gte_max, lte_min; + List *distinct_part_rel_oid_list = NULL; + List *distinct_part_key_list = NULL; + Oid *childOids; + int j = 0; + Partition *tmp; + PartitionAttrs *partition_attr; + int16 len; + bool typbyval; + Datum part_attr; + bool isnull; + Datum short_datum; + Const *tmp_const; + int keyorder; + Oid typ_oid; + StringInfo func_constraint = makeStringInfo(); + List *query_list; + Node *parsetree; + + /* Get the relId from the tablename. */ + if(part->prev->partName->relname) + { + relId = RelnameGetRelid(part->prev->partName->relname); + if (relId == InvalidOid) + elog(ERROR, "Error : Could not find valid Rel-Id for '%s' in current search path.", part->prev->partName->relname); + part->after->partName = part->prev->partName; + } + else + { + elog(ERROR, "Please specify valid partition to be updated."); + } + + childOids = malloc(sizeof(Oid)*GetPartitionsCount(RelationGetRelid(rel))); + + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + /* Create partition attribute node. 
*/ + partition_attr = makeNode(PartitionAttrs); + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + /* Instead of pg_part Use heap_getattr for accessing bytea coluns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_RANGE) + { + + if (!list_member_int(distinct_part_key_list, pg_part->partkey)) + { + Alias *colName; + + distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey); + + /* Get the name of partition key column */ + partitionColumn = get_attname (rel->rd_id, pg_part->partkey); + + /* Add the partition key columns. */ + colName = makeAlias(partitionColumn, NULL); + partition_attr->colName = lappend(partition_attr->colName, colName); + } + + if(relId == pg_part->partrelid) + { + if(pg_part->keyorder == 1) + { + keyorder = 0; + + /* Set the ptr to first value. */ + minIndexPrev = part->prev->partitionCheck->min_value->head; + maxIndexPrev = part->prev->partitionCheck->max_value->head; + minIndexAfter = part->after->partitionCheck->min_value->head; + maxIndexAfter = part->after->partitionCheck->max_value->head; + } + + /* Get the name of partition key column */ + partitionColumn = get_attname (rel->rd_id, pg_part->partkey); + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typbyval); + + typ_oid = pg_part->keytype; + + /* Is this a valid range? */ + IsValidRange(minIndexPrev, maxIndexPrev, RelationGetRelid(rel), partitionColumn, part->prev->partName->relname); + IsValidRange(minIndexAfter, maxIndexAfter, RelationGetRelid(rel), partitionColumn, part->prev->partName->relname); + + resetStringInfo(func_constraint); + + appendStringInfo(func_constraint, "ALTER TABLE %s DROP CONSTRAINT %s_%s_check; ", + part->prev->partName->relname, + part->prev->partName->relname, + partitionColumn); + + /* Parse the above SQL string and use this parsetree to create check constraint on child tables. 
*/ + query_list = pg_parse_query(func_constraint->data); + parsetree = (Node *)lfirst(list_head(query_list)); + AlterTable(( AlterTableStmt * )parsetree); + + AddCheckConstraint(typ_oid, + ((Const *)lfirst(minIndexAfter))->constvalue, + ((Const *)lfirst(maxIndexAfter))->constvalue, + part->prev->partName->relname, + partitionColumn); + + keycmp_oid = OperatorGet(">=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef); + + if(isdef == FALSE) + elog(ERROR, "'>=' operator is not defined for this type."); + + keycmp_proc = get_opcode(keycmp_oid); + gte_max = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(maxIndexAfter))->constvalue, ((Const *)lfirst(maxIndexPrev))->constvalue)); + + if(gte_max == false) + elog(ERROR, "Range of partition can only be extended."); + + keycmp_oid = OperatorGet("<=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef); + + if(isdef == FALSE) + elog(ERROR, "'<=' operator is not defined for this type."); + + keycmp_proc = get_opcode(keycmp_oid); + lte_min = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(minIndexAfter))->constvalue, ((Const *)lfirst(minIndexPrev))->constvalue)); + + if(lte_min == false) + elog(ERROR, "Range of partition can only be extended."); + + /* Delete the row. */ + simple_heap_delete(pg_partrel, &pg_parttup->t_self); + + /* Insert the new updated row. 
*/ + + WriteRangePartitionToCatalog(((Const *)lfirst(minIndexAfter))->constvalue, + ((Const *)lfirst(maxIndexAfter))->constvalue, + len, + typbyval, + RelationGetRelid(rel), + relId, + get_attnum(RelationGetRelid(rel), partitionColumn), + typ_oid, + ++keyorder + ); + + minIndexPrev = minIndexPrev->next; + maxIndexPrev = maxIndexPrev->next; + minIndexAfter = minIndexAfter->next; + maxIndexAfter = maxIndexAfter->next; + } + else + { + if (!list_member_oid(distinct_part_rel_oid_list, pg_part->partrelid)) + { + distinct_part_rel_oid_list = lappend_oid(distinct_part_rel_oid_list, pg_part->partrelid); + childOids[j++] = pg_part->partrelid; + + tmp = makeNode(Partition); + tmp->partName = makeNode(RangeVar); + tmp->partitionCheck = makeNode(Constraint); + + /* Add this partition to partition attributes. */ + partition_attr->partitions = lappend(partition_attr->partitions, tmp); + } + + Relation tmp_rel = RelationIdGetRelation(pg_part->partrelid); + tmp->partName->relname = RelationGetRelationName(tmp_rel); + relation_close(tmp_rel, 0); + + /* Get the name of partition key column */ + partitionColumn = get_attname (rel->rd_id, pg_part->partkey); + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typbyval); + + /* Get min attribute from catalog */ + part_attr = heap_getattr (pg_parttup, Anum_pg_partition_minval, pg_partrel->rd_att, &isnull); + + /* Three cases a. datatypes with typbyval true and fix length. + * b. datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(part_attr), len); + part_attr = short_datum; + } + else if (len != -1) + { + part_attr = (Datum)VARDATA_ANY(part_attr); + } + + /* Create const. */ + tmp_const = makeConst(pg_part->keytype, /* Const type */ + -1, + len, + part_attr, + false, + typbyval); + + tmp_const->location = -1; + + /* Add this to new partition. 
*/ + tmp->partitionCheck->min_value = lappend(tmp->partitionCheck->min_value, tmp_const); + + /* Get max attribute from catalog */ + part_attr = heap_getattr (pg_parttup, Anum_pg_partition_maxval, pg_partrel->rd_att, &isnull); + + /* Three cases a. datatypes with typbyval true and fix length. + * b. datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(part_attr), len); + part_attr = short_datum; + } + else if (len != -1) + { + part_attr = (Datum)VARDATA_ANY(part_attr); + } + + /* Create const. */ + tmp_const = makeConst(pg_part->keytype, /* Const type */ + -1, + len, + part_attr, + false, + typbyval); + + tmp_const->location = -1; + + /* Add this to new partition. */ + tmp->partitionCheck->max_value = lappend(tmp->partitionCheck->max_value, tmp_const); + } + } + } + + childOids[j++] = relId; + partition_attr->partitions = lappend(partition_attr->partitions, part->after); + + ValidateRanges(partition_attr, RelationGetRelid(rel), childOids); + + systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + } + + + static void + ATExecDropPartitionByRange(AlteredTableInfo *tab, Relation rel, + Partition *part) + { + List *rows_to_be_deleted_list = NULL; + ListCell *row_to_be_deleted; + List *distinct_part_key_list= NULL; + ScanKeyData skey; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + Relation pg_partrel; + StringInfo query = makeStringInfo(); + Node *parsetree; + List *query_list = NULL; + char *partitionColumn; + int16 len; + ListCell *minIndex; + ListCell *maxIndex; + bool typbyval; + bool isnull; + Datum min_datum; + Datum max_datum; + Datum short_datum; + Oid keycmp_oid; + bool isdef; + Oid keycmp_proc; + bool lte_min; + bool gte_max; + bool IsRelDeleted = false; + PartitionRowInfo *partRowInfo; + + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); 
+ + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + /* Instead of pg_part Use heap_getattr for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_RANGE) + { + if (!list_member_int(distinct_part_key_list, pg_part->partkey)) + { + distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey); + } + if(pg_part->keyorder == 1) + { + IsRelDeleted = false; + + /* Set the ptr to first value. */ + minIndex = part->partitionCheck->min_value->head; + maxIndex = part->partitionCheck->max_value->head; + + if(list_length(distinct_part_key_list)== list_length(rows_to_be_deleted_list)) + { + foreach(row_to_be_deleted, rows_to_be_deleted_list) + { + PartitionRowInfo *tmp = (PartitionRowInfo *)lfirst(row_to_be_deleted); + simple_heap_delete(pg_partrel, &tmp->tid); + + if(IsRelDeleted == false) + { + IsRelDeleted = true; + resetStringInfo(query); + Relation Rel = RelationIdGetRelation(tmp->partRelOid); + appendStringInfo(query, "DROP TABLE %s CASCADE;", RelationGetRelationName(Rel)); + relation_close(Rel, 0); + query_list = pg_parse_query(query->data); + parsetree = (Node *)lfirst(list_head(query_list)); + RemoveRelations((DropStmt *)parsetree); + } + + pfree(tmp); + } + } + + list_free(rows_to_be_deleted_list); + rows_to_be_deleted_list = NULL; + } + + /* Get the name of partition key column */ + partitionColumn = get_attname (rel->rd_id, pg_part->partkey); + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typbyval); + + /* Get min attribute from catalog */ + min_datum = heap_getattr (pg_parttup, Anum_pg_partition_minval, pg_partrel->rd_att, &isnull); + + /* Three cases a. datatypes with typbyval true and fix length. + * b. 
datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(min_datum), len); + min_datum = short_datum; + } + else if (len != -1) + { + min_datum = (Datum)VARDATA_ANY(min_datum); + } + + /* Get max attribute from catalog */ + max_datum = heap_getattr (pg_parttup, Anum_pg_partition_maxval, pg_partrel->rd_att, &isnull); + + /* Three cases a. datatypes with typbyval true and fix length. + * b. datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(max_datum), len); + max_datum = short_datum; + } + else if (len != -1) + { + max_datum = (Datum)VARDATA_ANY(max_datum); + } + + /* Is this a valid range? */ + IsValidRange(minIndex, maxIndex, RelationGetRelid(rel), partitionColumn, NULL); + + keycmp_oid = OperatorGet("<=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef); + + if(isdef == FALSE) + elog(ERROR, "'<=' operator is not defined for this type."); + + keycmp_proc = get_opcode(keycmp_oid); + lte_min = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(minIndex))->constvalue, min_datum)); + + keycmp_oid = OperatorGet(">=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef); + + if(isdef == FALSE) + elog(ERROR, "'>=' operator is not defined for this type."); + + keycmp_proc = get_opcode(keycmp_oid); + gte_max = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(maxIndex))->constvalue, max_datum)); + + if(lte_min && gte_max) + { + partRowInfo = palloc(sizeof(PartitionRowInfo)); + memcpy(&partRowInfo->tid, &pg_parttup->t_self, sizeof(ItemPointerData)); + partRowInfo->partRelOid = pg_part->partrelid; + rows_to_be_deleted_list = lappend(rows_to_be_deleted_list, partRowInfo); + } + else + { + list_free(rows_to_be_deleted_list); + rows_to_be_deleted_list = NULL; + } + + + minIndex = minIndex->next; + 
maxIndex = maxIndex->next; + } + } + + IsRelDeleted = false; + foreach(row_to_be_deleted, rows_to_be_deleted_list) + { + PartitionRowInfo *tmp = (PartitionRowInfo *)lfirst(row_to_be_deleted); + simple_heap_delete(pg_partrel, &tmp->tid); + + if(IsRelDeleted == false) + { + IsRelDeleted = true; + resetStringInfo(query); + Relation Rel = RelationIdGetRelation(tmp->partRelOid); + appendStringInfo(query, "DROP TABLE %s CASCADE;", RelationGetRelationName(Rel)); + relation_close(Rel, 0); + query_list = pg_parse_query(query->data); + parsetree = (Node *)lfirst(list_head(query_list)); + RemoveRelations((DropStmt *)parsetree); + } + pfree(tmp); + } + list_free(rows_to_be_deleted_list); + rows_to_be_deleted_list = NULL; + + systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + } + + static void + ATExecDropPartitionByName(AlteredTableInfo *tab, Relation rel, + Partition *part) + { + Oid relId = RelnameGetRelid(part->partName->relname); + ScanKeyData skey; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + Relation pg_partrel; + StringInfo query = makeStringInfo(); + Node *parsetree; + List *query_list = NULL; + int count = 0; + + if(relId == InvalidOid) + elog(ERROR, "Error : Could not find valid Rel-Id for '%s' in current search path.", part->partName->relname); + + ScanKeyInit(&skey, + Anum_pg_partition_partrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(relId)); + + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + /* Instead of pg_part Use heap_getattr for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_RANGE) + { + count++; + simple_heap_delete(pg_partrel, &pg_parttup->t_self); + } + } + + if(count) + { + appendStringInfo(query, "DROP TABLE %s CASCADE;", 
part->partName->relname); + query_list = pg_parse_query(query->data); + parsetree = (Node *)lfirst(list_head(query_list)); + RemoveRelations((DropStmt *)parsetree); + } + else + { + elog(ERROR, "'%s' is not a valid partition of '%s' table.", part->partName->relname, RelationGetRelationName(rel)); + } + + systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + } + /* + * ATExecAddPartition : Add partition to an existing table. Currently only range partition + * is supported. + */ + static Oid + ATExecAddPartition(AlteredTableInfo *tab, + Relation rel, Partition *part) + { + StringInfo query = makeStringInfo(); + StringInfo createTrigger = makeStringInfo(); + List *query_list; + Node *parsetree; + ScanKeyData skey; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + Relation pg_partrel; + List *distinct_part_key_list = NULL; + List *distinct_part_rel_oid_list = NULL; + ListCell *minIndex, *maxIndex; + Datum min_datum, max_datum; + Oid typ_oid; + bool typbyval; + int16 len; + char *partitionColumn; + Partition *tmp; + Datum part_attr; + bool isnull; + PartitionAttrs *partition_attr; + int partitionCount = 0; + Oid *childOids = NULL; + int j = 0; + Oid child; + Const *tmp_const; + + if(part->partName) + { + if(part->tablespacename) + appendStringInfo(query, "CREATE TABLE %s () INHERITS (%s) TABLESPACE %s;", part->partName->relname, RelationGetRelationName(rel), part->tablespacename); + else + appendStringInfo(query, "CREATE TABLE %s () INHERITS (%s) ;", part->partName->relname, RelationGetRelationName(rel)); + } + else + { + elog(ERROR, "Please specify partition name."); + } + + query_list = pg_parse_query(query->data); + parsetree = (Node *)lfirst(list_head(query_list)); + + child = DefineRelation((CreateStmt *)parsetree, RELKIND_RELATION); + + minIndex = part->partitionCheck->min_value->head; + maxIndex = part->partitionCheck->max_value->head; + + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + 
ObjectIdGetDatum(RelationGetRelid(rel))); + + pg_partrel = heap_open(PartitionRelationId, AccessShareLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + /* Create partition attribute node. */ + partition_attr = makeNode(PartitionAttrs); + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + + /* Instead of pg_part Use heap_getattr for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_RANGE) + { + partitionCount++; + if (!list_member_int(distinct_part_key_list, pg_part->partkey)) + { + + Alias *colName = NULL; + + distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey); + + /* Get the name of partition key column */ + partitionColumn = get_attname (rel->rd_id, pg_part->partkey); + + /* Add the partition key columns. */ + colName = makeAlias(partitionColumn, NULL); + partition_attr->colName = lappend(partition_attr->colName, colName); + + IsValidRange(minIndex, maxIndex, RelationGetRelid(rel), partitionColumn, part->partName->relname); + + AddCheckConstraint(pg_part->keytype, + ((Const *)lfirst(minIndex))->constvalue, + ((Const *)lfirst(maxIndex))->constvalue, + part->partName->relname, + partitionColumn); + + minIndex = minIndex->next; + maxIndex = maxIndex->next; + } + } + } + + if(partitionCount == 0) + elog(ERROR, "Could not find partition attributes of '%s'.", RelationGetRelationName(rel)); + + if(minIndex != NULL || maxIndex != NULL) + elog(ERROR, "Multiple values specified for partition attributes for partition '%s'.", part->partName->relname); + + systable_endscan(pg_partscan); + heap_close(pg_partrel, AccessShareLock); + + /*Scan key to scan pg_partition table on parentrelid*/ + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + pg_partrel = heap_open(PartitionRelationId, AccessShareLock); 
+ pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + // partition_attr->PartitionFunction = PART_RANGE; + + if((partitionCount % list_length(distinct_part_key_list))==0) + { + childOids = malloc(sizeof(Oid) * (1 + (partitionCount/list_length(distinct_part_key_list)))); + } + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + Datum short_datum; + + /* Instead of pg_part Use heap_getattr for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_RANGE) + { + + if (!list_member_oid(distinct_part_rel_oid_list, pg_part->partrelid)) + { + distinct_part_rel_oid_list = lappend_oid(distinct_part_rel_oid_list, pg_part->partrelid); + childOids[j++] = pg_part->partrelid; + + tmp = makeNode(Partition); + tmp->partName = makeNode(RangeVar); + tmp->partitionCheck = makeNode(Constraint); + + /* Add this partition to partition attributes. */ + partition_attr->partitions = lappend(partition_attr->partitions, tmp); + } + + Relation tmp_rel = RelationIdGetRelation(pg_part->partrelid); + tmp->partName->relname = RelationGetRelationName(tmp_rel); + relation_close(tmp_rel, 0); + + /* Get the name of partition key column */ + partitionColumn = get_attname (rel->rd_id, pg_part->partkey); + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typbyval); + + /* Get min attribute from catalog */ + part_attr = heap_getattr (pg_parttup, Anum_pg_partition_minval, pg_partrel->rd_att, &isnull); + + /* Three cases a. datatypes with typbyval true and fix length. + * b. datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(part_attr), len); + part_attr = short_datum; + } + else if (len != -1) + { + part_attr = (Datum)VARDATA_ANY(part_attr); + } + + + /* Create const. 
*/ + tmp_const = makeConst(pg_part->keytype, /* Const type */ + -1, + len, + part_attr, + false, + typbyval); + + tmp_const->location = -1; + + /* Add this to new partition. */ + tmp->partitionCheck->min_value = lappend(tmp->partitionCheck->min_value, tmp_const); + + /* Get max attribute from catalog */ + part_attr = heap_getattr (pg_parttup, Anum_pg_partition_maxval, pg_partrel->rd_att, &isnull); + + /* Three cases a. datatypes with typbyval true and fix length. + * b. datatypes with byval false and fix length. + * c. extendible datatypes where length -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(part_attr), len); + part_attr = short_datum; + } + else if (len != -1) + { + part_attr = (Datum)VARDATA_ANY(part_attr); + } + + /* Create const. */ + tmp_const = makeConst(pg_part->keytype, /* Const type */ + -1, + len, + part_attr, + false, + typbyval); + + tmp_const->location = -1; + + /* Add this to new partition. */ + tmp->partitionCheck->max_value = lappend(tmp->partitionCheck->max_value, tmp_const); + } + } + + childOids[j++] = child; + partition_attr->partitions = lappend(partition_attr->partitions, part); + + ValidateRanges(partition_attr, RelationGetRelid(rel), childOids); + + systable_endscan(pg_partscan); + heap_close(pg_partrel, AccessShareLock); + + minIndex = part->partitionCheck->min_value->head; + maxIndex = part->partitionCheck->max_value->head; + + + ListCell *partColumn; + + j = 0; + + foreach(partColumn, partition_attr->colName) + { + /* Get the name of partition key column. */ + partitionColumn = ((Alias *)lfirst(partColumn))->aliasname; + + min_datum = ((Const *)lfirst(minIndex))->constvalue; + max_datum = ((Const *)lfirst(maxIndex))->constvalue; + + /* Get the attribute type. 
*/ + typ_oid = get_atttype(RelationGetRelid(rel), get_attnum(RelationGetRelid(rel), partitionColumn) ); + + /* Get the attribute length*/ + get_typlenbyval(typ_oid, &len, &typbyval); + + WriteRangePartitionToCatalog(min_datum, + max_datum, + len, + typbyval, + RelationGetRelid(rel), + child, + get_attnum(RelationGetRelid(rel), partitionColumn), + typ_oid, + ++j); + + minIndex = minIndex->next; + maxIndex = maxIndex->next; + } + + resetStringInfo(createTrigger); + appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_update_trigger BEFORE UPDATE ON %s ", part->partName->relname); + appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_update_trigger ();"); + + /* Parse the above SQL string and use this parsetree to create trigger on parent table. */ + query_list = pg_parse_query(createTrigger->data); + parsetree = (Node *)lfirst(list_head(query_list)); + CreateTrigger((CreateTrigStmt * )parsetree, 0); + + return child; + } + + static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, ColumnDef *colDef) *************** *** 4061,4066 **** --- 5523,5532 ---- List *children; ObjectAddress object; + /* Check if the column is part of any partition key. */ + if( GetPartitionsCountAtt(RelationGetRelid(rel), get_attnum(RelationGetRelid(rel), colName)) > 0) + elog(ERROR, "Can not drop column '%s' as this is part of partition key of '%s' table.", colName, RelationGetRelationName(rel)); + /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) ATSimplePermissions(rel, false); *************** *** 5557,5562 **** --- 7023,7032 ---- SysScanDesc scan; HeapTuple depTup; + /* Check if the column is part of any partition key. 
*/ + if( GetPartitionsCountAtt(RelationGetRelid(rel), get_attnum(RelationGetRelid(rel), colName)) > 0) + elog(ERROR, "Can not alter column '%s' as this is part of partition key of '%s' table.", colName, RelationGetRelationName(rel)); + attrelation = heap_open(AttributeRelationId, RowExclusiveLock); /* Look up the target column */ *************** *** 7764,7767 **** --- 9234,9870 ---- cur_item = lnext(prev_item); } } + } + + + /* + * ------------------------------------------------------------------------------ + * DefinePartitions + * Create new partitions. They end up inheriting from the parent + * relation. + * Once they have been created, triggers need to be assigned to the parent to + * provide the UPDATEs/INSERTs/DELETEs to percolate down to the children + * Callers expect this function to end with CommandCounterIncrement if it + * makes any changes. + * ------------------------------------------------------------------------------ + */ + void + DefinePartitions(CreateStmt *stmt, Oid parentRelId) + { + CreateStmt *childStmt; + RangeVar inr; + Oid childOid; + PartitionAttrs *partAttr; + int *partKeyOrder; + List *partColumnList; + ListCell *partColumn; + int count, i, j; + List *columnList; + ListCell *column; + StringInfo childTableName = makeStringInfo(); + Oid *partitionRelOids; + bool partitionColumnMatch; + Relation rel; + StringInfo createTrigger = makeStringInfo(); + List *query_list; + Node *parsetree; + + + /* If no partitions are defined then return. */ + if (stmt->partAttr == NULL) + return; + + partColumnList = ((PartitionAttrs *)(stmt->partAttr))->colName; + + i = 0; + j = 0; + partitionColumnMatch = false; + + Assert(IsA(stmt->partAttr, PartitionAttrs)); + + partAttr = (PartitionAttrs *)(stmt->partAttr); + /* To store the partition key order */ + partKeyOrder = malloc(partColumnList->length * sizeof(int)); + columnList = stmt->tableElts; + + foreach(partColumn, partColumnList) + { + /* Get the name of partition key column. 
*/ + char *colName = ((Alias *)lfirst(partColumn))->aliasname; + count = 0; + + foreach(column, columnList) + { + ColumnDef *tmp = lfirst(column); + count++; + if(strcmp(tmp->colname, colName) == 0) + { + partKeyOrder[j++] = count; + partitionColumnMatch = true; + break; + } + } + if(partitionColumnMatch == false) + elog(ERROR, "'%s' attribute is not present in given table.", colName); + } + + /* + * All the partitions will inherit from the parent, set the parent in the + * inhRelations structure + */ + inr = *stmt->relation; + + /* + * Create the children tables. The parser has already made sure that we + * have at least one partition in the list + */ + if (partAttr->partFunc == PART_LIST || partAttr->partFunc == PART_RANGE) + { + List *partitionList = partAttr->partitions; + ListCell *temp_part; + int isSystemGeneratedName = 0; + int isUserGeneratedName = 0; + int i = 0; + + Assert(list_length(partitionList) > 0); + + rel = RelationIdGetRelation(parentRelId); + resetStringInfo(createTrigger); + appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_insert_trigger BEFORE INSERT ON %s ", RelationGetRelationName(rel)); + appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_insert_trigger();"); + relation_close(rel, 0); + + /* Parse the above SQL string and use this parsetree to create trigger on parent table. 
*/ + query_list = pg_parse_query(createTrigger->data); + parsetree = (Node *)lfirst(list_head(query_list)); + CreateTrigger((CreateTrigStmt * )parsetree, 0); + + /* To avoid tuple being updated by the current transaction, after the current scan started */ + CommandCounterIncrement(); + + /* Allocate the storage for number of partitions.*/ + partitionRelOids = malloc(sizeof(Oid)*list_length(partitionList)); + + foreach(temp_part, partitionList) + { + Partition *temp_partition = lfirst(temp_part); + /* + * Create a working copy for each child + */ + childStmt = (CreateStmt *)copyObject((void *)stmt); + childStmt->constraints = list_make1(temp_partition->partitionCheck); + + /* + * Child has to use all columns from the parent, otherwise we will get + * unnecessary merging columns notices as part of the + * DefineRelation + */ + childStmt->tableElts = NIL; + + childStmt->inhRelations = lappend(NULL, &inr); + + /* If partition name is not specified then generate one and apply. */ + if(temp_partition->partName == NULL) + { + + if(isUserGeneratedName == 1) + elog(ERROR, "Combination of user generated and system generated table names is not allowed."); + + isSystemGeneratedName = 1; + i++; + + appendStringInfo(childTableName, "%s_%d", stmt->relation->relname, i); + + /* Assign the child table name. */ + childStmt->relation->relname = childTableName->data; + + temp_partition->partName = makeNode(RangeVar); + temp_partition->partName->relname = childTableName->data; + } + else + { + if(isSystemGeneratedName == 0) + { + /* Assign the child table name. */ + childStmt->relation->relname = temp_partition->partName->relname; + isUserGeneratedName = 1; + } + else + elog(ERROR, "Combination of user generated and system generated table names is not allowed."); + } + + /* Assign the tablespace specified. */ + childStmt->tablespacename = temp_partition->tablespacename; + + /* Define the relation. 
*/ + childOid = DefineRelation(childStmt, RELKIND_RELATION); + + rel = RelationIdGetRelation(childOid); + resetStringInfo(createTrigger); + appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_update_trigger BEFORE UPDATE ON %s ", RelationGetRelationName(rel)); + appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_update_trigger ();"); + relation_close(rel, 0); + + /* Parse the above SQL string and use this parsetree to create trigger on parent table. */ + query_list = pg_parse_query(createTrigger->data); + parsetree = (Node *)lfirst(list_head(query_list)); + CreateTrigger((CreateTrigStmt * )parsetree, 0); + + /* Store the Child table Oids for later use. */ + partitionRelOids[i++] = childOid; + } + /* + * Make the changes carried out so far, visible + */ + CreatePartitionTrigger(stmt, parentRelId, partitionRelOids); + CommandCounterIncrement(); + + /* Free the storage. */ + free(partitionRelOids); + + } + else if(partAttr->partFunc == PART_HASH) + { + + bool typbyval; + Datum values[Natts_pg_partition]; + bool nulls [Natts_pg_partition]; + int16 len, k; + Oid typ_oid; + + Assert(partAttr->numberOfPartitions > 0); + + i = 0; + j = 0; + + /* Allocate the storage for number of partitions.*/ + partitionRelOids = malloc(sizeof(Oid) * partAttr->numberOfPartitions); + + for(i=0; i < partAttr->numberOfPartitions; i++) + { + childStmt = (CreateStmt *)copyObject((void *)stmt); + childStmt->constraints = NULL; + + /* + * Child has to use all columns from the parent, otherwise we will get + * unnecessary merging columns notices as part of the + * DefineRelation + */ + childStmt->tableElts = NIL; + childStmt->inhRelations = lappend(NULL, &inr); + + /* Devise the name of child table. */ + resetStringInfo(childTableName); + appendStringInfo(childTableName, "%s_%d", stmt->relation->relname, i); + + /* Use this name to create child table. 
*/ + childStmt->relation->relname = childTableName->data; + childOid = DefineRelation(childStmt, RELKIND_RELATION); + + /* */ + rel = RelationIdGetRelation(childOid); + resetStringInfo(createTrigger); + appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_update_trigger BEFORE UPDATE ON %s ", RelationGetRelationName(rel)); + appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_update_trigger();"); + + /* Parse the above SQL string and use this parsetree to create trigger on parent table. */ + query_list = pg_parse_query(createTrigger->data); + parsetree = (Node *)lfirst(list_head(query_list)); + CreateTrigger((CreateTrigStmt * )parsetree, 0); + + /* To avoid tuple being updated by the current transaction, after the current scan started */ + CommandCounterIncrement(); + + /* We can not create constraints for hash partitions directly. Instead, we will monitor inserts using insert triggers.*/ + resetStringInfo(createTrigger); + appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_constraints_hash BEFORE INSERT ON %s ", RelationGetRelationName(rel)); + appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_constraints_hash();"); + relation_close(rel, 0); + + /* Parse the above SQL string and use this parsetree to create trigger on parent table. */ + query_list = pg_parse_query(createTrigger->data); + parsetree = (Node *)lfirst(list_head(query_list)); + CreateTrigger((CreateTrigStmt * )parsetree, 0); + + k = 1; + + foreach(partColumn, partColumnList) + { + /* Get the name of partition key column. */ + char *colName = ((Alias *)lfirst(partColumn))->aliasname; + + /* Get the attribute type. 
*/ + typ_oid = get_atttype(parentRelId, get_attnum(parentRelId, colName)); + + /* Get the attribute length*/ + get_typlenbyval(typ_oid, &len, &typbyval); + + /* initialize nulls and values */ + for (j = 0; j < Natts_pg_partition; j++) + { + nulls[j] = false; + values[j] = (Datum) NULL; + } + + values[Anum_pg_partition_minval -1] = Int8GetDatum(NULL); + nulls[Anum_pg_partition_minval -1] = true; + values[Anum_pg_partition_maxval -1] = Int8GetDatum(NULL); + nulls[Anum_pg_partition_maxval -1] = true; + values[Anum_pg_partition_parentrelid -1]= ObjectIdGetDatum(parentRelId); + values[Anum_pg_partition_partrelid -1] = ObjectIdGetDatum(childOid); + values[Anum_pg_partition_parttype -1] = Int8GetDatum(PART_HASH); + values[Anum_pg_partition_partkey -1] = ObjectIdGetDatum(get_attnum(parentRelId, colName)); + values[Anum_pg_partition_listval -1] = Int8GetDatum(NULL); + nulls[Anum_pg_partition_listval -1] = true; + values[Anum_pg_partition_hashval -1] = Int8GetDatum(i); + values[Anum_pg_partition_keytype -1] = ObjectIdGetDatum(typ_oid); + values[Anum_pg_partition_keyorder -1] = Int8GetDatum(k); + + k++; + + Relation r = heap_open(PartitionRelationId, RowExclusiveLock); + TupleDesc tupDesc = r->rd_att; + + HeapTuple tup = heap_form_tuple(tupDesc, values, nulls); + simple_heap_insert(r, tup); + + CatalogUpdateIndexes(r, tup); + + heap_close(r, RowExclusiveLock); + } + } + + rel = RelationIdGetRelation(parentRelId); + resetStringInfo(createTrigger); + appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_insert_trigger_hash BEFORE INSERT ON %s ", RelationGetRelationName(rel)); + appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_insert_trigger_hash();"); + relation_close(rel, 0); + + /* Parse the above SQL string and use this parsetree to create trigger on parent table. 
*/ + query_list = pg_parse_query(createTrigger->data); + parsetree = (Node *)lfirst(list_head(query_list)); + CreateTrigger((CreateTrigStmt * )parsetree, 0); + + /* + * Make the changes carried out so far, visible + */ + CreatePartitionTrigger(stmt, parentRelId, partitionRelOids); + CommandCounterIncrement(); + + /* Free the storage. */ + free(partitionRelOids); + } + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("Invalid PARTITION type specified"))); + } + + /* + * ------------------------------------------------------------------------------ + * ValidateRanges : Check for the intersecting ranges of specified partitions. + * + * ------------------------------------------------------------------------------ + */ + int ValidateRanges(PartitionAttrs *partAttr, Oid parentRelOid, Oid *partitionRelOids) + { + List *partitionList; + List *partitionColumnList; + ListCell *p1; + ListCell *p2; + Partition *partition1, *partition2; + ListCell *partitionColumn; + ListCell *p1MinIndex, *p1MaxIndex, *p2MinIndex, *p2MaxIndex; + Datum p1MinDatum, p1MaxDatum, p2MinDatum; + int i, j, k; + Oid typ_oid; + RegProcedure procLessThan, procGreaterThan; + Oid LessThanOid, GreaterThanOid; + bool isDef; + + + i = 0; + + /* Get the partition List. */ + partitionList = partAttr->partitions; + + /* Get the partition-key column list*/ + partitionColumnList = partAttr->colName; + + foreach(partitionColumn, partitionColumnList) + { + char *colName = ((Alias *)lfirst(partitionColumn))->aliasname; + /* Get the column type. */ + typ_oid = get_atttype(parentRelOid, get_attnum(parentRelOid, colName)); + + foreach(p1, partitionList) + { + partition1 = lfirst(p1); + j = 0; + p1MinIndex = partition1->partitionCheck->min_value->head; + p1MaxIndex = partition1->partitionCheck->max_value->head; + + while(j < i) + { + p1MinIndex = p1MinIndex->next; + p1MaxIndex = p1MaxIndex->next; + j++; + } + + /* If it's a FuncExpr then execute the conversion function. 
*/ + if IsA((Node *)lfirst(p1MinIndex), FuncExpr) + { + p1MinDatum = OidFunctionCall1(((FuncExpr *)lfirst(p1MinIndex))->funcid, + ((Const *)((FuncExpr *)lfirst(p1MinIndex))->args->head->data.ptr_value)->constvalue); + } + else + { + p1MinDatum = ((Const *)lfirst(p1MinIndex))->constvalue; + } + + /* If it's a FuncExpr then execute the conversion function. */ + if IsA((Node *)lfirst(p1MaxIndex), FuncExpr) + { + p1MaxDatum = OidFunctionCall1(((FuncExpr *)lfirst(p1MaxIndex))->funcid, + ((Const *)((FuncExpr *)lfirst(p1MaxIndex))->args->head->data.ptr_value)->constvalue); + } + else + { + p1MaxDatum = ((Const *)lfirst(p1MaxIndex))->constvalue; + } + + foreach(p2, partitionList) + { + if(p1 != p2) + { + partition2 = lfirst(p2); + k = 0; + p2MinIndex = partition2->partitionCheck->min_value->head; + p2MaxIndex = partition2->partitionCheck->max_value->head; + + while(k < i) + { + p2MinIndex = p2MinIndex->next; + p2MaxIndex = p2MaxIndex->next; + k++; + } + + /* If it's a FuncExpr then execute the conversion function. 
*/ + if IsA((Node *)lfirst(p2MinIndex), FuncExpr) + { + p2MinDatum = OidFunctionCall1(((FuncExpr *)lfirst(p2MinIndex))->funcid, + ((Const *)((FuncExpr *)lfirst(p2MinIndex))->args->head->data.ptr_value)->constvalue); + } + else + { + p2MinDatum = ((Const *)lfirst(p2MinIndex))->constvalue; + } + + LessThanOid = OperatorGet("<=", PG_CATALOG_NAMESPACE , typ_oid, typ_oid, &isDef); + + if(isDef == FALSE) + elog(ERROR, "'<=' operator is not defined for type '%d'.", typ_oid); + + procLessThan = get_opcode(LessThanOid); + + GreaterThanOid = OperatorGet(">", PG_CATALOG_NAMESPACE , typ_oid, typ_oid, &isDef); + + if(isDef == FALSE) + elog(ERROR, "'>' operator is not defined for type '%d'.", typ_oid); + + procGreaterThan = get_opcode(GreaterThanOid); + bool tmp1, tmp2; + + tmp1 = DatumGetBool(OidFunctionCall2(procLessThan, p1MinDatum, p2MinDatum)); + tmp2 = DatumGetBool(OidFunctionCall2(procGreaterThan, p1MaxDatum, p2MinDatum)); + + if(tmp1 == true && tmp2 == true ) + { + elog(ERROR, "Partition ranges of '%s' and '%s' are overlapping for '%s' attribute.", + partition1->partName->relname, partition2->partName->relname, colName); + } + } + } + } + i++; + } + return 0; + } + + /* + * ------------------------------------------------------------------------------ + * CreatePartitionTrigger : Create trigger for insert, update. Add constraints + * on child tables as per ranges specified. + * Param 1 : CreateStmt *stmt - Pointer to struct CreateStmt which contains + * information required to create trigger. 
+ * ------------------------------------------------------------------------------ + */ + static void + CreatePartitionTrigger(CreateStmt *stmt, Oid parentRelOid, Oid *partitionRelOids) + { + int counter, numberOfPartitions; + PartitionAttrs *partAttr; + char *partitionColumn; + StringInfo func_insert = makeStringInfo(); + StringInfo func_update = makeStringInfo(); + Partition *temp_partition; + List *partitionList; + ListCell *temp_part; + int partitionColumnType; + List *partColumnList; + ListCell *partColumn; + int count, i, j, l; + List *columnList; + ListCell *column; + int *partKeyOrder; + + partAttr = (PartitionAttrs *)(stmt->partAttr); + + if(partAttr->partFunc == PART_HASH) + { + return ; + } + + /* Reset the buffers used. */ + resetStringInfo(func_insert); + resetStringInfo(func_update); + + /* Reset counters and positions. */ + counter = 0; + numberOfPartitions = 0; + i = 0; + j = 0; + l = 0; + + partitionList = partAttr->partitions; + Assert(list_length(partitionList)); + + /* Get the list of partition key columns. */ + partColumnList = partAttr->colName; + + partKeyOrder = palloc(partColumnList->length * sizeof(int)); /* palloc, not malloc: never freed here, so let the memory context release it (also on elog(ERROR)) */ + + columnList = stmt->tableElts; + + foreach(partColumn, partColumnList) + { + /* Get the name of partition key column. 
*/ + char *colName = ((Alias *)lfirst(partColumn))->aliasname; + bool found = false; + count = 0; + + foreach(column, columnList) + { + ColumnDef *tmp = lfirst(column); + count++; + if(strcmp(tmp->colname, colName) == 0) + { + partKeyOrder[j++] = count; + found = true; + break; + } + } + + if(found == false) + elog(ERROR, "'%s' attribute is not present in given table.", colName); + + } + + foreach(temp_part, partitionList) + { + ListCell *minIndex, *maxIndex; + + temp_partition = lfirst(temp_part); + + if ( partAttr->partFunc == PART_RANGE ) + { + minIndex = temp_partition->partitionCheck->min_value->head; + maxIndex = temp_partition->partitionCheck->max_value->head; + } + + j = 0; + + foreach(partColumn, partColumnList) + { + + /* Get the name of partition key column. */ + char *colName = ((Alias *)lfirst(partColumn))->aliasname; + partitionColumn = colName; + + /* Type of partition key column */ + partitionColumnType = get_atttype(parentRelOid, get_attnum(parentRelOid, partitionColumn)); + + if ( partAttr->partFunc == PART_LIST ) + { + elog(ERROR, "List partitions are not yet supported."); + } + else + { + Datum min_datum, max_datum; + Oid typ_oid; + bool typbyval; + int16 len; + + /* Get the attribute type. 
*/ + typ_oid = get_atttype(parentRelOid, get_attnum(parentRelOid, partitionColumn)); + + IsValidRange(minIndex, maxIndex, parentRelOid, partitionColumn, temp_partition->partName->relname); + + AddCheckConstraint(typ_oid, + ((Const *)lfirst(minIndex))->constvalue, + ((Const *)lfirst(maxIndex))->constvalue, + temp_partition->partName->relname, + partitionColumn); + + /* Get the attribute length and pass-by-value flag */ + get_typlenbyval(typ_oid, &len, &typbyval); + + min_datum = ((Const *)lfirst(minIndex))->constvalue; + max_datum = ((Const *)lfirst(maxIndex))->constvalue; + + minIndex = minIndex->next; + maxIndex = maxIndex->next; + + WriteRangePartitionToCatalog(min_datum, + max_datum, + len, + typbyval, + parentRelOid, + partitionRelOids[l], + get_attnum(parentRelOid, partitionColumn), + typ_oid, + ++j); + } + } + l = l + 1; + } + if ( partAttr->partFunc == PART_RANGE ) + ValidateRanges(partAttr,parentRelOid, partitionRelOids); + return; + } + + /* + * Qualify every ColumnRef found under node with reference: pass either "*OLD*" or "*NEW*", + * as appropriate for the trigger to be generated. NULL operands (unary operators) are skipped. + */ + static void + MutateColumnRefs(Node *node, char *reference) + { + if (node != NULL && IsA(node, A_Expr)) + { + MutateColumnRefs( ((A_Expr *) node)->lexpr, reference ); + MutateColumnRefs( ((A_Expr *) node)->rexpr, reference); + } + else if (node != NULL && IsA(node, ColumnRef)) + { + ColumnRef *col_ref = (ColumnRef *) node; + + /* Should have at max one column in the column list */ + Assert(1 == list_length(col_ref->fields)); + + col_ref->fields = list_make2(makeString(reference), + lfirst(list_head(col_ref->fields))); + + } + return; } Index: src/backend/commands/trigger.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/commands/trigger.c,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.10.1 diff -c -r1.1.1.2 -r1.1.1.2.10.1 *** src/backend/commands/trigger.c 1 Dec 2008 09:37:46 -0000 1.1.1.2 --- src/backend/commands/trigger.c 9 Mar 2009 12:55:31 -0000 1.1.1.2.10.1 *************** *** 
1730,1735 **** --- 1730,1738 ---- heap_freetuple(oldtuple); if (newtuple == NULL) break; + /* Break out of the loop if a dummy tuple (t_len == -1) is found. */ + if (newtuple->t_len == -1) + break; } return newtuple; } *************** *** 2021,2026 **** --- 2024,2032 ---- if (oldtuple != newtuple && oldtuple != intuple) heap_freetuple(oldtuple); if (newtuple == NULL) + break; + /* Break out of the loop if a dummy tuple (t_len == -1) is found. */ + if (newtuple->t_len == -1) break; } heap_freetuple(trigtuple); Index: src/backend/executor/execMain.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/executor/execMain.c,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.8.1 diff -c -r1.1.1.2 -r1.1.1.2.8.1 *** src/backend/executor/execMain.c 1 Dec 2008 09:37:46 -0000 1.1.1.2 --- src/backend/executor/execMain.c 9 Mar 2009 12:55:31 -0000 1.1.1.2.8.1 *************** *** 1674,1679 **** --- 1674,1687 ---- if (newtuple == NULL) /* "do nothing" */ return; + /* Check for dummy tuple */ + if(newtuple->t_len == -1) + { + /* Increase the count of processed rows. */ + (estate->es_processed)++; + return; + } + if (newtuple != tuple) /* modified by Trigger(s) */ { /* *************** *** 1911,1916 **** --- 1919,1933 ---- if (newtuple == NULL) /* "do nothing" */ return; + + /* Check for dummy tuple */ + if(newtuple->t_len == -1) + { + /* Increase the count of processed rows. 
*/ + (estate->es_processed)++; + heap_freetuple(newtuple); + return; + } if (newtuple != tuple) /* modified by Trigger(s) */ { Index: src/backend/nodes/copyfuncs.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/nodes/copyfuncs.c,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.8.6 diff -c -r1.1.1.2 -r1.1.1.2.8.6 *** src/backend/nodes/copyfuncs.c 1 Dec 2008 09:37:46 -0000 1.1.1.2 --- src/backend/nodes/copyfuncs.c 19 Mar 2009 12:34:57 -0000 1.1.1.2.8.6 *************** *** 1513,1518 **** --- 1513,1573 ---- return newnode; } + /* + * _copyPartitionAttrs + */ + static PartitionAttrs * + _copyPartitionAttrs(PartitionAttrs *from) + { + PartitionAttrs *newnode = makeNode(PartitionAttrs); + + COPY_SCALAR_FIELD(numberOfPartitions); + COPY_NODE_FIELD(partitions); + COPY_SCALAR_FIELD(partFunc); + COPY_NODE_FIELD(colName); + + return newnode; + } + + /* + * _copyPartition + */ + static Partition * + _copyPartition(Partition *from) + { + Partition *newnode = makeNode(Partition); + + COPY_NODE_FIELD(partName); + COPY_NODE_FIELD(partitionCheck); + COPY_STRING_FIELD(tablespacename); + + return newnode; + } + + /* + * _copyUpdatePartitionStmt + */ + static UpdatePartitionStmt * + _copyUpdatePartitionStmt(UpdatePartitionStmt *from) + { + UpdatePartitionStmt *newnode = makeNode(UpdatePartitionStmt); + + COPY_NODE_FIELD(prev); + COPY_NODE_FIELD(after); + + return newnode; + } + static SplitPartitionStmt * + _copySplitPartitionStmt(SplitPartitionStmt *from) + { + SplitPartitionStmt *newnode = makeNode(SplitPartitionStmt); + + COPY_NODE_FIELD(partNames); + COPY_NODE_FIELD(splitValues); + + return newnode; + } + /* **************************************************************** * relation.h copy functions * *************** *** 2018,2023 **** --- 2073,2081 ---- COPY_NODE_FIELD(keys); COPY_NODE_FIELD(options); COPY_STRING_FIELD(indexspace); + COPY_NODE_FIELD(min_value); + COPY_NODE_FIELD(max_value); + 
COPY_NODE_FIELD(partition_list_values); return newnode; } *************** *** 2325,2330 **** --- 2383,2389 ---- COPY_NODE_FIELD(options); COPY_SCALAR_FIELD(oncommit); COPY_STRING_FIELD(tablespacename); + COPY_NODE_FIELD(partAttr); return newnode; } *************** *** 3461,3467 **** case T_FromExpr: retval = _copyFromExpr(from); break; ! /* * RELATION NODES */ --- 3520,3537 ---- case T_FromExpr: retval = _copyFromExpr(from); break; ! case T_PartitionAttrs: ! retval = _copyPartitionAttrs(from); ! break; ! case T_Partition: ! retval = _copyPartition(from); ! break; ! case T_UpdatePartitionStmt: ! retval = _copyUpdatePartitionStmt(from); ! break; ! case T_SplitPartitionStmt: ! retval = _copySplitPartitionStmt(from); ! break; /* * RELATION NODES */ Index: src/backend/nodes/equalfuncs.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/nodes/equalfuncs.c,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.10.2 diff -c -r1.1.1.2 -r1.1.1.2.10.2 *** src/backend/nodes/equalfuncs.c 1 Dec 2008 09:37:46 -0000 1.1.1.2 --- src/backend/nodes/equalfuncs.c 4 Mar 2009 08:30:42 -0000 1.1.1.2.10.2 *************** *** 705,710 **** --- 705,731 ---- return true; } + static bool + _equalPartitionAttrs(PartitionAttrs *a, PartitionAttrs *b) + { + COMPARE_SCALAR_FIELD(numberOfPartitions); + COMPARE_NODE_FIELD(partitions); + COMPARE_SCALAR_FIELD(partFunc); + COMPARE_NODE_FIELD(colName); + + return true; + } + + static bool + _equalPartition(Partition *a, Partition *b) + { + COMPARE_NODE_FIELD(partName); + COMPARE_NODE_FIELD(partitionCheck); + COMPARE_STRING_FIELD(tablespacename); /* copied by _copyPartition, so it must be compared as well */ + + return true; + } + /* * Stuff from relation.h *************** *** 1060,1065 **** --- 1081,1087 ---- COMPARE_NODE_FIELD(options); COMPARE_SCALAR_FIELD(oncommit); COMPARE_STRING_FIELD(tablespacename); + COMPARE_NODE_FIELD(partAttr); return true; } *************** *** 2311,2316 **** --- 2333,2344 ---- break; case T_JoinExpr: retval = _equalJoinExpr(a, b); + break; + case 
T_PartitionAttrs: + retval = _equalPartitionAttrs(a, b); + break; + case T_Partition: + retval = _equalPartition(a, b); break; /* Index: src/backend/optimizer/plan/planner.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/optimizer/plan/planner.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.7 diff -c -r1.1.1.1 -r1.1.1.1.10.7 *** src/backend/optimizer/plan/planner.c 1 Dec 2008 09:36:26 -0000 1.1.1.1 --- src/backend/optimizer/plan/planner.c 18 Mar 2009 13:51:36 -0000 1.1.1.1.10.7 *************** *** 41,46 **** --- 41,52 ---- #include "utils/lsyscache.h" #include "utils/syscache.h" + #include "catalog/pg_partition.h" + #include "utils/fmgroids.h" + #include "catalog/indexing.h" + #include "utils/tqual.h" + #include "catalog/pg_namespace.h" + #include "access/hash.h" /* GUC parameter */ double cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION; *************** *** 82,87 **** --- 88,367 ---- List *sub_tlist, AttrNumber *groupColIdx); static List *postprocess_setop_tlist(List *new_tlist, List *orig_tlist); + Oid OperatorGet(const char *operatorName, + Oid operatorNamespace, + Oid leftObjectId, + Oid rightObjectId, + bool *defined); + extern Oid get_relevent_partition(HeapTuple tuple, Relation rel); + extern Oid get_relevent_hash_partition(HeapTuple tuple, Relation rel); + extern Oid TypenameGetTypid (const char *typname); + extern Node* coerce_to_specific_type(ParseState *pstate, + Node * node, + Oid targetTypeId, + const char * constructName + ) ; + + Oid + get_relevent_hash_partition(HeapTuple tuple, Relation rel) { + ScanKeyData skey; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + Relation pg_partrel; + bool flag; + StringInfo str_to_hash = makeStringInfo(); + + /* Lists used to identify the partition to which the row belongs. + * distinct_part_key_list - partition key attribute numbers already hashed. 
+ * rel_part_list - oids of partitions whose stored hash value equals + * the hash of the row (at most one is accepted) + */ + List *distinct_part_key_list=NULL; + List *rel_part_list= NULL; + int hash_partition_entries = 0; + unsigned int hashValue; + + flag = false; + resetStringInfo(str_to_hash); + + /*Scan key to scan pg_partition table on parentrelid*/ + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + bool isnull, typbyval; + int16 len; + bool isVariableLength; + Oid typoutput; + + /* Use heap_getattr instead of pg_part for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_HASH) + { + /* Increase the no. of hash partition entries count. 
*/ + hash_partition_entries++; + + if (!list_member_int(distinct_part_key_list, pg_part->partkey)) + { + distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey); + + /* Get attribute from tuple */ + Datum attr = heap_getattr(tuple, pg_part->partkey, rel->rd_att, &isnull); + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typbyval); + + /* Append the key's text form; row data must never be used as a format string */ + getTypeOutputInfo(pg_part->keytype, &typoutput, &isVariableLength); + appendStringInfoString(str_to_hash, DatumGetCString(OidFunctionCall1(typoutput, attr))); + } + } + } + + if (distinct_part_key_list == NIL) elog(ERROR, "No hash partitions found for relation %u.", RelationGetRelid(rel)); /* the list is dereferenced just below */ + hashValue = DatumGetUInt32(hash_any(str_to_hash->data, strlen(str_to_hash->data))) % + (int)(hash_partition_entries / distinct_part_key_list->length); + + // elog(NOTICE, "String to be hashed : %s, Hash Value : %u", str_to_hash->data, hashValue); + + systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + + /*Scan key to scan pg_partition table on parentrelid*/ + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + bool isnull; + + /* Use heap_getattr instead of pg_part for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + + if(pg_part->parttype == PART_HASH) + { + /* Get the stored hash value from catalog */ + Datum datumDashValue = heap_getattr (pg_parttup,Anum_pg_partition_hashval, + pg_partrel->rd_att, &isnull); + + if(datumDashValue == hashValue) + { + if (!list_member_oid(rel_part_list, pg_part->partrelid)) + { + rel_part_list = lappend_oid(rel_part_list, pg_part->partrelid); + + if(list_length(rel_part_list) > 1) + elog(ERROR, "Found more than one matching partitions."); + } + } + } + } + + 
systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + + // elog (NOTICE,"Returned Oid %u", (Oid)rel_part_list->head->data.oid_value); + + return (rel_part_list != NIL) ? linitial_oid(rel_part_list) : InvalidOid; /* InvalidOid when no partition stores this hash value */ + } + + static char* + DisplayDatum(Datum datum, Oid type) + { + Oid typeOut; + bool isVariableLength; + + getTypeOutputInfo(type, &typeOut, &isVariableLength); + return (DatumGetCString(OidFunctionCall1(typeOut, datum))); + } + + + /* Function to get partition id for a tuple inserted on master relation. + * Uses pg_partition catalog lookup to verify boundary condition. + * Each entry in catalog corresponds to constraint on single column. + */ + + Oid + get_relevent_partition(HeapTuple tuple, Relation rel) { + ScanKeyData skey; + SysScanDesc pg_partscan; + HeapTuple pg_parttup; + Relation pg_partrel; + + /* Lists to identify partition id to which the row should belong. + * irr_part_list - list of oids of irrelevant partitions. + * rel_part_list - list may contain irrelevant partition along + * with relevant partition + */ + List *irr_part_list=NULL; + List *rel_part_list= NULL; + ListCell *part_oid; + Oid result = 0 ; + + /*Scan key to scan pg_partition table on parentrelid*/ + ScanKeyInit(&skey, + Anum_pg_partition_parentrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock); + pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true, + SnapshotNow, 1, &skey); + + + while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan))) + { + /* Use heap_getattr instead of pg_part for accessing bytea columns */ + Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup); + bool isnull, isdef, gte_min, lt_max, typbyval; + int16 len; + Datum short_datum; + Oid typoutput; + bool isVariableLength; + + /* Get attribute from tuple */ + Datum attr = heap_getattr(tuple, pg_part->partkey, rel->rd_att,&isnull); + /* Get min attribute from catalog */ + 
Datum part_attr = heap_getattr (pg_parttup,Anum_pg_partition_minval, + pg_partrel->rd_att,&isnull); + + getTypeOutputInfo(pg_part->keytype, &typoutput, &isVariableLength); + + /* Get len and typbyval from pg_type */ + get_typlenbyval(pg_part->keytype, &len, &typbyval); + + /* Three cases a. datatypes with typbyval true and fixed length. + * b. datatypes with byval false and fixed length. + * c. variable-length datatypes where length is -1. + */ + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(part_attr), len); + part_attr = short_datum; + } + else if (len != -1) + { + part_attr = (Datum)VARDATA_ANY(part_attr); + } + + /* Evaluate whether tuple value satisfies partition key constraint */ + Oid min_oper = OperatorGet(">=", PG_CATALOG_NAMESPACE , pg_part->keytype, + pg_part->keytype, &isdef); + RegProcedure oper_proc = get_opcode(min_oper); + gte_min = DatumGetBool(OidFunctionCall2(oper_proc, attr, part_attr)); + + // elog(NOTICE, "min from pg_partition : %s", DisplayDatum(part_attr, pg_part->keytype)); + + /* Similar steps for max attribute of catalog */ + part_attr = heap_getattr (pg_parttup,Anum_pg_partition_maxval, + pg_partrel->rd_att,&isnull); + if ( typbyval ) + { + short_datum = 0; + memcpy(&short_datum, VARDATA_ANY(part_attr), len); + part_attr = short_datum; + } + else if (len != -1 ) + { + part_attr = (Datum)VARDATA_ANY(part_attr); + } + + Oid max_oper = OperatorGet("<", PG_CATALOG_NAMESPACE , pg_part->keytype, + pg_part->keytype, &isdef); + RegProcedure oper_proc2 = get_opcode(max_oper); + lt_max = DatumGetBool(OidFunctionCall2(oper_proc2,attr,part_attr)); + + // elog(NOTICE, "max from pg_partition : %s", DisplayDatum(part_attr, pg_part->keytype)); + // elog(NOTICE, "value from tuple : %s\n", DisplayDatum(attr, pg_part->keytype)); + + /* Create relevant and irrelevant partition lists: min <= value < max means relevant */ + if ( gte_min && lt_max ) + { + if ( !list_member_oid(irr_part_list, pg_part->partrelid) ) + rel_part_list = lappend_oid(rel_part_list, pg_part->partrelid); + 
} + else + irr_part_list = lappend_oid(irr_part_list, (pg_part->partrelid)); + + } + + /* Pick a partition that passed its checks and was never rejected; result stays 0 when there is none */ + foreach(part_oid, rel_part_list) + { + if ( list_member_oid(irr_part_list, part_oid->data.oid_value) ) + continue; + else + result = (Oid)part_oid->data.oid_value; + } + + systable_endscan(pg_partscan); + heap_close(pg_partrel, RowExclusiveLock); + + + // elog (NOTICE,"Returned Oid %d", result); + /*if ( part_oid != NULL ) + return (Oid)part_oid->data.oid_value; + else + return part_oid;*/ + return result; + } + /***************************************************************************** Index: src/backend/optimizer/util/plancat.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/optimizer/util/plancat.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.8.1 diff -c -r1.1.1.1 -r1.1.1.1.8.1 *** src/backend/optimizer/util/plancat.c 1 Dec 2008 09:36:26 -0000 1.1.1.1 --- src/backend/optimizer/util/plancat.c 19 Feb 2009 11:51:17 -0000 1.1.1.1.8.1 *************** *** 917,922 **** --- 917,953 ---- return result; } + + /* Return parent oid for child table. + * Useful to find out parent in case of cross partition updates. 
+ */ + + Oid + find_inheritance_parent(Oid inhrelid) + { + Relation relation; + HeapScanDesc scan; + HeapTuple inheritsTuple; + Oid inhparent = InvalidOid; /* returned unchanged when no pg_inherits row matches inhrelid */ + ScanKeyData key[1]; + + + ScanKeyInit(&key[0], + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(inhrelid)); + relation = heap_open(InheritsRelationId, AccessShareLock); + scan = heap_beginscan(relation, SnapshotNow, 1, key); + while ((inheritsTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + inhparent = ((Form_pg_inherits)GETSTRUCT(inheritsTuple))->inhparent; + } + heap_endscan(scan); + heap_close(relation, AccessShareLock); + return inhparent; + } + + /* * has_unique_index * Index: src/backend/parser/gram.y =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/parser/gram.y,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.8.10 diff -c -r1.1.1.2 -r1.1.1.2.8.10 *** src/backend/parser/gram.y 1 Dec 2008 09:37:47 -0000 1.1.1.2 --- src/backend/parser/gram.y 19 Mar 2009 12:34:57 -0000 1.1.1.2.8.10 *************** *** 169,174 **** --- 169,176 ---- InsertStmt *istmt; VariableSetStmt *vsetstmt; + + PartitionFunction partFunc; } %type stmt schema_stmt *************** *** 238,244 **** %type func_name handler_name qual_Op qual_all_Op subquery_Op opt_class opt_validator ! %type qualified_name OptConstrFromTable %type all_Op MathOp SpecialRuleRelation --- 240,246 ---- %type func_name handler_name qual_Op qual_all_Op subquery_Op opt_class opt_validator ! %type qualified_name OptConstrFromTable OptPartitionName %type all_Op MathOp SpecialRuleRelation *************** *** 324,330 **** %type def_elem old_aggr_elem %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr func_expr AexprConst indirection_el ! 
columnref in_expr having_clause func_table array_expr %type row type_list array_expr_list %type case_expr case_arg when_clause case_default %type when_clause_list --- 326,335 ---- %type def_elem old_aggr_elem %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr func_expr AexprConst indirection_el ! columnref in_expr having_clause func_table array_expr ! ! %type ListPartitionList ! %type row type_list array_expr_list %type case_expr case_arg when_clause case_default %type when_clause_list *************** *** 386,391 **** --- 391,401 ---- %type with_clause %type cte_list + %type RangePartitions ListPartitions PartitionColumns + %type PartitioningData OptPartition ListPartition RangePartition + /* %type LeftVal RightVal */ + + // %type Partitioning_function /* * If you make any token changes, update the keyword table in *************** *** 435,441 **** KEY LANCOMPILER LANGUAGE LARGE_P LAST_P LEADING LEAST LEFT LEVEL ! LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOGIN_P MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE --- 445,451 ---- KEY LANCOMPILER LANGUAGE LARGE_P LAST_P LEADING LEAST LEFT LEVEL ! LIKE LIMIT LIST1 LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOGIN_P MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE *************** *** 447,465 **** OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OR ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNED OWNER ! PARSER PARTIAL PASSWORD PLACING PLANS POSITION PRECISION PRESERVE PREPARE PREPARED PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE ! READ REAL REASSIGN RECHECK RECURSIVE REFERENCES REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROW ROWS RULE SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SERIALIZABLE SESSION SESSION_USER SET SETOF SHARE ! 
SHOW SIMILAR SIMPLE SMALLINT SOME STABLE STANDALONE_P START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SUPERUSER_P SYMMETRIC SYSID SYSTEM_P --- 457,475 ---- OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OR ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNED OWNER ! PARSER PARTIAL PARTITION PARTITIONS PASSWORD PLACING PLANS POSITION PRECISION PRESERVE PREPARE PREPARED PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE ! RANGE READ REAL REASSIGN RECHECK RECURSIVE REFERENCES REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROW ROWS RULE SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SERIALIZABLE SESSION SESSION_USER SET SETOF SHARE ! SHOW SIMILAR SIMPLE SMALLINT SOME SPLIT STABLE STANDALONE_P START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SUPERUSER_P SYMMETRIC SYSID SYSTEM_P *************** *** 1493,1500 **** ; alter_table_cmd: /* ALTER TABLE ADD [COLUMN] */ ! ADD_P opt_column columnDef { AlterTableCmd *n = makeNode(AlterTableCmd); n->subtype = AT_AddColumn; --- 1503,1519 ---- ; alter_table_cmd: + /* ALTER TABLE ADD PARTITION BY RANGE (col, ...) ( START val,.... END val,....) */ + ADD_P RANGE PARTITION RangePartition + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_AddPartition; + n->def = $4; + $$ = (Node *)n; + } + /* ALTER TABLE ADD [COLUMN] */ ! 
| ADD_P opt_column columnDef { AlterTableCmd *n = makeNode(AlterTableCmd); n->subtype = AT_AddColumn; *************** *** 1544,1549 **** --- 1563,1609 ---- n->def = (Node *) makeString($6); $$ = (Node *)n; } + | DROP RANGE PARTITION qualified_name + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_DropPartitionByName; + + Partition *partition = makeNode(Partition); + partition->partName = $4; + partition->partitionCheck = NULL; + + n->def = (Node *)partition; + $$ = (Node *)n; + } + | DROP RANGE PARTITION RangePartition + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_DropPartitionByRange; + n->def = $4; + $$ = (Node *)n; + } + | UPDATE RANGE PARTITION RangePartition TO RangePartition + { + AlterTableCmd *n = makeNode(AlterTableCmd); + UpdatePartitionStmt *u = makeNode(UpdatePartitionStmt); + u->prev = (Partition *)$4; + u->after = (Partition *)$6; + n->subtype = AT_UpdatePartition; + n->def = (Node *)u; + $$ = (Node *)n; + } + | SPLIT RANGE PARTITION qualified_name AT VALUES '(' ListPartitionList ')' INTO qualified_name AND qualified_name + { + AlterTableCmd *n = makeNode(AlterTableCmd); + SplitPartitionStmt *s = makeNode(SplitPartitionStmt); + s->partNames = lappend(s->partNames, $4); + s->partNames = lappend(s->partNames, $11); + s->partNames = lappend(s->partNames, $13); + s->splitValues = $8; + n->subtype = AT_SplitPartition; + n->def = (Node *)s; + $$ = (Node *)n; + } /* ALTER TABLE DROP [COLUMN] [RESTRICT|CASCADE] */ | DROP opt_column ColId opt_drop_behavior { *************** *** 1946,1953 **** * *****************************************************************************/ ! CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' ! OptInherit OptWith OnCommitOption OptTableSpace { CreateStmt *n = makeNode(CreateStmt); $4->istemp = $2; --- 2006,2013 ---- * *****************************************************************************/ ! 
CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' ! OptInherit OptWith OnCommitOption OptTableSpace OptPartition { CreateStmt *n = makeNode(CreateStmt); $4->istemp = $2; *************** *** 1958,1967 **** n->options = $9; n->oncommit = $10; n->tablespacename = $11; $$ = (Node *)n; } | CREATE OptTemp TABLE qualified_name OF qualified_name ! '(' OptTableElementList ')' OptWith OnCommitOption OptTableSpace { /* SQL99 CREATE TABLE OF (cols) seems to be satisfied * by our inheritance capabilities. Let's try it... --- 2018,2028 ---- n->options = $9; n->oncommit = $10; n->tablespacename = $11; + n->partAttr = $12; $$ = (Node *)n; } | CREATE OptTemp TABLE qualified_name OF qualified_name ! '(' OptTableElementList ')' OptWith OnCommitOption OptTableSpace OptPartition { /* SQL99 CREATE TABLE OF (cols) seems to be satisfied * by our inheritance capabilities. Let's try it... *************** *** 1975,1984 **** --- 2036,2167 ---- n->options = $10; n->oncommit = $11; n->tablespacename = $12; + n->partAttr = $13; $$ = (Node *)n; } ; + OptPartition: PARTITION BY PartitioningData { $$ = $3; } + | {$$ = NULL;} + ; + + PartitionColumns: ColId {$$ = list_make1(makeString($1));} + | PartitionColumns ',' ColId {$$ = lappend($1, makeString($3));} + ; + + PartitioningData: LIST1 '(' PartitionColumns ')' '(' ListPartitions ')' + { + PartitionAttrs *partData = makeNode(PartitionAttrs); + partData->partFunc = PART_LIST; + partData->numberOfPartitions = 0; + partData->colName = $3; + partData->partitions = $6; + $$ = (Node *)partData; + } + + | RANGE '(' PartitionColumns ')' '(' RangePartitions ')' + { + PartitionAttrs *partData = makeNode(PartitionAttrs); + partData->partFunc = PART_RANGE; + partData->numberOfPartitions = 0; + partData->colName = $3; + partData->partitions = $6; + $$ = (Node *)partData; + } + + + | ColId '(' PartitionColumns ')' PARTITIONS ICONST + { + PartitionAttrs *partData = makeNode(PartitionAttrs); + if(strcasecmp($1, "hash") != 0) + 
elog(ERROR, "Not a valid partition type."); + partData->partFunc = PART_HASH; + partData->numberOfPartitions = $6; + partData->colName = $3; + partData->partitions = NULL; + $$ = (Node *)partData; + } + ; + + ListPartitions: ListPartition { $$ = list_make1($1);} + | ListPartitions ',' ListPartition { $$ = lappend($1, $3);} + ; + + RangePartitions : RangePartition { $$ = list_make1($1); } + | RangePartitions ',' RangePartition { $$ = lappend($1, $3); } + ; + + ListPartitionList: a_expr { $$ = list_make1($1);} + | ListPartitionList ',' a_expr { $$ = lappend($1, $3);} + ; + + + OptPartitionName: /* Empty */ {$$ = NULL;} + | qualified_name + ; + + ListPartition : OptPartitionName '(' ListPartitionList ')' OptTableSpace + { + + Partition *partition = makeNode(Partition); + Constraint *constraint = NULL; + + partition->partName = $1; + + /* Make the constraint node */ + constraint = makeNode(Constraint); + + constraint->contype = PART_LIST; + constraint->name = NULL; + constraint->raw_expr = (Node *)$3; + constraint->cooked_expr = NULL; + constraint->keys = NULL; + constraint->indexspace = NULL; + constraint->partition_list_values = $3; + + partition->partitionCheck = constraint; + partition->tablespacename = $5; + $$ = (Node *)partition; + } + ; + + RangePartition : OptPartitionName '(' START ListPartitionList END_P ListPartitionList ')' OptTableSpace + { + + Partition *partition = makeNode(Partition); + Constraint *constraint = NULL; + + partition->partName = $1; + + /* Make the constraint node */ + constraint = makeNode(Constraint); + constraint->contype = PART_RANGE; + constraint->min_value = $4; + constraint->max_value = $6; + constraint->name = NULL; + + /* constraint->raw_expr = $3; */ + + constraint->cooked_expr = NULL; + constraint->keys = NULL; + constraint->indexspace = NULL; + + partition->partitionCheck = constraint; + partition->tablespacename = $8; + $$ = (Node *)partition; + } + ; + + + /*** + part_column_list: + ColId + { $$ = list_make1($1); } + 
|part_column_list ',' ColId + { $$ = lappend($1, $3); } + ; + ****/ + /* * Redundancy here is needed to avoid shift/reduce conflicts, * since TEMP is not a reserved word. See also OptTempTableName. *************** *** 9344,9360 **** { $$ = makeNullAConst(@1); } ; Iconst: ICONST { $$ = $1; }; Sconst: SCONST { $$ = $1; }; RoleId: ColId { $$ = $1; }; - SignedIconst: Iconst { $$ = $1; } | '+' Iconst { $$ = + $2; } | '-' Iconst { $$ = - $2; } ; - /* * Name classification hierarchy. * --- 9527,9592 ---- { $$ = makeNullAConst(@1); } + /* + | AexprConst '+' AexprConst + { + Node *l = $1; + Node *r = $3; + int iresult; + float fresult, flval, frval; + + if(l->val.type == T_Integer && r->val.type == T_Integer) + { + iresult = l->val.val.ival + r->val.val.ival; + $$ = makeIntConst(iresult, @1); + } + else if(l->val.type == T_Integer && r->val.type == T_Float) + { + sscanf(r.val.val.str, "%f", &frval); + fresult = (float)l->val.val.ival + frval; + $$ = makeFloatConst(fresult, @1); + } + else if(l->val.type == T_Float && r->val.type == T_Integer) + { + sscanf(l.val.val.str, "%f", &flval); + fresult = flval + (float)r->val.val.ival; + $$ = makeFloatConst(fresult, @1); + } + else if(l->val.type == T_Float && r->val.type == T_Float) + { + sscanf(l.val.val.str, "%f", &flval); + sscanf(r.val.val.str, "%f", &frval); + fresult = flval + frval; + $$ = makeFloatConst(fresult, @1); + } + else + elog(ERROR, "Invalid expression."); + } + if ( l->val.type == T_Float) || r->val.type == T_Float ) + { + $$ == makeFloatConst($1, @1); + } + else if ( l->val.type == T_Integer && r->val.type == T_Integer ) + $$ == makeIntConst($1, @1); + else + elog (ERROR, "Invalid expression "); + } + | AexprConst '-' AexprConst + { + } + | AexprConst '/' AexprConst + | AexprConst '*' AexprConst + | AexprConst '||' AexprConst + */ ; Iconst: ICONST { $$ = $1; }; Sconst: SCONST { $$ = $1; }; RoleId: ColId { $$ = $1; }; SignedIconst: Iconst { $$ = $1; } | '+' Iconst { $$ = + $2; } | '-' Iconst { $$ = - $2; } ; /* * 
Name classification hierarchy. * *************** *** 9556,9561 **** --- 9788,9795 ---- | OWNER | PARSER | PARTIAL + | PARTITION + | PARTITIONS | PASSWORD | PLANS | PREPARE *************** *** 9599,9604 **** --- 9833,9839 ---- | SHARE | SHOW | SIMPLE + | SPLIT | STABLE | STANDALONE_P | START *************** *** 9729,9738 **** --- 9964,9975 ---- | JOIN | LEFT | LIKE + | LIST1 | NATURAL | NOTNULL | OUTER_P | OVERLAPS + | RANGE | RIGHT | SIMILAR | VERBOSE Index: src/backend/parser/keywords.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/parser/keywords.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.8.3 diff -c -r1.1.1.1 -r1.1.1.1.8.3 *** src/backend/parser/keywords.c 1 Dec 2008 09:36:26 -0000 1.1.1.1 --- src/backend/parser/keywords.c 17 Mar 2009 12:50:55 -0000 1.1.1.1.8.3 *************** *** 186,191 **** --- 186,192 ---- {"greatest", GREATEST, COL_NAME_KEYWORD}, {"group", GROUP_P, RESERVED_KEYWORD}, {"handler", HANDLER, UNRESERVED_KEYWORD}, + // {"hash", HASH, UNRESERVED_KEYWORD}, {"having", HAVING, RESERVED_KEYWORD}, {"header", HEADER_P, UNRESERVED_KEYWORD}, {"hold", HOLD, UNRESERVED_KEYWORD}, *************** *** 231,236 **** --- 232,238 ---- {"level", LEVEL, UNRESERVED_KEYWORD}, {"like", LIKE, TYPE_FUNC_NAME_KEYWORD}, {"limit", LIMIT, RESERVED_KEYWORD}, + {"list", LIST1, UNRESERVED_KEYWORD}, {"listen", LISTEN, UNRESERVED_KEYWORD}, {"load", LOAD, UNRESERVED_KEYWORD}, {"local", LOCAL, UNRESERVED_KEYWORD}, *************** *** 291,296 **** --- 293,300 ---- {"owner", OWNER, UNRESERVED_KEYWORD}, {"parser", PARSER, UNRESERVED_KEYWORD}, {"partial", PARTIAL, UNRESERVED_KEYWORD}, + {"partition", PARTITION, UNRESERVED_KEYWORD}, + {"partitions", PARTITIONS, UNRESERVED_KEYWORD}, {"password", PASSWORD, UNRESERVED_KEYWORD}, {"placing", PLACING, RESERVED_KEYWORD}, {"plans", PLANS, UNRESERVED_KEYWORD}, *************** *** 305,310 **** --- 309,315 ---- {"procedural", PROCEDURAL, 
UNRESERVED_KEYWORD}, {"procedure", PROCEDURE, UNRESERVED_KEYWORD}, {"quote", QUOTE, UNRESERVED_KEYWORD}, + {"range", RANGE, UNRESERVED_KEYWORD}, {"read", READ, UNRESERVED_KEYWORD}, {"real", REAL, COL_NAME_KEYWORD}, {"reassign", REASSIGN, UNRESERVED_KEYWORD}, *************** *** 349,354 **** --- 354,360 ---- {"simple", SIMPLE, UNRESERVED_KEYWORD}, {"smallint", SMALLINT, COL_NAME_KEYWORD}, {"some", SOME, RESERVED_KEYWORD}, + {"split", SPLIT, UNRESERVED_KEYWORD}, {"stable", STABLE, UNRESERVED_KEYWORD}, {"standalone", STANDALONE_P, UNRESERVED_KEYWORD}, {"start", START, UNRESERVED_KEYWORD}, Index: src/backend/parser/parse_utilcmd.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/parser/parse_utilcmd.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.5 diff -c -r1.1.1.1 -r1.1.1.1.10.5 *** src/backend/parser/parse_utilcmd.c 1 Dec 2008 09:36:26 -0000 1.1.1.1 --- src/backend/parser/parse_utilcmd.c 13 Mar 2009 09:09:36 -0000 1.1.1.1.10.5 *************** *** 75,80 **** --- 75,81 ---- List *alist; /* "after list" of things to do after creating * the table */ IndexStmt *pkey; /* PRIMARY KEY index, if any */ + PartitionAttrs *partAttr; /* partitioning related info */ } CreateStmtContext; /* State shared by transformCreateSchemaStmt and its subroutines */ *************** *** 114,119 **** --- 115,121 ---- static void transformConstraintAttrs(List *constraintList); static void transformColumnType(ParseState *pstate, ColumnDef *column); static void setSchemaName(char *context_schema, char **stmt_schema_name); + static void transformPartitionData(ParseState *pstate, CreateStmtContext *cxt); /* *************** *** 177,182 **** --- 179,185 ---- cxt.alist = NIL; cxt.pkey = NULL; cxt.hasoids = interpretOidsOption(stmt->options); + cxt.partAttr = (PartitionAttrs *)stmt->partAttr; /* * Run through each primary element in the table creation clause. 
Separate
***************
*** 235,240 ****
--- 238,248 ----
  	transformFKConstraints(pstate, &cxt, true, false);
  	/*
+ 	 * Postprocess partition related information
+ 	 */
+ 	transformPartitionData(pstate, &cxt);
+ 
+ 	/*
  	 * Output results.
  	 */
  	stmt->tableElts = cxt.columns;
***************
*** 955,960 ****
--- 963,1063 ----
  	return result;
  }
+ /*
+  * transformPartitionData:
+  *	  If partitioning is specified, carry out checks on the supplied create
+  *	  stmt and queue index definitions for the partitions.
+  *
+  * Does nothing when the statement carries no partitioning information
+  * (cxt->partAttr is NULL).  Otherwise it
+  *	- raises an error if any FOREIGN KEY constraint was collected,
+  *	- raises an error if a PRIMARY KEY / UNIQUE constraint does not contain
+  *	  every partitioning column,
+  *	- appends to cxt->alist, for every partition, a copy of each statement
+  *	  already queued there, retargeted at that partition.
+  *
+  * pstate is not used at present.
+  */
+ static void
+ transformPartitionData(ParseState *pstate, CreateStmtContext *cxt)
+ {
+ 	PartitionAttrs *partAttr = (PartitionAttrs *) cxt->partAttr;
+ 	List	   *partindexlist = NIL;
+ 	ListCell   *lc;
+ 	ListCell   *partCell;
+ 
+ 	/* Nothing to do for a table without a partitioning clause */
+ 	if (partAttr == NULL)
+ 		return;
+ 
+ 	/* Foreign key constraints not supported with partitioning */
+ 	if (cxt->fkconstraints)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 				 errmsg("FOREIGN KEY constraints not supported with partitioning")));
+ 
+ 	/*
+ 	 * The index built for a PRIMARY KEY / UNIQUE constraint is created per
+ 	 * table (see the copies made below), so it can only see the rows of its
+ 	 * own partition.  Two rows with equal key values are guaranteed to meet
+ 	 * in the same index only if the key contains every partitioning column.
+ 	 * Checking just the first key column, as was done before, accepted keys
+ 	 * that omit a partitioning column and rejected multi-column keys whose
+ 	 * partitioning column is not listed first.
+ 	 */
+ 	foreach(lc, cxt->ixconstraints)
+ 	{
+ 		Constraint *constraint = lfirst(lc);
+ 		ListCell   *partCol;
+ 		char	   *keyname;
+ 
+ 		Assert(IsA(constraint, Constraint));
+ 
+ 		if (constraint->contype != CONSTR_PRIMARY &&
+ 			constraint->contype != CONSTR_UNIQUE)
+ 			continue;
+ 
+ 		/* Name reported in the error message; guard against an empty list */
+ 		keyname = (constraint->keys != NIL) ?
+ 			strVal(linitial(constraint->keys)) : "";
+ 
+ 		foreach(partCol, partAttr->colName)
+ 		{
+ 			Alias	   *partColName = lfirst(partCol);
+ 			ListCell   *keyCell;
+ 			bool		found = false;
+ 
+ 			foreach(keyCell, constraint->keys)
+ 			{
+ 				if (strcmp(partColName->aliasname,
+ 						   strVal(lfirst(keyCell))) == 0)
+ 				{
+ 					found = true;
+ 					break;
+ 				}
+ 			}
+ 
+ 			if (!found)
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 						 errmsg("PRIMARY/UNIQUE KEY (%s) not the same as"
+ 								" the partitioning keys", keyname)));
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * cxt.alist contains index related entries, if so, we need to add entries
+ 	 * for the children tables too
+ 	 */
+ 	foreach(partCell, partAttr->partitions)
+ 	{
+ 		Partition  *partition = lfirst(partCell);
+ 
+ 		foreach(lc, cxt->alist)
+ 		{
+ 			IndexStmt  *index = lfirst(lc);
+ 			IndexStmt  *copyindex;
+ 
+ 			Assert(IsA(index, IndexStmt));
+ 
+ 			/* Same definition, but built on the partition instead */
+ 			copyindex = copyObject(index);
+ 			*(copyindex->relation) = *(partition->partName);
+ 
+ 			partindexlist = lappend(partindexlist, copyindex);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Append partindexlist to alist now, so that the indexes get created for
+ 	 * the children tables too
+ 	 */
+ 	cxt->alist = list_concat(cxt->alist, partindexlist);
+ }
+ 
  /*
   * transformIndexConstraints
***************
*** 1770,1775 ****
--- 1873,1890 ----
  				cmd->subtype = AT_AddConstraint;
  				newcmds = lappend(newcmds, cmd);
  				break;
+ 
+ 			case AT_AddPartition:
+ 				newcmds = lappend(newcmds, cmd);
+ 				break;
+ 
+ 			case AT_DropPartitionByName:
+ 				newcmds = lappend(newcmds, cmd);
+ 				break;
+ 
+ 			case AT_DropPartitionByRange:
+ 				newcmds = lappend(newcmds, cmd);
+ 				break;
  			default:
  				newcmds = lappend(newcmds, cmd);
Index: src/backend/tcop/utility.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/tcop/utility.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.8.2
diff -c -r1.1.1.1 -r1.1.1.1.8.2
*** src/backend/tcop/utility.c 1 Dec 2008 09:36:27 -0000 1.1.1.1
--- src/backend/tcop/utility.c 14 Feb 2009 11:30:22 -0000 1.1.1.1.8.2
***************
*** 417,427 ****
  				relOid = DefineRelation((CreateStmt *) stmt,
  										RELKIND_RELATION);
  				/*
  				 * Let AlterTableCreateToastTable decide if this one
  				 * needs a secondary relation too.
*/ - CommandCounterIncrement(); AlterTableCreateToastTable(relOid); } else --- 417,432 ---- relOid = DefineRelation((CreateStmt *) stmt, RELKIND_RELATION); + CommandCounterIncrement(); + /* + * Let DefinePartitions decide if partitions need to be created + * for this table + */ + DefinePartitions((CreateStmt *)parsetree, relOid); /* * Let AlterTableCreateToastTable decide if this one * needs a secondary relation too. */ AlterTableCreateToastTable(relOid); } else Index: src/backend/utils/adt/numeric.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/utils/adt/numeric.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.1 diff -c -r1.1.1.1 -r1.1.1.1.10.1 *** src/backend/utils/adt/numeric.c 1 Dec 2008 09:36:27 -0000 1.1.1.1 --- src/backend/utils/adt/numeric.c 2 Feb 2009 11:07:12 -0000 1.1.1.1.10.1 *************** *** 3588,3594 **** char *endptr; tmp = DatumGetCString(DirectFunctionCall1(numeric_out, ! NumericGetDatum(num))); /* unlike float8in, we ignore ERANGE from strtod */ val = strtod(tmp, &endptr); --- 3588,3594 ---- char *endptr; tmp = DatumGetCString(DirectFunctionCall1(numeric_out, ! 
NumericGetDatum(num)));
  	/* unlike float8in, we ignore ERANGE from strtod */
  	val = strtod(tmp, &endptr);
--- 3588,3594 ----
  	char	   *endptr;
  	tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
! 											  NumericGetDatum(num)));
  	/* unlike float8in, we ignore ERANGE from strtod */
  	val = strtod(tmp, &endptr);
Index: src/backend/utils/adt/ruleutils.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/utils/adt/ruleutils.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.1
diff -c -r1.1.1.1 -r1.1.1.1.10.1
*** src/backend/utils/adt/ruleutils.c 1 Dec 2008 09:36:27 -0000 1.1.1.1
--- src/backend/utils/adt/ruleutils.c 2 Feb 2009 11:07:12 -0000 1.1.1.1.10.1
***************
*** 41,46 ****
--- 41,48 ----
  #include "parser/parse_func.h"
  #include "parser/parse_oper.h"
  #include "parser/parsetree.h"
+ #include "parser/parse_clause.h"
+ #include "parser/parse_expr.h"
  #include "rewrite/rewriteHandler.h"
  #include "rewrite/rewriteManip.h"
  #include "rewrite/rewriteSupport.h"
***************
*** 6157,6160 ****
--- 6159,6200 ----
  	ReleaseSysCache(tuple);
  	return result;
+ }
+ 
+ /*
+  * pg_get_expr_text
+  *	  get the text corresponding to a raw expr
+  *
+  * The raw (untransformed) expression is transformed against a FROM list
+  * made of "relation" plus two more references to the same table aliased
+  * *OLD* and *NEW*, and the result is deparsed into "buf".
+  *
+  * relation: table the expression refers to
+  * raw_expr: raw parse tree of the expression
+  * buf:		 StringInfo the text is written to; the caller owns it
+  *
+  * Returns buf->data, i.e. a pointer into the caller's buffer, not a copy.
+  */
+ char *
+ pg_get_expr_text(RangeVar *relation, Node *raw_expr, StringInfo buf)
+ {
+ 	deparse_context context;
+ 	deparse_namespace dpns;
+ 	ParseState *pstate = make_parsestate(NULL);
+ 	RangeVar   *old;
+ 	RangeVar   *new;
+ 	Node	   *expr;
+ 
+ 	/*
+ 	 * Carry the catalog/schema qualification over to the *OLD* and *NEW*
+ 	 * references.  With only relname copied, a schema-qualified "relation"
+ 	 * could make them resolve to a different table of the same name (or to
+ 	 * none at all).
+ 	 */
+ 	old = makeNode(RangeVar);
+ 	old->catalogname = relation->catalogname;
+ 	old->schemaname = relation->schemaname;
+ 	old->relname = relation->relname;
+ 	old->alias = makeAlias("*OLD*", NIL);
+ 
+ 	new = makeNode(RangeVar);
+ 	new->catalogname = relation->catalogname;
+ 	new->schemaname = relation->schemaname;
+ 	new->relname = relation->relname;
+ 	new->alias = makeAlias("*NEW*", NIL);
+ 
+ 	transformFromClause(pstate, list_make3(relation, old, new));
+ 	expr = transformExpr(pstate, raw_expr);
+ 
+ 	/*
+ 	 * Both structs live on the stack; zero them first so that any member
+ 	 * not assigned explicitly below is NULL/0 rather than stack garbage.
+ 	 */
+ 	memset(&dpns, 0, sizeof(dpns));
+ 	dpns.rtable = pstate->p_rtable;
+ 	dpns.outer_plan = dpns.inner_plan = NULL;
+ 
+ 	memset(&context, 0, sizeof(context));
+ 	context.buf = buf;
+ 	context.namespaces = list_make1(&dpns);
+ 	context.varprefix = true;
+ 	context.prettyFlags = PRETTYFLAG_PAREN | PRETTYFLAG_INDENT;
+ 	context.indentLevel = PRETTYINDENT_STD;
+ 
+ 	get_rule_expr(expr, &context, false);
+ 
+ 	return buf->data;
  }
Index: src/include/catalog/indexing.h
=================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/catalog/indexing.h,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.2 diff -c -r1.1.1.1 -r1.1.1.1.10.2 *** src/include/catalog/indexing.h 1 Dec 2008 09:36:42 -0000 1.1.1.1 --- src/include/catalog/indexing.h 23 Feb 2009 12:33:10 -0000 1.1.1.1.10.2 *************** *** 57,62 **** --- 57,65 ---- * index name (much less the numeric OID). */ + DECLARE_INDEX(pg_part_partrel_index, 2337, on pg_partition using btree(parentrelid oid_ops, partrelid oid_ops, keyorder int2_ops)); + #define PartitionParentIndexId 2337 + DECLARE_UNIQUE_INDEX(pg_aggregate_fnoid_index, 2650, on pg_aggregate using btree(aggfnoid oid_ops)); #define AggregateFnoidIndexId 2650 Index: src/include/commands/tablecmds.h =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/commands/tablecmds.h,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.6 diff -c -r1.1.1.1 -r1.1.1.1.10.6 *** src/include/commands/tablecmds.h 1 Dec 2008 09:36:42 -0000 1.1.1.1 --- src/include/commands/tablecmds.h 17 Mar 2009 12:50:55 -0000 1.1.1.1.10.6 *************** *** 18,23 **** --- 18,30 ---- #include "utils/relcache.h" + typedef struct partitionInfo + { + char partitionName[20]; + Value minValue; + Value maxValue; + }partitionInfo; + extern Oid DefineRelation(CreateStmt *stmt, char relkind); extern void RemoveRelations(DropStmt *drop); *************** *** 69,73 **** --- 76,85 ---- extern void AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); + + extern void DefinePartitions(CreateStmt *stmt, Oid parentRelId); + extern int GetPartitionsCount(Oid parentOid); + extern int GetPartitionsCountAtt(Oid parentOid, Oid attNum); + extern int ValidateRanges(PartitionAttrs *partAttr, Oid parentRelOid, Oid *partitionRelOids); #endif /* TABLECMDS_H */ Index: 
src/include/nodes/nodes.h =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/nodes/nodes.h,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.10.3 diff -c -r1.1.1.2 -r1.1.1.2.10.3 *** src/include/nodes/nodes.h 1 Dec 2008 09:38:07 -0000 1.1.1.2 --- src/include/nodes/nodes.h 17 Mar 2009 12:50:55 -0000 1.1.1.2.10.3 *************** *** 362,367 **** --- 362,371 ---- T_XmlSerialize, T_WithClause, T_CommonTableExpr, + T_PartitionAttrs, + T_Partition, + T_UpdatePartitionStmt, + T_SplitPartitionStmt, /* * TAGS FOR RANDOM OTHER STUFF Index: src/include/nodes/parsenodes.h =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/nodes/parsenodes.h,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.8.6 diff -c -r1.1.1.2 -r1.1.1.2.8.6 *** src/include/nodes/parsenodes.h 1 Dec 2008 09:38:07 -0000 1.1.1.2 --- src/include/nodes/parsenodes.h 17 Mar 2009 12:50:55 -0000 1.1.1.2.8.6 *************** *** 1025,1031 **** AT_EnableReplicaRule, /* ENABLE REPLICA RULE name */ AT_DisableRule, /* DISABLE RULE name */ AT_AddInherit, /* INHERIT parent */ ! AT_DropInherit /* NO INHERIT parent */ } AlterTableType; typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ --- 1025,1036 ---- AT_EnableReplicaRule, /* ENABLE REPLICA RULE name */ AT_DisableRule, /* DISABLE RULE name */ AT_AddInherit, /* INHERIT parent */ ! AT_DropInherit, /* NO INHERIT parent */ ! AT_AddPartition, /* add partition */ ! AT_DropPartitionByName, /* drop partition by name. */ ! AT_DropPartitionByRange, /* drop partition by range. */ ! AT_UpdatePartition, /* Update the range partition. */ ! AT_SplitPartition /* Split range partition. 
*/ } AlterTableType; typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ *************** *** 1216,1221 **** --- 1221,1227 ---- List *options; /* options from WITH clause */ OnCommitAction oncommit; /* what do we do at COMMIT? */ char *tablespacename; /* table space to use, or NULL */ + Node *partAttr; /* partitioning related information */ } CreateStmt; /* ---------- *************** *** 1254,1271 **** CONSTR_ATTR_IMMEDIATE } ConstrType; typedef struct Constraint { ! NodeTag type; ! ConstrType contype; ! char *name; /* name, or NULL if unnamed */ ! Node *raw_expr; /* expr, as untransformed parse tree */ ! char *cooked_expr; /* expr, as nodeToString representation */ ! List *keys; /* String nodes naming referenced column(s) */ ! List *options; /* options from WITH clause */ ! char *indexspace; /* index tablespace for PKEY/UNIQUE ! * constraints; NULL for default */ } Constraint; /* ---------- * Definitions for FOREIGN KEY constraints in CreateStmt --- 1260,1338 ---- CONSTR_ATTR_IMMEDIATE } ConstrType; + + /* + * Enumeration which defines the various possibilites for the partitioning + * function. + */ + typedef enum PartitionFunction { + PART_UNDEFINED, + PART_LIST, + PART_RANGE, + PART_HASH + } PartitionFunction; + + typedef struct Constraint { ! NodeTag type; ! List *min_value; ! List *max_value; ! List *partition_list_values; ! // ConstrType contype; ! PartitionFunction contype; ! char *name; /* name, or NULL if unnamed */ ! Node *raw_expr; /* expr, as untransformed parse tree */ ! char *cooked_expr; /* expr, as nodeToString representation */ ! List *keys; /* String nodes naming referenced column(s) */ ! List *options; /* options from WITH clause */ ! char *indexspace; /* index tablespace for PKEY/UNIQUE ! 
* constraints; NULL for default */ } Constraint; + + //typedef struct PartitionConstraint + //{ + // Node *min_value; + // Node *max_value; + // List *list_partition_values; + // PartitionFunction contype; + // char *name; /* name, or NULL if unnamed */ + // Node *raw_expr; /* expr, as untransformed parse tree */ + // char *cooked_expr; /* expr, as nodeToString representation */ + // List *keys; /* String nodes naming referenced column(s) */ + // List *options; /* options from WITH clause */ + // char *indexspace; /* index tablespace for PKEY/UNIQUE + // * constraints; NULL for default */ + // } PartitionConstraint; + + typedef struct Partition { + NodeTag type; + RangeVar *partName; /* Name of the partition */ + Constraint *partitionCheck; /* per partition constraint */ + char *tablespacename; /* tablespace name*/ + } Partition; + + typedef struct UpdatePartitionStmt + { + NodeTag type; + Partition *prev; + Partition *after; + }UpdatePartitionStmt; + + typedef struct SplitPartitionStmt + { + NodeTag type; + List *partNames; + List *splitValues; + }SplitPartitionStmt; + + typedef struct PartitionAttrs { + NodeTag type; + int numberOfPartitions; /* Number of partitions */ + List *partitions; /* The list of partitions */ + PartitionFunction partFunc; /* type of partition */ + List *colName; /* partition column name */ + } PartitionAttrs; /* ---------- * Definitions for FOREIGN KEY constraints in CreateStmt Index: src/include/optimizer/planner.h =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/optimizer/planner.h,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.1 diff -c -r1.1.1.1 -r1.1.1.1.10.1 *** src/include/optimizer/planner.h 1 Dec 2008 09:36:42 -0000 1.1.1.1 --- src/include/optimizer/planner.h 4 Mar 2009 08:30:42 -0000 1.1.1.1.10.1 *************** *** 33,37 **** PlannerInfo *parent_root, bool hasRecursion, double tuple_fraction, PlannerInfo **subroot); - #endif /* PLANNER_H 
*/ --- 33,36 ---- Index: src/include/utils/builtins.h =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/utils/builtins.h,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.1 diff -c -r1.1.1.1 -r1.1.1.1.10.1 *** src/include/utils/builtins.h 1 Dec 2008 09:36:46 -0000 1.1.1.1 --- src/include/utils/builtins.h 2 Feb 2009 11:07:12 -0000 1.1.1.1.10.1 *************** *** 16,21 **** --- 16,23 ---- #include "fmgr.h" #include "nodes/parsenodes.h" + #include "nodes/primnodes.h" + #include "lib/stringinfo.h" /* * Defined in adt/ *************** *** 562,567 **** --- 564,571 ---- extern const char *quote_identifier(const char *ident); extern char *quote_qualified_identifier(const char *namespace, const char *ident); + extern char *pg_get_expr_text(RangeVar *relation, + Node *raw_expr, StringInfo buf); /* tid.c */ extern Datum tidin(PG_FUNCTION_ARGS); Index: src/test/regress/regress.c =================================================================== RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/test/regress/regress.c,v retrieving revision 1.1.1.1 retrieving revision 1.1.1.1.10.8 diff -c -r1.1.1.1 -r1.1.1.1.10.8 *** src/test/regress/regress.c 1 Dec 2008 09:36:47 -0000 1.1.1.1 --- src/test/regress/regress.c 9 Mar 2009 12:55:31 -0000 1.1.1.1.10.8 *************** *** 11,16 **** --- 11,34 ---- #include "executor/executor.h" /* For GetAttributeByName */ #include "commands/sequence.h" /* for nextval() */ + #include "postgres.h" + #include "executor/spi.h" + #include "commands/trigger.h" + + #include "optimizer/plancat.h" + #include "utils/fmgroids.h" + #include "catalog/pg_partition.h" + #include "catalog/indexing.h" + #include "utils/tqual.h" + #include "utils/lsyscache.h" + #include "access/hash.h" + + extern Datum trigf(PG_FUNCTION_ARGS); + + PG_FUNCTION_INFO_V1(trigf); + + #define ItemPointerGetDatum(X) PointerGetDatum(X) + #define P_MAXDIG 12 #define LDELIM '(' #define RDELIM ')' 
*************** *** 27,37 **** extern int oldstyle_length(int n, text *t); extern Datum int44in(PG_FUNCTION_ARGS); extern Datum int44out(PG_FUNCTION_ARGS); ! #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif /* * Distance from a point to a path --- 45,365 ---- extern int oldstyle_length(int n, text *t); extern Datum int44in(PG_FUNCTION_ARGS); extern Datum int44out(PG_FUNCTION_ARGS); ! extern Oid find_inheritance_parent(Oid inhrelid); ! extern Oid get_relevent_hash_partition(HeapTuple tuple, Relation rel); ! Datum partition_update_trigger(PG_FUNCTION_ARGS); ! Datum partition_constraints_hash(PG_FUNCTION_ARGS); ! Datum partition_insert_trigger_hash(PG_FUNCTION_ARGS); ! Datum partition_insert_trigger(PG_FUNCTION_ARGS); #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif + PG_FUNCTION_INFO_V1(partition_update_trigger); + + /* + * Update trigger for all partitions. + * Update is implemented as delete + insert operation. + * Delete the OLD row from child table and insert the NEW + * row on parent table. 
+  */
+ 
+ Datum
+ partition_update_trigger(PG_FUNCTION_ARGS)
+ {
+ 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
+ 	Relation	child_relation;
+ 	Relation	parent_relation;
+ 	TupleDesc	tupdesc;
+ 	HeapTuple	trigtuple;
+ 	HeapTuple	newtuple;
+ 	HeapTuple	dummyTuple;
+ 	StringInfo	delquery;
+ 	StringInfo	insquery;
+ 	bool		first_value = true;
+ 	int			col;
+ 
+ 	/* make sure it's called as a trigger at all */
+ 	if (!CALLED_AS_TRIGGER(fcinfo))
+ 		elog(ERROR, "partition_update_trigger: not called by trigger manager");
+ 
+ 	/* tg_newtuple is read below, so this must be a row-level UPDATE */
+ 	if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event) ||
+ 		!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
+ 		elog(ERROR, "partition_update_trigger: not called on row update");
+ 
+ 	child_relation = trigdata->tg_relation;
+ 	tupdesc = child_relation->rd_att;
+ 	trigtuple = trigdata->tg_trigtuple;
+ 	newtuple = trigdata->tg_newtuple;
+ 
+ 	parent_relation = RelationIdGetRelation(
+ 						find_inheritance_parent(
+ 							RelationGetRelid(child_relation)));
+ 	if (parent_relation == NULL)
+ 		elog(ERROR, "partition_update_trigger: could not open parent of relation %s",
+ 			 RelationGetRelationName(child_relation));
+ 
+ 	/*
+ 	 * Step 1: statement deleting the OLD row from this (child) table,
+ 	 * addressed by its ctid.
+ 	 *
+ 	 * NOTE(review): the relation names are used bare (unquoted and without
+ 	 * schema), exactly as before; names needing quotes or tables outside
+ 	 * the search path are presumably not handled -- confirm.
+ 	 */
+ 	delquery = makeStringInfo();
+ 	appendStringInfo(delquery, " delete from %s where ctid = '%s';",
+ 					 RelationGetRelationName(child_relation),
+ 					 DatumGetCString(OidFunctionCall1(F_TIDOUT,
+ 									ItemPointerGetDatum(&trigtuple->t_self))));
+ 
+ 	/*
+ 	 * Step 2: statement inserting the NEW row into the parent table.
+ 	 */
+ 	insquery = makeStringInfo();
+ 	appendStringInfo(insquery, " insert into %s values (",
+ 					 RelationGetRelationName(parent_relation));
+ 
+ 	for (col = 1; col <= tupdesc->natts; col++)
+ 	{
+ 		Form_pg_attribute att = tupdesc->attrs[col - 1];
+ 		Datum		value;
+ 		bool		isnull;
+ 		Oid			typoutput;
+ 		bool		typIsVarlen;
+ 		char	   *valstr;
+ 		char	   *p;
+ 
+ 		/*
+ 		 * A dropped column has no type to look up and takes no position
+ 		 * in a VALUES list.
+ 		 */
+ 		if (att->attisdropped)
+ 			continue;
+ 
+ 		if (!first_value)
+ 			appendStringInfoChar(insquery, ',');
+ 		first_value = false;
+ 
+ 		value = heap_getattr(newtuple, col, tupdesc, &isnull);
+ 
+ 		/* A NULL datum must not be handed to the type's output function */
+ 		if (isnull)
+ 		{
+ 			appendStringInfoString(insquery, "NULL");
+ 			continue;
+ 		}
+ 
+ 		getTypeOutputInfo(att->atttypid, &typoutput, &typIsVarlen);
+ 		valstr = DatumGetCString(OidFunctionCall1(typoutput, value));
+ 
+ 		/*
+ 		 * Emit the value as an E'...' literal with quotes and backslashes
+ 		 * doubled, so that a value containing either of them can neither
+ 		 * break nor alter the statement.
+ 		 */
+ 		appendStringInfoString(insquery, "E'");
+ 		for (p = valstr; *p != '\0'; p++)
+ 		{
+ 			if (*p == '\'' || *p == '\\')
+ 				appendStringInfoChar(insquery, *p);
+ 			appendStringInfoChar(insquery, *p);
+ 		}
+ 		appendStringInfoChar(insquery, '\'');
+ 	}
+ 	appendStringInfoString(insquery, ");");
+ 
+ 	/* Run both statements, delete first, and fail loudly on SPI errors */
+ 	if (SPI_connect() != SPI_OK_CONNECT)
+ 		elog(ERROR, "partition_update_trigger: SPI_connect failed");
+ 
+ 	// elog(NOTICE,"query is :- %s",delquery->data);
+ 	if (SPI_exec(delquery->data, 1) < 0)
+ 		elog(ERROR, "partition_update_trigger: could not execute \"%s\"",
+ 			 delquery->data);
+ 
+ 	if (SPI_exec(insquery->data, 1) < 0)
+ 		elog(ERROR, "partition_update_trigger: could not execute \"%s\"",
+ 			 insquery->data);
+ 
+ 	SPI_finish();
+ 
+ 	RelationClose(parent_relation);
+ 
+ 	/*
+ 	 * Form a dummy-tuple to count no. of rows modified.  Only t_len is
+ 	 * set, to the marker value -1; the remaining header fields are zeroed.
+ 	 */
+ 	dummyTuple = palloc0(sizeof(HeapTupleData));
+ 	dummyTuple->t_len = -1;
+ 
+ 	return PointerGetDatum(dummyTuple);
+ }
+ 
+ /*
+  * Constraints for hash partitions are implemented by monitoring inserts to child tables
+  * using following trigger which computes the hash value of partition key attributes
+  * and allows insert iff it matches with its designated hash value as specified
+  * in pg_partition catalog table.
+  */
+ PG_FUNCTION_INFO_V1(partition_constraints_hash);
+ Datum
+ partition_constraints_hash(PG_FUNCTION_ARGS)
+ {
+ 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
+ 	HeapTuple	trigtuple;
+ 	Relation	relation;
+ 	Relation	pg_partrel;
+ 	ScanKeyData skey;
+ 	SysScanDesc pg_partscan;
+ 	HeapTuple	pg_parttup;
+ 	List	   *distinct_part_key_list = NIL;
+ 	StringInfo	str_to_hash = makeStringInfo();
+ 	unsigned int hashValueFromCatalog = 0;
+ 	unsigned int hashValue;
+ 	Oid			parentOid = InvalidOid;
+ 	int			hash_partition_entries = 0;
+ 	int			num_buckets;
+ 
+ 	/* make sure it's called as a trigger at all */
+ 	if (!CALLED_AS_TRIGGER(fcinfo))
+ 		elog(ERROR, "partition_insert_trigger: not called by trigger manager");
+ 
+ 	/* Sanity checks */
+ 	if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) || !TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+ 		elog(ERROR, "partition_insert_trigger: not called on insert before");
+ 
+ 	/* Only look inside the trigger data once it is known to be valid */
+ 	trigtuple = trigdata->tg_trigtuple;
+ 	relation = trigdata->tg_relation;
+ 
+ 	/*
+ 	 * Pass 1: walk the pg_partition rows of this table.  For every distinct
+ 	 * hash partition key, append the text form of the key value found in
+ 	 * the new row to str_to_hash, and remember the parent table and the
+ 	 * hash value recorded in the catalog.
+ 	 */
+ 	ScanKeyInit(&skey,
+ 				Anum_pg_partition_partrelid,
+ 				BTEqualStrategyNumber, F_OIDEQ,
+ 				ObjectIdGetDatum(RelationGetRelid(relation)));
+ 
+ 	pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+ 	pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+ 									 SnapshotNow, 1, &skey);
+ 
+ 	while (HeapTupleIsValid(pg_parttup = systable_getnext(pg_partscan)))
+ 	{
+ 		/* Instead of pg_part Use heap_getattr for accessing bytea columns */
+ 		Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 		Datum		attr;
+ 		bool		isnull;
+ 		bool		isVariableLength;
+ 		Oid			typoutput;
+ 
+ 		if (pg_part->partrelid != RelationGetRelid(relation) ||
+ 			pg_part->parttype != PART_HASH)
+ 			continue;
+ 
+ 		/* Each partition key contributes to the hashed string only once */
+ 		if (list_member_int(distinct_part_key_list, pg_part->partkey))
+ 			continue;
+ 		distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey);
+ 
+ 		/*
+ 		 * Get attribute from tuple
+ 		 *
+ 		 * NOTE(review): isnull is not acted upon, as before.  What a NULL
+ 		 * key should hash to has to agree with the code choosing the
+ 		 * partition on insert, which is not visible here -- confirm.
+ 		 */
+ 		attr = heap_getattr(trigtuple, pg_part->partkey, relation->rd_att, &isnull);
+ 
+ 		/*
+ 		 * Append the value as data.  Passing it as the format argument of
+ 		 * appendStringInfo() would interpret any '%' it contains.
+ 		 */
+ 		getTypeOutputInfo(pg_part->keytype, &typoutput, &isVariableLength);
+ 		appendStringInfoString(str_to_hash,
+ 							   DatumGetCString(OidFunctionCall1(typoutput, attr)));
+ 
+ 		parentOid = pg_part->parentrelid;
+ 		hashValueFromCatalog = DatumGetUInt32(heap_getattr(pg_parttup,
+ 														   Anum_pg_partition_hashval,
+ 														   pg_partrel->rd_att,
+ 														   &isnull));
+ 	}
+ 
+ 	systable_endscan(pg_partscan);
+ 	heap_close(pg_partrel, AccessShareLock);
+ 
+ 	/*
+ 	 * Without a matching catalog row there is neither a parent nor an
+ 	 * expected hash value to compare with; going on would use both
+ 	 * uninitialized.
+ 	 */
+ 	if (distinct_part_key_list == NIL)
+ 		elog(ERROR, "partition_constraints_hash: no hash partition entry found for relation %s",
+ 			 RelationGetRelationName(relation));
+ 
+ 	/*
+ 	 * Pass 2: count the hash partition rows registered for the parent.
+ 	 */
+ 	/*Scan key to scan pg_partition table on parentrelid*/
+ 	ScanKeyInit(&skey,
+ 				Anum_pg_partition_parentrelid,
+ 				BTEqualStrategyNumber, F_OIDEQ,
+ 				ObjectIdGetDatum(parentOid));
+ 
+ 	pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+ 	pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+ 									 SnapshotNow, 1, &skey);
+ 
+ 	while (HeapTupleIsValid(pg_parttup = systable_getnext(pg_partscan)))
+ 	{
+ 		/* Instead of pg_part Use heap_getattr for accessing bytea columns */
+ 		Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+ 		/* Increase the no. of hash partition entries count. */
+ 		if (pg_part->parttype == PART_HASH)
+ 			hash_partition_entries++;
+ 	}
+ 
+ 	systable_endscan(pg_partscan);
+ 	heap_close(pg_partrel, AccessShareLock);
+ 
+ 	/*
+ 	 * The modulus is the number of counted rows divided by the number of
+ 	 * distinct keys; it must be positive to be usable as a divisor.
+ 	 */
+ 	num_buckets = hash_partition_entries / list_length(distinct_part_key_list);
+ 	if (num_buckets <= 0)
+ 		elog(ERROR, "partition_constraints_hash: no hash partitions found for parent relation %u",
+ 			 parentOid);
+ 
+ 	hashValue = DatumGetUInt32(hash_any((unsigned char *) str_to_hash->data,
+ 										strlen(str_to_hash->data))) % num_buckets;
+ 
+ 	// elog(NOTICE, "String to be hashed : %s", str_to_hash->data);
+ 
+ 	if (hashValueFromCatalog != hashValue)
+ 		elog(ERROR, "This row can not be inserted. Invalid hash value '%u'; expected '%u'", hashValue, hashValueFromCatalog);
+ 
+ 	return PointerGetDatum(trigtuple);
+ }
+ 
+ /*
+  * Common tail of the two insert triggers below: check the new row against
+  * the constraints of the chosen child table and store it there instead of
+  * in the table the trigger fired on.
+  *
+  * Returns the trigger tuple with t_len overwritten by the marker value -1.
+  *
+  * NOTE(review): only heap_insert() is called; nothing in this function
+  * adds entries to the child table's indexes -- confirm that is taken care
+  * of elsewhere.
+  */
+ static Datum
+ partition_insert_into_child(TriggerData *trigdata, Oid child_relid)
+ {
+ 	HeapTuple	trigtuple = trigdata->tg_trigtuple;
+ 	Relation	child_table_relation;
+ 	ResultRelInfo *resultRelInfo;
+ 	TupleTableSlot *slot;
+ 	EState	   *estate;
+ 
+ 	child_table_relation = RelationIdGetRelation(child_relid);
+ 	if (child_table_relation == NULL)
+ 		elog(ERROR, "partition_insert_trigger: Failed to locate relation for child table with OID %u", child_relid);
+ 
+ 	estate = CreateExecutorState();
+ 
+ 	resultRelInfo = makeNode(ResultRelInfo);
+ 	resultRelInfo->ri_RangeTableIndex = 1;
+ 	resultRelInfo->ri_RelationDesc = child_table_relation;
+ 
+ 	estate->es_result_relations = resultRelInfo;
+ 	estate->es_num_result_relations = 1;
+ 	estate->es_result_relation_info = resultRelInfo;
+ 
+ 	/* Set up a tuple slot too */
+ 	slot = MakeSingleTupleTableSlot(trigdata->tg_relation->rd_att);
+ 	ExecStoreTuple(trigtuple, slot, InvalidBuffer, false);
+ 
+ 	ExecConstraints(resultRelInfo, slot, estate);
+ 
+ 	heap_insert(child_table_relation, trigtuple, GetCurrentCommandId(true), 0, NULL);
+ 	RelationClose(child_table_relation);
+ 	ExecDropSingleTupleTableSlot(slot);
+ 	FreeExecutorState (estate);
+ 
+ 	/* Form a dummy-tuple to count no. of rows modified. */
+ 	trigtuple->t_len = -1;
+ 
+ 	return PointerGetDatum(trigtuple);
+ }
+ 
+ /*
+  * BEFORE INSERT trigger: store the new row in the child table picked by
+  * get_relevent_hash_partition().
+  */
+ PG_FUNCTION_INFO_V1(partition_insert_trigger_hash);
+ Datum
+ partition_insert_trigger_hash(PG_FUNCTION_ARGS)
+ {
+ 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
+ 	Oid			relation_id;
+ 
+ 	/* make sure it's called as a trigger at all */
+ 	if (!CALLED_AS_TRIGGER(fcinfo))
+ 		elog(ERROR, "partition_insert_trigger: not called by trigger manager");
+ 
+ 	/* Sanity checks */
+ 	if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) || !TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+ 		elog(ERROR, "partition_insert_trigger: not called on insert before");
+ 
+ 	relation_id = get_relevent_hash_partition(trigdata->tg_trigtuple,
+ 											  trigdata->tg_relation);
+ 
+ 	/*
+ 	 * No child name is known at this point (the old message printed an
+ 	 * uninitialized pointer), so report the table the trigger fired on.
+ 	 */
+ 	if (relation_id == InvalidOid)
+ 		elog(ERROR, "partition_insert_trigger: Invalid child table for relation %s",
+ 			 RelationGetRelationName(trigdata->tg_relation));
+ 
+ 	return partition_insert_into_child(trigdata, relation_id);
+ }
+ 
+ Oid
+ get_relevent_partition(HeapTuple tuple, Relation rel);
+ 
+ /*
+  * BEFORE INSERT trigger: store the new row in the child table picked by
+  * get_relevent_partition().
+  */
+ PG_FUNCTION_INFO_V1(partition_insert_trigger);
+ Datum
+ partition_insert_trigger(PG_FUNCTION_ARGS)
+ {
+ 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
+ 	Oid			relation_id;
+ 
+ 	/* make sure it's called as a trigger at all */
+ 	if (!CALLED_AS_TRIGGER(fcinfo))
+ 		elog(ERROR, "partition_insert_trigger: not called by trigger manager");
+ 
+ 	/* Sanity checks */
+ 	if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) || !TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+ 		elog(ERROR, "partition_insert_trigger: not called on insert before");
+ 
+ 	relation_id = get_relevent_partition(trigdata->tg_trigtuple,
+ 										 trigdata->tg_relation);
+ 	if (relation_id == InvalidOid)
+ 		elog(ERROR, "Could not find valid partition ... ");
+ 
+ 	return partition_insert_into_child(trigdata, relation_id);
+ }
  /*
   * Distance from a point to a path