From: leo Date: Fri, 5 Jun 2009 05:58:53 +0000 (+0000) Subject: changed the implementation of SilkStreamReader: X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=ea7ac2830e7464c492faf656402ac73c4e34de9e;p=xerial%2Fxerial-core.git changed the implementation of SilkStreamReader: using push -> pull style conversion git-svn-id: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core@3361 ae02f08e-27ec-0310-ae8c-8ba02fe2eafd --- diff --git a/src/main/java/org/xerial/silk/SilkLineFastParser.java b/src/main/java/org/xerial/silk/SilkLineFastParser.java index 1b42e24..7e1d85c 100644 --- a/src/main/java/org/xerial/silk/SilkLineFastParser.java +++ b/src/main/java/org/xerial/silk/SilkLineFastParser.java @@ -47,6 +47,8 @@ import org.xerial.util.log.Logger; /** * Parsing Silk using threads. * + * TODO fix a bug that does not properly report some nodes, for some timing + * * @author leo * */ diff --git a/src/main/java/org/xerial/silk/SilkParser.java b/src/main/java/org/xerial/silk/SilkParser.java index fb99f06..9c5bee2 100644 --- a/src/main/java/org/xerial/silk/SilkParser.java +++ b/src/main/java/org/xerial/silk/SilkParser.java @@ -78,7 +78,7 @@ public class SilkParser implements SilkEventHandler { private static Logger _logger = Logger.getLogger(SilkStreamReader.class); - private final SilkLineFastParser parser; + private final SilkLinePushParser parser; private final SilkEnv parseContext; private TreeEventQueue eventQueue = new TreeEventQueue(); private final ArrayDeque readerStack = new ArrayDeque(); @@ -105,11 +105,9 @@ public class SilkParser implements SilkEventHandler * @param env * @throws IOException */ - private SilkParser(Reader input, SilkEnv env) throws IOException + public SilkParser(Reader input, SilkEnv env) throws IOException { - this.config = new SilkParserConfig(); - this.parser = new SilkLineFastParser(input); - this.parseContext = env; + this(input, env, new SilkParserConfig()); } /** @@ -135,13 +133,22 @@ public class SilkParser implements SilkEventHandler public SilkParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException { + this(new InputStreamReader(resource.openStream()), SilkEnv.newEnv(env, getResourceBasePath(resource)), config); + } + + public SilkParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException + { this.config = config; + this.parser = new SilkLinePushParser(input); + this.parseContext = env; + } + + static String getResourceBasePath(URL resource) + { String path = resource.toExternalForm(); int fileNamePos = path.lastIndexOf("/"); String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null; - - this.parser = new SilkLineFastParser(new InputStreamReader(resource.openStream()), config); - this.parseContext = SilkEnv.newEnv(env, resourceBasePath); + return resourceBasePath; } /** @@ -151,7 +158,7 @@ public class SilkParser implements SilkEventHandler * @param resourceName * @return */ - private static String getResourcePath(String resourceBasePath, String resourceName) + static String getResourcePath(String resourceBasePath, String resourceName) { String resourcePath = resourceBasePath; if (!resourcePath.endsWith("/")) @@ -315,7 +322,7 @@ public class SilkParser implements SilkEventHandler { if (columnIndex < columns.length) { - String columnData = columns[columnIndex++]; + String columnData = columns[columnIndex++].trim(); if (columnData.length() > 0) evalDatalineColumn(child, columnData); } @@ -685,7 +692,7 @@ public class SilkParser implements SilkEventHandler } else { - String[] csv = tabSplit.split(columnData, 0); + String[] csv = commaSplit.split(columnData, 0); for (String each : csv) { String value = each.trim(); diff --git a/src/main/java/org/xerial/silk/SilkStreamReader.java b/src/main/java/org/xerial/silk/SilkStreamReader.java index dc46a3f..e9a02b7 100644 --- a/src/main/java/org/xerial/silk/SilkStreamReader.java +++ b/src/main/java/org/xerial/silk/SilkStreamReader.java @@ -29,43 +29,19 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; -import java.lang.reflect.Field; import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; -import org.xerial.core.XerialError; -import org.xerial.core.XerialErrorCode; import org.xerial.core.XerialException; -import org.xerial.json.JSONArray; -import org.xerial.json.JSONException; -import org.xerial.json.JSONObject; -import org.xerial.json.JSONUtil; -import org.xerial.json.JSONValue; -import org.xerial.json.JSONValueType; -import org.xerial.silk.impl.SilkDataLine; -import org.xerial.silk.impl.SilkFunction; -import org.xerial.silk.impl.SilkFunctionArg; -import org.xerial.silk.impl.SilkJSONValue; -import org.xerial.silk.impl.SilkNode; -import org.xerial.silk.impl.SilkNodeOccurrence; -import org.xerial.silk.impl.SilkValue; -import org.xerial.silk.plugin.SilkFunctionArgument; -import org.xerial.silk.plugin.SilkFunctionPlugin; import org.xerial.util.ArrayDeque; -import org.xerial.util.FileResource; -import org.xerial.util.StringUtil; -import org.xerial.util.bean.TypeInfo; import org.xerial.util.log.Logger; -import org.xerial.util.reflect.ReflectionUtil; import org.xerial.util.tree.TreeEvent; +import org.xerial.util.tree.TreeEventHandler; import org.xerial.util.tree.TreeStreamReader; -import org.xerial.util.xml.impl.TreeEventQueue; /** * {@link TreeStreamReader} implementation for the Silk data format. @@ -77,13 +53,15 @@ public class SilkStreamReader implements TreeStreamReader { private static Logger _logger = Logger.getLogger(SilkStreamReader.class); - private final SilkLinePullParser parser; - private final SilkEnv parseContext; - private TreeEventQueue eventQueue = new TreeEventQueue(); - private final ArrayDeque readerStack = new ArrayDeque(); + private final SilkParser parser; + private final ArrayBlockingQueue eventQueue = new ArrayBlockingQueue(10000); + private final ArrayDeque prefetchedEventQueue = new ArrayDeque(); private long numReadLine = 0; + // for changing push-parser to pull parser + private final ExecutorService threadManager; + /** * Creates a new reader with the specified input stream * @@ -115,24 +93,56 @@ public class SilkStreamReader implements TreeStreamReader */ public SilkStreamReader(Reader input, SilkEnv env) throws IOException { - this.parser = new SilkLinePullParser(input); - this.parseContext = env; + this.parser = new SilkParser(input, env); + + this.threadManager = Executors.newFixedThreadPool(1); + threadManager.submit(new BackgroundParser()); + } - /** - * Concatenates the base path and the resource name - * - * @param resourceBasePath - * @param resourceName - * @return - */ - private static String getResourcePath(String resourceBasePath, String resourceName) + private class BackgroundParser implements Callable { - String resourcePath = resourceBasePath; - if (!resourcePath.endsWith("/")) - resourcePath += "/"; - resourcePath += resourceName; - return resourcePath; + + public Void call() throws Exception + { + try + { + parser.parse(new TreeEventHandler() { + public void finish() throws Exception + { + hasParsingFinished = true; + } + + public void init() throws Exception + { + hasParsingFinished = false; + } + + public void leaveNode(String nodeName) throws Exception + { + eventQueue.put(TreeEvent.newLeaveEvent(nodeName)); + } + + public void text(String nodeName, String textDataFragment) throws Exception + { + eventQueue.put(TreeEvent.newTextEvent(nodeName, textDataFragment)); + } + + public void visitNode(String nodeName, String immediateNodeValue) throws Exception + { + eventQueue.put(TreeEvent.newVisitEvent(nodeName, immediateNodeValue)); + + } + }); + + return null; + } + finally + { + threadManager.shutdown(); + } + } + } /** @@ -144,9 +154,8 @@ public class SilkStreamReader implements TreeStreamReader */ public SilkStreamReader(String resourceBasePath, String resourceName) throws IOException { - this.parser = new SilkLinePullParser(new BufferedReader(new InputStreamReader(SilkWalker.class - .getResourceAsStream(getResourcePath(resourceBasePath, resourceName))))); - this.parseContext = SilkEnv.newEnv(resourceBasePath); + this(new BufferedReader(new InputStreamReader(SilkWalker.class.getResourceAsStream(SilkParser.getResourcePath( + resourceBasePath, resourceName)))), SilkEnv.newEnv(resourceBasePath)); } /** @@ -162,18 +171,14 @@ public class SilkStreamReader implements TreeStreamReader public SilkStreamReader(URL resource, SilkEnv env) throws IOException { - String path = resource.toExternalForm(); - int fileNamePos = path.lastIndexOf("/"); - String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null; - - this.parser = new SilkLinePullParser(new BufferedReader(new InputStreamReader(resource.openStream()))); - this.parseContext = SilkEnv.newEnv(env, resourceBasePath); + this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env, SilkParser + .getResourceBasePath(resource))); } public TreeEvent peekNext() throws XerialException { if (hasNext()) - return eventQueue.peekFirst(); + return prefetchedEventQueue.getFirst(); else return null; } @@ -181,237 +186,19 @@ public class SilkStreamReader implements TreeStreamReader public TreeEvent next() throws XerialException { if (hasNext()) - return getNextEvent(); - else - return null; - } - - /** - * Enqueues a visit event. - * - * @param nodeName - * @param immediateNodeValue - * @throws XerialException - */ - private void visit(String nodeName, String immediateNodeValue) throws XerialException - { - eventQueue.push(TreeEvent.newVisitEvent(nodeName, immediateNodeValue)); - } - - /** - * Enqueues a leave event - * - * @param nodeName - * @throws XerialException - */ - private void leave(String nodeName) throws XerialException - { - eventQueue.push(TreeEvent.newLeaveEvent(nodeName)); - } - - /** - * Enqueues a text event - * - * @param textFragment - * @throws XerialException - */ - private void text(String textFragment) throws XerialException - { - eventQueue.push(TreeEvent.newTextEvent(parseContext.getContextNode().getName(), textFragment)); - } - - /** - * Closed pre-opened contexts up to the specified indent level - * - * @param newIndentLevel - * @throws XerialException - */ - private void closeContextUpTo(int newIndentLevel) throws XerialException - { - while (!parseContext.isContextNodeStackEmpty()) - { - SilkContext context = parseContext.peekLastContext(); - SilkNode node = context.contextNode; - if (node.getIndentLevel() >= newIndentLevel) - { - parseContext.popLastContext(); - - if (parseContext.isAttributeOpen) - { - // close attribute - SilkNode attribute = node.getChildNodes().get(parseContext.contextNodeAttributeCursor); - leave(attribute.getName()); - leave(node.getName()); - parseContext.isAttributeOpen = false; - } - - if (context.isOpen) - { - // close context - String nodeName = node.getName(); - leave(nodeName); - } - } - else - return; - } - } - - /** - * Opens a new context for the given node - * - * @param node - * new context node - * @param visitor - * @throws XerialException - */ - private void openContext(SilkNode node) throws XerialException - { - int indentLevel = node.getIndentLevel(); - if (indentLevel != SilkNode.NO_INDENT) - indentLevel += parseContext.getIndentationOffset(); - - closeContextUpTo(indentLevel); - openContext_internal(node); - } - - private void openContext_internal(SilkNode node) throws XerialException - { - if (node.getName() == null) - { - // no name nodes must hierarchically organize attribute nodes - for (SilkNode eachChild : node.getChildNodes()) - { - eachChild.setNodeIndent(node.getNodeIndent()); - openContext_internal(eachChild); - } - return; - } - - SilkContext currentContext = new SilkContext(node, true); - parseContext.pushContext(currentContext); - - SilkNodeOccurrence occurrence = node.getOccurrence(); - if (occurrence.isSchemaOnlyNode()) { - currentContext.isOpen = false; - // reset the attribute cursor - parseContext.contextNodeAttributeCursor = 0; - parseContext.isAttributeOpen = false; - return; // do not invoke visit events - } - - String nodeName = node.getName(); - SilkValue textValue = node.getValue(); - - // process text values attached to the node - if (textValue != null) - { - // When the text data is JSON, traverses the JSON data - if (textValue.isJSON()) - { - - SilkJSONValue jsonValue = SilkJSONValue.class.cast(textValue); - if (jsonValue.isObject()) - { - visit(nodeName, null); - JSONObject jsonObj = new JSONObject(jsonValue.getValue()); - walkJSONObject(jsonObj); - } - else - { - currentContext.isOpen = false; - JSONArray jsonArray = new JSONArray(jsonValue.getValue()); - walkJSONAray(jsonArray, nodeName); - } - } - else if (textValue.isFunction()) - { - // evaluate the function - visit(nodeName, null); - SilkFunction function = SilkFunction.class.cast(textValue); - evalFunction(function); - - return; - } - else - { - // Simple text value will be reported as it is. - visit(nodeName, textValue.toString()); - } + TreeEvent e = prefetchedEventQueue.removeFirst(); + return e; } else - { - if (occurrence == SilkNodeOccurrence.ZERO_OR_MORE) - { - // CSV data - return; // do not invoke visit events - } - - // Report a visit event without text value - visit(nodeName, null); - } - - // Traverse attribute nodes having text values. If no text value is specified for these attributes, - // they are schema elements for the following DATA_LINE. - for (SilkNode eachChild : node.getChildNodes()) - { - if (eachChild.hasValue()) - { - openContext(eachChild); - } - } - - } - - private static class FunctionReader implements TreeStreamReader - { - SilkFunctionPlugin plugin; - - public FunctionReader(SilkFunctionPlugin plugin) - { - this.plugin = plugin; - } - - public TreeEvent peekNext() throws XerialException - { - return plugin.peekNext(); - } - - public TreeEvent next() throws XerialException - { - return plugin.next(); - } - } - - /** - * Evaluate the function - * - * @param function - * @throws XerialException - */ - private void evalFunction(SilkFunction function) throws XerialException - { - SilkFunctionPlugin plugin = getPlugin(function.getName()); - if (plugin == null) - { - _logger.error(String.format("plugin %s not found", function.getName())); - return; - } - // fill the function argument to the plugin instance - populate(plugin, function.getArgumentList()); - - // evaluate the function - SilkEnv env = parseContext.newEnvFor(function); - plugin.init(env); - - readerStack.addLast(new FunctionReader(plugin)); + return null; } /** * Has finished reading the stream? */ - private boolean hasFinished = false; + private volatile boolean hasParsingFinished = false; + private boolean hasPrefetchFinished = false; /** * Is next event available? @@ -421,624 +208,40 @@ public class SilkStreamReader implements TreeStreamReader */ private boolean hasNext() throws XerialException { - if (!eventQueue.isEmpty()) + if (!prefetchedEventQueue.isEmpty()) return true; - if (hasFinished) - return false; - - while (!hasFinished && eventQueue.isEmpty()) - fillQueue(); - - return hasNext(); - } - - /** - * Retrieves the next event from the queue. If the event queue is empty, - * fill the queue with the next event - * - * @return the next event. - * @throws XerialException - */ - private TreeEvent getNextEvent() throws XerialException - { - if (!eventQueue.isEmpty()) - return eventQueue.pop(); - - if (hasFinished) - throw new XerialError(XerialErrorCode.INVALID_STATE, - "hasNext() value must be checked before calling getNextEvent()"); - - fillQueue(); - return getNextEvent(); - } - - private void walkMicroFormatRoot(SilkNode schemaNode, JSONArray value) throws XerialException - { - // e.g., exon(start, name) - - if (schemaNode.hasManyOccurrences()) - { - if (schemaNode.hasChildren()) - { - // e.g., exon(start, name)* - // multiple occurrences: [[start, end], [start, end], ... ] - for (int i = 0; i < value.size(); i++) - { - JSONArray eachElement = value.getJSONArray(i); - if (eachElement == null) - continue; - - visit(schemaNode.getName(), null); - int index = 0; - for (SilkNode eachSubSchema : schemaNode.getChildNodes()) - { - walkMicroFormatElement(eachSubSchema, eachElement.get(index++)); - } - leave(schemaNode.getName()); - } - } - else - { - // e.g. QV*: [20, 50, 50] - for (int i = 0; i < value.size(); i++) - { - visit(schemaNode.getName(), value.get(i).toString()); - leave(schemaNode.getName()); - } - } - } - else - { - // [e1, e2, ...] - visit(schemaNode.getName(), null); - int index = 0; - if (schemaNode.getChildNodes().size() != value.size()) - { - throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format( - "data format doesn't match: schema=%s, value=%s", schemaNode, value)); - } - for (SilkNode each : schemaNode.getChildNodes()) - { - walkMicroFormatElement(each, value.get(index++)); - } - leave(schemaNode.getName()); - } - } - - private void walkMicroFormatElement(SilkNode schemaNode, JSONValue value) throws XerialException - { - if (schemaNode.hasChildren()) - { - JSONArray array = value.getJSONArray(); - if (array != null) - walkMicroFormatRoot(schemaNode, array); - else - throw new XerialException(XerialErrorCode.INVALID_INPUT, String.format( - "data format doesn't match: schema=%s, value=%s", schemaNode, value)); - } - else - { - visit(schemaNode.getName(), value.toString()); - leave(schemaNode.getName()); - } - } - - private void evalDatalineColumn(SilkNode node, String columnData) throws XerialException - { - // 7600 lines/sec - - if (node.hasChildren()) - { - JSONArray array = new JSONArray(columnData); - walkMicroFormatRoot(node, array); - return; - } - - switch (node.getOccurrence()) - { - case ZERO_OR_MORE: - case ONE_OR_MORE: - if (columnData.startsWith("[")) - { - // micro-data format - - // 7900 lines/sec - - JSONArray array = new JSONArray(columnData); - // 1400 lines/sec (ANTLR) 4200 lines/sec (JSONTokener) - - // 5233 lines/sec w/o traversing JSONArray - // // dummy code - // if (true) - // { - // visit(node.getName(), null); - // leave(node.getName()); - // return; - // } - walkMicroFormatRoot(node, array); - return; - } - else - { - String[] csv = columnData.split(","); - for (String each : csv) - { - String value = each.trim(); - evalColumnData(node, value); - } - return; - } - default: - evalColumnData(node, columnData); - return; - } - - } - - private void evalColumnData(SilkNode node, String columnData) throws XerialException - { - try - { - if (node.hasChildren()) - { - // micro-data format - JSONArray array = new JSONArray(columnData); - walkMicroFormatRoot(node, array); - return; - } - - String dataType = node.getDataType(); - if (dataType != null && dataType.equalsIgnoreCase("json")) - { - JSONValue json = JSONUtil.parseJSON(columnData); - if (json.getJSONObject() != null) - { - if (node.getName().equals("_")) // no name object - { - walkJSONValue(json, node.getName()); - } - else - { - visit(node.getName(), null); - walkJSONValue(json, node.getName()); - leave(node.getName()); - } - } - else - walkJSONValue(json, node.getName()); - } - else - { - visit(node.getName(), StringUtil.unquote(columnData)); - leave(node.getName()); - } - } - catch (JSONException e) - { - throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", parser.getNumReadLine(), e - .getMessage())); - } - - } - - /** - * Fill the queue by retrieving the next event from the pull parser. - * - * @throws XerialException - */ - private void fillQueue() throws XerialException - { - if (!readerStack.isEmpty()) - { - TreeEvent e = readerStack.peekLast().next(); - if (e == null) - { - readerStack.removeLast(); - fillQueue(); - } - else - { - eventQueue.push(e); - return; - } - } - - if (!parser.hasNext()) - { - // no more input data - closeContextUpTo(parseContext.getIndentationOffset()); - hasFinished = true; - return; - } - - SilkEvent currentEvent = parser.next(); - - // update the line count - numReadLine = parser.getNumReadLine(); - - if (_logger.isTraceEnabled()) - { - _logger.trace("stack: " + parseContext.getContextNodeStack()); - _logger.trace(currentEvent); - } - - switch (currentEvent.getType()) - { - case NODE: - // push context node - SilkNode newContextNode = SilkNode.class.cast(currentEvent.getElement()); - openContext(newContextNode); - break; - case FUNCTION: - SilkFunction function = SilkFunction.class.cast(currentEvent.getElement()); - evalFunction(function); - break; - case DATA_LINE: - // pop the context stack until finding a node for stream data node occurrence - while (!parseContext.isContextNodeStackEmpty()) - { - SilkContext context = parseContext.peekLastContext(); - SilkNode node = context.contextNode; - if (!node.getOccurrence().isFollowedByStreamData()) - { - parseContext.popLastContext(); - if (context.isOpen) - leave(node.getName()); - } - else - break; - } - - if (parseContext.isContextNodeStackEmpty()) - { - // use default column names(c1, c2, ...) - SilkDataLine line = SilkDataLine.class.cast(currentEvent.getElement()); - String[] columns = line.getDataLine().trim().split("\t"); - int index = 1; - visit("row", null); - for (String each : columns) - { - String columnName = String.format("c%d", index++); - - // TODO use evalColumnData - visit(columnName, each); - leave(columnName); - } - leave("row"); - } - else - { - SilkContext context = parseContext.peekLastContext(); - SilkNode schema = context.contextNode; - SilkDataLine line = SilkDataLine.class.cast(currentEvent.getElement()); - switch (schema.getOccurrence()) - { - case SEQUENCE: - text(line.getDataLine()); - break; - case ZERO_OR_MORE: - // CSV data - { - evalDatalineColumn(schema, line.getDataLine()); - } - break; - case TABBED_SEQUENCE: - { - String[] columns = line.getDataLine().trim().split("\t"); - int columnIndex = 0; - visit(schema.getName(), schema.hasValue() ? schema.getValue().toString() : null); - for (int i = 0; i < schema.getChildNodes().size(); i++) - { - SilkNode child = schema.getChildNodes().get(i); - if (child.hasValue()) - { - // output the default value for the column - evalDatalineColumn(child, child.getValue().toString()); - } - else - { - if (columnIndex < columns.length) - { - String columnData = columns[columnIndex++]; - if (columnData.length() > 0) - evalDatalineColumn(child, columnData); - } - } - } - leave(schema.getName()); - break; - } - case MULTILINE_SEQUENCE: - { - int cursor = parseContext.contextNodeAttributeCursor; - - if (cursor >= schema.getChildNodes().size()) - break; - - SilkNode attributeNode = schema.getChildNodes().get(cursor); - if (cursor == 0 && !parseContext.isAttributeOpen) - { - visit(schema.getName(), schema.hasValue() ? schema.getValue().toString() : null); - } - if (!parseContext.isAttributeOpen) - { - if (attributeNode.hasValue()) - visit(attributeNode.getName(), attributeNode.getValue().toString()); - else - visit(attributeNode.getName(), null); - - parseContext.isAttributeOpen = true; - } - text(line.getDataLine().trim()); - break; - } - } - } - break; - case MULTILINE_ENTRY_SEPARATOR: // >> - { - SilkContext context = parseContext.peekLastContext(); - SilkNode schema = context.contextNode; - if (parseContext.isAttributeOpen) - { - SilkNode attributeNode = schema.getChildNodes().get(parseContext.contextNodeAttributeCursor); - leave(attributeNode.getName()); - } - leave(schema.getName()); - // reset - parseContext.contextNodeAttributeCursor = 0; - parseContext.isAttributeOpen = false; - break; - } - case MULTILINE_SEPARATOR: // -- - { - SilkContext context = parseContext.peekLastContext(); - SilkNode schema = context.contextNode; - if (parseContext.isAttributeOpen) - { - SilkNode attributeNode = schema.getChildNodes().get(parseContext.contextNodeAttributeCursor); - leave(attributeNode.getName()); - } - parseContext.contextNodeAttributeCursor++; - parseContext.isAttributeOpen = false; - break; - } - case BLANK_LINE: - break; - - case PREAMBLE: - break; + assert (prefetchedEventQueue.isEmpty()); - } - - } - - private void walkJSONAray(JSONArray jsonArray, String parentNodeName) throws XerialException - { - for (JSONValue each : jsonArray) - { - walkJSONValue(each, parentNodeName); - } - } - - private void walkJSONObject(JSONObject jsonObj) throws XerialException - { - for (String key : jsonObj.keys()) - { - JSONValue val = jsonObj.get(key); - walkJSONValue(val, key); - } - } + if (hasPrefetchFinished) + return false; - private void walkJSONValue(JSONValue value, String parentNodeName) throws XerialException - { - JSONValueType type = value.getValueType(); - switch (type) + if (hasParsingFinished) { - case Array: - walkJSONAray(value.getJSONArray(), parentNodeName); - break; - case Object: - walkJSONObject(value.getJSONObject()); - break; - case Boolean: - visit(parentNodeName, value.toString()); - leave(parentNodeName); - break; - case Double: - visit(parentNodeName, value.toString()); - leave(parentNodeName); - break; - case Integer: - visit(parentNodeName, value.toString()); - leave(parentNodeName); - break; - case Null: - visit(parentNodeName, value.toString()); - leave(parentNodeName); - break; - case String: - visit(parentNodeName, value.toString()); - leave(parentNodeName); - break; + int count = eventQueue.drainTo(prefetchedEventQueue); + //_logger.debug("prefetch: " + count); + //_logger.debug("eventQueue size: " + eventQueue.size()); + hasPrefetchFinished = true; + return hasNext(); } - } - - /** - * Plugin holder - */ - private static Map> pluginTable = null; - - /** - * Get the plugin of the specified name - * - * @param name - * plugin name - * @return plugin instance or null if no corresponding plugin is found. - */ - private SilkFunctionPlugin getPlugin(String name) - { - Class pluginClass = getPluginTable().get(name); - if (pluginClass == null) - return null; - try { - SilkFunctionPlugin pluginInstance = pluginClass.newInstance(); - return pluginInstance; - } - catch (InstantiationException e) - { - _logger.warn(e); - return null; - } - catch (IllegalAccessException e) - { - _logger.warn(e); - return null; - } - } + TreeEvent e = null; + while (!hasParsingFinished && (e = eventQueue.poll(1, TimeUnit.MILLISECONDS)) == null) + {} - private Map> getPluginTable() - { - if (pluginTable == null) - { - pluginTable = new TreeMap>(); - // load plugins - for (Class each : FileResource.findClasses(SilkFunctionPlugin.class.getPackage(), - SilkFunctionPlugin.class, SilkWalker.class.getClassLoader())) - { - String functionName = each.getSimpleName().toLowerCase(); - _logger.trace("loaded " + functionName); - pluginTable.put(functionName, each); - } - } - - return pluginTable; - } + if (e != null) + prefetchedEventQueue.addLast(e); - /** - * Information of the function (plugin) arguments ( - * {@link SilkFunctionArgument}) described in the Class definition, which - * implements {@link SilkFunctionPlugin}. - * - * @author leo - * - */ - private static class PluginField - { - Field field; - SilkFunctionArgument argInfo; - - private PluginField(SilkFunctionArgument argInfo, Field field) - { - this.argInfo = argInfo; - this.field = field; + return hasNext(); } - } - - private static class PluginHolder - { - Class< ? extends SilkFunctionPlugin> pluginClass; - ArrayList argumentFieldList = new ArrayList(); - Map keyValueFieldTable = new HashMap(); - - public PluginHolder(Class< ? extends SilkFunctionPlugin> pluginClass) + catch (InterruptedException e) { - this.pluginClass = pluginClass; - //ArrayList argDefs = new ArrayList(); - for (Field eachField : pluginClass.getDeclaredFields()) - { - SilkFunctionArgument argInfo = eachField.getAnnotation(SilkFunctionArgument.class); - if (argInfo != null) - { - PluginField pf = new PluginField(argInfo, eachField); - if (argInfo.name().equals(SilkFunctionArgument.NO_VALUE)) - argumentFieldList.add(pf); - else - keyValueFieldTable.put(argInfo.name(), pf); - } - } - - // sort arguments in the order of their ordinal - Collections.sort(argumentFieldList, new Comparator() { - public int compare(PluginField o1, PluginField o2) - { - return o1.argInfo.ordinal() - o2.argInfo.ordinal(); - } - }); - - } - - /** - * Bind function arguments to the plug-in instance - * - * @param plugin - * the instance of the plug-in - * @param args - * the function arguments - */ - public void populate(SilkFunctionPlugin plugin, List args) - { - int noNameArgCount = 0; - for (SilkFunctionArg eachArgument : args) - { - String argValue = eachArgument.getValue().toString(); - try - { - if (eachArgument.hasName()) - { - // key value arg - PluginField f = keyValueFieldTable.get(eachArgument.getName()); - if (f == null) - { - _logger.warn("unknown argument: " + eachArgument); - continue; - } - ReflectionUtil.setFieldValue(plugin, f.field, argValue); - } - else - { - // unnamed argument - // matching argument order - if (noNameArgCount >= argumentFieldList.size()) - { - _logger.warn(String.format("no corresponding field for the argument %s is found", - eachArgument)); - continue; - } - PluginField f = argumentFieldList.get(noNameArgCount); - ReflectionUtil.setFieldValue(plugin, f.field, argValue); - - if (!TypeInfo.isCollection(f.field.getType())) - noNameArgCount++; - } - } - catch (XerialException e) - { - _logger.error(e); - } - - } } - } - - /** - * Fill the plug-in argument fields with the given arguments - * - * @param plugin - * plug-in instance. - * @param args - * function arguments. - */ - private static void populate(SilkFunctionPlugin plugin, List args) - { - PluginHolder holder = new PluginHolder(plugin.getClass()); - holder.populate(plugin, args); + return false; } public long getNumReadLine()