read stream on amcheck
Hi,
While reviewing some other patches implementing the read stream API for core subsystems,
I noticed that the amcheck extension could also benefit from it.
Note the refactoring of the "skip" parameter handling: the logic was moved to
the heapam_read_stream_next_block callback so that verify_heapam doesn't need to
touch any private field of the heapam_read_stream_next_block_private struct.
One other thing to mention is that the test cases for the "skip" parameter
that I've seen only test the case where the first page is corrupted, so I think
a careful review of the callback logic would be good to ensure that
we don't accidentally skip a page when doing p->current_blocknum++.
This patch doesn't show any performance improvement (or regression),
but I think it would be good to replace the ReadBufferExtended
usage with the read stream API so that in the future it can benefit
from the AIO project.
--
Matheus Alcantara
Attachments:
v1-0001-Use-read-stream-on-amcheck.patchapplication/octet-stream; name=v1-0001-Use-read-stream-on-amcheck.patchDownload
From 513804d079f15d9584f313a3859f8155819b5587 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH v1] Use read stream on amcheck
---
contrib/amcheck/verify_heapam.c | 87 ++++++++++++++++++++++++---------
1 file changed, 63 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 8a8e36dde7..ae73a96af5 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -23,8 +23,10 @@
#include "catalog/pg_class.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "storage/block.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +187,44 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+/*
+ * Private state for heapam_read_stream_next_block(), handed to the read
+ * stream as callback_private_data.
+ */
+struct heapam_read_stream_next_block_private
+{
+ BlockNumber current_blocknum; /* next block to consider */
+ BlockNumber last_exclusive; /* one past the last block to read */
+ SkipPages skip_option; /* which all-visible/all-frozen pages to skip */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller-owned VM buffer pin, reused across calls */
+};
+
+/*
+ * Read stream callback used by verify_heapam().
+ *
+ * Returns the next block number in [current_blocknum, last_exclusive) that
+ * should be read, advancing current_blocknum past it, or InvalidBlockNumber
+ * once the range is exhausted.  Unless skip_option is SKIP_PAGES_NONE, blocks
+ * whose visibility map bits match the skip option (all-frozen or
+ * all-visible) are passed over without being returned.  per_buffer_data is
+ * not used.
+ */
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ struct heapam_read_stream_next_block_private *p = callback_private_data;
+
+ /* Optionally skip over all-frozen or all-visible blocks */
+ if (p->skip_option != SKIP_PAGES_NONE)
+ {
+ /*
+ * NOTE(review): nothing in this loop calls CHECK_FOR_INTERRUPTS(), so a
+ * long run of skippable blocks is passed over without an interrupt
+ * check; confirm that is acceptable.
+ */
+ for(; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
+ {
+ int32 mapbts = visibilitymap_get_status(p->rel, p->current_blocknum,
+ p->vmbuffer);
+
+ if ((p->skip_option == SKIP_PAGES_ALL_FROZEN && (mapbts & VISIBILITYMAP_ALL_FROZEN) != 0) ||
+ (p->skip_option == SKIP_PAGES_ALL_VISIBLE && (mapbts & VISIBILITYMAP_ALL_VISIBLE) != 0))
+ continue;
+
+ break; /* this block must be checked */
+ }
+ }
+
+ /* Return the block we stopped on (if any) and advance past it. */
+ if (p->current_blocknum < p->last_exclusive)
+ return p->current_blocknum++;
+
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +271,8 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ struct heapam_read_stream_next_block_private p;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +446,19 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ p.current_blocknum = first_block;
+ p.last_exclusive = last_block + 1;
+ p.skip_option = skip_option;
+ p.rel = ctx.rel;
+ p.vmbuffer = &vmbuffer;
+ stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ heapam_read_stream_next_block,
+ &p,
+ 0);
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +471,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +833,10 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(p.current_blocknum == last_block + 1);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
--
2.39.3 (Apple Git-146)
On Thu, 2 Jan 2025 at 20:29, Matheus Alcantara <matheusssilv97@gmail.com> wrote:
Hi,
While reviewing some other patches implementing stream API for core subsystems,
I noticed that the amcheck extension could also benefit from that.
Notice the refactor when handling the "skip" parameter; The logic was moved to
the heapam_read_stream_next_block callback so that verify_heapam don't need to
touch any private field of heapam_read_stream_next_block_private struct.
One other think to mention is that the test cases of "skip" parameter
that I've seen just test when the first page is corrupted, so I think
that a carefully review on callback logic would be good to ensure that
we don't accidentally skip a page when doing p->current_blocknum++;
This patch doesn't show any performance improvements (or regression)
but I think that it would be good to replace the ReadBufferExtended
usage with the read stream API, so in the future it could be benefit
from the AIO project.
--
Matheus Alcantara
Hi!
+1 on idea
However, this:
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
changes to this
(in heapam_read_stream_next_block)
+
+ if ((p->skip_option == SKIP_PAGES_ALL_FROZEN && (mapbts & VISIBILITYMAP_ALL_FROZEN) != 0) ||
+ (p->skip_option == SKIP_PAGES_ALL_VISIBLE && (mapbts & VISIBILITYMAP_ALL_VISIBLE) != 0))
+ continue;
I don't understand this change. The patch aims purely to apply the
streaming API, not to refactor. And if we do refactor this code, the result
is less readable than it was.
Other than that, LGTM.
--
Best regards,
Kirill Reshke
Thanks for reviewing!
On Thu, Jan 2, 2025 at 13:16, Kirill Reshke
<reshkekirill@gmail.com> wrote:
However, this:
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
changes to this
(in heapam_read_stream_next_block)
+
+ if ((p->skip_option == SKIP_PAGES_ALL_FROZEN && (mapbts & VISIBILITYMAP_ALL_FROZEN) != 0) ||
+ (p->skip_option == SKIP_PAGES_ALL_VISIBLE && (mapbts & VISIBILITYMAP_ALL_VISIBLE) != 0))
+ continue;
I don't understand this change. The patch aims to be purely applying
streaming API, not refactoring. And if we refactor this code, this is
less readable than it was.
Yep, I agree. Attached is a fixed v2.
--
Matheus Alcantara
Attachments:
v2-0001-Use-read-stream-on-amcheck.patchapplication/octet-stream; name=v2-0001-Use-read-stream-on-amcheck.patchDownload
From 8e221251ff19eab5005063de97f5e3174bdfb977 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH v2] Use read stream on amcheck
---
contrib/amcheck/verify_heapam.c | 97 +++++++++++++++++++++++++--------
1 file changed, 73 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 8a8e36dde7..2c5c196961 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -23,8 +23,10 @@
#include "catalog/pg_class.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "storage/block.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +187,54 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+/*
+ * Private state for heapam_read_stream_next_block(), handed to the read
+ * stream as callback_private_data.
+ */
+struct heapam_read_stream_next_block_private
+{
+ BlockNumber current_blocknum; /* next block to consider */
+ BlockNumber last_exclusive; /* one past the last block to read */
+ SkipPages skip_option; /* which all-visible/all-frozen pages to skip */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller-owned VM buffer pin, reused across calls */
+};
+
+/*
+ * Read stream callback used by verify_heapam().
+ *
+ * Returns the next block number in [current_blocknum, last_exclusive) that
+ * should be read, advancing current_blocknum past it, or InvalidBlockNumber
+ * once the range is exhausted.  Unless skip_option is SKIP_PAGES_NONE, blocks
+ * whose visibility map bits match the skip option (all-frozen or
+ * all-visible) are passed over without being returned.  per_buffer_data is
+ * not used.
+ */
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ struct heapam_read_stream_next_block_private *p = callback_private_data;
+
+ /* Optionally skip over all-frozen or all-visible blocks */
+ if (p->skip_option != SKIP_PAGES_NONE)
+ {
+ /*
+ * NOTE(review): nothing in this loop calls CHECK_FOR_INTERRUPTS(), so a
+ * long run of skippable blocks is passed over without an interrupt
+ * check; confirm that is acceptable.
+ */
+ for(; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
+ {
+ int32 mapbits;
+
+ mapbits = (int32) visibilitymap_get_status(p->rel, p->current_blocknum,
+ p->vmbuffer);
+
+ if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+ continue;
+ }
+
+ if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ continue;
+ }
+
+ break; /* this block must be checked */
+ }
+ }
+
+ /* Return the block we stopped on (if any) and advance past it. */
+ if (p->current_blocknum < p->last_exclusive)
+ return p->current_blocknum++;
+
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +281,8 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ struct heapam_read_stream_next_block_private p;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +456,19 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ p.current_blocknum = first_block;
+ p.last_exclusive = last_block + 1;
+ p.skip_option = skip_option;
+ p.rel = ctx.rel;
+ p.vmbuffer = &vmbuffer;
+ stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ heapam_read_stream_next_block,
+ &p,
+ 0);
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +481,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +843,10 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(p.current_blocknum == last_block + 1);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
--
2.39.3 (Apple Git-146)
On Fri, Jan 3, 2025 at 1:53 AM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Yeap, I agree. Attached a v2 fixed.
Hi. Here are some minor issues I found.
+#include "storage/block.h"
This is not needed, since "storage/bufmgr.h" already includes it.
Do we need to add CHECK_FOR_INTERRUPTS() in heapam_read_stream_next_block?
heapam_read_stream_next_block_private needs to be added to
src/tools/pgindent/typedefs.list.
Overall, it looks good to me.
Hi,
On Tue, Feb 11, 2025 at 03:39, jian he
<jian.universality@gmail.com> wrote:
hi. some minor issue i found.
+#include "storage/block.h"
no need, since "#include "storage/bufmgr.h" already included it.
Fixed
do we need to add ``CHECK_FOR_INTERRUPTS()`` in heapam_read_stream_next_block?
The current code on master doesn't call CHECK_FOR_INTERRUPTS, so I would prefer not to
change this behaviour, but I think it is worth considering.
heapam_read_stream_next_block_private need add to
src/tools/pgindent/typedefs.list
Fixed
overall, it looks good to me.
Thanks for the review! v3 with the fixes attached.
--
Matheus Alcantara
Attachments:
v3-0001-Use-read-stream-on-amcheck.patchapplication/octet-stream; name=v3-0001-Use-read-stream-on-amcheck.patchDownload
From 60b1d901894523fa2958326e1002fade23ebe98e Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH v3] Use read stream on amcheck
---
contrib/amcheck/verify_heapam.c | 94 ++++++++++++++++++++++++--------
src/tools/pgindent/typedefs.list | 1 +
2 files changed, 71 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..8c83870db7d 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +186,52 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+/*
+ * Private state for heapam_read_stream_next_block(), handed to the read
+ * stream as callback_private_data.
+ */
+struct heapam_read_stream_next_block_private
+{
+ BlockNumber current_blocknum; /* next block to consider */
+ BlockNumber last_exclusive; /* one past the last block to read */
+ SkipPages skip_option; /* which all-visible/all-frozen pages to skip */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller-owned VM buffer pin, reused across calls */
+};
+
+/*
+ * Read stream callback used by verify_heapam().
+ *
+ * Returns the next block number in [current_blocknum, last_exclusive) that
+ * should be read, advancing current_blocknum past it, or InvalidBlockNumber
+ * once the range is exhausted.  Unless skip_option is SKIP_PAGES_NONE, blocks
+ * whose visibility map bits match the skip option (all-frozen or
+ * all-visible) are passed over without being returned.  per_buffer_data is
+ * not used.
+ */
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ struct heapam_read_stream_next_block_private *p = callback_private_data;
+
+ /* Optionally skip over all-frozen or all-visible blocks */
+ if (p->skip_option != SKIP_PAGES_NONE)
+ {
+ /*
+ * NOTE(review): nothing in this loop calls CHECK_FOR_INTERRUPTS(), so a
+ * long run of skippable blocks is passed over without an interrupt
+ * check; confirm that is acceptable.
+ */
+ for (; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
+ {
+ int32 mapbits = (int32) visibilitymap_get_status(p->rel, p->current_blocknum,
+ p->vmbuffer);
+
+ if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+ continue;
+ }
+
+ if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ continue;
+ }
+
+ break; /* this block must be checked */
+ }
+ }
+
+ /* Return the block we stopped on (if any) and advance past it. */
+ if (p->current_blocknum < p->last_exclusive)
+ return p->current_blocknum++;
+
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +278,8 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ struct heapam_read_stream_next_block_private p;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +453,19 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ p.current_blocknum = first_block;
+ p.last_exclusive = last_block + 1;
+ p.skip_option = skip_option;
+ p.rel = ctx.rel;
+ p.vmbuffer = &vmbuffer;
+ stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ heapam_read_stream_next_block,
+ &p,
+ 0);
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +478,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +840,10 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(p.current_blocknum == last_block + 1);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b6c170ac249..825acb907b1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3569,6 +3569,7 @@ gzFile
hashfunc
hbaPort
heap_page_items_state
+heapam_read_stream_next_block_private
help_handler
hlCheck
hstoreCheckKeyLen_t
--
2.39.5 (Apple Git-154)
Hi,
Thank you for working on this!
On Tue, 11 Feb 2025 at 21:41, Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Thanks for the review! v3 with the fixes attached.
I have a small comment.
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..8c83870db7d 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
+ stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+ ctx.bstrategy,
+ ctx.rel,
I think we do not always want to disable prefetching by explicitly
setting the READ_STREAM_SEQUENTIAL flag. If skip_option !=
SKIP_PAGES_NONE, then the blocks might not always be sequential. So,
what do you think about setting READ_STREAM_SEQUENTIAL when the
skip_option == SKIP_PAGES_NONE and
READ_STREAM_DEFAULT otherwise.
Other than that, LGTM.
--
Regards,
Nazir Bilal Yavuz
Microsoft
On Wed, 12 Feb 2025 at 00:11, Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Hi,
On Tue, Feb 11, 2025 at 03:39, jian he
<jian.universality@gmail.com> wrote:
hi. some minor issue i found.
+#include "storage/block.h"
no need, since "#include "storage/bufmgr.h" already included it.
Fixed
do we need to add ``CHECK_FOR_INTERRUPTS()`` in heapam_read_stream_next_block?
The current code on master don't CHECK_FOR_INTERRUPTS, so I would prefer to not
change this behaviour, but I think that is considerable.
heapam_read_stream_next_block_private need add to
src/tools/pgindent/typedefs.list
Fixed
overall, it looks good to me.
Thanks for the review! v3 with the fixes attached.
I noticed that Nazir Bilal Yavuz's comment from [1] is not yet
addressed, so I have changed the status of the commitfest entry to Waiting on
Author. Please address it and then update the status to Needs Review.
[1]: /messages/by-id/CAN55FZ2CGxcqTk_LLRPAi2aFNqtR4o=JPfjN0=yT0yObfQ2h2g@mail.gmail.com
Regards,
Vignesh
Hi,
On Sun, Mar 16, 2025 at 10:30 AM vignesh C <vignesh21@gmail.com> wrote:
On Wed, 12 Feb 2025 at 00:11, Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Hi,
On Tue, Feb 11, 2025 at 03:39, jian he
<jian.universality@gmail.com> wrote:
hi. some minor issue i found.
+#include "storage/block.h"
no need, since "#include "storage/bufmgr.h" already included it.
Fixed
do we need to add ``CHECK_FOR_INTERRUPTS()`` in heapam_read_stream_next_block?
The current code on master don't CHECK_FOR_INTERRUPTS, so I would prefer to not
change this behaviour, but I think that is considerable.
heapam_read_stream_next_block_private need add to
src/tools/pgindent/typedefs.list
Fixed
overall, it looks good to me.
Thanks for the review! v3 with the fixes attached.
I noticed that Nazir Bilal Yavuz's comment from [1] is not yet
addressed, i have changed the status of commitfest entry to Waiting on
Author, please address them and update it to Needs review.
[1] - /messages/by-id/CAN55FZ2CGxcqTk_LLRPAi2aFNqtR4o=JPfjN0=yT0yObfQ2h2g@mail.gmail.com
Sorry for the delay; attached is v4 with the remaining fixes.
--
Matheus Alcantara
Attachments:
v4-0001-Use-read-stream-on-amcheck.patchapplication/octet-stream; name=v4-0001-Use-read-stream-on-amcheck.patchDownload
From 0ee6524f27354e825afce85cb48ecb080a805c12 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH v4] Use read stream on amcheck
---
contrib/amcheck/verify_heapam.c | 95 ++++++++++++++++++++++++--------
src/tools/pgindent/typedefs.list | 1 +
2 files changed, 72 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..c4805900bf9 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +186,52 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+/*
+ * Private state for heapam_read_stream_next_block(), handed to the read
+ * stream as callback_private_data.
+ */
+struct heapam_read_stream_next_block_private
+{
+ BlockNumber current_blocknum; /* next block to consider */
+ BlockNumber last_exclusive; /* one past the last block to read */
+ SkipPages skip_option; /* which all-visible/all-frozen pages to skip */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller-owned VM buffer pin, reused across calls */
+};
+
+/*
+ * Read stream callback used by verify_heapam().
+ *
+ * Returns the next block number in [current_blocknum, last_exclusive) that
+ * should be read, advancing current_blocknum past it, or InvalidBlockNumber
+ * once the range is exhausted.  Unless skip_option is SKIP_PAGES_NONE, blocks
+ * whose visibility map bits match the skip option (all-frozen or
+ * all-visible) are passed over without being returned.  per_buffer_data is
+ * not used.
+ */
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ struct heapam_read_stream_next_block_private *p = callback_private_data;
+
+ /* Optionally skip over all-frozen or all-visible blocks */
+ if (p->skip_option != SKIP_PAGES_NONE)
+ {
+ /*
+ * NOTE(review): nothing in this loop calls CHECK_FOR_INTERRUPTS(), so a
+ * long run of skippable blocks is passed over without an interrupt
+ * check; confirm that is acceptable.
+ */
+ for (; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
+ {
+ int32 mapbits = (int32) visibilitymap_get_status(p->rel, p->current_blocknum,
+ p->vmbuffer);
+
+ if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+ continue;
+ }
+
+ if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ continue;
+ }
+
+ break; /* this block must be checked */
+ }
+ }
+
+ /* Return the block we stopped on (if any) and advance past it. */
+ if (p->current_blocknum < p->last_exclusive)
+ return p->current_blocknum++;
+
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +278,9 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ struct heapam_read_stream_next_block_private p;
+ int read_stream_flags = skip_option == SKIP_PAGES_NONE ? READ_STREAM_SEQUENTIAL : READ_STREAM_DEFAULT;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +454,19 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ p.current_blocknum = first_block;
+ p.last_exclusive = last_block + 1;
+ p.skip_option = skip_option;
+ p.rel = ctx.rel;
+ p.vmbuffer = &vmbuffer;
+ stream = read_stream_begin_relation(read_stream_flags,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ heapam_read_stream_next_block,
+ &p,
+ 0);
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +479,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +841,10 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ Assert(p.current_blocknum == last_block + 1);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 93339ef3c58..5803481e232 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3599,6 +3599,7 @@ gzFile
hashfunc
hbaPort
heap_page_items_state
+heapam_read_stream_next_block_private
help_handler
hlCheck
hstoreCheckKeyLen_t
--
2.39.5 (Apple Git-154)
On Mon, Mar 17, 2025 at 9:47 AM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Sorry for the delay, attached v4 with the remaining fixes.
Thanks for the patch.
I started reviewing this with the intent to commit it. But while studying
it, I decided that I want to separate the SKIP_PAGES_NONE case and
the other cases into two callbacks. I think it is easier to read the
skip pages callback this way. The SKIP_PAGES_NONE case just reads
all blocks in the range, so we can use the existing default callback,
block_range_read_stream_cb(). Then the callback for the
SKIP_PAGES_ALL_VISIBLE and SKIP_PAGES_ALL_FROZEN options can be clear
and simple.
I've attached two versions with this proposed structure.
amcheck-readstream-1callback.patch implements this with one callback
and has the amcheck-specific callback private data struct subclass
BlockRangeReadStreamPrivate (I called it
heapamcheck_rs_perblock_data).
amcheck-readstream-2callbacks.patch wraps block_range_read_stream_cb() in an
amcheck-specific callback and creates a BlockRangeReadStreamPrivate
and fills it in from the heapamcheck_rs_perblock_data to pass as
callback_private_data. Because this version is more explicit, it is
safer: we don't have any type-checking facilities that will alert
us if someone adds a member above the BlockRangeReadStreamPrivate in
heapamcheck_rs_perblock_data. But I'm open to feedback.
- Melanie
Attachments:
amcheck-readstream-2callbacks.patchtext/x-patch; charset=US-ASCII; name=amcheck-readstream-2callbacks.patchDownload
From c28f070d76d713c9547a679f9d6f09c6c9d531fb Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH] Use read stream on amcheck
ci-os-only:
---
contrib/amcheck/verify_heapam.c | 114 +++++++++++++++++++++++++-------
1 file changed, 90 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..caef0a3989a 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +186,58 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+/*
+ * Callback private data shared by heapamcheck_rs_next_block_noskips() and
+ * heapamcheck_rs_next_block_skips(); it is handed to the read stream as
+ * callback_private_data (despite the name, it is not per-buffer data).
+ */
+typedef struct heapamcheck_rs_perblock_data
+{
+ BlockNumber last_exclusive; /* one past the last block to read */
+ BlockNumber current_blocknum; /* next block to consider */
+ SkipPages skip_option; /* consulted only by the _skips callback */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller-owned VM buffer pin, reused across calls */
+} heapamcheck_rs_perblock_data;
+
+/*
+ * Read stream callback for skip = 'none': returns every block in
+ * [current_blocknum, last_exclusive) in order, then InvalidBlockNumber.
+ *
+ * The work is delegated to block_range_read_stream_cb(), which advances the
+ * BlockRangeReadStreamPrivate it is given.  Here that struct is a temporary
+ * rebuilt from *p on every call, so the advanced position must be copied
+ * back into p; otherwise every call would return the same block and the
+ * stream would never end.
+ */
+static BlockNumber
+heapamcheck_rs_next_block_noskips(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ heapamcheck_rs_perblock_data *p = callback_private_data;
+ BlockRangeReadStreamPrivate range = {
+ .current_blocknum = p->current_blocknum,
+ .last_exclusive = p->last_exclusive
+ };
+ BlockNumber blkno;
+
+ blkno = block_range_read_stream_cb(stream, &range, per_buffer_data);
+
+ /* Persist progress; range goes out of scope when we return. */
+ p->current_blocknum = range.current_blocknum;
+
+ return blkno;
+}
+
+/*
+ * Read stream callback for skip = 'all-frozen' / 'all-visible': returns the
+ * next block in [current_blocknum, last_exclusive) whose visibility map bits
+ * do not match skip_option, or InvalidBlockNumber once the range is
+ * exhausted.  per_buffer_data is not used.
+ */
+static BlockNumber
+heapamcheck_rs_next_block_skips(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ heapamcheck_rs_perblock_data *p = callback_private_data;
+
+ /*
+ * i takes the old value of current_blocknum, which is advanced on every
+ * iteration, so the next call resumes just after the block returned.
+ *
+ * NOTE(review): nothing in this loop calls CHECK_FOR_INTERRUPTS(), so a
+ * long run of skippable blocks is passed over without an interrupt check;
+ * confirm that is acceptable.
+ */
+ for (BlockNumber i; (i = p->current_blocknum++) < p->last_exclusive;)
+ {
+ int32 mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
+
+ if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+ continue;
+ }
+
+ if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ continue;
+ }
+
+ return i;
+ }
+
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +284,10 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ int read_stream_flags;
+ ReadStreamBlockNumberCB cb;
+ heapamcheck_rs_perblock_data rsdata;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +461,32 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ rsdata.current_blocknum = first_block;
+ rsdata.last_exclusive = last_block + 1;
+ rsdata.skip_option = skip_option;
+ rsdata.rel = ctx.rel;
+ rsdata.vmbuffer = &vmbuffer;
+
+ if (skip_option == SKIP_PAGES_NONE)
+ {
+ cb = heapamcheck_rs_next_block_noskips;
+ read_stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+ }
+ else
+ {
+ cb = heapamcheck_rs_next_block_skips;
+ read_stream_flags = READ_STREAM_DEFAULT;
+ }
+
+ stream = read_stream_begin_relation(read_stream_flags,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ cb,
+ &rsdata,
+ 0);
+
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +499,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +861,9 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
--
2.34.1
amcheck-readstream-1callback.patchtext/x-patch; charset=US-ASCII; name=amcheck-readstream-1callback.patchDownload
From f4e0c14631a11d60e5b518db63c232ec5fcc0048 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH] Use read stream on amcheck
ci-os-only:
---
contrib/amcheck/verify_heapam.c | 99 +++++++++++++++++++++++++--------
1 file changed, 75 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..2b323b6f4e4 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +186,43 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+typedef struct heapamcheck_rs_perblock_data
+{
+ BlockRangeReadStreamPrivate range; /* must be first: struct is also used as a BlockRangeReadStreamPrivate */
+ SkipPages skip_option; /* which VM bit, if any, lets a block be skipped */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller's VM buffer; caller releases it */
+} heapamcheck_rs_perblock_data;
+
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ heapamcheck_rs_perblock_data *p = callback_private_data;
+ /* Walk [current_blocknum, last_exclusive) and return the first block not skipped. */
+ for (BlockNumber i; (i = p->range.current_blocknum++) < p->range.last_exclusive;)
+ {
+ int32 mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
+ /* Skip the block when the VM bit selected by skip_option is set. */
+ if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+ continue;
+ }
+
+ if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ continue;
+ }
+
+ return i;
+ }
+ /* Range exhausted; note current_blocknum has been advanced past last_exclusive. */
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +269,10 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ int read_stream_flags;
+ ReadStreamBlockNumberCB cb;
+ heapamcheck_rs_perblock_data rsdata;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +446,32 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ rsdata.range.current_blocknum = first_block;
+ rsdata.range.last_exclusive = last_block + 1;
+ rsdata.skip_option = skip_option;
+ rsdata.rel = ctx.rel;
+ rsdata.vmbuffer = &vmbuffer;
+
+ if (skip_option == SKIP_PAGES_NONE)
+ {
+ cb = block_range_read_stream_cb;
+ read_stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+ }
+ else
+ {
+ cb = heapam_read_stream_next_block;
+ read_stream_flags = READ_STREAM_DEFAULT;
+ }
+
+ stream = read_stream_begin_relation(read_stream_flags,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ cb,
+ &rsdata,
+ 0);
+
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +484,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +846,9 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
--
2.34.1
Hi,
On Thu, 27 Mar 2025 at 03:48, Melanie Plageman
<melanieplageman@gmail.com> wrote:
On Mon, Mar 17, 2025 at 9:47 AM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Sorry for the delay, attached v4 with the remaining fixes.
Thanks for the patch.
I started reviewing this with the intent to commit it. But, I decided
while studying it that I want to separate the SKIP_PAGES_NONE case and
the other cases into two callbacks. I think it is easier to read the
skip pages callback this way. The SKIP_PAGES_NONE case is just read
all blocks in the range, so we can use the existing default callback,
block_range_read_stream_cb(). Then the callback for the
SKIP_PAGES_ALL_VISIBLE and SKIP_PAGES_ALL_FROZEN options can be clear
and simple.
I've attached two versions with this proposed structure.
amcheck-readstream-1callback.patch implements this with one callback
and has the amcheck specific callback private data struct subclass
BlockRangeReadStreamPrivate (I called it
heapamcheck_rs_perblock_data).
amcheck-readstream-2callbacks.patch wraps block_range_read_stream_cb() in an
amcheck specific callback and creates a BlockRangeReadStreamPrivate
and fills it in from the heapamcheck_rs_perblock_data to pass as
callback_private_data. Because this version is more explicit, it is
safer. We don't have any type checking facilities that will alert
us if someone adds a member above the BlockRangeReadStreamPrivate in
heapamcheck_rs_perblock_data. But, I'm open to feedback.
I liked the first approach more. We can solve the first approach's
problems by introducing a void pointer to pass to
read_stream_begin_relation(). We can set it to &rsdata.range for the
SKIP_PAGES_NONE case and &rsdata for the rest.
Example patch is attached, heapamcheck_rs_perblock_data is added to
typedefs.list too.
--
Regards,
Nazir Bilal Yavuz
Microsoft
Attachments:
amcheck-readstream-1callback.patchtext/x-patch; charset=US-ASCII; name=amcheck-readstream-1callback.patchDownload
From 642a01c6298742879cefac1197d466b6fec5df7f Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH] Use read stream on amcheck
ci-os-only:
---
contrib/amcheck/verify_heapam.c | 102 +++++++++++++++++++++++--------
src/tools/pgindent/typedefs.list | 1 +
2 files changed, 79 insertions(+), 24 deletions(-)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..be031a1795e 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -185,6 +186,43 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
HeapCheckContext *ctx,
XidCommitStatus *status);
+typedef struct heapamcheck_rs_perblock_data
+{
+ BlockRangeReadStreamPrivate range; /* passed alone to block_range_read_stream_cb when nothing is skipped */
+ SkipPages skip_option; /* which VM bit, if any, lets a block be skipped */
+ Relation rel; /* relation whose visibility map is consulted */
+ Buffer *vmbuffer; /* caller's VM buffer; caller releases it */
+} heapamcheck_rs_perblock_data;
+
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ heapamcheck_rs_perblock_data *p = callback_private_data;
+ /* Walk [current_blocknum, last_exclusive) and return the first block not skipped. */
+ for (BlockNumber i; (i = p->range.current_blocknum++) < p->range.last_exclusive;)
+ {
+ int32 mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
+ /* Skip the block when the VM bit selected by skip_option is set. */
+ if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+ continue;
+ }
+
+ if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+ {
+ if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+ continue;
+ }
+
+ return i;
+ }
+ /* Range exhausted; note current_blocknum has been advanced past last_exclusive. */
+ return InvalidBlockNumber;
+}
+
/*
* Scan and report corruption in heap pages, optionally reconciling toasted
* attributes with entries in the associated toast table. Intended to be
@@ -231,6 +269,11 @@ verify_heapam(PG_FUNCTION_ARGS)
BlockNumber last_block;
BlockNumber nblocks;
const char *skip;
+ ReadStream *stream;
+ int read_stream_flags;
+ ReadStreamBlockNumberCB cb;
+ heapamcheck_rs_perblock_data rsdata;
+ void *stream_callback_private;
/* Check supplied arguments */
if (PG_ARGISNULL(0))
@@ -404,7 +447,34 @@ verify_heapam(PG_FUNCTION_ARGS)
if (TransactionIdIsNormal(ctx.relfrozenxid))
ctx.oldest_xid = ctx.relfrozenxid;
- for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+ rsdata.range.current_blocknum = first_block;
+ rsdata.range.last_exclusive = last_block + 1;
+ rsdata.skip_option = skip_option;
+ rsdata.rel = ctx.rel;
+ rsdata.vmbuffer = &vmbuffer;
+
+ if (skip_option == SKIP_PAGES_NONE)
+ {
+ cb = block_range_read_stream_cb;
+ read_stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+ stream_callback_private = &rsdata.range;
+ }
+ else
+ {
+ cb = heapam_read_stream_next_block;
+ read_stream_flags = READ_STREAM_DEFAULT;
+ stream_callback_private = &rsdata;
+ }
+
+ stream = read_stream_begin_relation(read_stream_flags,
+ ctx.bstrategy,
+ ctx.rel,
+ MAIN_FORKNUM,
+ cb,
+ stream_callback_private,
+ 0);
+
+ while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
{
OffsetNumber maxoff;
OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +487,11 @@ verify_heapam(PG_FUNCTION_ARGS)
memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
- /* Optionally skip over all-frozen or all-visible blocks */
- if (skip_option != SKIP_PAGES_NONE)
- {
- int32 mapbits;
-
- mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
- &vmbuffer);
- if (skip_option == SKIP_PAGES_ALL_FROZEN)
- {
- if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
- continue;
- }
-
- if (skip_option == SKIP_PAGES_ALL_VISIBLE)
- {
- if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
- continue;
- }
- }
-
- /* Read and lock the next page. */
- ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
- RBM_NORMAL, ctx.bstrategy);
+ /* Lock the next page. */
+ Assert(BufferIsValid(ctx.buffer));
LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+ ctx.blkno = BufferGetBlockNumber(ctx.buffer);
ctx.page = BufferGetPage(ctx.buffer);
/* Perform tuple checks */
@@ -798,6 +849,9 @@ verify_heapam(PG_FUNCTION_ARGS)
if (on_error_stop && ctx.is_corrupt)
break;
}
+ /* Ensure that the stream is completely read */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ read_stream_end(stream);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9442a4841aa..b5047ebd594 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3626,6 +3626,7 @@ gtrgm_consistent_cache
gzFile
hashfunc
hbaPort
+heapamcheck_rs_perblock_data
heap_page_items_state
help_handler
hlCheck
--
2.47.2
On Thu, Mar 27, 2025 at 4:45 AM Nazir Bilal Yavuz <byavuz81@gmail.com> wrote:
I liked the first approach more. We can solve the first approach's
problems by introducing a void pointer to pass to
read_stream_begin_relation(). We can set it to &rsdata.range for the
SKIP_PAGES_NONE case and &rsdata for the rest.
Cool. I've gone with this approach but have renamed all the functions
and structs to try and be more consistent and clear.
Committed in 043799fa08c2c and I marked the commitfest entry as such.
- Melanie
On Thu, Mar 27, 2025 at 3:35 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
On Thu, Mar 27, 2025 at 4:45 AM Nazir Bilal Yavuz <byavuz81@gmail.com> wrote:
I liked the first approach more. We can solve the first approach's
problems by introducing a void pointer to pass to
read_stream_begin_relation(). We can set it to &rsdata.range for the
SKIP_PAGES_NONE case and &rsdata for the rest.
Cool. I've gone with this approach but have renamed all the functions
and structs to try and be more consistent and clear.
Committed in 043799fa08c2c and I marked the commitfest entry as such.
Just my two cents. I also like the first approach even though I prefer
the v4 version, but anyway, thanks very much for reviewing and
committing!
(I was sending my answer just when I received your reply)
--
Matheus Alcantara
On Thu, Mar 27, 2025 at 2:46 PM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Just my two cents. I also like the first approach even though I prefer
the v4 version, but anyway, thanks very much for reviewing and
committing!
Thanks for the patch!
FWIW, what I strongly disliked about v4 was that two separate parts of the
callback were responsible for advancing current_blocknum, one to
advance it past blocks we chose to skip and the other to advance it to
the next block.
for (; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
and
if (p->current_blocknum < p->last_exclusive)
return p->current_blocknum++;
I found that alone to be undesirable, but once you add in
SKIP_PAGES_NONE, I think it was very hard to understand.
Besides that, when we implemented streaming read sequential scan, we
ended up making dedicated callbacks for the parallel and non-parallel
cases (see heap_scan_stream_read_next_parallel and
heap_scan_stream_read_next_serial) because it performed better than a
single combined callback with a branch. I didn't validate that amcheck
got the same performance benefit from the dedicated callbacks, but I
don't see why it would be any different.
- Melanie
On Thu, Mar 27, 2025 at 4:42 PM Melanie Plageman
<melanieplageman@gmail.com> wrote:
On Thu, Mar 27, 2025 at 2:46 PM Matheus Alcantara
<matheusssilv97@gmail.com> wrote:
Just my two cents. I also like the first approach even though I prefer
the v4 version, but anyway, thanks very much for reviewing and
committing!
Thanks for the patch!
FWIW, what I strongly disliked about v4 was that two separate parts of the
callback were responsible for advancing current_blocknum, one to
advance it past blocks we chose to skip and the other to advance it to
the next block.
for (; p->current_blocknum < p->last_exclusive; p->current_blocknum++)
and
if (p->current_blocknum < p->last_exclusive)
return p->current_blocknum++;
I found that alone to be undesirable, but once you add in
SKIP_PAGES_NONE, I think it was very hard to understand.
Besides that, when we implemented streaming read sequential scan, we
ended up making dedicated callbacks for the parallel and non-parallel
cases (see heap_scan_stream_read_next_parallel and
heap_scan_stream_read_next_serial) because it performed better than a
single combined callback with a branch. I didn't validate that amcheck
got the same performance benefit from the dedicated callbacks, but I
don't see why it would be any different.
Yeah, it totally makes sense to me now, thanks very much for the details!
--
Matheus Alcantara