OSDN Git Service

ns_search: add update index related functions
[newslash/newslash.git] / src / ns_search / newslash_index / index.py
index a90e675..7cca1b0 100644 (file)
@@ -5,6 +5,10 @@ import lucene_wrapper
 import newslash_db
 import htmlutil
 import calendar
+from datetime import datetime
+
+from mysql.connector.errors import ProgrammingError
+import exceptions
 
 class Index(object):
     def __init__(self, **kwargs):
@@ -32,20 +36,153 @@ class Index(object):
     def _get_searcher(self):
         return lucene_wrapper.Searcher(index_directory=self.config("SearchIndex", "path"))
 
-    def update_all_stories(self, batch_size=1000, progress_cb=None, error_cb=None):
-        '''update index for all stories'''
+    def _db_config(self):
         db_cfg = self.config("Database")
         if db_cfg is None:
             raise ConfigFileError(ConfigFileError.SECTION_NOT_FOUND, "Database")
+        return db_cfg
+
+    def create_metadata_table(self):
+        sql = (
+            "CREATE TABLE ns_search_metadata ("
+            "  target_id    tinyint(8) unsigned NOT NULL AUTO_INCREMENT,"
+            "  target_name  varchar(32)         NOT NULL UNIQUE,"
+            "  last_update  timestamp           NOT NULL DEFAULT CURRENT_TIMESTAMP,"
+            "  latest_id    int(8) unsigned     NOT NULL DEFAULT 0,"
+            "  PRIMARY KEY (target_id)"
+            ")"
+        )
+
+        db= newslash_db.NewslashDB(self._db_config())
+        try:
+            db.execute(sql)
+        except ProgrammingError as e:
+            db.close()
+            raise exceptions.DatabaseError('table creation error: {}'.format(str(e)))
+
+        db.close()
+
+    def get_metadata(self, target):
+        sql = 'SELECT * from ns_search_metadata WHERE target_name = %(target)s'
+        db = newslash_db.NewslashDB(self._db_config())
+        cur = db.execute(sql, target=target)
+        if len(cur) > 0:
+            result = dict(zip(cur.column_names, cur[0]))
+        else:
+            result = None
+
+        db.close()
+        return result
+
+    def update_metadata(self, target, last_update, latest_id):
+        sql = (
+            "INSERT INTO ns_search_metadata"
+            "  (target_name, last_update, latest_id)"
+            "  VALUES (%(target)s, %(last_update)s, %(latest_id)s)"
+            "  ON DUPLICATE KEY UPDATE"
+            "    last_update = %(last_update),"
+            "    latest_id = %(latest_id)"
+        )
+
+        db = newslash_db.NewslashDB(self._db_config())
+        cur = db.execute(sql, target=target, last_update=last_update, latest_id=latest_id)
+        db.close()
+
+    def recreate(self, target):
+        '''destroy current index then create new one for target'''
+        pass
+
+    def update_story(self, target, batch_size=1000, progress_cb=None, error_cb=None):
+        '''update story index'''
+        stories = newslash_db.Stories(self._db_config())
+        query_done = False
+
+        # at first, get last indexed id  and timestamp
+        metadata = self.get_metadata('stories')
+        latest_id = metadata.get('latest_id', 0)
+        last_update = metadata.get('last_update')
+
+        # add new stories to index
+        start_update = datetime.now()
+        success = 0
+        errors = 0
+        offset = 0
+        max_stoid = 0
+        with lucene_wrapper.Indexer(index_directory=self.config("SearchIndex", "path")) as indexer:
+            while not query_done:
+                # repeat process
+                items = stories.select(limit=batch_size, offset=offset, stoid_gt=latest_id)
+                offset += len(items)
+                if len(items) < batch_size:
+                    query_done = True
+
+                for item in items:
+                    try:
+                        doc = self._make_story_document(item)
+                    except DocumentMakingError:
+                        errors += 1
+                        if error_cb is not None:
+                            error_cb('add', item)
+                        continue
+
+                    indexer.add(doc)
+                    success += 1
+
+                if progress_cb is not None:
+                    progress_cb('add', success, errors)
+
+                for item in items:
+                    if item["stoid"] > max_stoid:
+                        max_stoid = item["stoid"];
+
+        # update index for updated stories
+        update_success = 0
+        update_errors = 0
+        with lucene_wrapper.Indexer(index_directory=self.config("SearchIndex", "path")) as indexer:
+            items = stories.select(stoid_gt=latest_id, last_update_gt=last_update)
+            for item in items:
+                # first, create term to identify target document
+                target_id = self._get_primary_id(target, item)
+                term = lucene_wrapper.BooleanQuery()
+                term.add_must(lucene_wrapper.TermQuery("type", target))
+                term.add_must(lucene_wrapper.TermQuery("id", target_id))
+
+                try:
+                    doc = self._make_story_document(item)
+                except DocumentMakingError:
+                    errors += 1
+                    if error_cb is not None:
+                        error_cb('update', item)
+                    continue
+                indexer.update(term, doc)
+
+        if progress_cb is not None:
+            progress_cb('update', update_success, update_errors)
+
+        success += update_success
+        errors += update_errors
 
-        #start_time = time.time()
-        stories = newslash_db.Stories(db_cfg)
+        # update metadata
+        self.update_metadata(target='stories', last_update=start_update, latest_id=max_stoid)
+
+        # done
+        return (success, errors)
+
+    def _get_primary_id(target, item):
+        if target == 'stories':
+            return item['stoid']
+
+        return None
+    
+    def update_all_stories(self, batch_size=1000, progress_cb=None, error_cb=None):
+        '''update index for all stories'''
+        stories = newslash_db.Stories(self._db_config())
         query_done = False
 
         success = 0
         errors = 0
         offset = 0
-        with lucene_wrapper.Indexer(index_directory="./lucene_index") as indexer:
+        with lucene_wrapper.Indexer(index_directory=self.config("SearchIndex", "path")) as indexer:
             while not query_done:
                 # repeat process for each 1000 items
                 items = stories.select(limit=batch_size, offset=offset)
@@ -59,17 +196,15 @@ class Index(object):
                     except DocumentMakingError:
                         errors += 1
                         if error_cb is not None:
-                            error_cb(item)
+                            error_cb('add', item)
                         continue
-
                     indexer.add(doc)
                     success += 1
 
                 if progress_cb is not None:
-                    progress_cb(success, errors)
+                    progress_cb('add', success, errors)
 
         return (success, errors)
-        
 
     def _make_story_document(self, item):
         '''make Document object from query result'''
@@ -77,7 +212,11 @@ class Index(object):
         if item["time"] is None:
             raise DocumentMakingError()
 
+        # convert datetime to UNIX timestamp
         timestamp = calendar.timegm(item["time"].utctimetuple())
+        last_update = calendar.timegm(item["last_update"].utctimetuple())
+
+        # prepare intro-/body-text, url
         introtext = item["introtext"] or ""
         bodytext = item["bodytext"] or ""
         (content_text, urls) = htmlutil.strip_html_tag(introtext + bodytext)
@@ -90,6 +229,7 @@ class Index(object):
         doc.add_text_field("dept", item["dept"])
 
         doc.add_int_field("create_time", timestamp)
+        doc.add_int_field("last_update", last_update)
         doc.add_int_field("topic", item["tid"])
         doc.add_int_field("author", item["uid"])
         doc.add_int_field("submitter", item["submitter"])
@@ -116,16 +256,3 @@ class Index(object):
         return result[0]
         
         
-class NewslashIndexError(Exception):
-    pass
-
-
-class ConfigFileError(NewslashIndexError):
-    SECTION_NOT_FOUND = "Section not found"
-    def __init__(self, reason, section=""):
-        self.message = "Config Error - {}: {}".format(reason, section)
-
-
-class DocumentMakingError(NewslashIndexError):
-    def __init__(self, message="error while document making"):
-        self.message = message