import java.io.InputStream;\r
import java.io.InputStreamReader;\r
import java.io.Reader;\r
-import java.lang.reflect.Field;\r
import java.net.URL;\r
-import java.util.ArrayList;\r
-import java.util.Collections;\r
-import java.util.Comparator;\r
-import java.util.HashMap;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.TreeMap;\r
+import java.util.concurrent.ArrayBlockingQueue;\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.TimeUnit;\r
\r
-import org.xerial.core.XerialError;\r
-import org.xerial.core.XerialErrorCode;\r
import org.xerial.core.XerialException;\r
-import org.xerial.json.JSONArray;\r
-import org.xerial.json.JSONException;\r
-import org.xerial.json.JSONObject;\r
-import org.xerial.json.JSONUtil;\r
-import org.xerial.json.JSONValue;\r
-import org.xerial.json.JSONValueType;\r
-import org.xerial.silk.impl.SilkDataLine;\r
-import org.xerial.silk.impl.SilkFunction;\r
-import org.xerial.silk.impl.SilkFunctionArg;\r
-import org.xerial.silk.impl.SilkJSONValue;\r
-import org.xerial.silk.impl.SilkNode;\r
-import org.xerial.silk.impl.SilkNodeOccurrence;\r
-import org.xerial.silk.impl.SilkValue;\r
-import org.xerial.silk.plugin.SilkFunctionArgument;\r
-import org.xerial.silk.plugin.SilkFunctionPlugin;\r
import org.xerial.util.ArrayDeque;\r
-import org.xerial.util.FileResource;\r
-import org.xerial.util.StringUtil;\r
-import org.xerial.util.bean.TypeInfo;\r
import org.xerial.util.log.Logger;\r
-import org.xerial.util.reflect.ReflectionUtil;\r
import org.xerial.util.tree.TreeEvent;\r
+import org.xerial.util.tree.TreeEventHandler;\r
import org.xerial.util.tree.TreeStreamReader;\r
-import org.xerial.util.xml.impl.TreeEventQueue;\r
\r
/**\r
* {@link TreeStreamReader} implementation for the Silk data format.\r
{\r
private static Logger _logger = Logger.getLogger(SilkStreamReader.class);\r
\r
- private final SilkLinePullParser parser;\r
- private final SilkEnv parseContext;\r
- private TreeEventQueue eventQueue = new TreeEventQueue();\r
- private final ArrayDeque<TreeStreamReader> readerStack = new ArrayDeque<TreeStreamReader>();\r
+ private final SilkParser parser;\r
+ private final ArrayBlockingQueue<TreeEvent> eventQueue = new ArrayBlockingQueue<TreeEvent>(10000);\r
+ private final ArrayDeque<TreeEvent> prefetchedEventQueue = new ArrayDeque<TreeEvent>();\r
\r
private long numReadLine = 0;\r
\r
+ // for changing push-parser to pull parser\r
+ private final ExecutorService threadManager;\r
+\r
/**\r
* Creates a new reader with the specified input stream\r
* \r
*/\r
public SilkStreamReader(Reader input, SilkEnv env) throws IOException\r
{\r
- this.parser = new SilkLinePullParser(input);\r
- this.parseContext = env;\r
+ this.parser = new SilkParser(input, env);\r
+\r
+ this.threadManager = Executors.newFixedThreadPool(1);\r
+ threadManager.submit(new BackgroundParser());\r
+\r
}\r
\r
- /**\r
- * Concatenates the base path and the resource name\r
- * \r
- * @param resourceBasePath\r
- * @param resourceName\r
- * @return\r
- */\r
- private static String getResourcePath(String resourceBasePath, String resourceName)\r
+ private class BackgroundParser implements Callable<Void>\r
{\r
- String resourcePath = resourceBasePath;\r
- if (!resourcePath.endsWith("/"))\r
- resourcePath += "/";\r
- resourcePath += resourceName;\r
- return resourcePath;\r
+\r
+ public Void call() throws Exception\r
+ {\r
+ try\r
+ {\r
+ parser.parse(new TreeEventHandler() {\r
+ public void finish() throws Exception\r
+ {\r
+ hasParsingFinished = true;\r
+ }\r
+\r
+ public void init() throws Exception\r
+ {\r
+ hasParsingFinished = false;\r
+ }\r
+\r
+ public void leaveNode(String nodeName) throws Exception\r
+ {\r
+ eventQueue.put(TreeEvent.newLeaveEvent(nodeName));\r
+ }\r
+\r
+ public void text(String nodeName, String textDataFragment) throws Exception\r
+ {\r
+ eventQueue.put(TreeEvent.newTextEvent(nodeName, textDataFragment));\r
+ }\r
+\r
+ public void visitNode(String nodeName, String immediateNodeValue) throws Exception\r
+ {\r
+ eventQueue.put(TreeEvent.newVisitEvent(nodeName, immediateNodeValue));\r
+\r
+ }\r
+ });\r
+\r
+ return null;\r
+ }\r
+ finally\r
+ {\r
+ threadManager.shutdown();\r
+ }\r
+ }\r
+\r
}\r
\r
/**\r
*/\r
public SilkStreamReader(String resourceBasePath, String resourceName) throws IOException\r
{\r
- this.parser = new SilkLinePullParser(new BufferedReader(new InputStreamReader(SilkWalker.class\r
- .getResourceAsStream(getResourcePath(resourceBasePath, resourceName)))));\r
- this.parseContext = SilkEnv.newEnv(resourceBasePath);\r
+ this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath(\r
+ resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath));\r
}\r
\r
/**\r
\r
public SilkStreamReader(URL resource, SilkEnv env) throws IOException\r
{\r
- String path = resource.toExternalForm();\r
- int fileNamePos = path.lastIndexOf("/");\r
- String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null;\r
-\r
- this.parser = new SilkLinePullParser(new BufferedReader(new InputStreamReader(resource.openStream())));\r
- this.parseContext = SilkEnv.newEnv(env, resourceBasePath);\r
+ this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser\r
+ .getResourceBasePath(resource)));\r
}\r
\r
public TreeEvent peekNext() throws XerialException\r
{\r
if (hasNext())\r
- return eventQueue.peekFirst();\r
+ return prefetchedEventQueue.getFirst();\r
else\r
return null;\r
}\r
public TreeEvent next() throws XerialException\r
{\r
if (hasNext())\r
- return getNextEvent();\r
- else\r
- return null;\r
- }\r
-\r
- /**\r
- * Enqueues a visit event.\r
- * \r
- * @param nodeName\r
- * @param immediateNodeValue\r
- * @throws XerialException\r
- */\r
- private void visit(String nodeName, String immediateNodeValue) throws XerialException\r
- {\r
- eventQueue.push(TreeEvent.newVisitEvent(nodeName, immediateNodeValue));\r
- }\r
-\r
- /**\r
- * Enqueues a leave event\r
- * \r
- * @param nodeName\r
- * @throws XerialException\r
- */\r
- private void leave(String nodeName) throws XerialException\r
- {\r
- eventQueue.push(TreeEvent.newLeaveEvent(nodeName));\r
- }\r
-\r
- /**\r
- * Enqueues a text event\r
- * \r
- * @param textFragment\r
- * @throws XerialException\r
- */\r
- private void text(String textFragment) throws XerialException\r
- {\r
- eventQueue.push(TreeEvent.newTextEvent(parseContext.getContextNode().getName(), textFragment));\r
- }\r
-\r
- /**\r
- * Closed pre-opened contexts up to the specified indent level\r
- * \r
- * @param newIndentLevel\r
- * @throws XerialException\r
- */\r
- private void closeContextUpTo(int newIndentLevel) throws XerialException\r
- {\r
- while (!parseContext.isContextNodeStackEmpty())\r
- {\r
- SilkContext context = parseContext.peekLastContext();\r
- SilkNode node = context.contextNode;\r
- if (node.getIndentLevel() >= newIndentLevel)\r
- {\r
- parseContext.popLastContext();\r
-\r
- if (parseContext.isAttributeOpen)\r
- {\r
- // close attribute \r
- SilkNode attribute = node.getChildNodes().get(parseContext.contextNodeAttributeCursor);\r
- leave(attribute.getName());\r
- leave(node.getName());\r
- parseContext.isAttributeOpen = false;\r
- }\r
-\r
- if (context.isOpen)\r
- {\r
- // close context\r
- String nodeName = node.getName();\r
- leave(nodeName);\r
- }\r
- }\r
- else\r
- return;\r
- }\r
- }\r
-\r
- /**\r
- * Opens a new context for the given node\r
- * \r
- * @param node\r
- * new context node\r
- * @param visitor\r
- * @throws XerialException\r
- */\r
- private void openContext(SilkNode node) throws XerialException\r
- {\r
- int indentLevel = node.getIndentLevel();\r
- if (indentLevel != SilkNode.NO_INDENT)\r
- indentLevel += parseContext.getIndentationOffset();\r
-\r
- closeContextUpTo(indentLevel);\r
- openContext_internal(node);\r
- }\r
-\r
- private void openContext_internal(SilkNode node) throws XerialException\r
- {\r
- if (node.getName() == null)\r
- {\r
- // no name nodes must hierarchically organize attribute nodes\r
- for (SilkNode eachChild : node.getChildNodes())\r
- {\r
- eachChild.setNodeIndent(node.getNodeIndent());\r
- openContext_internal(eachChild);\r
- }\r
- return;\r
- }\r
-\r
- SilkContext currentContext = new SilkContext(node, true);\r
- parseContext.pushContext(currentContext);\r
-\r
- SilkNodeOccurrence occurrence = node.getOccurrence();\r
- if (occurrence.isSchemaOnlyNode())\r
{\r
- currentContext.isOpen = false;\r
- // reset the attribute cursor\r
- parseContext.contextNodeAttributeCursor = 0;\r
- parseContext.isAttributeOpen = false;\r
- return; // do not invoke visit events\r
- }\r
-\r
- String nodeName = node.getName();\r
- SilkValue textValue = node.getValue();\r
-\r
- // process text values attached to the node\r
- if (textValue != null)\r
- {\r
- // When the text data is JSON, traverses the JSON data \r
- if (textValue.isJSON())\r
- {\r
-\r
- SilkJSONValue jsonValue = SilkJSONValue.class.cast(textValue);\r
- if (jsonValue.isObject())\r
- {\r
- visit(nodeName, null);\r
- JSONObject jsonObj = new JSONObject(jsonValue.getValue());\r
- walkJSONObject(jsonObj);\r
- }\r
- else\r
- {\r
- currentContext.isOpen = false;\r
- JSONArray jsonArray = new JSONArray(jsonValue.getValue());\r
- walkJSONAray(jsonArray, nodeName);\r
- }\r
- }\r
- else if (textValue.isFunction())\r
- {\r
- // evaluate the function \r
- visit(nodeName, null);\r
- SilkFunction function = SilkFunction.class.cast(textValue);\r
- evalFunction(function);\r
-\r
- return;\r
- }\r
- else\r
- {\r
- // Simple text value will be reported as it is.\r
- visit(nodeName, textValue.toString());\r
- }\r
+ TreeEvent e = prefetchedEventQueue.removeFirst();\r
+ return e;\r
}\r
else\r
- {\r
- if (occurrence == SilkNodeOccurrence.ZERO_OR_MORE)\r
- {\r
- // CSV data\r
- return; // do not invoke visit events\r
- }\r
-\r
- // Report a visit event without text value\r
- visit(nodeName, null);\r
- }\r
-\r
- // Traverse attribute nodes having text values. If no text value is specified for these attributes, \r
- // they are schema elements for the following DATA_LINE. \r
- for (SilkNode eachChild : node.getChildNodes())\r
- {\r
- if (eachChild.hasValue())\r
- {\r
- openContext(eachChild);\r
- }\r
- }\r
-\r
- }\r
-\r
- private static class FunctionReader implements TreeStreamReader\r
- {\r
- SilkFunctionPlugin plugin;\r
-\r
- public FunctionReader(SilkFunctionPlugin plugin)\r
- {\r
- this.plugin = plugin;\r
- }\r
-\r
- public TreeEvent peekNext() throws XerialException\r
- {\r
- return plugin.peekNext();\r
- }\r
-\r
- public TreeEvent next() throws XerialException\r
- {\r
- return plugin.next();\r
- }\r
- }\r
-\r
- /**\r
- * Evaluate the function\r
- * \r
- * @param function\r
- * @throws XerialException\r
- */\r
- private void evalFunction(SilkFunction function) throws XerialException\r
- {\r
- SilkFunctionPlugin plugin = getPlugin(function.getName());\r
- if (plugin == null)\r
- {\r
- _logger.error(String.format("plugin %s not found", function.getName()));\r
- return;\r
- }\r
- // fill the function argument to the plugin instance\r
- populate(plugin, function.getArgumentList());\r
-\r
- // evaluate the function\r
- SilkEnv env = parseContext.newEnvFor(function);\r
- plugin.init(env);\r
-\r
- readerStack.addLast(new FunctionReader(plugin));\r
+ return null;\r
}\r
\r
/**\r
* Has finished reading the stream?\r
*/\r
- private boolean hasFinished = false;\r
+ private volatile boolean hasParsingFinished = false;\r
+ private boolean hasPrefetchFinished = false;\r
\r
/**\r
* Is next event available?\r
*/\r
private boolean hasNext() throws XerialException\r
{\r
- if (!eventQueue.isEmpty())\r
+ if (!prefetchedEventQueue.isEmpty())\r
return true;\r
\r
- if (hasFinished)\r
- return false;\r
-\r
- while (!hasFinished && eventQueue.isEmpty())\r
- fillQueue();\r
-\r
- return hasNext();\r
- }\r
-\r
- /**\r
- * Retrieves the next event from the queue. If the event queue is empty,\r
- * fill the queue with the next event\r
- * \r
- * @return the next event.\r
- * @throws XerialException\r
- */\r
- private TreeEvent getNextEvent() throws XerialException\r
- {\r
- if (!eventQueue.isEmpty())\r
- return eventQueue.pop();\r
-\r
- if (hasFinished)\r
- throw new XerialError(XerialErrorCode.INVALID_STATE,\r
- "hasNext() value must be checked before calling getNextEvent()");\r
-\r
- fillQueue();\r
- return getNextEvent();\r
- }\r
-\r
- private void walkMicroFormatRoot(SilkNode schemaNode, JSONArray value) throws XerialException\r
- {\r
- // e.g., exon(start, name)\r
-\r
- if (schemaNode.hasManyOccurrences())\r
- {\r
- if (schemaNode.hasChildren())\r
- {\r
- // e.g., exon(start, name)*\r
- // multiple occurrences: [[start, end], [start, end], ... ] \r
- for (int i = 0; i < value.size(); i++)\r
- {\r
- JSONArray eachElement = value.getJSONArray(i);\r
- if (eachElement == null)\r
- continue;\r
-\r
- visit(schemaNode.getName(), null);\r
- int index = 0;\r
- for (SilkNode eachSubSchema : schemaNode.getChildNodes())\r
- {\r
- walkMicroFormatElement(eachSubSchema, eachElement.get(index++));\r
- }\r
- leave(schemaNode.getName());\r
- }\r
- }\r
- else\r
- {\r
- // e.g. QV*: [20, 50, 50]\r
- for (int i = 0; i < value.size(); i++)\r
- {\r
- visit(schemaNode.getName(), value.get(i).toString());\r
- leave(schemaNode.getName());\r
- }\r
- }\r
- }\r
- else\r
- {\r
- // [e1, e2, ...]\r
- visit(schemaNode.getName(), null);\r
- int index = 0;\r
- if (schemaNode.getChildNodes().size() != value.size())\r
- {\r
- throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format(\r
- "data format doesn't match: schema=%s, value=%s", schemaNode, value));\r
- }\r
- for (SilkNode each : schemaNode.getChildNodes())\r
- {\r
- walkMicroFormatElement(each, value.get(index++));\r
- }\r
- leave(schemaNode.getName());\r
- }\r
- }\r
-\r
- private void walkMicroFormatElement(SilkNode schemaNode, JSONValue value) throws XerialException\r
- {\r
- if (schemaNode.hasChildren())\r
- {\r
- JSONArray array = value.getJSONArray();\r
- if (array != null)\r
- walkMicroFormatRoot(schemaNode, array);\r
- else\r
- throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format(\r
- "data format doesn't match: schema=%s, value=%s", schemaNode, value));\r
- }\r
- else\r
- {\r
- visit(schemaNode.getName(), value.toString());\r
- leave(schemaNode.getName());\r
- }\r
- }\r
-\r
- private void evalDatalineColumn(SilkNode node, String columnData) throws XerialException\r
- {\r
- // 7600 lines/sec\r
-\r
- if (node.hasChildren())\r
- {\r
- JSONArray array = new JSONArray(columnData);\r
- walkMicroFormatRoot(node, array);\r
- return;\r
- }\r
-\r
- switch (node.getOccurrence())\r
- {\r
- case ZERO_OR_MORE:\r
- case ONE_OR_MORE:\r
- if (columnData.startsWith("["))\r
- {\r
- // micro-data format\r
-\r
- // 7900 lines/sec \r
-\r
- JSONArray array = new JSONArray(columnData);\r
- // 1400 lines/sec (ANTLR) 4200 lines/sec (JSONTokener)\r
-\r
- // 5233 lines/sec w/o traversing JSONArray\r
- // // dummy code \r
- // if (true)\r
- // {\r
- // visit(node.getName(), null);\r
- // leave(node.getName());\r
- // return;\r
- // }\r
- walkMicroFormatRoot(node, array);\r
- return;\r
- }\r
- else\r
- {\r
- String[] csv = columnData.split(",");\r
- for (String each : csv)\r
- {\r
- String value = each.trim();\r
- evalColumnData(node, value);\r
- }\r
- return;\r
- }\r
- default:\r
- evalColumnData(node, columnData);\r
- return;\r
- }\r
-\r
- }\r
-\r
- private void evalColumnData(SilkNode node, String columnData) throws XerialException\r
- {\r
- try\r
- {\r
- if (node.hasChildren())\r
- {\r
- // micro-data format\r
- JSONArray array = new JSONArray(columnData);\r
- walkMicroFormatRoot(node, array);\r
- return;\r
- }\r
-\r
- String dataType = node.getDataType();\r
- if (dataType != null && dataType.equalsIgnoreCase("json"))\r
- {\r
- JSONValue json = JSONUtil.parseJSON(columnData);\r
- if (json.getJSONObject() != null)\r
- {\r
- if (node.getName().equals("_")) // no name object\r
- {\r
- walkJSONValue(json, node.getName());\r
- }\r
- else\r
- {\r
- visit(node.getName(), null);\r
- walkJSONValue(json, node.getName());\r
- leave(node.getName());\r
- }\r
- }\r
- else\r
- walkJSONValue(json, node.getName());\r
- }\r
- else\r
- {\r
- visit(node.getName(), StringUtil.unquote(columnData));\r
- leave(node.getName());\r
- }\r
- }\r
- catch (JSONException e)\r
- {\r
- throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", parser.getNumReadLine(), e\r
- .getMessage()));\r
- }\r
-\r
- }\r
-\r
- /**\r
- * Fill the queue by retrieving the next event from the pull parser.\r
- * \r
- * @throws XerialException\r
- */\r
- private void fillQueue() throws XerialException\r
- {\r
- if (!readerStack.isEmpty())\r
- {\r
- TreeEvent e = readerStack.peekLast().next();\r
- if (e == null)\r
- {\r
- readerStack.removeLast();\r
- fillQueue();\r
- }\r
- else\r
- {\r
- eventQueue.push(e);\r
- return;\r
- }\r
- }\r
-\r
- if (!parser.hasNext())\r
- {\r
- // no more input data\r
- closeContextUpTo(parseContext.getIndentationOffset());\r
- hasFinished = true;\r
- return;\r
- }\r
-\r
- SilkEvent currentEvent = parser.next();\r
-\r
- // update the line count\r
- numReadLine = parser.getNumReadLine();\r
-\r
- if (_logger.isTraceEnabled())\r
- {\r
- _logger.trace("stack: " + parseContext.getContextNodeStack());\r
- _logger.trace(currentEvent);\r
- }\r
-\r
- switch (currentEvent.getType())\r
- {\r
- case NODE:\r
- // push context node\r
- SilkNode newContextNode = SilkNode.class.cast(currentEvent.getElement());\r
- openContext(newContextNode);\r
- break;\r
- case FUNCTION:\r
- SilkFunction function = SilkFunction.class.cast(currentEvent.getElement());\r
- evalFunction(function);\r
- break;\r
- case DATA_LINE:\r
- // pop the context stack until finding a node for stream data node occurrence\r
- while (!parseContext.isContextNodeStackEmpty())\r
- {\r
- SilkContext context = parseContext.peekLastContext();\r
- SilkNode node = context.contextNode;\r
- if (!node.getOccurrence().isFollowedByStreamData())\r
- {\r
- parseContext.popLastContext();\r
- if (context.isOpen)\r
- leave(node.getName());\r
- }\r
- else\r
- break;\r
- }\r
-\r
- if (parseContext.isContextNodeStackEmpty())\r
- {\r
- // use default column names(c1, c2, ...) \r
- SilkDataLine line = SilkDataLine.class.cast(currentEvent.getElement());\r
- String[] columns = line.getDataLine().trim().split("\t");\r
- int index = 1;\r
- visit("row", null);\r
- for (String each : columns)\r
- {\r
- String columnName = String.format("c%d", index++);\r
-\r
- // TODO use evalColumnData\r
- visit(columnName, each);\r
- leave(columnName);\r
- }\r
- leave("row");\r
- }\r
- else\r
- {\r
- SilkContext context = parseContext.peekLastContext();\r
- SilkNode schema = context.contextNode;\r
- SilkDataLine line = SilkDataLine.class.cast(currentEvent.getElement());\r
- switch (schema.getOccurrence())\r
- {\r
- case SEQUENCE:\r
- text(line.getDataLine());\r
- break;\r
- case ZERO_OR_MORE:\r
- // CSV data\r
- {\r
- evalDatalineColumn(schema, line.getDataLine());\r
- }\r
- break;\r
- case TABBED_SEQUENCE:\r
- {\r
- String[] columns = line.getDataLine().trim().split("\t");\r
- int columnIndex = 0;\r
- visit(schema.getName(), schema.hasValue() ? schema.getValue().toString() : null);\r
- for (int i = 0; i < schema.getChildNodes().size(); i++)\r
- {\r
- SilkNode child = schema.getChildNodes().get(i);\r
- if (child.hasValue())\r
- {\r
- // output the default value for the column \r
- evalDatalineColumn(child, child.getValue().toString());\r
- }\r
- else\r
- {\r
- if (columnIndex < columns.length)\r
- {\r
- String columnData = columns[columnIndex++];\r
- if (columnData.length() > 0)\r
- evalDatalineColumn(child, columnData);\r
- }\r
- }\r
- }\r
- leave(schema.getName());\r
- break;\r
- }\r
- case MULTILINE_SEQUENCE:\r
- {\r
- int cursor = parseContext.contextNodeAttributeCursor;\r
-\r
- if (cursor >= schema.getChildNodes().size())\r
- break;\r
-\r
- SilkNode attributeNode = schema.getChildNodes().get(cursor);\r
- if (cursor == 0 && !parseContext.isAttributeOpen)\r
- {\r
- visit(schema.getName(), schema.hasValue() ? schema.getValue().toString() : null);\r
- }\r
- if (!parseContext.isAttributeOpen)\r
- {\r
- if (attributeNode.hasValue())\r
- visit(attributeNode.getName(), attributeNode.getValue().toString());\r
- else\r
- visit(attributeNode.getName(), null);\r
-\r
- parseContext.isAttributeOpen = true;\r
- }\r
- text(line.getDataLine().trim());\r
- break;\r
- }\r
- }\r
- }\r
- break;\r
- case MULTILINE_ENTRY_SEPARATOR: // >>\r
- {\r
- SilkContext context = parseContext.peekLastContext();\r
- SilkNode schema = context.contextNode;\r
- if (parseContext.isAttributeOpen)\r
- {\r
- SilkNode attributeNode = schema.getChildNodes().get(parseContext.contextNodeAttributeCursor);\r
- leave(attributeNode.getName());\r
- }\r
- leave(schema.getName());\r
- // reset\r
- parseContext.contextNodeAttributeCursor = 0;\r
- parseContext.isAttributeOpen = false;\r
- break;\r
- }\r
- case MULTILINE_SEPARATOR: // --\r
- {\r
- SilkContext context = parseContext.peekLastContext();\r
- SilkNode schema = context.contextNode;\r
- if (parseContext.isAttributeOpen)\r
- {\r
- SilkNode attributeNode = schema.getChildNodes().get(parseContext.contextNodeAttributeCursor);\r
- leave(attributeNode.getName());\r
- }\r
- parseContext.contextNodeAttributeCursor++;\r
- parseContext.isAttributeOpen = false;\r
- break;\r
- }\r
- case BLANK_LINE:\r
- break;\r
-\r
- case PREAMBLE:\r
- break;\r
+ assert (prefetchedEventQueue.isEmpty());\r
\r
- }\r
-\r
- }\r
-\r
- private void walkJSONAray(JSONArray jsonArray, String parentNodeName) throws XerialException\r
- {\r
- for (JSONValue each : jsonArray)\r
- {\r
- walkJSONValue(each, parentNodeName);\r
- }\r
- }\r
-\r
- private void walkJSONObject(JSONObject jsonObj) throws XerialException\r
- {\r
- for (String key : jsonObj.keys())\r
- {\r
- JSONValue val = jsonObj.get(key);\r
- walkJSONValue(val, key);\r
- }\r
- }\r
+ if (hasPrefetchFinished)\r
+ return false;\r
\r
- private void walkJSONValue(JSONValue value, String parentNodeName) throws XerialException\r
- {\r
- JSONValueType type = value.getValueType();\r
- switch (type)\r
+ if (hasParsingFinished)\r
{\r
- case Array:\r
- walkJSONAray(value.getJSONArray(), parentNodeName);\r
- break;\r
- case Object:\r
- walkJSONObject(value.getJSONObject());\r
- break;\r
- case Boolean:\r
- visit(parentNodeName, value.toString());\r
- leave(parentNodeName);\r
- break;\r
- case Double:\r
- visit(parentNodeName, value.toString());\r
- leave(parentNodeName);\r
- break;\r
- case Integer:\r
- visit(parentNodeName, value.toString());\r
- leave(parentNodeName);\r
- break;\r
- case Null:\r
- visit(parentNodeName, value.toString());\r
- leave(parentNodeName);\r
- break;\r
- case String:\r
- visit(parentNodeName, value.toString());\r
- leave(parentNodeName);\r
- break;\r
+ int count = eventQueue.drainTo(prefetchedEventQueue);\r
+ //_logger.debug("prefetch: " + count);\r
+ //_logger.debug("eventQueue size: " + eventQueue.size());\r
+ hasPrefetchFinished = true;\r
+ return hasNext();\r
}\r
\r
- }\r
-\r
- /**\r
- * Plugin holder\r
- */\r
- private static Map<String, Class<SilkFunctionPlugin>> pluginTable = null;\r
-\r
- /**\r
- * Get the plugin of the specified name\r
- * \r
- * @param name\r
- * plugin name\r
- * @return plugin instance or null if no corresponding plugin is found.\r
- */\r
- private SilkFunctionPlugin getPlugin(String name)\r
- {\r
- Class<SilkFunctionPlugin> pluginClass = getPluginTable().get(name);\r
- if (pluginClass == null)\r
- return null;\r
-\r
try\r
{\r
- SilkFunctionPlugin pluginInstance = pluginClass.newInstance();\r
- return pluginInstance;\r
- }\r
- catch (InstantiationException e)\r
- {\r
- _logger.warn(e);\r
- return null;\r
- }\r
- catch (IllegalAccessException e)\r
- {\r
- _logger.warn(e);\r
- return null;\r
- }\r
- }\r
+ TreeEvent e = null;\r
+ while (!hasParsingFinished && (e = eventQueue.poll(1, TimeUnit.MILLISECONDS)) == null)\r
+ {}\r
\r
- private Map<String, Class<SilkFunctionPlugin>> getPluginTable()\r
- {\r
- if (pluginTable == null)\r
- {\r
- pluginTable = new TreeMap<String, Class<SilkFunctionPlugin>>();\r
- // load plugins \r
- for (Class<SilkFunctionPlugin> each : FileResource.findClasses(SilkFunctionPlugin.class.getPackage(),\r
- SilkFunctionPlugin.class, SilkWalker.class.getClassLoader()))\r
- {\r
- String functionName = each.getSimpleName().toLowerCase();\r
- _logger.trace("loaded " + functionName);\r
- pluginTable.put(functionName, each);\r
- }\r
- }\r
-\r
- return pluginTable;\r
- }\r
+ if (e != null)\r
+ prefetchedEventQueue.addLast(e);\r
\r
- /**\r
- * Information of the function (plugin) arguments (\r
- * {@link SilkFunctionArgument}) described in the Class definition, which\r
- * implements {@link SilkFunctionPlugin}.\r
- * \r
- * @author leo\r
- * \r
- */\r
- private static class PluginField\r
- {\r
- Field field;\r
- SilkFunctionArgument argInfo;\r
-\r
- private PluginField(SilkFunctionArgument argInfo, Field field)\r
- {\r
- this.argInfo = argInfo;\r
- this.field = field;\r
+ return hasNext();\r
}\r
- }\r
-\r
- private static class PluginHolder\r
- {\r
- Class< ? extends SilkFunctionPlugin> pluginClass;\r
- ArrayList<PluginField> argumentFieldList = new ArrayList<PluginField>();\r
- Map<String, PluginField> keyValueFieldTable = new HashMap<String, PluginField>();\r
-\r
- public PluginHolder(Class< ? extends SilkFunctionPlugin> pluginClass)\r
+ catch (InterruptedException e)\r
{\r
- this.pluginClass = pluginClass;\r
\r
- //ArrayList<SilkFunctionArgument> argDefs = new ArrayList<SilkFunctionArgument>();\r
- for (Field eachField : pluginClass.getDeclaredFields())\r
- {\r
- SilkFunctionArgument argInfo = eachField.getAnnotation(SilkFunctionArgument.class);\r
- if (argInfo != null)\r
- {\r
- PluginField pf = new PluginField(argInfo, eachField);\r
- if (argInfo.name().equals(SilkFunctionArgument.NO_VALUE))\r
- argumentFieldList.add(pf);\r
- else\r
- keyValueFieldTable.put(argInfo.name(), pf);\r
- }\r
- }\r
-\r
- // sort arguments in the order of their ordinal\r
- Collections.sort(argumentFieldList, new Comparator<PluginField>() {\r
- public int compare(PluginField o1, PluginField o2)\r
- {\r
- return o1.argInfo.ordinal() - o2.argInfo.ordinal();\r
- }\r
- });\r
-\r
- }\r
-\r
- /**\r
- * Bind function arguments to the plug-in instance\r
- * \r
- * @param plugin\r
- * the instance of the plug-in\r
- * @param args\r
- * the function arguments\r
- */\r
- public void populate(SilkFunctionPlugin plugin, List<SilkFunctionArg> args)\r
- {\r
- int noNameArgCount = 0;\r
- for (SilkFunctionArg eachArgument : args)\r
- {\r
- String argValue = eachArgument.getValue().toString();\r
- try\r
- {\r
- if (eachArgument.hasName())\r
- {\r
- // key value arg\r
- PluginField f = keyValueFieldTable.get(eachArgument.getName());\r
- if (f == null)\r
- {\r
- _logger.warn("unknown argument: " + eachArgument);\r
- continue;\r
- }\r
- ReflectionUtil.setFieldValue(plugin, f.field, argValue);\r
- }\r
- else\r
- {\r
- // unnamed argument\r
- // matching argument order\r
- if (noNameArgCount >= argumentFieldList.size())\r
- {\r
- _logger.warn(String.format("no corresponding field for the argument %s is found",\r
- eachArgument));\r
- continue;\r
- }\r
- PluginField f = argumentFieldList.get(noNameArgCount);\r
- ReflectionUtil.setFieldValue(plugin, f.field, argValue);\r
-\r
- if (!TypeInfo.isCollection(f.field.getType()))\r
- noNameArgCount++;\r
- }\r
- }\r
- catch (XerialException e)\r
- {\r
- _logger.error(e);\r
- }\r
-\r
- }\r
}\r
\r
- }\r
-\r
- /**\r
- * Fill the plug-in argument fields with the given arguments\r
- * \r
- * @param plugin\r
- * plug-in instance.\r
- * @param args\r
- * function arguments.\r
- */\r
- private static void populate(SilkFunctionPlugin plugin, List<SilkFunctionArg> args)\r
- {\r
- PluginHolder holder = new PluginHolder(plugin.getClass());\r
- holder.populate(plugin, args);\r
+ return false;\r
}\r
\r
public long getNumReadLine()\r