diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 609329bb21..d0bc120ecc 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -382,7 +382,34 @@ static relopt_int intRelOpts[] =
 		},
 		-1, 0, 1024
 	},
+	{
+		{
+			"compresslevel",
+			"Level of page compression.",
+			RELOPT_KIND_HEAP | RELOPT_KIND_BTREE | RELOPT_KIND_HASH | RELOPT_KIND_GIN | RELOPT_KIND_GIST | RELOPT_KIND_SPGIST,
+			ShareUpdateExclusiveLock
+		},
+		0, -127, 127
+	},
+	{
+		{
+			"compress_chunk_size",
+			"Size of chunk to store compressed page.",
+			RELOPT_KIND_HEAP | RELOPT_KIND_BTREE | RELOPT_KIND_HASH | RELOPT_KIND_GIN | RELOPT_KIND_GIST | RELOPT_KIND_SPGIST,
+			AccessExclusiveLock
+		},
+		BLCKSZ / 2, BLCKSZ / 16, BLCKSZ / 2
+	},
+	{
+		{
+			"compress_prealloc_chunks",
+			"Number of preallocated chunks for each block.",
+			RELOPT_KIND_HEAP | RELOPT_KIND_BTREE | RELOPT_KIND_HASH | RELOPT_KIND_GIN | RELOPT_KIND_GIST | RELOPT_KIND_SPGIST,
+			ShareUpdateExclusiveLock
+		},
+		0, 0, 15
+	},
 
 	/* list terminator */
 	{{NULL}}
 };
@@ -507,6 +534,17 @@ static relopt_enum_elt_def viewCheckOptValues[] =
 	{(const char *) NULL}		/* list terminator */
 };
 
+/* values for the compresstype reloption */
+relopt_enum_elt_def compressTypeOptValues[] =
+{
+	/* there is no NOT_SET value; "none" is the default */
+	{"none", PAGE_COMPRESSION_NONE},
+	{"pglz", PAGE_COMPRESSION_PGLZ},
+	{"lz4", PAGE_COMPRESSION_LZ4},
+	{"zstd", PAGE_COMPRESSION_ZSTD},
+	{(const char *) NULL}		/* list terminator */
+};
+
 static relopt_enum enumRelOpts[] =
 {
 	{
@@ -542,6 +580,17 @@ static relopt_enum enumRelOpts[] =
 		VIEW_OPTION_CHECK_OPTION_NOT_SET,
 		gettext_noop("Valid values are \"local\" and \"cascaded\".")
 	},
+	{
+		{
+			"compresstype",
+			"Compression type (none, pglz, lz4 or zstd).",
+			RELOPT_KIND_HEAP | RELOPT_KIND_BTREE | RELOPT_KIND_HASH | RELOPT_KIND_GIN | RELOPT_KIND_GIST | RELOPT_KIND_SPGIST,
+			AccessExclusiveLock
+		},
+		compressTypeOptValues,
+		PAGE_COMPRESSION_NONE,
+		gettext_noop("Valid values are \"none\", \"pglz\", \"lz4\" and \"zstd\".")
+	},
 	/* list terminator */
 	{{NULL}}
 };
@@ -1882,7 +1931,15 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
 		{"vacuum_index_cleanup", RELOPT_TYPE_ENUM,
 		offsetof(StdRdOptions, vacuum_index_cleanup)},
 		{"vacuum_truncate", RELOPT_TYPE_BOOL,
-		offsetof(StdRdOptions, vacuum_truncate)}
+		offsetof(StdRdOptions, vacuum_truncate)},
+		{"compresstype", RELOPT_TYPE_ENUM,
+		offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compresstype)},
+		{"compresslevel", RELOPT_TYPE_INT,
+		offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compresslevel)},
+		{"compress_chunk_size", RELOPT_TYPE_INT,
+		offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)},
+		{"compress_prealloc_chunks", RELOPT_TYPE_INT,
+		offsetof(StdRdOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)}
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate, kind,
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 6df7f2eaeb..523c18e149 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -610,7 +610,15 @@ ginoptions(Datum reloptions, bool validate)
 	static const relopt_parse_elt tab[] = {
 		{"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)},
 		{"gin_pending_list_limit", RELOPT_TYPE_INT, offsetof(GinOptions,
-																pendingListCleanupSize)}
+																pendingListCleanupSize)},
+		{"compresstype", RELOPT_TYPE_ENUM,
+		offsetof(GinOptions, compress) + offsetof(PageCompressOpts, compresstype)},
+		{"compresslevel", RELOPT_TYPE_INT,
+		offsetof(GinOptions, compress) + offsetof(PageCompressOpts, compresslevel)},
+		{"compress_chunk_size", RELOPT_TYPE_INT,
+		offsetof(GinOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)},
+		{"compress_prealloc_chunks", RELOPT_TYPE_INT,
+		offsetof(GinOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)}
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate,
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index d4bf0c7563..cc255024ab 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -922,7 +922,15 @@ gistoptions(Datum reloptions, bool validate)
 {
 	static const relopt_parse_elt tab[] = {
 		{"fillfactor", RELOPT_TYPE_INT, offsetof(GiSTOptions, fillfactor)},
-		{"buffering", RELOPT_TYPE_ENUM, offsetof(GiSTOptions, buffering_mode)}
+		{"buffering", RELOPT_TYPE_ENUM, offsetof(GiSTOptions, buffering_mode)},
+		{"compresstype", RELOPT_TYPE_ENUM,
+		offsetof(GiSTOptions, compress) + offsetof(PageCompressOpts, compresstype)},
+		{"compresslevel", RELOPT_TYPE_INT,
+		offsetof(GiSTOptions, compress) + offsetof(PageCompressOpts, compresslevel)},
+		{"compress_chunk_size", RELOPT_TYPE_INT,
+		offsetof(GiSTOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)},
+		{"compress_prealloc_chunks", RELOPT_TYPE_INT,
+		offsetof(GiSTOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)}
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate,
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index 32822dbb6b..c6362d0a2b 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -277,6 +277,14 @@ hashoptions(Datum reloptions, bool validate)
 {
 	static const relopt_parse_elt tab[] = {
 		{"fillfactor", RELOPT_TYPE_INT, offsetof(HashOptions, fillfactor)},
+		{"compresstype", RELOPT_TYPE_ENUM,
+		offsetof(HashOptions, compress) + offsetof(PageCompressOpts, compresstype)},
+		{"compresslevel", RELOPT_TYPE_INT,
+		offsetof(HashOptions, compress) + offsetof(PageCompressOpts, compresslevel)},
+		{"compress_chunk_size", RELOPT_TYPE_INT,
+		offsetof(HashOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)},
+		{"compress_prealloc_chunks", RELOPT_TYPE_INT,
+		offsetof(HashOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)}
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate,
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index b802ed247e..96ec5dda3f 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -59,6 +59,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "storage/page_compression.h"
 #include "tcop/tcopprot.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -444,7 +445,9 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->consider_bypass_optimization = true;
 	vacrel->do_index_vacuuming = true;
 	vacrel->do_index_cleanup = true;
-	vacrel->do_rel_truncate = (params->truncate != VACOPTVALUE_DISABLED);
+	/* page-compressed relations are never truncated by VACUUM */
+	vacrel->do_rel_truncate = (params->truncate != VACOPTVALUE_DISABLED) &&
+		(rel->rd_locator.compressOpt.algorithm == PAGE_COMPRESSION_NONE);
 	if (params->index_cleanup == VACOPTVALUE_DISABLED)
 	{
 		/* Force disable index vacuuming up-front */
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index ff260c393a..f8f3555d9e 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -2115,7 +2115,15 @@ btoptions(Datum reloptions, bool validate)
 		{"vacuum_cleanup_index_scale_factor", RELOPT_TYPE_REAL,
 		offsetof(BTOptions, vacuum_cleanup_index_scale_factor)},
 		{"deduplicate_items", RELOPT_TYPE_BOOL,
-		offsetof(BTOptions, deduplicate_items)}
+		offsetof(BTOptions, deduplicate_items)},
+		{"compresstype", RELOPT_TYPE_ENUM,
+		offsetof(BTOptions, compress) + offsetof(PageCompressOpts, compresstype)},
+		{"compresslevel", RELOPT_TYPE_INT,
+		offsetof(BTOptions, compress) + offsetof(PageCompressOpts, compresslevel)},
+		{"compress_chunk_size", RELOPT_TYPE_INT,
+		offsetof(BTOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)},
+		{"compress_prealloc_chunks", RELOPT_TYPE_INT,
+		offsetof(BTOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)}
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate,
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 2c661fcf96..b0a036f9f5 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -740,6 +740,14 @@ spgoptions(Datum reloptions, bool validate)
 {
 	static const relopt_parse_elt tab[] = {
 		{"fillfactor", RELOPT_TYPE_INT, offsetof(SpGistOptions, fillfactor)},
+		{"compresstype", RELOPT_TYPE_ENUM,
+		offsetof(SpGistOptions, compress) + offsetof(PageCompressOpts, compresstype)},
+		{"compresslevel", RELOPT_TYPE_INT,
+		offsetof(SpGistOptions, compress) + offsetof(PageCompressOpts, compresslevel)},
+		{"compress_chunk_size", RELOPT_TYPE_INT,
+		offsetof(SpGistOptions, compress) + offsetof(PageCompressOpts, compress_chunk_size)},
+		{"compress_prealloc_chunks", RELOPT_TYPE_INT,
+		offsetof(SpGistOptions, compress) + offsetof(PageCompressOpts, compress_prealloc_chunks)}
 	};
 
 	return (bytea *) build_reloptions(reloptions, validate,
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 87d14210be..1c10a7a464 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -365,7 +365,7 @@ XLogPrefetcherAllocate(XLogReaderState *reader)
 {
 	XLogPrefetcher *prefetcher;
 	static HASHCTL hash_table_ctl = {
-		.keysize = sizeof(RelFileLocator),
+		.keysize = offsetof(RelFileLocator, compressOpt),
 		.entrysize = sizeof(XLogPrefetcherFilter)
 	};
 
@@ -869,6 +869,7 @@ XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
 		/*
 		 * Don't allow any prefetching of this block or higher until replayed.
 		 */
+		filter->rlocator.compressOpt = rlocator.compressOpt;
 		filter->filter_until_replayed = lsn;
 		filter->filter_from_block = blockno;
 		dlist_push_head(&prefetcher->filter_queue, &filter->link);
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 7d7655d295..79cbc3966c 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -210,6 +210,7 @@ Boot_CreateStmt:
 												   RELPERSISTENCE_PERMANENT,
 												   shared_relation,
 												   mapped_relation,
+												   (Datum) 0,
 												   true,
 												   &relfrozenxid,
 												   &relminmxid,
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 9b03579e6e..f6eb82e58f 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -296,6 +296,7 @@ heap_create(const char *relname,
 			char relpersistence,
 			bool shared_relation,
 			bool mapped_relation,
+			Datum reloptions,
 			bool allow_system_table_mods,
 			TransactionId *relfrozenxid,
 			MultiXactId *relminmxid,
@@ -373,7 +374,8 @@ heap_create(const char *relname,
 									 shared_relation,
 									 mapped_relation,
 									 relpersistence,
-									 relkind);
+									 relkind,
+									 reloptions);
 
 	/*
 	 * Have the storage manager create the relation's disk file, if needed.
@@ -1280,6 +1282,7 @@ heap_create_with_catalog(const char *relname,
 							   relpersistence,
 							   shared_relation,
 							   mapped_relation,
+							   reloptions,
 							   allow_system_table_mods,
 							   &relfrozenxid,
 							   &relminmxid,
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index d7192f35e3..7a6f414bda 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -956,6 +956,7 @@ index_create(Relation heapRelation,
 								relpersistence,
 								shared_relation,
 								mapped_relation,
+								reloptions,
 								allow_system_table_mods,
 								&relfrozenxid,
 								&relminmxid,
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index d708af19ed..751320d749 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -91,7 +91,7 @@ AddPendingSync(const RelFileLocator *rlocator)
 	{
 		HASHCTL		ctl;
 
-		ctl.keysize = sizeof(RelFileLocator);
+		ctl.keysize = offsetof(RelFileLocator, compressOpt);
 		ctl.entrysize = sizeof(PendingRelSync);
 		ctl.hcxt = TopTransactionContext;
 		pendingSyncHash = hash_create("pending sync hash", 16, &ctl,
@@ -100,6 +100,7 @@ AddPendingSync(const RelFileLocator *rlocator)
 
 	pending = hash_search(pendingSyncHash, rlocator, HASH_ENTER, &found);
 	Assert(!found);
+	pending->rlocator.compressOpt = rlocator->compressOpt;
 	pending->is_truncated = false;
 }
 
@@ -588,7 +589,7 @@ SerializePendingSyncs(Size maxSize, char *startAddress)
 		goto terminate;
 
 	/* Create temporary hash to collect active relfilelocators */
-	ctl.keysize = sizeof(RelFileLocator);
+	ctl.keysize = offsetof(RelFileLocator, compressOpt);
 	ctl.entrysize = sizeof(RelFileLocator);
 	ctl.hcxt = CurrentMemoryContext;
 	tmphash = hash_create("tmp relfilelocators",
@@ -598,7 +599,13 @@ SerializePendingSyncs(Size maxSize, char *startAddress)
 	/* collect all rlocator from pending syncs */
 	hash_seq_init(&scan, pendingSyncHash);
 	while ((sync = (PendingRelSync *) hash_seq_search(&scan)))
-		(void) hash_search(tmphash, &sync->rlocator, HASH_ENTER, NULL);
+	{
+		RelFileLocator *rlocator;
+
+		/* compressOpt lies outside the hash key, so copy it explicitly */
+		rlocator = hash_search(tmphash, &sync->rlocator, HASH_ENTER, NULL);
+		rlocator->compressOpt = sync->rlocator.compressOpt;
+	}
 
 	/* remove deleted rnodes */
 	for (delete = pendingDeletes; delete != NULL; delete = delete->next)
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 099d369b2f..dbecdcbbe3 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -27,7 +27,9 @@
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/reloptions.h"
 #include "access/tableam.h"
+#include "access/tupdesc_details.h"
 #include "access/xact.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
@@ -41,6 +43,7 @@
 #include "catalog/pg_db_role_setting.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_tablespace.h"
+#include "catalog/schemapg.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
 #include "commands/dbcommands_xlog.h"
@@ -106,6 +109,10 @@ typedef struct CreateDBRelInfo
 	bool		permanent;		/* relation is permanent or unlogged */
 } CreateDBRelInfo;
 
+/*
+ * hardcoded tuple descriptors, contents generated by genbki.pl
+ */
+static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
 
 /* non-export function prototypes */
 static void createdb_failure_callback(int code, Datum arg);
@@ -122,6 +129,7 @@ static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
 static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
+static StdRdOptions *parseRelCompressOptions(HeapTuple tuple);
 static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dboid, Oid src_tsid,
 									  Oid dst_tsid);
 static List *ScanSourceDatabasePgClass(Oid srctbid, Oid srcdbid, char *srcpath);
@@ -136,6 +144,61 @@ static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
 static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dboid, Oid src_tsid,
 										Oid dst_tsid);
 
+/*
+ * Parse the page compression reloptions of a raw pg_class tuple.  Returns
+ * NULL if the tuple has no reloptions or its relkind is not handled here.
+ */
+static StdRdOptions *
+parseRelCompressOptions(HeapTuple tuple)
+{
+	Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
+	TupleDesc	pgclassdesc;
+	bool		isnull;
+	Datum		datum;
+
+	/* only plain tables, indexes and materialized views are handled */
+	switch (classForm->relkind)
+	{
+		case RELKIND_RELATION:
+		case RELKIND_INDEX:
+		case RELKIND_MATVIEW:
+			break;
+		default:
+			return NULL;
+	}
+
+	/*
+	 * Build the pg_class descriptor on every call.  It is allocated in
+	 * CurrentMemoryContext, which need not outlive this command, so caching
+	 * it in a static variable could leave a dangling pointer behind.
+	 */
+	pgclassdesc = CreateTemplateTupleDesc(Natts_pg_class);
+	pgclassdesc->tdtypeid = RECORDOID;	/* not right, but we don't care */
+	pgclassdesc->tdtypmod = -1;
+	for (int i = 0; i < Natts_pg_class; i++)
+	{
+		memcpy(TupleDescAttr(pgclassdesc, i), &Desc_pg_class[i],
+			   ATTRIBUTE_FIXED_PART_SIZE);
+		/* make sure attcacheoff is valid */
+		TupleDescAttr(pgclassdesc, i)->attcacheoff = -1;
+	}
+	/* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
+	TupleDescAttr(pgclassdesc, 0)->attcacheoff = 0;
+
+	datum = fastgetattr(tuple, Anum_pg_class_reloptions, pgclassdesc,
+						&isnull);
+
+	/* a by-reference datum points into the tuple, not into the descriptor */
+	FreeTupleDesc(pgclassdesc);
+
+	if (isnull)
+		return NULL;
+
+	/* Parse into appropriate format; don't error out here */
+	return (StdRdOptions *) default_reloptions(datum, false,
+											   RELOPT_KIND_HEAP);
+}
+
 /*
  * Create a new database using the WAL_LOG strategy.
  *
@@ -152,7 +215,7 @@ CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
 	LockRelId	srcrelid;
 	LockRelId	dstrelid;
 	RelFileLocator srcrlocator;
-	RelFileLocator dstrlocator;
+	RelFileLocator dstrlocator = {0};
 	CreateDBRelInfo *relinfo;
 
 	/* Get source and destination database paths. */
@@ -195,6 +258,7 @@ CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
 
 		dstrlocator.dbOid = dst_dboid;
 		dstrlocator.relNumber = srcrlocator.relNumber;
+		dstrlocator.compressOpt = srcrlocator.compressOpt;
 
 		/*
 		 * Acquire locks on source and target relations before copying.
@@ -246,7 +310,7 @@ CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
 static List *
 ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
 {
-	RelFileLocator rlocator;
+	RelFileLocator rlocator = {0};
 	BlockNumber nblocks;
 	BlockNumber blkno;
 	Buffer		buf;
@@ -397,6 +461,7 @@ ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
 {
 	CreateDBRelInfo *relinfo;
 	Form_pg_class classForm;
+	StdRdOptions *options;
 	Oid			relfilenumber = InvalidRelFileNumber;
 
 	classForm = (Form_pg_class) GETSTRUCT(tuple);
@@ -433,7 +498,7 @@ ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
 							  classForm->oid);
 
 	/* Prepare a rel info element and add it to the list. */
-	relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+	relinfo = (CreateDBRelInfo *) palloc0(sizeof(CreateDBRelInfo));
 	if (OidIsValid(classForm->reltablespace))
 		relinfo->rlocator.spcOid = classForm->reltablespace;
 	else
@@ -443,6 +508,17 @@ ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
 	relinfo->rlocator.relNumber = relfilenumber;
 	relinfo->reloid = classForm->oid;
 
+	/* parse compression options; without any, compressOpt stays zeroed */
+	options = parseRelCompressOptions(tuple);
+	if (options)
+	{
+		relinfo->rlocator.compressOpt.algorithm = options->compress.compresstype;
+		relinfo->rlocator.compressOpt.level = options->compress.compresslevel;
+		relinfo->rlocator.compressOpt.chunks_pre_block = BLCKSZ / options->compress.compress_chunk_size;
+		relinfo->rlocator.compressOpt.prealloc_chunks = (uint8) options->compress.compress_prealloc_chunks;
+		pfree(options);
+	}
+
 	/* Temporary relations were rejected above. */
 	Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
 	relinfo->permanent =
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 7fbee0c1f7..74ea7328ad 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16,11 +16,16 @@
 
 #include "access/attmap.h"
 #include "access/genam.h"
+#include "access/gin_private.h"
+#include "access/gist_private.h"
+#include "access/hash.h"
 #include "access/heapam.h"
 #include "access/heapam_xlog.h"
 #include "access/multixact.h"
+#include "access/nbtree.h"
 #include "access/reloptions.h"
 #include "access/relscan.h"
+#include "access/spgist_private.h"
 #include "access/sysattr.h"
 #include "access/tableam.h"
 #include "access/toast_compression.h"
@@ -14107,6 +14112,8 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 	Datum		repl_val[Natts_pg_class];
 	bool		repl_null[Natts_pg_class];
 	bool		repl_repl[Natts_pg_class];
+	bytea	   *byteaOpts;
+	PageCompressOpts *newPcOpts = NULL;
 	static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
 
 	if (defList == NIL && operation != AT_ReplaceRelOptions)
@@ -14147,7 +14154,9 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 		case RELKIND_RELATION:
 		case RELKIND_TOASTVALUE:
 		case RELKIND_MATVIEW:
-			(void) heap_reloptions(rel->rd_rel->relkind, newOptions, true);
+			byteaOpts = heap_reloptions(rel->rd_rel->relkind, newOptions, true);
+			if (byteaOpts)
+				newPcOpts = &((StdRdOptions *) byteaOpts)->compress;
 			break;
 		case RELKIND_PARTITIONED_TABLE:
 			(void) partitioned_table_reloptions(newOptions, true);
@@ -14157,7 +14166,35 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 			break;
 		case RELKIND_INDEX:
 		case RELKIND_PARTITIONED_INDEX:
-			(void) index_reloptions(rel->rd_indam->amoptions, newOptions, true);
+			byteaOpts = index_reloptions(rel->rd_indam->amoptions, newOptions, true);
+			if (byteaOpts)
+			{
+				switch (rel->rd_rel->relam)
+				{
+					case BTREE_AM_OID:
+						newPcOpts = &((BTOptions *) byteaOpts)->compress;
+						break;
+
+					case HASH_AM_OID:
+						newPcOpts = &((HashOptions *) byteaOpts)->compress;
+						break;
+
+					case GIN_AM_OID:
+						newPcOpts = &((GinOptions *) byteaOpts)->compress;
+						break;
+
+					case GIST_AM_OID:
+						newPcOpts = &((GiSTOptions *) byteaOpts)->compress;
+						break;
+
+					case SPGIST_AM_OID:
+						newPcOpts = &((SpGistOptions *) byteaOpts)->compress;
+						break;
+
+					default:
+						break;
+				}
+			}
 			break;
 		default:
 			ereport(ERROR,
@@ -14201,6 +14238,25 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 		}
 	}
 
+	/* Changing the on-disk page compression format is not supported. */
+	if (newPcOpts != NULL)
+	{
+		if (newPcOpts->compresstype != rel->rd_locator.compressOpt.algorithm)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("change compresstype parameter is not supported")));
+
+		if (rel->rd_locator.compressOpt.algorithm != PAGE_COMPRESSION_NONE &&
+			newPcOpts->compress_chunk_size != BLCKSZ / rel->rd_locator.compressOpt.chunks_pre_block)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("change compress_chunk_size parameter is not supported")));
+	}
+	else if (rel->rd_locator.compressOpt.algorithm != PAGE_COMPRESSION_NONE)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("change compresstype parameter is not supported")));
+
 	/*
 	 * All we need do here is update the pg_class row; the new options will be
 	 * propagated into relcaches during post-commit cache inval.
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 637c0ce459..d478b465b3 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -38,6 +38,7 @@ #include "storage/dsm_impl.h" #include "storage/fd.h" #include "storage/ipc.h" +#include "storage/page_compression.h" #include "storage/reinit.h" #include "utils/builtins.h" #include "utils/ps_status.h" @@ -93,6 +94,8 @@ static void perform_base_backup(basebackup_options *opt, bbsink *sink); static void parse_basebackup_options(List *options, basebackup_options *opt); static int compareWalFileNames(const ListCell *a, const ListCell *b); static bool is_checksummed_file(const char *fullpath, const char *filename); +static bool is_compressed_datafile(const char *fullpath, const char *filename); +static int get_chunk_size_of_compressed_datafile(const char *pcd_filepath); static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset, const char *filename, bool partial_read_ok); @@ -1448,6 +1451,7 @@ is_checksummed_file(const char *fullpath, const char *filename) strncmp(fullpath, "/", 1) == 0) { int excludeIdx; + size_t filenameLen; /* Compare file against noChecksumFiles skip list */ for (excludeIdx = 0; noChecksumFiles[excludeIdx].name != NULL; excludeIdx++) @@ -1461,12 +1465,99 @@ is_checksummed_file(const char *fullpath, const char *filename) return false; } + /* + * Skip checksum check of compressed relation files. Compressed page + * may be stored in multiple non-continuous chunks, and cannot perform + * checksum while transferring data. + */ + filenameLen = strlen(filename); + if (filenameLen >= 4) + { + if (strncmp(filename + filenameLen - 4, "_pca", 4) == 0) + return false; + } + return true; } else return false; } +/* + * Check if the file is a page compression data file. 
+ */ +static bool +is_compressed_datafile(const char *fullpath, const char *filename) +{ + /* Check that the file is in a tablespace */ + if (strncmp(fullpath, "./global/", 9) == 0 || + strncmp(fullpath, "./base/", 7) == 0 || + strncmp(fullpath, "/", 1) == 0) + { + size_t filenameLen; + + filenameLen = strlen(filename); + if (filenameLen >= 4) + { + if (strncmp(filename + filenameLen - 4, "_pcd", 4) == 0) + return true; + } + + return false; + } + else + return false; +} + +/** + * Get chunk size of a page compression data file, + * and return -1 if failed. +*/ +static int +get_chunk_size_of_compressed_datafile(const char *pcd_filepath) +{ + int fd; + int read_cnt; + size_t filepath_len; + char pca_filepath[MAXPGPATH]; + PageCompressHeader pcheader; + + /* setup pca file path */ + filepath_len = strlen(pcd_filepath); + memcpy(pca_filepath, pcd_filepath, filepath_len + 1); + pca_filepath[filepath_len - 1] = 'a'; + + /* get chunk size from pca file */ + fd = OpenTransientFile(pca_filepath, O_RDONLY | PG_BINARY); + if (fd < 0) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pca_filepath))); + return -1; + } + + read_cnt = basebackup_read_file(fd, + (char *) &pcheader, + sizeof(PageCompressHeader), + 0, + pca_filepath, + true); + CloseTransientFile(fd); + + if (read_cnt != sizeof(PageCompressHeader)) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": read only %d of %zu bytes", + pca_filepath, read_cnt, sizeof(PageCompressHeader)))); + return -1; + } + + return pcheader.chunk_size; +} + + /***** * Functions for handling tar file format * @@ -1504,6 +1595,9 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, char *segmentpath; bool verify_checksum = false; pg_checksum_context checksum_ctx; + bool verify_pcd_checksum = false; + int chunk_size = 0; + uint32 chunkno = 1; if (pg_checksum_init(&checksum_ctx, manifest->checksum_type) < 0) elog(ERROR, "could not 
initialize checksum of file \"%s\"", @@ -1549,6 +1643,23 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, (errmsg("invalid segment number %d in file \"%s\"", segmentno, filename))); } + + /* Check if this's compressed data file */ + if (is_compressed_datafile(readfilename, filename)) + { + chunk_size = get_chunk_size_of_compressed_datafile(readfilename); + if (IsValidPageCompressChunkSize(chunk_size)) + { + verify_pcd_checksum = true; + } + else + { + ereport(WARNING, + (errmsg("could not verify checksum in file \"%s\"" + ": invalid chunk size %d", + readfilename, chunk_size))); + } + } } } @@ -1575,25 +1686,124 @@ sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, if (cnt == 0) break; - /* - * The checksums are verified at block level, so we iterate over the - * buffer in chunks of BLCKSZ, after making sure that - * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of - * BLCKSZ bytes. - */ - Assert((sink->bbs_buffer_length % BLCKSZ) == 0); - - if (verify_checksum && (cnt % BLCKSZ != 0)) + if (verify_pcd_checksum) { - ereport(WARNING, - (errmsg("could not verify checksum in file \"%s\", block " - "%u: read buffer size %d and page size %d " - "differ", - readfilename, blkno, (int) cnt, BLCKSZ))); - verify_checksum = false; + /* + * The checksums are verified at compressed chunk level, so we + * iterate over the buffer in chunks of chunk_size, after making + * sure that TAR_SEND_SIZE/buf is divisible by chunk_size and we + * read a multiple of chunk_size bytes. 
+ */ + Assert((sink->bbs_buffer_length % chunk_size) == 0); + + if (cnt % chunk_size != 0) + { + ereport(WARNING, + (errmsg("could not verify checksum in file \"%s\", chunk " + "%u: read buffer size %d and chunk size %d " + "differ", + readfilename, chunkno, (int) cnt, chunk_size))); + verify_pcd_checksum = false; + verify_checksum = false; + } + } + else if (verify_checksum) + { + /* + * The checksums are verified at block level, so we iterate over + * the buffer in chunks of BLCKSZ, after making sure that + * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple + * of BLCKSZ bytes. + */ + Assert((sink->bbs_buffer_length % BLCKSZ) == 0); + + if (verify_checksum && (cnt % BLCKSZ != 0)) + { + ereport(WARNING, + (errmsg("could not verify checksum in file \"%s\", block " + "%u: read buffer size %d and page size %d " + "differ", + readfilename, blkno, (int) cnt, BLCKSZ))); + verify_checksum = false; + } } - if (verify_checksum) + if (verify_pcd_checksum) + { + bool chunk_retry = false; + + for (i = 0; i < cnt / chunk_size; i++) + { + char *chunk; + PageCompressChunk *pcchunk;; + + chunk = sink->bbs_buffer + chunk_size * i; + pcchunk = (PageCompressChunk *) chunk; + checksum = pg_checksum_compress_chunk(chunk, chunk_size, chunkno); + if (pcchunk->checksum != checksum) + { + /* + * Retry the chunk on the first failure. It's possible + * that we read the part of the chunk just before postgres + * updated the entire chunk so it ends up looking torn to + * us. We only need to retry once because the LSN should + * be updated to something we can ignore on the next pass. + * If the error happens again then it is a true validation + * failure. 
+ */ + if (chunk_retry == false) + { + int reread_cnt; + + /* Reread the failed chunk */ + reread_cnt = + basebackup_read_file(fd, + sink->bbs_buffer + chunk_size * i, + chunk_size, len + chunk_size * i, + readfilename, + false); + if (reread_cnt == 0) + { + /* + * If we hit end-of-file, a concurrent truncation + * must have occurred, so break out of this loop + * just as if the initial fread() returned 0. + * We'll drop through to the same code that + * handles that case. (We must fix up cnt first, + * though.) + */ + cnt = chunk_size * i; + break; + } + + /* Set flag so we know a retry was attempted */ + chunk_retry = true; + + /* Reset loop to validate the block again */ + i--; + continue; + } + + checksum_failures++; + + if (checksum_failures <= 5) + ereport(WARNING, + (errmsg("checksum verification failed in " + "file \"%s\", chunk %u, blockno %u, chunkseq %u: calculated " + "%X but expected %X", + readfilename, chunkno, pcchunk->blockno, pcchunk->chunkseq, + checksum, pcchunk->checksum))); + if (checksum_failures == 5) + ereport(WARNING, + (errmsg("further checksum verification " + "failures in file \"%s\" will not " + "be reported", readfilename))); + } + chunk_retry = false; + chunkno++; + } + } + else if (verify_checksum) { for (i = 0; i < cnt / BLCKSZ; i++) { diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index f904f60c08..503492eb82 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -89,6 +89,7 @@ #include "access/xact.h" #include "access/xlog.h" +#include "access/xlogutils.h" #include "catalog/pg_tablespace.h" #include "common/file_perm.h" #include "common/file_utils.h" @@ -100,6 +101,7 @@ #include "postmaster/startup.h" #include "storage/fd.h" #include "storage/ipc.h" +#include "storage/page_compression.h" #include "utils/guc.h" #include "utils/resowner_private.h" @@ -204,6 +206,8 @@ typedef struct vfd /* NB: fileName is malloc'd, and must be free'd when closing the VFD */ int fileFlags; /* 
open(2) flags for (re)opening the file */ mode_t fileMode; /* mode to pass to open(2) */ + bool with_pcmap; /* is page compression relation */ + PageCompressHeader *pcmap; /* memory map of page compression address file */ } Vfd; /* @@ -1223,6 +1227,17 @@ LruDelete(File file) vfdP = &VfdCache[file]; + if (vfdP->with_pcmap && vfdP->pcmap != NULL) + { + if (pc_munmap(vfdP->pcmap) != 0) + ereport(vfdP->fdstate & FD_TEMP_FILE_LIMIT ? LOG : data_sync_elevel(LOG), + (errcode_for_dynamic_shared_memory(), + errmsg("could not munmap file \"%s\": %m", + vfdP->fileName))); + + vfdP->pcmap = NULL; + } + /* * Close the file. We aren't expecting this to fail; if it does, better * to leak the FD than to mess up our internal state. @@ -1559,6 +1574,8 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) vfdP->fileSize = 0; vfdP->fdstate = 0x0; vfdP->resowner = NULL; + vfdP->with_pcmap = false; + vfdP->pcmap = NULL; Insert(file); @@ -1908,6 +1925,18 @@ FileClose(File file) if (!FileIsNotOpen(file)) { + /* close the pcmap */ + if (vfdP->with_pcmap && vfdP->pcmap != NULL) + { + if (pc_munmap(vfdP->pcmap) != 0) + ereport(vfdP->fdstate & FD_TEMP_FILE_LIMIT ? LOG : data_sync_elevel(LOG), + (errcode_for_dynamic_shared_memory(), + errmsg("could not munmap file \"%s\": %m", + vfdP->fileName))); + + vfdP->pcmap = NULL; + } + /* close the file */ if (close(vfdP->fd) != 0) { @@ -2266,6 +2295,105 @@ FileTruncate(File file, off_t offset, uint32 wait_event_info) return returnCode; } +/* + * initialize page compression memory map. + * While file_pcd is valid, will try to fix pca file in recovery mode. 
+ */ +void +SetupPageCompressMemoryMap(File file, File file_pcd, int chunk_size, uint8 algorithm) +{ + int returnCode; + Vfd *vfdP; + PageCompressHeader *map; + + Assert(FileIsValid(file)); + + vfdP = &VfdCache[file]; + + returnCode = FileAccess(file); + if (returnCode < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + vfdP->fileName))); + + map = pc_mmap(vfdP->fd, chunk_size, false); + if (map == MAP_FAILED) + ereport(ERROR, + (errcode_for_dynamic_shared_memory(), + errmsg("could not mmap file \"%s\": %m", + vfdP->fileName))); + + /* initialize the header if it has never been set (both fields zero) */ + if (map->chunk_size == 0 && map->algorithm == 0) + { + map->chunk_size = chunk_size; + map->algorithm = algorithm; + + if (pc_msync(map) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + vfdP->fileName))); + } + + if (InRecovery && FileIsValid(file_pcd)) + { + Vfd *vfdP_pcd = &VfdCache[file_pcd]; + + returnCode = FileAccess(file_pcd); + if (returnCode < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + vfdP_pcd->fileName))); + + check_and_repair_compress_address(map, chunk_size, algorithm, vfdP->fileName, vfdP_pcd->fd, vfdP_pcd->fileName); + } + + vfdP->with_pcmap = true; + vfdP->pcmap = map; +} + +/* + * Returns the page compression memory map.
+ * + */ +void * +GetPageCompressMemoryMap(File file, int chunk_size) +{ + int returnCode; + Vfd *vfdP; + PageCompressHeader *map; + + Assert(FileIsValid(file)); + + vfdP = &VfdCache[file]; + + returnCode = FileAccess(file); + if (returnCode < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + vfdP->fileName))); + + Assert(vfdP->with_pcmap); + + if (vfdP->pcmap == NULL) + { + map = pc_mmap(vfdP->fd, chunk_size, false); + if (map == MAP_FAILED) + ereport(ERROR, + (errcode_for_dynamic_shared_memory(), + errmsg("could not mmap file \"%s\": %m", + vfdP->fileName))); + + vfdP->pcmap = map; + } + + return vfdP->pcmap; +} + /* * Return the pathname associated with an open file. * diff --git a/src/backend/storage/smgr/Makefile b/src/backend/storage/smgr/Makefile index 596b564656..ba7a9aad35 100644 --- a/src/backend/storage/smgr/Makefile +++ b/src/backend/storage/smgr/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ md.o \ - smgr.o + smgr.o \ + page_compression.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 3998296a62..bfb933e617 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -24,6 +24,7 @@ #include #include #include +#include #include "access/xlog.h" #include "access/xlogutils.h" @@ -33,8 +34,10 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "storage/bufmgr.h" +#include "storage/checksum.h" #include "storage/fd.h" #include "storage/md.h" +#include "storage/page_compression.h" #include "storage/relfilelocator.h" #include "storage/smgr.h" #include "storage/sync.h" @@ -82,6 +85,10 @@ typedef struct _MdfdVec { File mdfd_vfd; /* fd number in fd.c's pool */ + File mdfd_vfd_pca; /* page compression address file 's fd number + * in fd.c's pool */ + File mdfd_vfd_pcd; /* page compression data file's fd number in + * fd.c's pool */ BlockNumber mdfd_segno; /* segment number, from 
0 */ } MdfdVec; @@ -119,6 +126,13 @@ static MemoryContext MdCxt; /* context for all MdfdVec objects */ /* don't try to open a segment, if not already open */ #define EXTENSION_DONT_OPEN (1 << 5) +#define IS_COMPRESSED_MAINFORK(reln, forkNum) \ + (reln->smgr_rlocator.locator.compressOpt.algorithm != PAGE_COMPRESSION_NONE && forkNum == MAIN_FORKNUM) + +#define PAGE_COMPRESS_ALGORITHM(reln) (reln->smgr_rlocator.locator.compressOpt.algorithm) +#define PAGE_COMPRESS_LEVEL(reln) (reln->smgr_rlocator.locator.compressOpt.level) +#define PAGE_COMPRESS_CHUNK_SIZE(reln) (BLCKSZ / reln->smgr_rlocator.locator.compressOpt.chunks_pre_block) +#define PAGE_COMPRESS_PREALLOC_CHUNKS(reln) (reln->smgr_rlocator.locator.compressOpt.prealloc_chunks) /* local routines */ static void mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forkNum, @@ -142,6 +156,17 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno, static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg); +static int sync_pcmap(PageCompressHeader *pcmap, uint32 wait_event_info); +static void build_compression_chunks(char *data, int size, char *chunk_buf, int total_chunks, + PageCompressAddr *pcaddr, int chunk_size, + BlockNumber blocknum); +static void mdextend_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, bool skipFsync); + +static void mdread_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer); +static void mdwrite_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, bool skipFsync); /* * mdinit() -- Initialize private state for magnetic disk storage manager. @@ -183,7 +208,10 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo) { MdfdVec *mdfd; char *path; - File fd; + char *pcfile_path; + File fd, + fd_pca, + fd_pcd; if (isRedo && reln->md_num_open_segs[forkNum] > 0) return; /* created and opened already... 
*/ @@ -223,11 +251,66 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo) } } + fd_pca = -1; + fd_pcd = -1; + if (IS_COMPRESSED_MAINFORK(reln, forkNum)) + { + /* close main fork file */ + FileClose(fd); + fd = -1; + + /* open page compression address file */ + pcfile_path = psprintf("%s_pca", path); + fd_pca = PathNameOpenFile(pcfile_path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY); + + if (fd_pca < 0) + { + int save_errno = errno; + + if (isRedo) + fd_pca = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY); + if (fd_pca < 0) + { + /* be sure to report the error reported by create, not open */ + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", pcfile_path))); + } + } + pfree(pcfile_path); + + /* open page compression data file */ + pcfile_path = psprintf("%s_pcd", path); + fd_pcd = PathNameOpenFile(pcfile_path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY); + + if (fd_pcd < 0) + { + int save_errno = errno; + + if (isRedo) + fd_pcd = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY); + if (fd_pcd < 0) + { + /* be sure to report the error reported by create, not open */ + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", pcfile_path))); + } + } + pfree(pcfile_path); + + SetupPageCompressMemoryMap(fd_pca, fd_pcd, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + pfree(path); _fdvec_resize(reln, forkNum, 1); mdfd = &reln->md_seg_fds[forkNum][0]; mdfd->mdfd_vfd = fd; + mdfd->mdfd_vfd_pca = fd_pca; + mdfd->mdfd_vfd_pcd = fd_pcd; mdfd->mdfd_segno = 0; } @@ -347,12 +430,101 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forkNum, bool isRedo) (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); } + + /* Next delete page compression address file and data file */ + if (rlocator.locator.compressOpt.algorithm != PAGE_COMPRESSION_NONE && + forkNum == MAIN_FORKNUM) + { + char *pcfile_path; + + 
pcfile_path = psprintf("%s_pca", path); + /* ret keeps the main fork's result; don't overwrite it here */ + if (unlink(pcfile_path) < 0 && errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", pcfile_path))); + pfree(pcfile_path); + + pcfile_path = psprintf("%s_pcd", path); + /* skip the unlink only if the file is already known to be missing */ + if (do_truncate(pcfile_path) == 0 || errno != ENOENT) + { + /* again, leave ret untouched */ + if (unlink(pcfile_path) < 0 && errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", pcfile_path))); + } + pfree(pcfile_path); + } } else { /* Prevent other backends' fds from holding on to the disk space */ ret = do_truncate(path); + if (rlocator.locator.compressOpt.algorithm != PAGE_COMPRESSION_NONE && + forkNum == MAIN_FORKNUM) + { + int fd; + char *pcfile_path; + + /* clear page compression address file */ + pcfile_path = psprintf("%s_pca", path); + fd = OpenTransientFile(pcfile_path, O_RDWR | PG_BINARY); + if (fd >= 0) + { + int save_errno; + PageCompressHeader *pcmap; + int chunk_size = BLCKSZ / rlocator.locator.compressOpt.chunks_pre_block; + + pcmap = pc_mmap(fd, chunk_size, false); + if (pcmap == MAP_FAILED) + { + ereport(WARNING, + (errcode_for_dynamic_shared_memory(), + errmsg("could not mmap file \"%s\": %m", + pcfile_path))); + } + else + { + pg_atomic_write_u32(&pcmap->nblocks, 0); + pg_atomic_write_u32(&pcmap->allocated_chunks, 0); + memset((char *) pcmap + SizeOfPageCompressHeaderData, + 0x00, + SizeofPageCompressAddrFile(chunk_size) - SizeOfPageCompressHeaderData); + + if (sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC) != 0) + ereport(WARNING, + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + pcfile_path))); + + if (pc_munmap(pcmap) != 0) + ereport(WARNING, + (errcode_for_dynamic_shared_memory(), + errmsg("could not munmap file \"%s\": %m", + pcfile_path))); + } + + save_errno = errno; + CloseTransientFile(fd); + errno = save_errno; + } + else if (errno != ENOENT) + { + ereport(WARNING,
(errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pcfile_path))); + } + pfree(pcfile_path); + + /* truncate page compression data file; ret keeps the main fork's result */ + pcfile_path = psprintf("%s_pcd", path); + (void) do_truncate(pcfile_path); + pfree(pcfile_path); + } + /* Register request to unlink first segment later */ register_unlink_segment(rlocator, forkNum, 0 /* first seg */ ); } @@ -398,6 +570,35 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forkNum, bool isRedo) errmsg("could not remove file \"%s\": %m", segpath))); break; } + + if (rlocator.locator.compressOpt.algorithm != PAGE_COMPRESSION_NONE && + forkNum == MAIN_FORKNUM) + { + char *pcfile_segpath; + + pcfile_segpath = psprintf("%s_pca", segpath); + if (unlink(pcfile_segpath) < 0) + { + /* ENOENT is expected after the last segment... */ + if (errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", pcfile_segpath))); + } + pfree(pcfile_segpath); + + pcfile_segpath = psprintf("%s_pcd", segpath); + (void) do_truncate(pcfile_segpath); + if (unlink(pcfile_segpath) < 0) + { + /* ENOENT is expected after the last segment...
*/ + if (errno != ENOENT) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", pcfile_segpath))); + } + pfree(pcfile_segpath); + } } pfree(segpath); } @@ -427,6 +628,12 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, Assert(blocknum >= mdnblocks(reln, forknum)); #endif + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + mdextend_pc(reln, forknum, blocknum, buffer, skipFsync); + return; + } + /* * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any * more --- we mustn't create a block whose number actually is @@ -484,27 +688,87 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior) { MdfdVec *mdfd; char *path; - File fd; + File fd, + fd_pca, + fd_pcd; /* No work if already open */ if (reln->md_num_open_segs[forknum] > 0) return &reln->md_seg_fds[forknum][0]; - path = relpath(reln->smgr_rlocator, forknum); + fd = -1; + fd_pca = -1; + fd_pcd = -1; + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + char *pcfile_path; + + path = relpath(reln->smgr_rlocator, forknum); + + /* open page compression address file */ + pcfile_path = psprintf("%s_pca", path); + fd_pca = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY); + + if (fd_pca < 0) + { + if ((behavior & EXTENSION_RETURN_NULL) && + FILE_POSSIBLY_DELETED(errno)) + { + pfree(path); + pfree(pcfile_path); + return NULL; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pcfile_path))); + } + pfree(pcfile_path); - fd = PathNameOpenFile(path, O_RDWR | PG_BINARY); + /* open page compression data file */ + pcfile_path = psprintf("%s_pcd", path); + fd_pcd = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY); - if (fd < 0) + if (fd_pcd < 0) + { + int save_errno = errno; + + /* don't leak the _pca file; errno is still needed below */ + FileClose(fd_pca); + errno = save_errno; + + if ((behavior & EXTENSION_RETURN_NULL) && + FILE_POSSIBLY_DELETED(errno)) + { + pfree(path); + pfree(pcfile_path); + return NULL; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pcfile_path))); + }
pfree(pcfile_path); + + SetupPageCompressMemoryMap(fd_pca, fd_pcd, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + else { - if ((behavior & EXTENSION_RETURN_NULL) && - FILE_POSSIBLY_DELETED(errno)) + path = relpath(reln->smgr_rlocator, forknum); + + fd = PathNameOpenFile(path, O_RDWR | PG_BINARY); + + if (fd < 0) { - pfree(path); - return NULL; + if ((behavior & EXTENSION_RETURN_NULL) && + FILE_POSSIBLY_DELETED(errno)) + { + pfree(path); + return NULL; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); } - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", path))); } pfree(path); @@ -512,6 +770,8 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior) _fdvec_resize(reln, forknum, 1); mdfd = &reln->md_seg_fds[forknum][0]; mdfd->mdfd_vfd = fd; + mdfd->mdfd_vfd_pca = fd_pca; + mdfd->mdfd_vfd_pcd = fd_pcd; mdfd->mdfd_segno = 0; Assert(_mdnblocks(reln, forknum, mdfd) <= ((BlockNumber) RELSEG_SIZE)); @@ -547,7 +807,13 @@ mdclose(SMgrRelation reln, ForkNumber forknum) { MdfdVec *v = &reln->md_seg_fds[forknum][nopensegs - 1]; - FileClose(v->mdfd_vfd); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + FileClose(v->mdfd_vfd_pca); + FileClose(v->mdfd_vfd_pcd); + } + else + FileClose(v->mdfd_vfd); _fdvec_resize(reln, forknum, nopensegs - 1); nopensegs--; } @@ -568,11 +834,63 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) if (v == NULL) return false; - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + int chunk_size, + i, + range; + PageCompressHeader *pcmap; + PageCompressAddr *pcaddr; - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum); - (void) 
FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ, WAIT_EVENT_DATA_FILE_PREFETCH); + /* check chunk number */ + if (pcaddr->nchunks > pcaddr->allocated_chunks || pcaddr->allocated_chunks > MaxChunksPreCompressedPage(chunk_size)) + { + if (zero_damaged_pages || InRecovery) + return true; + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunks %u/%u of block %u in file \"%s\"", + pcaddr->nchunks, pcaddr->allocated_chunks, blocknum, FilePathName(v->mdfd_vfd_pca)))); + } + + for (i = 0; i < pcaddr->nchunks; i++) + { + if (pcaddr->chunknos[i] <= 0 || pcaddr->chunknos[i] > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + { + if (zero_damaged_pages || InRecovery) + return true; + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcaddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pca)))); + } + + seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcaddr->chunknos[i]); + range = 1; + while (i < pcaddr->nchunks - 1 && + pcaddr->chunknos[i + 1] == pcaddr->chunknos[i] + 1) + { + range++; + i++; + } + + (void) FilePrefetch(v->mdfd_vfd_pcd, seekpos, chunk_size * range, WAIT_EVENT_DATA_FILE_PREFETCH); + } + } + else + { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + (void) FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ, WAIT_EVENT_DATA_FILE_PREFETCH); + } #endif /* USE_PREFETCH */ return true; @@ -624,9 +942,70 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum, Assert(nflush >= 1); Assert(nflush <= nblocks); - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + int i, + chunk_size; + PageCompressHeader *pcmap; + PageCompressAddr *pcaddr; + BlockNumber iblock; + pc_chunk_number_t seekpos_chunk, + last_chunk, + nchunks; - FileWriteback(v->mdfd_vfd, seekpos, (off_t) BLCKSZ * nflush, WAIT_EVENT_DATA_FILE_FLUSH); + chunk_size = 
PAGE_COMPRESS_CHUNK_SIZE(reln); + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + + seekpos_chunk = -1; + last_chunk = -1; + for (iblock = 0; iblock < nflush; iblock++) + { + /* flush one block */ + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum + iblock); + + for (i = 0; i < pcaddr->nchunks; i++) + { + if (seekpos_chunk == -1) + { + seekpos_chunk = pcaddr->chunknos[i]; + last_chunk = seekpos_chunk; + } + else if (pcaddr->chunknos[i] == last_chunk + 1) + { + last_chunk++; + } + else + { + /* + * the chunks are discontiguous from here, so flush + * the previous chunk range + */ + seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, seekpos_chunk); + nchunks = 1 + last_chunk - seekpos_chunk; + + FileWriteback(v->mdfd_vfd_pcd, seekpos, (off_t) chunk_size * nchunks, WAIT_EVENT_DATA_FILE_FLUSH); + + seekpos_chunk = pcaddr->chunknos[i]; + last_chunk = seekpos_chunk; + } + } + } + + /* flush the remaining chunk range */ + if (seekpos_chunk != -1) + { + seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, seekpos_chunk); + nchunks = 1 + last_chunk - seekpos_chunk; + + FileWriteback(v->mdfd_vfd_pcd, seekpos, (off_t) chunk_size * nchunks, WAIT_EVENT_DATA_FILE_FLUSH); + } + } + else + { + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + FileWriteback(v->mdfd_vfd, seekpos, (off_t) BLCKSZ * nflush, WAIT_EVENT_DATA_FILE_FLUSH); + } nblocks -= nflush; blocknum += nflush; @@ -644,6 +1023,12 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nbytes; MdfdVec *v; + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + mdread_pc(reln, forknum, blocknum, buffer); + return; + } + TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, reln->smgr_rlocator.locator.spcOid, reln->smgr_rlocator.locator.dbOid, @@ -714,6 +1096,12 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, Assert(blocknum < mdnblocks(reln, forknum)); #endif + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + mdwrite_pc(reln,
forknum, blocknum, buffer, skipFsync); + return; + } + TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, reln->smgr_rlocator.locator.spcOid, reln->smgr_rlocator.locator.dbOid, @@ -828,7 +1213,9 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) { BlockNumber curnblk; BlockNumber priorblocks; - int curopensegs; + int curopensegs, + chunk_size; + PageCompressHeader *pcmap; /* * NOTE: mdnblocks makes sure we have opened all active segments, so that @@ -848,6 +1235,21 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) if (nblocks == curnblk) return; /* no work */ + /* + * The chunks allocated to a page of a compressed relation may be + * discontiguous, so truncating only the unused space at the end of the + * pcd file would leave behind chunks holding data that should have been + * deleted. If a crash occurs, the pca file is repaired from the pcd file + * during recovery, and those chunks could be restored by mistake. + * Padding them with zeros instead is expensive. So truncating a + * compressed relation in vacuum has been disabled, and the following + * error should never occur. + */ + if (IS_COMPRESSED_MAINFORK(reln, forknum) && nblocks != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("truncate part of compressed relation is not supported"))); + /* * Truncate segments, starting at the last one. Starting at the end makes * managing the memory for the fd array easier, should there be errors. @@ -867,11 +1269,37 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) * This segment is no longer active. We truncate the file, but do * not delete it, for reasons explained in the header comments.
*/ - if (FileTruncate(v->mdfd_vfd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not truncate file \"%s\": %m", - FilePathName(v->mdfd_vfd)))); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + + pg_atomic_write_u32(&pcmap->nblocks, 0); + pg_atomic_write_u32(&pcmap->allocated_chunks, 0); + memset((char *) pcmap + SizeOfPageCompressHeaderData, + 0x00, + SizeofPageCompressAddrFile(chunk_size) - SizeOfPageCompressHeaderData); + + if (sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pca)))); + + if (FileTruncate(v->mdfd_vfd_pcd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(v->mdfd_vfd_pcd)))); + } + else + { + if (FileTruncate(v->mdfd_vfd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(v->mdfd_vfd)))); + } if (!SmgrIsTemp(reln)) register_dirty_segment(reln, forknum, v); @@ -879,7 +1307,14 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) /* we never drop the 1st segment */ Assert(v != &reln->md_seg_fds[forknum][0]); - FileClose(v->mdfd_vfd); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + FileClose(v->mdfd_vfd_pca); + FileClose(v->mdfd_vfd_pcd); + } + else + FileClose(v->mdfd_vfd); + _fdvec_resize(reln, forknum, curopensegs - 1); } else if (priorblocks + ((BlockNumber) RELSEG_SIZE) > nblocks) @@ -893,12 +1328,41 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) */ BlockNumber lastsegblocks = nblocks - priorblocks; - if (FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * 
BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not truncate file \"%s\" to %u blocks: %m", - FilePathName(v->mdfd_vfd), - nblocks))); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + Assert(lastsegblocks == 0); + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + + pg_atomic_write_u32(&pcmap->nblocks, 0); + pg_atomic_write_u32(&pcmap->allocated_chunks, 0); + memset((char *) pcmap + SizeOfPageCompressHeaderData, + 0x00, + SizeofPageCompressAddrFile(chunk_size) - SizeOfPageCompressHeaderData); + + if (sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pca)))); + + if (FileTruncate(v->mdfd_vfd_pcd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", + FilePathName(v->mdfd_vfd_pcd)))); + } + else + { + if (FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\" to %u blocks: %m", + FilePathName(v->mdfd_vfd), + nblocks))); + } + if (!SmgrIsTemp(reln)) register_dirty_segment(reln, forknum, v); } @@ -952,16 +1416,43 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum) { MdfdVec *v = &reln->md_seg_fds[forknum][segno - 1]; - if (FileSync(v->mdfd_vfd, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) < 0) - ereport(data_sync_elevel(ERROR), - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - FilePathName(v->mdfd_vfd)))); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + PageCompressHeader *pcmap; + + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, PAGE_COMPRESS_CHUNK_SIZE(reln)); + + if (sync_pcmap(pcmap, 
WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pca)))); + + if (FileSync(v->mdfd_vfd_pcd, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pcd)))); + } + else + { + if (FileSync(v->mdfd_vfd, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(v->mdfd_vfd)))); + } /* Close inactive segments immediately */ if (segno > min_inactive_seg) { - FileClose(v->mdfd_vfd); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + FileClose(v->mdfd_vfd_pca); + FileClose(v->mdfd_vfd_pcd); + } + else + FileClose(v->mdfd_vfd); _fdvec_resize(reln, forknum, segno - 1); } @@ -993,11 +1484,32 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) ereport(DEBUG1, (errmsg_internal("could not forward fsync request because request queue is full"))); - if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0) - ereport(data_sync_elevel(ERROR), - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - FilePathName(seg->mdfd_vfd)))); + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + PageCompressHeader *pcmap; + + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(seg->mdfd_vfd_pca, PAGE_COMPRESS_CHUNK_SIZE(reln)); + + if (sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + FilePathName(seg->mdfd_vfd_pca)))); + + if (FileSync(seg->mdfd_vfd_pcd, WAIT_EVENT_DATA_FILE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(seg->mdfd_vfd_pcd)))); + } + else + { + if 
(FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + FilePathName(seg->mdfd_vfd)))); + } } } @@ -1039,7 +1551,7 @@ void ForgetDatabaseSyncRequests(Oid dbid) { FileTag tag; - RelFileLocator rlocator; + RelFileLocator rlocator = {0}; rlocator.dbOid = dbid; rlocator.spcOid = 0; @@ -1151,19 +1663,68 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno, int oflags) { MdfdVec *v; - File fd; - char *fullpath; + File fd, + fd_pca, + fd_pcd; + char *fullpath, + *pcfile_path; fullpath = _mdfd_segpath(reln, forknum, segno); /* open the file */ fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags); + if (fd < 0) + { + pfree(fullpath); + return NULL; + } + + fd_pca = -1; + fd_pcd = -1; + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + /* + * The main segment file only has to exist: as in mdcreate(), the data + * lives in the _pca/_pcd files, and mdclose() closes only those. + */ + FileClose(fd); + fd = -1; + + /* open page compression address file */ + pcfile_path = psprintf("%s_pca", fullpath); + fd_pca = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY | oflags); + + pfree(pcfile_path); + + if (fd_pca < 0) + { + pfree(fullpath); + return NULL; + } + + /* open page compression data file */ + pcfile_path = psprintf("%s_pcd", fullpath); + fd_pcd = PathNameOpenFile(pcfile_path, O_RDWR | PG_BINARY | oflags); + + pfree(pcfile_path); + + if (fd_pcd < 0) + { + int save_errno = errno; + + /* don't leak the _pca file; preserve the failed open's errno */ + FileClose(fd_pca); + errno = save_errno; + pfree(fullpath); + return NULL; + } + + SetupPageCompressMemoryMap(fd_pca, fd_pcd, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + pfree(fullpath); - if (fd < 0) - return NULL; - /* * Segments are always opened in order from lowest to highest, so we must * be adding a new one at the end.
@@ -1175,6 +1724,8 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno, /* fill the entry */ v = &reln->md_seg_fds[forknum][segno]; v->mdfd_vfd = fd; + v->mdfd_vfd_pca = fd_pca; + v->mdfd_vfd_pcd = fd_pcd; v->mdfd_segno = segno; Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); @@ -1325,6 +1876,13 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) { off_t len; + PageCompressHeader *pcmap; + + if (IS_COMPRESSED_MAINFORK(reln, forknum)) + { + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(seg->mdfd_vfd_pca, PAGE_COMPRESS_CHUNK_SIZE(reln)); + return (BlockNumber) pg_atomic_read_u32(&pcmap->nblocks); + } len = FileSize(seg->mdfd_vfd); if (len < 0) @@ -1351,6 +1909,82 @@ mdsyncfiletag(const FileTag *ftag, char *path) int result, save_errno; + if (IS_COMPRESSED_MAINFORK(reln, ftag->forknum)) + { + PageCompressHeader *pcmap; + + /* sync page compression address file */ + /* See if we already have the file open, or need to open it. */ + if (ftag->segno < reln->md_num_open_segs[ftag->forknum]) + { + file = reln->md_seg_fds[ftag->forknum][ftag->segno].mdfd_vfd_pca; + strlcpy(path, FilePathName(file), MAXPGPATH); + need_to_close = false; + } + else + { + char *p; + + p = _mdfd_segpath(reln, ftag->forknum, ftag->segno); + snprintf(path, MAXPGPATH, "%s_pca", p); + pfree(p); + + file = PathNameOpenFile(path, O_RDWR | PG_BINARY); + if (file < 0) + return -1; + + need_to_close = true; + + SetupPageCompressMemoryMap(file, (File) -1, PAGE_COMPRESS_CHUNK_SIZE(reln), PAGE_COMPRESS_ALGORITHM(reln)); + } + + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(file, PAGE_COMPRESS_CHUNK_SIZE(reln)); + result = sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC); + save_errno = errno; + + if (need_to_close) + FileClose(file); + + if (result != 0) + { + errno = save_errno; + return result; + } + + /* sync page compression data file */ + /* See if we already have the file open, or need to open it. 
*/ + if (ftag->segno < reln->md_num_open_segs[ftag->forknum]) + { + file = reln->md_seg_fds[ftag->forknum][ftag->segno].mdfd_vfd_pcd; + strlcpy(path, FilePathName(file), MAXPGPATH); + need_to_close = false; + } + else + { + char *p; + + p = _mdfd_segpath(reln, ftag->forknum, ftag->segno); + snprintf(path, MAXPGPATH, "%s_pcd", p); + pfree(p); + + file = PathNameOpenFile(path, O_RDWR | PG_BINARY); + if (file < 0) + return -1; + + need_to_close = true; + } + + /* Sync the page compression data file. */ + result = FileSync(file, WAIT_EVENT_DATA_FILE_SYNC); + save_errno = errno; + + if (need_to_close) + FileClose(file); + + errno = save_errno; + return result; + } + /* See if we already have the file open, or need to open it. */ if (ftag->segno < reln->md_num_open_segs[ftag->forknum]) { @@ -1392,15 +2026,29 @@ mdsyncfiletag(const FileTag *ftag, char *path) int mdunlinkfiletag(const FileTag *ftag, char *path) { + SMgrRelation reln = smgropen(ftag->rlocator, InvalidBackendId); char *p; + int ret; /* Compute the path. */ p = relpathperm(ftag->rlocator, MAIN_FORKNUM); strlcpy(path, p, MAXPGPATH); - pfree(p); /* Try to unlink the file. 
*/ - return unlink(path); + ret = unlink(path); + + if (IS_COMPRESSED_MAINFORK(reln, ftag->forknum)) + { + snprintf(path, MAXPGPATH, "%s_pca", p); + unlink(path); + + snprintf(path, MAXPGPATH, "%s_pcd", p); + ret = unlink(path); + } + + pfree(p); + + return ret; } /* @@ -1419,3 +2067,736 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate) */ return ftag->rlocator.dbOid == candidate->rlocator.dbOid; } + +static int +sync_pcmap(PageCompressHeader *pcmap, uint32 wait_event_info) +{ + int returnCode; + uint32 nblocks, + allocated_chunks, + last_synced_nblocks, + last_synced_allocated_chunks; + + nblocks = pg_atomic_read_u32(&pcmap->nblocks); + allocated_chunks = pg_atomic_read_u32(&pcmap->allocated_chunks); + last_synced_nblocks = pg_atomic_read_u32(&pcmap->last_synced_nblocks); + last_synced_allocated_chunks = pg_atomic_read_u32(&pcmap->last_synced_allocated_chunks); + + pgstat_report_wait_start(wait_event_info); + returnCode = pc_msync(pcmap); + pgstat_report_wait_end(); + + if (returnCode == 0) + { + if (last_synced_nblocks != nblocks) + pg_atomic_write_u32(&pcmap->last_synced_nblocks, nblocks); + + if (last_synced_allocated_chunks != allocated_chunks) + pg_atomic_write_u32(&pcmap->last_synced_allocated_chunks, allocated_chunks); + } + + return returnCode; +} + +static void +build_compression_chunks(char *data, int size, char *chunk_buf, int total_chunks, + PageCompressAddr *pcaddr, int chunk_size, BlockNumber blocknum) +{ + int data_chunks = NeedPageCompressChunksToStoreData(chunk_size, size); + int i; + + /* build chunks of compressed page */ + for (i = 0; i < total_chunks; i++) + { + char *pos; + PageCompressChunk *pcchunk = (PageCompressChunk *) (chunk_buf + chunk_size * i); + + if (i < data_chunks - 1) + { + pos = data + StoreCapacityPerPageCompressChunk(chunk_size) * i; + memcpy(pcchunk->data, pos, StoreCapacityPerPageCompressChunk(chunk_size)); + pcchunk->withdata = 1; + } + else if (i == data_chunks - 1) + { + int datasize = size - 
StoreCapacityPerPageCompressChunk(chunk_size) * i; + + pos = data + StoreCapacityPerPageCompressChunk(chunk_size) * i; + memcpy(pcchunk->data, pos, datasize); + + /* fill zero in the rest space */ + if (StoreCapacityPerPageCompressChunk(chunk_size) > datasize) + memset(pcchunk->data + datasize, 0x00, StoreCapacityPerPageCompressChunk(chunk_size) - datasize); + + pcchunk->withdata = 1; + } + else + { + /* fill zero in the prealloced chunk */ + memset(pcchunk->data, 0x00, StoreCapacityPerPageCompressChunk(chunk_size)); + pcchunk->withdata = 0; + } + + pcchunk->blockno = blocknum; + pcchunk->chunkseq = i + 1; + if (DataChecksumsEnabled()) + pcchunk->checksum = pg_checksum_compress_chunk((char *) pcchunk, chunk_size, pcaddr->chunknos[i]); + else + pcchunk->checksum = 0; + } +} + +/* + * mdextend_pc() -- Add a block to the specified compressed relation. + * + */ +static void +mdextend_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, bool skipFsync) +{ + MdfdVec *v; + char *work_buffer, + *chunk_buffer; + int i, + work_buffer_size, + compressed_page_size; + int prealloc_chunks, + need_chunks, + write_chunks, + chunk_size, + nchunks; + pc_chunk_number_t chunkno; + PageCompressHeader *pcmap; + PageCompressAddr *pcaddr; + uint8 algorithm; + int8 level; + + /* This assert is too expensive to have on normally ... */ +#ifdef CHECK_WRITE_VS_EXTEND + Assert(blocknum >= mdnblocks(reln, forknum)); +#endif + + Assert(IS_COMPRESSED_MAINFORK(reln, forknum)); + + /* + * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any + * more --- we mustn't create a block whose number actually is + * InvalidBlockNumber. 
+ */ + if (blocknum == InvalidBlockNumber) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("cannot extend file \"%s\" beyond %u blocks", + relpath(reln->smgr_rlocator, forknum), + InvalidBlockNumber))); + + v = _mdfd_getseg(reln, MAIN_FORKNUM, blocknum, skipFsync, EXTENSION_CREATE); + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + algorithm = PAGE_COMPRESS_ALGORITHM(reln); + level = PAGE_COMPRESS_LEVEL(reln); + prealloc_chunks = PAGE_COMPRESS_PREALLOC_CHUNKS(reln); + if (prealloc_chunks > BLCKSZ / chunk_size - 1) + prealloc_chunks = BLCKSZ / chunk_size - 1; + + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum); + + Assert(blocknum % RELSEG_SIZE >= pg_atomic_read_u32(&pcmap->nblocks)); + + /* + * If the initial allocated space is not enough, it may cause the storage + * space to be discontinuous. Therefore, postpone space allocation of the + * new page until the page is flushed later. 
+ */ + if (PageIsNew(buffer)) + { + if (pg_atomic_read_u32(&pcmap->nblocks) < blocknum % RELSEG_SIZE + 1) + pg_atomic_write_u32(&pcmap->nblocks, blocknum % RELSEG_SIZE + 1); + + if (!skipFsync && !SmgrIsTemp(reln)) + register_dirty_segment(reln, forknum, v); + + Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); + + return; + } + + /* check allocated chunk number */ + if (pcaddr->allocated_chunks > MaxChunksPreCompressedPage(chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid allocated chunks %u of block %u in file \"%s\"", + pcaddr->allocated_chunks, blocknum, FilePathName(v->mdfd_vfd_pca)))); + + for (i = 0; i < pcaddr->allocated_chunks; i++) + { + if (pcaddr->chunknos[i] <= 0 || pcaddr->chunknos[i] > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcaddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pca)))); + } + + /* compress page */ + work_buffer_size = compress_page_buffer_bound(algorithm); + if (work_buffer_size < 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unrecognized compression algorithm %d", + algorithm))); + work_buffer = palloc(work_buffer_size); + + compressed_page_size = compress_page(buffer, work_buffer, work_buffer_size, algorithm, level); + + if (compressed_page_size < 0) + elog(ERROR, "compression failed with algorithm %d", algorithm); + + nchunks = NeedPageCompressChunksToStoreData(chunk_size, compressed_page_size); + if (nchunks > MaxChunksPreCompressedPage(chunk_size)) + elog(ERROR, "size of compressed page %d exceeds the limit when write block %u of file \"%s\"", + compressed_page_size, blocknum, FilePathName(v->mdfd_vfd)); + + /* allocate chunks needed */ + need_chunks = prealloc_chunks > nchunks ? 
prealloc_chunks : nchunks; + if (pcaddr->allocated_chunks < need_chunks) + { + int max_chunkno = pg_atomic_read_u32(&pcmap->allocated_chunks) + need_chunks - pcaddr->allocated_chunks + 1; + + if (max_chunkno > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("try to allocate too many chunks %d in file \"%s\"", + max_chunkno, FilePathName(v->mdfd_vfd_pca)))); + + /* + * First atomically increase the global number of chunks, which allows + * multiple backends to allocate chunks concurrently. + */ + chunkno = (pc_chunk_number_t) pg_atomic_fetch_add_u32(&pcmap->allocated_chunks, + need_chunks - pcaddr->allocated_chunks) + 1; + for (i = pcaddr->allocated_chunks; i < need_chunks; i++, chunkno++) + { + pcaddr->chunknos[i] = chunkno; + } + pcaddr->allocated_chunks = need_chunks; + + /* + * Every time COMPRESS_ADDRESS_SYNC_THRESHOLD chunks are allocated, + * the address fileis flashed once to avoid too many addresses that + * need to be repaired after crash. + */ + if (pg_atomic_read_u32(&pcmap->allocated_chunks) - pg_atomic_read_u32(&pcmap->last_synced_allocated_chunks) > + COMPRESS_ADDRESS_SYNC_THRESHOLD(chunk_size)) + { + if (sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_FLUSH) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pca)))); + } + + write_chunks = need_chunks; + } + else + { + write_chunks = nchunks >= pcaddr->nchunks ? 
nchunks : pcaddr->nchunks; + if (write_chunks > pcaddr->allocated_chunks) + write_chunks = pcaddr->allocated_chunks; + } + + /* build chunks of compressed page */ + chunk_buffer = palloc(chunk_size * write_chunks); + build_compression_chunks(work_buffer, compressed_page_size, chunk_buffer, write_chunks, pcaddr, chunk_size, blocknum); + + /* write chunks of compressed page */ + for (i = 0; i < need_chunks; i++) + { + char *pos = chunk_buffer + chunk_size * i; + off_t seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcaddr->chunknos[i]); + int range = 1; + int write_amount, + nbytes; + + /* write multiple consecutive chunks at once */ + while (i < need_chunks - 1 && pcaddr->chunknos[i + 1] == pcaddr->chunknos[i] + 1) + { + range++; + i++; + } + write_amount = chunk_size * range; + + TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, + reln->smgr_rlocator.locator.spcNode, + reln->smgr_rlocator.locator.dbNode, + reln->smgr_rlocator.locator.relNode, + reln->smgr_rlocator.backend); + + nbytes = FileWrite(v->mdfd_vfd_pcd, pos, write_amount, seekpos, WAIT_EVENT_DATA_FILE_EXTEND); + + TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, + reln->smgr_rlocator.locator.spcNode, + reln->smgr_rlocator.locator.dbNode, + reln->smgr_rlocator.locator.relNode, + reln->smgr_rlocator.backend, + nbytes, + write_amount); + + if (nbytes != write_amount) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not extend file \"%s\": %m", + FilePathName(v->mdfd_vfd_pcd)), + errhint("Check free disk space."))); + + /* short write: complain appropriately */ + ereport(ERROR, + (errcode(ERRCODE_DISK_FULL), + errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", + FilePathName(v->mdfd_vfd_pcd), + nbytes, write_amount, blocknum), + errhint("Check free disk space."))); + } + } + + /* finally update size of this page and global nblocks */ + if (pcaddr->nchunks != nchunks) + pcaddr->nchunks = nchunks; + + if 
(pg_atomic_read_u32(&pcmap->nblocks) < blocknum % RELSEG_SIZE + 1) + pg_atomic_write_u32(&pcmap->nblocks, blocknum % RELSEG_SIZE + 1); + + if (work_buffer != buffer) + pfree(work_buffer); + + if (!skipFsync && !SmgrIsTemp(reln)) + register_dirty_segment(reln, forknum, v); + + Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); +} + +/* + * mdwrite_pc() -- Write the supplied block to a compressed relation. + * + * This is to be used only for updating already-existing blocks of a + * relation (ie, those before the current EOF). To extend a relation, + * use mdextend(). + */ +static void +mdwrite_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer, bool skipFsync) +{ + MdfdVec *v; + char *work_buffer, + *chunk_buffer; + int i, + work_buffer_size, + compressed_page_size; + int prealloc_chunks, + need_chunks, + write_chunks, + chunk_size, + nchunks; + pc_chunk_number_t chunkno; + PageCompressHeader *pcmap; + PageCompressAddr *pcaddr; + uint8 algorithm; + int8 level; + + /* This assert is too expensive to have on normally ... 
*/ +#ifdef CHECK_WRITE_VS_EXTEND + Assert(blocknum < mdnblocks(reln, forknum)); +#endif + + Assert(IS_COMPRESSED_MAINFORK(reln, forknum)); + + v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + algorithm = PAGE_COMPRESS_ALGORITHM(reln); + level = PAGE_COMPRESS_LEVEL(reln); + prealloc_chunks = PAGE_COMPRESS_PREALLOC_CHUNKS(reln); + if (prealloc_chunks > BLCKSZ / chunk_size - 1) + prealloc_chunks = BLCKSZ / chunk_size - 1; + + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum); + + Assert(blocknum % RELSEG_SIZE < pg_atomic_read_u32(&pcmap->nblocks)); + + /* check allocated chunk number */ + if (pcaddr->allocated_chunks > MaxChunksPreCompressedPage(chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid allocated chunks %u of block %u in file \"%s\"", + pcaddr->allocated_chunks, blocknum, FilePathName(v->mdfd_vfd_pca)))); + + for (i = 0; i < pcaddr->allocated_chunks; i++) + { + if (pcaddr->chunknos[i] <= 0 || pcaddr->chunknos[i] > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcaddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pca)))); + } + + /* compress page */ + work_buffer_size = compress_page_buffer_bound(algorithm); + if (work_buffer_size < 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unrecognized compression algorithm %d", + algorithm))); + work_buffer = palloc(work_buffer_size); + + compressed_page_size = compress_page(buffer, work_buffer, work_buffer_size, algorithm, level); + + if (compressed_page_size < 0) + elog(ERROR, "compression failed with algorithm %d", algorithm); + + nchunks = NeedPageCompressChunksToStoreData(chunk_size, compressed_page_size); + if (nchunks > 
MaxChunksPreCompressedPage(chunk_size)) + elog(ERROR, "size of compressed page %d exceeds the limit when write block %u of file \"%s\"", + compressed_page_size, blocknum, FilePathName(v->mdfd_vfd)); + + /* allocate chunks needed */ + need_chunks = prealloc_chunks > nchunks ? prealloc_chunks : nchunks; + if (pcaddr->allocated_chunks < need_chunks) + { + int max_chunkno = pg_atomic_read_u32(&pcmap->allocated_chunks) + need_chunks - pcaddr->allocated_chunks + 1; + + if (max_chunkno > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("try to allocate too many chunks %d in file \"%s\"", + max_chunkno, FilePathName(v->mdfd_vfd_pca)))); + + /* + * First atomically increase the global number of chunks, which allows + * multiple backends to allocate chunks concurrently. + */ + chunkno = (pc_chunk_number_t) pg_atomic_fetch_add_u32(&pcmap->allocated_chunks, + need_chunks - pcaddr->allocated_chunks) + 1; + for (i = pcaddr->allocated_chunks; i < need_chunks; i++, chunkno++) + { + pcaddr->chunknos[i] = chunkno; + } + pcaddr->allocated_chunks = need_chunks; + + /* + * Every time COMPRESS_ADDRESS_SYNC_THRESHOLD chunks are allocated, + * the address fileis flashed once to avoid too many addresses that + * need to be repaired after crash. + */ + if (pg_atomic_read_u32(&pcmap->allocated_chunks) - pg_atomic_read_u32(&pcmap->last_synced_allocated_chunks) > + COMPRESS_ADDRESS_SYNC_THRESHOLD(chunk_size)) + { + if (sync_pcmap(pcmap, WAIT_EVENT_COMPRESS_ADDRESS_FILE_FLUSH) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + FilePathName(v->mdfd_vfd_pca)))); + } + + write_chunks = need_chunks; + } + else + { + write_chunks = nchunks >= pcaddr->nchunks ? 
nchunks : pcaddr->nchunks; + if (write_chunks > pcaddr->allocated_chunks) + write_chunks = pcaddr->allocated_chunks; + } + + /* build chunks of compressed page */ + chunk_buffer = palloc(chunk_size * write_chunks); + build_compression_chunks(work_buffer, compressed_page_size, chunk_buffer, write_chunks, pcaddr, chunk_size, blocknum); + + /* write chunks of compressed page */ + for (i = 0; i < need_chunks; i++) + { + char *pos = chunk_buffer + chunk_size * i; + off_t seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcaddr->chunknos[i]); + int range = 1; + int write_amount, + nbytes; + + /* write multiple consecutive chunks at once */ + while (i < need_chunks - 1 && pcaddr->chunknos[i + 1] == pcaddr->chunknos[i] + 1) + { + range++; + i++; + } + write_amount = chunk_size * range; + + TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, + reln->smgr_rlocator.locator.spcNode, + reln->smgr_rlocator.locator.dbNode, + reln->smgr_rlocator.locator.relNode, + reln->smgr_rlocator.backend); + + nbytes = FileWrite(v->mdfd_vfd_pcd, pos, write_amount, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + + TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, + reln->smgr_rlocator.locator.spcNode, + reln->smgr_rlocator.locator.dbNode, + reln->smgr_rlocator.locator.relNode, + reln->smgr_rlocator.backend, + nbytes, + write_amount); + + if (nbytes != write_amount) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write block %u in file \"%s\": %m", + blocknum, FilePathName(v->mdfd_vfd_pcd)))); + /* short write: complain appropriately */ + ereport(ERROR, + (errcode(ERRCODE_DISK_FULL), + errmsg("could not write block %u in file \"%s\": wrote only %d of %d bytes", + blocknum, + FilePathName(v->mdfd_vfd_pcd), + nbytes, write_amount), + errhint("Check free disk space."))); + } + } + + /* finally update size of this page */ + if (pcaddr->nchunks != nchunks) + pcaddr->nchunks = nchunks; + + if (work_buffer != buffer) + pfree(work_buffer); + + if 
(!skipFsync && !SmgrIsTemp(reln)) + register_dirty_segment(reln, forknum, v); +} + +/* + * mdread_pc() -- Read the specified block from a compressed relation. + */ +static void +mdread_pc(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + char *buffer) +{ + off_t seekpos; + int nbytes, + chunk_size, + i, + read_amount, + range, + nchunks; + MdfdVec *v; + PageCompressHeader *pcmap; + PageCompressAddr *pcaddr; + PageCompressChunk *pcchunk; + PageCompressData *pcdata; + char *compress_buffer, + *buffer_pos; + + Assert(IS_COMPRESSED_MAINFORK(reln, forkNum)); + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + chunk_size = PAGE_COMPRESS_CHUNK_SIZE(reln); + pcmap = (PageCompressHeader *) GetPageCompressMemoryMap(v->mdfd_vfd_pca, chunk_size); + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum); + + nchunks = pcaddr->nchunks; + if (nchunks == 0) + { + MemSet(buffer, 0, BLCKSZ); + return; + } + + /* check chunk number */ + if (nchunks > pcaddr->allocated_chunks || pcaddr->allocated_chunks > MaxChunksPreCompressedPage(chunk_size)) + { + if (zero_damaged_pages || InRecovery) + { + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunks %u/%u of block %u in file \"%s\"", + nchunks, pcaddr->allocated_chunks, blocknum, FilePathName(v->mdfd_vfd_pca)))); + } + + for (i = 0; i < nchunks; i++) + { + if (pcaddr->chunknos[i] <= 0 || pcaddr->chunknos[i] > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + { + if (zero_damaged_pages || InRecovery) + { + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcaddr->chunknos[i], blocknum, FilePathName(v->mdfd_vfd_pca)))); + } + } + + /* read chunk data */ + compress_buffer = palloc(chunk_size * nchunks); + for (i = 0; i < nchunks; i++) + { + buffer_pos = compress_buffer + chunk_size * i; + 
seekpos = (off_t) OffsetOfPageCompressChunk(chunk_size, pcaddr->chunknos[i]); + range = 1; + while (i < nchunks - 1 && pcaddr->chunknos[i + 1] == pcaddr->chunknos[i] + 1) + { + range++; + i++; + } + read_amount = chunk_size * range; + + TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, + reln->smgr_rlocator.locator.spcNode, + reln->smgr_rlocator.locator.dbNode, + reln->smgr_rlocator.locator.relNode, + reln->smgr_rlocator.backend); + + nbytes = FileRead(v->mdfd_vfd_pcd, buffer_pos, read_amount, seekpos, WAIT_EVENT_DATA_FILE_READ); + + TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, + reln->smgr_rlocator.locator.spcNode, + reln->smgr_rlocator.locator.dbNode, + reln->smgr_rlocator.locator.relNode, + reln->smgr_rlocator.backend, + nbytes, + read_amount); + + if (nbytes != read_amount) + { + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read block %u in file \"%s\": %m", + blocknum, FilePathName(v->mdfd_vfd_pcd)))); + + /* + * Short read: we are at or past EOF, or we read a partial block + * at EOF. Normally this is an error; upper levels should never + * try to read a nonexistent block. However, if + * zero_damaged_pages is ON or we are InRecovery, we should + * instead return zeroes without complaining. This allows, for + * example, the case of trying to update a block that was later + * truncated away. 
+ */ + if (zero_damaged_pages || InRecovery) + { + pfree(compress_buffer); + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read block %u in file \"%s\": read only %d of %d bytes", + blocknum, FilePathName(v->mdfd_vfd_pcd), + nbytes, read_amount))); + } + } + + /* check chunks */ + pcchunk = (PageCompressChunk *) compress_buffer; + pcdata = (PageCompressData *) (pcchunk->data); + if (NeedPageCompressChunksToStoreData(chunk_size, SizeOfPageCompressDataHeaderData + pcdata->size) != nchunks) + { + if (zero_damaged_pages || InRecovery) + { + pfree(compress_buffer); + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("size of compressed data %u and chunk count %d for chunk %u of block %u are inconsistent in file \"%s\"", + pcdata->size, nchunks, pcaddr->chunknos[0], blocknum, + FilePathName(v->mdfd_vfd_pcd)))); + } + + for (i = 0; i < nchunks; i++) + { + pcchunk = (PageCompressChunk *) (compress_buffer + chunk_size * i); + if (pcchunk->blockno != blocknum || + pcchunk->chunkseq != i + 1) + { + if (zero_damaged_pages || InRecovery) + { + pfree(compress_buffer); + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("inconsistent chunk header %u(%d) for chunk %u of block %u(%d) in file \"%s\"", + pcchunk->blockno, pcchunk->chunkseq, + pcaddr->chunknos[i], blocknum, i + 1, FilePathName(v->mdfd_vfd_pcd)))); + } + } + + /* build decompress buffer */ + if (nchunks > 1) + { + char *tmp_buffer = palloc(chunk_size * nchunks); + + for (i = 0; i < nchunks; i++) + { + memcpy(tmp_buffer + SizeOfPageCompressChunkHeaderData + StoreCapacityPerPageCompressChunk(chunk_size) * i, + compress_buffer + SizeOfPageCompressChunkHeaderData + chunk_size * i, + StoreCapacityPerPageCompressChunk(chunk_size)); + } + pfree(compress_buffer); + compress_buffer = tmp_buffer; + pcchunk = (PageCompressChunk *) 
compress_buffer; + pcdata = (PageCompressData *) (pcchunk->data); + } + + /* decompress chunk data */ + nbytes = decompress_page((char *) pcdata, buffer, PAGE_COMPRESS_ALGORITHM(reln)); + if (nbytes != BLCKSZ) + { + if (nbytes == -2) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not recognized compression algorithm %d for file \"%s\"", + PAGE_COMPRESS_ALGORITHM(reln), + FilePathName(v->mdfd_vfd_pcd)))); + + /* + * Short read: we are at or past EOF, or we read a partial block at + * EOF. Normally this is an error; upper levels should never try to + * read a nonexistent block. However, if zero_damaged_pages is ON or + * we are InRecovery, we should instead return zeroes without + * complaining. This allows, for example, the case of trying to + * update a block that was later truncated away. + */ + if (zero_damaged_pages || InRecovery) + { + pfree(compress_buffer); + MemSet(buffer, 0, BLCKSZ); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not decompress block %u in file \"%s\": decompressed %d of %d bytes", + blocknum, FilePathName(v->mdfd_vfd_pcd), + nbytes, BLCKSZ))); + } + + pfree(compress_buffer); +} diff --git a/src/backend/storage/smgr/page_compression.c b/src/backend/storage/smgr/page_compression.c new file mode 100644 index 0000000000..98a5bf66fe --- /dev/null +++ b/src/backend/storage/smgr/page_compression.c @@ -0,0 +1,466 @@ +/* + * page_compression.c + * Routines for page compression + * + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/storage/smgr/page_compression.c + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/fd.h" +#include "utils/timestamp.h" + +#include "storage/page_compression.h" +#include "storage/page_compression_impl.h" + +static BlockNumber getsegno(const char *path_pca); + +/* + * Returns segment number for specified pca file. 
+ * + */ +static BlockNumber +getsegno(const char *path_pca) +{ + int i; + BlockNumber segno = (BlockNumber) 0; + int pathlen = strlen(path_pca); + const char *dot_pos = NULL; + + for (i = pathlen - 5; i >= 0; i--) + { + if (path_pca[i] < '0' && path_pca[i] > '9') + break; + + if (path_pca[i] == '.') + { + dot_pos = &path_pca[i]; + break; + } + } + + if (dot_pos) + segno = atol(dot_pos); + + return segno; +} + +/* + * errcode_for_dynamic_shared_memory --- add SQLSTATE error code to the current error + * + * The SQLSTATE code is chosen based on the saved errno value. We assume + * that the failing operation was some type of mmap access. + * + * NOTE: the primary error message string should generally include %m + * when this is used. + */ +int +errcode_for_dynamic_shared_memory(void) +{ + if (errno == EFBIG || errno == ENOMEM) + return errcode(ERRCODE_OUT_OF_MEMORY); + else + return errcode_for_file_access(); +} + +/* + * Check data consistency of pca file and if inconsistent try to repair it via + * the pcd file. 
+ * + */ +void +check_and_repair_compress_address(PageCompressHeader *pcmap, uint16 chunk_size, uint8 algorithm, + const char *path, int fd_pcd, const char *path_pcd) +{ + int i, + unused_chunks; + BlockNumber blocknum, + segno; + uint32 nblocks, + allocated_chunks, + real_blocks; + BlockNumber *global_chunknos; + char last_recovery_start_time_buf[sizeof(TimestampTz)]; + char start_time_buf[sizeof(TimestampTz)]; + struct stat fst; + bool need_check = false; + int total_allocated_chunks; + PageCompressAddr *pcaddr; + + unused_chunks = 0; + real_blocks = 0; + + /* if the relation had been checked in this startup, skip */ + memcpy(last_recovery_start_time_buf, &pcmap->last_recovery_start_time, sizeof(TimestampTz)); + memcpy(start_time_buf, &PgStartTime, sizeof(TimestampTz)); + for (i = 0; i < sizeof(TimestampTz); i++) + { + if (start_time_buf[i] != last_recovery_start_time_buf[i]) + { + need_check = true; + break; + } + } + if (!need_check) + return; + + /* read count of allocated chunks */ + if (stat(path_pcd, &fst) < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path_pcd))); + } + total_allocated_chunks = fst.st_size / chunk_size; + + /* check header of page compression address file */ + if (pcmap->chunk_size != chunk_size || pcmap->algorithm != algorithm) + { + /* + * reinitialize header of pca file if it is invalid and + * zero_damaged_pages is on + */ + if (zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk_size %u or algorithm %u in header of file \"%s\", and reinitialized it.", + pcmap->chunk_size, pcmap->algorithm, path))); + + pcmap->algorithm = algorithm; + pcmap->chunk_size = chunk_size; + pg_atomic_write_u32(&pcmap->nblocks, 0); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk_size %u or algorithm %u in header of file \"%s\"", + pcmap->chunk_size, pcmap->algorithm, path))); + } + + segno = getsegno(path); + nblocks = 
pg_atomic_read_u32(&pcmap->nblocks); + allocated_chunks = pg_atomic_read_u32(&pcmap->allocated_chunks); + global_chunknos = palloc0(sizeof(BlockNumber) * MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)); + + /* check address data of every pages */ + for (blocknum = segno * ((BlockNumber) RELSEG_SIZE); blocknum < (segno + 1) * ((BlockNumber) RELSEG_SIZE); blocknum++) + { + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum); + + /* skip when found first zero filled block after nblocks */ + if (blocknum % RELSEG_SIZE >= (BlockNumber) nblocks && pcaddr->allocated_chunks == 0) + break; + + /* check allocated_chunks for one page */ + if (pcaddr->allocated_chunks > MaxChunksPreCompressedPage(chunk_size)) + { + if (zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid allocated chunks %u of block %u in file \"%s\", and zero the block", + pcaddr->allocated_chunks, blocknum, path))); + + MemSet(pcaddr, 0, SizeOfPageCompressAddr(chunk_size)); + continue; + } + else + { + pfree(global_chunknos); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid allocated chunks %u of block %u in file \"%s\"", + pcaddr->allocated_chunks, blocknum, path))); + } + } + + /* check nchunks for one page */ + if (pcaddr->nchunks > pcaddr->allocated_chunks) + { + if (zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("nchunks %u exceeds allocated_chunks %u of block %u in file \"%s\", and zero the block", + pcaddr->nchunks, pcaddr->allocated_chunks, blocknum, path))); + + MemSet(pcaddr, 0, SizeOfPageCompressAddr(chunk_size)); + continue; + } + else + { + pfree(global_chunknos); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("nchunks %u exceeds allocated_chunks %u of block %u in file \"%s\"", + pcaddr->nchunks, pcaddr->allocated_chunks, blocknum, path))); + } + } + + /* check chunknos for one page */ + for (i = 0; i < pcaddr->allocated_chunks; i++) + { + /* check for invalid chunkno */ + 
if (pcaddr->chunknos[i] == 0 || pcaddr->chunknos[i] > MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size)) + { + if (zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\", and zero the block", + pcaddr->chunknos[i], blocknum, path))); + + MemSet(pcaddr, 0, SizeOfPageCompressAddr(chunk_size)); + break; + } + else + { + pfree(global_chunknos); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid chunk number %u of block %u in file \"%s\"", + pcaddr->chunknos[i], blocknum, path))); + } + } + + /* check for duplicate chunkno */ + if (global_chunknos[pcaddr->chunknos[i] - 1] != 0) + { + if (zero_damaged_pages) + { + int j; + PageCompressAddr *pcaddr_dup = GetPageCompressAddr(pcmap, chunk_size, global_chunknos[pcaddr->chunknos[i] - 1] - 1); + + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("chunk number %u of block %u duplicate with block %u in file \"%s\", and zero the two blocks", + pcaddr->chunknos[i], blocknum, global_chunknos[pcaddr->chunknos[i] - 1] - 1, path))); + + /* + * clean all chunk allocation infomation in the two + * duplicated pages + */ + for (j = 0; j < pcaddr_dup->allocated_chunks; j++) + { + global_chunknos[pcaddr_dup->chunknos[j] - 1] = 0; + } + MemSet(pcaddr, 0, SizeOfPageCompressAddr(chunk_size)); + MemSet(pcaddr_dup, 0, SizeOfPageCompressAddr(chunk_size)); + continue; + } + else + { + pfree(global_chunknos); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("chunk number %u of block %u duplicate with block %u in file \"%s\"", + pcaddr->chunknos[i], blocknum, global_chunknos[pcaddr->chunknos[i] - 1] - 1, path))); + } + } + + /* + * check if chunkno exceeds size of the pcd file, exceeded chunkno + * will be clean up + */ + if (pcaddr->chunknos[i] > total_allocated_chunks) + { + pcaddr->allocated_chunks = i; + break; + } + } + + /* + * clean chunknos beyond allocated_chunks for one page and this is a + * normal scenario + */ + 
for (i = pcaddr->allocated_chunks; i < MaxChunksPreCompressedPage(chunk_size); i++) + { + if (pcaddr->chunknos[i] != 0) + { + ereport(DEBUG1, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("clear chunk %u at position %d which beyond allocated_chunks %u of block %u in file \"%s\"", + pcaddr->chunknos[i], i + 1, pcaddr->allocated_chunks, blocknum, path))); + pcaddr->chunknos[i] = 0; + } + } + + for (i = 0; i < pcaddr->allocated_chunks; i++) + { + global_chunknos[pcaddr->chunknos[i] - 1] = blocknum + 1; + } + + if (pcaddr->nchunks > 0) + real_blocks = blocknum + 1; + } + + /* clean the rest of address data in pca file */ + for (; blocknum < (segno + 1) * ((BlockNumber) RELSEG_SIZE); blocknum++) + { + char buf[256], + *p; + bool need_clean = false; + + pcaddr = GetPageCompressAddr(pcmap, chunk_size, blocknum); + if (pcaddr->allocated_chunks != 0 || pcaddr->nchunks != 0) + need_clean = true; + + /* clean address data and output content of the address */ + MemSet(buf, 0, sizeof(buf)); + p = buf; + + for (i = 0; i < MaxChunksPreCompressedPage(chunk_size); i++) + { + if (pcaddr->chunknos[i]) + { + need_clean = true; + if (i == 0) + snprintf(p, (sizeof(buf) - (p - buf)), "%u", pcaddr->chunknos[i]); + else + snprintf(p, (sizeof(buf) - (p - buf)), ",%u", pcaddr->chunknos[i]); + p += strlen(p); + } + } + + if (need_clean) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("clean unused address data of block %u in file \"%s\", old allocated_chunks/nchunks/chunknos: %u/%u/{%s}", + blocknum, path, pcaddr->allocated_chunks, pcaddr->nchunks, buf))); + + MemSet(pcaddr, 0, SizeOfPageCompressAddr(chunk_size)); + } + } + + /* fix unused chunks via compression data file */ + for (i = 0; i < total_allocated_chunks; i++) + { + int rc; + char buf[SizeOfPageCompressChunkHeaderData + SizeOfPageCompressDataHeaderData]; + int nbytes = SizeOfPageCompressChunkHeaderData + SizeOfPageCompressDataHeaderData; + PageCompressChunk *pcchunk = (PageCompressChunk *) buf; + PageCompressData 
*pcdata = (PageCompressData *) (buf + SizeOfPageCompressChunkHeaderData); + + if (global_chunknos[i] == 0) + { + rc = pg_pread(fd_pcd, buf, nbytes, i * chunk_size); + + if (rc < 0) + { + pfree(global_chunknos); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path_pcd))); + } + + if (rc >= 0 && rc != nbytes) + { + pfree(global_chunknos); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": read %d of %d", + path_pcd, rc, nbytes))); + } + + if (pcchunk->chunkseq > 0 && pcchunk->chunkseq <= MaxChunksPreCompressedPage(chunk_size)) + { + pcaddr = GetPageCompressAddr(pcmap, chunk_size, pcchunk->blockno); + if (pcaddr->chunknos[pcchunk->chunkseq - 1] == 0 && + pcaddr->allocated_chunks + 1 == pcchunk->chunkseq) + { + global_chunknos[i] = pcchunk->blockno + 1; + pcaddr->chunknos[pcchunk->chunkseq - 1] = i + 1; + pcaddr->allocated_chunks++; + + ereport(LOG, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("fix allocation of chunk for block %u in file \"%s\", seq: %u, chunkno: %u", + pcchunk->blockno, path, pcchunk->chunkseq, i + 1))); + + /* + * After fixing page's nchunks, may cause nchunks to be + * larger than allocated_chunks. This will be fixed when + * replaying WAL record with FPI. 
+ */ + if (pcchunk->withdata && pcchunk->chunkseq == 1) + { + int nchunks = 0; + + nchunks = NeedPageCompressChunksToStoreData(chunk_size, SizeOfPageCompressDataHeaderData + pcdata->size); + + if (nchunks <= MaxChunksPreCompressedPage(chunk_size) && pcaddr->nchunks != nchunks) + { + ereport(LOG, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("fix nchunks of block %u in file \"%s\", old: %u, new: %u", + pcchunk->blockno, path, pcaddr->nchunks, nchunks))); + + pcaddr->nchunks = nchunks; + if ((pcchunk->blockno % RELSEG_SIZE) + 1 > real_blocks) + real_blocks = (pcchunk->blockno % RELSEG_SIZE) + 1; + } + } + } + } + } + } + + /* check for holes in allocated chunks */ + for (i = 0; i < total_allocated_chunks; i++) + if (global_chunknos[i] == 0) + unused_chunks++; + + if (unused_chunks > 0) + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("there are %u chunks of total allocated chunks %u can not be use in file \"%s\"", + unused_chunks, total_allocated_chunks, path), + errhint("You may need to run VACUMM FULL to optimize space allocation, or run REINDEX if it is an index."))); + + /* + * update nblocks in header of page compression address file. Because + * mdextend_pc delays space allocation for new pages, it is normal for the + * number of blocks recorded in the compression address file to be greater + * than the maximum block number in the compression data file. So scenes + * with "real_blocks < nblocks" do not need to be fixed. 
+ */ + if (real_blocks > nblocks) + { + pg_atomic_write_u32(&pcmap->nblocks, real_blocks); + pg_atomic_write_u32(&pcmap->last_synced_nblocks, real_blocks); + + ereport(LOG, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("fix nblocks in header of file \"%s\", old: %u, new: %u", + path, nblocks, real_blocks))); + } + + /* update allocated_chunks in header of compression address file */ + if (allocated_chunks != total_allocated_chunks) + { + pg_atomic_write_u32(&pcmap->allocated_chunks, total_allocated_chunks); + pg_atomic_write_u32(&pcmap->last_synced_allocated_chunks, total_allocated_chunks); + + ereport(LOG, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("fix allocated_chunks in header of file \"%s\", old: %u, new: %u", + path, allocated_chunks, total_allocated_chunks))); + } + + pfree(global_chunknos); + + if (pc_msync(pcmap) != 0) + ereport(data_sync_elevel(ERROR), + (errcode_for_dynamic_shared_memory(), + errmsg("could not msync file \"%s\": %m", + path))); + + pcmap->last_recovery_start_time = PgStartTime; +} diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index da57a93034..35a2d87a52 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -744,6 +744,12 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_WAL_WRITE: event_name = "WALWrite"; break; + case WAIT_EVENT_COMPRESS_ADDRESS_FILE_FLUSH: + event_name = "CompressAddressFileFlush"; + break; + case WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC: + event_name = "CompressAddressFileSync"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c index 34efa121b4..0198012643 100644 --- a/src/backend/utils/adt/dbsize.c +++ b/src/backend/utils/adt/dbsize.c @@ -314,16 +314,58 @@ calculate_relation_size(RelFileLocator *rfn, BackendId backend, ForkNumber forkn snprintf(pathname, MAXPGPATH, "%s.%u", relationpath, segcount); - if (stat(pathname, &fst) < 0) + 
if (rfn->compressOpt.algorithm != PAGE_COMPRESSION_NONE && + forknum == MAIN_FORKNUM) { - if (errno == ENOENT) - break; - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", pathname))); + char *pcfile_segpath; + + /* calculate size of page compression address file */ + pcfile_segpath = psprintf("%s_pca", pathname); + if (stat(pcfile_segpath, &fst) < 0) + { + if (errno == ENOENT) + { + pfree(pcfile_segpath); + break; + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", pcfile_segpath))); + } + totalsize += fst.st_size; + pfree(pcfile_segpath); + + /* calculate size of page compression data file */ + pcfile_segpath = psprintf("%s_pcd", pathname); + if (stat(pcfile_segpath, &fst) < 0) + { + if (errno == ENOENT) + { + pfree(pcfile_segpath); + break; + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", pcfile_segpath))); + } + totalsize += fst.st_size; + pfree(pcfile_segpath); + } + else + { + if (stat(pathname, &fst) < 0) + { + if (errno == ENOENT) + break; + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", pathname))); + } + totalsize += fst.st_size; } - totalsize += fst.st_size; } return totalsize; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index bdb771d278..6b5aafb31e 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -30,6 +30,10 @@ #include #include +#include "access/hash.h" +#include "access/gin_private.h" +#include "access/gist_private.h" +#include "access/spgist_private.h" #include "access/htup_details.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -76,6 +80,7 @@ #include "rewrite/rewriteDefine.h" #include "rewrite/rowsecurity.h" #include "storage/lmgr.h" +#include "storage/page_compression.h" #include "storage/smgr.h" #include "utils/array.h" #include "utils/builtins.h" @@ -316,6 
+321,7 @@ static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid, StrategyNumber numSupport); static void RelationCacheInitFileRemoveInDir(const char *tblspcpath); static void unlink_initfile(const char *initfilename, int elevel); +static void SetupPageCompressForRelation(Relation relation, PageCompressOpts *compress_options); /* @@ -1381,6 +1387,40 @@ RelationInitPhysicalAddr(Relation relation) else relation->rd_firstRelfilelocatorSubid = InvalidSubTransactionId; } + + /* setup page compression option */ + if (relation->rd_options) + { + switch (relation->rd_rel->relam) + { + case HEAP_TABLE_AM_OID: + SetupPageCompressForRelation(relation, &((StdRdOptions *) (relation->rd_options))->compress); + break; + + case BTREE_AM_OID: + SetupPageCompressForRelation(relation, &((BTOptions *) (relation->rd_options))->compress); + break; + + case HASH_AM_OID: + SetupPageCompressForRelation(relation, &((HashOptions *) (relation->rd_options))->compress); + break; + + case GIN_AM_OID: + SetupPageCompressForRelation(relation, &((GinOptions *) (relation->rd_options))->compress); + break; + + case GIST_AM_OID: + SetupPageCompressForRelation(relation, &((GiSTOptions *) (relation->rd_options))->compress); + break; + + case SPGIST_AM_OID: + SetupPageCompressForRelation(relation, &((SpGistOptions *) (relation->rd_options))->compress); + break; + + default: + break; + } + } } /* @@ -3464,7 +3504,8 @@ RelationBuildLocalRelation(const char *relname, bool shared_relation, bool mapped_relation, char relpersistence, - char relkind) + char relkind, + Datum reloptions) { Relation rel; MemoryContext oldcxt; @@ -3641,6 +3682,17 @@ RelationBuildLocalRelation(const char *relname, RelationInitPhysicalAddr(rel); + /* setup page compression option */ + if (reloptions && + (relkind == RELKIND_RELATION || + relkind == RELKIND_MATVIEW || + relkind == RELKIND_INDEX)) + { + StdRdOptions *options = (StdRdOptions *) default_reloptions(reloptions, false, RELOPT_KIND_HEAP); + + 
SetupPageCompressForRelation(rel, &options->compress); + } + rel->rd_rel->relam = accessmtd; /* @@ -6720,3 +6772,65 @@ unlink_initfile(const char *initfilename, int elevel) initfilename))); } } + +/* + * SetupPageCompressForRelation + * Copy the page compression reloptions into relation->rd_locator.compressOpt. + * + * Raises an error if compression is requested but this build cannot provide + * it, or if compress_chunk_size is not 1/2, 1/4, 1/8 or 1/16 of BLCKSZ (the + * reloption itself only enforces the range BLCKSZ/16 .. BLCKSZ/2). + */ +static void +SetupPageCompressForRelation(Relation relation, PageCompressOpts *compress_options) +{ + if (compress_options->compresstype == PAGE_COMPRESSION_NONE) + { + relation->rd_locator.compressOpt.algorithm = PAGE_COMPRESSION_NONE; + relation->rd_locator.compressOpt.level = 0; + relation->rd_locator.compressOpt.chunks_pre_block = 0; + relation->rd_locator.compressOpt.prealloc_chunks = 0; + } + else + { + int chunks_per_block; + + if (!SUPPORT_PAGE_COMPRESSION) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("page compression is not supported by this build"))); + +#ifndef USE_LZ4 + if (compress_options->compresstype == PAGE_COMPRESSION_LZ4) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("lz4 compression is not supported by this build"))); +#endif + +#ifndef USE_ZSTD + if (compress_options->compresstype == PAGE_COMPRESSION_ZSTD) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("zstd compression is not supported by this build"))); +#endif + + /* validate everything before touching rd_locator, so an error leaves it unchanged */ + if (!IsValidPageCompressChunkSize(compress_options->compress_chunk_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid compress_chunk_size %d, must be one of %d, %d, %d or %d", + compress_options->compress_chunk_size, + BLCKSZ / 16, BLCKSZ / 8, BLCKSZ / 4, BLCKSZ / 2), + errtable(relation))); + + chunks_per_block = BLCKSZ / compress_options->compress_chunk_size; + + relation->rd_locator.compressOpt.algorithm = compress_options->compresstype; + relation->rd_locator.compressOpt.level = compress_options->compresslevel; + relation->rd_locator.compressOpt.chunks_pre_block = chunks_per_block; + + /* cap preallocation at chunks_per_block - 1 (the reloption allows up to 15) */ + relation->rd_locator.compressOpt.prealloc_chunks = + (uint8) Min(compress_options->compress_prealloc_chunks, chunks_per_block - 1); + } +}
diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h index 2935d2f353..3b0189dfcb 100644 --- a/src/include/access/gin_private.h +++ b/src/include/access/gin_private.h @@ -18,6 +18,7 @@ #include "fmgr.h" #include "lib/rbtree.h" #include "storage/bufmgr.h" +#include "utils/rel.h" /* * Storage type for GIN's reloptions @@ -27,6 +28,7 @@ typedef struct GinOptions int32 vl_len_; /* varlena header (do not touch directly!) */ bool useFastUpdate; /* use fast updates? */ int pendingListCleanupSize; /* maximum size of pending list */ + PageCompressOpts compress; /* page compression related options */ } GinOptions; #define GIN_DEFAULT_USE_FASTUPDATE true diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 240131ef71..b831e69c87 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -21,6 +21,7 @@ #include "storage/bufmgr.h" #include "storage/buffile.h" #include "utils/hsearch.h" +#include "utils/rel.h" #include "access/genam.h" /* @@ -396,6 +397,7 @@ typedef struct GiSTOptions int32 vl_len_; /* varlena header (do not touch directly!)
*/ int fillfactor; /* page fill factor in percent (0..100) */ GistOptBufferingMode buffering_mode; /* buffering build mode */ + PageCompressOpts compress; /* page compression related options */ } GiSTOptions; /* gist.c */ diff --git a/src/include/access/hash.h b/src/include/access/hash.h index da372841c4..e772c8a5ca 100644 --- a/src/include/access/hash.h +++ b/src/include/access/hash.h @@ -26,6 +26,7 @@ #include "storage/bufmgr.h" #include "storage/lockdefs.h" #include "utils/hsearch.h" +#include "utils/rel.h" #include "utils/relcache.h" /* @@ -270,6 +271,7 @@ typedef struct HashOptions { int32 varlena_header_; /* varlena header (do not touch directly!) */ int fillfactor; /* page fill factor in percent (0..100) */ + PageCompressOpts compress; /* page compression related options */ } HashOptions; #define HashGetFillFactor(relation) \ diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 93f8267b48..32f6f34534 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -24,6 +24,7 @@ #include "lib/stringinfo.h" #include "storage/bufmgr.h" #include "storage/shm_toc.h" +#include "utils/rel.h" /* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */ typedef uint16 BTCycleId; @@ -1090,6 +1091,7 @@ typedef struct BTOptions int fillfactor; /* page fill factor in percent (0..100) */ float8 vacuum_cleanup_index_scale_factor; /* deprecated */ bool deduplicate_items; /* Try to deduplicate items? 
*/ + PageCompressOpts compress; /* page compression related options */ } BTOptions; #define BTGetFillFactor(relation) \ diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h index eb56b1c6b8..e585244862 100644 --- a/src/include/access/spgist_private.h +++ b/src/include/access/spgist_private.h @@ -20,6 +20,7 @@ #include "nodes/tidbitmap.h" #include "storage/buf.h" #include "utils/geo_decls.h" +#include "utils/rel.h" #include "utils/relcache.h" @@ -27,6 +28,7 @@ typedef struct SpGistOptions { int32 varlena_header_; /* varlena header (do not touch directly!) */ int fillfactor; /* page fill factor in percent (0..100) */ + PageCompressOpts compress; /* page compression related options */ } SpGistOptions; #define SpGistGetFillFactor(relation) \ diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 5774c46471..401ce71b34 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -57,6 +57,7 @@ extern Relation heap_create(const char *relname, char relpersistence, bool shared_relation, bool mapped_relation, + Datum reloptions, bool allow_system_table_mods, TransactionId *relfrozenxid, MultiXactId *relminmxid, diff --git a/src/include/storage/checksum.h b/src/include/storage/checksum.h index 1904fabd5a..fce330da0e 100644 --- a/src/include/storage/checksum.h +++ b/src/include/storage/checksum.h @@ -21,4 +21,10 @@ */ extern uint16 pg_checksum_page(char *page, BlockNumber blkno); +/* + * Compute the checksum for a chunk of compressed page. The chunk must be + * aligned on a 4-byte boundary. 
+ */ +extern uint16 pg_checksum_compress_chunk(char *chunk, int chunk_size, uint32 chunckno); + #endif /* CHECKSUM_H */ diff --git a/src/include/storage/checksum_impl.h b/src/include/storage/checksum_impl.h index d2eb75f769..94c4434757 100644 --- a/src/include/storage/checksum_impl.h +++ b/src/include/storage/checksum_impl.h @@ -101,6 +101,7 @@ */ #include "storage/bufpage.h" +#include "storage/page_compression.h" /* number of checksums to calculate in parallel */ #define N_SUMS 32 @@ -139,11 +140,11 @@ do { \ } while (0) /* - * Block checksum algorithm. The page must be adequately aligned + * Data checksum algorithm. The data must be adequately aligned * (at least on 4-byte boundary). */ static uint32 -pg_checksum_block(const PGChecksummablePage *page) +pg_checksum_data(const char *data, int size) { uint32 sums[N_SUMS]; uint32 result = 0; @@ -151,15 +152,16 @@ pg_checksum_block(const PGChecksummablePage *page) j; /* ensure that the size is compatible with the algorithm */ - Assert(sizeof(PGChecksummablePage) == BLCKSZ); + Assert(size > 0); + Assert(chunk_size == (size / (sizeof(uint32) * N_SUMS)) * (sizeof(uint32) * N_SUMS)); /* initialize partial checksums to their corresponding offsets */ memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets)); /* main checksum calculation */ - for (i = 0; i < (uint32) (BLCKSZ / (sizeof(uint32) * N_SUMS)); i++) + for (i = 0; i < (uint32) (size / (sizeof(uint32) * N_SUMS)); i++) for (j = 0; j < N_SUMS; j++) - CHECKSUM_COMP(sums[j], page->data[i][j]); + CHECKSUM_COMP(sums[j], ((uint32 *) data)[i * N_SUMS + j]); /* finally add in two rounds of zeroes for additional mixing */ for (i = 0; i < 2; i++) @@ -201,7 +203,7 @@ pg_checksum_page(char *page, BlockNumber blkno) */ save_checksum = cpage->phdr.pd_checksum; cpage->phdr.pd_checksum = 0; - checksum = pg_checksum_block(cpage); + checksum = pg_checksum_data(page, BLCKSZ); cpage->phdr.pd_checksum = save_checksum; /* Mix in the block number to detect transposed pages */ @@ -213,3 
+215,43 @@ pg_checksum_page(char *page, BlockNumber blkno) */ return (uint16) ((checksum % 65535) + 1); } + + +/* + * Compute the checksum for a chunk of compressed page. + * + * The chunk must be adequately aligned (at least on a 4-byte boundary), and + * chunk_size must be a multiple of sizeof(uint32) * N_SUMS. + * Beware also that the checksum field of the chunk is transiently zeroed. + * + * The checksum includes the chunk number (to detect the case where a chunk + * is somehow moved to a different location), the chunk header (excluding the + * checksum itself), and the chunk data. + */ +uint16 +pg_checksum_compress_chunk(char *chunk, int chunk_size, uint32 chunkno) +{ + PageCompressChunk *pcchunk = (PageCompressChunk *) chunk; + uint16 save_checksum; + uint32 checksum; + + /* + * Save the chunk's checksum field and temporarily set it to zero, so that + * the checksum calculation isn't affected by the old stored checksum. + * Restore it after, because actually updating the checksum is NOT part of + * the API of this function. + */ + save_checksum = pcchunk->checksum; + pcchunk->checksum = 0; + checksum = pg_checksum_data(chunk, chunk_size); + pcchunk->checksum = save_checksum; + + /* Mix in the chunk number to detect transposed chunks */ + checksum ^= chunkno; + + /* + * Reduce to a uint16 (to fit in the checksum field) with an offset of + * one. That avoids checksums of zero, which seems like a good idea.
 */ + return (uint16) ((checksum % 65535) + 1); +} diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 2b4a8e0ffe..a6e73433e9 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -156,6 +156,10 @@ extern void ReleaseExternalFD(void); /* Make a directory with default permissions */ extern int MakePGDirectory(const char *directoryName); +/* Page compression support routines */ +extern void SetupPageCompressMemoryMap(File file, File file_pcd, int chunk_size, uint8 algorithm); +extern void *GetPageCompressMemoryMap(File file, int chunk_size); + /* Miscellaneous support routines */ extern void InitFileAccess(void); extern void InitTemporaryFileAccess(void); diff --git a/src/include/storage/page_compression.h b/src/include/storage/page_compression.h new file mode 100644 index 0000000000..2d36bd8130 --- /dev/null +++ b/src/include/storage/page_compression.h @@ -0,0 +1,203 @@ +/* + * page_compression.h + * internal declarations for page compression + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/storage/page_compression.h + */ + +#ifndef PAGE_COMPRESSION_H +#define PAGE_COMPRESSION_H + +#include + +#include "storage/bufpage.h" +#include "datatype/timestamp.h" + +#ifdef FRONTEND +typedef uint32 pg_atomic_uint32; +#else +#include "port/atomics.h" + +/* The page compression feature relies on native atomic operation support. + * On platforms that do not support native atomic operations, the members + * of pg_atomic_uint32 contain semaphore objects, which will affect the + * persistence of compressed page address files. + */ +#define SUPPORT_PAGE_COMPRESSION (sizeof(pg_atomic_uint32) == sizeof(uint32)) +#endif + +/* In order to avoid the inconsistency of address metadata when the server + * crashes, it is necessary to prevent the address metadata of one data block + * from crossing two storage device blocks.
The block size of ordinary storage + * devices is a multiple of 512, so 512 is used as the block size of the + * page compression address file. + */ +#define COMPRESS_ADDR_BLCKSZ 512 + +typedef uint32 pc_chunk_number_t; + +/* Compression algorithms for page compression */ +typedef enum PageCompression +{ + PAGE_COMPRESSION_NONE = 0, + PAGE_COMPRESSION_PGLZ, + PAGE_COMPRESSION_LZ4, + PAGE_COMPRESSION_ZSTD +} PageCompression; + +/* + * Layout of files for page compression: + * + * 1. page compression address file(_pca) + * - PageCompressHeader + * - PageCompressAddr[0] + * - PageCompressAddr[1] + * - ... + * + * 2. page compression data file(_pcd) + * - PageCompressChunk[0] + * - PageCompressData(in whole or in part) + * - PageCompressChunk[1] + * - PageCompressData(in whole or in part) + * - ... + */ + +typedef struct PageCompressHeader +{ + pg_atomic_uint32 nblocks; /* number of total blocks in this segment */ + pg_atomic_uint32 allocated_chunks; /* number of total allocated chunks in + * data area */ + uint16 chunk_size; /* size of chunk, must be 1/2, 1/4, 1/8 or + * 1/16 of BLCKSZ */ + uint8 algorithm; /* compress algorithm, 1=pglz, 2=lz4 3=zstd */ + pg_atomic_uint32 last_synced_nblocks; /* last synced nblocks */ + pg_atomic_uint32 last_synced_allocated_chunks; /* last synced + * allocated_chunks */ + TimestampTz last_recovery_start_time; /* postmaster start time of last + * recovery */ +} PageCompressHeader; + +typedef struct PageCompressAddr +{ + volatile uint8 nchunks; /* number of chunks for this block */ + volatile uint8 allocated_chunks; /* number of allocated chunks for this + * block */ + + /* + * variable-length fields, 1 based chunk number array for this block, size + * of the array is "BLCKSZ/chunk_size + 1" + */ + pc_chunk_number_t chunknos[FLEXIBLE_ARRAY_MEMBER]; +} PageCompressAddr; + +typedef struct PageCompressChunk +{ + uint32 blockno; /* block number of the block stored in this + * chunk */ + uint8 chunkseq; /* index of this chunk, from 1 to + * 
allocated_chunks */ + uint8 withdata:1, /* if has data */ + unused:7; + uint16 checksum; /* checksum of this chunk */ + char data[FLEXIBLE_ARRAY_MEMBER]; /* 1/nchunks of + * PageCompressData , and at + * least 4-byte aligned */ +} PageCompressChunk; + +typedef struct PageCompressData +{ + char page_header[SizeOfPageHeaderData]; /* page header */ + uint16 size; /* size of data */ + uint16 unused; + char data[FLEXIBLE_ARRAY_MEMBER]; /* data of compressed page, + * except for the page header */ +} PageCompressData; + +/* + * Use this, not "char buf[BLCKSZ + BLCKSZ/2]", to declare a field or local + * variable holding a buffer for all chunks for one compressed page, if that buffer + * need to be aligned, such as when calculating checksum. (In some places, we use + * this to declare buffers even though we only pass them to read() and + * write(), because copying to/from aligned buffers is usually faster than + * using unaligned buffers.) We include both "double" and "int64" in the + * union to ensure that the compiler knows the value must be MAXALIGN'ed + * (cf. configure's computation of MAXIMUM_ALIGNOF). 
+ */ +typedef union PGAlignedCompressChunks +{ + char data[BLCKSZ + BLCKSZ / 2]; + double force_align_d; + int64 force_align_i64; +} PGAlignedCompressChunks; + +#define IsValidPageCompressChunkSize(chunk_size) \ + ((chunk_size) == BLCKSZ / 2 || \ + (chunk_size) == BLCKSZ / 4 || \ + (chunk_size) == BLCKSZ / 8 || \ + (chunk_size) == BLCKSZ / 16) + +#define SizeOfPageCompressHeaderData sizeof(PageCompressHeader) +#define SizeOfPageCompressAddrHeaderData offsetof(PageCompressAddr, chunknos) +#define SizeOfPageCompressChunkHeaderData offsetof(PageCompressChunk, data) +#define SizeOfPageCompressDataHeaderData offsetof(PageCompressData, data) + +#define MaxChunksPreCompressedPage(chunk_size) \ + (BLCKSZ / (chunk_size) + 1) + +#define SizeOfPageCompressAddr(chunk_size) \ + (SizeOfPageCompressAddrHeaderData + sizeof(pc_chunk_number_t) * MaxChunksPreCompressedPage(chunk_size)) + +#define NumberOfPageCompressAddrPerBlock(chunk_size) \ + (COMPRESS_ADDR_BLCKSZ / SizeOfPageCompressAddr(chunk_size)) + +#define OffsetOfPageCompressAddr(chunk_size, blockno) \ + (COMPRESS_ADDR_BLCKSZ * (1 + (blockno) / NumberOfPageCompressAddrPerBlock(chunk_size)) \ + + SizeOfPageCompressAddr(chunk_size) * ((blockno) % NumberOfPageCompressAddrPerBlock(chunk_size))) + +#define GetPageCompressAddr(pcbuffer, chunk_size, blockno) \ + (PageCompressAddr *)((char *)(pcbuffer) + OffsetOfPageCompressAddr((chunk_size),(blockno) % RELSEG_SIZE)) + +#define SizeofPageCompressAddrFile(chunk_size) \ + OffsetOfPageCompressAddr((chunk_size), RELSEG_SIZE) + +#define OffsetOfPageCompressChunk(chunk_size, chunkno) \ + ((chunk_size) * ((chunkno) - 1)) + +#define StoreCapacityPerPageCompressChunk(chunk_size) \ + (chunk_size - SizeOfPageCompressChunkHeaderData) + +#define NeedPageCompressChunksToStoreData(chunk_size, data_size) \ + ((data_size + StoreCapacityPerPageCompressChunk(chunk_size) - 1) / StoreCapacityPerPageCompressChunk(chunk_size)) + + +#define MAX_PAGE_COMPRESS_ADDRESS_FILE_SIZE 
SizeofPageCompressAddrFile(BLCKSZ / 16) + +#define MAX_PAGE_COMPRESS_CHUNK_NUMBER(chunk_size) \ + (RELSEG_SIZE * MaxChunksPreCompressedPage(chunk_size)) + +/* + * After allocated 32MB data space, sync the page compression address file. + */ +#define COMPRESS_ADDRESS_SYNC_THRESHOLD(chunk_size) (32 * 1024 * 1024 / (chunk_size)) + +/* Compress function */ +extern int compress_page_buffer_bound(uint8 algorithm); +extern int compress_page(const char *src, char *dst, int dst_size, uint8 algorithm, int8 level); +extern int decompress_page(const char *src, char *dst, uint8 algorithm); + +/* Memory mapping function */ +extern PageCompressHeader *pc_mmap(int fd, int chunk_size, bool readonly); +extern int pc_munmap(PageCompressHeader *map); +extern int pc_msync(PageCompressHeader *map); + + +#ifndef FRONTEND +int errcode_for_dynamic_shared_memory(void); +void check_and_repair_compress_address(PageCompressHeader *pcmap, uint16 chunk_size, uint8 algorithm, + const char *path, int fd_pcd, const char *path_pcd); +#endif + +#endif /* PAGE_COMPRESSION_H */ diff --git a/src/include/storage/page_compression_impl.h b/src/include/storage/page_compression_impl.h new file mode 100644 index 0000000000..2b647d5bbd --- /dev/null +++ b/src/include/storage/page_compression_impl.h @@ -0,0 +1,294 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "common/pg_lzcompress.h" + +#ifdef USE_LZ4 +#include +#endif + +#ifdef USE_ZSTD +#include + +#define DEFAULT_ZSTD_COMPRESSION_LEVEL 1 +#define MIN_ZSTD_COMPRESSION_LEVEL ZSTD_minCLevel() +#define MAX_ZSTD_COMPRESSION_LEVEL ZSTD_maxCLevel() +#endif + + +/** + * compress_page_buffer_bound() + * Get the destination buffer boundary to compress one page. 
+ * + * Return needed destination buffer size for compress one page or + * -1 for unrecognized compression algorithm + * + */ +int +compress_page_buffer_bound(uint8 algorithm) +{ + int result = 0; + + switch (algorithm) + { + case PAGE_COMPRESSION_PGLZ: + result = PGLZ_MAX_OUTPUT(BLCKSZ - SizeOfPageHeaderData); + break; +#ifdef USE_LZ4 + case PAGE_COMPRESSION_LZ4: + result = LZ4_compressBound(BLCKSZ - SizeOfPageHeaderData); + break; +#endif +#ifdef USE_ZSTD + case PAGE_COMPRESSION_ZSTD: + result = ZSTD_compressBound(BLCKSZ - SizeOfPageHeaderData); + break; +#endif + default: + return -1; + break; + } + if (result < 0) + return -1; + + return result + SizeOfPageCompressDataHeaderData; +} + +/** + * compress_page() -- Compress one page. + * + * Only the parts other than the page header will be compressed. + * This function returns the size of compressed data or + * -1 for compression fail + * -2 for unrecognized compression algorithm +* note:The size of dst must be greater than or equal to "BLCKSZ + chunck_size". 
+ *
+ * dst_size is the total size of dst, including the PageCompressData header
+ * that precedes the compressed payload; only the remaining
+ * dst_size - SizeOfPageCompressDataHeaderData bytes are offered to the
+ * compressor.  A level of 0 selects the default zstd level; other levels are
+ * clamped to the range zstd supports.
+ */
+int
+compress_page(const char *src, char *dst, int dst_size, uint8 algorithm, int8 level)
+{
+	int			compressed_size;
+	int			data_capacity;
+	PageCompressData *pcdata;
+
+	pcdata = (PageCompressData *) dst;
+
+	/*
+	 * The compressed payload is written to pcdata->data, i.e. after the
+	 * header, so the compressors must not be told they own all of dst.
+	 */
+	data_capacity = dst_size - (int) SizeOfPageCompressDataHeaderData;
+
+	switch (algorithm)
+	{
+		case PAGE_COMPRESSION_PGLZ:
+
+			/*
+			 * pglz_compress() takes no output limit, so make sure the buffer
+			 * can hold its worst case (the same bound that
+			 * compress_page_buffer_bound() reports) before calling it.
+			 */
+			if (data_capacity < (int) PGLZ_MAX_OUTPUT(BLCKSZ - SizeOfPageHeaderData))
+				return -1;
+			compressed_size = pglz_compress(src + SizeOfPageHeaderData,
+											BLCKSZ - SizeOfPageHeaderData,
+											pcdata->data,
+											PGLZ_strategy_always);
+			break;
+
+#ifdef USE_LZ4
+		case PAGE_COMPRESSION_LZ4:
+			if (data_capacity <= 0)
+				return -1;
+			compressed_size = LZ4_compress_default(src + SizeOfPageHeaderData,
+												   pcdata->data,
+												   BLCKSZ - SizeOfPageHeaderData,
+												   data_capacity);
+			/* LZ4 reports failure (including "does not fit") as 0 */
+			if (compressed_size <= 0)
+				return -1;
+			break;
+#endif
+
+#ifdef USE_ZSTD
+		case PAGE_COMPRESSION_ZSTD:
+			{
+				int			real_level = level;
+				size_t		zstd_result;
+
+				if (data_capacity <= 0)
+					return -1;
+
+				if (level == 0)
+					real_level = DEFAULT_ZSTD_COMPRESSION_LEVEL;
+				else if (level < MIN_ZSTD_COMPRESSION_LEVEL)
+					real_level = MIN_ZSTD_COMPRESSION_LEVEL;
+				else if (level > MAX_ZSTD_COMPRESSION_LEVEL)
+					real_level = MAX_ZSTD_COMPRESSION_LEVEL;
+
+				/*
+				 * Keep zstd's size_t result intact: narrowing an error code to
+				 * int before ZSTD_isError() is implementation-defined.
+				 */
+				zstd_result = ZSTD_compress(pcdata->data,
+											(size_t) data_capacity,
+											src + SizeOfPageHeaderData,
+											BLCKSZ - SizeOfPageHeaderData,
+											real_level);
+				if (ZSTD_isError(zstd_result))
+					return -1;
+				compressed_size = (int) zstd_result;
+				break;
+			}
+#endif
+
+		default:
+			return -2;
+	}
+
+	/* pglz_compress() reports failure as a negative size */
+	if (compressed_size < 0)
+		return -1;
+
+	memcpy(pcdata->page_header, src, SizeOfPageHeaderData);
+	pcdata->size = compressed_size;
+
+	return SizeOfPageCompressDataHeaderData + compressed_size;
+}
+
+/**
+ * decompress_page() -- Decompress one compressed page.
+ *  Returns size of decompressed page which should be BLCKSZ or
+ *  -1 for decompress error
+ *  -2 for unrecognized compression algorithm
+ *
+ * note: the size of dst must be greater than or equal to BLCKSZ.
+ *
+ * src must hold a PageCompressData image as produced by compress_page():
+ * the page header is copied back verbatim and the compressed payload that
+ * follows it is expanded into the rest of dst.
+ * NOTE(review): pcdata->size is trusted as the payload length and is not
+ * checked against the real size of src; callers must validate it.
+ */
+int
+decompress_page(const char *src, char *dst, uint8 algorithm)
+{
+	int			decompressed_size;
+	const PageCompressData *pcdata;
+
+	pcdata = (const PageCompressData *) src;
+
+	/* the page header is stored uncompressed, where compress_page() put it */
+	memcpy(dst, pcdata->page_header, SizeOfPageHeaderData);
+
+	switch (algorithm)
+	{
+		case PAGE_COMPRESSION_PGLZ:
+			decompressed_size = pglz_decompress(pcdata->data,
+												pcdata->size,
+												dst + SizeOfPageHeaderData,
+												BLCKSZ - SizeOfPageHeaderData,
+												false);
+			if (decompressed_size < 0)
+				return -1;
+			break;
+
+#ifdef USE_LZ4
+		case PAGE_COMPRESSION_LZ4:
+			decompressed_size = LZ4_decompress_safe(pcdata->data,
+													dst + SizeOfPageHeaderData,
+													pcdata->size,
+													BLCKSZ - SizeOfPageHeaderData);
+			if (decompressed_size < 0)
+				return -1;
+			break;
+#endif
+
+#ifdef USE_ZSTD
+		case PAGE_COMPRESSION_ZSTD:
+			{
+				/*
+				 * Keep zstd's size_t result intact: narrowing an error code
+				 * to int before ZSTD_isError() is implementation-defined.
+				 */
+				size_t		zstd_result;
+
+				zstd_result = ZSTD_decompress(dst + SizeOfPageHeaderData,
+											  BLCKSZ - SizeOfPageHeaderData,
+											  pcdata->data,
+											  pcdata->size);
+				if (ZSTD_isError(zstd_result))
+					return -1;
+				decompressed_size = (int) zstd_result;
+				break;
+			}
+#endif
+
+		default:
+			return -2;
+	}
+
+	return SizeOfPageHeaderData + decompressed_size;
+}
+
+
+/**
+ * pc_mmap() -- create memory map for address file of compressed relation.
+ *
+ * The address file is first resized to SizeofPageCompressAddrFile(chunk_size)
+ * bytes if its current size differs.  The mapping is read-only when readonly
+ * is true and read/write otherwise.  Returns MAP_FAILED on any failure.
+ */
+PageCompressHeader *
+pc_mmap(int fd, int chunk_size, bool readonly)
+{
+	PageCompressHeader *map;
+	off_t		file_size;
+	int			pc_memory_map_size;
+
+	pc_memory_map_size = SizeofPageCompressAddrFile(chunk_size);
+
+	/* lseek() returns -1 on error; don't mistake that for a size mismatch */
+	file_size = lseek(fd, 0, SEEK_END);
+	if (file_size < 0)
+		return (PageCompressHeader *) MAP_FAILED;
+
+	if (file_size != (off_t) pc_memory_map_size)
+	{
+		if (ftruncate(fd, pc_memory_map_size) != 0)
+			return (PageCompressHeader *) MAP_FAILED;
+	}
+
+#ifdef WIN32
+	{
+		HANDLE		mh;
+
+		mh = CreateFileMapping((HANDLE) _get_osfhandle(fd), NULL,
+							   readonly ? PAGE_READONLY : PAGE_READWRITE,
+							   0, (DWORD) pc_memory_map_size, NULL);
+		if (mh == NULL)
+			return (PageCompressHeader *) MAP_FAILED;
+
+		/*
+		 * The view's access must be compatible with the mapping's protection:
+		 * FILE_MAP_ALL_ACCESS on a PAGE_READONLY mapping fails, so read-only
+		 * callers get a FILE_MAP_READ view.
+		 */
+		map = (PageCompressHeader *) MapViewOfFile(mh,
+												   readonly ? FILE_MAP_READ : FILE_MAP_ALL_ACCESS,
+												   0, 0, 0);
+		CloseHandle(mh);
+	}
+	if (map == NULL)
+		return (PageCompressHeader *) MAP_FAILED;
+#else
+	map = (PageCompressHeader *) mmap(NULL, pc_memory_map_size,
+									  readonly ? PROT_READ : (PROT_READ | PROT_WRITE),
+									  MAP_SHARED, fd, 0);
+#endif
+	return map;
+}
+
+/**
+ * pc_munmap() -- release memory map of page compression address file.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int
+pc_munmap(PageCompressHeader *map)
+{
+#ifdef WIN32
+	return UnmapViewOfFile(map) ? 0 : -1;
+#else
+	return munmap(map, SizeofPageCompressAddrFile(map->chunk_size));
+#endif
+}
+
+/**
+ * pc_msync() -- sync memory map of page compression address file.
+ *
+ * Returns 0 on success, -1 on failure.  In the backend (not FRONTEND) this
+ * is a no-op when fsync is disabled.
+ * NOTE(review): on Windows, FlushViewOfFile() alone is documented not to
+ * guarantee the data reaches disk (FlushFileBuffers() on the file handle is
+ * also needed); confirm callers account for that.
+ */
+int
+pc_msync(PageCompressHeader *map)
+{
+#ifndef FRONTEND
+	if (!enableFsync)
+		return 0;
+#endif
+
+#ifdef WIN32
+	return FlushViewOfFile(map, SizeofPageCompressAddrFile(map->chunk_size)) ? 0 : -1;
+#else
+	return msync(map, SizeofPageCompressAddrFile(map->chunk_size), MS_SYNC);
+#endif
+}
diff --git a/src/include/storage/relfilelocator.h b/src/include/storage/relfilelocator.h
index 10f41f3abb..ef7e2d72f4 100644
--- a/src/include/storage/relfilelocator.h
+++ b/src/include/storage/relfilelocator.h
@@ -17,6 +17,17 @@
 #include "common/relpath.h"
 #include "storage/backendid.h"
 
+/*
+ * Compression options for a compressed relation.
+ */
+typedef struct RelFileNodeCompressOpts
+{
+	uint8		algorithm;		/* compression algorithm */
+	int8		level;			/* compression level */
+	uint8		chunks_pre_block;	/* number of chunks per block for compressed relation */
+	uint8		prealloc_chunks;	/* number of preallocated chunks for compressed data */
+} RelFileNodeCompressOpts;
+
 /*
  * RelFileLocator must provide all that we need to know to physically access
  * a relation, with the exception of the backend ID, which can be provided
@@ -59,6 +70,7 @@ typedef struct RelFileLocator
 	Oid			spcOid;			/* tablespace */
 	Oid			dbOid;			/* database */
 	RelFileNumber relNumber;	/* relation */
+	RelFileNodeCompressOpts compressOpt;	/* compression options */
 } RelFileLocator;
 
 /*
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 7dc401cf0d..1f915633c1 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -23,6 +23,7 @@
 #include "partitioning/partdefs.h"
 #include "rewrite/prs2lock.h"
 #include "storage/block.h"
+#include "storage/page_compression.h"
 #include "storage/relfilelocator.h"
 #include "storage/smgr.h"
 #include "utils/relcache.h"
@@ -329,6 +330,18 @@ typedef enum StdRdOptIndexCleanup
 	STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON
 } StdRdOptIndexCleanup;
 
+/* PageCompressOpts->compresstype values */
+typedef PageCompression compressTypeOption;
+
+/* page compression related reloptions.
*/
+typedef struct PageCompressOpts
+{
+	compressTypeOption compresstype;	/* compression algorithm */
+	int			compresslevel;	/* compression level */
+	int			compress_chunk_size;	/* chunk size of compressed data */
+	int			compress_prealloc_chunks;	/* number of chunks preallocated for each block */
+} PageCompressOpts;
+
 typedef struct StdRdOptions
 {
 	int32		vl_len_;		/* varlena header (do not touch directly!) */
@@ -339,6 +352,7 @@ typedef struct StdRdOptions
 	int			parallel_workers;	/* max number of parallel workers */
 	StdRdOptIndexCleanup vacuum_index_cleanup;	/* controls index vacuuming */
 	bool		vacuum_truncate;	/* enables vacuum to truncate a relation */
+	PageCompressOpts compress;	/* page compression related reloptions */
 } StdRdOptions;
 
 #define HEAP_MIN_FILLFACTOR			10
diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h
index ba35d6b3b3..a9eba9a622 100644
--- a/src/include/utils/relcache.h
+++ b/src/include/utils/relcache.h
@@ -108,7 +108,8 @@ extern Relation RelationBuildLocalRelation(const char *relname,
 										   bool shared_relation,
 										   bool mapped_relation,
 										   char relpersistence,
-										   char relkind);
+										   char relkind,
+										   Datum reloptions);
 
 /*
  * Routines to manage assignment of new relfilenumber to a relation
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index c3ade01120..5e99d304c0 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -230,7 +230,9 @@ typedef enum
 	WAIT_EVENT_WAL_READ,
 	WAIT_EVENT_WAL_SYNC,
 	WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN,
-	WAIT_EVENT_WAL_WRITE
+	WAIT_EVENT_WAL_WRITE,
+	WAIT_EVENT_COMPRESS_ADDRESS_FILE_FLUSH,
+	WAIT_EVENT_COMPRESS_ADDRESS_FILE_SYNC
 } WaitEventIO;