OSDN Git Service

Fixed a bug in SilkLineFastParser:
authorleo <leo@ae02f08e-27ec-0310-ae8c-8ba02fe2eafd>
Fri, 5 Jun 2009 07:11:41 +0000 (07:11 +0000)
committerleo <leo@ae02f08e-27ec-0310-ae8c-8ba02fe2eafd>
Fri, 5 Jun 2009 07:11:41 +0000 (07:11 +0000)
 * The parser did not await the termination of the reducer thread.

git-svn-id: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core@3363 ae02f08e-27ec-0310-ae8c-8ba02fe2eafd

src/main/java/org/xerial/silk/SilkLineFastParser.java
src/main/java/org/xerial/silk/SilkLineParser.java [new file with mode: 0644]
src/main/java/org/xerial/silk/SilkLinePushParser.java
src/main/java/org/xerial/silk/SilkParser.java
src/main/java/org/xerial/silk/SilkParserConfig.java
src/main/java/org/xerial/silk/SilkPullParser.java
src/main/java/org/xerial/silk/SilkWalker.java
src/test/java/org/xerial/silk/SilkLineFastParserTest.java [new file with mode: 0644]
src/test/java/org/xerial/silk/SilkWalkerTest.java

index 7e1d85c..7f143e2 100644 (file)
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.xerial.core.XerialErrorCode;
 import org.xerial.core.XerialException;
@@ -52,7 +53,7 @@ import org.xerial.util.log.Logger;
  * @author leo
  * 
  */
-public class SilkLineFastParser
+public class SilkLineFastParser implements SilkLineParser
 {
     private static Logger _logger = Logger.getLogger(SilkLineFastParser.class);
 
@@ -90,7 +91,7 @@ public class SilkLineFastParser
     }
 
     private boolean foundEOF = false;
-    private boolean noMoreJob = false;
+    private volatile boolean noMoreJob = false;
 
     public void parse(SilkEventHandler handler) throws XerialException
     {
@@ -98,54 +99,58 @@ public class SilkLineFastParser
         {
             // start up the reducer
             Future<Void> reducerTask = threadManager.submit(new Reducer(handler));
-
-            int workerCount = 0;
-            foundEOF = false;
-
-            while (!foundEOF)
+            try
             {
-                ArrayList<String> cache = new ArrayList<String>(config.numLinesInBlock);
-                int lineCount = 0;
-                while (lineCount < config.numLinesInBlock)
-                {
+                int workerCount = 0;
+                foundEOF = false;
 
-                    String line = buffer.readLine();
-                    if (line == null)
+                while (!foundEOF)
+                {
+                    ArrayList<String> cache = new ArrayList<String>(config.numLinesInBlock);
+                    int lineCount = 0;
+                    while (lineCount < config.numLinesInBlock)
                     {
-                        foundEOF = true;
-                        break;
+
+                        String line = buffer.readLine();
+                        if (line == null)
+                        {
+                            foundEOF = true;
+                            break;
+                        }
+                        lineCount++;
+                        cache.add(line);
                     }
-                    lineCount++;
-                    cache.add(line);
-                }
 
-                if (!cache.isEmpty())
-                {
-                    // map the input
-                    Mapper map = new Mapper(workerCount++, cache);
+                    if (!cache.isEmpty())
+                    {
+                        // map the input
+                        Mapper map = new Mapper(workerCount++, cache);
 
-                    Future<ArrayDeque<SilkEvent>> future = threadManager.submit(map);
-                    eventContainer.put(future);
+                        Future<ArrayDeque<SilkEvent>> future = threadManager.submit(map);
+                        eventContainer.put(future);
+                    }
                 }
-            }
 
-            // wake up the reducer thread
-            noMoreJob = true;
-            reducerTask.cancel(false);
+                // wake up the reducer thread
+                noMoreJob = true;
 
-        }
-        catch (IOException e)
-        {
-            throw new XerialException(XerialErrorCode.IO_EXCEPTION, e);
+            }
+            catch (IOException e)
+            {
+                reducerTask.cancel(true);
+                throw new XerialException(XerialErrorCode.IO_EXCEPTION, e);
+            }
+            finally
+            {
+                threadManager.shutdown();
+                while (!threadManager.awaitTermination(1, TimeUnit.MILLISECONDS))
+                {}
+            }
         }
         catch (InterruptedException e)
         {
             throw new XerialException(XerialErrorCode.INTERRUPTED, e);
         }
-        finally
-        {
-            threadManager.shutdown();
-        }
 
     }
 
@@ -187,6 +192,9 @@ public class SilkLineFastParser
             // finished the parsing
             cache.clear();
 
+            if (_logger.isTraceEnabled())
+                _logger.trace(String.format("finished workder=%d. event queue size = %d", lsn, eventQueue.size()));
+
             return eventQueue;
         }
 
@@ -200,8 +208,8 @@ public class SilkLineFastParser
      */
     private class Reducer implements Callable<Void>
     {
-        private SilkEventHandler handler;
-        private ArrayDeque<SilkEvent> eventQueue;
+        private final SilkEventHandler handler;
+        private ArrayDeque<SilkEvent> eventQueue = null;
         private int eventCount = 0;
 
         public Reducer(SilkEventHandler handler)
diff --git a/src/main/java/org/xerial/silk/SilkLineParser.java b/src/main/java/org/xerial/silk/SilkLineParser.java
new file mode 100644 (file)
index 0000000..9974727
--- /dev/null
@@ -0,0 +1,36 @@
+/*--------------------------------------------------------------------------
+ *  Copyright 2009 Taro L. Saito
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *--------------------------------------------------------------------------*/
+//--------------------------------------
+// XerialJ
+//
+// SilkPushParser.java
+// Since: Jun 5, 2009 3:17:54 PM
+//
+// $URL$
+// $Author$
+//--------------------------------------
+package org.xerial.silk;
+
+/**
+ * Push-parser interface
+ * 
+ * @author leo
+ * 
+ */
+public interface SilkLineParser
+{
+    public void parse(SilkEventHandler handler) throws Exception;
+}
index 61548af..431f63a 100644 (file)
@@ -54,7 +54,7 @@ import org.xerial.util.log.Logger;
  * @author leo
  * 
  */
-public class SilkLinePushParser
+public class SilkLinePushParser implements SilkLineParser
 {
     private static Logger _logger = Logger.getLogger(SilkLinePushParser.class);
 
index 12bd941..d6eb7c3 100644 (file)
@@ -73,9 +73,9 @@ import org.xerial.util.xml.impl.TreeEventQueue;
  */
 public class SilkParser implements SilkEventHandler
 {
-    private static Logger _logger = Logger.getLogger(SilkPullParser.class);
+    private static Logger _logger = Logger.getLogger(SilkParser.class);
 
-    private final SilkLinePushParser parser;
+    private final SilkLineParser parser;
     private final SilkEnv parseContext;
     private final SilkParserConfig config;
 
@@ -134,7 +134,10 @@ public class SilkParser implements SilkEventHandler
     public SilkParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException
     {
         this.config = config;
-        this.parser = new SilkLinePushParser(input);
+        if (config.numWorkers > 1)
+            this.parser = new SilkLineFastParser(input);
+        else
+            this.parser = new SilkLinePushParser(input);
         this.parseContext = env;
     }
 
index 566352d..9225343 100644 (file)
@@ -33,7 +33,7 @@ package org.xerial.silk;
 public class SilkParserConfig
 {
     public int bufferSize = 1024 * 1024; // 1M
-    public int numWorkers = 2;
+    public int numWorkers = 1;
     public int numLinesInBlock = 1000;
 
 }
index 0f02443..182d504 100644 (file)
@@ -81,7 +81,12 @@ public class SilkPullParser implements TreeStreamReader
      */\r
     protected SilkPullParser(Reader input) throws IOException\r
     {\r
-        this(input, SilkEnv.newEnv());\r
+        this(input, SilkEnv.newEnv(), new SilkParserConfig());\r
+    }\r
+\r
+    public SilkPullParser(Reader input, SilkEnv env) throws IOException\r
+    {\r
+        this(input, env, new SilkParserConfig());\r
     }\r
 \r
     /**\r
@@ -91,15 +96,56 @@ public class SilkPullParser implements TreeStreamReader
      * @param env\r
      * @throws IOException\r
      */\r
-    public SilkPullParser(Reader input, SilkEnv env) throws IOException\r
+    public SilkPullParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException\r
     {\r
-        this.parser = new SilkParser(input, env);\r
+        this.parser = new SilkParser(input, env, config);\r
 \r
         this.threadManager = Executors.newFixedThreadPool(1);\r
         threadManager.submit(new BackgroundParser());\r
 \r
     }\r
 \r
+    /**\r
+     * Create a new reader for reading local resources\r
+     * \r
+     * @param resourceBasePath\r
+     * @param resourceName\r
+     * @throws IOException\r
+     */\r
+    public SilkPullParser(String resourceBasePath, String resourceName) throws IOException\r
+    {\r
+        this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath(\r
+                resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath));\r
+    }\r
+\r
+    /**\r
+     * Create a new reader for reading the specified resource URL\r
+     * \r
+     * @param resourcePath\r
+     * @throws IOException\r
+     */\r
+    public SilkPullParser(URL resourcePath) throws IOException\r
+    {\r
+        this(resourcePath, SilkEnv.newEnv());\r
+    }\r
+\r
+    public SilkPullParser(URL resource, SilkEnv env) throws IOException\r
+    {\r
+        this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
+                .getResourceBasePath(resource)));\r
+    }\r
+\r
+    public SilkPullParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException\r
+    {\r
+        this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
+                .getResourceBasePath(resource)), config);\r
+    }\r
+\r
+    public SilkPullParser(URL resource, SilkParserConfig config) throws IOException\r
+    {\r
+        this(resource, SilkEnv.newEnv(), config);\r
+    }\r
+\r
     private class BackgroundParser implements Callable<Void>\r
     {\r
 \r
@@ -145,36 +191,6 @@ public class SilkPullParser implements TreeStreamReader
 \r
     }\r
 \r
-    /**\r
-     * Create a new reader for reading local resources\r
-     * \r
-     * @param resourceBasePath\r
-     * @param resourceName\r
-     * @throws IOException\r
-     */\r
-    public SilkPullParser(String resourceBasePath, String resourceName) throws IOException\r
-    {\r
-        this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath(\r
-                resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath));\r
-    }\r
-\r
-    /**\r
-     * Create a new reader for reading the specified resource URL\r
-     * \r
-     * @param resourcePath\r
-     * @throws IOException\r
-     */\r
-    public SilkPullParser(URL resourcePath) throws IOException\r
-    {\r
-        this(resourcePath, SilkEnv.newEnv());\r
-    }\r
-\r
-    public SilkPullParser(URL resource, SilkEnv env) throws IOException\r
-    {\r
-        this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
-                .getResourceBasePath(resource)));\r
-    }\r
-\r
     public TreeEvent peekNext() throws XerialException\r
     {\r
         if (hasNext())\r
index 460cd86..0d628fa 100644 (file)
@@ -73,6 +73,11 @@ public class SilkWalker extends TreeWalkerImpl
         super(new SilkPullParser(resourcePath));
     }
 
+    public SilkWalker(URL resourcePath, SilkParserConfig config) throws IOException
+    {
+        super(new SilkPullParser(resourcePath, config));
+    }
+
     public SilkWalker(URL resource, SilkEnv env) throws IOException
     {
         super(new SilkPullParser(resource, env));
diff --git a/src/test/java/org/xerial/silk/SilkLineFastParserTest.java b/src/test/java/org/xerial/silk/SilkLineFastParserTest.java
new file mode 100644 (file)
index 0000000..f609602
--- /dev/null
@@ -0,0 +1,73 @@
+/*--------------------------------------------------------------------------
+ *  Copyright 2009 Taro L. Saito
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *--------------------------------------------------------------------------*/
+//--------------------------------------
+// XerialJ
+//
+// SilkLineFastParserTest.java
+// Since: Jun 5, 2009 3:11:31 PM
+//
+// $URL$
+// $Author$
+//--------------------------------------
+package org.xerial.silk;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.xerial.core.XerialException;
+import org.xerial.util.FileResource;
+import org.xerial.util.bean.JSONStreamWalker;
+import org.xerial.util.log.Logger;
+
+public class SilkLineFastParserTest
+{
+    private static Logger _logger = Logger.getLogger(SilkLineFastParserTest.class);
+
+    @Before
+    public void setUp() throws Exception
+    {}
+
+    @After
+    public void tearDown() throws Exception
+    {}
+
+    public static void compare(String silkFile, String jsonFile) throws IOException, XerialException
+    {
+        SilkParserConfig config = new SilkParserConfig();
+        config.numWorkers = 2; // use SilkLineFastParser
+        SilkWalker walker = new SilkWalker(FileResource.find(SilkLineFastParserTest.class, silkFile), config);
+        TreeWalkLog l1 = new TreeWalkLog();
+        TreeWalkLog l2 = new TreeWalkLog();
+
+        walker.walk(l1);
+        _logger.debug(l1);
+
+        JSONStreamWalker jWalker = new JSONStreamWalker(FileResource.open(SilkWalkerTest.class, jsonFile));
+        jWalker.walk(l2);
+
+        Assert.assertTrue(TreeWalkLog.compare(l1, l2));
+    }
+
+    @Test
+    public void parse() throws Exception
+    {
+        compare("small.silk", "small.json");
+    }
+
+}
index fedbb07..2f6293b 100644 (file)
@@ -61,7 +61,7 @@ public class SilkWalkerTest
         return l1;
     }
 
-    public void compare(String silkFile, String jsonFile) throws IOException, XerialException
+    public static void compare(String silkFile, String jsonFile) throws IOException, XerialException
     {
         SilkWalker walker = new SilkWalker(FileResource.find(SilkWalkerTest.class, silkFile));
         TreeWalkLog l1 = new TreeWalkLog();