Reimplement text_position and related functions to use Boyer-Moore-Horspool
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 7 Sep 2008 04:20:00 +0000 (04:20 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 7 Sep 2008 04:20:00 +0000 (04:20 +0000)
searching instead of naive matching.  In the worst case this has the same
O(M*N) complexity as the naive method, but the worst case is hard to hit,
and the average case is very fast, especially with longer patterns.

David Rowley

src/backend/utils/adt/varlena.c

index 164ff8495605c06efcff12860ddaf51a470a1bbe..21c99133f609c7cb5fb5567683fcbc7feb7d50e9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/utils/adt/varlena.c,v 1.167 2008/05/27 00:13:09 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/utils/adt/varlena.c,v 1.168 2008/09/07 04:20:00 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -39,6 +39,9 @@ typedef struct
    pg_wchar   *wstr2;          /* note: these are palloc'd */
    int         len1;           /* string lengths in logical characters */
    int         len2;
+   /* Skip table for Boyer-Moore-Horspool search algorithm: */
+   int         skiptablemask;  /* mask for ANDing with skiptable subscripts */
+   int         skiptable[256]; /* skip distance for given mismatched char */
 } TextPositionState;
 
 #define DatumGetUnknownP(X)            ((unknown *) PG_DETOAST_DATUM(X))
@@ -753,7 +756,7 @@ text_substring(Datum str, int32 start, int32 length, bool length_not_specified)
         * If we're working with an untoasted source, no need to do an extra
         * copying step.
         */
-       if (VARATT_IS_COMPRESSED(DatumGetPointer(str)) || 
+       if (VARATT_IS_COMPRESSED(DatumGetPointer(str)) ||
            VARATT_IS_EXTERNAL(DatumGetPointer(str)))
            slice = DatumGetTextPSlice(str, slice_start, slice_size);
        else
@@ -866,6 +869,7 @@ text_position(text *t1, text *t2)
    return result;
 }
 
+
 /*
  * text_position_setup, text_position_next, text_position_cleanup -
  * Component steps of text_position()
@@ -909,64 +913,215 @@ text_position_setup(text *t1, text *t2, TextPositionState *state)
        state->len1 = len1;
        state->len2 = len2;
    }
+
+   /*
+    * Prepare the skip table for Boyer-Moore-Horspool searching.  In these
+    * notes we use the terminology that the "haystack" is the string to be
+    * searched (t1) and the "needle" is the pattern being sought (t2).
+    *
+    * If the needle is empty or bigger than the haystack then there is no
+    * point in wasting cycles initializing the table.  We also choose not
+    * to use B-M-H for needles of length 1, since the skip table can't
+    * possibly save anything in that case.
+    */
+   if (len1 >= len2 && len2 > 1)
+   {
+       int     searchlength = len1 - len2;
+       int     skiptablemask;
+       int     last;
+       int     i;
+
+       /*
+        * First we must determine how much of the skip table to use.  The
+        * declaration of TextPositionState allows up to 256 elements, but for
+        * short search problems we don't really want to have to initialize so
+        * many elements --- it would take too long in comparison to the
+        * actual search time.  So we choose a useful skip table size based on
+        * the haystack length minus the needle length.  The closer the needle
+        * length is to the haystack length the less useful skipping becomes.
+        *
+        * Note: since we use bit-masking to select table elements, the skip
+        * table size MUST be a power of 2, and so the mask must be 2^N-1.
+        */
+       if (searchlength < 16)
+           skiptablemask = 3;
+       else if (searchlength < 64)
+           skiptablemask = 7;
+       else if (searchlength < 128)
+           skiptablemask = 15;
+       else if (searchlength < 512)
+           skiptablemask = 31;
+       else if (searchlength < 2048)
+           skiptablemask = 63;
+       else if (searchlength < 4096)
+           skiptablemask = 127;
+       else
+           skiptablemask = 255;
+       state->skiptablemask = skiptablemask;
+
+       /*
+        * Initialize the skip table.  We set all elements to the needle
+        * length, since this is the correct skip distance for any character
+        * not found in the needle.
+        */
+       for (i = 0; i <= skiptablemask; i++)
+           state->skiptable[i] = len2;
+
+       /*
+        * Now examine the needle.  For each character except the last one,
+        * set the corresponding table element to the appropriate skip
+        * distance.  Note that when two characters share the same skip table
+        * entry, the one later in the needle must determine the skip distance.
+        */
+       last = len2 - 1;
+
+       if (!state->use_wchar)
+       {
+           const char *str2 = state->str2;
+
+           for (i = 0; i < last; i++)
+               state->skiptable[(unsigned char) str2[i] & skiptablemask] = last - i;
+       }
+       else
+       {
+           const pg_wchar *wstr2 = state->wstr2;
+
+           for (i = 0; i < last; i++)
+               state->skiptable[wstr2[i] & skiptablemask] = last - i;
+       }
+   }
 }
 
 static int
 text_position_next(int start_pos, TextPositionState *state)
 {
-   int         pos = 0,
-               p,
-               px;
+   int         haystack_len = state->len1;
+   int         needle_len = state->len2;
+   int         skiptablemask = state->skiptablemask;
 
    Assert(start_pos > 0);      /* else caller error */
 
-   if (state->len2 <= 0)
+   if (needle_len <= 0)
        return start_pos;       /* result for empty pattern */
 
+   start_pos--;                /* adjust for zero based arrays */
+
+   /* Done if the needle can't possibly fit */
+   if (haystack_len < start_pos + needle_len)
+       return 0;
+
    if (!state->use_wchar)
    {
        /* simple case - single byte encoding */
-       char       *p1 = state->str1;
-       char       *p2 = state->str2;
+       const char *haystack = state->str1;
+       const char *needle = state->str2;
+       const char *haystack_end = &haystack[haystack_len];
+       const char *hptr;
 
-       /* no use in searching str past point where search_str will fit */
-       px = (state->len1 - state->len2);
-
-       p1 += start_pos - 1;
+       if (needle_len == 1)
+       {
+           /* No point in using B-M-H for a one-character needle */
+           char    nchar = *needle;
 
-       for (p = start_pos - 1; p <= px; p++)
+           hptr = &haystack[start_pos];
+           while (hptr < haystack_end)
+           {
+               if (*hptr == nchar)
+                   return hptr - haystack + 1;
+               hptr++;
+           }
+       }
+       else
        {
-           if ((*p1 == *p2) && (strncmp(p1, p2, state->len2) == 0))
+           const char *needle_last = &needle[needle_len - 1];
+
+           /* Start at startpos plus the length of the needle */
+           hptr = &haystack[start_pos + needle_len - 1];
+           while (hptr < haystack_end)
            {
-               pos = p + 1;
-               break;
+               /* Match the needle scanning *backward* */
+               const char *nptr;
+               const char *p;
+
+               nptr = needle_last;
+               p = hptr;
+               while (*nptr == *p)
+               {
+                   /* Matched it all?  If so, return 1-based position */
+                   if (nptr == needle)
+                       return p - haystack + 1;
+                   nptr--, p--;
+               }
+               /*
+                * No match, so use the haystack char at hptr to decide how
+                * far to advance.  If the needle had any occurrence of that
+                * character (or more precisely, one sharing the same
+                * skiptable entry) before its last character, then we advance
+                * far enough to align the last such needle character with
+                * that haystack position.  Otherwise we can advance by the
+                * whole needle length.
+                */
+               hptr += state->skiptable[(unsigned char) *hptr & skiptablemask];
            }
-           p1++;
        }
    }
    else
    {
-       /* not as simple - multibyte encoding */
-       pg_wchar   *p1 = state->wstr1;
-       pg_wchar   *p2 = state->wstr2;
+       /* The multibyte char version. This works exactly the same way. */
+       const pg_wchar *haystack = state->wstr1;
+       const pg_wchar *needle = state->wstr2;
+       const pg_wchar *haystack_end = &haystack[haystack_len];
+       const pg_wchar *hptr;
 
-       /* no use in searching str past point where search_str will fit */
-       px = (state->len1 - state->len2);
-
-       p1 += start_pos - 1;
+       if (needle_len == 1)
+       {
+           /* No point in using B-M-H for a one-character needle */
+           pg_wchar    nchar = *needle;
 
-       for (p = start_pos - 1; p <= px; p++)
+           hptr = &haystack[start_pos];
+           while (hptr < haystack_end)
+           {
+               if (*hptr == nchar)
+                   return hptr - haystack + 1;
+               hptr++;
+           }
+       }
+       else
        {
-           if ((*p1 == *p2) && (pg_wchar_strncmp(p1, p2, state->len2) == 0))
+           const pg_wchar *needle_last = &needle[needle_len - 1];
+
+           /* Start at startpos plus the length of the needle */
+           hptr = &haystack[start_pos + needle_len - 1];
+           while (hptr < haystack_end)
            {
-               pos = p + 1;
-               break;
+               /* Match the needle scanning *backward* */
+               const pg_wchar *nptr;
+               const pg_wchar *p;
+
+               nptr = needle_last;
+               p = hptr;
+               while (*nptr == *p)
+               {
+                   /* Matched it all?  If so, return 1-based position */
+                   if (nptr == needle)
+                       return p - haystack + 1;
+                   nptr--, p--;
+               }
+               /*
+                * No match, so use the haystack char at hptr to decide how
+                * far to advance.  If the needle had any occurrence of that
+                * character (or more precisely, one sharing the same
+                * skiptable entry) before its last character, then we advance
+                * far enough to align the last such needle character with
+                * that haystack position.  Otherwise we can advance by the
+                * whole needle length.
+                */
+               hptr += state->skiptable[*hptr & skiptablemask];
            }
-           p1++;
        }
    }
 
-   return pos;
+   return 0;                   /* not found */
 }
 
 static void