From: leo Date: Fri, 5 Jun 2009 07:11:41 +0000 (+0000) Subject: Fixed a bug in SilkLineFastParser: X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=62eba86e9d76eb09fdc4e90e4ea21b6a1422b812;p=xerial%2Fxerial-core.git Fixed a bug in SilkLineFastParser: * The parser did not await the termination of the reducer thread. git-svn-id: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core@3363 ae02f08e-27ec-0310-ae8c-8ba02fe2eafd --- diff --git a/src/main/java/org/xerial/silk/SilkLineFastParser.java b/src/main/java/org/xerial/silk/SilkLineFastParser.java index 7e1d85c..7f143e2 100644 --- a/src/main/java/org/xerial/silk/SilkLineFastParser.java +++ b/src/main/java/org/xerial/silk/SilkLineFastParser.java @@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.xerial.core.XerialErrorCode; import org.xerial.core.XerialException; @@ -52,7 +53,7 @@ import org.xerial.util.log.Logger; * @author leo * */ -public class SilkLineFastParser +public class SilkLineFastParser implements SilkLineParser { private static Logger _logger = Logger.getLogger(SilkLineFastParser.class); @@ -90,7 +91,7 @@ public class SilkLineFastParser } private boolean foundEOF = false; - private boolean noMoreJob = false; + private volatile boolean noMoreJob = false; public void parse(SilkEventHandler handler) throws XerialException { @@ -98,54 +99,58 @@ public class SilkLineFastParser { // start up the reducer Future reducerTask = threadManager.submit(new Reducer(handler)); - - int workerCount = 0; - foundEOF = false; - - while (!foundEOF) + try { - ArrayList cache = new ArrayList(config.numLinesInBlock); - int lineCount = 0; - while (lineCount < config.numLinesInBlock) - { + int workerCount = 0; + foundEOF = false; - String line = buffer.readLine(); - if (line == null) + while (!foundEOF) + { + ArrayList cache = new ArrayList(config.numLinesInBlock); + int lineCount = 0; + while (lineCount < config.numLinesInBlock) { - foundEOF = true; - break; + + String line = buffer.readLine(); + if (line == null) + { + foundEOF = true; + break; + } + lineCount++; + cache.add(line); } - lineCount++; - cache.add(line); - } - if (!cache.isEmpty()) - { - // map the input - Mapper map = new Mapper(workerCount++, cache); + if (!cache.isEmpty()) + { + // map the input + Mapper map = new Mapper(workerCount++, cache); - Future> future = threadManager.submit(map); - eventContainer.put(future); + Future> future = threadManager.submit(map); + eventContainer.put(future); + } } - } - // wake up the reducer thread - noMoreJob = true; - reducerTask.cancel(false); + // wake up the reducer thread + noMoreJob = true; - } - catch (IOException e) - { - throw new XerialException(XerialErrorCode.IO_EXCEPTION, e); + } + catch (IOException e) + { + reducerTask.cancel(true); + throw new XerialException(XerialErrorCode.IO_EXCEPTION, e); + } + finally + { + threadManager.shutdown(); + while (!threadManager.awaitTermination(1, TimeUnit.MILLISECONDS)) + {} + } } catch (InterruptedException e) { throw new XerialException(XerialErrorCode.INTERRUPTED, e); } - finally - { - threadManager.shutdown(); - } } @@ -187,6 +192,9 @@ public class SilkLineFastParser // finished the parsing cache.clear(); + if (_logger.isTraceEnabled()) + _logger.trace(String.format("finished workder=%d. event queue size = %d", lsn, eventQueue.size())); + return eventQueue; } @@ -200,8 +208,8 @@ public class SilkLineFastParser */ private class Reducer implements Callable { - private SilkEventHandler handler; - private ArrayDeque eventQueue; + private final SilkEventHandler handler; + private ArrayDeque eventQueue = null; private int eventCount = 0; public Reducer(SilkEventHandler handler) diff --git a/src/main/java/org/xerial/silk/SilkLineParser.java b/src/main/java/org/xerial/silk/SilkLineParser.java new file mode 100644 index 0000000..9974727 --- /dev/null +++ b/src/main/java/org/xerial/silk/SilkLineParser.java @@ -0,0 +1,36 @@ +/*-------------------------------------------------------------------------- + * Copyright 2009 Taro L. Saito + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *--------------------------------------------------------------------------*/ +//-------------------------------------- +// XerialJ +// +// SilkPushParser.java +// Since: Jun 5, 2009 3:17:54 PM +// +// $URL$ +// $Author$ +//-------------------------------------- +package org.xerial.silk; + +/** + * Push-parser interface + * + * @author leo + * + */ +public interface SilkLineParser +{ + public void parse(SilkEventHandler handler) throws Exception; +} diff --git a/src/main/java/org/xerial/silk/SilkLinePushParser.java b/src/main/java/org/xerial/silk/SilkLinePushParser.java index 61548af..431f63a 100644 --- a/src/main/java/org/xerial/silk/SilkLinePushParser.java +++ b/src/main/java/org/xerial/silk/SilkLinePushParser.java @@ -54,7 +54,7 @@ import org.xerial.util.log.Logger; * @author leo * */ -public class SilkLinePushParser +public class SilkLinePushParser implements SilkLineParser { private static Logger _logger = Logger.getLogger(SilkLinePushParser.class); diff --git a/src/main/java/org/xerial/silk/SilkParser.java b/src/main/java/org/xerial/silk/SilkParser.java index 12bd941..d6eb7c3 100644 --- a/src/main/java/org/xerial/silk/SilkParser.java +++ b/src/main/java/org/xerial/silk/SilkParser.java @@ -73,9 +73,9 @@ import org.xerial.util.xml.impl.TreeEventQueue; */ public class SilkParser implements SilkEventHandler { - private static Logger _logger = Logger.getLogger(SilkPullParser.class); + private static Logger _logger = Logger.getLogger(SilkParser.class); - private final SilkLinePushParser parser; + private final SilkLineParser parser; private final SilkEnv parseContext; private final SilkParserConfig config; @@ -134,7 +134,10 @@ public class SilkParser implements SilkEventHandler public SilkParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException { this.config = config; - this.parser = new SilkLinePushParser(input); + if (config.numWorkers > 1) + this.parser = new SilkLineFastParser(input); + else + this.parser = new SilkLinePushParser(input); this.parseContext = env; } diff --git a/src/main/java/org/xerial/silk/SilkParserConfig.java b/src/main/java/org/xerial/silk/SilkParserConfig.java index 566352d..9225343 100644 --- a/src/main/java/org/xerial/silk/SilkParserConfig.java +++ b/src/main/java/org/xerial/silk/SilkParserConfig.java @@ -33,7 +33,7 @@ package org.xerial.silk; public class SilkParserConfig { public int bufferSize = 1024 * 1024; // 1M - public int numWorkers = 2; + public int numWorkers = 1; public int numLinesInBlock = 1000; } diff --git a/src/main/java/org/xerial/silk/SilkPullParser.java b/src/main/java/org/xerial/silk/SilkPullParser.java index 0f02443..182d504 100644 --- a/src/main/java/org/xerial/silk/SilkPullParser.java +++ b/src/main/java/org/xerial/silk/SilkPullParser.java @@ -81,7 +81,12 @@ public class SilkPullParser implements TreeStreamReader */ protected SilkPullParser(Reader input) throws IOException { - this(input, SilkEnv.newEnv()); + this(input, SilkEnv.newEnv(), new SilkParserConfig()); + } + + public SilkPullParser(Reader input, SilkEnv env) throws IOException + { + this(input, env, new SilkParserConfig()); } /** @@ -91,15 +96,56 @@ public class SilkPullParser implements TreeStreamReader * @param env * @throws IOException */ - public SilkPullParser(Reader input, SilkEnv env) throws IOException + public SilkPullParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException { - this.parser = new SilkParser(input, env); + this.parser = new SilkParser(input, env, config); this.threadManager = Executors.newFixedThreadPool(1); threadManager.submit(new BackgroundParser()); } + /** + * Create a new reader for reading local resources + * + * @param resourceBasePath + * @param resourceName + * @throws IOException + */ + public SilkPullParser(String resourceBasePath, String resourceName) throws IOException + { + this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath( + resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath)); + } + + /** + * Create a new reader for reading the specified resource URL + * + * @param resourcePath + * @throws IOException + */ + public SilkPullParser(URL resourcePath) throws IOException + { + this(resourcePath, SilkEnv.newEnv()); + } + + public SilkPullParser(URL resource, SilkEnv env) throws IOException + { + this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser + .getResourceBasePath(resource))); + } + + public SilkPullParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException + { + this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser + .getResourceBasePath(resource)), config); + } + + public SilkPullParser(URL resource, SilkParserConfig config) throws IOException + { + this(resource, SilkEnv.newEnv(), config); + } + private class BackgroundParser implements Callable { @@ -145,36 +191,6 @@ public class SilkPullParser implements TreeStreamReader } - /** - * Create a new reader for reading local resources - * - * @param resourceBasePath - * @param resourceName - * @throws IOException - */ - public SilkPullParser(String resourceBasePath, String resourceName) throws IOException - { - this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath( - resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath)); - } - - /** - * Create a new reader for reading the specified resource URL - * - * @param resourcePath - * @throws IOException - */ - public SilkPullParser(URL resourcePath) throws IOException - { - this(resourcePath, SilkEnv.newEnv()); - } - - public SilkPullParser(URL resource, SilkEnv env) throws IOException - { - this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser - .getResourceBasePath(resource))); - } - public TreeEvent peekNext() throws XerialException { if (hasNext()) diff --git a/src/main/java/org/xerial/silk/SilkWalker.java b/src/main/java/org/xerial/silk/SilkWalker.java index 460cd86..0d628fa 100644 --- a/src/main/java/org/xerial/silk/SilkWalker.java +++ b/src/main/java/org/xerial/silk/SilkWalker.java @@ -73,6 +73,11 @@ public class SilkWalker extends TreeWalkerImpl super(new SilkPullParser(resourcePath)); } + public SilkWalker(URL resourcePath, SilkParserConfig config) throws IOException + { + super(new SilkPullParser(resourcePath, config)); + } + public SilkWalker(URL resource, SilkEnv env) throws IOException { super(new SilkPullParser(resource, env)); diff --git a/src/test/java/org/xerial/silk/SilkLineFastParserTest.java b/src/test/java/org/xerial/silk/SilkLineFastParserTest.java new file mode 100644 index 0000000..f609602 --- /dev/null +++ b/src/test/java/org/xerial/silk/SilkLineFastParserTest.java @@ -0,0 +1,73 @@ +/*-------------------------------------------------------------------------- + * Copyright 2009 Taro L. Saito + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *--------------------------------------------------------------------------*/ +//-------------------------------------- +// XerialJ +// +// SilkLineFastParserTest.java +// Since: Jun 5, 2009 3:11:31 PM +// +// $URL$ +// $Author$ +//-------------------------------------- +package org.xerial.silk; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xerial.core.XerialException; +import org.xerial.util.FileResource; +import org.xerial.util.bean.JSONStreamWalker; +import org.xerial.util.log.Logger; + +public class SilkLineFastParserTest +{ + private static Logger _logger = Logger.getLogger(SilkLineFastParserTest.class); + + @Before + public void setUp() throws Exception + {} + + @After + public void tearDown() throws Exception + {} + + public static void compare(String silkFile, String jsonFile) throws IOException, XerialException + { + SilkParserConfig config = new SilkParserConfig(); + config.numWorkers = 2; // use SilkLineFastParser + SilkWalker walker = new SilkWalker(FileResource.find(SilkLineFastParserTest.class, silkFile), config); + TreeWalkLog l1 = new TreeWalkLog(); + TreeWalkLog l2 = new TreeWalkLog(); + + walker.walk(l1); + _logger.debug(l1); + + JSONStreamWalker jWalker = new JSONStreamWalker(FileResource.open(SilkWalkerTest.class, jsonFile)); + jWalker.walk(l2); + + Assert.assertTrue(TreeWalkLog.compare(l1, l2)); + } + + @Test + public void parse() throws Exception + { + compare("small.silk", "small.json"); + } + +} diff --git a/src/test/java/org/xerial/silk/SilkWalkerTest.java b/src/test/java/org/xerial/silk/SilkWalkerTest.java index fedbb07..2f6293b 100644 --- a/src/test/java/org/xerial/silk/SilkWalkerTest.java +++ b/src/test/java/org/xerial/silk/SilkWalkerTest.java @@ -61,7 +61,7 @@ public class SilkWalkerTest return l1; } - public void compare(String silkFile, String jsonFile) throws IOException, XerialException + public static void compare(String silkFile, String jsonFile) throws IOException, XerialException { SilkWalker walker = new SilkWalker(FileResource.find(SilkWalkerTest.class, silkFile)); TreeWalkLog l1 = new TreeWalkLog();