OSDN Git Service

Reimplement text_position and related functions to use Boyer-Moore-Horspool
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 7 Sep 2008 04:20:00 +0000 (04:20 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 7 Sep 2008 04:20:00 +0000 (04:20 +0000)
searching instead of naive matching.  In the worst case this has the same
O(M*N) complexity as the naive method, but the worst case is hard to hit,
and the average case is very fast, especially with longer patterns.

David Rowley

src/backend/utils/adt/varlena.c

index 164ff84..21c9913 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/adt/varlena.c,v 1.167 2008/05/27 00:13:09 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/adt/varlena.c,v 1.168 2008/09/07 04:20:00 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -39,6 +39,9 @@ typedef struct
        pg_wchar   *wstr2;                      /* note: these are palloc'd */
        int                     len1;                   /* string lengths in logical characters */
        int                     len2;
+       /* Skip table for Boyer-Moore-Horspool search algorithm: */
+       int                     skiptablemask;  /* mask for ANDing with skiptable subscripts */
+       int                     skiptable[256]; /* skip distance for given mismatched char */
 } TextPositionState;
 
 #define DatumGetUnknownP(X)                    ((unknown *) PG_DETOAST_DATUM(X))
@@ -753,7 +756,7 @@ text_substring(Datum str, int32 start, int32 length, bool length_not_specified)
                 * If we're working with an untoasted source, no need to do an extra
                 * copying step.
                 */
-               if (VARATT_IS_COMPRESSED(DatumGetPointer(str)) || 
+               if (VARATT_IS_COMPRESSED(DatumGetPointer(str)) ||
                        VARATT_IS_EXTERNAL(DatumGetPointer(str)))
                        slice = DatumGetTextPSlice(str, slice_start, slice_size);
                else
@@ -866,6 +869,7 @@ text_position(text *t1, text *t2)
        return result;
 }
 
+
 /*
  * text_position_setup, text_position_next, text_position_cleanup -
  *     Component steps of text_position()
@@ -909,64 +913,215 @@ text_position_setup(text *t1, text *t2, TextPositionState *state)
                state->len1 = len1;
                state->len2 = len2;
        }
+
+       /*
+        * Prepare the skip table for Boyer-Moore-Horspool searching.  In these
+        * notes we use the terminology that the "haystack" is the string to be
+        * searched (t1) and the "needle" is the pattern being sought (t2).
+        *
+        * If the needle is empty or bigger than the haystack then there is no
+        * point in wasting cycles initializing the table.  We also choose not
+        * to use B-M-H for needles of length 1, since the skip table can't
+        * possibly save anything in that case.
+        */
+       if (len1 >= len2 && len2 > 1)
+       {
+               int             searchlength = len1 - len2;
+               int     skiptablemask;
+               int     last;
+               int     i;
+
+               /*
+                * First we must determine how much of the skip table to use.  The
+                * declaration of TextPositionState allows up to 256 elements, but for
+                * short search problems we don't really want to have to initialize so
+                * many elements --- it would take too long in comparison to the
+                * actual search time.  So we choose a useful skip table size based on
+                * the haystack length minus the needle length.  The closer the needle
+                * length is to the haystack length the less useful skipping becomes.
+                *
+                * Note: since we use bit-masking to select table elements, the skip
+                * table size MUST be a power of 2, and so the mask must be 2^N-1.
+                */
+               if (searchlength < 16)
+                       skiptablemask = 3;
+               else if (searchlength < 64)
+                       skiptablemask = 7;
+               else if (searchlength < 128)
+                       skiptablemask = 15;
+               else if (searchlength < 512)
+                       skiptablemask = 31;
+               else if (searchlength < 2048)
+                       skiptablemask = 63;
+               else if (searchlength < 4096)
+                       skiptablemask = 127;
+               else
+                       skiptablemask = 255;
+               state->skiptablemask = skiptablemask;
+
+               /*
+                * Initialize the skip table.  We set all elements to the needle
+                * length, since this is the correct skip distance for any character
+                * not found in the needle.
+                */
+               for (i = 0; i <= skiptablemask; i++)
+                       state->skiptable[i] = len2;
+
+               /*
+                * Now examine the needle.  For each character except the last one,
+                * set the corresponding table element to the appropriate skip
+                * distance.  Note that when two characters share the same skip table
+                * entry, the one later in the needle must determine the skip distance.
+                */
+               last = len2 - 1;
+
+               if (!state->use_wchar)
+               {
+                       const char *str2 = state->str2;
+
+                       for (i = 0; i < last; i++)
+                               state->skiptable[(unsigned char) str2[i] & skiptablemask] = last - i;
+               }
+               else
+               {
+                       const pg_wchar *wstr2 = state->wstr2;
+
+                       for (i = 0; i < last; i++)
+                               state->skiptable[wstr2[i] & skiptablemask] = last - i;
+               }
+       }
 }
 
 static int
 text_position_next(int start_pos, TextPositionState *state)
 {
-       int                     pos = 0,
-                               p,
-                               px;
+       int                     haystack_len = state->len1;
+       int                     needle_len = state->len2;
+       int                     skiptablemask = state->skiptablemask;
 
        Assert(start_pos > 0);          /* else caller error */
 
-       if (state->len2 <= 0)
+       if (needle_len <= 0)
                return start_pos;               /* result for empty pattern */
 
+       start_pos--;                            /* adjust for zero based arrays */
+
+       /* Done if the needle can't possibly fit */
+       if (haystack_len < start_pos + needle_len)
+               return 0;
+
        if (!state->use_wchar)
        {
                /* simple case - single byte encoding */
-               char       *p1 = state->str1;
-               char       *p2 = state->str2;
+               const char *haystack = state->str1;
+               const char *needle = state->str2;
+               const char *haystack_end = &haystack[haystack_len];
+               const char *hptr;
 
-               /* no use in searching str past point where search_str will fit */
-               px = (state->len1 - state->len2);
-
-               p1 += start_pos - 1;
+               if (needle_len == 1)
+               {
+                       /* No point in using B-M-H for a one-character needle */
+                       char    nchar = *needle;
 
-               for (p = start_pos - 1; p <= px; p++)
+                       hptr = &haystack[start_pos];
+                       while (hptr < haystack_end)
+                       {
+                               if (*hptr == nchar)
+                                       return hptr - haystack + 1;
+                               hptr++;
+                       }
+               }
+               else
                {
-                       if ((*p1 == *p2) && (strncmp(p1, p2, state->len2) == 0))
+                       const char *needle_last = &needle[needle_len - 1];
+
+                       /* Start at startpos plus the length of the needle */
+                       hptr = &haystack[start_pos + needle_len - 1];
+                       while (hptr < haystack_end)
                        {
-                               pos = p + 1;
-                               break;
+                               /* Match the needle scanning *backward* */
+                               const char *nptr;
+                               const char *p;
+
+                               nptr = needle_last;
+                               p = hptr;
+                               while (*nptr == *p)
+                               {
+                                       /* Matched it all?  If so, return 1-based position */
+                                       if (nptr == needle)
+                                               return p - haystack + 1;
+                                       nptr--, p--;
+                               }
+                               /*
+                                * No match, so use the haystack char at hptr to decide how
+                                * far to advance.  If the needle had any occurrence of that
+                                * character (or more precisely, one sharing the same
+                                * skiptable entry) before its last character, then we advance
+                                * far enough to align the last such needle character with
+                                * that haystack position.  Otherwise we can advance by the
+                                * whole needle length.
+                                */
+                               hptr += state->skiptable[(unsigned char) *hptr & skiptablemask];
                        }
-                       p1++;
                }
        }
        else
        {
-               /* not as simple - multibyte encoding */
-               pg_wchar   *p1 = state->wstr1;
-               pg_wchar   *p2 = state->wstr2;
+               /* The multibyte char version. This works exactly the same way. */
+               const pg_wchar *haystack = state->wstr1;
+               const pg_wchar *needle = state->wstr2;
+               const pg_wchar *haystack_end = &haystack[haystack_len];
+               const pg_wchar *hptr;
 
-               /* no use in searching str past point where search_str will fit */
-               px = (state->len1 - state->len2);
-
-               p1 += start_pos - 1;
+               if (needle_len == 1)
+               {
+                       /* No point in using B-M-H for a one-character needle */
+                       pg_wchar        nchar = *needle;
 
-               for (p = start_pos - 1; p <= px; p++)
+                       hptr = &haystack[start_pos];
+                       while (hptr < haystack_end)
+                       {
+                               if (*hptr == nchar)
+                                       return hptr - haystack + 1;
+                               hptr++;
+                       }
+               }
+               else
                {
-                       if ((*p1 == *p2) && (pg_wchar_strncmp(p1, p2, state->len2) == 0))
+                       const pg_wchar *needle_last = &needle[needle_len - 1];
+
+                       /* Start at startpos plus the length of the needle */
+                       hptr = &haystack[start_pos + needle_len - 1];
+                       while (hptr < haystack_end)
                        {
-                               pos = p + 1;
-                               break;
+                               /* Match the needle scanning *backward* */
+                               const pg_wchar *nptr;
+                               const pg_wchar *p;
+
+                               nptr = needle_last;
+                               p = hptr;
+                               while (*nptr == *p)
+                               {
+                                       /* Matched it all?  If so, return 1-based position */
+                                       if (nptr == needle)
+                                               return p - haystack + 1;
+                                       nptr--, p--;
+                               }
+                               /*
+                                * No match, so use the haystack char at hptr to decide how
+                                * far to advance.  If the needle had any occurrence of that
+                                * character (or more precisely, one sharing the same
+                                * skiptable entry) before its last character, then we advance
+                                * far enough to align the last such needle character with
+                                * that haystack position.  Otherwise we can advance by the
+                                * whole needle length.
+                                */
+                               hptr += state->skiptable[*hptr & skiptablemask];
                        }
-                       p1++;
                }
        }
 
-       return pos;
+       return 0;                                       /* not found */
 }
 
 static void