Add +(pg_lsn,numeric) and -(pg_lsn,numeric) operators.
authorFujii Masao <fujii@postgresql.org>
Tue, 30 Jun 2020 14:55:07 +0000 (23:55 +0900)
committerFujii Masao <fujii@postgresql.org>
Tue, 30 Jun 2020 14:55:07 +0000 (23:55 +0900)
By using these operators, the number of bytes can be added into and
subtracted from LSN.

Bump catalog version.

Author: Fujii Masao
Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Asif Rehman
Discussion: https://postgr.es/m/ed9f7f74-e996-67f8-554a-52ebd3779b3b@oss.nttdata.com

doc/src/sgml/datatype.sgml
src/backend/utils/adt/numeric.c
src/backend/utils/adt/pg_lsn.c
src/include/catalog/catversion.h
src/include/catalog/pg_operator.dat
src/include/catalog/pg_proc.dat
src/test/regress/expected/numeric.out
src/test/regress/expected/pg_lsn.out
src/test/regress/sql/numeric.sql
src/test/regress/sql/pg_lsn.sql

index 49fb19ff9194e9b6cc2c2cf284d8a54dec2875d3..7027758d28dd7105a604a6a0c94b4fa47ebe7a43 100644 (file)
@@ -4801,7 +4801,13 @@ SELECT * FROM pg_attribute
     standard comparison operators, like <literal>=</literal> and
     <literal>&gt;</literal>.  Two LSNs can be subtracted using the
     <literal>-</literal> operator; the result is the number of bytes separating
-    those write-ahead log locations.
+    those write-ahead log locations.  Also the number of bytes can be
+    added into and subtracted from LSN using the
+    <literal>+(pg_lsn,numeric)</literal> and
+    <literal>-(pg_lsn,numeric)</literal> operators, respectively. Note that
+    the calculated LSN should be in the range of <type>pg_lsn</type> type,
+    i.e., between <literal>0/0</literal> and
+    <literal>FFFFFFFF/FFFFFFFF</literal>.
    </para>
   </sect1>
 
index 5f23f2afac86af1b8f51ded0bd7f12749824370e..1773fa292e49e23f5f0be8eeb4f5b1da31f4adb9 100644 (file)
@@ -41,6 +41,7 @@
 #include "utils/guc.h"
 #include "utils/int8.h"
 #include "utils/numeric.h"
+#include "utils/pg_lsn.h"
 #include "utils/sortsupport.h"
 
 /* ----------
@@ -472,6 +473,7 @@ static void apply_typmod(NumericVar *var, int32 typmod);
 static bool numericvar_to_int32(const NumericVar *var, int32 *result);
 static bool numericvar_to_int64(const NumericVar *var, int64 *result);
 static void int64_to_numericvar(int64 val, NumericVar *var);
+static bool numericvar_to_uint64(const NumericVar *var, uint64 *result);
 #ifdef HAVE_INT128
 static bool numericvar_to_int128(const NumericVar *var, int128 *result);
 static void int128_to_numericvar(int128 val, NumericVar *var);
@@ -3692,6 +3694,30 @@ numeric_float4(PG_FUNCTION_ARGS)
 }
 
 
+Datum
+numeric_pg_lsn(PG_FUNCTION_ARGS)
+{
+   Numeric     num = PG_GETARG_NUMERIC(0);
+   NumericVar  x;
+   XLogRecPtr  result;
+
+   if (NUMERIC_IS_NAN(num))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("cannot convert NaN to pg_lsn")));
+
+   /* Convert to variable format and thence to pg_lsn */
+   init_var_from_num(num, &x);
+
+   if (!numericvar_to_uint64(&x, (uint64 *) &result))
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                errmsg("pg_lsn out of range")));
+
+   PG_RETURN_LSN(result);
+}
+
+
 /* ----------------------------------------------------------------------
  *
  * Aggregate functions
@@ -6742,6 +6768,78 @@ int64_to_numericvar(int64 val, NumericVar *var)
    var->weight = ndigits - 1;
 }
 
+/*
+ * Convert numeric to uint64, rounding if needed.
+ *
+ * If overflow, return false (no error is raised).  Return true if okay.
+ */
+static bool
+numericvar_to_uint64(const NumericVar *var, uint64 *result)
+{
+   NumericDigit *digits;
+   int         ndigits;
+   int         weight;
+   int         i;
+   uint64      val;
+   NumericVar  rounded;
+
+   /* Round to nearest integer */
+   init_var(&rounded);
+   set_var_from_var(var, &rounded);
+   round_var(&rounded, 0);
+
+   /* Check for zero input */
+   strip_var(&rounded);
+   ndigits = rounded.ndigits;
+   if (ndigits == 0)
+   {
+       *result = 0;
+       free_var(&rounded);
+       return true;
+   }
+
+   /* Check for negative input */
+   if (rounded.sign == NUMERIC_NEG)
+   {
+       free_var(&rounded);
+       return false;
+   }
+
+   /*
+    * For input like 10000000000, we must treat stripped digits as real. So
+    * the loop assumes there are weight+1 digits before the decimal point.
+    */
+   weight = rounded.weight;
+   Assert(weight >= 0 && ndigits <= weight + 1);
+
+   /* Construct the result */
+   digits = rounded.digits;
+   val = digits[0];
+   for (i = 1; i <= weight; i++)
+   {
+       if (unlikely(pg_mul_u64_overflow(val, NBASE, &val)))
+       {
+           free_var(&rounded);
+           return false;
+       }
+
+       if (i < ndigits)
+       {
+           if (unlikely(pg_add_u64_overflow(val, digits[i], &val)))
+           {
+               free_var(&rounded);
+               return false;
+           }
+       }
+   }
+
+   free_var(&rounded);
+
+   *result = val;
+
+   return true;
+}
+
 #ifdef HAVE_INT128
 /*
  * Convert numeric to int128, rounding if needed.
index d9754a7778c9bdb8d053e809918644f2fc469e20..ad0a7bd869d1e9e203fb910bd80a5cc7f5066361 100644 (file)
@@ -16,6 +16,7 @@
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "utils/builtins.h"
+#include "utils/numeric.h"
 #include "utils/pg_lsn.h"
 
 #define MAXPG_LSNLEN           17
@@ -248,3 +249,71 @@ pg_lsn_mi(PG_FUNCTION_ARGS)
 
    return result;
 }
+
+/*
+ * Add the number of bytes to pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_pli(PG_FUNCTION_ARGS)
+{
+   XLogRecPtr  lsn = PG_GETARG_LSN(0);
+   Numeric     nbytes = PG_GETARG_NUMERIC(1);
+   Datum       num;
+   Datum       res;
+   char        buf[32];
+
+   if (numeric_is_nan(nbytes))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("cannot add NaN to pg_lsn")));
+
+   /* Convert to numeric */
+   snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+   num = DirectFunctionCall3(numeric_in,
+                             CStringGetDatum(buf),
+                             ObjectIdGetDatum(0),
+                             Int32GetDatum(-1));
+
+   /* Add two numerics */
+   res = DirectFunctionCall2(numeric_add,
+                             NumericGetDatum(num),
+                             NumericGetDatum(nbytes));
+
+   /* Convert to pg_lsn */
+   return DirectFunctionCall1(numeric_pg_lsn, res);
+}
+
+/*
+ * Subtract the number of bytes from pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_mii(PG_FUNCTION_ARGS)
+{
+   XLogRecPtr  lsn = PG_GETARG_LSN(0);
+   Numeric     nbytes = PG_GETARG_NUMERIC(1);
+   Datum       num;
+   Datum       res;
+   char        buf[32];
+
+   if (numeric_is_nan(nbytes))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("cannot subtract NaN from pg_lsn")));
+
+   /* Convert to numeric */
+   snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+   num = DirectFunctionCall3(numeric_in,
+                             CStringGetDatum(buf),
+                             ObjectIdGetDatum(0),
+                             Int32GetDatum(-1));
+
+   /* Subtract two numerics */
+   res = DirectFunctionCall2(numeric_sub,
+                             NumericGetDatum(num),
+                             NumericGetDatum(nbytes));
+
+   /* Convert to pg_lsn */
+   return DirectFunctionCall1(numeric_pg_lsn, res);
+}
index 7644147cf5c217f74881912bd8af2080273fbce2..a433bf52c1bdc104b1e65d9372027958b6bf490f 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202006151
+#define CATALOG_VERSION_NO 202006301
 
 #endif
index 59771f606d4f4387bbfc546f6f0f160c0c169b99..5b0e063655d33d85348a47cafd77cf998aaa4b38 100644 (file)
 { oid => '3228', descr => 'minus',
   oprname => '-', oprleft => 'pg_lsn', oprright => 'pg_lsn',
   oprresult => 'numeric', oprcode => 'pg_lsn_mi' },
+{ oid => '5025', descr => 'add',
+  oprname => '+', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcom => '+(numeric,pg_lsn)',
+  oprcode => 'pg_lsn_pli' },
+{ oid => '5026', descr => 'add',
+  oprname => '+', oprleft => 'numeric', oprright => 'pg_lsn',
+  oprresult => 'pg_lsn', oprcom => '+(pg_lsn,numeric)',
+  oprcode => 'numeric_pl_pg_lsn' },
+{ oid => '5027', descr => 'subtract',
+  oprname => '-', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcode => 'pg_lsn_mii' },
 
 # enum operators
 { oid => '3516', descr => 'equal',
index 61f2c2f5b4905eb77743b32d100f775496672584..38295aca4831e97514fa31446c1b1b2a79985c4b 100644 (file)
 { oid => '1783', descr => 'convert numeric to int2',
   proname => 'int2', prorettype => 'int2', proargtypes => 'numeric',
   prosrc => 'numeric_int2' },
+{ oid => '6103', descr => 'convert numeric to pg_lsn',
+  proname => 'pg_lsn', prorettype => 'pg_lsn', proargtypes => 'numeric',
+  prosrc => 'numeric_pg_lsn' },
 
 { oid => '3556', descr => 'convert jsonb to boolean',
   proname => 'bool', prorettype => 'bool', proargtypes => 'jsonb',
 { oid => '4188', descr => 'smaller of two',
   proname => 'pg_lsn_smaller', prorettype => 'pg_lsn',
   proargtypes => 'pg_lsn pg_lsn', prosrc => 'pg_lsn_smaller' },
+{ oid => '5022',
+  proname => 'pg_lsn_pli', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_pli' },
+{ oid => '5023',
+  proname => 'numeric_pl_pg_lsn', prolang => 'sql', prorettype => 'pg_lsn',
+  proargtypes => 'numeric pg_lsn', prosrc => 'select $2 + $1' },
+{ oid => '5024',
+  proname => 'pg_lsn_mii', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_mii' },
 
 # enum related procs
 { oid => '3504', descr => 'I/O',
index 2f3ecb50a73372019b25d60be19a8fc22d63260c..81a0c5d40f714575103b7e098fceb225741f4ba2 100644 (file)
@@ -2348,3 +2348,30 @@ SELECT -4!;
 ERROR:  factorial of a negative number is undefined
 SELECT factorial(-4);
 ERROR:  factorial of a negative number is undefined
+--
+-- Tests for pg_lsn()
+--
+SELECT pg_lsn(23783416::numeric);
+  pg_lsn   
+-----------
+ 0/16AE7F8
+(1 row)
+
+SELECT pg_lsn(0::numeric);
+ pg_lsn 
+--------
+ 0/0
+(1 row)
+
+SELECT pg_lsn(18446744073709551615::numeric);
+      pg_lsn       
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT pg_lsn(-1::numeric);
+ERROR:  pg_lsn out of range
+SELECT pg_lsn(18446744073709551616::numeric);
+ERROR:  pg_lsn out of range
+SELECT pg_lsn('NaN'::numeric);
+ERROR:  cannot convert NaN to pg_lsn
index 64d41dfdad269ef7cf32c23a97fbfa1b03e1cbbd..99a748a6a765168b33d2c32556783deca0836323 100644 (file)
@@ -71,6 +71,56 @@ SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
         1
 (1 row)
 
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+ ?column?  
+-----------
+ 0/16AE7E7
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/1'::pg_lsn - 1::numeric;
+ ?column? 
+----------
+ 0/0
+(1 row)
+
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+ ?column? 
+----------
+ 0/0
+(1 row)
+
+SELECT '0/16AE7F7'::pg_lsn + 'NaN'::numeric;
+ERROR:  cannot add NaN to pg_lsn
+SELECT '0/16AE7F7'::pg_lsn - 'NaN'::numeric;
+ERROR:  cannot subtract NaN from pg_lsn
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)
 SELECT DISTINCT (i || '/' || j)::pg_lsn f
index 1332a9cf07a6c0dc6a7ab6df6705672636d2c0e5..5dc80f686f4814c2bb273e5059a8bc50aa29d78b 100644 (file)
@@ -1122,3 +1122,13 @@ SELECT 100000!;
 SELECT 0!;
 SELECT -4!;
 SELECT factorial(-4);
+
+--
+-- Tests for pg_lsn()
+--
+SELECT pg_lsn(23783416::numeric);
+SELECT pg_lsn(0::numeric);
+SELECT pg_lsn(18446744073709551615::numeric);
+SELECT pg_lsn(-1::numeric);
+SELECT pg_lsn(18446744073709551616::numeric);
+SELECT pg_lsn('NaN'::numeric);
index 2c143c82ffe74e59ed11667ac89556a531eb7fd3..615368ba960be3b23d9554e06578085ac9acdfd9 100644 (file)
@@ -27,6 +27,17 @@ SELECT '0/16AE7F7' < '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8' > pg_lsn '0/16AE7F7';
 SELECT '0/16AE7F7'::pg_lsn - '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+SELECT '0/1'::pg_lsn - 1::numeric;
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+SELECT '0/16AE7F7'::pg_lsn + 'NaN'::numeric;
+SELECT '0/16AE7F7'::pg_lsn - 'NaN'::numeric;
 
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)