[WIP] Performance Improvement by reducing WAL for Update Operation
Problem statement:
-----------------------------------
Reducing wal size for an update operation for performance improvement.
Advantages:
---------------------
1. Observed increase in performance with pgbench when server is running in sync_commit off mode.
a. with pgbench (tpc_b) - 13%
b. with modified pgbench (such that size of modified columns are less than all row) - 83%
2. WAL size is reduced
Design/Impementation:
------------------------------
Currently the change is done only for fixed length columns for simple tables and the tuple should not contain NULLS.
This is a Proof of concept, the design and implementation needs to be changed based on final design required for handling other scenario's
Update operation:
-----------------------------
1. Check for the simple table or not.(No toast, No before update triggers)
2. Works only for not null tuples.
3. Identify the modified columns from the target entry.
4. Based on the modified column list, check for any variable length columns are modified, if so this optimization is not applied.
5. Identify the offset and length for the modified columns and store it as an optimized WAL tuple in the following format.
Note: Wal update header is modified to denote whether wal update optimization is done or not.
WAL update header + Tuple header(no change from previous format) +
[offset(2bytes)] [length(2 bytes)] [changed data value]
[offset(2bytes)] [length(2 bytes)] [changed data value]
....
....
Recovery:
----------------
The following steps are only incase of the tuple is optimized.
6. For forming the new tuple, old tuple is required.(including if the old tuple does not require any modifications also).
7. Form the new tuple based on the format specified in the 5th point.
8. once new tuple is framed, follow the exisiting behavior.
Frame the new tuple from old tuple and WAL record:
1. The length of the data which is needs to be copied from old tuple is calculated as
the difference of offset present in the WAL record and the old tuple offset.
(for the first time, the old tuple offset value is zero)
2. Once the old tuple data copied, then increase the offset for old tuple by the
copied length.
3. Get the length and value of modified column from WAL record, copy it into new tuple.
4. Increase the old tuple offset with the modified column length.
5. Repeat this procedure until the WAL record reaches the end.
6. If any remaining left out old tuple data will be copied.
Test results:
----------------------
1. The pgbench test run for 10min.
2. pgbench result for tpc-b is attached with this mail as pgbench_org
3. modified pgbench(such that size of modified columns are less than all row) result for tpc-b is attached with this mail as pgbench_1800_300
Modified pgbench code:
---------------------------------------
1. Schema of the tables are modified as added some extra fields to increase the record size to 1800.
2. The tcp_b benchmark suite to do only update operations.
3. The update operation changed as to update 3 columns with 300 bytes out of total size of 1800 bytes.
4. During initialization of tables removed the NULL value insertions.
I am working on solution to handle other scenarios like variable length columns, tuple contain NULLs, handling for before triggers.
Please provide suggestions/objections?
With Regards,
Amit Kapila.
Attachments:
wal_update_changes.patchtext/plain; name=wal_update_changes.patchDownload
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f28026b..27efa6b 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -53,6 +53,7 @@
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
+#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
@@ -84,7 +85,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
ItemPointerData from, Buffer newbuf, HeapTuple newtup,
- bool all_visible_cleared, bool new_all_visible_cleared);
+ bool all_visible_cleared, bool new_all_visible_cleared,
+ bool diff_update);
static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
HeapTuple oldtup, HeapTuple newtup);
@@ -2673,6 +2675,77 @@ simple_heap_delete(Relation relation, ItemPointer tid)
}
/*
+ * get_tuple_info - Gets the tuple offset and value.
+ *
+ * calculates the attribute value and offset, where the attribute ends in the
+ * tuple based on the attribute number and previous fetched attribute info.
+ *
+ * offset (I/P and O/P variable) - Input as end of previous attribute offset
+ * and incase if it is a first attribute then it's value is zero.
+ * Output as end of the current attribute in the tuple.
+ * usecacheoff (I/P and O/P variable) - Attribute cacheoff can be used or not.
+ */
+static void
+get_tuple_info(Form_pg_attribute *att, HeapTuple tuple, bits8 *bp,
+ bool hasnulls, int attnum, Datum *value, uint16 *offset,
+ bool *usecacheoff)
+{
+ Form_pg_attribute thisatt = att[attnum];
+ uint16 off = *offset;
+ bool slow = *usecacheoff;
+ char *tp;
+ HeapTupleHeader tup = tuple->t_data;
+
+ tp = (char *) tup + tup->t_hoff;
+
+ if (hasnulls && att_isnull(attnum, bp))
+ {
+ slow = true; /* can't use attcacheoff anymore */
+ return;
+ }
+
+ if (!slow && thisatt->attcacheoff >= 0)
+ off = thisatt->attcacheoff;
+ else if (thisatt->attlen == -1)
+ {
+ /*
+ * We can only cache the offset for a varlena attribute if the offset
+ * is already suitably aligned, so that there would be no pad bytes in
+ * any case: then the offset will be valid for either an aligned or
+ * unaligned value.
+ */
+ if (!slow &&
+ off == att_align_nominal(off, thisatt->attalign))
+ thisatt->attcacheoff = off;
+ else
+ {
+ off = att_align_pointer(off, thisatt->attalign, -1,
+ tp + off);
+ slow = true;
+ }
+ }
+ else
+ {
+ /* not varlena, so safe to use att_align_nominal */
+ off = att_align_nominal(off, thisatt->attalign);
+
+ if (!slow)
+ thisatt->attcacheoff = off;
+ }
+
+ *value = fetchatt(thisatt, tp + off);
+
+ off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+ if (thisatt->attlen <= 0)
+ slow = true; /* can't use attcacheoff anymore */
+
+ *offset = off;
+ *usecacheoff = slow;
+}
+
+
+/*
* heap_update - replace a tuple
*
* NB: do not call this directly unless you are prepared to deal with
@@ -2707,7 +2780,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
HTSU_Result
heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
ItemPointer ctid, TransactionId *update_xmax,
- CommandId cid, Snapshot crosscheck, bool wait)
+ CommandId cid, Snapshot crosscheck, Bitmapset *modifiedCols,
+ bool wait)
{
HTSU_Result result;
TransactionId xid = GetCurrentTransactionId();
@@ -2715,6 +2789,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
ItemId lp;
HeapTupleData oldtup;
HeapTuple heaptup;
+ HeapTupleData redotup;
Page page;
BlockNumber block;
Buffer buffer,
@@ -2730,6 +2805,14 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
bool use_hot_update = false;
bool all_visible_cleared = false;
bool all_visible_cleared_new = false;
+ struct
+ {
+ HeapTupleHeaderData hdr;
+ char data[MaxHeapTupleSize];
+ } tbuf;
+ char *data;
+ bool diff_update = false;
+ uint16 offset = 0;
Assert(ItemPointerIsValid(otid));
@@ -3098,6 +3181,132 @@ l2:
PageSetFull(page);
}
+ if (modifiedCols)
+ {
+ Form_pg_attribute *att = relation->rd_att->attrs;
+ int numberOfAttributes;
+ uint16 newOffset = 0;
+ int attnum;
+ HeapTupleHeader newtuphdr = heaptup->t_data;
+ bits8 *new_bp = newtuphdr->t_bits;
+ bool old_hasnulls = HeapTupleHasNulls(&oldtup);
+ bool new_hasnulls = HeapTupleHasNulls(heaptup);
+ bool new_usecacheoff = false;
+ Datum new_value;
+ uint16 data_length;
+
+ /* For NULL value tuples, don't use the optimized path */
+ if (old_hasnulls || new_hasnulls)
+ {
+ goto record_insert;
+ }
+
+ numberOfAttributes = HeapTupleHeaderGetNatts(newtuphdr);
+
+ /*
+ * Skip the WAL record header for now and frame data of optimized WAL
+ * update record.
+ */
+ data = (char *) &tbuf.hdr;
+ offset = newtuphdr->t_hoff;
+
+ for (attnum = 0; attnum < numberOfAttributes; attnum++)
+ {
+ /*
+ * If the attribute is modified by the update operation, store the
+ * appropiate offsets in the WAL record, otherwise skip to the
+ * next attribute.
+ */
+ if (bms_is_member((attnum + 1) - FirstLowInvalidHeapAttributeNumber,
+ modifiedCols))
+ {
+ /*
+ * calculate the offset where the modified attribute starts in
+ * the new tuple used to store in the WAL record, this will be
+ * used to traverse the old tuple during recovery.
+ */
+ newOffset = att_align_nominal(newOffset, att[attnum]->attalign);
+ offset = SHORTALIGN(offset);
+
+ memcpy((data + offset), &newOffset, sizeof(uint16));
+ offset += sizeof(uint16);
+
+ /* get the attribute value and end offset for same */
+ get_tuple_info(att, heaptup, new_bp, new_hasnulls, attnum,
+ &new_value, &newOffset, &new_usecacheoff);
+
+ /* Increment the offset to store the data of modified column */
+ offset += sizeof(uint16);
+
+ if (att[attnum]->attbyval)
+ {
+ /* pass-by-value */
+ data_length = att[attnum]->attlen;
+ store_att_byval((data + offset), new_value, data_length);
+ }
+ else
+ {
+ if (BPCHAROID == att[attnum]->atttypid)
+ {
+ /* varlena */
+ Pointer val = DatumGetPointer(new_value);
+
+ if (VARATT_IS_SHORT(val))
+ {
+ /* no alignment for short varlenas */
+ data_length = VARSIZE_SHORT(val);
+ memcpy((data + offset), val, data_length);
+ }
+ else if ((att[attnum]->attstorage != 'p')
+ && VARATT_CAN_MAKE_SHORT(val))
+ {
+ /* convert to short varlena -- no alignment */
+ data_length = VARATT_CONVERTED_SHORT_SIZE(val);
+ SET_VARSIZE_SHORT(data, data_length);
+ memcpy((data + offset + 1),
+ VARDATA(val),
+ (data_length - 1));
+ }
+ else
+ {
+ /* full 4-byte header varlena */
+ data_length = VARSIZE(val);
+ memcpy((data + offset), val, data_length);
+ }
+ }
+ else
+ {
+ /* Not a BPCHAR, proceed without optimization */
+ goto record_insert;
+ }
+ }
+
+ /* Store the length of the modified attribute */
+ memcpy((data + offset - sizeof(uint16)),
+ &data_length,
+ sizeof(uint16));
+ offset += data_length;
+ }
+ else
+ {
+ get_tuple_info(att, heaptup, new_bp, new_hasnulls, attnum,
+ &new_value, &newOffset, &new_usecacheoff);
+ }
+ }
+
+ /*
+ * FIXME: At the end of calculating the optimization tuple, if the
+ * optimized tuple length is more than 3/4 of the original tuple then
+ * ignore the optimization.
+ */
+ if (offset < ((heaptup->t_len >> 1) + (heaptup->t_len >> 2)))
+ {
+ diff_update = true;
+ }
+ }
+
+record_insert:;
+
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@@ -3173,10 +3382,27 @@ l2:
/* XLOG stuff */
if (RelationNeedsWAL(relation))
{
- XLogRecPtr recptr = log_heap_update(relation, buffer, oldtup.t_self,
- newbuf, heaptup,
- all_visible_cleared,
- all_visible_cleared_new);
+ XLogRecPtr recptr;
+
+ if (diff_update)
+ {
+ /* Copy the tuple header to the WAL tuple */
+ memcpy(&tbuf.hdr, heaptup->t_data, heaptup->t_data->t_hoff);
+ redotup.t_len = offset;
+ redotup.t_data = (HeapTupleHeader) &tbuf;
+ redotup.t_self = heaptup->t_self;
+ redotup.t_tableOid = heaptup->t_tableOid;
+ }
+ else
+ {
+ memcpy(&redotup, heaptup, sizeof(HeapTupleData));
+ }
+
+ recptr = log_heap_update(relation, buffer, oldtup.t_self,
+ newbuf, &redotup,
+ all_visible_cleared,
+ all_visible_cleared_new,
+ diff_update);
if (newbuf != buffer)
{
@@ -3363,6 +3589,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
result = heap_update(relation, otid, tup,
&update_ctid, &update_xmax,
GetCurrentCommandId(true), InvalidSnapshot,
+ NULL,
true /* wait for commit */ );
switch (result)
{
@@ -4407,7 +4634,8 @@ log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,
static XLogRecPtr
log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
Buffer newbuf, HeapTuple newtup,
- bool all_visible_cleared, bool new_all_visible_cleared)
+ bool all_visible_cleared, bool new_all_visible_cleared,
+ bool diff_update)
{
xl_heap_update xlrec;
xl_heap_header xlhdr;
@@ -4426,9 +4654,15 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
xlrec.target.node = reln->rd_node;
xlrec.target.tid = from;
- xlrec.all_visible_cleared = all_visible_cleared;
+ xlrec.diff_update = diff_update;
xlrec.newtid = newtup->t_self;
- xlrec.new_all_visible_cleared = new_all_visible_cleared;
+
+ /*
+ * MSB 4 bits tells PD_ALL_VISIBLE was cleared of new page and rest 4 bits
+ * for the old page
+ */
+ xlrec.new_all_visible_cleared |= all_visible_cleared;
+ xlrec.new_all_visible_cleared |= new_all_visible_cleared << 4;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapUpdate;
@@ -5217,14 +5451,20 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
} tbuf;
xl_heap_header xlhdr;
int hsize;
- uint32 newlen;
+ uint32 newlen = 0;
Size freespace;
+ bool old_tup_modify = true; /* flag used to indicate, whether old
+ * tuple needs the modification or not */
+
+ /* Initialize the buffer, used to frame the new tuple */
+ MemSet((char *) &tbuf.hdr, 0, sizeof(HeapTupleHeaderData));
+ hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
/*
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->all_visible_cleared)
+ if (xlrec->new_all_visible_cleared & 0x0F)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@@ -5240,16 +5480,32 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
{
if (samepage)
return; /* backup block covered both changes */
- goto newt;
+
+ /* Need the old page to read the old tuple data, no update required */
+ if (!xlrec->diff_update)
+ goto newt;
+
+ old_tup_modify = false;
}
/* Deal with old tuple version */
-
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
+ {
+ /*
+ * Incase of diff update, if the old buffer is not available raise a
+ * panic as diff update needs the old buffer to frame the new tuple.
+ */
+ if (xlrec->diff_update)
+ {
+ elog(PANIC, "heap_update_redo: invalid buffer");
+ }
+
goto newt;
+ }
+
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
@@ -5257,7 +5513,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
UnlockReleaseBuffer(buffer);
if (samepage)
return;
- goto newt;
+
+ /* Need the old page to read the old tuple data, no update required */
+ if (!xlrec->diff_update)
+ goto newt;
+
+ old_tup_modify = false;
}
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
@@ -5269,25 +5530,103 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
htup = (HeapTupleHeader) PageGetItem(page, lp);
- htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID |
- HEAP_XMAX_IS_MULTI |
- HEAP_IS_LOCKED |
- HEAP_MOVED);
- if (hot_update)
- HeapTupleHeaderSetHotUpdated(htup);
- else
- HeapTupleHeaderClearHotUpdated(htup);
- HeapTupleHeaderSetXmax(htup, record->xl_xid);
- HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
- /* Set forward chain link in t_ctid */
- htup->t_ctid = xlrec->newtid;
+ if (xlrec->diff_update)
+ {
+ uint16 len = 0,
+ data_length,
+ oldoffset = 0;
+ uint32 t_length;
+ char *olddata = (char *) htup + htup->t_hoff;
+ char *data = (char *) &tbuf.hdr + htup->t_hoff;
+ char *redodata = (char *) xlrec + hsize + htup->t_hoff
+ - offsetof(HeapTupleHeaderData, t_bits);
- /* Mark the page as a candidate for pruning */
- PageSetPrunable(page, record->xl_xid);
+ /*
+ * Frame the new tuple from old and wal tuple
+ *
+ * Get the data start pointer from old and redo data.
+ *
+ * calculate the tuple length from old tuple and redo record length.
+ *
+ * The redo data is in the format as "offset + length + new data"
+ *
+ * The first offset in the redo record gives the offset where the
+ * modification starts, the same will give you the length of the data
+ * needs to be copied from old tuple.
+ *
+ * Once the old tuple data copied, then increase the offset by the
+ * copied length.
+ *
+ * Get the length and value of modified column from wal tuple and
+ * increase the old tuple offset also with the modified column length.
+ *
+ * Repeat this procedure until the wal tuple reaches the end.
+ */
- if (xlrec->all_visible_cleared)
- PageClearAllVisible(page);
+ newlen = record->xl_len - hsize;
+ Assert(newlen <= MaxHeapTupleSize);
+
+ t_length = ItemIdGetLength(lp) - htup->t_hoff;
+ newlen -= (htup->t_hoff - offsetof(HeapTupleHeaderData, t_bits));
+ len = 0;
+
+ /* Frame the new tuple from the old and WAL tuples */
+ while (len < newlen)
+ {
+ data_length = *(uint16 *) (redodata + len) - oldoffset;
+
+ /* Copy the old tuple data */
+ memcpy(data, (olddata + oldoffset), data_length);
+ data += data_length;
+ oldoffset += data_length;
+
+ len += sizeof(uint16);
+ data_length = *(uint16 *) (redodata + len);
+ oldoffset += data_length;
+
+ len += sizeof(uint16);
+
+ /* Copy the modified attribute data from WAL tuple */
+ memcpy(data, (redodata + len), data_length);
+
+ data += data_length;
+ len += data_length;
+
+ len = SHORTALIGN(len);
+ }
+
+ /* Copy the remaining old tuple data to the new tuple */
+ if (oldoffset < t_length)
+ {
+ memcpy(data, (olddata + oldoffset), (t_length - oldoffset));
+ }
+
+ newlen = t_length
+ + (htup->t_hoff - offsetof(HeapTupleHeaderData, t_bits));
+ }
+
+ if (old_tup_modify)
+ {
+ htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
+ HEAP_MOVED);
+ if (hot_update)
+ HeapTupleHeaderSetHotUpdated(htup);
+ else
+ HeapTupleHeaderClearHotUpdated(htup);
+ HeapTupleHeaderSetXmax(htup, record->xl_xid);
+ HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+ /* Set forward chain link in t_ctid */
+ htup->t_ctid = xlrec->newtid;
+
+ /* Mark the page as a candidate for pruning */
+ PageSetPrunable(page, record->xl_xid);
+
+ if (xlrec->new_all_visible_cleared & 0x0F)
+ PageClearAllVisible(page);
+ }
/*
* this test is ugly, but necessary to avoid thinking that insert change
@@ -5295,9 +5634,14 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
*/
if (samepage)
goto newsame;
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
+
+ if (old_tup_modify)
+ {
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+
UnlockReleaseBuffer(buffer);
/* Deal with new tuple */
@@ -5308,7 +5652,7 @@ newt:;
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->new_all_visible_cleared)
+ if ((xlrec->new_all_visible_cleared >> 4) & 0x0F)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@@ -5355,19 +5699,23 @@ newsame:;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
- hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
-
- newlen = record->xl_len - hsize;
- Assert(newlen <= MaxHeapTupleSize);
memcpy((char *) &xlhdr,
(char *) xlrec + SizeOfHeapUpdate,
SizeOfHeapHeader);
+
htup = &tbuf.hdr;
- MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
- /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
- memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
- (char *) xlrec + hsize,
- newlen);
+
+ if (!xlrec->diff_update)
+ {
+ newlen = record->xl_len - hsize;
+ Assert(newlen <= MaxHeapTupleSize);
+
+ /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+ memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+ (char *) xlrec + hsize,
+ newlen);
+ }
+
newlen += offsetof(HeapTupleHeaderData, t_bits);
htup->t_infomask2 = xlhdr.t_infomask2;
htup->t_infomask = xlhdr.t_infomask;
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index a7bce75..4c22aea 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -48,6 +48,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"
+#include "parser/parsetree.h"
/*
@@ -478,12 +479,14 @@ ExecUpdate(ItemPointer tupleid,
bool canSetTag)
{
HeapTuple tuple;
+ HeapTuple old_tuple;
ResultRelInfo *resultRelInfo;
Relation resultRelationDesc;
HTSU_Result result;
ItemPointerData update_ctid;
TransactionId update_xmax;
List *recheckIndexes = NIL;
+ Bitmapset *modifiedCols = NULL;
/*
* abort the operation if not running transactions
@@ -495,7 +498,7 @@ ExecUpdate(ItemPointer tupleid,
* get the heap tuple out of the tuple table slot, making sure we have a
* writable copy
*/
- tuple = ExecMaterializeSlot(slot);
+ tuple = old_tuple = ExecMaterializeSlot(slot);
/*
* get information on the (current) result relation
@@ -553,6 +556,13 @@ lreplace:;
if (resultRelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
+ if ((resultRelationDesc->rd_toastoid == InvalidOid)
+ && (old_tuple == tuple))
+ {
+ modifiedCols = (rt_fetch(resultRelInfo->ri_RangeTableIndex,
+ estate->es_range_table)->modifiedCols);
+ }
+
/*
* replace the heap tuple
*
@@ -566,6 +576,7 @@ lreplace:;
&update_ctid, &update_xmax,
estate->es_output_cid,
estate->es_crosscheck_snapshot,
+ modifiedCols,
true /* wait for commit */ );
switch (result)
{
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 660a854..5e91ba8 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -105,7 +105,8 @@ extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
HeapTuple newtup,
ItemPointer ctid, TransactionId *update_xmax,
- CommandId cid, Snapshot crosscheck, bool wait);
+ CommandId cid, Snapshot crosscheck, Bitmapset *modifiedCols,
+ bool wait);
extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
Buffer *buffer, ItemPointer ctid,
TransactionId *update_xmax, CommandId cid,
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index b289e14..f5c08ed 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -692,13 +692,19 @@ typedef struct xl_multi_insert_tuple
#define SizeOfMultiInsertTuple (offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
-/* This is what we need to know about update|hot_update */
+/* This is what we need to know about update|hot_update|optimized_update */
typedef struct xl_heap_update
{
xl_heaptid target; /* deleted tuple id */
ItemPointerData newtid; /* new inserted tuple id */
- bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
- bool new_all_visible_cleared; /* same for the page of newtid */
+ bool diff_update; /* optimized update or not */
+ /*
+ * To keep the structure size same all_visible_cleared is merged with
+ * new_all_visible_cleared.
+ */
+ bool new_all_visible_cleared; /* MSB 4 bits tells PD_ALL_VISIBLE was
+ cleared of new page and rest 4 bits
+ for the old page */
/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_heap_update;
pgbench_modified.ctext/plain; name=pgbench_modified.cDownload
/*
* pgbench.c
*
* A simple benchmark program for PostgreSQL
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
* contrib/pgbench/pgbench.c
* Copyright (c) 2000-2012, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#ifdef WIN32
#define FD_SETSIZE 1024 /* set before winsock2.h is included */
#endif /* ! WIN32 */
#include "postgres_fe.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "portability/instr_time.h"
#include <ctype.h>
#ifndef WIN32
#include <sys/time.h>
#include <unistd.h>
#endif /* ! WIN32 */
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
#endif
#ifndef INT64_MAX
#define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
#endif
/*
* Multi-platform pthread implementations
*/
#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
#include <sys/wait.h>
#define pthread_t pg_pthread_t
#define pthread_attr_t pg_pthread_attr_t
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join
typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);
#endif
extern char *optarg;
extern int optind;
/********************************************************************
* some configurable parameters */
/* max number of clients allowed */
#ifdef FD_SETSIZE
#define MAXCLIENTS (FD_SETSIZE - 10)
#else
#define MAXCLIENTS 1024
#endif
#define DEFAULT_NXACTS 10 /* default nxacts */
int nxacts = 0; /* number of transactions per client */
int duration = 0; /* duration in seconds */
/*
* scaling factor. for example, scale = 10 will make 1000000 tuples in
* pgbench_accounts table.
*/
int scale = 1;
/*
* fillfactor. for example, fillfactor = 90 will use only 90 percent
* space during inserts and leave 10 percent free.
*/
int fillfactor = 100;
/*
* create foreign key constraints on the tables?
*/
int foreign_keys = 0;
/*
* use unlogged tables?
*/
int unlogged_tables = 0;
/*
* tablespace selection
*/
char *tablespace = NULL;
char *index_tablespace = NULL;
/*
* end of configurable parameters
*********************************************************************/
#define nbranches 1 /* Makes little sense to change this. Change
* -s instead */
#define ntellers 10
#define naccounts 100000
bool use_log; /* log transaction latencies to a file */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
int main_pid; /* main process id used in log filename */
char *pghost = "";
char *pgport = "";
char *login = NULL;
char *dbName;
const char *progname;
volatile bool timer_exceeded = false; /* flag from signal handler */
/* variable definitions */
typedef struct
{
char *name; /* variable name */
char *value; /* its value */
} Variable;
#define MAX_FILES 128 /* max number of SQL script files allowed */
#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
/*
* structures used in custom query mode
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
int state; /* state No. */
int cnt; /* xacts count */
int ecnt; /* error count */
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */
int nvariables;
instr_time txn_begin; /* used for measuring transaction latencies */
instr_time stmt_begin; /* used for measuring statement latencies */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
} CState;
/*
* Thread state and result
*/
typedef struct
{
int tid; /* thread id */
pthread_t thread; /* thread handle */
CState *state; /* array of CState */
int nstate; /* length of state[] */
instr_time start_time; /* thread start time */
instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
typedef struct
{
instr_time conn_time;
int xacts;
} TResult;
/*
* queries read from files
*/
#define SQL_COMMAND 1
#define META_COMMAND 2
#define MAX_ARGS 10
typedef enum QueryMode
{
QUERY_SIMPLE, /* simple query */
QUERY_EXTENDED, /* extended query */
QUERY_PREPARED, /* extended query with prepared statements */
NUM_QUERYMODE
} QueryMode;
static QueryMode querymode = QUERY_SIMPLE;
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
typedef struct
{
char *line; /* full text of command line */
int command_num; /* unique index of this Command struct */
int type; /* command type (SQL_COMMAND or META_COMMAND) */
int argc; /* number of command words */
char *argv[MAX_ARGS]; /* command word list */
} Command;
static Command **sql_files[MAX_FILES]; /* SQL script files */
static int num_files; /* number of script files */
static int num_commands = 0; /* total number of Command structs */
static int debug = 0; /* debug flag */
/* default scenario */
static char *tpc_b = {
"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"\\setrandom bid 1 :nbranches\n"
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
"UPDATE pgbench_accounts SET abalance = abalance + :delta,"
"filler = filler, filler1 = filler1 WHERE aid = :aid;\n"
"UPDATE pgbench_tellers SET tbalance = tbalance + :delta,"
"filler = filler, filler1 = filler1 WHERE tid = :tid;\n"
"UPDATE pgbench_branches SET bbalance = bbalance + :delta,"
"filler = filler, filler1 = filler1 WHERE bid = :bid;\n"
"END;\n"
};
/* -N case */
static char *simple_update = {
"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"\\setrandom bid 1 :nbranches\n"
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
};
/* -S case */
static char *select_only = {
"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};
/* Function prototypes */
static void setalarm(int seconds);
static void *threadRun(void *arg);
/*
* routines to check mem allocations and fail noisily.
*/
static void *
xmalloc(size_t size)
{
void *result;
result = malloc(size);
if (!result)
{
fprintf(stderr, "out of memory\n");
exit(1);
}
return result;
}
static void *
xrealloc(void *ptr, size_t size)
{
void *result;
result = realloc(ptr, size);
if (!result)
{
fprintf(stderr, "out of memory\n");
exit(1);
}
return result;
}
static char *
xstrdup(const char *s)
{
char *result;
result = strdup(s);
if (!result)
{
fprintf(stderr, "out of memory\n");
exit(1);
}
return result;
}
static void
usage(void)
{
printf("%s is a benchmarking tool for PostgreSQL.\n\n"
"Usage:\n"
" %s [OPTION]... [DBNAME]\n"
"\nInitialization options:\n"
" -i invokes initialization mode\n"
" -n do not run VACUUM after initialization\n"
" -F NUM fill factor\n"
" -s NUM scaling factor\n"
" --foreign-keys\n"
" create foreign key constraints between tables\n"
" --index-tablespace=TABLESPACE\n"
" create indexes in the specified tablespace\n"
" --tablespace=TABLESPACE\n"
" create tables in the specified tablespace\n"
" --unlogged-tables\n"
" create tables as unlogged tables\n"
"\nBenchmarking options:\n"
" -c NUM number of concurrent database clients (default: 1)\n"
" -C establish new connection for each transaction\n"
" -D VARNAME=VALUE\n"
" define variable for use by custom script\n"
" -f FILENAME read transaction script from FILENAME\n"
" -j NUM number of threads (default: 1)\n"
" -l write transaction times to log file\n"
" -M simple|extended|prepared\n"
" protocol for submitting queries to server (default: simple)\n"
" -n do not run VACUUM before tests\n"
" -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
" -r report average latency per command\n"
" -s NUM report this scale factor in output\n"
" -S perform SELECT-only transactions\n"
" -t NUM number of transactions each client runs (default: 10)\n"
" -T NUM duration of benchmark test in seconds\n"
" -v vacuum all four standard tables before tests\n"
"\nCommon options:\n"
" -d print debugging output\n"
" -h HOSTNAME database server host or socket directory\n"
" -p PORT database server port number\n"
" -U USERNAME connect as specified database user\n"
" -V, --version output version information, then exit\n"
" -?, --help show this help, then exit\n"
"\n"
"Report bugs to <pgsql-bugs@postgresql.org>.\n",
progname, progname);
}
/* random number generator: uniform distribution from min to max inclusive */
static int
getrand(TState *thread, int min, int max)
{
/*
* Odd coding is so that min and max have approximately the same chance of
* being selected as do numbers between them.
*
* pg_erand48() is thread-safe and concurrent, which is why we use it
* rather than random(), which in glibc is non-reentrant, and therefore
* protected by a mutex, and therefore a bottleneck on machines with many
* CPUs.
*/
return min + (int) ((max - min + 1) * pg_erand48(thread->random_state));
}
/* call PQexec() and exit() on failure */
static void
executeStatement(PGconn *con, const char *sql)
{
PGresult *res;
res = PQexec(con, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
}
/* set up a connection to the backend */
static PGconn *
doConnect(void)
{
PGconn *conn;
static char *password = NULL;
bool new_pass;
/*
* Start the connection. Loop until we have a password if requested by
* backend.
*/
do
{
#define PARAMS_ARRAY_SIZE 7
const char *keywords[PARAMS_ARRAY_SIZE];
const char *values[PARAMS_ARRAY_SIZE];
keywords[0] = "host";
values[0] = pghost;
keywords[1] = "port";
values[1] = pgport;
keywords[2] = "user";
values[2] = login;
keywords[3] = "password";
values[3] = password;
keywords[4] = "dbname";
values[4] = dbName;
keywords[5] = "fallback_application_name";
values[5] = progname;
keywords[6] = NULL;
values[6] = NULL;
new_pass = false;
conn = PQconnectdbParams(keywords, values, true);
if (!conn)
{
fprintf(stderr, "Connection to database \"%s\" failed\n",
dbName);
return NULL;
}
if (PQstatus(conn) == CONNECTION_BAD &&
PQconnectionNeedsPassword(conn) &&
password == NULL)
{
PQfinish(conn);
password = simple_prompt("Password: ", 100, false);
new_pass = true;
}
} while (new_pass);
/* check to see that the backend connection was successfully made */
if (PQstatus(conn) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
dbName, PQerrorMessage(conn));
PQfinish(conn);
return NULL;
}
return conn;
}
/* throw away response from backend */
static void
discard_response(CState *state)
{
PGresult *res;
do
{
res = PQgetResult(state->con);
if (res)
PQclear(res);
} while (res);
}
static int
compareVariables(const void *v1, const void *v2)
{
return strcmp(((const Variable *) v1)->name,
((const Variable *) v2)->name);
}
static char *
getVariable(CState *st, char *name)
{
Variable key,
*var;
/* On some versions of Solaris, bsearch of zero items dumps core */
if (st->nvariables <= 0)
return NULL;
key.name = name;
var = (Variable *) bsearch((void *) &key,
(void *) st->variables,
st->nvariables,
sizeof(Variable),
compareVariables);
if (var != NULL)
return var->value;
else
return NULL;
}
/* check whether the name consists of alphabets, numerals and underscores. */
static bool
isLegalVariableName(const char *name)
{
int i;
for (i = 0; name[i] != '\0'; i++)
{
if (!isalnum((unsigned char) name[i]) && name[i] != '_')
return false;
}
return true;
}
static int
putVariable(CState *st, const char *context, char *name, char *value)
{
Variable key,
*var;
key.name = name;
/* On some versions of Solaris, bsearch of zero items dumps core */
if (st->nvariables > 0)
var = (Variable *) bsearch((void *) &key,
(void *) st->variables,
st->nvariables,
sizeof(Variable),
compareVariables);
else
var = NULL;
if (var == NULL)
{
Variable *newvars;
/*
* Check for the name only when declaring a new variable to avoid
* overhead.
*/
if (!isLegalVariableName(name))
{
fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
return false;
}
if (st->variables)
newvars = (Variable *) xrealloc(st->variables,
(st->nvariables + 1) * sizeof(Variable));
else
newvars = (Variable *) xmalloc(sizeof(Variable));
st->variables = newvars;
var = &newvars[st->nvariables];
var->name = xstrdup(name);
var->value = xstrdup(value);
st->nvariables++;
qsort((void *) st->variables, st->nvariables, sizeof(Variable),
compareVariables);
}
else
{
char *val;
/* dup then free, in case value is pointing at this variable */
val = xstrdup(value);
free(var->value);
var->value = val;
}
return true;
}
static char *
parseVariable(const char *sql, int *eaten)
{
int i = 0;
char *name;
do
{
i++;
} while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
if (i == 1)
return NULL;
name = xmalloc(i);
memcpy(name, &sql[1], i - 1);
name[i - 1] = '\0';
*eaten = i;
return name;
}
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
int valueln = strlen(value);
if (valueln > len)
{
size_t offset = param - *sql;
*sql = xrealloc(*sql, strlen(*sql) - len + valueln + 1);
param = *sql + offset;
}
if (valueln != len)
memmove(param + valueln, param + len, strlen(param + len) + 1);
strncpy(param, value, valueln);
return param + valueln;
}
static char *
assignVariables(CState *st, char *sql)
{
char *p,
*name,
*val;
p = sql;
while ((p = strchr(p, ':')) != NULL)
{
int eaten;
name = parseVariable(p, &eaten);
if (name == NULL)
{
while (*p == ':')
{
p++;
}
continue;
}
val = getVariable(st, name);
free(name);
if (val == NULL)
{
p++;
continue;
}
p = replaceVariable(&sql, p, eaten, val);
}
return sql;
}
static void
getQueryParams(CState *st, const Command *command, const char **params)
{
int i;
for (i = 0; i < command->argc - 1; i++)
params[i] = getVariable(st, command->argv[i + 1]);
}
/*
* Run a shell command. The result is assigned to the variable if not NULL.
* Return true if succeeded, or false on error.
*/
static bool
runShellCommand(CState *st, char *variable, char **argv, int argc)
{
char command[SHELL_COMMAND_SIZE];
int i,
len = 0;
FILE *fp;
char res[64];
char *endptr;
int retval;
/*----------
* Join arguments with whitespace separators. Arguments starting with
* exactly one colon are treated as variables:
* name - append a string "name"
* :var - append a variable named 'var'
* ::name - append a string ":name"
*----------
*/
for (i = 0; i < argc; i++)
{
char *arg;
int arglen;
if (argv[i][0] != ':')
{
arg = argv[i]; /* a string literal */
}
else if (argv[i][1] == ':')
{
arg = argv[i] + 1; /* a string literal starting with colons */
}
else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
return false;
}
arglen = strlen(arg);
if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
{
fprintf(stderr, "%s: too long shell command\n", argv[0]);
return false;
}
if (i > 0)
command[len++] = ' ';
memcpy(command + len, arg, arglen);
len += arglen;
}
command[len] = '\0';
/* Fast path for non-assignment case */
if (variable == NULL)
{
if (system(command))
{
if (!timer_exceeded)
fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
return false;
}
return true;
}
/* Execute the command with pipe and read the standard output. */
if ((fp = popen(command, "r")) == NULL)
{
fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
return false;
}
if (fgets(res, sizeof(res), fp) == NULL)
{
if (!timer_exceeded)
fprintf(stderr, "%s: cannot read the result\n", argv[0]);
return false;
}
if (pclose(fp) < 0)
{
fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
return false;
}
/* Check whether the result is an integer and assign it to the variable */
retval = (int) strtol(res, &endptr, 10);
while (*endptr != '\0' && isspace((unsigned char) *endptr))
endptr++;
if (*res == '\0' || *endptr != '\0')
{
fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
return false;
}
snprintf(res, sizeof(res), "%d", retval);
if (!putVariable(st, "setshell", variable, res))
return false;
#ifdef DEBUG
printf("shell parameter name: %s, value: %s\n", argv[1], res);
#endif
return true;
}
#define MAX_PREPARE_NAME 32
static void
preparedStatementName(char *buffer, int file, int state)
{
sprintf(buffer, "P%d_%d", file, state);
}
static bool
clientDone(CState *st, bool ok)
{
(void) ok; /* unused */
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
return false; /* always false */
}
/* return false iff client should be disconnected */
static bool
doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile)
{
PGresult *res;
Command **commands;
top:
commands = sql_files[st->use_file];
if (st->sleeping)
{ /* are we sleeping? */
instr_time now;
INSTR_TIME_SET_CURRENT(now);
if (st->until <= INSTR_TIME_GET_MICROSEC(now))
st->sleeping = 0; /* Done sleeping, go ahead with next command */
else
return true; /* Still sleeping, nothing to do here */
}
if (st->listen)
{ /* are we receiver? */
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
return clientDone(st, false);
}
if (PQisBusy(st->con))
return true; /* don't have the whole result yet */
}
/*
* command finished: accumulate per-command execution times in
* thread-local data structure, if per-command latencies are requested
*/
if (is_latencies)
{
instr_time now;
int cnum = commands[st->state]->command_num;
INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
now, st->stmt_begin);
thread->exec_count[cnum]++;
}
/*
* if transaction finished, record the time it took in the log
*/
if (logfile && commands[st->state + 1] == NULL)
{
instr_time now;
instr_time diff;
double usec;
INSTR_TIME_SET_CURRENT(now);
diff = now;
INSTR_TIME_SUBTRACT(diff, st->txn_begin);
usec = (double) INSTR_TIME_GET_MICROSEC(diff);
#ifndef WIN32
/* This is more than we really ought to know about instr_time */
fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
st->id, st->cnt, usec, st->use_file,
(long) now.tv_sec, (long) now.tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
fprintf(logfile, "%d %d %.0f %d 0 0\n",
st->id, st->cnt, usec, st->use_file);
#endif
}
if (commands[st->state]->type == SQL_COMMAND)
{
/*
* Read and discard the query result; note this is not included in
* the statement latency numbers.
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
break; /* OK */
default:
fprintf(stderr, "Client %d aborted in state %d: %s",
st->id, st->state, PQerrorMessage(st->con));
PQclear(res);
return clientDone(st, false);
}
PQclear(res);
discard_response(st);
}
if (commands[st->state + 1] == NULL)
{
if (is_connect)
{
PQfinish(st->con);
st->con = NULL;
}
++st->cnt;
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
return clientDone(st, true); /* exit success */
}
/* increment state counter */
st->state++;
if (commands[st->state] == NULL)
{
st->state = 0;
st->use_file = getrand(thread, 0, num_files - 1);
commands = sql_files[st->use_file];
}
}
if (st->con == NULL)
{
instr_time start,
end;
INSTR_TIME_SET_CURRENT(start);
if ((st->con = doConnect()) == NULL)
{
fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
return clientDone(st, false);
}
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
/* Record transaction start time if logging is enabled */
if (logfile && st->state == 0)
INSTR_TIME_SET_CURRENT(st->txn_begin);
/* Record statement start time if per-command latencies are requested */
if (is_latencies)
INSTR_TIME_SET_CURRENT(st->stmt_begin);
if (commands[st->state]->type == SQL_COMMAND)
{
const Command *command = commands[st->state];
int r;
if (querymode == QUERY_SIMPLE)
{
char *sql;
sql = xstrdup(command->argv[0]);
sql = assignVariables(st, sql);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQuery(st->con, sql);
free(sql);
}
else if (querymode == QUERY_EXTENDED)
{
const char *sql = command->argv[0];
const char *params[MAX_ARGS];
getQueryParams(st, command, params);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, sql);
r = PQsendQueryParams(st->con, sql, command->argc - 1,
NULL, params, NULL, NULL, 0);
}
else if (querymode == QUERY_PREPARED)
{
char name[MAX_PREPARE_NAME];
const char *params[MAX_ARGS];
if (!st->prepared[st->use_file])
{
int j;
for (j = 0; commands[j] != NULL; j++)
{
PGresult *res;
char name[MAX_PREPARE_NAME];
if (commands[j]->type != SQL_COMMAND)
continue;
preparedStatementName(name, st->use_file, j);
res = PQprepare(st->con, name,
commands[j]->argv[0], commands[j]->argc - 1, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
fprintf(stderr, "%s", PQerrorMessage(st->con));
PQclear(res);
}
st->prepared[st->use_file] = true;
}
getQueryParams(st, command, params);
preparedStatementName(name, st->use_file, st->state);
if (debug)
fprintf(stderr, "client %d sending %s\n", st->id, name);
r = PQsendQueryPrepared(st->con, name, command->argc - 1,
params, NULL, NULL, 0);
}
else /* unknown sql mode */
r = 0;
if (r == 0)
{
if (debug)
fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
st->ecnt++;
}
else
st->listen = 1; /* flags that should be listened */
}
else if (commands[st->state]->type == META_COMMAND)
{
int argc = commands[st->state]->argc,
i;
char **argv = commands[st->state]->argv;
if (debug)
{
fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
}
if (pg_strcasecmp(argv[0], "setrandom") == 0)
{
char *var;
int min,
max;
char res[64];
if (*argv[2] == ':')
{
if ((var = getVariable(st, argv[2] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++;
return true;
}
min = atoi(var);
}
else
min = atoi(argv[2]);
#ifdef NOT_USED
if (min < 0)
{
fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
st->ecnt++;
return;
}
#endif
if (*argv[3] == ':')
{
if ((var = getVariable(st, argv[3] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
st->ecnt++;
return true;
}
max = atoi(var);
}
else
max = atoi(argv[3]);
if (max < min)
{
fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
st->ecnt++;
return true;
}
/*
* getrand() neeeds to be able to subtract max from min and add
* one the result without overflowing. Since we know max > min,
* we can detect overflow just by checking for a negative result.
* But we must check both that the subtraction doesn't overflow,
* and that adding one to the result doesn't overflow either.
*/
if (max - min < 0 || (max - min) + 1 < 0)
{
fprintf(stderr, "%s: range too large\n", argv[0]);
st->ecnt++;
return true;
}
#ifdef DEBUG
printf("min: %d max: %d random: %d\n", min, max, getrand(thread, min, max));
#endif
snprintf(res, sizeof(res), "%d", getrand(thread, min, max));
if (!putVariable(st, argv[0], argv[1], res))
{
st->ecnt++;
return true;
}
st->listen = 1;
}
else if (pg_strcasecmp(argv[0], "set") == 0)
{
char *var;
int ope1,
ope2;
char res[64];
if (*argv[2] == ':')
{
if ((var = getVariable(st, argv[2] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++;
return true;
}
ope1 = atoi(var);
}
else
ope1 = atoi(argv[2]);
if (argc < 5)
snprintf(res, sizeof(res), "%d", ope1);
else
{
if (*argv[4] == ':')
{
if ((var = getVariable(st, argv[4] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
st->ecnt++;
return true;
}
ope2 = atoi(var);
}
else
ope2 = atoi(argv[4]);
if (strcmp(argv[3], "+") == 0)
snprintf(res, sizeof(res), "%d", ope1 + ope2);
else if (strcmp(argv[3], "-") == 0)
snprintf(res, sizeof(res), "%d", ope1 - ope2);
else if (strcmp(argv[3], "*") == 0)
snprintf(res, sizeof(res), "%d", ope1 * ope2);
else if (strcmp(argv[3], "/") == 0)
{
if (ope2 == 0)
{
fprintf(stderr, "%s: division by zero\n", argv[0]);
st->ecnt++;
return true;
}
snprintf(res, sizeof(res), "%d", ope1 / ope2);
}
else
{
fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
st->ecnt++;
return true;
}
}
if (!putVariable(st, argv[0], argv[1], res))
{
st->ecnt++;
return true;
}
st->listen = 1;
}
else if (pg_strcasecmp(argv[0], "sleep") == 0)
{
char *var;
int usec;
instr_time now;
if (*argv[1] == ':')
{
if ((var = getVariable(st, argv[1] + 1)) == NULL)
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
st->ecnt++;
return true;
}
usec = atoi(var);
}
else
usec = atoi(argv[1]);
if (argc > 2)
{
if (pg_strcasecmp(argv[2], "ms") == 0)
usec *= 1000;
else if (pg_strcasecmp(argv[2], "s") == 0)
usec *= 1000000;
}
else
usec *= 1000000;
INSTR_TIME_SET_CURRENT(now);
st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->sleeping = 1;
st->listen = 1;
}
else if (pg_strcasecmp(argv[0], "setshell") == 0)
{
bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
if (timer_exceeded) /* timeout */
return clientDone(st, true);
else if (!ret) /* on error */
{
st->ecnt++;
return true;
}
else /* succeeded */
st->listen = 1;
}
else if (pg_strcasecmp(argv[0], "shell") == 0)
{
bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
if (timer_exceeded) /* timeout */
return clientDone(st, true);
else if (!ret) /* on error */
{
st->ecnt++;
return true;
}
else /* succeeded */
st->listen = 1;
}
goto top;
}
return true;
}
/* discard connections */
static void
disconnect_all(CState *state, int length)
{
int i;
for (i = 0; i < length; i++)
{
if (state[i].con)
{
PQfinish(state[i].con);
state[i].con = NULL;
}
}
}
/* create tables and setup data */
static void
init(bool is_no_vacuum)
{
/*
* Note: TPC-B requires at least 100 bytes per row, and the "filler"
* fields in these table declarations were intended to comply with that.
* But because they default to NULLs, they don't actually take any space.
* We could fix that by giving them non-null default values. However, that
* would completely break comparability of pgbench results with prior
* versions. Since pgbench has never pretended to be fully TPC-B
* compliant anyway, we stick with the historical behavior.
*/
struct ddlinfo
{
char *table;
char *cols;
int declare_fillfactor;
};
struct ddlinfo DDLs[] = {
{
"pgbench_history",
"tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
0
},
{
"pgbench_tellers",
"tid int not null,bid int,tbalance int,filler char(92),"
"tbalance1 int, filler1 char(192),tbalance2 int,filler2 char(1500)",
1
},
{
"pgbench_accounts",
"aid int not null,bid int,abalance int,filler char(92),"
"abalance1 int,filler1 char(192),abalance2 int,filler2 char(1500)",
1
},
{
"pgbench_branches",
"bid int not null,bbalance int,filler char(92),bbalance1 int,"
"filler1 char(192), bbalance2 int, filler2 char(1500)",
1
}
};
static char *DDLAFTERs[] = {
"alter table pgbench_branches add primary key (bid)",
"alter table pgbench_tellers add primary key (tid)",
"alter table pgbench_accounts add primary key (aid)"
};
static char *DDLKEYs[] = {
"alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
"alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
"alter table pgbench_history add foreign key (bid) references pgbench_branches",
"alter table pgbench_history add foreign key (tid) references pgbench_tellers",
"alter table pgbench_history add foreign key (aid) references pgbench_accounts"
};
PGconn *con;
PGresult *res;
char sql[256];
int i;
if ((con = doConnect()) == NULL)
exit(1);
for (i = 0; i < lengthof(DDLs); i++)
{
char opts[256];
char buffer[256];
struct ddlinfo *ddl = &DDLs[i];
/* Remove old table, if it exists. */
snprintf(buffer, 256, "drop table if exists %s", ddl->table);
executeStatement(con, buffer);
/* Construct new create table statement. */
opts[0] = '\0';
if (ddl->declare_fillfactor)
snprintf(opts + strlen(opts), 256 - strlen(opts),
" with (fillfactor=%d)", fillfactor);
if (tablespace != NULL)
{
char *escape_tablespace;
escape_tablespace = PQescapeIdentifier(con, tablespace,
strlen(tablespace));
snprintf(opts + strlen(opts), 256 - strlen(opts),
" tablespace %s", escape_tablespace);
PQfreemem(escape_tablespace);
}
snprintf(buffer, 256, "create%s table %s(%s)%s",
unlogged_tables ? " unlogged" : "",
ddl->table, ddl->cols, opts);
executeStatement(con, buffer);
}
executeStatement(con, "begin");
for (i = 0; i < nbranches * scale; i++)
{
snprintf(sql, 256, "insert into pgbench_branches values(%d,0,0,0,0,0,0)", i + 1);
executeStatement(con, sql);
}
for (i = 0; i < ntellers * scale; i++)
{
snprintf(sql, 256, "insert into pgbench_tellers values (%d,%d,0,0,0,0,0,0)",
i + 1, i / ntellers + 1);
executeStatement(con, sql);
}
executeStatement(con, "commit");
/*
* fill the pgbench_accounts table with some data
*/
fprintf(stderr, "creating tables...\n");
executeStatement(con, "begin");
executeStatement(con, "truncate pgbench_accounts");
res = PQexec(con, "copy pgbench_accounts from stdin");
if (PQresultStatus(res) != PGRES_COPY_IN)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
for (i = 0; i < naccounts * scale; i++)
{
int j = i + 1;
snprintf(sql, 256, "%d\t%d\t%d\t \t%d\t \t%d\t \n", j, i / naccounts + 1, 0,0,0);
if (PQputline(con, sql))
{
fprintf(stderr, "PQputline failed\n");
exit(1);
}
if (j % 100000 == 0)
fprintf(stderr, "%d tuples done.\n", j);
}
if (PQputline(con, "\\.\n"))
{
fprintf(stderr, "very last PQputline failed\n");
exit(1);
}
if (PQendcopy(con))
{
fprintf(stderr, "PQendcopy failed\n");
exit(1);
}
executeStatement(con, "commit");
/* vacuum */
if (!is_no_vacuum)
{
fprintf(stderr, "vacuum...\n");
executeStatement(con, "vacuum analyze pgbench_branches");
executeStatement(con, "vacuum analyze pgbench_tellers");
executeStatement(con, "vacuum analyze pgbench_accounts");
executeStatement(con, "vacuum analyze pgbench_history");
}
/*
* create indexes
*/
fprintf(stderr, "set primary keys...\n");
for (i = 0; i < lengthof(DDLAFTERs); i++)
{
char buffer[256];
strncpy(buffer, DDLAFTERs[i], 256);
if (index_tablespace != NULL)
{
char *escape_tablespace;
escape_tablespace = PQescapeIdentifier(con, index_tablespace,
strlen(index_tablespace));
snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
" using index tablespace %s", escape_tablespace);
PQfreemem(escape_tablespace);
}
executeStatement(con, buffer);
}
/*
* create foreign keys
*/
if (foreign_keys)
{
fprintf(stderr, "set foreign keys...\n");
for (i = 0; i < lengthof(DDLKEYs); i++)
{
executeStatement(con, DDLKEYs[i]);
}
}
fprintf(stderr, "done.\n");
PQfinish(con);
}
/*
* Parse the raw sql and replace :param to $n.
*/
static bool
parseQuery(Command *cmd, const char *raw_sql)
{
char *sql,
*p;
sql = xstrdup(raw_sql);
cmd->argc = 1;
p = sql;
while ((p = strchr(p, ':')) != NULL)
{
char var[12];
char *name;
int eaten;
name = parseVariable(p, &eaten);
if (name == NULL)
{
while (*p == ':')
{
p++;
}
continue;
}
if (cmd->argc >= MAX_ARGS)
{
fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
return false;
}
sprintf(var, "$%d", cmd->argc);
p = replaceVariable(&sql, p, eaten, var);
cmd->argv[cmd->argc] = name;
cmd->argc++;
}
cmd->argv[0] = sql;
return true;
}
/* Parse a command; return a Command struct, or NULL if it's a comment */
static Command *
process_commands(char *buf)
{
const char delim[] = " \f\n\r\t\v";
Command *my_commands;
int j;
char *p,
*tok;
/* Make the string buf end at the next newline */
if ((p = strchr(buf, '\n')) != NULL)
*p = '\0';
/* Skip leading whitespace */
p = buf;
while (isspace((unsigned char) *p))
p++;
/* If the line is empty or actually a comment, we're done */
if (*p == '\0' || strncmp(p, "--", 2) == 0)
return NULL;
/* Allocate and initialize Command structure */
my_commands = (Command *) xmalloc(sizeof(Command));
my_commands->line = xstrdup(buf);
my_commands->command_num = num_commands++;
my_commands->type = 0; /* until set */
my_commands->argc = 0;
if (*p == '\\')
{
my_commands->type = META_COMMAND;
j = 0;
tok = strtok(++p, delim);
while (tok != NULL)
{
my_commands->argv[j++] = xstrdup(tok);
my_commands->argc++;
tok = strtok(NULL, delim);
}
if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
{
if (my_commands->argc < 4)
{
fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
exit(1);
}
for (j = 4; j < my_commands->argc; j++)
fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
my_commands->argv[0], my_commands->argv[j]);
}
else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
{
if (my_commands->argc < 3)
{
fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
exit(1);
}
for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
my_commands->argv[0], my_commands->argv[j]);
}
else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
{
if (my_commands->argc < 2)
{
fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
exit(1);
}
/*
* Split argument into number and unit to allow "sleep 1ms" etc.
* We don't have to terminate the number argument with null
* because it will be parsed with atoi, which ignores trailing
* non-digit characters.
*/
if (my_commands->argv[1][0] != ':')
{
char *c = my_commands->argv[1];
while (isdigit((unsigned char) *c))
c++;
if (*c)
{
my_commands->argv[2] = c;
if (my_commands->argc < 3)
my_commands->argc = 3;
}
}
if (my_commands->argc >= 3)
{
if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
pg_strcasecmp(my_commands->argv[2], "s") != 0)
{
fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
my_commands->argv[0], my_commands->argv[2]);
exit(1);
}
}
for (j = 3; j < my_commands->argc; j++)
fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
my_commands->argv[0], my_commands->argv[j]);
}
else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
{
if (my_commands->argc < 3)
{
fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
exit(1);
}
}
else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
{
if (my_commands->argc < 1)
{
fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
exit(1);
}
}
else
{
fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
exit(1);
}
}
else
{
my_commands->type = SQL_COMMAND;
switch (querymode)
{
case QUERY_SIMPLE:
my_commands->argv[0] = xstrdup(p);
my_commands->argc++;
break;
case QUERY_EXTENDED:
case QUERY_PREPARED:
if (!parseQuery(my_commands, p))
exit(1);
break;
default:
exit(1);
}
}
return my_commands;
}
static int
process_file(char *filename)
{
#define COMMANDS_ALLOC_NUM 128
Command **my_commands;
FILE *fd;
int lineno;
char buf[BUFSIZ];
int alloc_num;
if (num_files >= MAX_FILES)
{
fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
exit(1);
}
alloc_num = COMMANDS_ALLOC_NUM;
my_commands = (Command **) xmalloc(sizeof(Command *) * alloc_num);
if (strcmp(filename, "-") == 0)
fd = stdin;
else if ((fd = fopen(filename, "r")) == NULL)
{
fprintf(stderr, "%s: %s\n", filename, strerror(errno));
return false;
}
lineno = 0;
while (fgets(buf, sizeof(buf), fd) != NULL)
{
Command *command;
command = process_commands(buf);
if (command == NULL)
continue;
my_commands[lineno] = command;
lineno++;
if (lineno >= alloc_num)
{
alloc_num += COMMANDS_ALLOC_NUM;
my_commands = xrealloc(my_commands, sizeof(Command *) * alloc_num);
}
}
fclose(fd);
my_commands[lineno] = NULL;
sql_files[num_files++] = my_commands;
return true;
}
static Command **
process_builtin(char *tb)
{
#define COMMANDS_ALLOC_NUM 128
Command **my_commands;
int lineno;
char buf[BUFSIZ];
int alloc_num;
alloc_num = COMMANDS_ALLOC_NUM;
my_commands = (Command **) xmalloc(sizeof(Command *) * alloc_num);
lineno = 0;
for (;;)
{
char *p;
Command *command;
p = buf;
while (*tb && *tb != '\n')
*p++ = *tb++;
if (*tb == '\0')
break;
if (*tb == '\n')
tb++;
*p = '\0';
command = process_commands(buf);
if (command == NULL)
continue;
my_commands[lineno] = command;
lineno++;
if (lineno >= alloc_num)
{
alloc_num += COMMANDS_ALLOC_NUM;
my_commands = xrealloc(my_commands, sizeof(Command *) * alloc_num);
}
}
my_commands[lineno] = NULL;
return my_commands;
}
/* print out results */
static void
printResults(int ttype, int normal_xacts, int nclients,
TState *threads, int nthreads,
instr_time total_time, instr_time conn_total_time)
{
double time_include,
tps_include,
tps_exclude;
char *s;
time_include = INSTR_TIME_GET_DOUBLE(total_time);
tps_include = normal_xacts / time_include;
tps_exclude = normal_xacts / (time_include -
(INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
if (ttype == 0)
s = "TPC-B (sort of)";
else if (ttype == 2)
s = "Update only pgbench_accounts";
else if (ttype == 1)
s = "SELECT only";
else
s = "Custom query";
printf("transaction type: %s\n", s);
printf("scaling factor: %d\n", scale);
printf("query mode: %s\n", QUERYMODE[querymode]);
printf("number of clients: %d\n", nclients);
printf("number of threads: %d\n", nthreads);
if (duration <= 0)
{
printf("number of transactions per client: %d\n", nxacts);
printf("number of transactions actually processed: %d/%d\n",
normal_xacts, nxacts * nclients);
}
else
{
printf("duration: %d s\n", duration);
printf("number of transactions actually processed: %d\n",
normal_xacts);
}
printf("tps = %f (including connections establishing)\n", tps_include);
printf("tps = %f (excluding connections establishing)\n", tps_exclude);
/* Report per-command latencies */
if (is_latencies)
{
int i;
for (i = 0; i < num_files; i++)
{
Command **commands;
if (num_files > 1)
printf("statement latencies in milliseconds, file %d:\n", i + 1);
else
printf("statement latencies in milliseconds:\n");
for (commands = sql_files[i]; *commands != NULL; commands++)
{
Command *command = *commands;
int cnum = command->command_num;
double total_time;
instr_time total_exec_elapsed;
int total_exec_count;
int t;
/* Accumulate per-thread data for command */
INSTR_TIME_SET_ZERO(total_exec_elapsed);
total_exec_count = 0;
for (t = 0; t < nthreads; t++)
{
TState *thread = &threads[t];
INSTR_TIME_ADD(total_exec_elapsed,
thread->exec_elapsed[cnum]);
total_exec_count += thread->exec_count[cnum];
}
if (total_exec_count > 0)
total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
else
total_time = 0.0;
printf("\t%f\t%s\n", total_time, command->line);
}
}
}
}
int
main(int argc, char **argv)
{
int c;
int nclients = 1; /* default number of simulated clients */
int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
* 2: skip update of branches and tellers */
int optindex;
char *filename = NULL;
bool scale_given = false;
CState *state; /* status of clients */
TState *threads; /* array of thread */
instr_time start_time; /* start up time */
instr_time total_time;
instr_time conn_total_time;
int total_xacts;
int i;
static struct option long_options[] = {
{"foreign-keys", no_argument, &foreign_keys, 1},
{"index-tablespace", required_argument, NULL, 3},
{"tablespace", required_argument, NULL, 2},
{"unlogged-tables", no_argument, &unlogged_tables, 1},
{NULL, 0, NULL, 0}
};
#ifdef HAVE_GETRLIMIT
struct rlimit rlim;
#endif
PGconn *con;
PGresult *res;
char *env;
char val[64];
progname = get_progname(argv[0]);
if (argc > 1)
{
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
{
usage();
exit(0);
}
if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
{
puts("pgbench (PostgreSQL) " PG_VERSION);
exit(0);
}
}
#ifdef WIN32
/* stderr is buffered on Win32. */
setvbuf(stderr, NULL, _IONBF, 0);
#endif
if ((env = getenv("PGHOST")) != NULL && *env != '\0')
pghost = env;
if ((env = getenv("PGPORT")) != NULL && *env != '\0')
pgport = env;
else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
login = env;
state = (CState *) xmalloc(sizeof(CState));
memset(state, 0, sizeof(CState));
while ((c = getopt_long(argc, argv, "ih:nvp:dSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
{
switch (c)
{
case 'i':
is_init_mode++;
break;
case 'h':
pghost = optarg;
break;
case 'n':
is_no_vacuum++;
break;
case 'v':
do_vacuum_accounts++;
break;
case 'p':
pgport = optarg;
break;
case 'd':
debug++;
break;
case 'S':
ttype = 1;
break;
case 'N':
ttype = 2;
break;
case 'c':
nclients = atoi(optarg);
if (nclients <= 0 || nclients > MAXCLIENTS)
{
fprintf(stderr, "invalid number of clients: %d\n", nclients);
exit(1);
}
#ifdef HAVE_GETRLIMIT
#ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
#else /* but BSD doesn't ... */
if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
#endif /* RLIMIT_NOFILE */
{
fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
exit(1);
}
if (rlim.rlim_cur <= (nclients + 2))
{
fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
exit(1);
}
#endif /* HAVE_GETRLIMIT */
break;
case 'j': /* jobs */
nthreads = atoi(optarg);
if (nthreads <= 0)
{
fprintf(stderr, "invalid number of threads: %d\n", nthreads);
exit(1);
}
break;
case 'C':
is_connect = true;
break;
case 'r':
is_latencies = true;
break;
case 's':
scale_given = true;
scale = atoi(optarg);
if (scale <= 0)
{
fprintf(stderr, "invalid scaling factor: %d\n", scale);
exit(1);
}
break;
case 't':
if (duration > 0)
{
fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
exit(1);
}
nxacts = atoi(optarg);
if (nxacts <= 0)
{
fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
exit(1);
}
break;
case 'T':
if (nxacts > 0)
{
fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
exit(1);
}
duration = atoi(optarg);
if (duration <= 0)
{
fprintf(stderr, "invalid duration: %d\n", duration);
exit(1);
}
break;
case 'U':
login = optarg;
break;
case 'l':
use_log = true;
break;
case 'f':
ttype = 3;
filename = optarg;
if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
exit(1);
break;
case 'D':
{
char *p;
if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
{
fprintf(stderr, "invalid variable definition: %s\n", optarg);
exit(1);
}
*p++ = '\0';
if (!putVariable(&state[0], "option", optarg, p))
exit(1);
}
break;
case 'F':
fillfactor = atoi(optarg);
if ((fillfactor < 10) || (fillfactor > 100))
{
fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
exit(1);
}
break;
case 'M':
if (num_files > 0)
{
fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
exit(1);
}
for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
if (strcmp(optarg, QUERYMODE[querymode]) == 0)
break;
if (querymode >= NUM_QUERYMODE)
{
fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
exit(1);
}
break;
case 0:
/* This covers long options which take no argument. */
break;
case 2: /* tablespace */
tablespace = optarg;
break;
case 3: /* index-tablespace */
index_tablespace = optarg;
break;
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
break;
}
}
if (argc > optind)
dbName = argv[optind];
else
{
if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
dbName = env;
else if (login != NULL && *login != '\0')
dbName = login;
else
dbName = "";
}
if (is_init_mode)
{
init(is_no_vacuum);
exit(0);
}
/* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
if (nxacts <= 0 && duration <= 0)
nxacts = DEFAULT_NXACTS;
if (nclients % nthreads != 0)
{
fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
exit(1);
}
/*
* is_latencies only works with multiple threads in thread-based
* implementations, not fork-based ones, because it supposes that the
* parent can see changes made to the per-thread execution stats by child
* threads. It seems useful enough to accept despite this limitation, but
* perhaps we should FIXME someday (by passing the stats data back up
* through the parent-to-child pipes).
*/
#ifndef ENABLE_THREAD_SAFETY
if (is_latencies && nthreads > 1)
{
fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
exit(1);
}
#endif
/*
* save main process id in the global variable because process id will be
* changed after fork.
*/
main_pid = (int) getpid();
if (nclients > 1)
{
state = (CState *) xrealloc(state, sizeof(CState) * nclients);
memset(state + 1, 0, sizeof(CState) * (nclients - 1));
/* copy any -D switch values to all clients */
for (i = 1; i < nclients; i++)
{
int j;
state[i].id = i;
for (j = 0; j < state[0].nvariables; j++)
{
if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
exit(1);
}
}
}
if (debug)
{
if (duration <= 0)
printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
pghost, pgport, nclients, nxacts, dbName);
else
printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
pghost, pgport, nclients, duration, dbName);
}
/* opening connection... */
con = doConnect();
if (con == NULL)
exit(1);
if (PQstatus(con) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
if (ttype != 3)
{
/*
* get the scaling factor that should be same as count(*) from
* pgbench_branches if this is not a custom query
*/
res = PQexec(con, "select count(*) from pgbench_branches");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
scale = atoi(PQgetvalue(res, 0, 0));
if (scale < 0)
{
fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
exit(1);
}
PQclear(res);
/* warn if we override user-given -s switch */
if (scale_given)
fprintf(stderr,
"Scale option ignored, using pgbench_branches table count = %d\n",
scale);
}
/*
* :scale variables normally get -s or database scale, but don't override
* an explicit -D switch
*/
if (getVariable(&state[0], "scale") == NULL)
{
snprintf(val, sizeof(val), "%d", scale);
for (i = 0; i < nclients; i++)
{
if (!putVariable(&state[i], "startup", "scale", val))
exit(1);
}
}
if (!is_no_vacuum)
{
fprintf(stderr, "starting vacuum...");
executeStatement(con, "vacuum pgbench_branches");
executeStatement(con, "vacuum pgbench_tellers");
executeStatement(con, "truncate pgbench_history");
fprintf(stderr, "end.\n");
if (do_vacuum_accounts)
{
fprintf(stderr, "starting vacuum pgbench_accounts...");
executeStatement(con, "vacuum analyze pgbench_accounts");
fprintf(stderr, "end.\n");
}
}
PQfinish(con);
/* set random seed */
INSTR_TIME_SET_CURRENT(start_time);
srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
/* process builtin SQL scripts */
switch (ttype)
{
case 0:
sql_files[0] = process_builtin(tpc_b);
num_files = 1;
break;
case 1:
sql_files[0] = process_builtin(select_only);
num_files = 1;
break;
case 2:
sql_files[0] = process_builtin(simple_update);
num_files = 1;
break;
default:
break;
}
/* set up thread data structures */
threads = (TState *) xmalloc(sizeof(TState) * nthreads);
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
thread->tid = i;
thread->state = &state[nclients / nthreads * i];
thread->nstate = nclients / nthreads;
thread->random_state[0] = random();
thread->random_state[1] = random();
thread->random_state[2] = random();
if (is_latencies)
{
/* Reserve memory for the thread to store per-command latencies */
int t;
thread->exec_elapsed = (instr_time *)
xmalloc(sizeof(instr_time) * num_commands);
thread->exec_count = (int *)
xmalloc(sizeof(int) * num_commands);
for (t = 0; t < num_commands; t++)
{
INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
thread->exec_count[t] = 0;
}
}
else
{
thread->exec_elapsed = NULL;
thread->exec_count = NULL;
}
}
/* get start up time */
INSTR_TIME_SET_CURRENT(start_time);
/* set alarm if duration is specified. */
if (duration > 0)
setalarm(duration);
/* start threads */
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
INSTR_TIME_SET_CURRENT(thread->start_time);
/* the first thread (i = 0) is executed by main thread */
if (i > 0)
{
int err = pthread_create(&thread->thread, NULL, threadRun, thread);
if (err != 0 || thread->thread == INVALID_THREAD)
{
fprintf(stderr, "cannot create thread: %s\n", strerror(err));
exit(1);
}
}
else
{
thread->thread = INVALID_THREAD;
}
}
/* wait for threads and accumulate results */
total_xacts = 0;
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
void *ret = NULL;
if (threads[i].thread == INVALID_THREAD)
ret = threadRun(&threads[i]);
else
pthread_join(threads[i].thread, &ret);
if (ret != NULL)
{
TResult *r = (TResult *) ret;
total_xacts += r->xacts;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
free(ret);
}
}
disconnect_all(state, nclients);
/* get end time */
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, threads, nthreads,
total_time, conn_total_time);
return 0;
}
static void *
threadRun(void *arg)
{
TState *thread = (TState *) arg;
CState *state = thread->state;
TResult *result;
FILE *logfile = NULL; /* per-thread log file */
instr_time start,
end;
int nstate = thread->nstate;
int remains = nstate; /* number of remaining clients */
int i;
result = xmalloc(sizeof(TResult));
INSTR_TIME_SET_ZERO(result->conn_time);
/* open log file if requested */
if (use_log)
{
char logpath[64];
if (thread->tid == 0)
snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
else
snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
logfile = fopen(logpath, "w");
if (logfile == NULL)
{
fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
goto done;
}
}
if (!is_connect)
{
/* make connections to the database */
for (i = 0; i < nstate; i++)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
}
}
/* time after thread and connections set up */
INSTR_TIME_SET_CURRENT(result->conn_time);
INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
/* send start up queries in async manner */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
Command **commands = sql_files[st->use_file];
int prev_ecnt = st->ecnt;
st->use_file = getrand(thread, 0, num_files - 1);
if (!doCustom(thread, st, &result->conn_time, logfile))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
}
}
while (remains > 0)
{
fd_set input_mask;
int maxsock; /* max socket number to be waited */
int64 now_usec = 0;
int64 min_usec;
FD_ZERO(&input_mask);
maxsock = -1;
min_usec = INT64_MAX;
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
Command **commands = sql_files[st->use_file];
int sock;
if (st->sleeping)
{
int this_usec;
if (min_usec == INT64_MAX)
{
instr_time now;
INSTR_TIME_SET_CURRENT(now);
now_usec = INSTR_TIME_GET_MICROSEC(now);
}
this_usec = st->until - now_usec;
if (min_usec > this_usec)
min_usec = this_usec;
}
else if (st->con == NULL)
{
continue;
}
else if (commands[st->state]->type == META_COMMAND)
{
min_usec = 0; /* the connection is ready to run */
break;
}
sock = PQsocket(st->con);
if (sock < 0)
{
fprintf(stderr, "bad socket: %s\n", strerror(errno));
goto done;
}
FD_SET(sock, &input_mask);
if (maxsock < sock)
maxsock = sock;
}
if (min_usec > 0 && maxsock != -1)
{
int nsocks; /* return from select(2) */
if (min_usec != INT64_MAX)
{
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
}
else
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
if (nsocks < 0)
{
if (errno == EINTR)
continue;
/* must be something wrong */
fprintf(stderr, "select failed: %s\n", strerror(errno));
goto done;
}
}
/* ok, backend returns reply */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
Command **commands = sql_files[st->use_file];
int prev_ecnt = st->ecnt;
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
if (!doCustom(thread, st, &result->conn_time, logfile))
remains--; /* I've aborted */
}
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
}
}
}
done:
INSTR_TIME_SET_CURRENT(start);
disconnect_all(state, nstate);
result->xacts = 0;
for (i = 0; i < nstate; i++)
result->xacts += state[i].cnt;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
if (logfile)
fclose(logfile);
return result;
}
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
#ifndef WIN32
static void
handle_sig_alarm(SIGNAL_ARGS)
{
timer_exceeded = true;
}
static void
setalarm(int seconds)
{
pqsignal(SIGALRM, handle_sig_alarm);
alarm(seconds);
}
#ifndef ENABLE_THREAD_SAFETY
/*
* implements pthread using fork.
*/
typedef struct fork_pthread
{
pid_t pid;
int pipes[2];
} fork_pthread;
static int
pthread_create(pthread_t *thread,
pthread_attr_t *attr,
void *(*start_routine) (void *),
void *arg)
{
fork_pthread *th;
void *ret;
th = (fork_pthread *) xmalloc(sizeof(fork_pthread));
if (pipe(th->pipes) < 0)
{
free(th);
return errno;
}
th->pid = fork();
if (th->pid == -1) /* error */
{
free(th);
return errno;
}
if (th->pid != 0) /* in parent process */
{
close(th->pipes[1]);
*thread = th;
return 0;
}
/* in child process */
close(th->pipes[0]);
/* set alarm again because the child does not inherit timers */
if (duration > 0)
setalarm(duration);
ret = start_routine(arg);
write(th->pipes[1], ret, sizeof(TResult));
close(th->pipes[1]);
free(th);
exit(0);
}
static int
pthread_join(pthread_t th, void **thread_return)
{
int status;
while (waitpid(th->pid, &status, 0) != th->pid)
{
if (errno != EINTR)
return errno;
}
if (thread_return != NULL)
{
/* assume result is TResult */
*thread_return = xmalloc(sizeof(TResult));
if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
{
free(*thread_return);
*thread_return = NULL;
}
}
close(th->pipes[0]);
free(th);
return 0;
}
#endif
#else /* WIN32 */
static VOID CALLBACK
win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
timer_exceeded = true;
}
static void
setalarm(int seconds)
{
HANDLE queue;
HANDLE timer;
/* This function will be called at most once, so we can cheat a bit. */
queue = CreateTimerQueue();
if (seconds > ((DWORD) -1) / 1000 ||
!CreateTimerQueueTimer(&timer, queue,
win32_timer_callback, NULL, seconds * 1000, 0,
WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
{
fprintf(stderr, "Failed to set timer\n");
exit(1);
}
}
/* partial pthread implementation for Windows */
typedef struct win32_pthread
{
HANDLE handle;
void *(*routine) (void *);
void *arg;
void *result;
} win32_pthread;
static unsigned __stdcall
win32_pthread_run(void *arg)
{
win32_pthread *th = (win32_pthread *) arg;
th->result = th->routine(th->arg);
return 0;
}
static int
pthread_create(pthread_t *thread,
pthread_attr_t *attr,
void *(*start_routine) (void *),
void *arg)
{
int save_errno;
win32_pthread *th;
th = (win32_pthread *) xmalloc(sizeof(win32_pthread));
th->routine = start_routine;
th->arg = arg;
th->result = NULL;
th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
if (th->handle == NULL)
{
save_errno = errno;
free(th);
return save_errno;
}
*thread = th;
return 0;
}
static int
pthread_join(pthread_t th, void **thread_return)
{
if (th == NULL || th->handle == NULL)
return errno = EINVAL;
if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
{
_dosmaperr(GetLastError());
return errno;
}
if (thread_return)
*thread_return = th->result;
CloseHandle(th->handle);
free(th);
return 0;
}
#endif /* WIN32 */
On 03.08.2012 14:46, Amit kapila wrote:
Currently the change is done only for fixed length columns for simple tables and the tuple should not contain NULLS.
This is a Proof of concept, the design and implementation needs to be changed based on final design required for handling other scenario's
Update operation:
-----------------------------
1. Check for the simple table or not.(No toast, No before update triggers)
2. Works only for not null tuples.
3. Identify the modified columns from the target entry.
4. Based on the modified column list, check for any variable length columns are modified, if so this optimization is not applied.
5. Identify the offset and length for the modified columns and store it as an optimized WAL tuple in the following format.
Note: Wal update header is modified to denote whether wal update optimization is done or not.
WAL update header + Tuple header(no change from previous format) +
[offset(2bytes)] [length(2 bytes)] [changed data value]
[offset(2bytes)] [length(2 bytes)] [changed data value]
....
....
The performance will need to be re-verified after you fix these
limitations. Those limitations need to be fixed before this can be applied.
It would be nice to use some well-known binary delta algorithm for this,
rather than invent our own. OTOH, we have more knowledge of the
attribute boundaries, so a custom algorithm might work better. In any
case, I'd like to see the code to do the delta encoding/decoding to be
put into separate functions, outside of heapam.c. It would be good for
readability, and we might want to reuse this in other places too.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
From: Heikki Linnakangas [mailto:heikki.linnakangas@enterprisedb.com]
Sent: Saturday, August 04, 2012 1:33 AM
On 03.08.2012 14:46, Amit kapila wrote:
Currently the change is done only for fixed length columns for simple
tables and the tuple should not contain NULLS.
This is a Proof of concept, the design and implementation needs to be
changed based on final design required for handling other scenario's
Update operation:
-----------------------------
1. Check for the simple table or not.(No toast, No before update
triggers)
2. Works only for not null tuples.
3. Identify the modified columns from the target entry.
4. Based on the modified column list, check for any variable length
columns are modified, if so this optimization is not applied.
5. Identify the offset and length for the modified columns and store it
as an optimized WAL tuple in the following format.
Note: Wal update header is modified to denote whether wal update
optimization is done or not.
WAL update header + Tuple header(no change from previous format)
+
[offset(2bytes)] [length(2 bytes)] [changed data value]
[offset(2bytes)] [length(2 bytes)] [changed data value]
....
....
The performance will need to be re-verified after you fix these
limitations. Those limitations need to be fixed before this can be
applied.
Yes, I agree that solution should fix these limitations and performance
numbers needs to be re-verified.
Currently in my mind the work to be done is as follows:
1. Solution which can handle Variable length columns and NULLs
2. Handling of Before Triggers
3. Can the solution for fixed length columns be same as Variable length
columns and NULLS.
4. Make the final code patch which addresses all the above.
Please suggest if there are more things that needs to be handled?
For the 3rd point, currently the solution for fixed length columns cannot
handle the case of variable length columns and NULLS. The reason is for
fixed length columns there is no need of diff technology between old and new
tuple, however for other cases it will be required.
For fixed length columns, if we just note the OFFSET, LENGTH, VALUE of
changed columns of new tuple in WAL, it will be sufficient to do the replay
of WAL. However to handle other cases we need to use diff mechanism.
Can we do something like if the changed columns are fixed length and doesn't
contain NULL's, then store [OFFSET, LENGTH, VALUE] format in WAL and for
other cases store diff format.
This has advantage that for Updates containing only fixed length columns
don't have to pay penality of doing diff between new and old tuple. Also we
can do the whole work in 2 parts, one for fixed length columns and second to
handle other cases.
It would be nice to use some well-known binary delta algorithm for this,
rather than invent our own. OTOH, we have more knowledge of the
attribute boundaries, so a custom algorithm might work better.
I shall work on this and post after initial work.
In any case, I'd like to see the code to do the delta encoding/decoding to
be
put into separate functions, outside of heapam.c. It would be good for
readability, and we might want to reuse this in other places too.
Agreed. I shall take care of doing it in suggested way.
With Regards,
Amit Kapila.
On 04.08.2012 11:01, Amit Kapila wrote:
Missed one point which needs to be handled is pg_upgrade
I don't think there's anything to do for pg_upgrade. This doesn't change
the on-disk data format, just the WAL format, and pg_upgrade isn't
sensitive to WAL format changes.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
Import Notes
Reply to msg id not found: 501cd6e2.c9c9440a.09a6.ffffb708SMTPIN_ADDED@mx.google.comReference msg id not found: 501cd6e2.c9c9440a.09a6.ffffb708SMTPIN_ADDED@mx.google.com | Resolved by subject fallback
On Sat, Aug 4, 2012 at 05:21:06PM +0300, Heikki Linnakangas wrote:
On 04.08.2012 11:01, Amit Kapila wrote:
Missed one point which needs to be handled is pg_upgrade
I don't think there's anything to do for pg_upgrade. This doesn't
change the on-disk data format, just the WAL format, and pg_upgrade
isn't sensitive to WAL format changes.
Correct.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ It's impossible for everything to be true. +
From: Bruce Momjian [mailto:bruce@momjian.us]
Sent: Saturday, August 04, 2012 8:06 PM
On Sat, Aug 4, 2012 at 05:21:06PM +0300, Heikki Linnakangas wrote:
On 04.08.2012 11:01, Amit Kapila wrote:
Missed one point which needs to be handled is pg_upgrade
I don't think there's anything to do for pg_upgrade. This doesn't
change the on-disk data format, just the WAL format, and pg_upgrade
isn't sensitive to WAL format changes.
Correct.
Thanks Bruce and Heikki for this information.
I need your feedback on the below design point, as it will make my further
work on this performance issue more clear.
Also let me know if the explanation below is not clear, I shall try to use
some examples to explain my point.
Currently the solution for fixed length columns cannot handle the case of
variable length columns and NULLS. The reason is for fixed length columns
there is no need of diff technology between old and new tuple, however for
other cases it will be required.
For fixed length columns, if we just note the OFFSET, LENGTH, VALUE of
changed columns of new tuple in WAL, it will be sufficient to do the replay
of WAL. However to handle other cases we need to use diff mechanism.
Can we do something like if the changed columns are fixed length and doesn't
contain NULL's, then store [OFFSET, LENGTH, VALUE] format in WAL and for
other cases store diff format.
This has advantage that for Updates containing only fixed length columns
don't have to pay penality of doing diff between new and old tuple. Also we
can do the whole work in 2 parts, one for fixed length columns and second to
handle other cases.
With Regards,
Amit Kapila.
On 06.08.2012 06:10, Amit Kapila wrote:
Currently the solution for fixed length columns cannot handle the case of
variable length columns and NULLS. The reason is for fixed length columns
there is no need of diff technology between old and new tuple, however for
other cases it will be required.
For fixed length columns, if we just note the OFFSET, LENGTH, VALUE of
changed columns of new tuple in WAL, it will be sufficient to do the replay
of WAL. However to handle other cases we need to use diff mechanism.Can we do something like if the changed columns are fixed length and doesn't
contain NULL's, then store [OFFSET, LENGTH, VALUE] format in WAL and for
other cases store diff format.This has advantage that for Updates containing only fixed length columns
don't have to pay penality of doing diff between new and old tuple. Also we
can do the whole work in 2 parts, one for fixed length columns and second to
handle other cases.
Let's keep it simple and use the same diff format for all tuples, at
least for now. If it turns out that you can indeed get even more gain
for fixed length tuples by something like that, then let's do that later
as a separate patch.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
Import Notes
Reply to msg id not found: 501f35e3.869aec0a.0a95.ffffc442SMTPIN_ADDED@mx.google.comReference msg id not found: 501cd6e2.c9c9440a.09a6.ffffb708SMTPIN_ADDED@mx.google.com
From: Heikki Linnakangas [mailto:heikki.linnakangas@enterprisedb.com]
Sent: Monday, August 06, 2012 2:32 PM
To: Amit Kapila
Cc: 'Bruce Momjian'; pgsql-hackers@postgresql.org
Subject: Re: [HACKERS] [WIP] Performance Improvement by reducing WAL for
Update Operation
On 06.08.2012 06:10, Amit Kapila wrote:
Currently the solution for fixed length columns cannot handle the case of
variable length columns and NULLS. The reason is for fixed length columns
there is no need of diff technology between old and new tuple, however
for
other cases it will be required.
For fixed length columns, if we just note the OFFSET, LENGTH, VALUE of
changed columns of new tuple in WAL, it will be sufficient to do the
replay
of WAL. However to handle other cases we need to use diff mechanism.
Can we do something like if the changed columns are fixed length and
doesn't
contain NULL's, then store [OFFSET, LENGTH, VALUE] format in WAL and for
other cases store diff format.This has advantage that for Updates containing only fixed length columns
don't have to pay penality of doing diff between new and old tuple. Also
we
can do the whole work in 2 parts, one for fixed length columns and second
to
handle other cases.
Let's keep it simple and use the same diff format for all tuples, at
least for now. If it turns out that you can indeed get even more gain
for fixed length tuples by something like that, then let's do that later
as a separate patch.
Okay, I shall first try to design and implement the same format for all
tuples
and discuss the results of same with community.
With Regards,
Amit Kapila.
On 3 August 2012 12:46, Amit kapila <amit.kapila@huawei.com> wrote:
Frame the new tuple from old tuple and WAL record:
Sounds good.
I'd suggest we do this only when the saving is large enough for
benefit, rather than do this every time.
You don't mention whether or not the old and the new tuple are on the
same data block.
Personally, I think it will be important to ensure the above,
otherwise recovery will require much additional code for that case.
And that code will be prone to race conditions and performance issues.
Please also bear in mind that Andres will be looking to include the PK
columns in every WAL record for BDR. That could be an option, but I
doubt there is much value in excluding PK columns. I think I'd want
them to be there for debugging purposes so we can prove this code is
correct in production, since otherwise this could be a source of data
loss bugs.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From: Simon Riggs [mailto:simon@2ndQuadrant.com]
Sent: Thursday, August 09, 2012 12:36 PM
On 3 August 2012 12:46, Amit kapila <amit.kapila@huawei.com> wrote:
Frame the new tuple from old tuple and WAL record:
Sounds good.
Thanks.
I'd suggest we do this only when the saving is large enough for
benefit, rather than do this every time.
Do you mean to say that when length of updated values of tuple is less
than some threshold(1/3 or 2/3, etc..) value of
total length?
You don't mention whether or not the old and the new tuple are on the
same data block.
WAL reduction is done for the case even when old and new are on different
data blocks as well.
Personally, I think it will be important to ensure the above,
otherwise recovery will require much additional code for that case.
In recovery currently also, it handles the case when old and new are on
different page such that
it has to read old page to get the old tuple.
The modifications needs to ensure handling of following cases:
a. When there is backup block,and old-new tuples are on different page
Currently it doesn't read the old page,
However for new implementation it needs to read old page for this case
also.
b. When changes are already applied on page [line : if (XLByteLE(lsn,
PageGetLSN(page))); function: heap_xlog_update]
Currently it doesn't read the old page,
However for new implementation it needs to read old page for this case
also.
And that code will be prone to race conditions and performance issues.
Are you telling performance issues, as now we may need to read old page in
some of the cases
when earlier it was not reading?
If yes, then I think as I have mentioned above, according to me above 2
cases are not very usual cases.
However the benefit of Update operation on running server is good enough
as it reduces the WAL volume.
If other then above, then please suggest me.
Please also bear in mind that Andres will be looking to include the PK
columns in every WAL record for BDR. That could be an option, but I
doubt there is much value in excluding PK columns.
Agreed. However once the implementation by Andres is done I can merge both
codes and
take the performance data again, based on which we can take decision.
With Regards,
Amit Kapila.
On 9 August 2012 09:49, Amit Kapila <amit.kapila@huawei.com> wrote:
I'd suggest we do this only when the saving is large enough for
benefit, rather than do this every time.Do you mean to say that when length of updated values of tuple is less
than some threshold(1/3 or 2/3, etc..) value of
total length?
Some heuristic, yes, similar to TOAST's minimum threshold. To attempt
removal of rows in all cases would not be worth it, so we need a fast
path way of saying lets just take all of the columns.
You don't mention whether or not the old and the new tuple are on the
same data block.WAL reduction is done for the case even when old and new are on different
data blocks as well.
That makes me feel nervous. I doubt the marginal gain is worth it.
Most updates don't cross blocks.
Please also bear in mind that Andres will be looking to include the PK
columns in every WAL record for BDR. That could be an option, but I
doubt there is much value in excluding PK columns.Agreed. However once the implementation by Andres is done I can merge both
codes and
take the performance data again, based on which we can take decision.
It won't happen like that because there won't be a single point where
Andres is done. If you agree, then its worth doing it that way to
begin with, rather than requiring us to revisit the same section of
code twice.
One huge point that needs to be thought through is how we prove this
code actually works on WAL/recovery side. A normal regression test
won't prove that and we don't have a framework in place for that.
If you think about what you'll need to do to prove you haven't made
some fatal corruption of WAL, its going to look a lot like logical
replication tests. Worst case here is that mistakes on this patch will
show up as Andres' mistakes. So there is a stronger connection to
Andres' work than it first appears.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Import Notes
Reply to msg id not found: -3088902293716353364@unknownmsgid
On 09.08.2012 12:18, Simon Riggs wrote:
On 9 August 2012 09:49, Amit Kapila<amit.kapila@huawei.com> wrote:
WAL reduction is done for the case even when old and new are on different
data blocks as well.That makes me feel nervous. I doubt the marginal gain is worth it.
Most updates don't cross blocks.
That was my first instinctive reaction too. But if the mechanism works
just as well for cross-page updates, seems a bit strange to not use it.
One argument would be that if for some reason the old block is corrupt
or lost, you would not be able to recover the new version of the tuple
from the WAL alone. At the moment, it's nice that the WAL record
contains all the information required to reconstruct the new tuple,
regardless of the old data block contents. But then again, full-page
writes cover that too. There will be a full-page image of the old block
in the WAL anyway.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 9 August 2012 11:30, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
On 09.08.2012 12:18, Simon Riggs wrote:
On 9 August 2012 09:49, Amit Kapila<amit.kapila@huawei.com> wrote:
WAL reduction is done for the case even when old and new are on
different
data blocks as well.That makes me feel nervous. I doubt the marginal gain is worth it.
Most updates don't cross blocks.That was my first instinctive reaction too. But if the mechanism works just
as well for cross-page updates, seems a bit strange to not use it.One argument would be that if for some reason the old block is corrupt or
lost, you would not be able to recover the new version of the tuple from the
WAL alone. At the moment, it's nice that the WAL record contains all the
information required to reconstruct the new tuple, regardless of the old
data block contents.
Exactly. If we lose the first block in a checkpoint, we could lose all
updates to rows in that page and all other pages linked to it over a
whole checkpoint duration. Basically, page corruption will propogate
from block to block if we do this.
Given the marginal gain because of a low percentage of cross-block
updates, I'm not keen. Low percentage because HOT tries hard to keep
things on same block - even for non-HOT updates (which is the case,
even though it sounds weird).
But then again, full-page writes cover that too. There
will be a full-page image of the old block in the WAL anyway.
Right, but we're planning to remove that, so its not a safe assumption
to use when building new code.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From: pgsql-hackers-owner@postgresql.org
[mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Simon Riggs
Sent: Thursday, August 09, 2012 2:49 PM
On 9 August 2012 09:49, Amit Kapila <amit.kapila@huawei.com> wrote:
I'd suggest we do this only when the saving is large enough for
benefit, rather than do this every time.Do you mean to say that when length of updated values of tuple is less
than some threshold(1/3 or 2/3, etc..) value of
total length?
Some heuristic, yes, similar to TOAST's minimum threshold. To attempt
removal of rows in all cases would not be worth it, so we need a fast
path way of saying lets just take all of the columns.
Yes, it has to be done. Currently I have 2 ideas to take care of this:
a. Based on number of updated columns
b. Based on length of updated values
If you have any other idea or you favor among one of the above, let me
know your opinion.
You don't mention whether or not the old and the new tuple are on the
same data block.WAL reduction is done for the case even when old and new are on
different
data blocks as well.
That makes me feel nervous. I doubt the marginal gain is worth it.
Most updates don't cross blocks.
How can it be proved whether gain is marginal or substantial to handle the
case.
One way is test after modification:
I have updated pg_bench tpc_b case:
1. Schema is such that it contains 1800 length rows
2. tpc_b only has updates
3. length of updated column values is 300.
4. All tables has 100% fill factor.
5. Vacuum is OFF
So in such a run, I think many should be updates are across blocks. But not
sure, neither I have verified it in any way.
The above run has given a good performance improvement.
Please also bear in mind that Andres will be looking to include the PK
columns in every WAL record for BDR. That could be an option, but I
doubt there is much value in excluding PK columns.Agreed. However once the implementation by Andres is done I can merge
both
codes and
take the performance data again, based on which we can take decision.
It won't happen like that because there won't be a single point where
Andres is done. If you agree, then its worth doing it that way to
begin with, rather than requiring us to revisit the same section of
code twice.
This optimization is to reduce the amount of WAL and definitely adding
anything extra will
have some impact.
However if there is no better way other than by including PK in WAL, then I
don't have any problem.
One huge point that needs to be thought through is how we prove this
code actually works on WAL/recovery side. A normal regression test
won't prove that and we don't have a framework in place for that.
My initial idea to validate recovery :
1. Manual Test: a. To generate enough scenarios for update operation.
b. For each scenario, make sure Replay happens properly.
2. Community Review.
With Regards,
Amit Kapila.
On 09.08.2012 14:11, Simon Riggs wrote:
Given the marginal gain because of a low percentage of cross-block
updates, I'm not keen. Low percentage because HOT tries hard to keep
things on same block - even for non-HOT updates (which is the case,
even though it sounds weird).
That depends entirely on the workload. If you do a bulk update that
updates every row on the table, most are going to be cross-block
updates, and the WAL size does matter.
But then again, full-page writes cover that too. There
will be a full-page image of the old block in the WAL anyway.Right, but we're planning to remove that, so its not a safe assumption
to use when building new code.
I don't think we're going to get rid of full-page images any time soon.
I guess you could easily check if full-page writes are enabled, though,
and only do it for cross-page updates if it is.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 9 August 2012 12:17, Amit Kapila <amit.kapila@huawei.com> wrote:
This optimization is to reduce the amount of WAL and definitely adding
anything extra will have some impact.
Of course. The question is "How much impact?". Each tweak has
progressively less and less gain. This isn't a binary choice.
Squeezing the last ounce of performance at the expense of all other
concerns is not a sensible goal, IMHO, nor do we attempt that
elsewhere.
Given we're making no attempt to remove full page writes, which is
clearly the biggest source of WAL volume currently, micro optimisation
of other factors seems unwarranted at this stage.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Import Notes
Reply to msg id not found: 460502096067889413@unknownmsgid
From: Heikki Linnakangas [mailto:heikki.linnakangas@enterprisedb.com]
Sent: Thursday, August 09, 2012 4:59 PM
On 09.08.2012 14:11, Simon Riggs wrote:
But then again, full-page writes cover that too. There
will be a full-page image of the old block in the WAL anyway.Right, but we're planning to remove that, so its not a safe assumption
to use when building new code.
I don't think we're going to get rid of full-page images any time soon.
I guess you could easily check if full-page writes are enabled, though,
and only do it for cross-page updates if it is.
According to my understanding you are talking about corruption due to
partial page writes which can be handled by full-page image of WAL. Correct
me if I misunderstood.
Based on that, even if full-page image is removed it will be maintained by
double buffer write[an alternative solution to full-page writes for some of
the paths] for the case of corrupt page handling.
With Regards,
Amit Kapila.
On 09.08.2012 15:56, Amit Kapila wrote:
From: Heikki Linnakangas [mailto:heikki.linnakangas@enterprisedb.com]
Sent: Thursday, August 09, 2012 4:59 PM
On 09.08.2012 14:11, Simon Riggs wrote:But then again, full-page writes cover that too. There
will be a full-page image of the old block in the WAL anyway.Right, but we're planning to remove that, so its not a safe assumption
to use when building new code.I don't think we're going to get rid of full-page images any time soon.
I guess you could easily check if full-page writes are enabled, though,
and only do it for cross-page updates if it is.According to my understanding you are talking about corruption due to
partial page writes which can be handled by full-page image of WAL. Correct
me if I misunderstood.
I meant corruption caused by anything, like disk failure, bugs, cosmic
rays, etc. The point is that currently the WAL record contains all the
information required to reconstruct the old tuple. With a diff method,
that's no longer the case, so if the old tuple gets corrupt for whatever
reason, that error will be propagated to the new tuple.
It's not an issue as long as everything works correctly, but some
redundancy is nice when you're trying to resurrect a corrupt database.
That's what we're talking about here. That said, I don't think it's a
big deal for this patch, at least not as long as full-page writes are
enabled.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
Import Notes
Reply to msg id not found: 5023b3fa.e78a440a.594f.4457SMTPIN_ADDED@mx.google.com
From: Simon Riggs [mailto:simon@2ndQuadrant.com]
Sent: Thursday, August 09, 2012 5:29 PM
On 9 August 2012 12:17, Amit Kapila <amit.kapila@huawei.com> wrote:
This optimization is to reduce the amount of WAL and definitely adding
anything extra will have some impact.
Of course. The question is "How much impact?". Each tweak has
progressively less and less gain. This isn't a binary choice.
Squeezing the last ounce of performance at the expense of all other
concerns is not a sensible goal, IMHO, nor do we attempt that
elsewhere.
Given we're making no attempt to remove full page writes, which is
clearly the biggest source of WAL volume currently, micro optimisation
of other factors seems unwarranted at this stage.
What I am pointing from WAL reduction is about Update operation performance
and
full-page writes doesn't have direct correlation with Update operation
except for
a case of first time update of page after checkpoint.
With Regards,
Amit Kapila.
On Thu, Aug 9, 2012 at 9:09 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
I meant corruption caused by anything, like disk failure, bugs, cosmic rays,
etc. The point is that currently the WAL record contains all the information
required to reconstruct the old tuple. With a diff method, that's no longer
the case, so if the old tuple gets corrupt for whatever reason, that error
will be propagated to the new tuple.It's not an issue as long as everything works correctly, but some redundancy
is nice when you're trying to resurrect a corrupt database. That's what
we're talking about here. That said, I don't think it's a big deal for this
patch, at least not as long as full-page writes are enabled.
So suppose that the following sequence of events occurs:
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on page 2.
2. The table is vacuumed, removing tuple A.
3. Page 1 is written durably to disk.
4. Crash.
If reconstructing tuple B requires possession of tuple A, it seems
that we are now screwed.
No?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 09.08.2012 19:39, Robert Haas wrote:
On Thu, Aug 9, 2012 at 9:09 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:I meant corruption caused by anything, like disk failure, bugs, cosmic rays,
etc. The point is that currently the WAL record contains all the information
required to reconstruct the old tuple. With a diff method, that's no longer
the case, so if the old tuple gets corrupt for whatever reason, that error
will be propagated to the new tuple.It's not an issue as long as everything works correctly, but some redundancy
is nice when you're trying to resurrect a corrupt database. That's what
we're talking about here. That said, I don't think it's a big deal for this
patch, at least not as long as full-page writes are enabled.So suppose that the following sequence of events occurs:
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on page 2.
2. The table is vacuumed, removing tuple A.
3. Page 1 is written durably to disk.
4. Crash.If reconstructing tuple B requires possession of tuple A, it seems
that we are now screwed.
Not with full_page_writes=on, as crash recovery will restore the old
page contents. But you're right, with full_page_writes=off you are screwed.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On Thu, Aug 9, 2012 at 12:43 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
So suppose that the following sequence of events occurs:
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on
page 2.
2. The table is vacuumed, removing tuple A.
3. Page 1 is written durably to disk.
4. Crash.If reconstructing tuple B requires possession of tuple A, it seems
that we are now screwed.Not with full_page_writes=on, as crash recovery will restore the old page
contents. But you're right, with full_page_writes=off you are screwed.
I think the property that recovery only needs to worry about each
block individually is one that we want to preserve. Supporting this
optimizating only when full_page_writes=off seems ugly, and I also
agree with Simon's objection upthread: the current design minimizes
the chances of corruption propagating from block to block. Even if
the proposed design is bullet-proof as of this moment (at least with
full_page_writes=on) it seems very possible that it could get
accidentally broken by future code changes, leading to hard-to-find
data corruption bugs. It might also complicate other things that we
will want to do down the line, like parallelizing recovery.
In the pgbench testing I've done, almost all of the updates are HOT,
provided you run the test long enough to reach steady state, so
restricting this optimization to HOT updates shouldn't hurt that case
(or similar real-world cases) very much. Of course there are probably
also real-world cases where HOT applies only seldom, and those cases
won't get the benefit of this, but you can't win them all.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From: Robert Haas [mailto:robertmhaas@gmail.com]
Sent: Thursday, August 09, 2012 11:18 PM
On Thu, Aug 9, 2012 at 12:43 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
So suppose that the following sequence of events occurs:
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on
page 2.
2. The table is vacuumed, removing tuple A.
3. Page 1 is written durably to disk.
4. Crash.If reconstructing tuple B requires possession of tuple A, it seems
that we are now screwed.Not with full_page_writes=on, as crash recovery will restore the old page
contents. But you're right, with full_page_writes=off you are screwed.
I think the property that recovery only needs to worry about each
block individually is one that we want to preserve. Supporting this
optimizating only when full_page_writes=off seems ugly,
I think recovery needs to worry about multiple blocks as well in some cases.
Please see below case and correct me if I am wrong.
I think currently also there can be problems in case of full_page_writes=off
for crash recovery.
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on
page 2.
2. Page 1 is Partially written to disk.
3. During recovery, it can so appear that there is no need to update XMAX
and other related things in Old tuple
as LSN is greater than WAL lsn.
4. Now also there can be other problems related to tuple visibility.
and I also
agree with Simon's objection upthread: the current design minimizes
the chances of corruption propagating from block to block. Even if
the proposed design is bullet-proof as of this moment (at least with
full_page_writes=on) it seems very possible that it could get
accidentally broken by future code changes, leading to hard-to-find
data corruption bugs. It might also complicate other things that we
will want to do down the line, like parallelizing recovery.
I can see the problem incase we remove full-page-writes concept and replace
with some
other equivalent concept which doesn't have current flexibility.
With Regards,
Amit Kapila.
On Fri, Aug 10, 2012 at 1:25 AM, Amit Kapila <amit.kapila@huawei.com> wrote:
I think the property that recovery only needs to worry about each
block individually is one that we want to preserve. Supporting this
optimizating only when full_page_writes=off seems ugly,I think recovery needs to worry about multiple blocks as well in some cases.
Please see below case and correct me if I am wrong.
I think currently also there can be problems in case of full_page_writes=off
for crash recovery.
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on
page 2.
2. Page 1 is Partially written to disk.
3. During recovery, it can so appear that there is no need to update XMAX
and other related things in Old tuple
as LSN is greater than WAL lsn.
4. Now also there can be other problems related to tuple visibility.
Well, you're only supposed to turn full_page_writes=off if partial
page writes are impossible on your system. If you turn off
full_page_writes on a system where partial page writes are impossible,
then you've intentionally broken crash recovery, and you get to keep
both pieces.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Import Notes
Reply to msg id not found: 50249b78.6710b60a.6cab.3b7aSMTPIN_ADDED@mx.google.com
On Thursday, August 30, 2012 11:23 PM Robert Haas
[mailto:robertmhaas@gmail.com] wrote:
On Fri, Aug 10, 2012 at 1:25 AM, Amit Kapila <amit.kapila@huawei.com> wrote:
I think the property that recovery only needs to worry about each
block individually is one that we want to preserve. Supporting this
optimizating only when full_page_writes=off seems ugly,I think recovery needs to worry about multiple blocks as well in some
cases.
Please see below case and correct me if I am wrong.
I think currently also there can be problems in case of
full_page_writes=off
for crash recovery.
1. Tuple A on page 1 is updated. The new version, tuple B, is placed on
page 2.
2. Page 1 is Partially written to disk.
3. During recovery, it can so appear that there is no need to update XMAX
and other related things in Old tuple
as LSN is greater than WAL lsn.
4. Now also there can be other problems related to tuple visibility.
Well, you're only supposed to turn full_page_writes=off if partial
page writes are impossible on your system. If you turn off
full_page_writes on a system where partial page writes are impossible,
I think you mean to say "full_page_writes on a system where partial page
writes are possible."
Because if partial page writes are impossible then user should keep
full_page_writes = OFF.
then you've intentionally broken crash recovery, and you get to keep
both pieces.
Robert, in broad I got your and Simon's idea that we should do
optimization of WAL (Reduce) in case update happens
on same page. I have implemented the final Patch which does WAL
optimization only in case when updated tuple is on same
page. Also we have observed that with fillfactor 80 the performance
improvement is good.
With Regards,
Amit Kapila.