diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 788464e..432a4aa 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -1931,6 +1931,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) List *newcmds = NIL; bool skipValidation = true; AlterTableCmd *newcmd; + Oid index_oid = InvalidOid; /* * We must not scribble on the passed-in AlterTableStmt, so copy it. (This @@ -2047,35 +2048,152 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) transformFKConstraints(pstate, &cxt, skipValidation, true); - /* TODO: look at cxt->pkey->options and see if it has 'USING INDEX'; if yes - then cook-up an ALTER TABLE DROP CONSTRAINT for table's primary key, - and push into newcmds + /* + * If the PRIMARY KEY clause has WITH INDEX option set, then prepare to use + * that index as the primary key. + */ + if (cxt.pkey) + foreach(l, cxt.pkey->options) + { + DefElem *def = (DefElem*)lfirst(l); + Oid pkey_oid; + char *index_name; + char *oid_string; + List *index_name_list; + RangeVar *index_rv; + Relation index_rel; + + if (def->defnamespace == NULL && 0 == strcmp(def->defname, "index")) + { + if (!(def->defaction == DEFELEM_UNSPEC || def->defaction == DEFELEM_SET)) + elog(ERROR, "index option for a primary key has a syntax error." ); + } + else + continue; + + /* + * If we don't do this, WITH INDEX will get to DefineIndex(), and it will + * throw a fit. 
+ */ + if (index_oid != InvalidOid) + elog(ERROR, "only one WITH INDEX option can be specified for a primary key."); + + if (!IsA(def->arg, String)) + elog(ERROR, "WITH INDEX option for primary key should be a string value"); + + index_name = strVal(def->arg); - Check if cxt->pkey->options has a 'USING INDEX' element - take an exclusive lock on that index + index_name_list = stringToQualifiedNameList(index_name); - Does this table have a primary key - check if index mentioned in cxt->pkey matches that PKEY definition, - Does column list match - Does index type metch (BTree for now) - Do they have the same owner + /* TODO: Should we assert that this is not a qualified name? */ + index_rv = makeRangeVarFromNameList(index_name_list); - Append a new command to newcmds to drop the PKey constraint - use 'rel' variable to get primary key's OID ( modify and reuse relationHasPrimaryKey() ) - use relation_open() to get pkey's relation - use the returned Relation->rd_rel->relname to build DROP CONSTRAINT command - set missingok member of the command so that this would work even if there was already a DROP CONSTRAINT for the PKey. - push this command to newcmds + index_rel = relation_openrv(index_rv, AccessExclusiveLock); - Chenge the 'USING INDEX' element, and replace index name in Value* to have decimal representation of index's OID. - This will be used by ATExecAddIndex(). - string = DatumGetCString(DirectFunctionCall1(oidout, - ObjectIdGetDatum(tupleOid))); + index_oid = RelationGetRelid(index_rel); + + oid_string = DatumGetCString(DirectFunctionCall1(oidout, + ObjectIdGetDatum(index_oid))); + + /* + * Replace index name with its Oid::cstring; ATAddIndex() will use this + * OID for the primary key. 
+ */ + def->arg = (Node*)makeString(oid_string); + + /* set index name in the statement, to affect the constraint name */ + cxt.pkey->idxname = pstrdup(strVal(llast(index_name_list))); + + pkey_oid = relationHasPrimaryKey(rel); + + if (pkey_oid != InvalidOid) + { + HeapTuple pkey_tuple; + HeapTuple index_tuple; + Form_pg_index pkey_form; + Form_pg_index index_form; + AlterTableCmd *at_cmd; + int2 i; + Relation pkey_rel; + + pkey_rel = relation_open(pkey_oid, AccessExclusiveLock); + + /* + * Check pg_class->relam to see if the index type match. As of now, only + * BTree indexes are used to implement primary keys, but it doesn't hurt + * to be future proof. + */ + if (pkey_rel->rd_rel->relam != index_rel->rd_rel->relam) + elog(ERROR, "index type of WITH INDEX argument and that of the primary key are not the same."); - We need decimal representation of OID since int32 to Oid mapping may not be future proof. - TODO check with -hackers + if (pkey_rel->rd_rel->relowner != index_rel->rd_rel->relowner) + elog(ERROR, "owner of WITH INDEX argument and that of the primary key are not same."); - */ + if (pkey_rel->rd_rel->relnamespace != index_rel->rd_rel->relnamespace) + elog(ERROR, "index in WITH INDEX argument is not in the correct namespace."); + + pkey_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(pkey_oid)); + if (!HeapTupleIsValid(pkey_tuple)) + elog(ERROR, "cache lookup failed for index %u", pkey_oid); + pkey_form = (Form_pg_index) GETSTRUCT(pkey_tuple); + + index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid)); + if (!HeapTupleIsValid(index_tuple)) + elog(ERROR, "cache lookup failed for index %u", index_oid); + index_form = (Form_pg_index) GETSTRUCT(index_tuple); + + if (!index_form->indisvalid) + elog(ERROR, "index mentioned in the WITH INDEX clause of primary key is not valid."); + + if (!index_form->indisready) + elog(ERROR, "index mentioned in the WITH INDEX clause of primary key is not ready."); + + if (!index_form->indisunique) + 
elog(ERROR, "index mentioned in the WITH INDEX clause of primary key is not a unique index."); + + /* TODO: Should we check for indcheckxmin ?*/ + + if (index_form->indrelid != pkey_form->indrelid) + elog(ERROR, "index mentioned in the WITH INDEX clause of primary key is not on the same table."); + + if (index_form->indnatts != pkey_form->indnatts) + elog(ERROR, "idefinition of index mentioned in the WITH INDEX clause does not match primary key."); + + /* + * Loop over each attribute in the primary key and see if it + * matches the attributes of the index we are replacing it with. + */ + for (i = 0; i < pkey_form->indnatts; i++) + { + /* We do not expect primary keys on expressions */ + Assert(pkey_form->indkey.values[i] != 0); + + if (pkey_form->indkey.values[i] != index_form->indkey.values[i]) + elog(ERROR, "idefinition of index mentioned in the WITH INDEX clause does not match primary key."); + + if (pkey_form->indclass.values[i] != index_form->indclass.values[i]) + elog(ERROR, "operator classes of index mentioned in the WITH INDEX clause does not match those of primary key."); + } + + ReleaseSysCache(pkey_tuple); + ReleaseSysCache(index_tuple); + + at_cmd = makeNode(AlterTableCmd); + at_cmd->subtype = AT_DropConstraint; + at_cmd->name = pstrdup(NameStr(pkey_rel->rd_rel->relname)); + at_cmd->behavior = DROP_CASCADE;; + at_cmd->missing_ok = true; + + newcmds = lappend(newcmds, at_cmd); + + /* Release the lock so that it can be dropped from cache. */ + relation_close(pkey_rel, AccessExclusiveLock); + } + + relation_close(index_rel, NoLock); + + /* do not break; at the end of the loop. Use this opportunity to catch multiple 'WITH INDEX' clauses*/ + } /* * Push any index-creation commands into the ALTER, so that they can be