import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
import org.xerial.core.XerialError;
import org.xerial.core.XerialErrorCode;
HashMap<Edge, List<Operation>> operationSetOnForward = new HashMap<Edge, List<Operation>>();
HashMap<Edge, List<Operation>> operationSetOnBack = new HashMap<Edge, List<Operation>>();
-
- List<Operation> forwardActionList = null;
+ HashMap<Edge, List<TextOperation>> operatSetOnText = new HashMap<Edge, List<TextOperation>>();
public StreamAmoebaJoin(QuerySet query, AmoebaJoinHandler handler) throws IOException
{
}
+ static interface TextOperation
+ {
+ void execute(String nodeName, String textData);
+ }
+
+ class SimpleTextOperation implements TextOperation
+ {
+ final Schema schema;
+
+ public SimpleTextOperation(Schema schema)
+ {
+ this.schema = schema;
+ }
+
+ public SimpleTextOperation(PushRelation pr)
+ {
+ this.schema = pr.schema;
+ }
+
+ public void execute(String nodeName, String textData)
+ {
+ handler.text(schema, nodeName, textData);
+ }
+ }
+
+ class ContextBasedTextOperation implements TextOperation
+ {
+ final HashMap<String, TextOperation> coreNode_action = new HashMap<String, TextOperation>();
+
+ public ContextBasedTextOperation(ScopedPushRelation scopedPushOperation)
+ {
+ for (Entry<String, PushRelation> each : scopedPushOperation.coreNode_action.entrySet())
+ {
+ coreNode_action.put(each.getKey(), new SimpleTextOperation(each.getValue()));
+ }
+ }
+
+ public void execute(String nodeName, String textData)
+ {
+ for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext();)
+ {
+ String contextNode = it.next();
+ if (coreNode_action.containsKey(contextNode))
+ {
+ coreNode_action.get(contextNode).execute(nodeName, textData);
+ }
+ }
+ }
+
+ }
+
/**
* Defines an operation assigned to an currentEdge of the node name lattice
*
public void text(String nodeName, String textDataFragment, TreeWalker walker) throws XerialException
{
- if (forwardActionList == null)
- throw new XerialError(XerialErrorCode.INVALID_STATE, "null action list: for text node " + nodeName);
-
Iterator<LatticeNode<String>> it = stateStack.descendingIterator();
LatticeNode<String> currentState = it.next();
LatticeNode<String> prevState = it.next();
Edge currentEdge = new Edge(prevState.getID(), currentState.getID());
+ List<TextOperation> textOperation = operatSetOnText.get(currentEdge);
+ if (textOperation == null)
+ {
+ textOperation = new ArrayList<TextOperation>();
+ operatSetOnText.put(currentEdge, textOperation);
+
+ List<Operation> forwardAction = getForwardActionList(prevState, currentState, nodeName);
+ for (Operation each : forwardAction)
+ {
+ if (each instanceof PushRelation)
+ {
+ textOperation.add(new SimpleTextOperation((PushRelation) each));
+ }
+ else if (each instanceof ScopedPushRelation)
+ {
+ textOperation.add(new ContextBasedTextOperation((ScopedPushRelation) each));
+ }
+ else
+ throw new XerialError(XerialErrorCode.INVALID_STATE, "unknown operation: " + each);
+ }
+ }
+
+ assert textOperation != null;
- handler.text(nodeName, textDataFragment);
+ for (TextOperation each : textOperation)
+ each.execute(nodeName, textDataFragment);
}
public void leaveNode(String nodeName, TreeWalker walker) throws XerialException
void forward(Node node)
{
- forwardActionList = getForwardActionList(node);
+ LatticeNode<String> prevState = latticeCursor.getNode();
+ LatticeNode<String> nextState = latticeCursor.next(node.nodeName);
+ stateStack.addLast(nextState);
+
+ List<Operation> forwardActionList = getForwardActionList(prevState, nextState, node.nodeName);
assert forwardActionList != null;
for (Operation each : forwardActionList)
}
}
- private List<Operation> getForwardActionList(Node nextNode)
+ private List<Operation> getForwardActionList(LatticeNode<String> prevState, LatticeNode<String> nextState,
+ String nodeName)
{
- LatticeNode<String> prevState = latticeCursor.getNode();
- LatticeNode<String> nextState = latticeCursor.next(nextNode.nodeName);
-
- stateStack.addLast(nextState);
-
Edge currentEdge = new Edge(prevState.getID(), nextState.getID());
+
List<Operation> actionList = operationSetOnForward.get(currentEdge);
if (actionList != null)
return actionList;
- int prevNodeID = prevState.getID();
- int nextNodeID = nextState.getID();
+ int prevNodeID = currentEdge.getSourceNodeID();
+ int nextNodeID = currentEdge.getDestNodeID();
// lazily prepare the action list
actionList = new ArrayList<Operation>();
operationSetOnBack.put(new Edge(nextNodeID, prevNodeID), backActionList);
// search for the corresponding relations to newly found two node pair
- String newlyFoundTag = nextNode.nodeName;
+ String newlyFoundTag = nodeName;
if (_logger2.isTraceEnabled())
_logger2.trace("crate actions for " + newlyFoundTag);
- if (prevState != nextState)
+ if (prevNodeID != nextNodeID)
{
List<PushRelation> foundAction = new ArrayList<PushRelation>();
// (core node, attribute node)