From b6affd2a778b7b4cff5738ad99f34ea21a816562 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 15 Dec 2021 10:28:49 +0530
Subject: [PATCH v1] Conveyor belt testing extension

This extension provides wrappers over the conveyor belt infrastructure
for testing the conveyor belt.
---
 contrib/pg_conveyor/Makefile                 |  23 +++
 contrib/pg_conveyor/expected/pg_conveyor.out | 185 ++++++++++++++++++++++++
 contrib/pg_conveyor/pg_conveyor--1.0.sql     |  32 +++++
 contrib/pg_conveyor/pg_conveyor.c            | 207 +++++++++++++++++++++++++++
 contrib/pg_conveyor/pg_conveyor.control      |   5 +
 contrib/pg_conveyor/sql/pg_conveyor.sql      | 125 ++++++++++++++++
 src/common/relpath.c                         |   3 +-
 src/include/common/relpath.h                 |   5 +-
 8 files changed, 582 insertions(+), 3 deletions(-)
 create mode 100644 contrib/pg_conveyor/Makefile
 create mode 100644 contrib/pg_conveyor/expected/pg_conveyor.out
 create mode 100644 contrib/pg_conveyor/pg_conveyor--1.0.sql
 create mode 100644 contrib/pg_conveyor/pg_conveyor.c
 create mode 100644 contrib/pg_conveyor/pg_conveyor.control
 create mode 100644 contrib/pg_conveyor/sql/pg_conveyor.sql

diff --git a/contrib/pg_conveyor/Makefile b/contrib/pg_conveyor/Makefile
new file mode 100644
index 0000000..8c29ffd
--- /dev/null
+++ b/contrib/pg_conveyor/Makefile
@@ -0,0 +1,28 @@
+# contrib/pg_conveyor/Makefile
+
+# Shared library built from pg_conveyor.c.
+MODULE_big = pg_conveyor
+OBJS = \
+	$(WIN32RES) \
+	pg_conveyor.o
+
+# Extension name (pg_conveyor.control) and its install script.
+EXTENSION = pg_conveyor
+DATA = pg_conveyor--1.0.sql
+PGFILEDESC = "pg_conveyor - conveyor belt test"
+
+# Regression test: sql/pg_conveyor.sql checked against expected/pg_conveyor.out.
+REGRESS = pg_conveyor
+
+# With USE_PGXS set, build against an installed PostgreSQL located through
+# pg_config; otherwise build as part of the source tree under contrib/.
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_conveyor
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_conveyor/expected/pg_conveyor.out b/contrib/pg_conveyor/expected/pg_conveyor.out
new file mode 100644
index 0000000..6ae5cd3
--- /dev/null
+++ b/contrib/pg_conveyor/expected/pg_conveyor.out
@@ -0,0 +1,185 @@
+CREATE EXTENSION pg_conveyor;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+ pg_conveyor_init 
+------------------
+ 
+(1 row)
+
+SELECT pg_conveyor_insert('test'::regclass::oid, 'test_data');
+ pg_conveyor_insert 
+--------------------
+ 
+(1 row)
+
+SELECT pg_conveyor_read('test'::regclass::oid, 0);
+ pg_conveyor_read 
+------------------
+ test_data
+(1 row)
+
+--CASE1
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..1000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+-- read from some random blocks
+SELECT pg_conveyor_read('test'::regclass::oid, 100);
+               pg_conveyor_read                
+-----------------------------------------------
+ test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa100
+(1 row)
+
+SELECT pg_conveyor_read('test'::regclass::oid, 800);
+               pg_conveyor_read                
+-----------------------------------------------
+ test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa800
+(1 row)
+
+--CASE2
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..5000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i+1000;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+SELECT pg_conveyor_read('test'::regclass::oid, 4000);
+                pg_conveyor_read                
+------------------------------------------------
+ test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa4000
+(1 row)
+
+SELECT pg_conveyor_read('test'::regclass::oid, 3000);
+                pg_conveyor_read                
+------------------------------------------------
+ test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa3000
+(1 row)
+
+--CASE3
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+ pg_conveyor_init 
+------------------
+ 
+(1 row)
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..50000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+--CASE4--(vacuum is failing)
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+ pg_conveyor_init 
+------------------
+ 
+(1 row)
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..5000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+SELECT pg_conveyor_truncate('test'::regclass::oid, 3000);
+ pg_conveyor_truncate 
+----------------------
+ 
+(1 row)
+
+--SELECT pg_conveyor_vacuum('test'::regclass::oid); //not implemented
+--CASE5
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+ pg_conveyor_init 
+------------------
+ 
+(1 row)
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..50000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+--CASE6 (multi truncate single vacuum)
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+ pg_conveyor_init 
+------------------
+ 
+(1 row)
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..1000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+SELECT pg_conveyor_truncate('test'::regclass::oid, 500);
+ pg_conveyor_truncate 
+----------------------
+ 
+(1 row)
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..1000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+SELECT pg_conveyor_truncate('test'::regclass::oid, 1800);
+ pg_conveyor_truncate 
+----------------------
+ 
+(1 row)
+
+SELECT pg_conveyor_vacuum('test'::regclass::oid);
+ pg_conveyor_vacuum 
+--------------------
+ 
+(1 row)
+
diff --git a/contrib/pg_conveyor/pg_conveyor--1.0.sql b/contrib/pg_conveyor/pg_conveyor--1.0.sql
new file mode 100644
index 0000000..301bb88
--- /dev/null
+++ b/contrib/pg_conveyor/pg_conveyor--1.0.sql
@@ -0,0 +1,43 @@
+/* contrib/pg_conveyor/pg_conveyor--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_conveyor" to load this file. \quit
+
+-- Create the conveyor belt for the relation if it does not exist yet;
+-- blocks_per_seg is handed to the conveyor belt initialization.
+CREATE FUNCTION pg_conveyor_init(relid OID, blocks_per_seg int)
+RETURNS void
+AS 'MODULE_PATHNAME', 'pg_conveyor_init'
+LANGUAGE C STRICT;
+
+-- Store the given text on a new page of the relation's conveyor belt.
+CREATE FUNCTION pg_conveyor_insert(relid OID, data TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'pg_conveyor_insert'
+LANGUAGE C STRICT;
+
+-- Return the text stored on logical page blockno of the conveyor belt.
+CREATE FUNCTION pg_conveyor_read(relid OID, blockno bigint)
+RETURNS TEXT
+AS 'MODULE_PATHNAME', 'pg_conveyor_read'
+LANGUAGE C STRICT;
+
+-- Logically truncate the conveyor belt at logical page blockno.
+CREATE FUNCTION pg_conveyor_truncate(relid OID, blockno bigint)
+RETURNS void
+AS 'MODULE_PATHNAME', 'pg_conveyor_truncate'
+LANGUAGE C STRICT;
+
+-- Vacuum the relation's conveyor belt.
+CREATE FUNCTION pg_conveyor_vacuum(relid OID)
+RETURNS void
+AS 'MODULE_PATHNAME', 'pg_conveyor_vacuum'
+LANGUAGE C STRICT;
+
+-- These functions touch relation storage directly and perform no permission
+-- checks on relid, so do not leave them executable by PUBLIC.
+REVOKE EXECUTE ON FUNCTION pg_conveyor_init(OID, int) FROM PUBLIC;
+REVOKE EXECUTE ON FUNCTION pg_conveyor_insert(OID, TEXT) FROM PUBLIC;
+REVOKE EXECUTE ON FUNCTION pg_conveyor_read(OID, bigint) FROM PUBLIC;
+REVOKE EXECUTE ON FUNCTION pg_conveyor_truncate(OID, bigint) FROM PUBLIC;
+REVOKE EXECUTE ON FUNCTION pg_conveyor_vacuum(OID) FROM PUBLIC;
diff --git a/contrib/pg_conveyor/pg_conveyor.c b/contrib/pg_conveyor/pg_conveyor.c
new file mode 100644
index 0000000..c9e56c4
--- /dev/null
+++ b/contrib/pg_conveyor/pg_conveyor.c
@@ -0,0 +1,207 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_conveyor.c
+ *
+ * SQL-callable wrappers over the conveyor belt infrastructure, used to
+ * initialize, insert into, read, truncate and vacuum a conveyor belt.
+ *
+ * Copyright (c) 2016-2021, PostgreSQL Global Development Group
+ *
+ * contrib/pg_conveyor/pg_conveyor.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/conveyor.h"
+#include "access/relation.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/bufpage.h"
+#include "storage/smgr.h"
+#include "utils/builtins.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_conveyor_init);
+PG_FUNCTION_INFO_V1(pg_conveyor_insert);
+PG_FUNCTION_INFO_V1(pg_conveyor_read);
+PG_FUNCTION_INFO_V1(pg_conveyor_truncate);
+PG_FUNCTION_INFO_V1(pg_conveyor_vacuum);
+
+/*
+ * Open the conveyor belt kept in rel's DEADTID_FORKNUM fork, raising an
+ * error if that fork does not exist.
+ */
+static ConveyorBelt *
+OpenConveyorBeltForRel(Relation rel)
+{
+	bool		fork_exists = smgrexists(RelationGetSmgr(rel), DEADTID_FORKNUM);
+
+	if (!fork_exists)
+		elog(ERROR, "conveyor belt not initialized for relid %u", RelationGetRelid(rel));
+
+	return ConveyorBeltOpen(rel, DEADTID_FORKNUM, CurrentMemoryContext);
+}
+
+/*
+ * Create the conveyor belt for the input relid unless its fork already
+ * exists; block_per_seg is passed through to ConveyorBeltInitialize.
+ */
+Datum
+pg_conveyor_init(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int			block_per_seg = PG_GETARG_INT32(1);
+	SMgrRelation reln;
+	Relation	rel;
+
+	if (block_per_seg <= 0)
+		elog(ERROR, "blocks per segment must be greater than zero");
+
+	/*
+	 * Creating the fork modifies the relation's storage, so take the same
+	 * AccessExclusiveLock as the other modifying functions to keep two
+	 * concurrent callers from both trying to create it.
+	 */
+	rel = relation_open(relid, AccessExclusiveLock);
+	reln = RelationGetSmgr(rel);
+	if (!smgrexists(reln, DEADTID_FORKNUM))
+	{
+		smgrcreate(reln, DEADTID_FORKNUM, false);
+		ConveyorBeltInitialize(rel, DEADTID_FORKNUM, block_per_seg,
+							   CurrentMemoryContext);
+	}
+	relation_close(rel, AccessExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Store the input text on a new page of the relid's conveyor belt.
+ */
+Datum
+pg_conveyor_insert(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	char	   *data = text_to_cstring(PG_GETARG_TEXT_PP(1));
+	Size		len = strlen(data);
+	Relation	rel;
+	ConveyorBelt *cb;
+	CBPageNo	pageno;
+	Buffer		buffer;
+	Page		page;
+
+	/*
+	 * Reject oversized input before a page is allocated.  pg_conveyor_read
+	 * treats the page contents as a NUL-terminated string, so at least one
+	 * of the bytes zeroed by PageInit must be left after the data.
+	 */
+	if (len >= BLCKSZ - MAXALIGN(SizeOfPageHeaderData))
+		elog(ERROR, "data too large");
+
+	rel = relation_open(relid, AccessExclusiveLock);
+	cb = OpenConveyorBeltForRel(rel);
+
+	buffer = ConveyorBeltGetNewPage(cb, &pageno);
+	page = BufferGetPage(buffer);
+	PageInit(page, BLCKSZ, 0);
+
+	/* Copy the payload into the page contents and grow pd_lower by its length. */
+	START_CRIT_SECTION();
+	memcpy(PageGetContents(page), data, len);
+	((PageHeader) page)->pd_lower += len;
+	ConveyorBeltPerformInsert(cb, buffer);
+	END_CRIT_SECTION();
+
+	ConveyorBeltCleanupInsert(cb, buffer);
+
+	relation_close(rel, AccessExclusiveLock);
+
+	/* Nothing to return. */
+	PG_RETURN_VOID();
+}
+
+/*
+ * Return the text stored on the given logical page of the conveyor belt.
+ */
+Datum
+pg_conveyor_read(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	CBPageNo	pageno = PG_GETARG_INT64(1);
+	CBPageNo	oldest_page;
+	CBPageNo	next_page;
+	Relation	rel;
+	ConveyorBelt *cb;
+	Buffer		buffer;
+	char		pagedata[BLCKSZ + 1];
+
+	rel = relation_open(relid, AccessShareLock);
+	cb = OpenConveyorBeltForRel(rel);
+
+	ConveyorBeltGetBounds(cb, &oldest_page, &next_page);
+	if (pageno < oldest_page || pageno >= next_page)
+		elog(ERROR, "conveyor belt pageno is out of bound");
+
+	buffer = ConveyorBeltReadBuffer(cb, pageno, BUFFER_LOCK_SHARE, NULL);
+	if (BufferIsInvalid(buffer))
+		elog(ERROR, "could not read data");
+
+	/* Copy the page and add a sentinel so a full page cannot be overrun. */
+	memcpy(pagedata, BufferGetPage(buffer), BLCKSZ);
+	pagedata[BLCKSZ] = '\0';
+	UnlockReleaseBuffer(buffer);
+	relation_close(rel, AccessShareLock);
+
+	PG_RETURN_DATUM(CStringGetTextDatum((char *) PageGetContents((char *) pagedata)));
+}
+
+/*
+ * SQL wrapper that logically truncates the relid's conveyor belt at the
+ * given logical page number.
+ */
+Datum
+pg_conveyor_truncate(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	CBPageNo	target = PG_GETARG_INT64(1);
+	CBPageNo	first_valid;
+	CBPageNo	past_last;
+	ConveyorBelt *belt;
+	Relation	rel = relation_open(relid, AccessExclusiveLock);
+
+	belt = OpenConveyorBeltForRel(rel);
+
+	/* The target must lie within the belt's current bounds. */
+	ConveyorBeltGetBounds(belt, &first_valid, &past_last);
+	if (!(target >= first_valid && target < past_last))
+		elog(ERROR, "conveyor belt pageno is out of bound");
+
+	ConveyorBeltLogicalTruncate(belt, target);
+
+	relation_close(rel, AccessExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL wrapper that runs ConveyorBeltVacuum on the relid's conveyor belt
+ * while holding AccessExclusiveLock on the relation.
+ */
+Datum
+pg_conveyor_vacuum(PG_FUNCTION_ARGS)
+{
+	Relation	target_rel;
+	ConveyorBelt *belt;
+
+	target_rel = relation_open(PG_GETARG_OID(0), AccessExclusiveLock);
+	belt = OpenConveyorBeltForRel(target_rel);
+
+	ConveyorBeltVacuum(belt);
+
+	relation_close(target_rel, AccessExclusiveLock);
+
+	/* This function has no result. */
+	PG_RETURN_VOID();
+}
diff --git a/contrib/pg_conveyor/pg_conveyor.control b/contrib/pg_conveyor/pg_conveyor.control
new file mode 100644
index 0000000..7e95dab
--- /dev/null
+++ b/contrib/pg_conveyor/pg_conveyor.control
@@ -0,0 +1,5 @@
+# pg_conveyor test extension
+comment = 'test conveyor'
+default_version = '1.0'
+module_pathname = '$libdir/pg_conveyor'
+relocatable = true
diff --git a/contrib/pg_conveyor/sql/pg_conveyor.sql b/contrib/pg_conveyor/sql/pg_conveyor.sql
new file mode 100644
index 0000000..a4bd146
--- /dev/null
+++ b/contrib/pg_conveyor/sql/pg_conveyor.sql
@@ -0,0 +1,125 @@
+CREATE EXTENSION pg_conveyor;
+
+CREATE TABLE test(a int);
+
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+SELECT pg_conveyor_insert('test'::regclass::oid, 'test_data');
+SELECT pg_conveyor_read('test'::regclass::oid, 0);
+
+--CASE1
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..1000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+
+-- read from some random blocks
+SELECT pg_conveyor_read('test'::regclass::oid, 100);
+SELECT pg_conveyor_read('test'::regclass::oid, 800);
+
+--CASE2
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..5000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i+1000;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+SELECT pg_conveyor_read('test'::regclass::oid, 4000);
+SELECT pg_conveyor_read('test'::regclass::oid, 3000);
+
+--CASE3
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..50000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+
+
+--CASE4--(vacuum is failing)
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..5000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+
+SELECT pg_conveyor_truncate('test'::regclass::oid, 3000);
+--SELECT pg_conveyor_vacuum('test'::regclass::oid); //not implemented
+
+--CASE5
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..50000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+
+--CASE6 (multi truncate single vacuum)
+DROP TABLE test;
+CREATE TABLE test(a int);
+SELECT pg_conveyor_init('test'::regclass::oid, 4);
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..1000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+
+SELECT pg_conveyor_truncate('test'::regclass::oid, 500);
+do $$
+<<first_block>>
+declare
+  i int := 0;
+  data varchar;
+begin
+  for i in 1..1000 loop
+	data := 'test_dataaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' || i;
+	PERFORM pg_conveyor_insert('test'::regclass::oid, data);
+  end loop;
+end first_block $$;
+
+SELECT pg_conveyor_truncate('test'::regclass::oid, 1800);
+SELECT pg_conveyor_vacuum('test'::regclass::oid);
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 1f5c426..20624e2 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -34,7 +34,8 @@ const char *const forkNames[] = {
 	"main",						/* MAIN_FORKNUM */
 	"fsm",						/* FSM_FORKNUM */
 	"vm",						/* VISIBILITYMAP_FORKNUM */
-	"init"						/* INIT_FORKNUM */
+	"init",						/* INIT_FORKNUM */
+	"tid"						/* DEADTID_FORKNUM */
 };
 
 StaticAssertDecl(lengthof(forkNames) == (MAX_FORKNUM + 1),
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a44be11..0d38e07 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -43,7 +43,8 @@ typedef enum ForkNumber
 	MAIN_FORKNUM = 0,
 	FSM_FORKNUM,
 	VISIBILITYMAP_FORKNUM,
-	INIT_FORKNUM
+	INIT_FORKNUM,
+	DEADTID_FORKNUM
 
 	/*
 	 * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
@@ -52,7 +53,7 @@ typedef enum ForkNumber
 	 */
 } ForkNumber;
 
-#define MAX_FORKNUM		INIT_FORKNUM
+#define MAX_FORKNUM		DEADTID_FORKNUM
 
 #define FORKNAMECHARS	4		/* max chars for a fork name */
 
-- 
1.8.3.1

