From 2ca78e55a675a88dfda240f113c034cab6c13a77 Mon Sep 17 00:00:00 2001 From: leo Date: Wed, 22 Apr 2009 19:34:40 +0000 Subject: [PATCH] git-svn-id: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core@3252 ae02f08e-27ec-0310-ae8c-8ba02fe2eafd --- src/main/java/org/xerial/core/XerialErrorCode.java | 3 +- src/main/java/org/xerial/silk/SilkPullParser.java | 368 ++++++++++++--------- .../java/org/xerial/silk/SilkStreamReaderTest.java | 4 +- 3 files changed, 215 insertions(+), 160 deletions(-) diff --git a/src/main/java/org/xerial/core/XerialErrorCode.java b/src/main/java/org/xerial/core/XerialErrorCode.java index 36cc59a..d8cf778 100644 --- a/src/main/java/org/xerial/core/XerialErrorCode.java +++ b/src/main/java/org/xerial/core/XerialErrorCode.java @@ -49,6 +49,7 @@ public enum XerialErrorCode implements ErrorCode { READ_ERROR, OUTPUT_ERROR, IO_EXCEPTION, + INTERRUPTED, // option parser error DUPLICATE_OPTION, @@ -66,7 +67,7 @@ public enum XerialErrorCode implements ErrorCode { // parse error INVALID_TOKEN, - PARSE_ERROR + PARSE_ERROR, ; diff --git a/src/main/java/org/xerial/silk/SilkPullParser.java b/src/main/java/org/xerial/silk/SilkPullParser.java index 69af7b8..6dff25a 100644 --- a/src/main/java/org/xerial/silk/SilkPullParser.java +++ b/src/main/java/org/xerial/silk/SilkPullParser.java @@ -29,6 +29,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +39,7 @@ import org.antlr.runtime.ANTLRStringStream; import org.antlr.runtime.CommonTokenStream; import org.antlr.runtime.RecognitionException; import org.antlr.runtime.tree.Tree; +import org.xerial.core.XerialError; import org.xerial.core.XerialErrorCode; import org.xerial.core.XerialException; import org.xerial.silk.impl.SilkDataLine; @@ -63,14 +67,12 @@ public class SilkPullParser private static final SilkEvent EOFEvent = new SilkEvent(SilkEventType.END_OF_FILE, null); private static final SilkEvent BlankLineEvent = new SilkEvent(SilkEventType.BLANK_LINE, null); - private final SilkLexer lexer; - private final SilkParser parser; private int lineCount = 0; - private final BufferedReader buffer; - + private final int eventQueueMax = 10000; private boolean foundEOF = false; - private ArrayDeque eventQueue = new ArrayDeque(); + + private ArrayBlockingQueue eventQueue = new ArrayBlockingQueue(eventQueueMax); /** * SilkEvents @@ -96,6 +98,8 @@ public class SilkPullParser } } + private ExecutorService threadPool; + public SilkPullParser(InputStream input) throws IOException { this(new InputStreamReader(input)); @@ -103,44 +107,225 @@ public class SilkPullParser public SilkPullParser(Reader input) throws IOException { - buffer = new BufferedReader(input, 1024 * 1024); // use 1MB buffer size - lexer = new SilkLexer(); - parser = new SilkParser(null); + threadPool = Executors.newFixedThreadPool(2); + threadPool.execute(new SilkEventProducer(input)); } + protected class SilkEventProducer implements Runnable + { + private final SilkLexer lexer; + private final SilkParser parser; + private final BufferedReader buffer; + + public SilkEventProducer(Reader input) + { + buffer = new BufferedReader(input, 1024 * 1024); // use 1MB buffer size + lexer = new SilkLexer(); + parser = new SilkParser(null); + } + + public void push(SilkEvent e) + { + try + { + eventQueue.put(e); + } + catch (InterruptedException e1) + { + e1.printStackTrace(); + } + } + + public void run() + { + while (!foundEOF) + { + readNext(); + } + } + + public void readNext() + { + // read next line + String line = null; + try + { + // line without newline characters, '\n' and '\r' + line = buffer.readLine(); + lineCount++; + } + catch (IOException e) + { + throw new XerialError(XerialErrorCode.IO_EXCEPTION, String.format("line=%d: %s", lineCount, e + .getMessage())); + } + + if (line == null) + { + // EOF + push(EOFEvent); + foundEOF = true; + return; + } + + // 40000 lines/sec + + // // dummy + // if (true) + // { + // SilkNode node = new SilkNode(); + // node.setName("dummy"); + // node.setNodeIndent("-"); + // push(new SilkEvent(SilkEventType.NODE, node)); + // //push(BlankLineEvent); + // return; + // } + + if (line.length() <= 0) + { + push(BlankLineEvent); + return; + } + + if (line.startsWith("%")) + { + push(new SilkEvent(SilkEventType.PREAMBLE, new SilkPreamble(line))); + return; + } + else if (line.startsWith("--")) + { + push(new SilkEvent(SilkEventType.MULTILINE_SEPARATOR, null)); + return; + } + else if (line.startsWith(">>")) + { + push(new SilkEvent(SilkEventType.MULTILINE_ENTRY_SEPARATOR, null)); + return; + } + + // 39000 lines/sec + + // remove leading and trailing white spaces (' ') + String trimmedLine = line.trim(); + if (trimmedLine.length() <= 0) + { + push(BlankLineEvent); + return; + } + + // comment line + if (trimmedLine.startsWith("#")) + { + // ignore the comment line + return; + } + + // 36000 lines / sec + + if (!(trimmedLine.startsWith("-") || trimmedLine.startsWith("@"))) + { + SilkDataLine dataLine = new SilkDataLine(sanitizeDataLine(trimmedLine)); + push(new SilkEvent(SilkEventType.DATA_LINE, dataLine)); + return; + } + + // 17000 lines/sec + + // lexical analysis + lexer.resetContext(); + lexer.setCharStream(new ANTLRStringStream(line)); + + // 17500 lines/sec + + try + { + CommonTokenStream tokenStream = new CommonTokenStream(lexer); + parser.setTokenStream(tokenStream); + + // 17000 lines/sec + + silkLine_return ret = parser.silkLine(); + Tree t = (Tree) ret.getTree(); + + // 8500 -> 12000 lines/sec + + switch (t.getType()) + { + case SilkParser.Function: + { + SilkFunction func = BeanUtilImpl.createBeanFromParseTree(SilkFunction.class, t, + SilkParser.tokenNames); + push(new SilkEvent(SilkEventType.FUNCTION, func)); + return; + } + case SilkParser.SilkNode: + { + SilkNode node = BeanUtilImpl.createBeanFromParseTree(SilkNode.class, t, SilkParser.tokenNames); + push(new SilkEvent(SilkEventType.NODE, node)); + return; + } + default: + throw new XerialError(XerialErrorCode.INVALID_INPUT, String.format( + "line=%d: invalid data type: %s", lineCount, parser.getTokenNames()[t.getType()])); + } + + // 1500 lines/sec + } + catch (RecognitionException e) + { + throw new XerialError(XerialErrorCode.INVALID_INPUT, String.format("parse error line=%d: %s", + lineCount, e.getMessage())); + } + catch (XerialException e) + { + throw new XerialError(e.getErrorCode(), e); + } + + } + + } + + private ArrayDeque prefetchedEventQueue = new ArrayDeque(); + public boolean hasNext() throws XerialException { - if (!eventQueue.isEmpty()) + if (!prefetchedEventQueue.isEmpty()) return true; if (foundEOF) - return false; - - while (!foundEOF && eventQueue.isEmpty()) - fillQueue(); + return !eventQueue.isEmpty(); + try + { + prefetchedEventQueue.addLast(eventQueue.take()); + } + catch (InterruptedException e) + { + foundEOF = true; + } return hasNext(); } public SilkEvent next() throws XerialException { - if (!eventQueue.isEmpty()) - return eventQueue.removeFirst(); + if (!prefetchedEventQueue.isEmpty()) + return prefetchedEventQueue.removeFirst(); - if (foundEOF) + if (foundEOF && eventQueue.isEmpty()) return null; - while (!foundEOF && eventQueue.isEmpty()) - fillQueue(); + try + { + prefetchedEventQueue.addLast(eventQueue.take()); + } + catch (InterruptedException e1) + { + foundEOF = true; + } return next(); } - public void push(SilkEvent e) - { - eventQueue.addLast(e); - } - public static String sanitizeDataLine(String line) { if (line.startsWith("\\")) @@ -166,141 +351,8 @@ public class SilkPullParser return line; } - public void fillQueue() throws XerialException - { - if (foundEOF) - return; - - // read next line - String line = null; - try - { - // line without newline characters, '\n' and '\r' - line = buffer.readLine(); - lineCount++; - } - catch (IOException e) - { - throw new XerialException(XerialErrorCode.IO_EXCEPTION, String.format("line=%d: %s", lineCount, e - .getMessage())); - } - - if (line == null) - { - // EOF - push(EOFEvent); - foundEOF = true; - return; - } - - // 40000 lines/sec - - // // dummy - // if (true) - // { - // SilkNode node = new SilkNode(); - // node.setName("dummy"); - // node.setNodeIndent("-"); - // push(new SilkEvent(SilkEventType.NODE, node)); - // //push(BlankLineEvent); - // return; - // } - - if (line.length() <= 0) - { - push(BlankLineEvent); - return; - } - - if (line.startsWith("%")) - { - push(new SilkEvent(SilkEventType.PREAMBLE, new SilkPreamble(line))); - return; - } - else if (line.startsWith("--")) - { - push(new SilkEvent(SilkEventType.MULTILINE_SEPARATOR, null)); - return; - } - else if (line.startsWith(">>")) - { - push(new SilkEvent(SilkEventType.MULTILINE_ENTRY_SEPARATOR, null)); - return; - } - - // 39000 lines/sec - - // remove leading and trailing white spaces (' ') - String trimmedLine = line.trim(); - if (trimmedLine.length() <= 0) - { - push(BlankLineEvent); - return; - } - - // comment line - if (trimmedLine.startsWith("#")) - { - // ignore the comment line - return; - } - - // 36000 lines / sec - - if (!(trimmedLine.startsWith("-") || trimmedLine.startsWith("@"))) - { - SilkDataLine dataLine = new SilkDataLine(sanitizeDataLine(trimmedLine)); - push(new SilkEvent(SilkEventType.DATA_LINE, dataLine)); - return; - } - - // 17000 lines/sec - - // lexical analysis - lexer.resetContext(); - lexer.setCharStream(new ANTLRStringStream(line)); - - // 17500 lines/sec - - try - { - CommonTokenStream tokenStream = new CommonTokenStream(lexer); - parser.setTokenStream(tokenStream); - - // 17000 lines/sec - - silkLine_return ret = parser.silkLine(); - Tree t = (Tree) ret.getTree(); - - // 8500 -> 12000 lines/sec - - switch (t.getType()) - { - case SilkParser.Function: - { - SilkFunction func = BeanUtilImpl.createBeanFromParseTree(SilkFunction.class, t, SilkParser.tokenNames); - push(new SilkEvent(SilkEventType.FUNCTION, func)); - return; - } - case SilkParser.SilkNode: - { - SilkNode node = BeanUtilImpl.createBeanFromParseTree(SilkNode.class, t, SilkParser.tokenNames); - push(new SilkEvent(SilkEventType.NODE, node)); - return; - } - default: - throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format( - "line=%d: invalid data type: %s", lineCount, parser.getTokenNames()[t.getType()])); - } - - // 1500 lines/sec - } - catch (RecognitionException e) - { - throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format("parse error line=%d: %s", - lineCount, e.getMessage())); - } - } + protected void fillQueue() throws XerialException + {} public int getLine() { diff --git a/src/test/java/org/xerial/silk/SilkStreamReaderTest.java b/src/test/java/org/xerial/silk/SilkStreamReaderTest.java index 2de4f28..890dcb4 100644 --- a/src/test/java/org/xerial/silk/SilkStreamReaderTest.java +++ b/src/test/java/org/xerial/silk/SilkStreamReaderTest.java @@ -84,7 +84,7 @@ public class SilkStreamReaderTest while ((e = reader.next()) != null) { count++; - if (count % 100000 == 0) + if (count % 1000000 == 0) { int line = reader.getNumReadLine(); double percentage = (line / 10145176.0) * 100; @@ -97,6 +97,7 @@ public class SilkStreamReaderTest _logger.info(String.format("time=%s", timer.getElapsedTime())); // best time: 4200 lines/sec (2009 Apr. 23) + // 6585 lines/sec (after threading SilkPullParser) } @Test @@ -122,6 +123,7 @@ public class SilkStreamReaderTest _logger.info(String.format("time=%s", timer.getElapsedTime())); // best time: 13000 lines/sec + // 12500 lines/sec (after threading SilkPullParser) } } -- 2.11.0