Partitioning feature ...

Started by Kedar Potdaralmost 17 years ago16 messages
#1Kedar Potdar
kedar.potdar@gmail.com
2 attachment(s)

Hi,

We are implementing table partitioning feature to support Range and Hash
partitions. Please find attached, the WIP patch and test-cases.

The syntax used conforms to most of the suggestions mentioned in
http://archives.postgresql.org/pgsql-hackers/2008-01/msg00413.php, barring
the following:
-- Specification of partition names is optional. System will be able to
generate partition names in such cases.
-- Sub partitioning

We are maintaining a system catalog(pg_partition) for partition meta-data.
System will look-up this table to find appropriate partition to operate on.
System internally uses low-level 'C' triggers to row-movement.

Regards,
--
Kedar.

Attachments:

partition.patchapplication/octet-stream; name=partition.patchDownload
? part_diff.txt
Index: src/backend/catalog/Makefile
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/catalog/Makefile,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.10.1
diff -c -r1.1.1.2 -r1.1.1.2.10.1
*** src/backend/catalog/Makefile	1 Dec 2008 09:37:46 -0000	1.1.1.2
--- src/backend/catalog/Makefile	10 Feb 2009 13:18:07 -0000	1.1.1.2.10.1
***************
*** 27,33 ****
  
  POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
  	pg_proc.h pg_type.h pg_attribute.h pg_class.h pg_autovacuum.h \
! 	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
  	pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
--- 27,33 ----
  
  POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
  	pg_proc.h pg_type.h pg_attribute.h pg_class.h pg_autovacuum.h \
! 	pg_attrdef.h pg_constraint.h pg_inherits.h pg_partition.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
  	pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
Index: src/backend/catalog/pg_operator.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/catalog/pg_operator.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.1
diff -c -r1.1.1.1 -r1.1.1.1.10.1
*** src/backend/catalog/pg_operator.c	1 Dec 2008 09:36:26 -0000	1.1.1.1
--- src/backend/catalog/pg_operator.c	19 Feb 2009 11:51:17 -0000	1.1.1.1.10.1
***************
*** 35,41 ****
  #include "utils/syscache.h"
  
  
! static Oid OperatorGet(const char *operatorName,
  			Oid operatorNamespace,
  			Oid leftObjectId,
  			Oid rightObjectId,
--- 35,41 ----
  #include "utils/syscache.h"
  
  
! Oid OperatorGet(const char *operatorName,
  			Oid operatorNamespace,
  			Oid leftObjectId,
  			Oid rightObjectId,
***************
*** 127,133 ****
   *
   *		*defined is set TRUE if defined (not a shell)
   */
! static Oid
  OperatorGet(const char *operatorName,
  			Oid operatorNamespace,
  			Oid leftObjectId,
--- 127,133 ----
   *
   *		*defined is set TRUE if defined (not a shell)
   */
! Oid
  OperatorGet(const char *operatorName,
  			Oid operatorNamespace,
  			Oid leftObjectId,
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/commands/tablecmds.c,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.8.22
diff -c -r1.1.1.2 -r1.1.1.2.8.22
*** src/backend/commands/tablecmds.c	1 Dec 2008 09:37:46 -0000	1.1.1.2
--- src/backend/commands/tablecmds.c	19 Mar 2009 12:34:56 -0000	1.1.1.2.8.22
***************
*** 49,54 ****
--- 49,55 ----
  #include "nodes/makefuncs.h"
  #include "nodes/nodeFuncs.h"
  #include "nodes/parsenodes.h"
+ #include "nodes/pg_list.h"
  #include "optimizer/clauses.h"
  #include "optimizer/plancat.h"
  #include "optimizer/prep.h"
***************
*** 76,82 ****
--- 77,96 ----
  #include "utils/snapmgr.h"
  #include "utils/syscache.h"
  #include "utils/tqual.h"
+ #include "tcop/tcopprot.h"
+ #include "utils/numeric.h"
  
+ /* For partitions */
+ #include "catalog/pg_partition.h"
+ #include "catalog/pg_operator.h"
+ #include "postgres.h"
+ #include "executor/spi.h"
+ #include "commands/trigger.h"
+ #include "utils/rel.h"
+ 
+ extern Datum trigf(PG_FUNCTION_ARGS);
+ 
+ PG_FUNCTION_INFO_V1(trigf);
  
  /*
   * ON COMMIT action list
***************
*** 97,102 ****
--- 111,123 ----
  	SubTransactionId deleting_subid;
  } OnCommitItem;
  
+ 
+ typedef struct PartitionRowInfo
+ {
+     ItemPointerData tid;
+     Oid             partRelOid;
+ }PartitionRowInfo;
+ 
  static List *on_commits = NIL;
  
  
***************
*** 114,130 ****
   * a pass determined by subcommand type.
   */
  
! #define AT_PASS_DROP			0		/* DROP (all flavors) */
! #define AT_PASS_ALTER_TYPE		1		/* ALTER COLUMN TYPE */
! #define AT_PASS_OLD_INDEX		2		/* re-add existing indexes */
! #define AT_PASS_OLD_CONSTR		3		/* re-add existing constraints */
! #define AT_PASS_COL_ATTRS		4		/* set other column attributes */
  /* We could support a RENAME COLUMN pass here, but not currently used */
! #define AT_PASS_ADD_COL			5		/* ADD COLUMN */
! #define AT_PASS_ADD_INDEX		6		/* ADD indexes */
! #define AT_PASS_ADD_CONSTR		7		/* ADD constraints, defaults */
! #define AT_PASS_MISC			8		/* other stuff */
! #define AT_NUM_PASSES			9
  
  typedef struct AlteredTableInfo
  {
--- 135,156 ----
   * a pass determined by subcommand type.
   */
  
! #define AT_PASS_DROP					0		/* DROP (all flavors) */
! #define AT_PASS_ALTER_TYPE				1		/* ALTER COLUMN TYPE */
! #define AT_PASS_OLD_INDEX				2		/* re-add existing indexes */
! #define AT_PASS_OLD_CONSTR				3		/* re-add existing constraints */
! #define AT_PASS_COL_ATTRS				4		/* set other column attributes */
  /* We could support a RENAME COLUMN pass here, but not currently used */
! #define AT_PASS_ADD_COL					5		/* ADD COLUMN */
! #define AT_PASS_ADD_INDEX				6		/* ADD indexes */
! #define AT_PASS_ADD_CONSTR				7		/* ADD constraints, defaults */
! #define AT_PASS_ADD_PARTITION   		8		/* ADD partition to an existing table. */
! #define AT_PASS_DROP_PARTITION_NAME 	9   	/* DROP partition(By name) of an existing table */
! #define AT_PASS_DROP_PARTITION_RANGE 	10		/* DROP partition(By range) of an existing table */
! #define AT_PASS_UPDATE_PARTITION		11		/* UPDATE range partition. */
! #define AT_PASS_SPLIT_PARTITION			12		/* SPLIT range partition. */
! #define AT_PASS_MISC					13		/* other stuff */
! #define AT_NUM_PASSES					14
  
  typedef struct AlteredTableInfo
  {
***************
*** 322,329 ****
  static void ATExecDropInherit(Relation rel, RangeVar *parent);
  static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
  							   ForkNumber forkNum, bool istemp);
! 
! 
  /* ----------------------------------------------------------------
   *		DefineRelation
   *				Creates a new relation.
--- 348,371 ----
  static void ATExecDropInherit(Relation rel, RangeVar *parent);
  static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
  							   ForkNumber forkNum, bool istemp);
! static void CreatePartitionTrigger(CreateStmt *stmt, Oid parentRelOid, Oid *partitionRelOids);
! static void MutateColumnRefs(Node *node, char *reference);
! Oid OperatorGet(const char *operatorName,
!             Oid operatorNamespace,
!             Oid leftObjectId,
!             Oid rightObjectId,
!             bool *defined);
! static Oid ATExecAddPartition(AlteredTableInfo *tab, Relation rel,
! 				Partition *part); /* Adding partition to an existing table. */
! static void ATExecDropPartitionByName(AlteredTableInfo *tab, Relation rel,
! 				Partition *part); /* Deleting partition of an existing table by name. */
! static void ATExecDropPartitionByRange(AlteredTableInfo *tab, Relation rel,
! 				Partition *part); /* Deleting partition of an existing table by Range. */
! static void ATExecUpdatePartition(AlteredTableInfo *tab, Relation rel,
! 				UpdatePartitionStmt *part); /* Deleting partition of an existing table by Range. */
! static void ATExecSplitPartition(AlteredTableInfo *tab, Relation rel,
! 				SplitPartitionStmt *part); /* Splitting partition of an existing table. */
! Oid get_relevent_partition(HeapTuple tuple, Relation rel);
  /* ----------------------------------------------------------------
   *		DefineRelation
   *				Creates a new relation.
***************
*** 2170,2175 ****
--- 2212,2297 ----
  						stmt, RelationGetRelationName(rel))));
  }
  
+ 
+ /*
+  * GetPartitionsCountAtt : Check if given table is a partitioned table and there 
+  * exists atleast one partition(s) for specified partition key column.
+  */
+ int GetPartitionsCount(Oid parentOid)
+ {
+ 	ScanKeyData skey;
+ 	Relation    pg_partrel;
+ 	SysScanDesc pg_partscan;
+ 	HeapTuple   pg_parttup;
+ 	int			partitions;
+ 
+ 	partitions = 0;
+ 	
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(parentOid));
+ 
+     pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 		
+ 		if(pg_part->keyorder == 1)
+ 			partitions++;
+ 	}
+ 
+ 	systable_endscan(pg_partscan);
+ 	heap_close(pg_partrel, AccessShareLock);
+ 
+ 	return partitions;
+ } 
+ 
+ /*
+  * GetPartitionsCountAtt : Check if given table is a partitioned table and there 
+  * exists atleast one partition(s) for specified partition key column.
+  */
+ int GetPartitionsCountAtt(Oid parentOid, Oid attNum)
+ {
+ 	ScanKeyData skey;
+ 	Relation    pg_partrel;
+ 	SysScanDesc pg_partscan;
+ 	HeapTuple   pg_parttup;
+ 	int			partitions;
+ 
+ 	partitions = 0;
+ 	
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(parentOid));
+ 
+     pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+ 		if(pg_part->partkey == attNum)
+ 			partitions++;
+ 	}
+ 
+ 	systable_endscan(pg_partscan);
+ 	heap_close(pg_partrel, AccessShareLock);
+ 
+ 	return partitions;
+ } 
+ 
+ 
  /*
   * AlterTable
   *		Execute ALTER TABLE, which can be a list of subcommands
***************
*** 2328,2333 ****
--- 2450,2459 ----
  	 */
  	switch (cmd->subtype)
  	{
+ 		case AT_AddPartition:	/* Add partition to an existing table. */
+ 			ATSimplePermissions(rel, false);
+ 			pass = AT_PASS_ADD_PARTITION;	
+ 			break;
  		case AT_AddColumn:		/* ADD COLUMN */
  			ATSimplePermissions(rel, false);
  			/* Performs own recursion */
***************
*** 2347,2352 ****
--- 2473,2494 ----
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
+ 		case AT_DropPartitionByName: /* Drop partition by Name.*/
+ 			ATSimplePermissions(rel, false);
+ 			pass = AT_PASS_DROP_PARTITION_NAME;	
+ 			break;
+ 		case AT_DropPartitionByRange: /* Drop partition by Range*/
+ 			ATSimplePermissions(rel, false);
+ 			pass = AT_PASS_DROP_PARTITION_RANGE;	
+ 			break;
+ 		case AT_UpdatePartition: /* */
+ 			ATSimplePermissions(rel, false);
+ 			pass = AT_PASS_UPDATE_PARTITION;
+ 			break;
+ 		case AT_SplitPartition: /* */
+ 			ATSimplePermissions(rel, false);
+ 			pass = AT_PASS_SPLIT_PARTITION;
+ 			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
  			ATSimpleRecursion(wqueue, rel, cmd, recurse);
***************
*** 2554,2565 ****
--- 2696,2722 ----
  {
  	switch (cmd->subtype)
  	{
+ 		case AT_AddPartition:   /* Add partition to an existing table. */
+ 			ATExecAddPartition(tab, rel, (Partition *)cmd->def);
+ 			break;
  		case AT_AddColumn:		/* ADD COLUMN */
  			ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
  			break;
  		case AT_ColumnDefault:	/* ALTER COLUMN DEFAULT */
  			ATExecColumnDefault(rel, cmd->name, cmd->def);
  			break;
+ 		case AT_DropPartitionByName:	/* DROP partition(by name) of an existing table.*/
+ 			ATExecDropPartitionByName(tab, rel, (Partition *)cmd->def);
+ 			break;
+ 		case AT_DropPartitionByRange:	/* DROP partition(by range) of an existing table.*/
+ 			ATExecDropPartitionByRange(tab, rel, (Partition *)cmd->def);
+ 			break;
+ 		case AT_UpdatePartition: /* Update Range partition.*/
+ 			ATExecUpdatePartition(tab, rel, (UpdatePartitionStmt *)cmd->def);
+ 			break;
+ 		case AT_SplitPartition: /* Update Range partition.*/
+ 			ATExecSplitPartition(tab, rel, (SplitPartitionStmt *)cmd->def);
+ 			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATExecDropNotNull(rel, cmd->name);
  			break;
***************
*** 3443,3448 ****
--- 3600,4910 ----
  	}
  }
  
+ /*
+  * IsValidRange : Check if the given range has values of valid types and 
+  *	min value is less than max value.
+  */
+ static bool 
+ IsValidRange(ListCell *minIndex, ListCell *maxIndex, Oid parentRelOid, char *partitionColumn, char *tableName)
+ {
+ 
+ 	Oid 		typ_oid;
+ 	int16		len;
+ 	bool		typbyval;
+ 	Node		*expr;
+ 	StringInfo	cbuf = makeStringInfo();
+ 	Datum		min_datum;
+ 	Datum		max_datum;
+ 	Oid			keycmp_oid;
+ 	bool		isdef;
+ 	Oid			keycmp_proc;
+ 	bool		lt_max;
+ 
+ 	/* Get the attribute type. */
+ 	typ_oid = get_atttype(parentRelOid, get_attnum(parentRelOid, partitionColumn));
+ 
+ 	/* Get the attribute length*/
+ 	get_typlenbyval(typ_oid, &len, &typbyval);
+ 
+ 	if(minIndex == NULL || maxIndex == NULL)
+ 		elog(ERROR, "Please specify the values for '%s' attribute of '%s' partition.", partitionColumn, tableName);
+ 
+ 	/* Transform the expression from "A_Expr" type to "OpExpr" type. This is needed
+ 	as expression evaluation does not happen at parser level. */
+ 	expr = transformExpr(NULL, (Node *)lfirst(minIndex));
+ 
+ 	/* Solve the expression. */
+ 	lfirst(minIndex) = eval_const_expressions(NULL, expr);
+ 
+ 	/* Convert the value to column's datatype. */
+ 	lfirst(minIndex) = coerce_to_specific_type(NULL, lfirst(minIndex), typ_oid, partitionColumn);
+ 
+ 	resetStringInfo(cbuf);
+ 
+ 	/* If it's a FuncExpr then execute the conversion function. */
+ 	if IsA((Node *)lfirst(minIndex), FuncExpr)
+ 	{
+ 		min_datum = OidFunctionCall1(((FuncExpr *)lfirst(minIndex))->funcid,
+ 						((Const *)lfirst(list_head(((FuncExpr *)lfirst(minIndex))->args)))->constvalue);
+ 		/* Store the datums back. */
+ 		((Const *)lfirst(minIndex))->constvalue = min_datum;
+ 	}
+ 	else
+ 	{
+ 		min_datum = ((Const *)lfirst(minIndex))->constvalue;
+ 	}
+ 
+ 	/* Transform the expression from "A_Expr" type to "OpExpr" type. This is needed
+ 	as expression evaluation does not happen at parser level. */
+ 	expr = transformExpr(NULL, (Node *)lfirst(maxIndex));
+ 
+ 	/* Solve the expression. */
+ 	lfirst(maxIndex) = eval_const_expressions(NULL, expr);
+ 
+ 	/* Convert the value to column's datatype. */
+ 	lfirst(maxIndex) = coerce_to_specific_type(NULL, lfirst(maxIndex), typ_oid, partitionColumn);
+ 
+ 	resetStringInfo(cbuf);
+ 
+ 	/* If it's a FuncExpr then execute the conversion function. */
+ 	if IsA((Node *)lfirst(maxIndex), FuncExpr)
+ 	{
+ 		max_datum = OidFunctionCall1(((FuncExpr *)lfirst(maxIndex))->funcid,
+ 						((Const *)lfirst(list_head(((FuncExpr *)lfirst(maxIndex))->args)))->constvalue);
+ 		/* Store the datums back. */
+ 		((Const *)lfirst(maxIndex))->constvalue = max_datum;
+ 	}
+ 	else
+ 	{
+ 		max_datum = ((Const *)lfirst(maxIndex))->constvalue;
+ 	}
+ 
+ 	keycmp_oid = OperatorGet("<", PG_CATALOG_NAMESPACE , typ_oid, typ_oid, &isdef);
+ 
+ 	if(isdef == FALSE)
+ 		elog(ERROR, "'<' operator is not defined for this type.");
+ 
+ 	keycmp_proc = get_opcode(keycmp_oid);
+ 	lt_max = DatumGetBool(OidFunctionCall2(keycmp_proc, min_datum, max_datum));
+ 
+ 	if(lt_max == 0)
+ 	{
+ 		if(tableName)
+ 		{
+ 			elog(ERROR, "First value should be less then second value for '%s' attribute of partition '%s'.",
+ 					partitionColumn, tableName);
+ 		}
+ 		else
+ 		{
+ 			elog(ERROR, "First value should be less then second value for '%s' attribute.",
+ 					partitionColumn);
+ 		}
+ 	}
+ 
+ 	return lt_max;
+ }
+ 
+ static bool
+ WriteRangePartitionToCatalog(Datum min_datum,
+ 								Datum max_datum,
+ 								int16 len,
+ 								bool typbyval,
+ 								Oid parentRelOid,
+ 								Oid	partRelOid,
+ 								int	attNum,
+ 								Oid typ_oid,
+ 								int keyOrder)
+ {
+ 
+ 	bytea 			*min_ba, *max_ba;
+ 	Datum 			values[Natts_pg_partition];
+ 	bool  			nulls [Natts_pg_partition];
+ 	int				i;
+ 
+ 	/* initialize nulls and values */
+ 	for (i = 0; i < Natts_pg_partition; i++)
+ 	{
+ 		nulls[i] = false;
+ 		values[i] = (Datum) NULL;
+ 	}
+ 
+ 	/* There are 3 cases here
+ 	 * 1. types with length = -1
+ 	 * 2. types with fixed length but passed by value (len < 4)
+ 	 * 3. types with fixed length but *not* passed by value (len > 4)
+ 	 */
+ 	if(len > 0)
+ 	{
+ 		min_ba = (bytea *) palloc(len+1+VARHDRSZ);
+ 		max_ba = (bytea *) palloc(len+1+VARHDRSZ);
+ 
+ 		if(typbyval)
+ 		{
+ 			memcpy(VARDATA(min_ba), &min_datum, len);
+ 			memcpy(VARDATA(max_ba), &max_datum, len);
+ 		}
+ 		else
+ 		{
+ 			memcpy(VARDATA(min_ba), (char *)min_datum, len);
+ 			memcpy(VARDATA(max_ba), (char *)max_datum, len);
+ 		}
+ 
+ 		SET_VARSIZE(min_ba, len+VARHDRSZ);
+ 		VARDATA(min_ba)[len] = '\0';
+ 		values[Anum_pg_partition_minval        -1]= (Datum)min_ba ;
+ 
+ 		SET_VARSIZE(max_ba, len+VARHDRSZ);
+ 		VARDATA(max_ba)[len] = '\0';
+ 		values[Anum_pg_partition_maxval        -1]=(Datum)max_ba;
+ 	}
+ 	else
+ 	{
+ 		values[Anum_pg_partition_minval        -1]=min_datum;
+ 		values[Anum_pg_partition_maxval        -1]=max_datum;
+ 	}
+ 
+ 	values[Anum_pg_partition_parentrelid -1]= ObjectIdGetDatum(parentRelOid);
+ 	values[Anum_pg_partition_partrelid  -1] = ObjectIdGetDatum(partRelOid);
+ 	values[Anum_pg_partition_parttype   -1] = Int8GetDatum(PART_RANGE);
+ 	values[Anum_pg_partition_partkey    -1] = ObjectIdGetDatum(attNum);
+ 	values[Anum_pg_partition_listval    -1] = Int8GetDatum(NULL);
+ 	nulls[Anum_pg_partition_listval     -1] = true;
+ 	values[Anum_pg_partition_hashval    -1] = Int8GetDatum(NULL);
+ 	nulls[Anum_pg_partition_hashval     -1] = true;
+ 	values[Anum_pg_partition_keytype    -1] = ObjectIdGetDatum(typ_oid);
+ 	values[Anum_pg_partition_keyorder   -1] = Int8GetDatum(keyOrder);
+ 
+ 	Relation r = heap_open(PartitionRelationId, RowExclusiveLock);
+ 	TupleDesc tupDesc = r->rd_att;
+ 	HeapTuple tup = heap_form_tuple(tupDesc, values, nulls);
+ 	simple_heap_insert(r, tup);
+ 	CatalogUpdateIndexes(r, tup); 
+     heap_close(r, RowExclusiveLock);
+ 	
+ 	return true;
+ }
+ 
+ static char* 
+ DisplayDatum(Datum datum, Oid type)
+ {
+ 	Oid		typeOut;
+ 	bool	isVariableLength;
+ 
+ 	getTypeOutputInfo(type, &typeOut, &isVariableLength);
+ 	return (DatumGetCString(OidFunctionCall1(typeOut, datum)));
+ }
+ 
+ static void 
+ AddCheckConstraint(Oid typ_oid, Datum min_datum, Datum max_datum, char *tableName, char *partitionColumn)
+ {
+ 	char		*expr_string;
+ 	StringInfo	func_constraint = makeStringInfo();
+ 	Oid			typoutput;
+ 	bool		isVariableLength;
+ 	List		*query_list;
+ 	Node		*parsetree;
+ 	
+ 
+ 	resetStringInfo(func_constraint);
+ 
+ 	getTypeOutputInfo(typ_oid, &typoutput, &isVariableLength);
+ 	expr_string = DatumGetCString(OidFunctionCall1(typoutput, min_datum));
+ 
+ 	/* Form the check constraint for child table. */
+ 	appendStringInfo(func_constraint, "ALTER TABLE %s ADD CHECK ( %s >= '%s' ",
+ 		tableName,
+ 		partitionColumn,
+ 		expr_string);
+ 
+ 	expr_string = DatumGetCString(OidFunctionCall1(typoutput, max_datum));
+ 	appendStringInfo(func_constraint, " AND %s < '%s' ); ",
+ 		partitionColumn,
+ 		expr_string);
+ 		
+ 	/* Parse the above SQL string and use this parsetree to create check constraint on child tables. */
+ 	query_list  = pg_parse_query(func_constraint->data);
+ 	parsetree = (Node *)lfirst(list_head(query_list));
+ 	AlterTable(( AlterTableStmt * )parsetree);			
+ 
+ }
+ 
+ static Datum 
+ GetDatum(Relation rel, Oid type, HeapTuple tuple, int column)
+ {
+ 	Datum		datum;
+ 	int16 		len;
+ 	bool		typeByVal;
+ 	bool		isNull;
+ 
+ 	/* Get len and typbyval from pg_type */
+ 	get_typlenbyval(type, &len, &typeByVal);
+ 
+ 	/* Get min attribute from catalog */
+ 	datum = heap_getattr (tuple, column, rel->rd_att, &isNull);
+ 
+ 	/* Three cases 
+ 	 * a. datatypes with typbyval true and fix length.
+ 	 * b. datatypes with byval false and fix length.
+ 	 * c. extendible datatypes where length -1.
+ 	 */
+ 	if (typeByVal)
+ 	{
+ 		memcpy(&datum, VARDATA_ANY(datum), len);
+ 	}
+ 	else if (len != -1)
+ 	{
+ 		datum = (Datum)VARDATA_ANY(datum);
+ 	}	
+ 	
+ 	return datum;
+ }
+ 
+ static Datum 
+ EvalExpr(ListCell *node, Oid type, char *colName)
+ {
+ 	Node 	*expr;
+ 	Datum 	datum;
+ 
+ 	/* Transform the expression from "A_Expr" type to "OpExpr" type. This is needed
+ 	as expression evaluation does not happen at parser level. */
+ 	expr = transformExpr(NULL, (Node *)lfirst(node));
+ 
+ 	/* Solve the expression. */
+ 	lfirst(node) = eval_const_expressions(NULL, expr);
+ 
+ 	/* Convert the value to column's datatype. */
+ 	lfirst(node) = coerce_to_specific_type(NULL, lfirst(node), type, colName);
+ 
+ 	/* If it's a FuncExpr then execute the conversion function. */
+ 	if IsA((Node *)lfirst(node), FuncExpr)
+ 	{
+ 		datum = OidFunctionCall1(((FuncExpr *)lfirst(node))->funcid,
+ 						((Const *)lfirst(list_head(((FuncExpr *)lfirst(node))->args)))->constvalue);
+ 		/* Store the datums back. */
+ 		((Const *)lfirst(node))->constvalue = datum;
+ 	}
+ 	else
+ 	{
+ 		datum = ((Const *)lfirst(node))->constvalue;
+ 	}
+ 
+ 	return datum;
+ }
+ 
+ static bool 
+ OperateDatums(char *operatorName, Oid type, Datum datum1, Datum datum2)
+ {
+ 	bool 			isDef;
+ 	bool 			result;
+ 	Oid  			operator;
+ 	Oid	 			opcode;
+ 
+ 	operator = OperatorGet(operatorName, PG_CATALOG_NAMESPACE , type, type, &isDef);
+ 
+ 	if(isDef == FALSE)
+ 		elog(ERROR, "'%s' operator is not defined for this type.", operatorName);
+ 
+ 	opcode = get_opcode(operator);
+ 	result = DatumGetBool(OidFunctionCall2(opcode, datum1, datum2));
+ 
+ 	return result;
+ }
+ 
+ static void 
+ ATExecSplitPartition(AlteredTableInfo *tab, Relation rel,
+ 	SplitPartitionStmt *part)
+ {
+ 	Oid 			relId;
+ 	ScanKeyData		skey;
+ 	Relation		pg_partrel;
+ 	SysScanDesc		pg_partscan;
+ 	HeapTuple		pg_parttup;
+ 	Datum 			minDatum;
+ 	Datum			maxDatum;
+ 	ListCell		*node;
+ 	char			*columnName;
+ 	Partition		*partition2 = NULL;
+ 	Partition		*partition1 = NULL;
+ 	Const			*cnst;
+ 	int16			len;
+ 	bool			typeByVal;
+ 	StringInfo		query = makeStringInfo();
+ 	List			*queryList;
+ 	Node			*parseTree;
+ 	Oid				backup;
+ 	
+ 	/* Get the relId from the tablename. */
+ 	if(list_head(part->partNames))
+ 	{
+ 		relId  = RelnameGetRelid(((RangeVar *)linitial(part->partNames))->relname);
+ 		if (relId == InvalidOid)
+ 			elog(ERROR, "Error : Could not find valid Rel-Id for '%s' in current search path.", ((RangeVar *)linitial(part->partNames))->relname);
+ 	}
+ 	else
+ 	{
+ 		elog(ERROR, "Please specify valid partition to be updated.");
+ 	}
+ 
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_partrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(relId));
+ 
+     pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_RANGE)
+         {
+ 			/* Fetch the min and max value for given attribute of partition. */
+ 			minDatum = GetDatum(pg_partrel, pg_part->keytype, pg_parttup, Anum_pg_partition_minval);
+ 			maxDatum = GetDatum(pg_partrel, pg_part->keytype, pg_parttup, Anum_pg_partition_maxval);
+ 
+ 			if(pg_part->keyorder == 1)
+ 			{
+ 				partition1					= makeNode(Partition);
+ 				partition1->partName 		= makeNode(RangeVar);
+ 				partition1->partitionCheck	= makeNode(Constraint);
+ 
+ 				partition2					= makeNode(Partition);
+ 				partition2->partName 		= makeNode(RangeVar);
+ 				partition2->partitionCheck	= makeNode(Constraint);
+ 
+ 				partition1->partName		= (RangeVar *)lsecond(part->partNames);
+ 				partition1->partitionCheck->max_value = part->splitValues;
+ 
+ 				partition2->partName		= (RangeVar *)lthird(part->partNames);
+ 				partition2->partitionCheck->min_value = part->splitValues;
+ 
+ 				node = list_head(part->splitValues);
+ 			}
+ 
+ 			/* Get the column name. */
+ 			columnName = get_attname (relId, pg_part->partkey);
+ 		
+ 			EvalExpr(node, pg_part->keytype, columnName);
+ 			DisplayDatum(((Const *)lfirst(node))->constvalue, pg_part->keytype);
+ 
+ 			if((OperateDatums(">", pg_part->keytype, ((Const *)lfirst(node))->constvalue, minDatum) == false) ||
+ 				(OperateDatums("<", pg_part->keytype, ((Const *)lfirst(node))->constvalue, maxDatum) == false))
+ 			{
+ 				elog(ERROR, "Split-point is not valid. Value for '%s' attribute should be in-between and excluding '%s' and '%s'.", columnName, 
+ 					DisplayDatum(minDatum, pg_part->keytype), DisplayDatum(maxDatum, pg_part->keytype));
+ 			}
+ 		
+ 			/* Get len and typbyval from pg_type */
+ 			get_typlenbyval(pg_part->keytype, &len, &typeByVal);
+ 
+ 			/* Form the const node out of datums. */
+ 			cnst = makeConst(pg_part->keytype,
+ 						-1,
+ 						len,
+ 						maxDatum,
+ 						false,
+ 						typeByVal);
+ 
+ 			cnst->location = -1;
+ 
+ 			partition2->partitionCheck->max_value = lappend(partition2->partitionCheck->max_value, cnst);
+ 
+ 			/* Form the const node out of datums. */
+ 			cnst = makeConst(pg_part->keytype,
+ 						-1,
+ 						len,
+ 						minDatum,
+ 						false,
+ 						typeByVal);
+ 
+ 			cnst->location = -1;
+ 
+ 			partition1->partitionCheck->min_value = lappend(partition1->partitionCheck->min_value, cnst);
+ 
+ 			node = lnext(node);
+ 
+ 			/* Delete the entry for current table.*/
+ 			simple_heap_delete(pg_partrel, &pg_parttup->t_self);	
+ 			CatalogUpdateIndexes(pg_partrel, pg_parttup); 
+ 		}
+ 	}
+ 
+     systable_endscan(pg_partscan);
+ 	heap_close(pg_partrel, RowExclusiveLock);
+ 
+ 	ATExecAddPartition(tab, rel, partition1);
+ 	backup = ATExecAddPartition(tab, rel, partition2);
+ 
+ 	if((partition1 != NULL) && (partition2 != NULL))
+ 	{
+ 		Snapshot		snap;
+ 		Relation 		tmpRel;
+ 		HeapScanDesc	scanDesc;
+ 		HeapTuple		tuple;
+ 		Oid				relation_id;
+ 		Relation		child_table_relation;
+ 
+ 		snap = GetActiveSnapshot();
+ 		tmpRel 	= heap_open(relId, RowExclusiveLock);
+ 		scanDesc = heap_beginscan(tmpRel, snap, 0, NULL);
+ 
+     	while (HeapTupleIsValid(tuple = heap_getnext(scanDesc, ForwardScanDirection)))
+ 		{
+ 
+ 			relation_id = get_relevent_partition(tuple, rel);
+ 
+ 			if (relation_id == InvalidOid)
+ 				relation_id = backup;
+ 
+ 			child_table_relation = RelationIdGetRelation(relation_id);
+ 
+ 			if (child_table_relation != NULL)
+ 			{
+ 
+ 			ResultRelInfo *resultRelInfo;
+ 			TupleTableSlot *slot;
+ 			EState *estate= CreateExecutorState();
+ 
+ 			resultRelInfo = makeNode(ResultRelInfo);
+ 			resultRelInfo->ri_RangeTableIndex = 1;
+ 			resultRelInfo->ri_RelationDesc = child_table_relation;
+ 
+ 			estate->es_result_relations = resultRelInfo;
+ 			estate->es_num_result_relations = 1;
+ 			estate->es_result_relation_info = resultRelInfo;
+ 
+ 			/* Set up a tuple slot too */
+ 			slot = MakeSingleTupleTableSlot(rel->rd_att);
+ 			ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ 
+ 			ExecConstraints(resultRelInfo, slot, estate);
+ 
+ 			heap_insert(child_table_relation, tuple, GetCurrentCommandId(true), 0, NULL);
+ 			RelationClose(child_table_relation);
+ 			ExecDropSingleTupleTableSlot(slot);
+ 			FreeExecutorState (estate);
+ 			}
+ 			else
+ 			{
+ 				elog(ERROR, "Could not migrate row.");
+ 			}
+ 			
+ 		}
+ 		
+ 		heap_endscan(scanDesc);
+ 		heap_close(tmpRel, RowExclusiveLock);
+ 
+ 		/* clear the buffer. */
+ 		resetStringInfo(query);
+ 
+ 		appendStringInfo(query, "DROP TABLE %s CASCADE;", ((RangeVar *)linitial(part->partNames))->relname);
+ 		queryList = pg_parse_query(query->data);
+ 		parseTree = (Node *)lfirst(list_head(queryList));
+ 		RemoveRelations((DropStmt *)parseTree);
+ 	}
+ }
+ 
+ static void 
+ ATExecUpdatePartition(AlteredTableInfo *tab, Relation rel,
+ 	UpdatePartitionStmt *part)
+ {
+ 	Oid				relId;
+ 	ScanKeyData 	skey;
+ 	Relation		pg_partrel;
+ 	SysScanDesc		pg_partscan;
+ 	HeapTuple		pg_parttup;
+ 	ListCell		*minIndexPrev, *minIndexAfter;
+ 	ListCell		*maxIndexPrev, *maxIndexAfter;
+ 	char			*partitionColumn;
+ 	Oid				keycmp_oid;
+ 	bool			isdef;
+ 	Oid				keycmp_proc;
+ 	bool			gte_max, lte_min;
+ 	List			*distinct_part_rel_oid_list = NULL;
+ 	List			*distinct_part_key_list = NULL;
+ 	Oid				*childOids;
+ 	int				j = 0;
+ 	Partition		*tmp;	
+ 	PartitionAttrs	*partition_attr;
+ 	int16			len;
+ 	bool			typbyval;
+ 	Datum			part_attr;
+ 	bool			isnull;
+ 	Datum			short_datum;
+ 	Const			*tmp_const;
+ 	int				keyorder;
+ 	Oid				typ_oid;
+ 	StringInfo		func_constraint = makeStringInfo();
+ 	List			*query_list;
+ 	Node			*parsetree;
+ 
+ 	/* Get the relId from the tablename. */
+ 	if(part->prev->partName->relname)
+ 	{
+ 		relId  = RelnameGetRelid(part->prev->partName->relname);
+ 		if (relId == InvalidOid)
+ 			elog(ERROR, "Error : Could not find valid Rel-Id for '%s' in current search path.", part->prev->partName->relname);
+ 		part->after->partName = part->prev->partName;
+ 	}
+ 	else
+ 	{
+ 		elog(ERROR, "Please specify valid partition to be updated.");
+ 	}
+ 
+ 	childOids = malloc(sizeof(Oid)*GetPartitionsCount(RelationGetRelid(rel))); 
+ 
+ 	ScanKeyInit(&skey,
+ 		Anum_pg_partition_parentrelid,
+ 		BTEqualStrategyNumber, F_OIDEQ,
+ 		ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+ 	pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+ 	pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+ 					SnapshotNow, 1, &skey);
+ 
+ 	/* Create partition attribute node. */
+ 	partition_attr = makeNode(PartitionAttrs);
+  
+ 	while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+ 		/* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+ 		Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+ 		if(pg_part->parttype == PART_RANGE)
+ 		{
+ 
+ 			if (!list_member_int(distinct_part_key_list, pg_part->partkey))
+ 			{
+ 				Alias *colName;
+ 
+ 				distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey);
+ 
+ 				/* Get the name of partition key column */
+ 				partitionColumn = get_attname (rel->rd_id, pg_part->partkey);
+ 
+ 				/* Add the partition key columns. */
+ 				colName = makeAlias(partitionColumn, NULL);
+ 				partition_attr->colName = lappend(partition_attr->colName, colName);								
+ 			}
+ 
+ 			if(relId == pg_part->partrelid)
+ 			{
+ 				if(pg_part->keyorder == 1)
+ 				{
+ 					keyorder = 0;
+ 
+ 					/* Set the ptr to first value. */
+ 					minIndexPrev = part->prev->partitionCheck->min_value->head;
+ 					maxIndexPrev = part->prev->partitionCheck->max_value->head;
+ 					minIndexAfter = part->after->partitionCheck->min_value->head;
+ 					maxIndexAfter = part->after->partitionCheck->max_value->head;
+ 				}
+ 			
+ 				/* Get the name of partition key column */
+ 				partitionColumn = get_attname (rel->rd_id, pg_part->partkey);	
+ 
+ 				/* Get len and typbyval from pg_type */
+ 				get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 
+ 				typ_oid = pg_part->keytype;
+ 
+ 				/* Is this a valid range? */	
+ 				IsValidRange(minIndexPrev, maxIndexPrev, RelationGetRelid(rel), partitionColumn, part->prev->partName->relname);			
+ 				IsValidRange(minIndexAfter, maxIndexAfter, RelationGetRelid(rel), partitionColumn, part->prev->partName->relname);			
+ 
+ 				resetStringInfo(func_constraint);
+ 
+ 				appendStringInfo(func_constraint, "ALTER TABLE %s DROP CONSTRAINT %s_%s_check; ", 
+ 						part->prev->partName->relname, 
+ 						part->prev->partName->relname, 
+ 						partitionColumn);
+ 
+ 				/* Parse the above SQL string and use this parsetree to create check constraint on child tables. */
+ 				query_list  = pg_parse_query(func_constraint->data);
+ 				parsetree = (Node *)lfirst(list_head(query_list));
+ 				AlterTable(( AlterTableStmt * )parsetree);
+ 
+ 				AddCheckConstraint(typ_oid, 
+ 					((Const *)lfirst(minIndexAfter))->constvalue, 
+ 					((Const *)lfirst(maxIndexAfter))->constvalue, 
+ 					part->prev->partName->relname, 
+ 					partitionColumn);	
+ 
+ 				keycmp_oid = OperatorGet(">=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef);
+ 
+ 				if(isdef == FALSE)
+ 					elog(ERROR, "'>=' operator is not defined for this type.");
+ 
+ 				keycmp_proc = get_opcode(keycmp_oid);
+ 				gte_max = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(maxIndexAfter))->constvalue, ((Const *)lfirst(maxIndexPrev))->constvalue));
+ 
+ 				if(gte_max == false)
+ 					elog(ERROR, "Range of partition can only be extended.");
+ 
+ 				keycmp_oid = OperatorGet("<=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef);
+ 
+ 				if(isdef == FALSE)
+ 					elog(ERROR, "'<=' operator is not defined for this type.");
+ 
+ 				keycmp_proc = get_opcode(keycmp_oid);
+ 				lte_min = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(minIndexAfter))->constvalue, ((Const *)lfirst(minIndexPrev))->constvalue));
+ 
+ 				if(lte_min == false)
+ 					elog(ERROR, "Range of partition can only be extended.");
+ 
+ 				/* Delete the row. */
+ 				simple_heap_delete(pg_partrel, &pg_parttup->t_self);
+ 
+ 				/* Insert the new updated row. */
+ 
+ 				WriteRangePartitionToCatalog(((Const *)lfirst(minIndexAfter))->constvalue,
+ 					((Const *)lfirst(maxIndexAfter))->constvalue,
+ 					len,
+ 					typbyval,
+ 					RelationGetRelid(rel),
+ 					relId,
+ 					get_attnum(RelationGetRelid(rel), partitionColumn),
+ 					typ_oid,
+ 					++keyorder										
+ 					);					
+ 
+ 				minIndexPrev = minIndexPrev->next;
+ 				maxIndexPrev = maxIndexPrev->next;
+ 				minIndexAfter = minIndexAfter->next;
+ 				maxIndexAfter = maxIndexAfter->next;
+ 			}
+ 			else
+ 			{
+ 				if (!list_member_oid(distinct_part_rel_oid_list, pg_part->partrelid))
+ 				{
+ 					distinct_part_rel_oid_list = lappend_oid(distinct_part_rel_oid_list, pg_part->partrelid);
+ 					childOids[j++] = pg_part->partrelid;
+ 				
+ 					tmp = makeNode(Partition);
+ 					tmp->partName 		= makeNode(RangeVar);
+ 					tmp->partitionCheck	= makeNode(Constraint);
+ 
+ 					/* Add this partition to partition attributes. */
+ 					partition_attr->partitions = lappend(partition_attr->partitions, tmp);
+ 				}
+ 
+ 				Relation tmp_rel = RelationIdGetRelation(pg_part->partrelid);
+ 				tmp->partName->relname = RelationGetRelationName(tmp_rel);
+ 				relation_close(tmp_rel, 0);
+ 
+ 				/* Get the name of partition key column */
+ 				partitionColumn = get_attname (rel->rd_id, pg_part->partkey);	
+ 
+         	    /* Get len and typbyval from pg_type */
+             	get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 			
+ 	      		/* Get min attribute from catalog */
+ 		        part_attr = heap_getattr (pg_parttup, Anum_pg_partition_minval, pg_partrel->rd_att, &isnull);
+ 
+    			 	/* Three cases a. datatypes with typbyval true and fix length.
+          		 * b. datatypes with byval false and fix length.
+ 	         	 * c. extendible datatypes where length -1.
+     	     	 */
+ 	    	    if ( typbyval )
+     	    	{
+         	    	short_datum = 0;
+ 	            	memcpy(&short_datum, VARDATA_ANY(part_attr), len);
+ 		            part_attr = short_datum;
+     		    }
+         		else if (len != -1)
+ 		        {
+     		        part_attr = (Datum)VARDATA_ANY(part_attr);
+         		}
+ 
+ 				/* Create const. */		
+ 				tmp_const = makeConst(pg_part->keytype,	/* Const type */
+ 										-1,
+ 										len,
+ 										part_attr,
+ 										false,
+ 										typbyval);
+ 
+ 				tmp_const->location = -1;
+ 
+ 				/* Add this to new partition. */
+ 				tmp->partitionCheck->min_value = lappend(tmp->partitionCheck->min_value, tmp_const);
+ 
+        			/* Get min attribute from catalog */
+ 		        part_attr = heap_getattr (pg_parttup, Anum_pg_partition_maxval, pg_partrel->rd_att, &isnull);
+ 
+    			    /* Three cases a. datatypes with typbyval true and fix length.
+         	 	 * b. datatypes with byval false and fix length.
+          		 * c. extendible datatypes where length -1.
+ 	         	 */
+     	    	if ( typbyval )
+         		{
+             		short_datum = 0;
+ 	            	memcpy(&short_datum, VARDATA_ANY(part_attr), len);
+ 	    	        part_attr = short_datum;
+     	    	}
+ 	    	    else if (len != -1)
+     	    	{
+         	    	part_attr = (Datum)VARDATA_ANY(part_attr);
+ 		        }
+ 
+ 				/* Create const. */		
+ 				tmp_const = makeConst(pg_part->keytype,	/* Const type */
+ 										-1,
+ 										len,
+ 										part_attr,
+ 										false,
+ 										typbyval);
+ 
+ 				tmp_const->location = -1;
+ 
+ 				/* Add this to new partition. */
+ 				tmp->partitionCheck->max_value = lappend(tmp->partitionCheck->max_value, tmp_const);
+ 			}
+ 		}
+ 	}
+ 
+ 	childOids[j++] = relId;
+ 	partition_attr->partitions = lappend(partition_attr->partitions, part->after);
+ 
+ 	ValidateRanges(partition_attr, RelationGetRelid(rel), childOids);
+ 
+ 	systable_endscan(pg_partscan);
+ 	heap_close(pg_partrel, RowExclusiveLock);
+ }
+ 
+ 
+ static void
+ ATExecDropPartitionByRange(AlteredTableInfo *tab, Relation rel,
+     Partition *part)
+ {
+ 	List		*rows_to_be_deleted_list = NULL;
+ 	ListCell	*row_to_be_deleted;
+ 	List		*distinct_part_key_list= NULL;
+     ScanKeyData	skey;
+     SysScanDesc	pg_partscan;
+     HeapTuple  	pg_parttup;
+     Relation   	pg_partrel;
+ 	StringInfo	query = makeStringInfo();
+ 	Node		*parsetree;
+ 	List		*query_list = NULL;
+ 	char		*partitionColumn;
+ 	int16		len;
+ 	ListCell	*minIndex;
+ 	ListCell	*maxIndex;
+ 	bool		typbyval;
+ 	bool		isnull;
+ 	Datum		min_datum;
+ 	Datum		max_datum;
+ 	Datum		short_datum;
+ 	Oid			keycmp_oid;
+ 	bool		isdef;
+ 	Oid			keycmp_proc;
+ 	bool		lte_min;
+ 	bool		gte_max;
+ 	bool		IsRelDeleted = false;
+ 	PartitionRowInfo	*partRowInfo;	
+ 
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_RANGE)
+         {
+ 			if (!list_member_int(distinct_part_key_list, pg_part->partkey))
+ 			{
+ 				distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey);				
+ 			}	
+ 			if(pg_part->keyorder == 1)
+ 			{
+ 				IsRelDeleted = false;
+ 
+ 				/* Set the ptr to first value. */
+ 				minIndex = part->partitionCheck->min_value->head;
+ 				maxIndex = part->partitionCheck->max_value->head;
+ 
+ 				if(list_length(distinct_part_key_list)== list_length(rows_to_be_deleted_list))
+ 				{
+ 				foreach(row_to_be_deleted, rows_to_be_deleted_list)
+ 				{
+ 					PartitionRowInfo *tmp = (PartitionRowInfo *)lfirst(row_to_be_deleted);
+ 					simple_heap_delete(pg_partrel, &tmp->tid);
+ 
+ 					if(IsRelDeleted == false)
+ 					{
+ 						IsRelDeleted = true;
+ 						resetStringInfo(query);
+ 						Relation Rel = RelationIdGetRelation(tmp->partRelOid);						
+ 						appendStringInfo(query, "DROP TABLE %s CASCADE;", RelationGetRelationName(Rel));
+ 						relation_close(Rel, 0);
+ 						query_list = pg_parse_query(query->data);
+ 						parsetree = (Node *)lfirst(list_head(query_list));
+ 						RemoveRelations((DropStmt *)parsetree);						
+ 					}
+ 
+ 					pfree(tmp);
+ 				}
+ 				}
+ 
+ 				list_free(rows_to_be_deleted_list);
+ 				rows_to_be_deleted_list = NULL;
+ 			}
+ 				
+ 			/* Get the name of partition key column */
+ 			partitionColumn = get_attname (rel->rd_id, pg_part->partkey);	
+ 
+             /* Get len and typbyval from pg_type */
+             get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 			
+       		/* Get min attribute from catalog */
+ 	        min_datum = heap_getattr (pg_parttup, Anum_pg_partition_minval, pg_partrel->rd_att, &isnull);
+ 
+    		 	/* Three cases a. datatypes with typbyval true and fix length.
+          	 * b. datatypes with byval false and fix length.
+          	 * c. extendible datatypes where length -1.
+          	 */
+ 	        if ( typbyval )
+     	    {
+         	    short_datum = 0;
+             	memcpy(&short_datum, VARDATA_ANY(min_datum), len);
+ 	            min_datum = short_datum;
+     	    }
+         	else if (len != -1)
+ 	        {
+     	        min_datum = (Datum)VARDATA_ANY(min_datum);
+ 			}				
+ 
+       		/* Get max attribute from catalog */
+ 	        max_datum = heap_getattr (pg_parttup, Anum_pg_partition_maxval, pg_partrel->rd_att, &isnull);
+ 
+    		 	/* Three cases a. datatypes with typbyval true and fix length.
+          	 * b. datatypes with byval false and fix length.
+          	 * c. extendible datatypes where length -1.
+          	 */
+ 	        if ( typbyval )
+     	    {
+         	    short_datum = 0;
+             	memcpy(&short_datum, VARDATA_ANY(max_datum), len);
+ 	            max_datum = short_datum;
+     	    }
+         	else if (len != -1)
+ 	        {
+     	        max_datum = (Datum)VARDATA_ANY(max_datum);
+ 			}				
+ 			
+ 			/* Is this a valid range? */	
+ 			IsValidRange(minIndex, maxIndex, RelationGetRelid(rel), partitionColumn, NULL);			
+ 	
+ 			keycmp_oid = OperatorGet("<=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef);
+ 
+ 			if(isdef == FALSE)
+ 				elog(ERROR, "'<=' operator is not defined for this type.");
+ 
+ 			keycmp_proc = get_opcode(keycmp_oid);
+ 			lte_min = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(minIndex))->constvalue, min_datum));
+ 
+ 			keycmp_oid = OperatorGet(">=", PG_CATALOG_NAMESPACE , pg_part->keytype, pg_part->keytype, &isdef);
+ 
+ 			if(isdef == FALSE)
+ 				elog(ERROR, "'>=' operator is not defined for this type.");
+ 
+ 			keycmp_proc = get_opcode(keycmp_oid);
+ 			gte_max = DatumGetBool(OidFunctionCall2(keycmp_proc, ((Const *)lfirst(maxIndex))->constvalue, max_datum));
+ 
+ 			if(lte_min && gte_max)
+ 			{
+ 				partRowInfo = palloc(sizeof(PartitionRowInfo));
+ 				memcpy(&partRowInfo->tid, &pg_parttup->t_self, sizeof(ItemPointerData));
+ 				partRowInfo->partRelOid = pg_part->partrelid;
+ 				rows_to_be_deleted_list = lappend(rows_to_be_deleted_list, partRowInfo);
+ 			}
+ 			else
+ 			{
+ 				list_free(rows_to_be_deleted_list);
+ 				rows_to_be_deleted_list = NULL;
+ 			}
+ 
+ 
+            	minIndex = minIndex->next;
+             maxIndex = maxIndex->next;
+ 		}
+ 	}
+ 
+ 	IsRelDeleted = false;
+ 	foreach(row_to_be_deleted, rows_to_be_deleted_list)
+ 	{
+ 		PartitionRowInfo *tmp = (PartitionRowInfo *)lfirst(row_to_be_deleted);
+ 		simple_heap_delete(pg_partrel, &tmp->tid);
+ 
+ 		if(IsRelDeleted == false)
+ 		{
+ 			IsRelDeleted = true;
+ 			resetStringInfo(query);
+ 			Relation Rel = RelationIdGetRelation(tmp->partRelOid);						
+ 			appendStringInfo(query, "DROP TABLE %s CASCADE;", RelationGetRelationName(Rel));
+ 			relation_close(Rel, 0);
+ 			query_list = pg_parse_query(query->data);
+ 			parsetree = (Node *)lfirst(list_head(query_list));
+ 			RemoveRelations((DropStmt *)parsetree);						
+ 		}
+ 		pfree(tmp);
+ 	}
+ 	list_free(rows_to_be_deleted_list);
+ 	rows_to_be_deleted_list = NULL;
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, RowExclusiveLock);
+ }
+ 
+ static void 
+ ATExecDropPartitionByName(AlteredTableInfo *tab, Relation rel,
+ 	Partition *part) 
+ {
+ 	Oid			relId = RelnameGetRelid(part->partName->relname);
+     ScanKeyData	skey;
+     SysScanDesc	pg_partscan;
+     HeapTuple  	pg_parttup;
+     Relation   	pg_partrel;
+ 	StringInfo	query = makeStringInfo();
+ 	Node		*parsetree;
+ 	List		*query_list = NULL;
+ 	int			count = 0;
+ 
+ 	if(relId == InvalidOid)
+ 		elog(ERROR, "Error : Could not find valid Rel-Id for '%s' in current search path.", part->partName->relname);
+ 
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_partrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(relId));
+ 
+     pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_RANGE)
+         {
+ 			count++;
+ 			simple_heap_delete(pg_partrel, &pg_parttup->t_self);
+ 		}
+ 	}
+ 
+ 	if(count)
+ 	{
+ 		appendStringInfo(query, "DROP TABLE %s CASCADE;", part->partName->relname);
+ 		query_list = pg_parse_query(query->data);
+ 		parsetree = (Node *)lfirst(list_head(query_list));
+ 		RemoveRelations((DropStmt *)parsetree);
+ 	}
+ 	else
+ 	{
+ 		elog(ERROR, "'%s' is not a valid partition of '%s' table.", part->partName->relname, RelationGetRelationName(rel));
+ 	}
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, RowExclusiveLock);
+ }
+ /*
+  * ATExecAddPartition : Add partition to an existing table. Currectly only range partition
+  * is supported.
+  */
+ static Oid 
+ ATExecAddPartition(AlteredTableInfo *tab, 
+ 	Relation rel, Partition *part)
+ {
+ 	StringInfo 		query = makeStringInfo();
+ 	StringInfo		createTrigger = makeStringInfo();
+ 	List			*query_list;
+ 	Node			*parsetree;
+     ScanKeyData 	skey;
+     SysScanDesc 	pg_partscan;
+     HeapTuple   	pg_parttup;
+     Relation    	pg_partrel;
+     List        	*distinct_part_key_list = NULL;	
+ 	List			*distinct_part_rel_oid_list = NULL;
+ 	ListCell		*minIndex, *maxIndex;
+ 	Datum 			min_datum, max_datum;
+ 	Oid 			typ_oid;
+ 	bool 			typbyval;
+ 	int16			len;
+ 	char            *partitionColumn; 
+ 	Partition		*tmp;
+ 	Datum 			part_attr;
+ 	bool 			isnull;
+ 	PartitionAttrs	*partition_attr;
+ 	int				partitionCount = 0;
+ 	Oid				*childOids = NULL;
+ 	int				j = 0;
+ 	Oid				child;
+ 	Const			*tmp_const;
+ 
+ 	if(part->partName)
+ 	{
+ 		if(part->tablespacename)
+ 			appendStringInfo(query, "CREATE TABLE %s () INHERITS (%s) TABLESPACE %s;", part->partName->relname, RelationGetRelationName(rel), part->tablespacename);
+ 		else
+ 			appendStringInfo(query, "CREATE TABLE %s () INHERITS (%s) ;", part->partName->relname, RelationGetRelationName(rel));
+ 	}
+ 	else
+ 	{
+ 		elog(ERROR, "Please specify partition name.");
+ 	}
+ 
+ 	query_list  = pg_parse_query(query->data);
+ 	parsetree = (Node *)lfirst(list_head(query_list));
+ 
+ 	child = DefineRelation((CreateStmt *)parsetree, RELKIND_RELATION);
+ 
+ 	minIndex = part->partitionCheck->min_value->head;
+ 	maxIndex = part->partitionCheck->max_value->head;
+ 
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+ 	/* Create partition attribute node. */
+ 	partition_attr = makeNode(PartitionAttrs);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+ 
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_RANGE)
+         {
+ 			partitionCount++;
+ 			if (!list_member_int(distinct_part_key_list, pg_part->partkey))
+ 			{
+ 
+ 				Alias *colName = NULL;
+ 
+ 				distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey);
+ 
+ 				/* Get the name of partition key column */
+ 				partitionColumn = get_attname (rel->rd_id, pg_part->partkey);
+ 
+ 				/* Add the partition key columns. */
+ 				colName = makeAlias(partitionColumn, NULL);
+ 				partition_attr->colName = lappend(partition_attr->colName, colName);
+ 
+ 				IsValidRange(minIndex, maxIndex, RelationGetRelid(rel), partitionColumn, part->partName->relname);				
+ 
+ 				AddCheckConstraint(pg_part->keytype, 
+ 					((Const *)lfirst(minIndex))->constvalue, 
+ 					((Const *)lfirst(maxIndex))->constvalue, 
+ 					part->partName->relname, 
+ 					partitionColumn);	
+ 
+             	minIndex = minIndex->next;
+ 	            maxIndex = maxIndex->next;
+ 			}
+ 		}
+ 	}
+ 
+ 	if(partitionCount == 0)
+ 		elog(ERROR, "Could not find partition attributes of '%s'.", RelationGetRelationName(rel));
+ 
+ 	if(minIndex != NULL || maxIndex != NULL)
+        	elog(ERROR, "Multiple values specified for partition attributes for partition '%s'.", part->partName->relname);
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, AccessShareLock);
+ 
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+ //	partition_attr->PartitionFunction = PART_RANGE;
+ 
+ 	if((partitionCount % list_length(distinct_part_key_list))==0)
+ 	{
+ 		childOids = malloc(sizeof(Oid) * (1 + (partitionCount/list_length(distinct_part_key_list))));
+ 	}
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+ 	{
+ 		Datum short_datum;
+ 
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_RANGE)
+         {
+ 
+ 			if (!list_member_oid(distinct_part_rel_oid_list, pg_part->partrelid))
+ 			{
+ 				distinct_part_rel_oid_list = lappend_oid(distinct_part_rel_oid_list, pg_part->partrelid);
+ 				childOids[j++] = pg_part->partrelid;
+ 				
+ 				tmp = makeNode(Partition);
+ 				tmp->partName 		= makeNode(RangeVar);
+ 				tmp->partitionCheck	= makeNode(Constraint);
+ 
+ 				/* Add this partition to partition attributes. */
+ 				partition_attr->partitions = lappend(partition_attr->partitions, tmp);
+ 			}
+ 
+ 			Relation tmp_rel = RelationIdGetRelation(pg_part->partrelid);
+ 			tmp->partName->relname = RelationGetRelationName(tmp_rel);
+ 			relation_close(tmp_rel, 0);
+ 
+ 			/* Get the name of partition key column */
+ 			partitionColumn = get_attname (rel->rd_id, pg_part->partkey);	
+ 
+             /* Get len and typbyval from pg_type */
+             get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 			
+       		/* Get min attribute from catalog */
+ 	        part_attr = heap_getattr (pg_parttup, Anum_pg_partition_minval, pg_partrel->rd_att, &isnull);
+ 
+    		 	/* Three cases a. datatypes with typbyval true and fix length.
+          	 * b. datatypes with byval false and fix length.
+          	 * c. extendible datatypes where length -1.
+          	 */
+ 	        if ( typbyval )
+     	    {
+         	    short_datum = 0;
+             	memcpy(&short_datum, VARDATA_ANY(part_attr), len);
+ 	            part_attr = short_datum;
+     	    }
+         	else if (len != -1)
+ 	        {
+     	        part_attr = (Datum)VARDATA_ANY(part_attr);
+         	}
+ 
+ 
+ 			/* Create const. */		
+ 			tmp_const = makeConst(pg_part->keytype,	/* Const type */
+ 									-1,
+ 									len,
+ 									part_attr,
+ 									false,
+ 									typbyval);
+ 
+ 			tmp_const->location = -1;
+ 
+ 			/* Add this to new partition. */
+ 			tmp->partitionCheck->min_value = lappend(tmp->partitionCheck->min_value, tmp_const);
+ 
+        		/* Get min attribute from catalog */
+ 	        part_attr = heap_getattr (pg_parttup, Anum_pg_partition_maxval, pg_partrel->rd_att, &isnull);
+ 
+    		    /* Three cases a. datatypes with typbyval true and fix length.
+          	 * b. datatypes with byval false and fix length.
+          	 * c. extendible datatypes where length -1.
+          	 */
+         	if ( typbyval )
+         	{
+             	short_datum = 0;
+ 	            memcpy(&short_datum, VARDATA_ANY(part_attr), len);
+     	        part_attr = short_datum;
+         	}
+ 	        else if (len != -1)
+     	    {
+         	    part_attr = (Datum)VARDATA_ANY(part_attr);
+ 	        }
+ 
+ 			/* Create const. */		
+ 			tmp_const = makeConst(pg_part->keytype,	/* Const type */
+ 									-1,
+ 									len,
+ 									part_attr,
+ 									false,
+ 									typbyval);
+ 
+ 			tmp_const->location = -1;
+ 
+ 			/* Add this to new partition. */
+ 			tmp->partitionCheck->max_value = lappend(tmp->partitionCheck->max_value, tmp_const);
+ 		}
+ 	}	
+ 
+ 	childOids[j++] = child;
+ 	partition_attr->partitions = lappend(partition_attr->partitions, part);
+ 
+ 	ValidateRanges(partition_attr, RelationGetRelid(rel), childOids);
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, AccessShareLock);
+ 
+ 	minIndex = part->partitionCheck->min_value->head;
+ 	maxIndex = part->partitionCheck->max_value->head;
+ 
+ 
+ 	ListCell *partColumn;
+ 
+ 	j = 0;
+ 
+ 	foreach(partColumn, partition_attr->colName)
+ 	{
+ 		/* Get the name of partition key column. */
+ 		partitionColumn = ((Alias *)lfirst(partColumn))->aliasname;
+ 
+         min_datum = ((Const *)lfirst(minIndex))->constvalue;  
+         max_datum = ((Const *)lfirst(maxIndex))->constvalue;  
+ 
+         /* Get the attribute type. */
+         typ_oid = get_atttype(RelationGetRelid(rel), get_attnum(RelationGetRelid(rel), partitionColumn) );
+ 
+ 		/* Get the attribute length*/
+ 		get_typlenbyval(typ_oid, &len, &typbyval);
+ 
+ 		WriteRangePartitionToCatalog(min_datum, 
+ 										max_datum,
+ 										len,
+ 										typbyval,
+ 										RelationGetRelid(rel),
+ 										child,
+ 										get_attnum(RelationGetRelid(rel), partitionColumn),
+ 										typ_oid,
+ 										++j);
+ 
+        	minIndex = minIndex->next;
+         maxIndex = maxIndex->next;
+ 	}
+ 
+     resetStringInfo(createTrigger);
+     appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_update_trigger BEFORE UPDATE ON %s ", part->partName->relname);
+     appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_update_trigger ();");
+ 
+     /* Parse the above SQL string and use this parsetree to create trigger on parent table. */
+     query_list  = pg_parse_query(createTrigger->data);
+     parsetree = (Node *)lfirst(list_head(query_list));
+     CreateTrigger((CreateTrigStmt * )parsetree, 0);
+ 
+ 	return child;
+ }
+ 
+ 
  static void
  ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
  				ColumnDef *colDef)
***************
*** 4061,4066 ****
--- 5523,5532 ----
  	List	   *children;
  	ObjectAddress object;
  
+ 	/* Check if the column is part of any partition key. */
+ 	if( GetPartitionsCountAtt(RelationGetRelid(rel), get_attnum(RelationGetRelid(rel), colName)) > 0)
+ 		elog(ERROR, "Can not drop column '%s' as this is part of partition key of '%s' table.", colName, RelationGetRelationName(rel));
+ 
  	/* At top level, permission check was done in ATPrepCmd, else do it */
  	if (recursing)
  		ATSimplePermissions(rel, false);
***************
*** 5557,5562 ****
--- 7023,7032 ----
  	SysScanDesc scan;
  	HeapTuple	depTup;
  
+ 	/* Check if the column is part of any partition key. */
+ 	if( GetPartitionsCountAtt(RelationGetRelid(rel), get_attnum(RelationGetRelid(rel), colName)) > 0)
+ 		elog(ERROR, "Can not alter column '%s' as this is part of partition key of '%s' table.", colName, RelationGetRelationName(rel));
+ 
  	attrelation = heap_open(AttributeRelationId, RowExclusiveLock);
  
  	/* Look up the target column */
***************
*** 7764,7767 ****
--- 9234,9870 ----
  			cur_item = lnext(prev_item);
  		}
  	}
+ }
+ 
+ 
+ /* 
+  * ------------------------------------------------------------------------------
+  *	DefinePartitions
+  *	Create new partitions. They end up inheriting from the parent
+  *	relation.
+  *	Once they have been created, triggers need to be assigned to the parent to
+  *	provide the UPDATEs/INSERTs/DELETEs to percolate down to the children
+  * 	Callers expect this function to end with CommandCounterIncrement if it
+  * 	makes any changes.
+  * ------------------------------------------------------------------------------
+  */
+ void
+ DefinePartitions(CreateStmt *stmt, Oid parentRelId)
+ {
+ 	CreateStmt		*childStmt;
+ 	RangeVar 		inr;
+ 	Oid 			childOid;
+ 	PartitionAttrs	*partAttr;
+ 	int 			*partKeyOrder;
+ 	List			*partColumnList;
+ 	ListCell		*partColumn;
+ 	int 			count, i, j;
+ 	List			*columnList;
+ 	ListCell		*column;
+ 	StringInfo  	childTableName = makeStringInfo();
+ 	Oid				*partitionRelOids;
+ 	bool			partitionColumnMatch;
+ 	Relation    	rel;
+ 	StringInfo  	createTrigger = makeStringInfo();
+ 	List        	*query_list;
+ 	Node        	*parsetree;
+ 	
+ 
+ 	/* If no partitions are defined then return. */
+ 	if (stmt->partAttr == NULL) 
+ 		return;
+ 	
+ 	partColumnList = ((PartitionAttrs *)(stmt->partAttr))->colName;
+ 
+ 	i = 0;
+ 	j = 0;
+ 	partitionColumnMatch = false;
+ 
+ 	Assert(IsA(stmt->partAttr, PartitionAttrs));
+ 
+ 	partAttr = (PartitionAttrs *)(stmt->partAttr);
+ 	/* To store the partition key order */
+ 	partKeyOrder = malloc(partColumnList->length * sizeof(int));
+ 	columnList 	= stmt->tableElts;
+ 
+ 	foreach(partColumn, partColumnList)
+ 	{
+ 		/* Get the name of partition key column. */
+ 		char *colName = ((Alias *)lfirst(partColumn))->aliasname;
+ 		count = 0;
+ 
+ 		foreach(column, columnList)
+ 		{
+ 			ColumnDef *tmp = lfirst(column);
+ 			count++;
+ 			if(strcmp(tmp->colname, colName) == 0)
+ 			{
+ 				partKeyOrder[j++] = count;
+ 				partitionColumnMatch = true;
+ 				break;
+ 			}
+ 		}
+ 		if(partitionColumnMatch == false)
+ 			elog(ERROR, "'%s' attribute is not present in given table.", colName);
+ 	}
+ 
+ 	/*
+ 	 * All the partitions will inherit from the parent, set the parent in the
+ 	 * inhRelations structure
+ 	 */
+ 	inr = *stmt->relation;
+ 
+ 	/* 
+ 	 * Create the children tables. The parser has already made sure that we
+ 	 * have atleast one partition in the list
+ 	 */
+ 	if (partAttr->partFunc == PART_LIST || partAttr->partFunc == PART_RANGE)
+ 	{
+ 		List 		*partitionList = partAttr->partitions;
+ 		ListCell 	*temp_part; 
+ 		int			isSystemGeneratedName = 0;
+ 		int 		isUserGeneratedName = 0;
+ 		int			i = 0;
+ 
+ 		Assert(list_length(partitionList) > 0);
+ 
+ 		rel = RelationIdGetRelation(parentRelId);
+ 		resetStringInfo(createTrigger);
+ 		appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_insert_trigger BEFORE INSERT ON %s ", RelationGetRelationName(rel));
+ 		appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_insert_trigger();");
+ 		relation_close(rel, 0);
+ 
+ 		/* Parse the above SQL string and use this parsetree to create trigger on parent table. */
+ 		query_list  = pg_parse_query(createTrigger->data);
+ 		parsetree = (Node *)lfirst(list_head(query_list));
+ 		CreateTrigger((CreateTrigStmt * )parsetree, 0);
+ 
+ 		/* To avoid tuple being updated by the current transaction, after the current scan started */
+ 		CommandCounterIncrement();
+ 	
+ 		/* Allocate the storage for number of partitions.*/
+ 		partitionRelOids = malloc(sizeof(Oid)*list_length(partitionList));
+ 
+ 		foreach(temp_part, partitionList)
+ 		{
+ 			Partition *temp_partition = lfirst(temp_part);
+ 			/*
+ 			 * Create a working copy for each child
+ 			 */
+ 			childStmt = (CreateStmt *)copyObject((void *)stmt);
+ 			childStmt->constraints = list_make1(temp_partition->partitionCheck);
+ 
+ 			/*
+ 			 * Child has to use all columns from the parent, otherwise we will get
+ 			 * unnecessary merging columns notices as part of the
+ 			 * DefineRelation 
+ 			 */	
+ 			childStmt->tableElts = NIL;
+ 				
+ 			childStmt->inhRelations = lappend(NULL, &inr);
+ 
+ 			/* If partition name is not specified then generate one and apply. */
+ 			if(temp_partition->partName == NULL)
+ 			{
+ 
+ 				if(isUserGeneratedName == 1)
+ 					elog(ERROR, "Combination of user generated and system generated table names is not allowed.");				
+ 
+ 				isSystemGeneratedName = 1;
+ 				i++;
+ 
+ 				appendStringInfo(childTableName, "%s_%d", stmt->relation->relname, i);
+ 
+ 				/* Assign the child table name. */        
+ 				childStmt->relation->relname = childTableName->data;			
+ 
+ 				temp_partition->partName = makeNode(RangeVar);	
+ 				temp_partition->partName->relname = childTableName->data;
+ 			}
+ 			else
+ 			{
+ 	            if(isSystemGeneratedName == 0)
+ 				{
+ 					/* Assign the child table name. */        
+    		    	    childStmt->relation->relname = temp_partition->partName->relname;
+ 					isUserGeneratedName = 1;
+ 				}
+ 				else
+ 					elog(ERROR, "Combination of user generated and system generated table names is not allowed.");				
+ 			}
+     		
+ 			/* Assign the tablespace specified.  */
+ 			childStmt->tablespacename = temp_partition->tablespacename;
+ 
+ 			/* Define the relation. */
+ 			childOid = DefineRelation(childStmt, RELKIND_RELATION);
+ 
+ 		    rel = RelationIdGetRelation(childOid);
+ 		    resetStringInfo(createTrigger);
+ 		    appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_update_trigger BEFORE UPDATE ON %s ", RelationGetRelationName(rel));
+ 		    appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_update_trigger ();");
+ 		    relation_close(rel, 0);
+ 
+ 		    /* Parse the above SQL string and use this parsetree to create trigger on parent table. */
+ 		    query_list  = pg_parse_query(createTrigger->data);
+ 		    parsetree = (Node *)lfirst(list_head(query_list));
+ 		    CreateTrigger((CreateTrigStmt * )parsetree, 0);
+ 
+ 			/* Store the Child table Oids for later use. */
+ 			partitionRelOids[i++] = childOid;
+ 		}
+         /*
+        	 * Make the changes carried out so far, visible
+          */
+         CreatePartitionTrigger(stmt, parentRelId, partitionRelOids);
+         CommandCounterIncrement();
+ 
+ 		/* Free the storage. */
+ 		free(partitionRelOids);		
+ 
+ 	}
+ 	else if(partAttr->partFunc == PART_HASH)
+ 	{
+ 
+ 		bool 		typbyval;
+ 	    Datum 		values[Natts_pg_partition];
+ 	    bool  		nulls [Natts_pg_partition];
+ 		int16		len, k;
+ 		Oid 		typ_oid;
+ 
+ 		Assert(partAttr->numberOfPartitions > 0);
+ 
+ 		i = 0;
+ 		j = 0;
+ 			
+ 		/* Allocate the storage for number of partitions.*/
+ 		partitionRelOids = malloc(sizeof(Oid) * partAttr->numberOfPartitions);
+ 
+ 		for(i=0; i<partAttr->numberOfPartitions; i++)
+ 		{
+    	        childStmt = (CreateStmt *)copyObject((void *)stmt);
+        	    childStmt->constraints = NULL;
+ 
+             /*
+    	         * Child has to use all columns from the parent, otherwise we will get
+        	     * unnecessary merging columns notices as part of the
+            	 * DefineRelation
+              */	
+    	        childStmt->tableElts = NIL;
+        	    childStmt->inhRelations = lappend(NULL, &inr);
+ 
+ 			/* Device the name of child table. */
+ 			resetStringInfo(childTableName);
+ 			appendStringInfo(childTableName, "%s_%d", stmt->relation->relname, i);
+ 
+ 			/* Use this name to create child table. */
+    	        childStmt->relation->relname = childTableName->data;
+        	    childOid = DefineRelation(childStmt, RELKIND_RELATION);
+ 	
+ 			/* */	
+ 			rel = RelationIdGetRelation(childOid);
+ 			resetStringInfo(createTrigger);
+ 			appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_update_trigger BEFORE UPDATE ON %s ", RelationGetRelationName(rel));
+ 			appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_update_trigger();");
+ 	
+ 	    	/* Parse the above SQL string and use this parsetree to create trigger on parent table. */
+ 	    	query_list  = pg_parse_query(createTrigger->data);
+ 		    parsetree = (Node *)lfirst(list_head(query_list));
+     		CreateTrigger((CreateTrigStmt * )parsetree, 0);
+ 
+ 			/* To avoid tuple being updated by the current transaction, after the current scan started */
+ 			CommandCounterIncrement();
+ 
+             /* We can not create constraints for hash partitions directly. Instead, we will monitor inserts using insert triggers.*/
+             resetStringInfo(createTrigger);
+             appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_constraints_hash BEFORE INSERT ON %s ", RelationGetRelationName(rel));
+             appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_constraints_hash();");
+             relation_close(rel, 0);
+ 
+             /* Parse the above SQL string and use this parsetree to create trigger on parent table. */
+             query_list  = pg_parse_query(createTrigger->data);
+             parsetree = (Node *)lfirst(list_head(query_list));
+             CreateTrigger((CreateTrigStmt * )parsetree, 0);
+ 
+ 			k = 1;
+ 
+ 			foreach(partColumn, partColumnList)
+ 			{
+ 				/* Get the name of partition key column. */
+ 				char *colName = ((Alias *)lfirst(partColumn))->aliasname;
+ 
+ 				/* Get the attribute type. */
+ 				typ_oid = get_atttype(parentRelId, get_attnum(parentRelId, colName));
+ 			
+ 				/* Get the attribute length*/
+ 				get_typlenbyval(typ_oid, &len, &typbyval);
+ 
+ 				/* initialize nulls and values */
+ 			    for (j = 0; j < Natts_pg_partition; j++)
+ 				{
+ 					nulls[j] = false;
+ 					values[j] = (Datum) NULL;
+ 				}				
+ 
+ 			    values[Anum_pg_partition_minval     -1] = Int8GetDatum(NULL);
+ 				nulls[Anum_pg_partition_minval     -1] = true;
+ 		    	values[Anum_pg_partition_maxval     -1] = Int8GetDatum(NULL);
+ 				nulls[Anum_pg_partition_maxval     -1] = true;
+ 			    values[Anum_pg_partition_parentrelid -1]= ObjectIdGetDatum(parentRelId);
+ 			    values[Anum_pg_partition_partrelid  -1] = ObjectIdGetDatum(childOid);
+ 		    	values[Anum_pg_partition_parttype   -1] = Int8GetDatum(PART_HASH);
+ 			    values[Anum_pg_partition_partkey    -1] = ObjectIdGetDatum(get_attnum(parentRelId, colName));
+ 			    values[Anum_pg_partition_listval    -1] = Int8GetDatum(NULL);
+ 			    nulls[Anum_pg_partition_listval     -1] = true;
+ 		    	values[Anum_pg_partition_hashval    -1] = Int8GetDatum(i);
+ 			    values[Anum_pg_partition_keytype    -1] = ObjectIdGetDatum(typ_oid);
+ 				values[Anum_pg_partition_keyorder   -1] = Int8GetDatum(k);
+ 
+ 				k++;
+ 
+ 			    Relation r = heap_open(PartitionRelationId, RowExclusiveLock);
+ 			    TupleDesc tupDesc = r->rd_att;
+ 	
+ 			    HeapTuple tup = heap_form_tuple(tupDesc, values, nulls);
+ 			    simple_heap_insert(r, tup);
+ 
+ 				CatalogUpdateIndexes(r, tup); 
+ 
+ 			    heap_close(r, RowExclusiveLock);
+ 			}
+ 		}
+ 
+ 		rel = RelationIdGetRelation(parentRelId);
+ 		resetStringInfo(createTrigger);
+ 		appendStringInfo(createTrigger, "CREATE TRIGGER zz_partition_insert_trigger_hash BEFORE INSERT ON %s ", RelationGetRelationName(rel));
+ 		appendStringInfo(createTrigger, " FOR EACH ROW EXECUTE PROCEDURE partition_insert_trigger_hash();");
+ 		relation_close(rel, 0);
+ 	
+ 	    /* Parse the above SQL string and use this parsetree to create trigger on parent table. */
+     	query_list  = pg_parse_query(createTrigger->data);
+ 	    parsetree = (Node *)lfirst(list_head(query_list));
+     	CreateTrigger((CreateTrigStmt * )parsetree, 0);
+ 
+         /*
+          * Make the changes carried out so far, visible
+          */
+         CreatePartitionTrigger(stmt, parentRelId, partitionRelOids);
+         CommandCounterIncrement();
+ 		
+ 		/* Free the storage. */
+ 		free(partitionRelOids);		
+ 	}
+ 	else
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 				 errmsg("Invalid PARTITION type specified")));
+ }
+ 
+ /*
+  * ------------------------------------------------------------------------------
+  * ValidateRanges : Check for the intersecting ranges of specified partitions.
+  * 
+  * ------------------------------------------------------------------------------
+  */
+ int ValidateRanges(PartitionAttrs *partAttr, Oid parentRelOid, Oid *partitionRelOids)
+ {
+ 	List			*partitionList;
+ 	List			*partitionColumnList;
+ 	ListCell		*p1;
+ 	ListCell		*p2;
+ 	Partition		*partition1, *partition2;
+ 	ListCell		*partitionColumn;
+ 	ListCell		*p1MinIndex, *p1MaxIndex, *p2MinIndex, *p2MaxIndex;
+ 	Datum			p1MinDatum, p1MaxDatum, p2MinDatum;
+ 	int				i, j, k;
+ 	Oid				typ_oid;
+ 	RegProcedure	procLessThan, procGreaterThan;
+ 	Oid				LessThanOid, GreaterThanOid;
+ 	bool			isDef;
+ 
+ 
+ 	i = 0;
+ 
+ 	/* Get the partition List. */
+ 	partitionList = partAttr->partitions;
+ 
+ 	/* Get the partition-key column list*/
+ 	partitionColumnList = partAttr->colName;
+ 
+ 	foreach(partitionColumn, partitionColumnList)
+ 	{		
+ 		char *colName = ((Alias *)lfirst(partitionColumn))->aliasname;
+ 		/* Get the column type. */
+ 		typ_oid = get_atttype(parentRelOid, get_attnum(parentRelOid, colName));
+ 
+ 		foreach(p1, partitionList)
+ 		{
+ 			partition1 = lfirst(p1);
+ 			j = 0;			
+ 			p1MinIndex = partition1->partitionCheck->min_value->head;
+ 			p1MaxIndex = partition1->partitionCheck->max_value->head;
+ 
+ 			while(j < i)
+ 			{
+     	        p1MinIndex = p1MinIndex->next;
+    	    	    p1MaxIndex = p1MaxIndex->next;
+ 				j++;
+ 			}
+ 
+    	        /* If it's a FuncExpr then execute the conversion function. */
+        	    if IsA((Node *)lfirst(p1MinIndex), FuncExpr)
+            	{
+                	p1MinDatum = OidFunctionCall1(((FuncExpr *)lfirst(p1MinIndex))->funcid,
+                    	            ((Const *)((FuncExpr *)lfirst(p1MinIndex))->args->head->data.ptr_value)->constvalue);
+             }
+    	        else
+        	    {
+            	    p1MinDatum = ((Const *)lfirst(p1MinIndex))->constvalue;
+             }
+ 
+            /* If it's a FuncExpr then execute the conversion function. */
+    	        if IsA((Node *)lfirst(p1MaxIndex), FuncExpr)
+        	    {
+                	p1MaxDatum = OidFunctionCall1(((FuncExpr *)lfirst(p1MaxIndex))->funcid,
+            	                    ((Const *)((FuncExpr *)lfirst(p1MaxIndex))->args->head->data.ptr_value)->constvalue);
+        	    }
+    	        else
+             {
+            	    p1MaxDatum = ((Const *)lfirst(p1MaxIndex))->constvalue;
+        	    }
+ 
+ 			foreach(p2, partitionList)
+ 			{
+ 				if(p1 != p2)
+ 				{
+ 					partition2 = lfirst(p2);
+ 					k = 0;
+ 	                p2MinIndex = partition2->partitionCheck->min_value->head;
+         	        p2MaxIndex = partition2->partitionCheck->max_value->head;
+ 
+ 					while(k < i)
+ 					{
+                 	    p2MinIndex = p2MinIndex->next;
+             	        p2MaxIndex = p2MaxIndex->next;
+ 						k++;
+ 					}
+ 
+ 		            /* If it's a FuncExpr then execute the conversion function. */
+ 	            	if IsA((Node *)lfirst(p2MinIndex), FuncExpr)
+     	    	    {
+         		        p2MinDatum = OidFunctionCall1(((FuncExpr *)lfirst(p2MinIndex))->funcid,
+     	        	                    ((Const *)((FuncExpr *)lfirst(p2MinIndex))->args->head->data.ptr_value)->constvalue);
+ 		            }
+     	        	else
+         		    {
+         	    	    p2MinDatum = ((Const *)lfirst(p2MinIndex))->constvalue;
+ 		            }
+ 
+ 		            LessThanOid = OperatorGet("<=", PG_CATALOG_NAMESPACE , typ_oid, typ_oid, &isDef);		
+ 
+ 	        	    if(isDef == FALSE)
+     		            elog(ERROR, "'<=' operator is not defined for type '%d'.", typ_oid);
+ 
+ 					procLessThan = get_opcode(LessThanOid);
+ 	            	
+ 					GreaterThanOid = OperatorGet(">", PG_CATALOG_NAMESPACE , typ_oid, typ_oid, &isDef);
+ 	        	    
+ 					if(isDef == FALSE)
+     		            elog(ERROR, "'>' operator is not defined for type '%d'.", typ_oid);
+ 
+     	        	procGreaterThan = get_opcode(GreaterThanOid);
+ 					bool tmp1, tmp2;
+ 
+ 					tmp1 = DatumGetBool(OidFunctionCall2(procLessThan, p1MinDatum, p2MinDatum));
+ 					tmp2 = DatumGetBool(OidFunctionCall2(procGreaterThan, p1MaxDatum, p2MinDatum));
+ 
+ 	    	        if(tmp1 == true && tmp2 == true )
+ 	            	{
+ 							elog(ERROR, "Partition ranges of '%s' and '%s' are overlapping for '%s' attribute.", 
+ 									partition1->partName->relname, partition2->partName->relname, colName);
+ 					}
+ 				}
+ 			}
+ 		}
+ 		i++;
+ 	}
+ 	return 0;
+ }
+ 
+ /*
+  * ------------------------------------------------------------------------------
+  * CreatePartitionTrigger : Create trigger for insert, update. Add constraints
+  * on child tables as per ranges specified.
+  * Param 1 : CreateStmt *stmt - Pointer to struct CreateStmt which contains 
+  * 								information required to create trigger.
+  * ------------------------------------------------------------------------------
+  */
+ static void 
+ CreatePartitionTrigger(CreateStmt *stmt, Oid parentRelOid, Oid *partitionRelOids)
+ {
+ 	int				counter, numberOfPartitions;
+ 	PartitionAttrs	*partAttr;
+ 	char            *partitionColumn; 
+ 	StringInfo		func_insert = makeStringInfo();
+ 	StringInfo		func_update = makeStringInfo();
+     Partition   	*temp_partition;
+ 	List            *partitionList;
+ 	ListCell        *temp_part;
+ 	int 			partitionColumnType;
+ 	List			*partColumnList;
+ 	ListCell		*partColumn;
+ 	int 			count, i, j, l;
+ 	List			*columnList;
+ 	ListCell		*column;
+ 	int 			*partKeyOrder;
+ 
+ 	partAttr = (PartitionAttrs *)(stmt->partAttr);
+ 
+     if(partAttr->partFunc == PART_HASH)
+     {
+         return ;
+     }
+ 
+     /* Reset the buffers used.  */
+     resetStringInfo(func_insert);
+     resetStringInfo(func_update);
+ 
+ 	/* Reset */
+ 	counter 			= 0;
+ 	numberOfPartitions 	= 0;
+ 	i					= 0;
+ 	j					= 0;
+ 	l					= 0;
+ 	
+ 	partitionList = partAttr->partitions;
+ 	Assert(list_length(partitionList));
+ 
+ 	/* Get the list of partition key columns. */
+ 	partColumnList = partAttr->colName;
+ 
+ 	partKeyOrder = malloc(partColumnList->length * sizeof(int));
+ 
+ 	columnList 	= stmt->tableElts;
+ 
+ 	foreach(partColumn, partColumnList)
+ 	{
+ 		/* Get the name of partition key column. */
+ 		char 	*colName = ((Alias *)lfirst(partColumn))->aliasname;
+ 		bool 	found = false;
+ 		count = 0;
+ 
+ 		foreach(column, columnList)
+ 		{
+ 			ColumnDef *tmp = lfirst(column);
+ 			count++;
+ 			if(strcmp(tmp->colname, colName) == 0)
+ 			{
+ 				partKeyOrder[j++] = count;
+ 				found = true;
+ 				break;
+ 			}
+ 		}
+ 
+ 		if(found == false)
+ 			elog(ERROR, "'%s' attribute is not present in given table.", colName);
+ 
+ 	}
+ 
+ 	foreach(temp_part, partitionList)
+ 	{
+ 		ListCell 	*minIndex, *maxIndex;
+ 
+ 		temp_partition = lfirst(temp_part);
+ 
+ 		if ( partAttr->partFunc == PART_RANGE )
+ 		{
+ 			minIndex = temp_partition->partitionCheck->min_value->head;
+ 			maxIndex = temp_partition->partitionCheck->max_value->head;
+ 		}
+ 
+ 		j = 0;
+ 	
+ 		foreach(partColumn, partColumnList)
+ 		{
+ 
+ 			/* Get the name of partition key column. */
+ 			char *colName = ((Alias *)lfirst(partColumn))->aliasname;
+ 			partitionColumn = colName;
+ 	
+ 			/* Type of partition key column */
+     		partitionColumnType = get_atttype(parentRelOid, get_attnum(parentRelOid, partitionColumn)); 
+ 
+ 	        if ( partAttr->partFunc == PART_LIST )
+ 			{
+ 				elog(ERROR, "List partitions are not yet supported.");
+ 			}
+ 			else
+ 			{		
+ 				Datum 			min_datum, max_datum;
+ 				Oid 			typ_oid;
+ 				bool 			typbyval;
+ 				int16			len;
+ 			
+ 				/* Get the attribute type. */
+ 				typ_oid = get_atttype(parentRelOid, get_attnum(parentRelOid, partitionColumn));
+ 	
+ 				IsValidRange(minIndex, maxIndex, parentRelOid, partitionColumn, temp_partition->partName->relname);	
+ 
+ 				AddCheckConstraint(typ_oid, 
+ 					((Const *)lfirst(minIndex))->constvalue, 
+ 					((Const *)lfirst(maxIndex))->constvalue, 
+ 					 temp_partition->partName->relname, 
+ 					partitionColumn);	
+ 
+ 				/* Get the attribute length*/
+ 				get_typlenbyval(typ_oid, &len, &typbyval);
+ 
+ 				min_datum = ((Const *)lfirst(minIndex))->constvalue;
+ 				max_datum = ((Const *)lfirst(maxIndex))->constvalue;
+ 
+ 				minIndex = minIndex->next;
+ 				maxIndex = maxIndex->next;
+ 
+ 		        WriteRangePartitionToCatalog(min_datum,
+     		                                    max_datum,
+         		                                len,
+             		                            typbyval,
+                 		                        parentRelOid,
+                     		                    partitionRelOids[l],
+                         		                get_attnum(parentRelOid, partitionColumn),
+                             		            typ_oid,
+                                 		        ++j);
+ 			}	
+ 		}
+ 		l = l + 1;
+ 	}
+ 	if ( partAttr->partFunc == PART_RANGE )
+ 		ValidateRanges(partAttr,parentRelOid, partitionRelOids);
+ 	return;
+ }
+ 
+ /*
+  * Pass either "*OLD*" or "*NEW*" as appropriate depending on the trigger to
+  * to be generated
+  */
+ static void 
+ MutateColumnRefs(Node *node, char *reference) 
+ {
+ 	if IsA(node, A_Expr) 
+ 	{
+ 		MutateColumnRefs( ((A_Expr *) node)->lexpr, reference );
+ 		MutateColumnRefs( ((A_Expr *) node)->rexpr, reference);
+ 	} 
+ 	else if IsA(node, ColumnRef) 
+ 	{
+ 		ColumnRef *col_ref = (ColumnRef *) node;
+ 
+ 		/* Should have at max one column in the column list */
+ 		Assert(1 == list_length(col_ref->fields));
+ 		
+ 		col_ref->fields = list_make2(makeString(reference),
+ 								lfirst(list_head(col_ref->fields)));
+ 
+ 	}
+ 	return;
  }
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/commands/trigger.c,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.10.1
diff -c -r1.1.1.2 -r1.1.1.2.10.1
*** src/backend/commands/trigger.c	1 Dec 2008 09:37:46 -0000	1.1.1.2
--- src/backend/commands/trigger.c	9 Mar 2009 12:55:31 -0000	1.1.1.2.10.1
***************
*** 1730,1735 ****
--- 1730,1738 ----
  			heap_freetuple(oldtuple);
  		if (newtuple == NULL)
  			break;
+ 		/* Break free from the loop is found dummy-tuple. */
+ 		if (newtuple->t_len == -1)
+ 			break;
  	}
  	return newtuple;
  }
***************
*** 2021,2026 ****
--- 2024,2032 ----
  		if (oldtuple != newtuple && oldtuple != intuple)
  			heap_freetuple(oldtuple);
  		if (newtuple == NULL)
+ 			break;
+ 		/* Break free from the loop is found dummy-tuple. */
+ 		if (newtuple->t_len == -1)
  			break;
  	}
  	heap_freetuple(trigtuple);
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/executor/execMain.c,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.8.1
diff -c -r1.1.1.2 -r1.1.1.2.8.1
*** src/backend/executor/execMain.c	1 Dec 2008 09:37:46 -0000	1.1.1.2
--- src/backend/executor/execMain.c	9 Mar 2009 12:55:31 -0000	1.1.1.2.8.1
***************
*** 1674,1679 ****
--- 1674,1687 ----
  		if (newtuple == NULL)	/* "do nothing" */
  			return;
  
+ 		/* Check for dummy tuple */
+ 		if(newtuple->t_len == -1)
+ 		{
+ 			/* Increase the count of processed rows. */
+ 			(estate->es_processed)++;
+ 			return;
+ 		}
+ 
  		if (newtuple != tuple)	/* modified by Trigger(s) */
  		{
  			/*
***************
*** 1911,1916 ****
--- 1919,1933 ----
  
  		if (newtuple == NULL)	/* "do nothing" */
  			return;
+ 
+ 		/* Check for dummy tuple */
+ 		if(newtuple->t_len == -1)
+ 		{
+ 			/* Increase the count of processed rows. */
+ 			(estate->es_processed)++;
+ 			heap_freetuple(newtuple);
+ 			return;
+ 		}
  
  		if (newtuple != tuple)	/* modified by Trigger(s) */
  		{
Index: src/backend/nodes/copyfuncs.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/nodes/copyfuncs.c,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.8.6
diff -c -r1.1.1.2 -r1.1.1.2.8.6
*** src/backend/nodes/copyfuncs.c	1 Dec 2008 09:37:46 -0000	1.1.1.2
--- src/backend/nodes/copyfuncs.c	19 Mar 2009 12:34:57 -0000	1.1.1.2.8.6
***************
*** 1513,1518 ****
--- 1513,1573 ----
  	return newnode;
  }
  
+ /*
+  * _copyPartitionAttrs
+  */
+ static PartitionAttrs *
+ _copyPartitionAttrs(PartitionAttrs *from)
+ {
+ 	PartitionAttrs *newnode = makeNode(PartitionAttrs);
+ 
+ 	COPY_SCALAR_FIELD(numberOfPartitions);
+ 	COPY_NODE_FIELD(partitions);
+ 	COPY_SCALAR_FIELD(partFunc);
+ 	COPY_NODE_FIELD(colName);
+ 
+ 	return newnode;
+ }
+ 
+ /*
+  * _copyPartition
+  */
+ static Partition *
+ _copyPartition(Partition *from)
+ {
+ 	Partition *newnode = makeNode(Partition);
+ 
+ 	COPY_NODE_FIELD(partName);
+ 	COPY_NODE_FIELD(partitionCheck);
+ 	COPY_STRING_FIELD(tablespacename);
+ 
+ 	return newnode;
+ }
+ 
+ /*
+  * _copyPartition
+  */
+ static UpdatePartitionStmt *
+ _copyUpdatePartitionStmt(UpdatePartitionStmt *from)
+ {
+ 	UpdatePartitionStmt *newnode = makeNode(UpdatePartitionStmt);
+ 
+ 	COPY_NODE_FIELD(prev);
+ 	COPY_NODE_FIELD(after);
+ 
+ 	return newnode;
+ }
+ static SplitPartitionStmt * 
+ _copySplitPartitionStmt(SplitPartitionStmt *from)
+ {
+ 	SplitPartitionStmt *newnode = makeNode(SplitPartitionStmt);
+ 
+ 	COPY_NODE_FIELD(partNames);
+ 	COPY_NODE_FIELD(splitValues);
+ 
+ 	return newnode;
+ }
+ 
  /* ****************************************************************
   *						relation.h copy functions
   *
***************
*** 2018,2023 ****
--- 2073,2081 ----
  	COPY_NODE_FIELD(keys);
  	COPY_NODE_FIELD(options);
  	COPY_STRING_FIELD(indexspace);
+ 	COPY_NODE_FIELD(min_value);
+ 	COPY_NODE_FIELD(max_value);
+ 	COPY_NODE_FIELD(partition_list_values);
  
  	return newnode;
  }
***************
*** 2325,2330 ****
--- 2383,2389 ----
  	COPY_NODE_FIELD(options);
  	COPY_SCALAR_FIELD(oncommit);
  	COPY_STRING_FIELD(tablespacename);
+ 	COPY_NODE_FIELD(partAttr);
  
  	return newnode;
  }
***************
*** 3461,3467 ****
  		case T_FromExpr:
  			retval = _copyFromExpr(from);
  			break;
! 
  			/*
  			 * RELATION NODES
  			 */
--- 3520,3537 ----
  		case T_FromExpr:
  			retval = _copyFromExpr(from);
  			break;
! 		case T_PartitionAttrs:
! 			retval = _copyPartitionAttrs(from);
! 			break;
! 		case T_Partition:
! 			retval = _copyPartition(from);
! 			break;
! 		case T_UpdatePartitionStmt:
! 			retval = _copyUpdatePartitionStmt(from);
! 			break;
! 		case T_SplitPartitionStmt:
! 			retval = _copySplitPartitionStmt(from);
! 			break;
  			/*
  			 * RELATION NODES
  			 */
Index: src/backend/nodes/equalfuncs.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/nodes/equalfuncs.c,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.10.2
diff -c -r1.1.1.2 -r1.1.1.2.10.2
*** src/backend/nodes/equalfuncs.c	1 Dec 2008 09:37:46 -0000	1.1.1.2
--- src/backend/nodes/equalfuncs.c	4 Mar 2009 08:30:42 -0000	1.1.1.2.10.2
***************
*** 705,710 ****
--- 705,730 ----
  	return true;
  }
  
+ static bool 
+ _equalPartitionAttrs(PartitionAttrs *a, PartitionAttrs *b)
+ {
+ 	COMPARE_SCALAR_FIELD(numberOfPartitions);
+ 	COMPARE_NODE_FIELD(partitions);
+ 	COMPARE_SCALAR_FIELD(partFunc);
+ 	COMPARE_NODE_FIELD(colName);
+ 
+ 	return true;
+ }
+ 
+ static bool 
+ _equalPartition(Partition *a, Partition *b)
+ {
+ 	COMPARE_NODE_FIELD(partName);
+ 	COMPARE_NODE_FIELD(partitionCheck);
+ 
+ 	return true;
+ }
+ 
  
  /*
   * Stuff from relation.h
***************
*** 1060,1065 ****
--- 1080,1086 ----
  	COMPARE_NODE_FIELD(options);
  	COMPARE_SCALAR_FIELD(oncommit);
  	COMPARE_STRING_FIELD(tablespacename);
+ 	COMPARE_NODE_FIELD(partAttr);
  
  	return true;
  }
***************
*** 2311,2316 ****
--- 2332,2343 ----
  			break;
  		case T_JoinExpr:
  			retval = _equalJoinExpr(a, b);
+ 			break;
+ 		case T_PartitionAttrs:
+ 			retval = _equalPartitionAttrs(a, b);
+ 			break;
+ 		case T_Partition:
+ 			retval = _equalPartition(a, b);
  			break;
  
  			/*
Index: src/backend/optimizer/plan/planner.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/optimizer/plan/planner.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.7
diff -c -r1.1.1.1 -r1.1.1.1.10.7
*** src/backend/optimizer/plan/planner.c	1 Dec 2008 09:36:26 -0000	1.1.1.1
--- src/backend/optimizer/plan/planner.c	18 Mar 2009 13:51:36 -0000	1.1.1.1.10.7
***************
*** 41,46 ****
--- 41,52 ----
  #include "utils/lsyscache.h"
  #include "utils/syscache.h"
  
+ #include "catalog/pg_partition.h"
+ #include "utils/fmgroids.h"
+ #include "catalog/indexing.h"
+ #include "utils/tqual.h"
+ #include "catalog/pg_namespace.h"
+ #include "access/hash.h"
  
  /* GUC parameter */
  double cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
***************
*** 82,87 ****
--- 88,367 ----
  						List *sub_tlist,
  						AttrNumber *groupColIdx);
  static List *postprocess_setop_tlist(List *new_tlist, List *orig_tlist);
+ Oid OperatorGet(const char *operatorName,
+             Oid operatorNamespace,
+             Oid leftObjectId,
+             Oid rightObjectId,
+             bool *defined);
+ extern Oid get_relevent_partition(HeapTuple tuple, Relation rel);
+ extern Oid get_relevent_hash_partition(HeapTuple tuple, Relation rel);
+ extern Oid TypenameGetTypid	(const char *typname);
+ extern Node* coerce_to_specific_type(ParseState *pstate,
+ 		Node *  	  node,
+ 		Oid  	  targetTypeId,
+ 		const char *  	  constructName
+ 	)  ;	
+ 
+ Oid
+ get_relevent_hash_partition(HeapTuple tuple, Relation rel) {
+     ScanKeyData skey;
+     SysScanDesc pg_partscan;
+     HeapTuple   pg_parttup;
+     Relation    pg_partrel;
+ 	bool		flag;
+ 	StringInfo	str_to_hash = makeStringInfo();
+ 
+    /* Lists to identify partition id to which the row should belong.
+      * irr_part_list - list of oids of irrelevent partitions.
+      * rel_part_list - list may contain irrelevent partition along
+      *                  with relevent partition
+      */
+     List        	*distinct_part_key_list=NULL;
+     List        	*rel_part_list= NULL;
+ 	int				hash_partition_entries = 0;
+ 	unsigned int	hashValue;
+ 
+ 	flag = false;
+ 	resetStringInfo(str_to_hash);	
+ 
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+     {
+ 		bool			isnull, typbyval;
+ 		int16 			len;
+ 		bool			isVariableLength;
+ 		Oid				typoutput;
+ 
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+ 		if(pg_part->parttype == PART_HASH)
+ 		{		
+ 			/* Increase the no. of hash partition entries count. */
+ 			hash_partition_entries++;
+ 
+ 			if (!list_member_int(distinct_part_key_list, pg_part->partkey))
+ 			{
+ 				distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey);
+ 
+ 		        /* Get attribute from tuple */
+ 		        Datum attr      = heap_getattr(tuple, pg_part->partkey, rel->rd_att, &isnull);
+ 
+         		/* Get len and typbyval from pg_type */
+ 	        	get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 
+ 				/* Read the list value */
+ 				getTypeOutputInfo(pg_part->keytype, &typoutput, &isVariableLength);
+ 				appendStringInfo(str_to_hash, DatumGetCString(OidFunctionCall1(typoutput, attr)));
+ 			}
+ 		}
+ 	}
+ 
+ 
+ 	hashValue = DatumGetUInt32(hash_any(str_to_hash->data, strlen(str_to_hash->data))) % 
+ 				(int)(hash_partition_entries / distinct_part_key_list->length);
+ 
+ //	elog(NOTICE, "String to be hashed : %s, Hash Value : %u", str_to_hash->data, hashValue);
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, RowExclusiveLock);
+ 
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+     {
+         bool            isnull;
+ 
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_HASH)
+         {
+ 	        /* Get min attribute from catalog */
+     	    Datum datumDashValue = heap_getattr (pg_parttup,Anum_pg_partition_hashval, 
+                                         pg_partrel->rd_att, &isnull);
+ 
+ 			if(datumDashValue == hashValue)
+ 			{
+ 				if (!list_member_oid(rel_part_list, pg_part->partrelid))		
+ 				{
+ 					rel_part_list = lappend_oid(rel_part_list, pg_part->partrelid);
+ 
+ 					if(list_length(rel_part_list) > 1)
+ 						elog(ERROR, "Found more than one matching partitions.");
+ 				}
+ 			}
+ 		}
+ 	}
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, RowExclusiveLock);
+ 
+ //    elog (NOTICE,"Returned Oid %u", (Oid)rel_part_list->head->data.oid_value);
+ 
+ 	return (Oid)rel_part_list->head->data.oid_value;
+ }
+ 
+ static char*
+ DisplayDatum(Datum datum, Oid type)
+ {
+     Oid     typeOut;
+     bool    isVariableLength;
+ 
+     getTypeOutputInfo(type, &typeOut, &isVariableLength);
+     return (DatumGetCString(OidFunctionCall1(typeOut, datum)));
+ }
+ 
+ 
+ /* Function to get partiton id for a tuple inserted on master relation.
+  * Uses pg_partition catalog lookup to verify boundry condition.
+  * Each entry in catalog corresponds to constraint on single column.
+  */
+ 
+ Oid
+ get_relevent_partition(HeapTuple tuple, Relation rel) {
+     ScanKeyData skey;
+     SysScanDesc pg_partscan;
+     HeapTuple   pg_parttup;
+     Relation    pg_partrel;
+ 
+    /* Lists to identify partition id to which the row should belong.
+      * irr_part_list - list of oids of irrelevent partitions.
+      * rel_part_list - list may contain irrelevent partition along
+      *                  with relevent partition
+      */
+     List        *irr_part_list=NULL;
+     List        *rel_part_list= NULL;
+     ListCell    *part_oid;
+ 	Oid 		result = 0 ;
+ 
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, RowExclusiveLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+     {
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+         bool isnull, isdef, gte_min, lt_max, typbyval;
+         int16 len;
+         Datum short_datum;
+         Oid     typoutput;
+         bool    isVariableLength;
+ 
+         /* Get attribute from tuple */
+         Datum attr      = heap_getattr(tuple, pg_part->partkey, rel->rd_att,&isnull);
+         /* Get min attribute from catalog */
+         Datum part_attr = heap_getattr (pg_parttup,Anum_pg_partition_minval,
+                                         pg_partrel->rd_att,&isnull);
+ 
+         getTypeOutputInfo(pg_part->keytype, &typoutput, &isVariableLength);
+ 
+         /* Get len and typbyval from pg_type */
+         get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 
+         /* Three cases a. datatypes with typbyval true and fix length.
+          * b. datatypes with byval false and fix length.
+          * c. extendible datatypes where length -1.
+          */
+         if ( typbyval )
+         {
+             short_datum = 0;
+             memcpy(&short_datum, VARDATA_ANY(part_attr), len);
+             part_attr = short_datum;
+         }
+         else if (len != -1)
+         {
+ 			part_attr = (Datum)VARDATA_ANY(part_attr);
+       	}
+ 
+         /* Evaluate whether tuple value satisfies partition key cnstraint */
+         Oid min_oper = OperatorGet(">=", PG_CATALOG_NAMESPACE , pg_part->keytype,
+                                         pg_part->keytype, &isdef);
+         RegProcedure oper_proc = get_opcode(min_oper);
+         gte_min = DatumGetBool(OidFunctionCall2(oper_proc, attr, part_attr));
+ 
+ //		elog(NOTICE, "min from pg_partition : %s", DisplayDatum(part_attr, pg_part->keytype));
+ 
+         /* Similar steps for max attribute of catalog */
+ 	    part_attr = heap_getattr (pg_parttup,Anum_pg_partition_maxval,
+                                             pg_partrel->rd_att,&isnull);
+         if ( typbyval )
+         {
+             short_datum = 0;
+             memcpy(&short_datum, VARDATA_ANY(part_attr), len);
+             part_attr = short_datum;
+         }
+         else if (len != -1 )
+ 		{
+ 			part_attr = (Datum)VARDATA_ANY(part_attr);
+ 		}
+ 
+         Oid max_oper = OperatorGet("<", PG_CATALOG_NAMESPACE , pg_part->keytype,
+                                     pg_part->keytype, &isdef);
+         RegProcedure oper_proc2 = get_opcode(max_oper);
+         lt_max = DatumGetBool(OidFunctionCall2(oper_proc2,attr,part_attr));
+ 
+ //		elog(NOTICE, "max from pg_partition : %s", DisplayDatum(part_attr, pg_part->keytype));
+ //		elog(NOTICE, "value from tuple : %s\n", DisplayDatum(attr, pg_part->keytype));
+ 
+         /* Create relevent and  irrelevent partition lists */
+         if ( gte_min & lt_max )
+         {
+             if ( !list_member_oid(irr_part_list, pg_part->partrelid)    )
+                 rel_part_list = lappend_oid(rel_part_list, pg_part->partrelid);
+         }
+         else
+             irr_part_list = lappend_oid(irr_part_list, (pg_part->partrelid));
+ 
+     }
+ 
+     /* Scan through lists to get partition oid to which the tuple is to be inserted */
+     foreach(part_oid, rel_part_list)
+     {
+         if ( list_member_oid(irr_part_list, part_oid->data.oid_value) )
+ 			 result = 0;
+ 		else 
+ 			 result = (Oid)part_oid->data.oid_value;
+     }
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, RowExclusiveLock);
+ 
+ 
+ //        elog (NOTICE,"Returned Oid %d", result);
+     /*if ( part_oid != NULL )
+         return (Oid)part_oid->data.oid_value;
+     else
+         return part_oid;*/
+ 	return result;
+ }
+ 
  
  
  /*****************************************************************************
Index: src/backend/optimizer/util/plancat.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/optimizer/util/plancat.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.8.1
diff -c -r1.1.1.1 -r1.1.1.1.8.1
*** src/backend/optimizer/util/plancat.c	1 Dec 2008 09:36:26 -0000	1.1.1.1
--- src/backend/optimizer/util/plancat.c	19 Feb 2009 11:51:17 -0000	1.1.1.1.8.1
***************
*** 917,922 ****
--- 917,953 ----
  	return result;
  }
  
+ 
+ /* Return parent oid for child table.
+  * Useful to find out parent in case of cross partition updates.
+  */
+ 
+ Oid
+ find_inheritance_parent(Oid inhrelid)
+ {
+     Relation    relation;
+     HeapScanDesc scan;
+     HeapTuple   inheritsTuple;
+     Oid         inhparent;
+     ScanKeyData key[1];
+ 
+ 
+     ScanKeyInit(&key[0],
+                 Anum_pg_inherits_inhrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(inhrelid));
+     relation = heap_open(InheritsRelationId, AccessShareLock);
+     scan = heap_beginscan(relation, SnapshotNow, 1, key);
+     while ((inheritsTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+     {
+         inhparent = ((Form_pg_inherits)GETSTRUCT(inheritsTuple))->inhparent;
+     }
+     heap_endscan(scan);
+     heap_close(relation, AccessShareLock);
+     return inhparent;
+ }
+ 
+ 
  /*
   * has_unique_index
   *
Index: src/backend/parser/gram.y
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/parser/gram.y,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.8.10
diff -c -r1.1.1.2 -r1.1.1.2.8.10
*** src/backend/parser/gram.y	1 Dec 2008 09:37:47 -0000	1.1.1.2
--- src/backend/parser/gram.y	19 Mar 2009 12:34:57 -0000	1.1.1.2.8.10
***************
*** 169,174 ****
--- 169,176 ----
  
  	InsertStmt			*istmt;
  	VariableSetStmt		*vsetstmt;
+ 
+ 	PartitionFunction partFunc;
  }
  
  %type <node>	stmt schema_stmt
***************
*** 238,244 ****
  %type <list>	func_name handler_name qual_Op qual_all_Op subquery_Op
  				opt_class opt_validator
  
! %type <range>	qualified_name OptConstrFromTable
  
  %type <str>		all_Op MathOp SpecialRuleRelation
  
--- 240,246 ----
  %type <list>	func_name handler_name qual_Op qual_all_Op subquery_Op
  				opt_class opt_validator
  
! %type <range>	qualified_name OptConstrFromTable OptPartitionName
  
  %type <str>		all_Op MathOp SpecialRuleRelation
  
***************
*** 324,330 ****
  %type <defelt>	def_elem old_aggr_elem
  %type <node>	def_arg columnElem where_clause where_or_current_clause
  				a_expr b_expr c_expr func_expr AexprConst indirection_el
! 				columnref in_expr having_clause func_table array_expr
  %type <list>	row type_list array_expr_list
  %type <node>	case_expr case_arg when_clause case_default
  %type <list>	when_clause_list
--- 326,335 ----
  %type <defelt>	def_elem old_aggr_elem
  %type <node>	def_arg columnElem where_clause where_or_current_clause
  				a_expr b_expr c_expr func_expr AexprConst indirection_el
! 				columnref in_expr having_clause func_table array_expr 
! 
! %type <list>	ListPartitionList  
! 
  %type <list>	row type_list array_expr_list
  %type <node>	case_expr case_arg when_clause case_default
  %type <list>	when_clause_list
***************
*** 386,391 ****
--- 391,401 ----
  %type <with> 	with_clause
  %type <list>	cte_list
  
+ %type <list>	RangePartitions ListPartitions PartitionColumns
+ %type <node> 	PartitioningData OptPartition ListPartition RangePartition
+ /* %type <node>	LeftVal RightVal */
+ 
+ // %type <partFunc> Partitioning_function
  
  /*
   * If you make any token changes, update the keyword table in
***************
*** 435,441 ****
  	KEY
  
  	LANCOMPILER LANGUAGE LARGE_P LAST_P LEADING LEAST LEFT LEVEL
! 	LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION
  	LOCK_P LOGIN_P
  
  	MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE
--- 445,451 ----
  	KEY
  
  	LANCOMPILER LANGUAGE LARGE_P LAST_P LEADING LEAST LEFT LEVEL
! 	LIKE LIMIT LIST1  LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION
  	LOCK_P LOGIN_P
  
  	MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE
***************
*** 447,465 ****
  	OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OR
  	ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNED OWNER
  
! 	PARSER PARTIAL PASSWORD PLACING PLANS POSITION
  	PRECISION PRESERVE PREPARE PREPARED PRIMARY
  	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
  
  	QUOTE
  
! 	READ REAL REASSIGN RECHECK RECURSIVE REFERENCES REINDEX RELATIVE_P RELEASE
  	RENAME REPEATABLE REPLACE REPLICA RESET RESTART RESTRICT RETURNING RETURNS
  	REVOKE RIGHT ROLE ROLLBACK ROW ROWS RULE
  
  	SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE
  	SERIALIZABLE SESSION SESSION_USER SET SETOF SHARE
! 	SHOW SIMILAR SIMPLE SMALLINT SOME STABLE STANDALONE_P START STATEMENT
  	STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SUPERUSER_P
  	SYMMETRIC SYSID SYSTEM_P
  
--- 457,475 ----
  	OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OR
  	ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNED OWNER
  
! 	PARSER PARTIAL PARTITION PARTITIONS PASSWORD PLACING PLANS POSITION
  	PRECISION PRESERVE PREPARE PREPARED PRIMARY
  	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
  
  	QUOTE
  
! 	RANGE READ REAL REASSIGN RECHECK RECURSIVE REFERENCES REINDEX RELATIVE_P RELEASE
  	RENAME REPEATABLE REPLACE REPLICA RESET RESTART RESTRICT RETURNING RETURNS
  	REVOKE RIGHT ROLE ROLLBACK ROW ROWS RULE
  
  	SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE
  	SERIALIZABLE SESSION SESSION_USER SET SETOF SHARE
! 	SHOW SIMILAR SIMPLE SMALLINT SOME SPLIT STABLE STANDALONE_P START STATEMENT
  	STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SUPERUSER_P
  	SYMMETRIC SYSID SYSTEM_P
  
***************
*** 1493,1500 ****
  		;
  
  alter_table_cmd:
  			/* ALTER TABLE <name> ADD [COLUMN] <coldef> */
! 			ADD_P opt_column columnDef
  				{
  					AlterTableCmd *n = makeNode(AlterTableCmd);
  					n->subtype = AT_AddColumn;
--- 1503,1519 ----
  		;
  
  alter_table_cmd:
+ 			/* ALTER TABLE <name> ADD PARTITION BY RANGE (col, ...) (<name> START val,.... END val,....) */
+ 			ADD_P RANGE PARTITION RangePartition
+ 				{
+ 					AlterTableCmd *n = makeNode(AlterTableCmd);
+ 					n->subtype = AT_AddPartition;
+ 					n->def = $4;
+ 					$$ = (Node *)n;
+ 				}		
+ 
  			/* ALTER TABLE <name> ADD [COLUMN] <coldef> */
! 			| ADD_P opt_column columnDef
  				{
  					AlterTableCmd *n = makeNode(AlterTableCmd);
  					n->subtype = AT_AddColumn;
***************
*** 1544,1549 ****
--- 1563,1609 ----
  					n->def = (Node *) makeString($6);
  					$$ = (Node *)n;
  				}
+ 			| DROP RANGE PARTITION qualified_name 
+ 				{
+ 					AlterTableCmd *n = makeNode(AlterTableCmd);
+ 					n->subtype = AT_DropPartitionByName;
+ 
+                     Partition *partition = makeNode(Partition);
+                     partition->partName = $4;
+                     partition->partitionCheck = NULL;
+ 
+ 					n->def = (Node *)partition;
+ 					$$ = (Node *)n;
+ 				}
+ 			| DROP RANGE PARTITION RangePartition
+ 				{
+ 					AlterTableCmd *n = makeNode(AlterTableCmd);
+ 					n->subtype = AT_DropPartitionByRange;
+ 					n->def = $4;
+ 					$$ = (Node *)n;
+ 				}
+ 			| UPDATE RANGE PARTITION RangePartition TO RangePartition
+ 				{
+ 					AlterTableCmd *n = makeNode(AlterTableCmd);
+ 					UpdatePartitionStmt *u = makeNode(UpdatePartitionStmt);
+ 					u->prev = (Partition *)$4;
+ 					u->after = (Partition *)$6;
+ 					n->subtype = AT_UpdatePartition;
+ 					n->def = (Node *)u;
+ 					$$ = (Node *)n;
+ 				}
+ 			| SPLIT RANGE PARTITION qualified_name AT VALUES '(' ListPartitionList ')' INTO qualified_name AND qualified_name
+ 				{
+ 					AlterTableCmd *n = makeNode(AlterTableCmd);
+ 					SplitPartitionStmt *s = makeNode(SplitPartitionStmt);
+ 					s->partNames = lappend(s->partNames, $4);
+ 					s->partNames = lappend(s->partNames, $11);
+ 					s->partNames = lappend(s->partNames, $13);
+ 					s->splitValues = $8;
+ 					n->subtype = AT_SplitPartition;
+ 					n->def = (Node *)s;
+ 					$$ = (Node *)n;
+ 				}
  			/* ALTER TABLE <name> DROP [COLUMN] <colname> [RESTRICT|CASCADE] */
  			| DROP opt_column ColId opt_drop_behavior
  				{
***************
*** 1946,1953 ****
   *
   *****************************************************************************/
  
! CreateStmt:	CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
! 			OptInherit OptWith OnCommitOption OptTableSpace
  				{
  					CreateStmt *n = makeNode(CreateStmt);
  					$4->istemp = $2;
--- 2006,2013 ----
   *
   *****************************************************************************/
  
! CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
! 			   OptInherit OptWith OnCommitOption OptTableSpace OptPartition 
  				{
  					CreateStmt *n = makeNode(CreateStmt);
  					$4->istemp = $2;
***************
*** 1958,1967 ****
  					n->options = $9;
  					n->oncommit = $10;
  					n->tablespacename = $11;
  					$$ = (Node *)n;
  				}
  		| CREATE OptTemp TABLE qualified_name OF qualified_name
! 			'(' OptTableElementList ')' OptWith OnCommitOption OptTableSpace
  				{
  					/* SQL99 CREATE TABLE OF <UDT> (cols) seems to be satisfied
  					 * by our inheritance capabilities. Let's try it...
--- 2018,2028 ----
  					n->options = $9;
  					n->oncommit = $10;
  					n->tablespacename = $11;
+                     n->partAttr = $12;
  					$$ = (Node *)n;
  				}
  		| CREATE OptTemp TABLE qualified_name OF qualified_name
! 			'(' OptTableElementList ')' OptWith OnCommitOption OptTableSpace OptPartition
  				{
  					/* SQL99 CREATE TABLE OF <UDT> (cols) seems to be satisfied
  					 * by our inheritance capabilities. Let's try it...
***************
*** 1975,1984 ****
--- 2036,2167 ----
  					n->options = $10;
  					n->oncommit = $11;
  					n->tablespacename = $12;
+                     n->partAttr =  $13;
  					$$ = (Node *)n;
  				}
  		;
  
+ OptPartition: PARTITION BY PartitioningData { $$ = $3; }  
+ 		| {$$ = NULL;}
+     	;
+ 
+ PartitionColumns: ColId 				{$$ = list_make1(makeString($1));}
+ 		| PartitionColumns ',' ColId 	{$$ = lappend($1, makeString($3));}
+ 		;
+ 
+ PartitioningData: LIST1  '(' PartitionColumns ')'  '(' ListPartitions  ')'
+ 				{
+ 					PartitionAttrs *partData = makeNode(PartitionAttrs);
+ 					partData->partFunc = PART_LIST;
+ 					partData->numberOfPartitions = 0;
+ 					partData->colName = $3;
+ 					partData->partitions = $6;
+ 					$$ = (Node *)partData;
+ 				}
+ 			
+ 		| RANGE '(' PartitionColumns ')' '(' RangePartitions ')'
+ 				{
+                     PartitionAttrs *partData = makeNode(PartitionAttrs);
+                     partData->partFunc = PART_RANGE;
+                     partData->numberOfPartitions = 0;
+                     partData->colName = $3;
+                     partData->partitions = $6;
+                     $$ = (Node *)partData;					
+ 				}	
+ 
+ 
+ 		| ColId '(' PartitionColumns ')'  PARTITIONS ICONST 
+ 				{
+ 					PartitionAttrs *partData = makeNode(PartitionAttrs);
+ 					if(strcasecmp($1, "hash") != 0)
+ 						elog(ERROR, "Not a valid partition type.");
+ 					partData->partFunc = PART_HASH;
+ 					partData->numberOfPartitions = $6;
+ 					partData->colName = $3;
+ 					partData->partitions =  NULL;
+ 					$$ = (Node *)partData;
+ 				}
+ 		;
+ 
+ ListPartitions: ListPartition 			{ $$ = list_make1($1);}
+ 	| ListPartitions ',' ListPartition 	{ $$ = lappend($1, $3);}
+ 	;
+ 
+ RangePartitions :  RangePartition { $$ = list_make1($1); }
+         | RangePartitions ',' RangePartition { $$ = lappend($1, $3); }
+         ;
+ 
+ ListPartitionList: a_expr 			{ $$ = list_make1($1);}
+ 	| ListPartitionList ',' a_expr 	{ $$ = lappend($1, $3);}
+ 	;
+ 
+ 
+ OptPartitionName: /* Empty  */ {$$ = NULL;}		
+ 	| qualified_name
+ 	;
+ 
+ ListPartition :	OptPartitionName '(' ListPartitionList ')'   OptTableSpace
+ 				{
+ 
+ 					Partition *partition = makeNode(Partition);
+ 					Constraint *constraint = NULL;
+ 					
+ 					partition->partName = $1;
+ 
+ 					/* Make the constraint node */
+ 					constraint = makeNode(Constraint);
+ 	
+ 					constraint->contype = PART_LIST;
+ 					constraint->name = NULL;
+ 					constraint->raw_expr = (Node *)$3; 
+ 					constraint->cooked_expr = NULL;
+ 					constraint->keys = NULL;
+ 					constraint->indexspace = NULL;
+ 					constraint->partition_list_values = $3;
+ 
+ 					partition->partitionCheck = constraint;
+ 					partition->tablespacename = $5;
+ 					$$ = (Node *)partition;
+ 				}
+     	;
+ 
+ RangePartition : OptPartitionName '(' START ListPartitionList END_P ListPartitionList ')' OptTableSpace
+ 				{
+ 
+ 						Partition *partition = makeNode(Partition);
+ 						Constraint *constraint = NULL;
+ 
+ 						partition->partName = $1;
+ 
+ 						/* Make the constraint node */
+ 						constraint = makeNode(Constraint);
+ 						constraint->contype = PART_RANGE;
+ 						constraint->min_value = $4;
+ 						constraint->max_value = $6;
+ 						constraint->name = NULL;
+ 			
+ 						/* constraint->raw_expr = $3; */
+ 
+ 						constraint->cooked_expr = NULL;
+ 						constraint->keys = NULL;
+ 						constraint->indexspace = NULL;
+ 
+ 						partition->partitionCheck = constraint;
+ 						partition->tablespacename = $8;
+ 						$$ = (Node *)partition;
+ 				}
+         ;
+ 
+ 
+ /***
+ part_column_list:
+ 			ColId
+ 					{ $$ = list_make1($1); }
+ 			|part_column_list ',' ColId 
+ 					{ $$ = lappend($1, $3); }
+ 		;
+ ****/
+ 
  /*
   * Redundancy here is needed to avoid shift/reduce conflicts,
   * since TEMP is not a reserved word.  See also OptTempTableName.
***************
*** 9344,9360 ****
  				{
  					$$ = makeNullAConst(@1);
  				}
  		;
  
  Iconst:		ICONST									{ $$ = $1; };
  Sconst:		SCONST									{ $$ = $1; };
  RoleId:		ColId									{ $$ = $1; };
- 
  SignedIconst: Iconst								{ $$ = $1; }
  			| '+' Iconst							{ $$ = + $2; }
  			| '-' Iconst							{ $$ = - $2; }
  		;
- 
  /*
   * Name classification hierarchy.
   *
--- 9527,9592 ----
  				{
  					$$ = makeNullAConst(@1);
  				}
+ /*
+ 			| AexprConst '+' AexprConst 
+ 				{
+ 					Node *l = $1;
+ 					Node *r = $3;	
+ 					int 	iresult;
+ 					float   fresult, flval, frval;
+ 
+ 					if(l->val.type == T_Integer && r->val.type == T_Integer)
+ 					{
+ 						iresult = l->val.val.ival + r->val.val.ival;
+ 						$$ = makeIntConst(iresult, @1);
+ 					}
+ 					else if(l->val.type == T_Integer && r->val.type == T_Float)
+                     {
+ 						sscanf(r.val.val.str, "%f", &frval);
+ 						fresult = (float)l->val.val.ival + frval;
+                         $$ = makeFloatConst(fresult, @1);
+                     }
+ 					else if(l->val.type == T_Float && r->val.type == T_Integer)
+                     {
+ 						sscanf(l.val.val.str, "%f", &flval);
+ 						fresult = flval + (float)r->val.val.ival;
+                         $$ = makeFloatConst(fresult, @1);
+                     }
+ 					else if(l->val.type == T_Float && r->val.type == T_Float)
+                     {
+ 						sscanf(l.val.val.str, "%f", &flval);
+ 						sscanf(r.val.val.str, "%f", &frval);
+ 						fresult = flval + frval;
+                         $$ = makeFloatConst(fresult, @1);
+                     }
+ 					else
+ 						elog(ERROR, "Invalid expression.");
+ 				}
+ 					if ( l->val.type == T_Float) || r->val.type == T_Float )
+ 					{
+ 						$$ == makeFloatConst($1, @1);
+ 					}
+ 					else if ( l->val.type == T_Integer && r->val.type == T_Integer )
+ 						$$ == makeIntConst($1, @1);
+ 					else
+ 						elog (ERROR, "Invalid expression ");					
+ 				}					
+ 			| AexprConst '-' AexprConst
+ 				{
+ 				}
+ 			| AexprConst '/' AexprConst
+ 			| AexprConst '*' AexprConst
+ 			| AexprConst '||' AexprConst
+ */
  		;
  
  Iconst:		ICONST									{ $$ = $1; };
  Sconst:		SCONST									{ $$ = $1; };
  RoleId:		ColId									{ $$ = $1; };
  SignedIconst: Iconst								{ $$ = $1; }
  			| '+' Iconst							{ $$ = + $2; }
  			| '-' Iconst							{ $$ = - $2; }
  		;
  /*
   * Name classification hierarchy.
   *
***************
*** 9556,9561 ****
--- 9788,9795 ----
  			| OWNER
  			| PARSER
  			| PARTIAL
+ 			| PARTITION
+ 			| PARTITIONS
  			| PASSWORD
  			| PLANS
  			| PREPARE
***************
*** 9599,9604 ****
--- 9833,9839 ----
  			| SHARE
  			| SHOW
  			| SIMPLE
+ 			| SPLIT
  			| STABLE
  			| STANDALONE_P
  			| START
***************
*** 9729,9738 ****
--- 9964,9975 ----
  			| JOIN
  			| LEFT
  			| LIKE
+ 			| LIST1
  			| NATURAL
  			| NOTNULL
  			| OUTER_P
  			| OVERLAPS
+ 			| RANGE
  			| RIGHT
  			| SIMILAR
  			| VERBOSE
Index: src/backend/parser/keywords.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/parser/keywords.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.8.3
diff -c -r1.1.1.1 -r1.1.1.1.8.3
*** src/backend/parser/keywords.c	1 Dec 2008 09:36:26 -0000	1.1.1.1
--- src/backend/parser/keywords.c	17 Mar 2009 12:50:55 -0000	1.1.1.1.8.3
***************
*** 186,191 ****
--- 186,192 ----
  	{"greatest", GREATEST, COL_NAME_KEYWORD},
  	{"group", GROUP_P, RESERVED_KEYWORD},
  	{"handler", HANDLER, UNRESERVED_KEYWORD},
+ //	{"hash", HASH, UNRESERVED_KEYWORD},
  	{"having", HAVING, RESERVED_KEYWORD},
  	{"header", HEADER_P, UNRESERVED_KEYWORD},
  	{"hold", HOLD, UNRESERVED_KEYWORD},
***************
*** 231,236 ****
--- 232,238 ----
  	{"level", LEVEL, UNRESERVED_KEYWORD},
  	{"like", LIKE, TYPE_FUNC_NAME_KEYWORD},
  	{"limit", LIMIT, RESERVED_KEYWORD},
+ 	{"list", LIST1, UNRESERVED_KEYWORD},
  	{"listen", LISTEN, UNRESERVED_KEYWORD},
  	{"load", LOAD, UNRESERVED_KEYWORD},
  	{"local", LOCAL, UNRESERVED_KEYWORD},
***************
*** 291,296 ****
--- 293,300 ----
  	{"owner", OWNER, UNRESERVED_KEYWORD},
  	{"parser", PARSER, UNRESERVED_KEYWORD},
  	{"partial", PARTIAL, UNRESERVED_KEYWORD},
+ 	{"partition", PARTITION, UNRESERVED_KEYWORD},
+ 	{"partitions", PARTITIONS, UNRESERVED_KEYWORD},
  	{"password", PASSWORD, UNRESERVED_KEYWORD},
  	{"placing", PLACING, RESERVED_KEYWORD},
  	{"plans", PLANS, UNRESERVED_KEYWORD},
***************
*** 305,310 ****
--- 309,315 ----
  	{"procedural", PROCEDURAL, UNRESERVED_KEYWORD},
  	{"procedure", PROCEDURE, UNRESERVED_KEYWORD},
  	{"quote", QUOTE, UNRESERVED_KEYWORD},
+ 	{"range", RANGE, UNRESERVED_KEYWORD},
  	{"read", READ, UNRESERVED_KEYWORD},
  	{"real", REAL, COL_NAME_KEYWORD},
  	{"reassign", REASSIGN, UNRESERVED_KEYWORD},
***************
*** 349,354 ****
--- 354,360 ----
  	{"simple", SIMPLE, UNRESERVED_KEYWORD},
  	{"smallint", SMALLINT, COL_NAME_KEYWORD},
  	{"some", SOME, RESERVED_KEYWORD},
+ 	{"split", SPLIT, UNRESERVED_KEYWORD},
  	{"stable", STABLE, UNRESERVED_KEYWORD},
  	{"standalone", STANDALONE_P, UNRESERVED_KEYWORD},
  	{"start", START, UNRESERVED_KEYWORD},
Index: src/backend/parser/parse_utilcmd.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/parser/parse_utilcmd.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.5
diff -c -r1.1.1.1 -r1.1.1.1.10.5
*** src/backend/parser/parse_utilcmd.c	1 Dec 2008 09:36:26 -0000	1.1.1.1
--- src/backend/parser/parse_utilcmd.c	13 Mar 2009 09:09:36 -0000	1.1.1.1.10.5
***************
*** 75,80 ****
--- 75,81 ----
  	List	   *alist;			/* "after list" of things to do after creating
  								 * the table */
  	IndexStmt  *pkey;			/* PRIMARY KEY index, if any */
+ 	PartitionAttrs *partAttr; 	/* partitioning related info */
  } CreateStmtContext;
  
  /* State shared by transformCreateSchemaStmt and its subroutines */
***************
*** 114,119 ****
--- 115,121 ----
  static void transformConstraintAttrs(List *constraintList);
  static void transformColumnType(ParseState *pstate, ColumnDef *column);
  static void setSchemaName(char *context_schema, char **stmt_schema_name);
+ static void transformPartitionData(ParseState *pstate, CreateStmtContext *cxt);
  
  
  /*
***************
*** 177,182 ****
--- 179,185 ----
  	cxt.alist = NIL;
  	cxt.pkey = NULL;
  	cxt.hasoids = interpretOidsOption(stmt->options);
+ 	cxt.partAttr = (PartitionAttrs *)stmt->partAttr;
  
  	/*
  	 * Run through each primary element in the table creation clause. Separate
***************
*** 235,240 ****
--- 238,248 ----
  	transformFKConstraints(pstate, &cxt, true, false);
  
  	/*
+ 	 * Postprocess partition related information
+ 	 */
+ 	transformPartitionData(pstate, &cxt);
+ 
+ 	/*
  	 * Output results.
  	 */
  	stmt->tableElts = cxt.columns;
***************
*** 955,960 ****
--- 963,1063 ----
  	return result;
  }
  
+ /*
+  * transformPartitionData:
+  * If partitioning is specified, carry out checks on the supplied create stmt
+  */
+ static void
+ transformPartitionData(ParseState *pstate, CreateStmtContext *cxt)
+ {
+ 	PartitionAttrs 	*partAttr = (PartitionAttrs *)cxt->partAttr;
+ 	ListCell 		*lc, *temp_part; 
+ 	List *partitionList, *partindexlist = NIL;
+ 
+ 	if (partAttr == NULL)
+ 		return;
+ 
+ 	partitionList = partAttr->partitions;
+ 	/* Foreign key constraints not supported with partitioning */
+ 	if(cxt->fkconstraints)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 				 errmsg("FOREIGN KEY constraints not supported with partitioning")));
+ 
+ 	/*
+ 	 * Check if the primary key column is the one on which partitioning
+ 	 * is being done
+ 	 */
+ 
+ 	foreach(lc, cxt->ixconstraints)
+ 	{
+ 		Constraint  *constraint = lfirst(lc);
+ 		ListCell *partCol;
+ 		bool 	flag;
+ 
+ 		Assert(IsA(constraint, Constraint));
+ 
+ 		if(constraint->contype == CONSTR_PRIMARY ||
+ 		   constraint->contype == CONSTR_UNIQUE) 
+ 		{
+ /*
+ 			if (constraint->keys->length > 1) 
+ 			{
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 						 errmsg("PRIMARY/UNIQUE KEY with partitioning can specify"
+ 								" single attribute only")));
+ 			}
+ */
+ 			flag = false;
+ 
+ 			foreach(partCol, partAttr->colName)
+ 			{
+ 				Alias *partColName = lfirst(partCol);
+ 
+ 				if(strcmp(partColName->aliasname, strVal(lfirst(list_head(constraint->keys)))) == 0)
+ 					flag = true;
+ 			}
+ 
+ 			if(flag == false)			
+ 			{
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ 						 errmsg("PRIMARY/UNIQUE KEY (%s) not the same as"
+ 								" the partitioning keys", strVal(lfirst(list_head(constraint->keys))))));
+ 			}
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * cxt.alist contains index related entries, if so, we need to add entries
+ 	 * for the children tables too
+ 	 */
+ 	foreach(temp_part, partitionList)
+ 	{
+ 		Partition *temp_partition = lfirst(temp_part);
+ 
+ 		foreach(lc, cxt->alist)
+ 		{
+ 			IndexStmt  *index, *copyindex;
+ 
+ 			index = lfirst(lc);
+ 			Assert(IsA(index, IndexStmt));
+ 
+ 			copyindex = copyObject(index);
+ 			*(copyindex->relation) = *(temp_partition->partName);
+ 
+ 			partindexlist = lappend(partindexlist, copyindex);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Append partindexlist to alist now, so that the indexes get created for
+ 	 * the children tables too
+ 	 */
+ 	cxt->alist = list_concat(cxt->alist, partindexlist);
+ }
+ 
  
  /*
   * transformIndexConstraints
***************
*** 1770,1775 ****
--- 1873,1890 ----
  				cmd->subtype = AT_AddConstraint;
  				newcmds = lappend(newcmds, cmd);
  				break;
+ 
+ 			case AT_AddPartition:
+ 				newcmds = lappend(newcmds, cmd);
+ 				break;		
+ 
+ 			case AT_DropPartitionByName:
+ 				newcmds = lappend(newcmds, cmd);
+ 				break;		
+ 
+ 			case AT_DropPartitionByRange:
+ 				newcmds = lappend(newcmds, cmd);
+ 				break;		
  
  			default:
  				newcmds = lappend(newcmds, cmd);
Index: src/backend/tcop/utility.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/tcop/utility.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.8.2
diff -c -r1.1.1.1 -r1.1.1.1.8.2
*** src/backend/tcop/utility.c	1 Dec 2008 09:36:27 -0000	1.1.1.1
--- src/backend/tcop/utility.c	14 Feb 2009 11:30:22 -0000	1.1.1.1.8.2
***************
*** 417,427 ****
  						relOid = DefineRelation((CreateStmt *) stmt,
  												RELKIND_RELATION);
  
  						/*
  						 * Let AlterTableCreateToastTable decide if this one
  						 * needs a secondary relation too.
  						 */
- 						CommandCounterIncrement();
  						AlterTableCreateToastTable(relOid);
  					}
  					else
--- 417,432 ----
  						relOid = DefineRelation((CreateStmt *) stmt,
  												RELKIND_RELATION);
  
+ 						CommandCounterIncrement();
+ 						/*
+ 						 * Let DefinePartitions decide if partitions need to be created
+ 						 * for this table
+ 						 */
+ 						DefinePartitions((CreateStmt *)parsetree, relOid);
  						/*
  						 * Let AlterTableCreateToastTable decide if this one
  						 * needs a secondary relation too.
  						 */
  						AlterTableCreateToastTable(relOid);
  					}
  					else
Index: src/backend/utils/adt/numeric.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/utils/adt/numeric.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.1
diff -c -r1.1.1.1 -r1.1.1.1.10.1
*** src/backend/utils/adt/numeric.c	1 Dec 2008 09:36:27 -0000	1.1.1.1
--- src/backend/utils/adt/numeric.c	2 Feb 2009 11:07:12 -0000	1.1.1.1.10.1
***************
*** 3588,3594 ****
  	char	   *endptr;
  
  	tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
! 											  NumericGetDatum(num)));
  
  	/* unlike float8in, we ignore ERANGE from strtod */
  	val = strtod(tmp, &endptr);
--- 3588,3594 ----
  	char	   *endptr;
  
  	tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
! 		 NumericGetDatum(num)));
  
  	/* unlike float8in, we ignore ERANGE from strtod */
  	val = strtod(tmp, &endptr);
Index: src/backend/utils/adt/ruleutils.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/backend/utils/adt/ruleutils.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.1
diff -c -r1.1.1.1 -r1.1.1.1.10.1
*** src/backend/utils/adt/ruleutils.c	1 Dec 2008 09:36:27 -0000	1.1.1.1
--- src/backend/utils/adt/ruleutils.c	2 Feb 2009 11:07:12 -0000	1.1.1.1.10.1
***************
*** 41,46 ****
--- 41,48 ----
  #include "parser/parse_func.h"
  #include "parser/parse_oper.h"
  #include "parser/parsetree.h"
+ #include "parser/parse_clause.h"
+ #include "parser/parse_expr.h"
  #include "rewrite/rewriteHandler.h"
  #include "rewrite/rewriteManip.h"
  #include "rewrite/rewriteSupport.h"
***************
*** 6157,6160 ****
--- 6159,6200 ----
  	ReleaseSysCache(tuple);
  
  	return result;
+ }
+ 
+ /*
+  * get the text corresponding to a raw expr 
+  */
+ char *
+ pg_get_expr_text(RangeVar *relation, Node *raw_expr, StringInfo buf)
+ {
+ 	deparse_context context;
+ 	Node *opex;
+ 	Node *temp;
+ 	deparse_namespace dpns;
+ 	RangeVar *old;
+ 	RangeVar *new;
+ 	ParseState *pstate = make_parsestate(NULL);
+ 
+ 	old = makeNode(RangeVar);
+ 	old->relname = relation->relname;
+ 	old->alias = makeAlias("*OLD*", NIL);
+ 
+ 	new = makeNode(RangeVar);
+ 	new->relname = relation->relname;
+ 	new->alias = makeAlias("*NEW*", NIL);
+ 
+ 	transformFromClause(pstate, list_make3(relation, old, new));
+ 	temp = transformExpr(pstate, raw_expr);
+ 
+ 	opex = (Node *)temp;
+ 	context.buf = buf;
+ 	context.namespaces = list_make1(&dpns);
+ 	context.varprefix = true;
+ 	context.prettyFlags = PRETTYFLAG_PAREN | PRETTYFLAG_INDENT ;
+ 	context.indentLevel = PRETTYINDENT_STD;
+ 	dpns.rtable = pstate->p_rtable;
+ 	dpns.outer_plan = dpns.inner_plan = NULL;
+ 
+ 	get_rule_expr(opex, &context, false);
+ 	return (context.buf)->data;
  }
Index: src/include/catalog/indexing.h
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/catalog/indexing.h,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.2
diff -c -r1.1.1.1 -r1.1.1.1.10.2
*** src/include/catalog/indexing.h	1 Dec 2008 09:36:42 -0000	1.1.1.1
--- src/include/catalog/indexing.h	23 Feb 2009 12:33:10 -0000	1.1.1.1.10.2
***************
*** 57,62 ****
--- 57,65 ----
   * index name (much less the numeric OID).
   */
  
+ DECLARE_INDEX(pg_part_partrel_index, 2337, on pg_partition using btree(parentrelid oid_ops, partrelid oid_ops, keyorder int2_ops));
+ #define PartitionParentIndexId 2337
+ 
  DECLARE_UNIQUE_INDEX(pg_aggregate_fnoid_index, 2650, on pg_aggregate using btree(aggfnoid oid_ops));
  #define AggregateFnoidIndexId  2650
  
Index: src/include/commands/tablecmds.h
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/commands/tablecmds.h,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.6
diff -c -r1.1.1.1 -r1.1.1.1.10.6
*** src/include/commands/tablecmds.h	1 Dec 2008 09:36:42 -0000	1.1.1.1
--- src/include/commands/tablecmds.h	17 Mar 2009 12:50:55 -0000	1.1.1.1.10.6
***************
*** 18,23 ****
--- 18,30 ----
  #include "utils/relcache.h"
  
  
+ typedef struct partitionInfo
+ {
+     char    partitionName[20];
+     Value   minValue;
+     Value   maxValue;
+ }partitionInfo;
+ 
  extern Oid	DefineRelation(CreateStmt *stmt, char relkind);
  
  extern void RemoveRelations(DropStmt *drop);
***************
*** 69,73 ****
--- 76,85 ----
  extern void AtEOSubXact_on_commit_actions(bool isCommit,
  							  SubTransactionId mySubid,
  							  SubTransactionId parentSubid);
+ 
+ extern void	DefinePartitions(CreateStmt *stmt, Oid parentRelId);
+ extern int GetPartitionsCount(Oid parentOid);
+ extern int GetPartitionsCountAtt(Oid parentOid, Oid attNum);
+ extern int ValidateRanges(PartitionAttrs *partAttr, Oid parentRelOid, Oid *partitionRelOids);
  
  #endif   /* TABLECMDS_H */
Index: src/include/nodes/nodes.h
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/nodes/nodes.h,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.10.3
diff -c -r1.1.1.2 -r1.1.1.2.10.3
*** src/include/nodes/nodes.h	1 Dec 2008 09:38:07 -0000	1.1.1.2
--- src/include/nodes/nodes.h	17 Mar 2009 12:50:55 -0000	1.1.1.2.10.3
***************
*** 362,367 ****
--- 362,371 ----
  	T_XmlSerialize,
  	T_WithClause,
  	T_CommonTableExpr,
+ 	T_PartitionAttrs,
+ 	T_Partition,
+ 	T_UpdatePartitionStmt,
+ 	T_SplitPartitionStmt,
  
  	/*
  	 * TAGS FOR RANDOM OTHER STUFF
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/nodes/parsenodes.h,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.8.6
diff -c -r1.1.1.2 -r1.1.1.2.8.6
*** src/include/nodes/parsenodes.h	1 Dec 2008 09:38:07 -0000	1.1.1.2
--- src/include/nodes/parsenodes.h	17 Mar 2009 12:50:55 -0000	1.1.1.2.8.6
***************
*** 1025,1031 ****
  	AT_EnableReplicaRule,		/* ENABLE REPLICA RULE name */
  	AT_DisableRule,				/* DISABLE RULE name */
  	AT_AddInherit,				/* INHERIT parent */
! 	AT_DropInherit				/* NO INHERIT parent */
  } AlterTableType;
  
  typedef struct AlterTableCmd	/* one subcommand of an ALTER TABLE */
--- 1025,1036 ----
  	AT_EnableReplicaRule,		/* ENABLE REPLICA RULE name */
  	AT_DisableRule,				/* DISABLE RULE name */
  	AT_AddInherit,				/* INHERIT parent */
! 	AT_DropInherit,				/* NO INHERIT parent */
! 	AT_AddPartition,			/* add partition */
! 	AT_DropPartitionByName,		/* drop partition by name. */
! 	AT_DropPartitionByRange,	/* drop partition by range. */
! 	AT_UpdatePartition,			/* Update the range partition. */
! 	AT_SplitPartition			/* Split range partition. */
  } AlterTableType;
  
  typedef struct AlterTableCmd	/* one subcommand of an ALTER TABLE */
***************
*** 1216,1221 ****
--- 1221,1227 ----
  	List	   *options;		/* options from WITH clause */
  	OnCommitAction oncommit;	/* what do we do at COMMIT? */
  	char	   *tablespacename; /* table space to use, or NULL */
+ 	Node	   *partAttr;		/* partitioning related information */
  } CreateStmt;
  
  /* ----------
***************
*** 1254,1271 ****
  	CONSTR_ATTR_IMMEDIATE
  } ConstrType;
  
  typedef struct Constraint
  {
! 	NodeTag		type;
! 	ConstrType	contype;
! 	char	   *name;			/* name, or NULL if unnamed */
! 	Node	   *raw_expr;		/* expr, as untransformed parse tree */
! 	char	   *cooked_expr;	/* expr, as nodeToString representation */
! 	List	   *keys;			/* String nodes naming referenced column(s) */
! 	List	   *options;		/* options from WITH clause */
! 	char	   *indexspace;		/* index tablespace for PKEY/UNIQUE
! 								 * constraints; NULL for default */
  } Constraint;
  
  /* ----------
   * Definitions for FOREIGN KEY constraints in CreateStmt
--- 1260,1338 ----
  	CONSTR_ATTR_IMMEDIATE
  } ConstrType;
  
+ 
+ /*
+  * Enumeration which defines the various possibilites for the partitioning
+  * function.
+  */
+ typedef enum PartitionFunction {
+         PART_UNDEFINED,
+         PART_LIST,
+         PART_RANGE,
+         PART_HASH
+ } PartitionFunction;
+ 
+ 
  typedef struct Constraint
  {
! 	NodeTag				type;
! 	List 				*min_value;
! 	List				*max_value;
! 	List				*partition_list_values;
! //	ConstrType			contype;
! 	PartitionFunction	contype;
! 	char	   			*name;			/* name, or NULL if unnamed */
! 	Node	   			*raw_expr;		/* expr, as untransformed parse tree */
! 	char	   			*cooked_expr;	/* expr, as nodeToString representation */
! 	List	   			*keys;			/* String nodes naming referenced column(s) */
! 	List	   			*options;		/* options from WITH clause */
! 	char	   			*indexspace;	/* index tablespace for PKEY/UNIQUE
! 								 			* constraints; NULL for default */
  } Constraint;
+ 
+ //typedef struct PartitionConstraint
+ //{
+ //	Node			*min_value;
+ //  	Node			*max_value;
+ //	List 			*list_partition_values;
+ //	PartitionFunction 	contype;
+ //	char	   		*name;			/* name, or NULL if unnamed */
+ //	Node	   		*raw_expr;		/* expr, as untransformed parse tree */
+ //	char	   		*cooked_expr;		/* expr, as nodeToString representation */
+ //	List	   		*keys;			/* String nodes naming referenced column(s) */
+ //	List	   		*options;		/* options from WITH clause */
+ //	char	   		*indexspace;		/* index tablespace for PKEY/UNIQUE
+ //								 * constraints; NULL for default */
+ // } PartitionConstraint;
+ 
+ typedef struct Partition {
+ 	NodeTag		type;
+ 	RangeVar	*partName;          /* Name of the partition */
+ 	Constraint	*partitionCheck;    /* per partition constraint */
+ 	char 		*tablespacename;	/* tablespace name*/ 
+ } Partition;
+ 
+ typedef struct UpdatePartitionStmt
+ {
+ 	NodeTag		type;
+ 	Partition 	*prev;
+ 	Partition 	*after;
+ }UpdatePartitionStmt;
+ 
+ typedef struct SplitPartitionStmt
+ {
+ 	NodeTag 	type;
+ 	List		*partNames;
+ 	List		*splitValues;
+ }SplitPartitionStmt;
+ 
+ typedef struct PartitionAttrs {
+ 	NodeTag				type;
+ 	int     			numberOfPartitions; /* Number of partitions */
+ 	List    			*partitions;  		/* The list of partitions */
+ 	PartitionFunction 	partFunc; 			/* type of partition */
+ 	List 				*colName;           /* partition column name */
+ } PartitionAttrs;
  
  /* ----------
   * Definitions for FOREIGN KEY constraints in CreateStmt
Index: src/include/optimizer/planner.h
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/optimizer/planner.h,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.1
diff -c -r1.1.1.1 -r1.1.1.1.10.1
*** src/include/optimizer/planner.h	1 Dec 2008 09:36:42 -0000	1.1.1.1
--- src/include/optimizer/planner.h	4 Mar 2009 08:30:42 -0000	1.1.1.1.10.1
***************
*** 33,37 ****
  				 PlannerInfo *parent_root,
  				 bool hasRecursion, double tuple_fraction,
  				 PlannerInfo **subroot);
- 
  #endif   /* PLANNER_H */
--- 33,36 ----
Index: src/include/utils/builtins.h
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/include/utils/builtins.h,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.1
diff -c -r1.1.1.1 -r1.1.1.1.10.1
*** src/include/utils/builtins.h	1 Dec 2008 09:36:46 -0000	1.1.1.1
--- src/include/utils/builtins.h	2 Feb 2009 11:07:12 -0000	1.1.1.1.10.1
***************
*** 16,21 ****
--- 16,23 ----
  
  #include "fmgr.h"
  #include "nodes/parsenodes.h"
+ #include "nodes/primnodes.h"
+ #include "lib/stringinfo.h"
  
  /*
   *		Defined in adt/
***************
*** 562,567 ****
--- 564,571 ----
  extern const char *quote_identifier(const char *ident);
  extern char *quote_qualified_identifier(const char *namespace,
  						   const char *ident);
+ extern char *pg_get_expr_text(RangeVar *relation, 
+ 						Node *raw_expr, StringInfo buf);
  
  /* tid.c */
  extern Datum tidin(PG_FUNCTION_ARGS);
Index: src/test/regress/regress.c
===================================================================
RCS file: /mart/pgsql_home/pgrepo/pgsql_init/src/test/regress/regress.c,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.10.8
diff -c -r1.1.1.1 -r1.1.1.1.10.8
*** src/test/regress/regress.c	1 Dec 2008 09:36:47 -0000	1.1.1.1
--- src/test/regress/regress.c	9 Mar 2009 12:55:31 -0000	1.1.1.1.10.8
***************
*** 11,16 ****
--- 11,34 ----
  #include "executor/executor.h"	/* For GetAttributeByName */
  #include "commands/sequence.h"	/* for nextval() */
  
+ #include "postgres.h"
+ #include "executor/spi.h"
+ #include "commands/trigger.h"
+ 
+ #include "optimizer/plancat.h" 
+ #include "utils/fmgroids.h"
+ #include "catalog/pg_partition.h"
+ #include "catalog/indexing.h"
+ #include "utils/tqual.h"
+ #include "utils/lsyscache.h"
+ #include "access/hash.h"
+ 
+ extern Datum trigf(PG_FUNCTION_ARGS);
+ 
+ PG_FUNCTION_INFO_V1(trigf);
+ 
+ #define     ItemPointerGetDatum(X)   PointerGetDatum(X)
+ 
  #define P_MAXDIG 12
  #define LDELIM			'('
  #define RDELIM			')'
***************
*** 27,37 ****
  extern int	oldstyle_length(int n, text *t);
  extern Datum int44in(PG_FUNCTION_ARGS);
  extern Datum int44out(PG_FUNCTION_ARGS);
! 
  #ifdef PG_MODULE_MAGIC
  PG_MODULE_MAGIC;
  #endif
  
  
  /*
   * Distance from a point to a path
--- 45,365 ----
  extern int	oldstyle_length(int n, text *t);
  extern Datum int44in(PG_FUNCTION_ARGS);
  extern Datum int44out(PG_FUNCTION_ARGS);
! extern Oid find_inheritance_parent(Oid inhrelid);
! extern Oid get_relevent_hash_partition(HeapTuple tuple, Relation rel);
! Datum  partition_update_trigger(PG_FUNCTION_ARGS); 
! Datum  partition_constraints_hash(PG_FUNCTION_ARGS);
! Datum  partition_insert_trigger_hash(PG_FUNCTION_ARGS);
! Datum  partition_insert_trigger(PG_FUNCTION_ARGS);
  #ifdef PG_MODULE_MAGIC
  PG_MODULE_MAGIC;
  #endif
  
+ PG_FUNCTION_INFO_V1(partition_update_trigger);
+ 
+ /*
+  * Update trigger for all partitions.
+  * Update is implemented as delete + insert operation.
+  * Delete the OLD row from child table and insert the NEW
+  * row on parent table.
+  */
+ 
+ Datum
+ partition_update_trigger(PG_FUNCTION_ARGS)
+ {
+     int col;
+     TriggerData *trigdata = (TriggerData *) fcinfo->context;
+     HeapTuple   trigtuple = trigdata->tg_trigtuple;
+     HeapTuple   newtuple  = trigdata->tg_newtuple;
+ 	HeapTuple 	dummyTuple = NULL;
+ 
+     Relation    parent_relation = RelationIdGetRelation(
+                                     find_inheritance_parent (
+ 									    RelationGetRelid(trigdata->tg_relation)));
+ 
+     StringInfo	delquery = makeStringInfo();
+     StringInfo	insquery = makeStringInfo();
+     bool        isnull;
+ 
+ 	resetStringInfo(delquery);
+ 
+     appendStringInfo( delquery, " delete from %s where ctid = '%s';",
+                     RelationGetRelationName(trigdata->tg_relation),
+                      DatumGetCString(OidFunctionCall1(F_TIDOUT,
+                     ItemPointerGetDatum(&trigtuple->t_self) ))
+                 );
+ 
+ //    elog(NOTICE,"query is :- %s",delquery->data);
+     SPI_connect();
+     SPI_exec(delquery->data,1);
+     SPI_finish();
+ 
+     appendStringInfo(insquery," insert into %s values ('", RelationGetRelationName(parent_relation));
+     for ( col = 1; col <= trigdata->tg_relation->rd_att->natts; col++  )
+     {
+ 
+         Oid typoutput;
+ 		bool typIsVarlen ;
+         getTypeOutputInfo ( trigdata->tg_relation->rd_att->attrs[col-1]->atttypid , &typoutput , &typIsVarlen );
+         Datum d = heap_getattr(newtuple,col,trigdata->tg_relation->rd_att,&isnull);
+         appendStringInfo(insquery ,"%s%s",
+                 DatumGetCString (OidFunctionCall1(typoutput,d)),
+                 col < (trigdata->tg_relation->rd_att->natts) ? "','" :"');"
+                 );
+     }
+ 
+     SPI_connect();
+     SPI_exec(insquery->data,1);
+     SPI_finish();
+ 
+     RelationClose(parent_relation);
+ 
+ 	/* Form a dummy-tuple to count no. of rows modified. */
+ 	dummyTuple = palloc(newtuple->t_len);
+ 	dummyTuple->t_len = -1;
+ 
+     return PointerGetDatum(dummyTuple);
+ }
+ 
+ /*
+  * Constaints for hash partitions are implemented by monitoring inserts to child tables
+  * using following trigger which computes the hash value of partition key attributes 
+  * and allows insert iff it matches with its designated hash value as specified  
+  * in pg_partition catalog table.
+  */
+ PG_FUNCTION_INFO_V1(partition_constraints_hash);
+ Datum
+ partition_constraints_hash(PG_FUNCTION_ARGS)
+ {
+     TriggerData *trigdata = (TriggerData *) fcinfo->context;
+     HeapTuple   trigtuple = trigdata->tg_trigtuple;
+     Relation    relation = trigdata->tg_relation;
+     Relation 	pg_partrel;
+     Oid 		relation_id;
+ 	ScanKeyData skey;
+ 	SysScanDesc pg_partscan;
+ 	HeapTuple   pg_parttup;
+ 	int         hash_partition_entries = 0;
+ 	List        *distinct_part_key_list=NULL;
+ 	StringInfo	str_to_hash = makeStringInfo();
+ 	unsigned int hashValueFromCatalog;
+ 	Oid			parentOid;
+ 
+     /* make sure it's called as a trigger at all */
+     if (!CALLED_AS_TRIGGER(fcinfo))
+         elog(ERROR, "partition_insert_trigger: not called by trigger manager");
+ 
+     /* Sanity checks */
+     if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) || !TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+         elog(ERROR, "partition_insert_trigger: not called on insert before");
+ 
+     relation_id = RelationGetRelid(relation);
+ 
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_partrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(relation)));
+ 
+     pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+     {
+         bool            isnull, typbyval;
+         int16           len;
+         bool            isVariableLength;
+         Oid             typoutput;
+ 
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+ 		if(pg_part->partrelid == RelationGetRelid(relation))
+ 		{
+         	if(pg_part->parttype == PART_HASH)
+ 	        {
+     	        if (!list_member_int(distinct_part_key_list, pg_part->partkey))
+         	    {
+             	    distinct_part_key_list = lappend_int(distinct_part_key_list, pg_part->partkey);
+ 
+                 	/* Get attribute from tuple */
+ 	                Datum attr      = heap_getattr(trigtuple, pg_part->partkey, relation->rd_att, &isnull);
+ 
+         	        /* Get len and typbyval from pg_type */
+             	    get_typlenbyval(pg_part->keytype, &len, &typbyval);
+ 
+         	        /* Read the list value */
+             	    getTypeOutputInfo(pg_part->keytype, &typoutput, &isVariableLength);
+                 	appendStringInfo(str_to_hash, DatumGetCString(OidFunctionCall1(typoutput, attr)));
+ 	
+ 					parentOid = pg_part->parentrelid;
+ 					hashValueFromCatalog = heap_getattr (pg_parttup, Anum_pg_partition_hashval, pg_partrel->rd_att, &isnull);
+         	    }
+         	}
+ 		}
+     }
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, AccessShareLock);
+ 
+     /*Scan key to scan pg_partition table on parentrelid*/
+     ScanKeyInit(&skey,
+                 Anum_pg_partition_parentrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(parentOid));
+ 
+     pg_partrel = heap_open(PartitionRelationId, AccessShareLock);
+     pg_partscan = systable_beginscan(pg_partrel, PartitionParentIndexId, true,
+                                 SnapshotNow, 1, &skey);
+ 
+ 	hash_partition_entries = 0;
+ 
+     while (HeapTupleIsValid(pg_parttup= systable_getnext(pg_partscan)))
+     {
+         /* Instead of pg_part Use heap_getattr for accessing bytea coluns */
+         Form_pg_partition pg_part = (Form_pg_partition) GETSTRUCT(pg_parttup);
+ 
+         if(pg_part->parttype == PART_HASH)
+         {
+             /* Increase the no. of hash partition entries count. */
+ 			hash_partition_entries++;
+ 		}
+ 	}
+ 
+     systable_endscan(pg_partscan);
+     heap_close(pg_partrel, AccessShareLock);
+ 
+     unsigned int hashValue = DatumGetUInt32(hash_any(str_to_hash->data, strlen(str_to_hash->data))) %
+                 (int)(hash_partition_entries / distinct_part_key_list->length);
+ 
+ //	elog(NOTICE, "String to be hashed : %s", str_to_hash->data);
+ 
+ 	if(hashValueFromCatalog != hashValue)
+ 		elog(ERROR, "This row can not be inserted. Invalid hash value '%d'; expected '%d'", hashValue, hashValueFromCatalog);
+ 	
+     return PointerGetDatum(trigtuple);
+ }
+ 
+ PG_FUNCTION_INFO_V1(partition_insert_trigger_hash);
+ Datum
+ partition_insert_trigger_hash(PG_FUNCTION_ARGS)
+ {
+     TriggerData *trigdata = (TriggerData *) fcinfo->context;
+     HeapTuple   trigtuple= trigdata->tg_trigtuple;
+     Relation    parent_relation = trigdata->tg_relation;
+ 
+     char        *child_table_name;
+     Relation 	child_table_relation;
+     Oid 		relation_id,parent_reloid;
+ 
+     /* make sure it's called as a trigger at all */
+     if (!CALLED_AS_TRIGGER(fcinfo))
+         elog(ERROR, "partition_insert_trigger: not called by trigger manager");
+ 
+     /* Sanity checks */
+     if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) || !TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+         elog(ERROR, "partition_insert_trigger: not called on insert before");
+ 
+     parent_reloid = RelationGetRelid(parent_relation);
+ 
+     relation_id = get_relevent_hash_partition(trigtuple, parent_relation);
+     if (relation_id == InvalidOid)
+         elog(ERROR, "partition_insert_trigger: Invalid child table %s", child_table_name);
+ 
+     child_table_relation = RelationIdGetRelation(relation_id);
+     if (child_table_relation == NULL)
+     elog(ERROR, "partition_insert_trigger: Failed to locate relation for child table %s", child_table_name);
+ 
+     ResultRelInfo *resultRelInfo;
+     TupleTableSlot *slot;
+     EState *estate= CreateExecutorState();
+ 
+     resultRelInfo = makeNode(ResultRelInfo);
+     resultRelInfo->ri_RangeTableIndex = 1;
+     resultRelInfo->ri_RelationDesc = child_table_relation;
+ 
+     estate->es_result_relations = resultRelInfo;
+     estate->es_num_result_relations = 1;
+     estate->es_result_relation_info = resultRelInfo;
+ 
+ 	/* Set up a tuple slot too */
+     slot = MakeSingleTupleTableSlot(trigdata->tg_relation->rd_att);
+     ExecStoreTuple(trigtuple, slot, InvalidBuffer, false);
+ 
+ 	ExecConstraints(resultRelInfo, slot, estate);
+ 
+     heap_insert(child_table_relation, trigtuple, GetCurrentCommandId(true), 0, NULL);
+     RelationClose(child_table_relation);
+     ExecDropSingleTupleTableSlot(slot);
+     FreeExecutorState (estate);
+ 	
+ 	/* Form a dummy-tuple to count no. of rows modified. */
+ 	trigtuple->t_len = -1;
+ 
+     return PointerGetDatum(trigtuple);
+ }
+ 
+ Oid
+ get_relevent_partition(HeapTuple tuple, Relation rel);
+ 
+ PG_FUNCTION_INFO_V1(partition_insert_trigger);
+ Datum
+ partition_insert_trigger(PG_FUNCTION_ARGS)
+ {
+     TriggerData *trigdata = (TriggerData *) fcinfo->context;
+     HeapTuple   trigtuple= trigdata->tg_trigtuple;
+     Relation    parent_relation = trigdata->tg_relation;
+ 
+     char        *child_table_name;
+     Relation child_table_relation;
+     Oid relation_id,parent_reloid;
+ 
+     /* make sure it's called as a trigger at all */
+     if (!CALLED_AS_TRIGGER(fcinfo))
+         elog(ERROR, "partition_insert_trigger: not called by trigger manager");
+ 
+     /* Sanity checks */
+     if (!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event) || !TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+         elog(ERROR, "partition_insert_trigger: not called on insert before");
+ 
+ 	parent_reloid = RelationGetRelid(parent_relation);
+ 
+ 	relation_id = get_relevent_partition(trigtuple, parent_relation);
+     if (relation_id == InvalidOid)
+         elog(ERROR, "Could not find valid partition ... ");
+ 
+     child_table_relation = RelationIdGetRelation(relation_id);
+     if (child_table_relation == NULL)
+     elog(ERROR, "partition_insert_trigger: Failed to locate relation for child table %s", child_table_name);
+ 
+     ResultRelInfo *resultRelInfo;
+     TupleTableSlot *slot;
+     EState *estate= CreateExecutorState();
+ 
+     resultRelInfo = makeNode(ResultRelInfo);
+     resultRelInfo->ri_RangeTableIndex = 1;
+     resultRelInfo->ri_RelationDesc = child_table_relation;
+ 
+     estate->es_result_relations = resultRelInfo;
+     estate->es_num_result_relations = 1;
+     estate->es_result_relation_info = resultRelInfo;
+ 
+ 	/* Set up a tuple slot too */
+     slot = MakeSingleTupleTableSlot(trigdata->tg_relation->rd_att);
+     ExecStoreTuple(trigtuple, slot, InvalidBuffer, false);
+ 
+ 	ExecConstraints(resultRelInfo, slot, estate);
+ 
+     heap_insert(child_table_relation, trigtuple, GetCurrentCommandId(true), 0, NULL);
+     RelationClose(child_table_relation);
+     ExecDropSingleTupleTableSlot(slot);
+     FreeExecutorState (estate);
+ 
+ 	/* Form a dummy-tuple to count no. of rows modified. */
+ 	trigtuple->t_len = -1;
+ 
+     return PointerGetDatum(trigtuple);
+ }
  
  /*
   * Distance from a point to a path
partition.sqlapplication/octet-stream; name=partition.sqlDownload
#2Nikhil Sontakke
nikhil.sontakke@enterprisedb.com
In reply to: Kedar Potdar (#1)
Re: Partitioning feature ...

Hi Kedar,

The syntax used conforms to most of the suggestions mentioned in
http://archives.postgresql.org/pgsql-hackers/2008-01/msg00413.php, barring
the following:
-- Specification of partition names is optional. System will be able to
generate partition names in such cases.
-- Sub partitioning

I was wondering if there is a need to mention the type of partition while
dropping it.

E.g
ALTER table x DROP RANGE PARTITION x_part;

The type of partition (RANGE, HASH) could be dropped according to me.

We are maintaining a system catalog(pg_partition) for partition meta-data.
System will look-up this table to find appropriate partition to operate on.
System internally uses low-level 'C' triggers to row-movement.

Can you elaborate more on how do you handle updates with these triggers?

Regards,
Nikhils
--
http://www.enterprisedb.com

#3Kedar Potdar
kedar.potdar@gmail.com
In reply to: Nikhil Sontakke (#2)
Re: Partitioning feature ...

Hi Nikhil,

Update operation is performed as a combination of 'delete' and 'insert'.

In Update trigger, the row is deleted from relation according to it's
'ctid'. A look-up on system catalog for partitions is performed to identify
the target table by evaluating values of partition-key attributes, of the
given row. The constraints of this target table are evaluated for this new
row and if found valid, the row is inserted.

Regards,
--
Kedar.

On Mon, Mar 23, 2009 at 5:09 PM, Nikhil Sontakke <
nikhil.sontakke@enterprisedb.com> wrote:

Show quoted text

Hi Kedar,

The syntax used conforms to most of the suggestions mentioned in
http://archives.postgresql.org/pgsql-hackers/2008-01/msg00413.php,
barring the following:
-- Specification of partition names is optional. System will be able to
generate partition names in such cases.
-- Sub partitioning

I was wondering if there is a need to mention the type of partition while
dropping it.

E.g
ALTER table x DROP RANGE PARTITION x_part;

The type of partition (RANGE, HASH) could be dropped according to me.

We are maintaining a system catalog(pg_partition) for partition meta-data.
System will look-up this table to find appropriate partition to operate on.
System internally uses low-level 'C' triggers to row-movement.

Can you elaborate more on how do you handle updates with these triggers?

Regards,
Nikhils
--
http://www.enterprisedb.com

#4Emmanuel Cecchet
manu@asterdata.com
In reply to: Kedar Potdar (#1)
Re: Partitioning feature ...

Hi Kedar,

First of all, congratulations for the excellent work.
I have some comments and questions.

In get_relevent_partition (btw, relevant is spelled with an a) you are
maintaining 2 lists. I guess this is only useful for multi-column
partitions, right?
If you have a single column partition (without subpartitions), I think
you could directly return on the first match (without maintaining any
list) since you guarantee that there is no overlap between partitions.
A simple but effective optimization for inserts consists of caching the
last partition used (consecutive inserts often go to the same partition)
and try it first before going through the whole loop.

The update trigger should first check if the tuple needs to be moved. If
the updated tuple still matches the constraints of the partitions it
will not have to be moved and will save a lot of overhead.

The COPY operation should probably be optimized to use the same code as
the one in the insert trigger for partitioned tables. I guess some code
could be factorized in COPY to make the inserts more efficient.

The current trigger approach should prevent other triggers to be added
to the table, or you should make sure that the partition trigger is
always the one to execute last.

As we don't have automatic partition creation, it would be interesting
to have an optional mechanism to deal with tuples that don't match any
partition (very useful when you do bulk insert and some new data require
a new partition). Having a simple overflow partition or an error logging
mechanism would definitely help to identify these tuples and prevent
things like large COPY operations to fail.

Looking forward to your responses,
Emmanuel

We are implementing table partitioning feature to support Range and
Hash partitions. Please find attached, the WIP patch and test-cases.

The syntax used conforms to most of the suggestions mentioned in
http://archives.postgresql.org/pgsql-hackers/2008-01/msg00413.php,
barring the following:
-- Specification of partition names is optional. System will be able
to generate partition names in such cases.
-- Sub partitioning

We are maintaining a system catalog(pg_partition) for partition
meta-data. System will look-up this table to find appropriate
partition to operate on.
System internally uses low-level 'C' triggers to row-movement.

Regards,
--
Kedar.

------------------------------------------------------------------------

--
Emmanuel Cecchet
Aster Data Systems
Web: http://www.asterdata.com

#5Kedar Potdar
kedar.potdar@gmail.com
In reply to: Emmanuel Cecchet (#4)
Re: Partitioning feature ...

Hi Emmanuel,

Thanks for your time. This is a WIP patch and we will integrate your
suggestions/comments as appropriate.

Regards,
--
Kedar.

On Fri, Mar 27, 2009 at 3:38 AM, Emmanuel Cecchet <manu@asterdata.com>wrote:

Hi Kedar,

First of all, congratulations for the excellent work.
I have some comments and questions.

In get_relevent_partition (btw, relevant is spelled with an a) you are
maintaining 2 lists.

Oops! 'a' typographical error.

I guess this is only useful for multi-column partitions, right?

If you have a single column partition (without subpartitions), I think you
could directly return on the first match (without maintaining any list)
since you guarantee that there is no overlap between partitions.
A simple but effective optimization for inserts consists of caching the
last partition used (consecutive inserts often go to the same partition) and
try it first before going through the whole loop.

Yep.

The update trigger should first check if the tuple needs to be moved. If
the updated tuple still matches the constraints of the partitions it will
not have to be moved and will save a lot of overhead.

Yes. We agree on that.

The COPY operation should probably be optimized to use the same code as the
one in the insert trigger for partitioned tables. I guess some code could be
factorized in COPY to make the inserts more efficient.

The current trigger approach should prevent other triggers to be added to
the table, or you should make sure that the partition trigger is always the
one to execute last.

As triggers are executed in order of their names, we've prefixed the

trigger names with "zz". This should work fine as long as no-one uses
trigger-name which starts with "zz".

Show quoted text

As we don't have automatic partition creation, it would be interesting to
have an optional mechanism to deal with tuples that don't match any
partition (very useful when you do bulk insert and some new data require a
new partition). Having a simple overflow partition or an error logging
mechanism would definitely help to identify these tuples and prevent things
like large COPY operations to fail.

Will get back on this.

Looking forward to your responses,
Emmanuel

We are implementing table partitioning feature to support Range and Hash
partitions. Please find attached, the WIP patch and test-cases.

The syntax used conforms to most of the suggestions mentioned in
http://archives.postgresql.org/pgsql-hackers/2008-01/msg00413.php,
barring the following:
-- Specification of partition names is optional. System will be able to
generate partition names in such cases.
-- Sub partitioning

We are maintaining a system catalog(pg_partition) for partition meta-data.
System will look-up this table to find appropriate partition to operate on.
System internally uses low-level 'C' triggers to row-movement.

Regards,
--
Kedar.

------------------------------------------------------------------------

--
Emmanuel Cecchet
Aster Data Systems
Web: http://www.asterdata.com

#6Jaime Casanova
jcasanov@systemguards.com.ec
In reply to: Kedar Potdar (#5)
Re: Partitioning feature ...

On Mon, Mar 30, 2009 at 6:51 AM, Kedar Potdar <kedar.potdar@gmail.com> wrote:

  As triggers are executed in order of their names, we've prefixed the
trigger names with "zz". This should work fine as long as no-one uses
trigger-name which starts with "zz".

this seems a lot fragile... why system generated triggers has to be
executed following the same rules (talking about order of execution)
as user triggers? can't we simply execute them first or last or maybe
be clever and mark one to be executed first and others last?

--
Atentamente,
Jaime Casanova
Soporte y capacitación de PostgreSQL
Asesoría y desarrollo de sistemas
Guayaquil - Ecuador
Cel. +59387171157

#7Emmanuel Cecchet
manu@frogthinker.org
In reply to: Jaime Casanova (#6)
Re: Partitioning feature ...

I agree with Jaime that system triggers should execute independently of
user triggers.
In the particular case of partitioning, the system trigger should
execute after the user triggers. However, as the partitioning trigger is
a row level trigger, it is not clear what is going to happen with user
triggers that work at the statement level.

Emmanuel

Jaime Casanova wrote:

On Mon, Mar 30, 2009 at 6:51 AM, Kedar Potdar <kedar.potdar@gmail.com> wrote:

As triggers are executed in order of their names, we've prefixed the
trigger names with "zz". This should work fine as long as no-one uses
trigger-name which starts with "zz".

this seems a lot fragile... why system generated triggers has to be
executed following the same rules (talking about order of execution)
as user triggers? can't we simply execute them first or last or maybe
be clever and mark one to be executed first and others last?

--
Emmanuel Cecchet
FTO @ Frog Thinker
Open Source Development & Consulting
--
Web: http://www.frogthinker.org
email: manu@frogthinker.org
Skype: emmanuel_cecchet

#8Nikhil Sontakke
nikhil.sontakke@enterprisedb.com
In reply to: Jaime Casanova (#6)
Re: Partitioning feature ...

Hi,

As triggers are executed in order of their names, we've prefixed the
trigger names with "zz". This should work fine as long as no-one uses
trigger-name which starts with "zz".

this seems a lot fragile... why system generated triggers has to be
executed following the same rules (talking about order of execution)
as user triggers? can't we simply execute them first or last or maybe
be clever and mark one to be executed first and others last?

AFAICS, we do not have any category like system triggers. So yeah, it would
have been nice to generate triggers with names (starting with __ for
example) for such special triggers. But I don't think we disallow
user-triggers starting with underscores etc.

So some of the options could be:

- to add a new column in pg_trigger to indicate special or system triggers
which can be executed last (sorted order if multiple entries)

or

- invent a prefix "__partition__" or something and disallow user triggers to
use such a prefix for their names, plus introduce logic to execute them
(again sorted order if multiple entries) last.

Regards,
Nikhils
--
http://www.enterprisedb.com

#9Alvaro Herrera
alvherre@commandprompt.com
In reply to: Nikhil Sontakke (#8)
Re: Partitioning feature ...

Nikhil Sontakke escribi�:

As triggers are executed in order of their names, we've prefixed the
trigger names with "zz". This should work fine as long as no-one uses
trigger-name which starts with "zz".

this seems a lot fragile... why system generated triggers has to be
executed following the same rules (talking about order of execution)
as user triggers? can't we simply execute them first or last or maybe
be clever and mark one to be executed first and others last?

AFAICS, we do not have any category like system triggers. So yeah, it would
have been nice to generate triggers with names (starting with __ for
example) for such special triggers. But I don't think we disallow
user-triggers starting with underscores etc.

We already have system triggers -- the FK triggers. I don't think we've
had all that much trouble with them.

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

#10Jaime Casanova
jcasanov@systemguards.com.ec
In reply to: Alvaro Herrera (#9)
Re: Partitioning feature ...

On Tue, Mar 31, 2009 at 9:46 AM, Alvaro Herrera
<alvherre@commandprompt.com> wrote:

AFAICS, we do not have any category like system triggers. So yeah, it would
have been nice to generate triggers with names (starting with __ for
example) for such special triggers. But I don't think we disallow
user-triggers starting with underscores etc.

We already have system triggers -- the FK triggers.  I don't think we've
had all that much trouble with them.

yeah! but we mark them with pg_trigger.tgisconstraint, but i'm not
completely convinced that we should use that same field

--
Atentamente,
Jaime Casanova
Soporte y capacitación de PostgreSQL
Asesoría y desarrollo de sistemas
Guayaquil - Ecuador
Cel. +59387171157

#11Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#9)
Re: Partitioning feature ...

Alvaro Herrera <alvherre@commandprompt.com> writes:

Nikhil Sontakke escribi�:

As triggers are executed in order of their names, we've prefixed the
trigger names with "zz". This should work fine as long as no-one uses
trigger-name which starts with "zz".

this seems a lot fragile...

We already have system triggers -- the FK triggers. I don't think we've
had all that much trouble with them.

In the case of the FK triggers, it's intentional (and maybe even
documented) that users should be able to place their own triggers before
or after the FK triggers. Is there a good reason why partitioning
triggers should be different? If there is, maybe the feature shouldn't
be implemented via triggers in the first place.

regards, tom lane

#12Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#11)
Re: Partitioning feature ...

Tom Lane wrote:

We already have system triggers -- the FK triggers. I don't think we've
had all that much trouble with them.

In the case of the FK triggers, it's intentional (and maybe even
documented) that users should be able to place their own triggers before
or after the FK triggers.

If it's documented I think it's well hidden ;-) ISTM that the fact that
we implement FK constraints via triggers is really an implementation
detail, not something the user should be encouraged to mess with.

Is there a good reason why partitioning
triggers should be different?

Probably not. ISTM that the scheme should turn tgisconstraint into a
multi-valued item (tgkind: 'u' = userland, 'c'= constraint, 'p' =
partition or some such).

cheers

andrew

#13Emmanuel Cecchet
manu@asterdata.com
In reply to: Tom Lane (#11)
Re: Partitioning feature ...

Yes, there is a good reason. As a trigger can update the tuple value,
this can change the routing decision. If you have a user trigger that
tries to change the key value after the partition choice has been made,
this will lead to an integrity constraint violation which is probably
not what the user expects.
Note that user triggers with partitions will be tricky anyway
(regardless of how partitioning is implemented, that is with triggers or
not). If 2 partitions have user triggers that update the key value to
bounce the tuple to the other partition you may end up with an infinite
loop.
I am not sure what the semantic of statement triggers (still user
triggers) should be on partitioned tables.
We will probably have to come up with restrictions on triggers so that
they can only be applied to the parent table and not on child tables to
prevent nasty issues.

Emmanuel

Tom Lane wrote:

In the case of the FK triggers, it's intentional (and maybe even
documented) that users should be able to place their own triggers before
or after the FK triggers. Is there a good reason why partitioning
triggers should be different? If there is, maybe the feature shouldn't
be implemented via triggers in the first place.

regards, tom lane

--
Emmanuel Cecchet
Aster Data Systems
Web: http://www.asterdata.com

#14Tom Lane
tgl@sss.pgh.pa.us
In reply to: Emmanuel Cecchet (#13)
Re: Partitioning feature ...

Emmanuel Cecchet <manu@asterdata.com> writes:

Yes, there is a good reason. As a trigger can update the tuple value,
this can change the routing decision. If you have a user trigger that
tries to change the key value after the partition choice has been made,
this will lead to an integrity constraint violation which is probably
not what the user expects.

[ shrug... ] Badly written user triggers can break FK constraints,
too. We've tolerated that in the past because preventing it disables
useful capabilities.

I remain of the opinion that if you think you *have to* execute last,
you should not be writing this as a trigger; you'd be better off
embedding it lower in the system.

regards, tom lane

#15Josh Berkus
josh@agliodbs.com
In reply to: Emmanuel Cecchet (#13)
Re: Partitioning feature ...

On 3/31/09 9:45 AM, Emmanuel Cecchet wrote:

Yes, there is a good reason. As a trigger can update the tuple value,
this can change the routing decision. If you have a user trigger that
tries to change the key value after the partition choice has been made,
this will lead to an integrity constraint violation which is probably
not what the user expects.

Actually, it's worse. Depending on the timing of the triggers, it's
possible to bypass the FK check entirely, and you can end up with
inconsistent data.

--Josh

#16Nikhil Sontakke
nikhil.sontakke@enterprisedb.com
In reply to: Andrew Dunstan (#12)
Re: Partitioning feature ...

Hi,

We already have system triggers -- the FK triggers. I don't think we've

had all that much trouble with them.

In the case of the FK triggers, it's intentional (and maybe even
documented) that users should be able to place their own triggers before
or after the FK triggers.

If it's documented I think it's well hidden ;-) ISTM that the fact that we
implement FK constraints via triggers is really an implementation detail,
not something the user should be encouraged to mess with.

Is there a good reason why partitioning

triggers should be different?

Probably not. ISTM that the scheme should turn tgisconstraint into a
multi-valued item (tgkind: 'u' = userland, 'c'= constraint, 'p' = partition
or some such).

+1.

This seems to be the best way forward if we stick to triggers for
partitioning. I think they appear to serve the purpose well for this
use-case and maybe with this scheme they will be low-level enough too.

Regards,
Nikhils
--
http://www.enterprisedb.com