//--------------------------------------
package org.xerial.silk;
-import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.antlr.runtime.ANTLRStringStream;
-import org.antlr.runtime.CommonTokenStream;
-import org.antlr.runtime.RecognitionException;
-import org.antlr.runtime.tree.Tree;
-import org.xerial.core.XerialError;
-import org.xerial.core.XerialErrorCode;
import org.xerial.core.XerialException;
-import org.xerial.silk.impl.SilkDataLine;
import org.xerial.silk.impl.SilkElement;
-import org.xerial.silk.impl.SilkFunction;
-import org.xerial.silk.impl.SilkLexer;
-import org.xerial.silk.impl.SilkNode;
-import org.xerial.silk.impl.SilkParser;
-import org.xerial.silk.impl.SilkPreamble;
-import org.xerial.silk.impl.SilkParser.silkLine_return;
import org.xerial.util.ArrayDeque;
-import org.xerial.util.bean.impl.BeanUtilImpl;
import org.xerial.util.log.Logger;
/**
private int lineCount = 0;
private final int eventQueueMax = 10000;
- private boolean foundEOF = false;
+ volatile private boolean foundEOF = false;
private ArrayBlockingQueue<SilkEvent> eventQueue = new ArrayBlockingQueue<SilkEvent>(eventQueueMax);
}
}
+ private SilkPushParser parser;
private ExecutorService threadPool;
+ private Future<Boolean> future;
public SilkPullParser(InputStream input) throws IOException
{
public SilkPullParser(Reader input) throws IOException
{
- threadPool = Executors.newFixedThreadPool(2);
- threadPool.execute(new SilkEventProducer(input));
+ threadPool = Executors.newFixedThreadPool(1);
+
+ parser = new SilkPushParser(input);
+ future = threadPool.submit(new SilkEventProducer());
}
- protected class SilkEventProducer implements Runnable
+ private class SilkEventProducer implements Callable<Boolean>, SilkEventHandler
{
- private final SilkLexer lexer;
- private final SilkParser parser;
- private final BufferedReader buffer;
-
- public SilkEventProducer(Reader input)
- {
- buffer = new BufferedReader(input, 1024 * 1024); // use 1MB buffer size
- lexer = new SilkLexer();
- parser = new SilkParser(null);
- }
+ public SilkEventProducer()
+ {}
- public void push(SilkEvent e)
+ public void handle(SilkEvent event) throws XerialException
{
try
{
- eventQueue.put(e);
+ //if (!Thread.currentThread().isInterrupted())
+ eventQueue.put(event);
}
- catch (InterruptedException e1)
+ catch (InterruptedException e)
{
- e1.printStackTrace();
- }
- }
- public void run()
- {
- while (!foundEOF)
- {
- readNext();
}
}
- public void readNext()
+ public Boolean call() throws Exception
{
- // read next line
- String line = null;
- try
- {
- // line without newline characters, '\n' and '\r'
- line = buffer.readLine();
- lineCount++;
- }
- catch (IOException e)
- {
- throw new XerialError(XerialErrorCode.IO_EXCEPTION, String.format("line=%d: %s", lineCount, e
- .getMessage()));
- }
-
- if (line == null)
- {
- // EOF
- push(EOFEvent);
- foundEOF = true;
- return;
- }
-
- // 40000 lines/sec
-
- // // dummy
- // if (true)
- // {
- // SilkNode node = new SilkNode();
- // node.setName("dummy");
- // node.setNodeIndent("-");
- // push(new SilkEvent(SilkEventType.NODE, node));
- // //push(BlankLineEvent);
- // return;
- // }
-
- if (line.length() <= 0)
- {
- push(BlankLineEvent);
- return;
- }
-
- if (line.startsWith("%"))
- {
- push(new SilkEvent(SilkEventType.PREAMBLE, new SilkPreamble(line)));
- return;
- }
- else if (line.startsWith("--"))
- {
- push(new SilkEvent(SilkEventType.MULTILINE_SEPARATOR, null));
- return;
- }
- else if (line.startsWith(">>"))
- {
- push(new SilkEvent(SilkEventType.MULTILINE_ENTRY_SEPARATOR, null));
- return;
- }
-
- // 39000 lines/sec
-
- // remove leading and trailing white spaces (' ')
- String trimmedLine = line.trim();
- if (trimmedLine.length() <= 0)
- {
- push(BlankLineEvent);
- return;
- }
-
- // comment line
- if (trimmedLine.startsWith("#"))
- {
- // ignore the comment line
- return;
- }
-
- // 36000 lines / sec
-
- if (!(trimmedLine.startsWith("-") || trimmedLine.startsWith("@")))
- {
- SilkDataLine dataLine = new SilkDataLine(sanitizeDataLine(trimmedLine));
- push(new SilkEvent(SilkEventType.DATA_LINE, dataLine));
- return;
- }
-
- // 17000 lines/sec
-
- // lexical analysis
- lexer.resetContext();
- lexer.setCharStream(new ANTLRStringStream(line));
-
- // 17500 lines/sec
-
- try
- {
- CommonTokenStream tokenStream = new CommonTokenStream(lexer);
- parser.setTokenStream(tokenStream);
-
- // 17000 lines/sec
-
- silkLine_return ret = parser.silkLine();
- Tree t = (Tree) ret.getTree();
-
- // 8500 -> 12000 lines/sec
-
- switch (t.getType())
- {
- case SilkParser.Function:
- {
- SilkFunction func = BeanUtilImpl.createBeanFromParseTree(SilkFunction.class, t,
- SilkParser.tokenNames);
- push(new SilkEvent(SilkEventType.FUNCTION, func));
- return;
- }
- case SilkParser.SilkNode:
- {
- SilkNode node = BeanUtilImpl.createBeanFromParseTree(SilkNode.class, t, SilkParser.tokenNames);
- push(new SilkEvent(SilkEventType.NODE, node));
- return;
- }
- default:
- throw new XerialError(XerialErrorCode.INVALID_INPUT, String.format(
- "line=%d: invalid data type: %s", lineCount, parser.getTokenNames()[t.getType()]));
- }
-
- // 1500 lines/sec
- }
- catch (RecognitionException e)
- {
- throw new XerialError(XerialErrorCode.INVALID_INPUT, String.format("parse error line=%d: %s",
- lineCount, e.getMessage()));
- }
- catch (XerialException e)
- {
- throw new XerialError(e.getErrorCode(), e);
- }
-
+ parser.parse(this);
+ foundEOF = true;
+ return true;
}
}
if (foundEOF)
return !eventQueue.isEmpty();
- try
- {
- SilkEvent e = null;
- while (!foundEOF && (e = eventQueue.poll(1, TimeUnit.MILLISECONDS)) == null)
- {
+ fetchNext();
- }
- if (e != null)
- prefetchedEventQueue.addLast(e);
- }
- catch (InterruptedException e)
- {
- foundEOF = true;
- }
return hasNext();
}
if (foundEOF)
return eventQueue.poll();
+ fetchNext();
+
+ return next();
+ }
+
+ private void fetchNext() throws XerialException
+ {
try
{
-
SilkEvent e = null;
while (!foundEOF && (e = eventQueue.poll(1, TimeUnit.MILLISECONDS)) == null)
- {
-
- }
+ {}
if (e != null)
prefetchedEventQueue.addLast(e);
}
- catch (InterruptedException e1)
+ catch (InterruptedException e)
{
foundEOF = true;
}
-
- return next();
+ return;
}
- public static String sanitizeDataLine(String line)
- {
- if (line.startsWith("\\"))
- return removeLineComment(line.substring(1));
- else
- return removeLineComment(line);
- }
-
- private static Pattern lineCommentPattern = Pattern.compile("[^\"]*(\\\"[^\"]*\\\")*[^\"]*(#.*)");
-
- public static String removeLineComment(String line)
- {
- if (!line.contains("#"))
- return line;
-
- Matcher m = lineCommentPattern.matcher(line);
- if (m.matches())
- {
- int lineCommentStart = m.start(2);
- if (lineCommentStart != -1)
- line = line.substring(0, lineCommentStart);
- }
- return line;
- }
-
- protected void fillQueue() throws XerialException
- {}
-
- public int getLine()
+ public int getNumReadLine()
{
- return lineCount;
+ return parser.getNumReadLine();
}
}