Avoid unnecessary catalog updates in ALTER SEQUENCE
authorPeter Eisentraut <peter_e@gmx.net>
Tue, 2 May 2017 14:41:48 +0000 (10:41 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Tue, 2 May 2017 14:41:48 +0000 (10:41 -0400)
ALTER SEQUENCE can do nontransactional changes to the sequence (RESTART
clause) and transactional updates to the pg_sequence catalog (most other
clauses).  When just calling RESTART, the code would still needlessly do
a catalog update without any changes.  This would entangle that
operation in the concurrency issues of a catalog update (causing either
locking or concurrency errors, depending on how that issue is to be
resolved).

Fix by keeping track during options parsing whether a catalog update is
needed, and skip it if not.

Reported-by: Jason Petersen <jason@citusdata.com>
src/backend/commands/sequence.c

index acd4e359bb4521fc64bc05f0007263970904d9cf..4a56926b5306b47ac1e7c80c5c6fa6488157b658 100644 (file)
@@ -101,6 +101,7 @@ static Form_pg_sequence_data read_seq_tuple(Relation rel,
 static void init_params(ParseState *pstate, List *options, bool for_identity,
                        bool isInit,
                        Form_pg_sequence seqform,
+                       bool *changed_seqform,
                        Form_pg_sequence_data seqdataform, List **owned_by);
 static void do_setval(Oid relid, int64 next, bool iscalled);
 static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
@@ -115,6 +116,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 {
    FormData_pg_sequence seqform;
    FormData_pg_sequence_data seqdataform;
+   bool        changed_seqform = false; /* not used here */
    List       *owned_by;
    CreateStmt *stmt = makeNode(CreateStmt);
    Oid         seqoid;
@@ -153,7 +155,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
    }
 
    /* Check and set all option values */
-   init_params(pstate, seq->options, seq->for_identity, true, &seqform, &seqdataform, &owned_by);
+   init_params(pstate, seq->options, seq->for_identity, true, &seqform, &changed_seqform, &seqdataform, &owned_by);
 
    /*
     * Create relation (and fill value[] and null[] for the tuple)
@@ -418,6 +420,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
    Form_pg_sequence seqform;
    Form_pg_sequence_data seqdata;
    FormData_pg_sequence_data newseqdata;
+   bool        changed_seqform = false;
    List       *owned_by;
    ObjectAddress address;
    Relation    rel;
@@ -443,7 +446,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
    /* lock page' buffer and read tuple into new sequence structure */
    seqdata = read_seq_tuple(seqrel, &buf, &seqdatatuple);
 
-   /* Copy old values of options into workspace */
+   /* Copy old sequence data into workspace */
    memcpy(&newseqdata, seqdata, sizeof(FormData_pg_sequence_data));
 
    rel = heap_open(SequenceRelationId, RowExclusiveLock);
@@ -456,7 +459,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
    seqform = (Form_pg_sequence) GETSTRUCT(tuple);
 
    /* Check and set new values */
-   init_params(pstate, stmt->options, stmt->for_identity, false, seqform, &newseqdata, &owned_by);
+   init_params(pstate, stmt->options, stmt->for_identity, false, seqform, &changed_seqform, &newseqdata, &owned_by);
 
    /* Clear local cache so that we don't think we have cached numbers */
    /* Note that we do not change the currval() state */
@@ -507,7 +510,8 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 
    relation_close(seqrel, NoLock);
 
-   CatalogTupleUpdate(rel, &tuple->t_self, tuple);
+   if (changed_seqform)
+       CatalogTupleUpdate(rel, &tuple->t_self, tuple);
    heap_close(rel, RowExclusiveLock);
 
    return address;
@@ -1213,9 +1217,12 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
 }
 
 /*
- * init_params: process the options list of CREATE or ALTER SEQUENCE,
- * and store the values into appropriate fields of *new.  Also set
- * *owned_by to any OWNED BY option, or to NIL if there is none.
+ * init_params: process the options list of CREATE or ALTER SEQUENCE, and
+ * store the values into appropriate fields of seqform, for changes that go
+ * into the pg_sequence catalog, and seqdataform for changes to the sequence
+ * relation itself.  Set *changed_seqform to true if seqform was changed
+ * (interesting for ALTER SEQUENCE).  Also set *owned_by to any OWNED BY
+ * option, or to NIL if there is none.
  *
  * If isInit is true, fill any unspecified options with default values;
  * otherwise, do not change existing options that aren't explicitly overridden.
@@ -1224,7 +1231,9 @@ static void
 init_params(ParseState *pstate, List *options, bool for_identity,
            bool isInit,
            Form_pg_sequence seqform,
-           Form_pg_sequence_data seqdataform, List **owned_by)
+           bool *changed_seqform,
+           Form_pg_sequence_data seqdataform,
+           List **owned_by)
 {
    DefElem    *as_type = NULL;
    DefElem    *start_value = NULL;
@@ -1342,6 +1351,8 @@ init_params(ParseState *pstate, List *options, bool for_identity,
                 defel->defname);
    }
 
+   *changed_seqform = false;
+
    /*
     * We must reset log_cnt when isInit or when changing any parameters that
     * would affect future nextval allocations.
@@ -1382,14 +1393,19 @@ init_params(ParseState *pstate, List *options, bool for_identity,
        }
 
        seqform->seqtypid = newtypid;
+       *changed_seqform = true;
    }
    else if (isInit)
+   {
        seqform->seqtypid = INT8OID;
+       *changed_seqform = true;
+   }
 
    /* INCREMENT BY */
    if (increment_by != NULL)
    {
        seqform->seqincrement = defGetInt64(increment_by);
+       *changed_seqform = true;
        if (seqform->seqincrement == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1397,22 +1413,30 @@ init_params(ParseState *pstate, List *options, bool for_identity,
        seqdataform->log_cnt = 0;
    }
    else if (isInit)
+   {
        seqform->seqincrement = 1;
+       *changed_seqform = true;
+   }
 
    /* CYCLE */
    if (is_cycled != NULL)
    {
        seqform->seqcycle = intVal(is_cycled->arg);
+       *changed_seqform = true;
        Assert(BoolIsValid(seqform->seqcycle));
        seqdataform->log_cnt = 0;
    }
    else if (isInit)
+   {
        seqform->seqcycle = false;
+       *changed_seqform = true;
+   }
 
    /* MAXVALUE (null arg means NO MAXVALUE) */
    if (max_value != NULL && max_value->arg)
    {
        seqform->seqmax = defGetInt64(max_value);
+       *changed_seqform = true;
        seqdataform->log_cnt = 0;
    }
    else if (isInit || max_value != NULL || reset_max_value)
@@ -1429,6 +1453,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
        }
        else
            seqform->seqmax = -1;   /* descending seq */
+       *changed_seqform = true;
        seqdataform->log_cnt = 0;
    }
 
@@ -1450,6 +1475,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
    if (min_value != NULL && min_value->arg)
    {
        seqform->seqmin = defGetInt64(min_value);
+       *changed_seqform = true;
        seqdataform->log_cnt = 0;
    }
    else if (isInit || min_value != NULL || reset_min_value)
@@ -1466,6 +1492,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
        }
        else
            seqform->seqmin = 1; /* ascending seq */
+       *changed_seqform = true;
        seqdataform->log_cnt = 0;
    }
 
@@ -1499,13 +1526,17 @@ init_params(ParseState *pstate, List *options, bool for_identity,
 
    /* START WITH */
    if (start_value != NULL)
+   {
        seqform->seqstart = defGetInt64(start_value);
+       *changed_seqform = true;
+   }
    else if (isInit)
    {
        if (seqform->seqincrement > 0)
            seqform->seqstart = seqform->seqmin;    /* ascending seq */
        else
            seqform->seqstart = seqform->seqmax;    /* descending seq */
+       *changed_seqform = true;
    }
 
    /* crosscheck START */
@@ -1580,6 +1611,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
    if (cache_value != NULL)
    {
        seqform->seqcache = defGetInt64(cache_value);
+       *changed_seqform = true;
        if (seqform->seqcache <= 0)
        {
            char        buf[100];
@@ -1593,7 +1625,10 @@ init_params(ParseState *pstate, List *options, bool for_identity,
        seqdataform->log_cnt = 0;
    }
    else if (isInit)
+   {
        seqform->seqcache = 1;
+       *changed_seqform = true;
+   }
 }
 
 /*