OSDN Git Service

changed the implementation of SilkStreamReader:
author leo <leo@ae02f08e-27ec-0310-ae8c-8ba02fe2eafd>
Fri, 5 Jun 2009 05:58:53 +0000 (05:58 +0000)
committer leo <leo@ae02f08e-27ec-0310-ae8c-8ba02fe2eafd>
Fri, 5 Jun 2009 05:58:53 +0000 (05:58 +0000)
 using push -> pull style conversion

git-svn-id: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core@3361 ae02f08e-27ec-0310-ae8c-8ba02fe2eafd

src/main/java/org/xerial/silk/SilkLineFastParser.java
src/main/java/org/xerial/silk/SilkParser.java
src/main/java/org/xerial/silk/SilkStreamReader.java

index 1b42e24..7e1d85c 100644 (file)
@@ -47,6 +47,8 @@ import org.xerial.util.log.Logger;
 /**
  * Parsing Silk using threads.
  * 
+ * TODO fix a bug where some nodes are not reported properly under certain timing conditions
+ * 
  * @author leo
  * 
  */
index fb99f06..9c5bee2 100644 (file)
@@ -78,7 +78,7 @@ public class SilkParser implements SilkEventHandler
 {
     private static Logger _logger = Logger.getLogger(SilkStreamReader.class);
 
-    private final SilkLineFastParser parser;
+    private final SilkLinePushParser parser;
     private final SilkEnv parseContext;
     private TreeEventQueue eventQueue = new TreeEventQueue();
     private final ArrayDeque<TreeStreamReader> readerStack = new ArrayDeque<TreeStreamReader>();
@@ -105,11 +105,9 @@ public class SilkParser implements SilkEventHandler
      * @param env
      * @throws IOException
      */
-    private SilkParser(Reader input, SilkEnv env) throws IOException
+    public SilkParser(Reader input, SilkEnv env) throws IOException
     {
-        this.config = new SilkParserConfig();
-        this.parser = new SilkLineFastParser(input);
-        this.parseContext = env;
+        this(input, env, new SilkParserConfig());
     }
 
     /**
@@ -135,13 +133,22 @@ public class SilkParser implements SilkEventHandler
 
     public SilkParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException
     {
+        this(new InputStreamReader(resource.openStream()), SilkEnv.newEnv(env, getResourceBasePath(resource)), config);
+    }
+
+    public SilkParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException
+    {
         this.config = config;
+        this.parser = new SilkLinePushParser(input);
+        this.parseContext = env;
+    }
+
+    static String getResourceBasePath(URL resource)
+    {
         String path = resource.toExternalForm();
         int fileNamePos = path.lastIndexOf("/");
         String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null;
-
-        this.parser = new SilkLineFastParser(new InputStreamReader(resource.openStream()), config);
-        this.parseContext = SilkEnv.newEnv(env, resourceBasePath);
+        return resourceBasePath;
     }
 
     /**
@@ -151,7 +158,7 @@ public class SilkParser implements SilkEventHandler
      * @param resourceName
      * @return
      */
-    private static String getResourcePath(String resourceBasePath, String resourceName)
+    static String getResourcePath(String resourceBasePath, String resourceName)
     {
         String resourcePath = resourceBasePath;
         if (!resourcePath.endsWith("/"))
@@ -315,7 +322,7 @@ public class SilkParser implements SilkEventHandler
                     {
                         if (columnIndex < columns.length)
                         {
-                            String columnData = columns[columnIndex++];
+                            String columnData = columns[columnIndex++].trim();
                             if (columnData.length() > 0)
                                 evalDatalineColumn(child, columnData);
                         }
@@ -685,7 +692,7 @@ public class SilkParser implements SilkEventHandler
             }
             else
             {
-                String[] csv = tabSplit.split(columnData, 0);
+                String[] csv = commaSplit.split(columnData, 0);
                 for (String each : csv)
                 {
                     String value = each.trim();
index dc46a3f..e9a02b7 100644 (file)
@@ -29,43 +29,19 @@ import java.io.IOException;
 import java.io.InputStream;\r
 import java.io.InputStreamReader;\r
 import java.io.Reader;\r
-import java.lang.reflect.Field;\r
 import java.net.URL;\r
-import java.util.ArrayList;\r
-import java.util.Collections;\r
-import java.util.Comparator;\r
-import java.util.HashMap;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.TreeMap;\r
+import java.util.concurrent.ArrayBlockingQueue;\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.TimeUnit;\r
 \r
-import org.xerial.core.XerialError;\r
-import org.xerial.core.XerialErrorCode;\r
 import org.xerial.core.XerialException;\r
-import org.xerial.json.JSONArray;\r
-import org.xerial.json.JSONException;\r
-import org.xerial.json.JSONObject;\r
-import org.xerial.json.JSONUtil;\r
-import org.xerial.json.JSONValue;\r
-import org.xerial.json.JSONValueType;\r
-import org.xerial.silk.impl.SilkDataLine;\r
-import org.xerial.silk.impl.SilkFunction;\r
-import org.xerial.silk.impl.SilkFunctionArg;\r
-import org.xerial.silk.impl.SilkJSONValue;\r
-import org.xerial.silk.impl.SilkNode;\r
-import org.xerial.silk.impl.SilkNodeOccurrence;\r
-import org.xerial.silk.impl.SilkValue;\r
-import org.xerial.silk.plugin.SilkFunctionArgument;\r
-import org.xerial.silk.plugin.SilkFunctionPlugin;\r
 import org.xerial.util.ArrayDeque;\r
-import org.xerial.util.FileResource;\r
-import org.xerial.util.StringUtil;\r
-import org.xerial.util.bean.TypeInfo;\r
 import org.xerial.util.log.Logger;\r
-import org.xerial.util.reflect.ReflectionUtil;\r
 import org.xerial.util.tree.TreeEvent;\r
+import org.xerial.util.tree.TreeEventHandler;\r
 import org.xerial.util.tree.TreeStreamReader;\r
-import org.xerial.util.xml.impl.TreeEventQueue;\r
 \r
 /**\r
  * {@link TreeStreamReader} implementation for the Silk data format.\r
@@ -77,13 +53,15 @@ public class SilkStreamReader implements TreeStreamReader
 {\r
     private static Logger _logger = Logger.getLogger(SilkStreamReader.class);\r
 \r
-    private final SilkLinePullParser parser;\r
-    private final SilkEnv parseContext;\r
-    private TreeEventQueue eventQueue = new TreeEventQueue();\r
-    private final ArrayDeque<TreeStreamReader> readerStack = new ArrayDeque<TreeStreamReader>();\r
+    private final SilkParser parser;\r
+    private final ArrayBlockingQueue<TreeEvent> eventQueue = new ArrayBlockingQueue<TreeEvent>(10000);\r
+    private final ArrayDeque<TreeEvent> prefetchedEventQueue = new ArrayDeque<TreeEvent>();\r
 \r
     private long numReadLine = 0;\r
 \r
+    // for changing push-parser to pull parser\r
+    private final ExecutorService threadManager;\r
+\r
     /**\r
      * Creates a new reader with the specified input stream\r
      * \r
@@ -115,24 +93,56 @@ public class SilkStreamReader implements TreeStreamReader
      */\r
     public SilkStreamReader(Reader input, SilkEnv env) throws IOException\r
     {\r
-        this.parser = new SilkLinePullParser(input);\r
-        this.parseContext = env;\r
+        this.parser = new SilkParser(input, env);\r
+\r
+        this.threadManager = Executors.newFixedThreadPool(1);\r
+        threadManager.submit(new BackgroundParser());\r
+\r
     }\r
 \r
-    /**\r
-     * Concatenates the base path and the resource name\r
-     * \r
-     * @param resourceBasePath\r
-     * @param resourceName\r
-     * @return\r
-     */\r
-    private static String getResourcePath(String resourceBasePath, String resourceName)\r
+    private class BackgroundParser implements Callable<Void>\r
     {\r
-        String resourcePath = resourceBasePath;\r
-        if (!resourcePath.endsWith("/"))\r
-            resourcePath += "/";\r
-        resourcePath += resourceName;\r
-        return resourcePath;\r
+\r
+        public Void call() throws Exception\r
+        {\r
+            try\r
+            {\r
+                parser.parse(new TreeEventHandler() {\r
+                    public void finish() throws Exception\r
+                    {\r
+                        hasParsingFinished = true;\r
+                    }\r
+\r
+                    public void init() throws Exception\r
+                    {\r
+                        hasParsingFinished = false;\r
+                    }\r
+\r
+                    public void leaveNode(String nodeName) throws Exception\r
+                    {\r
+                        eventQueue.put(TreeEvent.newLeaveEvent(nodeName));\r
+                    }\r
+\r
+                    public void text(String nodeName, String textDataFragment) throws Exception\r
+                    {\r
+                        eventQueue.put(TreeEvent.newTextEvent(nodeName, textDataFragment));\r
+                    }\r
+\r
+                    public void visitNode(String nodeName, String immediateNodeValue) throws Exception\r
+                    {\r
+                        eventQueue.put(TreeEvent.newVisitEvent(nodeName, immediateNodeValue));\r
+\r
+                    }\r
+                });\r
+\r
+                return null;\r
+            }\r
+            finally\r
+            {\r
+                threadManager.shutdown();\r
+            }\r
+        }\r
+\r
     }\r
 \r
     /**\r
@@ -144,9 +154,8 @@ public class SilkStreamReader implements TreeStreamReader
      */\r
     public SilkStreamReader(String resourceBasePath, String resourceName) throws IOException\r
     {\r
-        this.parser = new SilkLinePullParser(new BufferedReader(new InputStreamReader(SilkWalker.class\r
-                .getResourceAsStream(getResourcePath(resourceBasePath, resourceName)))));\r
-        this.parseContext = SilkEnv.newEnv(resourceBasePath);\r
+        this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath(\r
+                resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath));\r
     }\r
 \r
     /**\r
@@ -162,18 +171,14 @@ public class SilkStreamReader implements TreeStreamReader
 \r
     public SilkStreamReader(URL resource, SilkEnv env) throws IOException\r
     {\r
-        String path = resource.toExternalForm();\r
-        int fileNamePos = path.lastIndexOf("/");\r
-        String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null;\r
-\r
-        this.parser = new SilkLinePullParser(new BufferedReader(new InputStreamReader(resource.openStream())));\r
-        this.parseContext = SilkEnv.newEnv(env, resourceBasePath);\r
+        this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
+                .getResourceBasePath(resource)));\r
     }\r
 \r
     public TreeEvent peekNext() throws XerialException\r
     {\r
         if (hasNext())\r
-            return eventQueue.peekFirst();\r
+            return prefetchedEventQueue.getFirst();\r
         else\r
             return null;\r
     }\r
@@ -181,237 +186,19 @@ public class SilkStreamReader implements TreeStreamReader
     public TreeEvent next() throws XerialException\r
     {\r
         if (hasNext())\r
-            return getNextEvent();\r
-        else\r
-            return null;\r
-    }\r
-\r
-    /**\r
-     * Enqueues a visit event.\r
-     * \r
-     * @param nodeName\r
-     * @param immediateNodeValue\r
-     * @throws XerialException\r
-     */\r
-    private void visit(String nodeName, String immediateNodeValue) throws XerialException\r
-    {\r
-        eventQueue.push(TreeEvent.newVisitEvent(nodeName, immediateNodeValue));\r
-    }\r
-\r
-    /**\r
-     * Enqueues a leave event\r
-     * \r
-     * @param nodeName\r
-     * @throws XerialException\r
-     */\r
-    private void leave(String nodeName) throws XerialException\r
-    {\r
-        eventQueue.push(TreeEvent.newLeaveEvent(nodeName));\r
-    }\r
-\r
-    /**\r
-     * Enqueues a text event\r
-     * \r
-     * @param textFragment\r
-     * @throws XerialException\r
-     */\r
-    private void text(String textFragment) throws XerialException\r
-    {\r
-        eventQueue.push(TreeEvent.newTextEvent(parseContext.getContextNode().getName(), textFragment));\r
-    }\r
-\r
-    /**\r
-     * Closed pre-opened contexts up to the specified indent level\r
-     * \r
-     * @param newIndentLevel\r
-     * @throws XerialException\r
-     */\r
-    private void closeContextUpTo(int newIndentLevel) throws XerialException\r
-    {\r
-        while (!parseContext.isContextNodeStackEmpty())\r
-        {\r
-            SilkContext context = parseContext.peekLastContext();\r
-            SilkNode node = context.contextNode;\r
-            if (node.getIndentLevel() >= newIndentLevel)\r
-            {\r
-                parseContext.popLastContext();\r
-\r
-                if (parseContext.isAttributeOpen)\r
-                {\r
-                    // close attribute \r
-                    SilkNode attribute = node.getChildNodes().get(parseContext.contextNodeAttributeCursor);\r
-                    leave(attribute.getName());\r
-                    leave(node.getName());\r
-                    parseContext.isAttributeOpen = false;\r
-                }\r
-\r
-                if (context.isOpen)\r
-                {\r
-                    // close context\r
-                    String nodeName = node.getName();\r
-                    leave(nodeName);\r
-                }\r
-            }\r
-            else\r
-                return;\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Opens a new context for the given node\r
-     * \r
-     * @param node\r
-     *            new context node\r
-     * @param visitor\r
-     * @throws XerialException\r
-     */\r
-    private void openContext(SilkNode node) throws XerialException\r
-    {\r
-        int indentLevel = node.getIndentLevel();\r
-        if (indentLevel != SilkNode.NO_INDENT)\r
-            indentLevel += parseContext.getIndentationOffset();\r
-\r
-        closeContextUpTo(indentLevel);\r
-        openContext_internal(node);\r
-    }\r
-\r
-    private void openContext_internal(SilkNode node) throws XerialException\r
-    {\r
-        if (node.getName() == null)\r
-        {\r
-            // no name nodes must hierarchically organize attribute nodes\r
-            for (SilkNode eachChild : node.getChildNodes())\r
-            {\r
-                eachChild.setNodeIndent(node.getNodeIndent());\r
-                openContext_internal(eachChild);\r
-            }\r
-            return;\r
-        }\r
-\r
-        SilkContext currentContext = new SilkContext(node, true);\r
-        parseContext.pushContext(currentContext);\r
-\r
-        SilkNodeOccurrence occurrence = node.getOccurrence();\r
-        if (occurrence.isSchemaOnlyNode())\r
         {\r
-            currentContext.isOpen = false;\r
-            // reset the attribute cursor\r
-            parseContext.contextNodeAttributeCursor = 0;\r
-            parseContext.isAttributeOpen = false;\r
-            return; // do not invoke visit events\r
-        }\r
-\r
-        String nodeName = node.getName();\r
-        SilkValue textValue = node.getValue();\r
-\r
-        // process text values attached to the node\r
-        if (textValue != null)\r
-        {\r
-            // When the text data is JSON, traverses the JSON data \r
-            if (textValue.isJSON())\r
-            {\r
-\r
-                SilkJSONValue jsonValue = SilkJSONValue.class.cast(textValue);\r
-                if (jsonValue.isObject())\r
-                {\r
-                    visit(nodeName, null);\r
-                    JSONObject jsonObj = new JSONObject(jsonValue.getValue());\r
-                    walkJSONObject(jsonObj);\r
-                }\r
-                else\r
-                {\r
-                    currentContext.isOpen = false;\r
-                    JSONArray jsonArray = new JSONArray(jsonValue.getValue());\r
-                    walkJSONAray(jsonArray, nodeName);\r
-                }\r
-            }\r
-            else if (textValue.isFunction())\r
-            {\r
-                // evaluate the function \r
-                visit(nodeName, null);\r
-                SilkFunction function = SilkFunction.class.cast(textValue);\r
-                evalFunction(function);\r
-\r
-                return;\r
-            }\r
-            else\r
-            {\r
-                // Simple text value will be reported as it is.\r
-                visit(nodeName, textValue.toString());\r
-            }\r
+            TreeEvent e = prefetchedEventQueue.removeFirst();\r
+            return e;\r
         }\r
         else\r
-        {\r
-            if (occurrence == SilkNodeOccurrence.ZERO_OR_MORE)\r
-            {\r
-                // CSV data\r
-                return; // do not invoke visit events\r
-            }\r
-\r
-            // Report a visit event without text value\r
-            visit(nodeName, null);\r
-        }\r
-\r
-        // Traverse attribute nodes having text values. If no text value is specified for these attributes, \r
-        // they are schema elements for the following DATA_LINE. \r
-        for (SilkNode eachChild : node.getChildNodes())\r
-        {\r
-            if (eachChild.hasValue())\r
-            {\r
-                openContext(eachChild);\r
-            }\r
-        }\r
-\r
-    }\r
-\r
-    private static class FunctionReader implements TreeStreamReader\r
-    {\r
-        SilkFunctionPlugin plugin;\r
-\r
-        public FunctionReader(SilkFunctionPlugin plugin)\r
-        {\r
-            this.plugin = plugin;\r
-        }\r
-\r
-        public TreeEvent peekNext() throws XerialException\r
-        {\r
-            return plugin.peekNext();\r
-        }\r
-\r
-        public TreeEvent next() throws XerialException\r
-        {\r
-            return plugin.next();\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Evaluate the function\r
-     * \r
-     * @param function\r
-     * @throws XerialException\r
-     */\r
-    private void evalFunction(SilkFunction function) throws XerialException\r
-    {\r
-        SilkFunctionPlugin plugin = getPlugin(function.getName());\r
-        if (plugin == null)\r
-        {\r
-            _logger.error(String.format("plugin %s not found", function.getName()));\r
-            return;\r
-        }\r
-        // fill the function argument to the plugin instance\r
-        populate(plugin, function.getArgumentList());\r
-\r
-        // evaluate the function\r
-        SilkEnv env = parseContext.newEnvFor(function);\r
-        plugin.init(env);\r
-\r
-        readerStack.addLast(new FunctionReader(plugin));\r
+            return null;\r
     }\r
 \r
     /**\r
      * Has finished reading the stream?\r
      */\r
-    private boolean hasFinished = false;\r
+    private volatile boolean hasParsingFinished = false;\r
+    private boolean hasPrefetchFinished = false;\r
 \r
     /**\r
      * Is next event available?\r
@@ -421,624 +208,40 @@ public class SilkStreamReader implements TreeStreamReader
      */\r
     private boolean hasNext() throws XerialException\r
     {\r
-        if (!eventQueue.isEmpty())\r
+        if (!prefetchedEventQueue.isEmpty())\r
             return true;\r
 \r
-        if (hasFinished)\r
-            return false;\r
-\r
-        while (!hasFinished && eventQueue.isEmpty())\r
-            fillQueue();\r
-\r
-        return hasNext();\r
-    }\r
-\r
-    /**\r
-     * Retrieves the next event from the queue. If the event queue is empty,\r
-     * fill the queue with the next event\r
-     * \r
-     * @return the next event.\r
-     * @throws XerialException\r
-     */\r
-    private TreeEvent getNextEvent() throws XerialException\r
-    {\r
-        if (!eventQueue.isEmpty())\r
-            return eventQueue.pop();\r
-\r
-        if (hasFinished)\r
-            throw new XerialError(XerialErrorCode.INVALID_STATE,\r
-                    "hasNext() value must be checked before calling getNextEvent()");\r
-\r
-        fillQueue();\r
-        return getNextEvent();\r
-    }\r
-\r
-    private void walkMicroFormatRoot(SilkNode schemaNode, JSONArray value) throws XerialException\r
-    {\r
-        // e.g., exon(start, name)\r
-\r
-        if (schemaNode.hasManyOccurrences())\r
-        {\r
-            if (schemaNode.hasChildren())\r
-            {\r
-                // e.g., exon(start, name)*\r
-                // multiple occurrences: [[start, end], [start, end], ... ] \r
-                for (int i = 0; i < value.size(); i++)\r
-                {\r
-                    JSONArray eachElement = value.getJSONArray(i);\r
-                    if (eachElement == null)\r
-                        continue;\r
-\r
-                    visit(schemaNode.getName(), null);\r
-                    int index = 0;\r
-                    for (SilkNode eachSubSchema : schemaNode.getChildNodes())\r
-                    {\r
-                        walkMicroFormatElement(eachSubSchema, eachElement.get(index++));\r
-                    }\r
-                    leave(schemaNode.getName());\r
-                }\r
-            }\r
-            else\r
-            {\r
-                // e.g. QV*: [20, 50, 50]\r
-                for (int i = 0; i < value.size(); i++)\r
-                {\r
-                    visit(schemaNode.getName(), value.get(i).toString());\r
-                    leave(schemaNode.getName());\r
-                }\r
-            }\r
-        }\r
-        else\r
-        {\r
-            // [e1, e2, ...]\r
-            visit(schemaNode.getName(), null);\r
-            int index = 0;\r
-            if (schemaNode.getChildNodes().size() != value.size())\r
-            {\r
-                throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format(\r
-                        "data format doesn't match: schema=%s, value=%s", schemaNode, value));\r
-            }\r
-            for (SilkNode each : schemaNode.getChildNodes())\r
-            {\r
-                walkMicroFormatElement(each, value.get(index++));\r
-            }\r
-            leave(schemaNode.getName());\r
-        }\r
-    }\r
-\r
-    private void walkMicroFormatElement(SilkNode schemaNode, JSONValue value) throws XerialException\r
-    {\r
-        if (schemaNode.hasChildren())\r
-        {\r
-            JSONArray array = value.getJSONArray();\r
-            if (array != null)\r
-                walkMicroFormatRoot(schemaNode, array);\r
-            else\r
-                throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format(\r
-                        "data format doesn't match: schema=%s, value=%s", schemaNode, value));\r
-        }\r
-        else\r
-        {\r
-            visit(schemaNode.getName(), value.toString());\r
-            leave(schemaNode.getName());\r
-        }\r
-    }\r
-\r
-    private void evalDatalineColumn(SilkNode node, String columnData) throws XerialException\r
-    {\r
-        // 7600 lines/sec\r
-\r
-        if (node.hasChildren())\r
-        {\r
-            JSONArray array = new JSONArray(columnData);\r
-            walkMicroFormatRoot(node, array);\r
-            return;\r
-        }\r
-\r
-        switch (node.getOccurrence())\r
-        {\r
-        case ZERO_OR_MORE:\r
-        case ONE_OR_MORE:\r
-            if (columnData.startsWith("["))\r
-            {\r
-                // micro-data format\r
-\r
-                // 7900 lines/sec \r
-\r
-                JSONArray array = new JSONArray(columnData);\r
-                // 1400 lines/sec (ANTLR)   4200 lines/sec (JSONTokener)\r
-\r
-                // 5233 lines/sec w/o traversing JSONArray\r
-                //                // dummy code \r
-                //                if (true)\r
-                //                {\r
-                //                    visit(node.getName(), null);\r
-                //                    leave(node.getName());\r
-                //                    return;\r
-                //                }\r
-                walkMicroFormatRoot(node, array);\r
-                return;\r
-            }\r
-            else\r
-            {\r
-                String[] csv = columnData.split(",");\r
-                for (String each : csv)\r
-                {\r
-                    String value = each.trim();\r
-                    evalColumnData(node, value);\r
-                }\r
-                return;\r
-            }\r
-        default:\r
-            evalColumnData(node, columnData);\r
-            return;\r
-        }\r
-\r
-    }\r
-\r
-    private void evalColumnData(SilkNode node, String columnData) throws XerialException\r
-    {\r
-        try\r
-        {\r
-            if (node.hasChildren())\r
-            {\r
-                // micro-data format\r
-                JSONArray array = new JSONArray(columnData);\r
-                walkMicroFormatRoot(node, array);\r
-                return;\r
-            }\r
-\r
-            String dataType = node.getDataType();\r
-            if (dataType != null && dataType.equalsIgnoreCase("json"))\r
-            {\r
-                JSONValue json = JSONUtil.parseJSON(columnData);\r
-                if (json.getJSONObject() != null)\r
-                {\r
-                    if (node.getName().equals("_")) // no name object\r
-                    {\r
-                        walkJSONValue(json, node.getName());\r
-                    }\r
-                    else\r
-                    {\r
-                        visit(node.getName(), null);\r
-                        walkJSONValue(json, node.getName());\r
-                        leave(node.getName());\r
-                    }\r
-                }\r
-                else\r
-                    walkJSONValue(json, node.getName());\r
-            }\r
-            else\r
-            {\r
-                visit(node.getName(), StringUtil.unquote(columnData));\r
-                leave(node.getName());\r
-            }\r
-        }\r
-        catch (JSONException e)\r
-        {\r
-            throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", parser.getNumReadLine(), e\r
-                    .getMessage()));\r
-        }\r
-\r
-    }\r
-\r
-    /**\r
-     * Fill the queue by retrieving the next event from the pull parser.\r
-     * \r
-     * @throws XerialException\r
-     */\r
-    private void fillQueue() throws XerialException\r
-    {\r
-        if (!readerStack.isEmpty())\r
-        {\r
-            TreeEvent e = readerStack.peekLast().next();\r
-            if (e == null)\r
-            {\r
-                readerStack.removeLast();\r
-                fillQueue();\r
-            }\r
-            else\r
-            {\r
-                eventQueue.push(e);\r
-                return;\r
-            }\r
-        }\r
-\r
-        if (!parser.hasNext())\r
-        {\r
-            // no more input data\r
-            closeContextUpTo(parseContext.getIndentationOffset());\r
-            hasFinished = true;\r
-            return;\r
-        }\r
-\r
-        SilkEvent currentEvent = parser.next();\r
-\r
-        // update the line count\r
-        numReadLine = parser.getNumReadLine();\r
-\r
-        if (_logger.isTraceEnabled())\r
-        {\r
-            _logger.trace("stack: " + parseContext.getContextNodeStack());\r
-            _logger.trace(currentEvent);\r
-        }\r
-\r
-        switch (currentEvent.getType())\r
-        {\r
-        case NODE:\r
-            // push context node\r
-            SilkNode newContextNode = SilkNode.class.cast(currentEvent.getElement());\r
-            openContext(newContextNode);\r
-            break;\r
-        case FUNCTION:\r
-            SilkFunction function = SilkFunction.class.cast(currentEvent.getElement());\r
-            evalFunction(function);\r
-            break;\r
-        case DATA_LINE:\r
-            // pop the context stack until finding a node for stream data node occurrence\r
-            while (!parseContext.isContextNodeStackEmpty())\r
-            {\r
-                SilkContext context = parseContext.peekLastContext();\r
-                SilkNode node = context.contextNode;\r
-                if (!node.getOccurrence().isFollowedByStreamData())\r
-                {\r
-                    parseContext.popLastContext();\r
-                    if (context.isOpen)\r
-                        leave(node.getName());\r
-                }\r
-                else\r
-                    break;\r
-            }\r
-\r
-            if (parseContext.isContextNodeStackEmpty())\r
-            {\r
-                // use default column names(c1, c2, ...) \r
-                SilkDataLine line = SilkDataLine.class.cast(currentEvent.getElement());\r
-                String[] columns = line.getDataLine().trim().split("\t");\r
-                int index = 1;\r
-                visit("row", null);\r
-                for (String each : columns)\r
-                {\r
-                    String columnName = String.format("c%d", index++);\r
-\r
-                    // TODO use evalColumnData\r
-                    visit(columnName, each);\r
-                    leave(columnName);\r
-                }\r
-                leave("row");\r
-            }\r
-            else\r
-            {\r
-                SilkContext context = parseContext.peekLastContext();\r
-                SilkNode schema = context.contextNode;\r
-                SilkDataLine line = SilkDataLine.class.cast(currentEvent.getElement());\r
-                switch (schema.getOccurrence())\r
-                {\r
-                case SEQUENCE:\r
-                    text(line.getDataLine());\r
-                    break;\r
-                case ZERO_OR_MORE:\r
-                    // CSV data\r
-                {\r
-                    evalDatalineColumn(schema, line.getDataLine());\r
-                }\r
-                    break;\r
-                case TABBED_SEQUENCE:\r
-                {\r
-                    String[] columns = line.getDataLine().trim().split("\t");\r
-                    int columnIndex = 0;\r
-                    visit(schema.getName(), schema.hasValue() ? schema.getValue().toString() : null);\r
-                    for (int i = 0; i < schema.getChildNodes().size(); i++)\r
-                    {\r
-                        SilkNode child = schema.getChildNodes().get(i);\r
-                        if (child.hasValue())\r
-                        {\r
-                            // output the default value for the column \r
-                            evalDatalineColumn(child, child.getValue().toString());\r
-                        }\r
-                        else\r
-                        {\r
-                            if (columnIndex < columns.length)\r
-                            {\r
-                                String columnData = columns[columnIndex++];\r
-                                if (columnData.length() > 0)\r
-                                    evalDatalineColumn(child, columnData);\r
-                            }\r
-                        }\r
-                    }\r
-                    leave(schema.getName());\r
-                    break;\r
-                }\r
-                case MULTILINE_SEQUENCE:\r
-                {\r
-                    int cursor = parseContext.contextNodeAttributeCursor;\r
-\r
-                    if (cursor >= schema.getChildNodes().size())\r
-                        break;\r
-\r
-                    SilkNode attributeNode = schema.getChildNodes().get(cursor);\r
-                    if (cursor == 0 && !parseContext.isAttributeOpen)\r
-                    {\r
-                        visit(schema.getName(), schema.hasValue() ? schema.getValue().toString() : null);\r
-                    }\r
-                    if (!parseContext.isAttributeOpen)\r
-                    {\r
-                        if (attributeNode.hasValue())\r
-                            visit(attributeNode.getName(), attributeNode.getValue().toString());\r
-                        else\r
-                            visit(attributeNode.getName(), null);\r
-\r
-                        parseContext.isAttributeOpen = true;\r
-                    }\r
-                    text(line.getDataLine().trim());\r
-                    break;\r
-                }\r
-                }\r
-            }\r
-            break;\r
-        case MULTILINE_ENTRY_SEPARATOR: // >>\r
-        {\r
-            SilkContext context = parseContext.peekLastContext();\r
-            SilkNode schema = context.contextNode;\r
-            if (parseContext.isAttributeOpen)\r
-            {\r
-                SilkNode attributeNode = schema.getChildNodes().get(parseContext.contextNodeAttributeCursor);\r
-                leave(attributeNode.getName());\r
-            }\r
-            leave(schema.getName());\r
-            // reset\r
-            parseContext.contextNodeAttributeCursor = 0;\r
-            parseContext.isAttributeOpen = false;\r
-            break;\r
-        }\r
-        case MULTILINE_SEPARATOR: // --\r
-        {\r
-            SilkContext context = parseContext.peekLastContext();\r
-            SilkNode schema = context.contextNode;\r
-            if (parseContext.isAttributeOpen)\r
-            {\r
-                SilkNode attributeNode = schema.getChildNodes().get(parseContext.contextNodeAttributeCursor);\r
-                leave(attributeNode.getName());\r
-            }\r
-            parseContext.contextNodeAttributeCursor++;\r
-            parseContext.isAttributeOpen = false;\r
-            break;\r
-        }\r
-        case BLANK_LINE:\r
-            break;\r
-\r
-        case PREAMBLE:\r
-            break;\r
+        assert (prefetchedEventQueue.isEmpty());\r
 \r
-        }\r
-\r
-    }\r
-\r
-    private void walkJSONAray(JSONArray jsonArray, String parentNodeName) throws XerialException\r
-    {\r
-        for (JSONValue each : jsonArray)\r
-        {\r
-            walkJSONValue(each, parentNodeName);\r
-        }\r
-    }\r
-\r
-    private void walkJSONObject(JSONObject jsonObj) throws XerialException\r
-    {\r
-        for (String key : jsonObj.keys())\r
-        {\r
-            JSONValue val = jsonObj.get(key);\r
-            walkJSONValue(val, key);\r
-        }\r
-    }\r
+        if (hasPrefetchFinished)\r
+            return false;\r
 \r
-    private void walkJSONValue(JSONValue value, String parentNodeName) throws XerialException\r
-    {\r
-        JSONValueType type = value.getValueType();\r
-        switch (type)\r
+        if (hasParsingFinished)\r
         {\r
-        case Array:\r
-            walkJSONAray(value.getJSONArray(), parentNodeName);\r
-            break;\r
-        case Object:\r
-            walkJSONObject(value.getJSONObject());\r
-            break;\r
-        case Boolean:\r
-            visit(parentNodeName, value.toString());\r
-            leave(parentNodeName);\r
-            break;\r
-        case Double:\r
-            visit(parentNodeName, value.toString());\r
-            leave(parentNodeName);\r
-            break;\r
-        case Integer:\r
-            visit(parentNodeName, value.toString());\r
-            leave(parentNodeName);\r
-            break;\r
-        case Null:\r
-            visit(parentNodeName, value.toString());\r
-            leave(parentNodeName);\r
-            break;\r
-        case String:\r
-            visit(parentNodeName, value.toString());\r
-            leave(parentNodeName);\r
-            break;\r
+            int count = eventQueue.drainTo(prefetchedEventQueue);\r
+            //_logger.debug("prefetch: " + count);\r
+            //_logger.debug("eventQueue size: " + eventQueue.size());\r
+            hasPrefetchFinished = true;\r
+            return hasNext();\r
         }\r
 \r
-    }\r
-\r
-    /**\r
-     * Plugin holder\r
-     */\r
-    private static Map<String, Class<SilkFunctionPlugin>> pluginTable = null;\r
-\r
-    /**\r
-     * Get the plugin of the specified name\r
-     * \r
-     * @param name\r
-     *            plugin name\r
-     * @return plugin instance or null if no corresponding plugin is found.\r
-     */\r
-    private SilkFunctionPlugin getPlugin(String name)\r
-    {\r
-        Class<SilkFunctionPlugin> pluginClass = getPluginTable().get(name);\r
-        if (pluginClass == null)\r
-            return null;\r
-\r
         try\r
         {\r
-            SilkFunctionPlugin pluginInstance = pluginClass.newInstance();\r
-            return pluginInstance;\r
-        }\r
-        catch (InstantiationException e)\r
-        {\r
-            _logger.warn(e);\r
-            return null;\r
-        }\r
-        catch (IllegalAccessException e)\r
-        {\r
-            _logger.warn(e);\r
-            return null;\r
-        }\r
-    }\r
+            TreeEvent e = null;\r
+            while (!hasParsingFinished && (e = eventQueue.poll(1, TimeUnit.MILLISECONDS)) == null)\r
+            {}\r
 \r
-    private Map<String, Class<SilkFunctionPlugin>> getPluginTable()\r
-    {\r
-        if (pluginTable == null)\r
-        {\r
-            pluginTable = new TreeMap<String, Class<SilkFunctionPlugin>>();\r
-            // load plugins \r
-            for (Class<SilkFunctionPlugin> each : FileResource.findClasses(SilkFunctionPlugin.class.getPackage(),\r
-                    SilkFunctionPlugin.class, SilkWalker.class.getClassLoader()))\r
-            {\r
-                String functionName = each.getSimpleName().toLowerCase();\r
-                _logger.trace("loaded " + functionName);\r
-                pluginTable.put(functionName, each);\r
-            }\r
-        }\r
-\r
-        return pluginTable;\r
-    }\r
+            if (e != null)\r
+                prefetchedEventQueue.addLast(e);\r
 \r
-    /**\r
-     * Information of the function (plugin) arguments (\r
-     * {@link SilkFunctionArgument}) described in the Class definition, which\r
-     * implements {@link SilkFunctionPlugin}.\r
-     * \r
-     * @author leo\r
-     * \r
-     */\r
-    private static class PluginField\r
-    {\r
-        Field field;\r
-        SilkFunctionArgument argInfo;\r
-\r
-        private PluginField(SilkFunctionArgument argInfo, Field field)\r
-        {\r
-            this.argInfo = argInfo;\r
-            this.field = field;\r
+            return hasNext();\r
         }\r
-    }\r
-\r
-    private static class PluginHolder\r
-    {\r
-        Class< ? extends SilkFunctionPlugin> pluginClass;\r
-        ArrayList<PluginField> argumentFieldList = new ArrayList<PluginField>();\r
-        Map<String, PluginField> keyValueFieldTable = new HashMap<String, PluginField>();\r
-\r
-        public PluginHolder(Class< ? extends SilkFunctionPlugin> pluginClass)\r
+        catch (InterruptedException e)\r
         {\r
-            this.pluginClass = pluginClass;\r
 \r
-            //ArrayList<SilkFunctionArgument> argDefs = new ArrayList<SilkFunctionArgument>();\r
-            for (Field eachField : pluginClass.getDeclaredFields())\r
-            {\r
-                SilkFunctionArgument argInfo = eachField.getAnnotation(SilkFunctionArgument.class);\r
-                if (argInfo != null)\r
-                {\r
-                    PluginField pf = new PluginField(argInfo, eachField);\r
-                    if (argInfo.name().equals(SilkFunctionArgument.NO_VALUE))\r
-                        argumentFieldList.add(pf);\r
-                    else\r
-                        keyValueFieldTable.put(argInfo.name(), pf);\r
-                }\r
-            }\r
-\r
-            // sort arguments in the order of their ordinal\r
-            Collections.sort(argumentFieldList, new Comparator<PluginField>() {\r
-                public int compare(PluginField o1, PluginField o2)\r
-                {\r
-                    return o1.argInfo.ordinal() - o2.argInfo.ordinal();\r
-                }\r
-            });\r
-\r
-        }\r
-\r
-        /**\r
-         * Bind function arguments to the plug-in instance\r
-         * \r
-         * @param plugin\r
-         *            the instance of the plug-in\r
-         * @param args\r
-         *            the function arguments\r
-         */\r
-        public void populate(SilkFunctionPlugin plugin, List<SilkFunctionArg> args)\r
-        {\r
-            int noNameArgCount = 0;\r
-            for (SilkFunctionArg eachArgument : args)\r
-            {\r
-                String argValue = eachArgument.getValue().toString();\r
-                try\r
-                {\r
-                    if (eachArgument.hasName())\r
-                    {\r
-                        // key value arg\r
-                        PluginField f = keyValueFieldTable.get(eachArgument.getName());\r
-                        if (f == null)\r
-                        {\r
-                            _logger.warn("unknown argument: " + eachArgument);\r
-                            continue;\r
-                        }\r
-                        ReflectionUtil.setFieldValue(plugin, f.field, argValue);\r
-                    }\r
-                    else\r
-                    {\r
-                        // unnamed argument\r
-                        // matching argument order\r
-                        if (noNameArgCount >= argumentFieldList.size())\r
-                        {\r
-                            _logger.warn(String.format("no corresponding field for the argument %s is found",\r
-                                    eachArgument));\r
-                            continue;\r
-                        }\r
-                        PluginField f = argumentFieldList.get(noNameArgCount);\r
-                        ReflectionUtil.setFieldValue(plugin, f.field, argValue);\r
-\r
-                        if (!TypeInfo.isCollection(f.field.getType()))\r
-                            noNameArgCount++;\r
-                    }\r
-                }\r
-                catch (XerialException e)\r
-                {\r
-                    _logger.error(e);\r
-                }\r
-\r
-            }\r
         }\r
 \r
-    }\r
-\r
-    /**\r
-     * Fill the plug-in argument fields with the given arguments\r
-     * \r
-     * @param plugin\r
-     *            plug-in instance.\r
-     * @param args\r
-     *            function arguments.\r
-     */\r
-    private static void populate(SilkFunctionPlugin plugin, List<SilkFunctionArg> args)\r
-    {\r
-        PluginHolder holder = new PluginHolder(plugin.getClass());\r
-        holder.populate(plugin, args);\r
+        return false;\r
     }\r
 \r
     public long getNumReadLine()\r