import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.xerial.core.XerialErrorCode;
import org.xerial.core.XerialException;
* @author leo
*
*/
-public class SilkLineFastParser
+public class SilkLineFastParser implements SilkLineParser
{
private static Logger _logger = Logger.getLogger(SilkLineFastParser.class);
}
private boolean foundEOF = false;
- private boolean noMoreJob = false;
+ private volatile boolean noMoreJob = false;
public void parse(SilkEventHandler handler) throws XerialException
{
{
// start up the reducer
Future<Void> reducerTask = threadManager.submit(new Reducer(handler));
-
- int workerCount = 0;
- foundEOF = false;
-
- while (!foundEOF)
+ try
{
- ArrayList<String> cache = new ArrayList<String>(config.numLinesInBlock);
- int lineCount = 0;
- while (lineCount < config.numLinesInBlock)
- {
+ int workerCount = 0;
+ foundEOF = false;
- String line = buffer.readLine();
- if (line == null)
+ while (!foundEOF)
+ {
+ ArrayList<String> cache = new ArrayList<String>(config.numLinesInBlock);
+ int lineCount = 0;
+ while (lineCount < config.numLinesInBlock)
{
- foundEOF = true;
- break;
+
+ String line = buffer.readLine();
+ if (line == null)
+ {
+ foundEOF = true;
+ break;
+ }
+ lineCount++;
+ cache.add(line);
}
- lineCount++;
- cache.add(line);
- }
- if (!cache.isEmpty())
- {
- // map the input
- Mapper map = new Mapper(workerCount++, cache);
+ if (!cache.isEmpty())
+ {
+ // map the input
+ Mapper map = new Mapper(workerCount++, cache);
- Future<ArrayDeque<SilkEvent>> future = threadManager.submit(map);
- eventContainer.put(future);
+ Future<ArrayDeque<SilkEvent>> future = threadManager.submit(map);
+ eventContainer.put(future);
+ }
}
- }
- // wake up the reducer thread
- noMoreJob = true;
- reducerTask.cancel(false);
+ // wake up the reducer thread
+ noMoreJob = true;
- }
- catch (IOException e)
- {
- throw new XerialException(XerialErrorCode.IO_EXCEPTION, e);
+ }
+ catch (IOException e)
+ {
+ reducerTask.cancel(true);
+ throw new XerialException(XerialErrorCode.IO_EXCEPTION, e);
+ }
+ finally
+ {
+ threadManager.shutdown();
+ while (!threadManager.awaitTermination(1, TimeUnit.MILLISECONDS))
+ {}
+ }
}
catch (InterruptedException e)
{
throw new XerialException(XerialErrorCode.INTERRUPTED, e);
}
- finally
- {
- threadManager.shutdown();
- }
}
// finished the parsing
cache.clear();
+ if (_logger.isTraceEnabled())
+ _logger.trace(String.format("finished workder=%d. event queue size = %d", lsn, eventQueue.size()));
+
return eventQueue;
}
*/
private class Reducer implements Callable<Void>
{
- private SilkEventHandler handler;
- private ArrayDeque<SilkEvent> eventQueue;
+ private final SilkEventHandler handler;
+ private ArrayDeque<SilkEvent> eventQueue = null;
private int eventCount = 0;
public Reducer(SilkEventHandler handler)
--- /dev/null
+/*--------------------------------------------------------------------------
+ * Copyright 2009 Taro L. Saito
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *--------------------------------------------------------------------------*/
+//--------------------------------------
+// XerialJ
+//
+// SilkPushParser.java
+// Since: Jun 5, 2009 3:17:54 PM
+//
+// $URL$
+// $Author$
+//--------------------------------------
+package org.xerial.silk;
+
+/**
+ * Push-parser interface
+ *
+ * @author leo
+ *
+ */
+public interface SilkLineParser
+{
+    /**
+     * Runs the parser and pushes its output to the given handler.
+     * <p>
+     * The exception type is declared broadly here; an implementation may narrow it (for example,
+     * SilkLineFastParser declares only XerialException).
+     *
+     * @param handler
+     *            receiver of the parse output
+     * @throws Exception
+     *             when the implementation fails to complete the parse
+     */
+    void parse(SilkEventHandler handler) throws Exception;
+}
* @author leo
*
*/
-public class SilkLinePushParser
+public class SilkLinePushParser implements SilkLineParser
{
private static Logger _logger = Logger.getLogger(SilkLinePushParser.class);
*/
public class SilkParser implements SilkEventHandler
{
- private static Logger _logger = Logger.getLogger(SilkPullParser.class);
+ private static Logger _logger = Logger.getLogger(SilkParser.class);
- private final SilkLinePushParser parser;
+ private final SilkLineParser parser;
private final SilkEnv parseContext;
private final SilkParserConfig config;
public SilkParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException
{
this.config = config;
- this.parser = new SilkLinePushParser(input);
+ if (config.numWorkers > 1)
+ this.parser = new SilkLineFastParser(input);
+ else
+ this.parser = new SilkLinePushParser(input);
this.parseContext = env;
}
public class SilkParserConfig
{
public int bufferSize = 1024 * 1024; // 1M
- public int numWorkers = 2;
+ public int numWorkers = 1;
public int numLinesInBlock = 1000;
}
*/\r
protected SilkPullParser(Reader input) throws IOException\r
{\r
- this(input, SilkEnv.newEnv());\r
+ this(input, SilkEnv.newEnv(), new SilkParserConfig());\r
+ }\r
+\r
+ public SilkPullParser(Reader input, SilkEnv env) throws IOException\r
+ {\r
+ this(input, env, new SilkParserConfig());\r
}\r
\r
/**\r
* @param env\r
* @throws IOException\r
*/\r
- public SilkPullParser(Reader input, SilkEnv env) throws IOException\r
+ public SilkPullParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException\r
{\r
- this.parser = new SilkParser(input, env);\r
+ this.parser = new SilkParser(input, env, config);\r
\r
this.threadManager = Executors.newFixedThreadPool(1);\r
threadManager.submit(new BackgroundParser());\r
\r
}\r
\r
+ /**\r
+ * Creates a pull parser that reads a resource obtained through\r
+ * SilkWalker.class.getResourceAsStream.\r
+ * <p>\r
+ * The resource location is computed by SilkParser.getResourcePath(resourceBasePath, resourceName)\r
+ * and the parse environment is created with SilkEnv.newEnv(resourceBasePath).\r
+ * <p>\r
+ * NOTE(review): Class.getResourceAsStream returns null when the resource cannot be found, which\r
+ * would surface here as a NullPointerException rather than an IOException, and the\r
+ * InputStreamReader is created without an explicit charset -- confirm both are intended.\r
+ * \r
+ * @param resourceBasePath base path passed to SilkParser.getResourcePath and SilkEnv.newEnv\r
+ * @param resourceName resource name, combined with resourceBasePath by SilkParser.getResourcePath\r
+ * @throws IOException declared by the constructor this one delegates to\r
+ */\r
+ public SilkPullParser(String resourceBasePath, String resourceName) throws IOException\r
+ {\r
+ this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath(\r
+ resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath));\r
+ }\r
+\r
+ /**\r
+ * Creates a pull parser for the resource at the given URL, using a new environment obtained\r
+ * from SilkEnv.newEnv(). Delegates to the (URL, SilkEnv) constructor.\r
+ * \r
+ * @param resourcePath URL of the resource to parse\r
+ * @throws IOException declared by the constructor this one delegates to\r
+ */\r
+ public SilkPullParser(URL resourcePath) throws IOException\r
+ {\r
+ this(resourcePath, SilkEnv.newEnv());\r
+ }\r
+\r
+ /**\r
+ * Creates a pull parser that reads from resource.openStream() through a BufferedReader.\r
+ * <p>\r
+ * The environment handed to the delegated (Reader, SilkEnv) constructor is not env itself but\r
+ * the result of SilkEnv.newEnv(env, SilkParser.getResourceBasePath(resource)).\r
+ * <p>\r
+ * NOTE(review): the InputStreamReader is created without an explicit charset -- confirm the\r
+ * platform default is intended.\r
+ * \r
+ * @param resource URL of the resource to parse\r
+ * @param env environment passed to SilkEnv.newEnv together with the resource base path\r
+ * @throws IOException if the URL stream cannot be opened, or from the delegated constructor\r
+ */\r
+ public SilkPullParser(URL resource, SilkEnv env) throws IOException\r
+ {\r
+ this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
+ .getResourceBasePath(resource)));\r
+ }\r
+\r
+ /**\r
+ * Creates a pull parser that reads from resource.openStream() through a BufferedReader, with\r
+ * an explicit parser configuration.\r
+ * <p>\r
+ * The environment handed to the delegated (Reader, SilkEnv, SilkParserConfig) constructor is\r
+ * the result of SilkEnv.newEnv(env, SilkParser.getResourceBasePath(resource)).\r
+ * <p>\r
+ * NOTE(review): the InputStreamReader is created without an explicit charset -- confirm the\r
+ * platform default is intended.\r
+ * \r
+ * @param resource URL of the resource to parse\r
+ * @param env environment passed to SilkEnv.newEnv together with the resource base path\r
+ * @param config parser configuration forwarded unchanged to the delegated constructor\r
+ * @throws IOException if the URL stream cannot be opened, or from the delegated constructor\r
+ */\r
+ public SilkPullParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException\r
+ {\r
+ this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
+ .getResourceBasePath(resource)), config);\r
+ }\r
+\r
+ /**\r
+ * Creates a pull parser for the resource at the given URL with an explicit parser\r
+ * configuration, using a new environment obtained from SilkEnv.newEnv(). Delegates to the\r
+ * (URL, SilkEnv, SilkParserConfig) constructor.\r
+ * \r
+ * @param resource URL of the resource to parse\r
+ * @param config parser configuration forwarded unchanged to the delegated constructor\r
+ * @throws IOException declared by the constructor this one delegates to\r
+ */\r
+ public SilkPullParser(URL resource, SilkParserConfig config) throws IOException\r
+ {\r
+ this(resource, SilkEnv.newEnv(), config);\r
+ }\r
+\r
private class BackgroundParser implements Callable<Void>\r
{\r
\r
\r
}\r
\r
- /**\r
- * Create a new reader for reading local resources\r
- * \r
- * @param resourceBasePath\r
- * @param resourceName\r
- * @throws IOException\r
- */\r
- public SilkPullParser(String resourceBasePath, String resourceName) throws IOException\r
- {\r
- this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath(\r
- resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath));\r
- }\r
-\r
- /**\r
- * Create a new reader for reading the specified resource URL\r
- * \r
- * @param resourcePath\r
- * @throws IOException\r
- */\r
- public SilkPullParser(URL resourcePath) throws IOException\r
- {\r
- this(resourcePath, SilkEnv.newEnv());\r
- }\r
-\r
- public SilkPullParser(URL resource, SilkEnv env) throws IOException\r
- {\r
- this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
- .getResourceBasePath(resource)));\r
- }\r
-\r
public TreeEvent peekNext() throws XerialException\r
{\r
if (hasNext())\r
super(new SilkPullParser(resourcePath));
}
+ /**
+ * Creates a walker backed by a new SilkPullParser built from the given URL and parser
+ * configuration.
+ *
+ * @param resourcePath URL of the resource, passed to the SilkPullParser constructor
+ * @param config parser configuration, passed to the SilkPullParser constructor
+ * @throws IOException declared by the SilkPullParser constructor
+ */
+ public SilkWalker(URL resourcePath, SilkParserConfig config) throws IOException
+ {
+ super(new SilkPullParser(resourcePath, config));
+ }
+
public SilkWalker(URL resource, SilkEnv env) throws IOException
{
super(new SilkPullParser(resource, env));
--- /dev/null
+/*--------------------------------------------------------------------------
+ * Copyright 2009 Taro L. Saito
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *--------------------------------------------------------------------------*/
+//--------------------------------------
+// XerialJ
+//
+// SilkLineFastParserTest.java
+// Since: Jun 5, 2009 3:11:31 PM
+//
+// $URL$
+// $Author$
+//--------------------------------------
+package org.xerial.silk;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.xerial.core.XerialException;
+import org.xerial.util.FileResource;
+import org.xerial.util.bean.JSONStreamWalker;
+import org.xerial.util.log.Logger;
+
+/**
+ * Checks that walking a Silk file with a multi-worker parser configuration yields the same
+ * tree-walk log as walking the corresponding JSON file.
+ */
+public class SilkLineFastParserTest
+{
+    private static Logger _logger = Logger.getLogger(SilkLineFastParserTest.class);
+
+    @Before
+    public void setUp() throws Exception
+    {
+        // no fixture to prepare
+    }
+
+    @After
+    public void tearDown() throws Exception
+    {
+        // no fixture to release
+    }
+
+    /**
+     * Builds a parser configuration whose worker count is set to 2.
+     *
+     * @return a new configuration with numWorkers = 2
+     */
+    private static SilkParserConfig multiWorkerConfig()
+    {
+        SilkParserConfig multiWorker = new SilkParserConfig();
+        multiWorker.numWorkers = 2; // use SilkLineFastParser
+        return multiWorker;
+    }
+
+    /**
+     * Walks the Silk resource (located relative to this test class) with a two-worker
+     * configuration, walks the JSON resource (opened relative to SilkWalkerTest), and asserts
+     * that TreeWalkLog.compare accepts the two recorded logs.
+     *
+     * @param silkFile
+     *            name of the Silk resource
+     * @param jsonFile
+     *            name of the JSON resource holding the expected tree
+     * @throws IOException
+     *             declared by the resource and walker calls
+     * @throws XerialException
+     *             declared by the walker calls
+     */
+    public static void compare(String silkFile, String jsonFile) throws IOException, XerialException
+    {
+        SilkParserConfig parserConfig = multiWorkerConfig();
+        SilkWalker silkWalker = new SilkWalker(FileResource.find(SilkLineFastParserTest.class, silkFile),
+                parserConfig);
+        TreeWalkLog silkLog = new TreeWalkLog();
+        TreeWalkLog jsonLog = new TreeWalkLog();
+
+        silkWalker.walk(silkLog);
+        _logger.debug(silkLog);
+
+        JSONStreamWalker jsonWalker = new JSONStreamWalker(FileResource.open(SilkWalkerTest.class, jsonFile));
+        jsonWalker.walk(jsonLog);
+
+        boolean sameEvents = TreeWalkLog.compare(silkLog, jsonLog);
+        Assert.assertTrue(sameEvents);
+    }
+
+    /**
+     * Compares the small sample Silk file against its JSON counterpart.
+     */
+    @Test
+    public void parse() throws Exception
+    {
+        compare("small.silk", "small.json");
+    }
+
+}
return l1;
}
- public void compare(String silkFile, String jsonFile) throws IOException, XerialException
+ public static void compare(String silkFile, String jsonFile) throws IOException, XerialException
{
SilkWalker walker = new SilkWalker(FileResource.find(SilkWalkerTest.class, silkFile));
TreeWalkLog l1 = new TreeWalkLog();