From: leo Date: Wed, 3 Jun 2009 09:35:48 +0000 (+0000) Subject: put parser configurations into SilkParserConfig X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=c36b5d79dde664779af4948e23084a7e2f6ff0d2;p=xerial%2Fxerial-core.git put parser configurations into SilkParserConfig git-svn-id: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core@3354 ae02f08e-27ec-0310-ae8c-8ba02fe2eafd --- diff --git a/src/main/java/org/xerial/silk/SilkContext.java b/src/main/java/org/xerial/silk/SilkContext.java index c6c00a8..15ed782 100644 --- a/src/main/java/org/xerial/silk/SilkContext.java +++ b/src/main/java/org/xerial/silk/SilkContext.java @@ -26,7 +26,7 @@ package org.xerial.silk; import org.xerial.silk.impl.SilkNode; -public class SilkContext +class SilkContext { public final SilkNode contextNode; boolean isOpen; diff --git a/src/main/java/org/xerial/silk/SilkLineFastParser.java b/src/main/java/org/xerial/silk/SilkLineFastParser.java index 7e12b24..03da100 100644 --- a/src/main/java/org/xerial/silk/SilkLineFastParser.java +++ b/src/main/java/org/xerial/silk/SilkLineFastParser.java @@ -53,39 +53,36 @@ public class SilkLineFastParser private static Logger _logger = Logger.getLogger(SilkLineFastParser.class); private final BufferedReader buffer; - - private int numLinesPerJob = 5000; - private int numWorkers = 4; private final ExecutorService threadManager; - - private final LinkedBlockingQueue>> eventContainer = new LinkedBlockingQueue>>( - numWorkers); - - private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1M + private final LinkedBlockingQueue>> eventContainer; + private final SilkParserConfig config; public SilkLineFastParser(URL resourceURL) throws IOException { - this(resourceURL, DEFAULT_BUFFER_SIZE); + this(resourceURL, new SilkParserConfig()); } - public SilkLineFastParser(URL resourceURL, int bufferSize) throws IOException + public SilkLineFastParser(URL resourceURL, SilkParserConfig config) throws IOException { - this(new InputStreamReader(resourceURL.openStream()), bufferSize); + this(new InputStreamReader(resourceURL.openStream()), config); } public SilkLineFastParser(Reader reader) { - this(reader, DEFAULT_BUFFER_SIZE); + this(reader, new SilkParserConfig()); } - public SilkLineFastParser(Reader reader, int bufferSize) + public SilkLineFastParser(Reader reader, SilkParserConfig config) { + this.config = config; + this.eventContainer = new LinkedBlockingQueue>>(config.numWorkers); + if (reader.getClass().isAssignableFrom(BufferedReader.class)) buffer = BufferedReader.class.cast(reader); else - buffer = new BufferedReader(reader, bufferSize); + buffer = new BufferedReader(reader, config.bufferSize); - threadManager = Executors.newFixedThreadPool(numWorkers + 1); + threadManager = Executors.newFixedThreadPool(config.numWorkers + 1); } private volatile boolean foundEOF = false; @@ -102,9 +99,9 @@ public class SilkLineFastParser while (!foundEOF) { - ArrayList cache = new ArrayList(numLinesPerJob); + ArrayList cache = new ArrayList(config.numLinesInBlock); int lineCount = 0; - while (lineCount < numLinesPerJob) + while (lineCount < config.numLinesInBlock) { lineCount++; String line = buffer.readLine(); diff --git a/src/main/java/org/xerial/silk/SilkLinePushParser.java b/src/main/java/org/xerial/silk/SilkLinePushParser.java index a0caa21..61548af 100644 --- a/src/main/java/org/xerial/silk/SilkLinePushParser.java +++ b/src/main/java/org/xerial/silk/SilkLinePushParser.java @@ -62,6 +62,7 @@ public class SilkLinePushParser private final BufferedReader buffer; private long lineCount = 0; private SilkEventHandler handler = null; + private final SilkParserConfig config; private static final SilkEvent EOFEvent = new SilkEvent(SilkEventType.END_OF_FILE, null); private static final SilkEvent BlankLineEvent = new SilkEvent(SilkEventType.BLANK_LINE, null); @@ -71,22 +72,24 @@ public class SilkLinePushParser this(new InputStreamReader(resourceURL.openStream())); } - public SilkLinePushParser(URL resourceURL, int bufferSize) throws IOException + public SilkLinePushParser(URL resourceURL, SilkParserConfig config) throws IOException { - this(new InputStreamReader(resourceURL.openStream()), bufferSize); + this(new InputStreamReader(resourceURL.openStream()), config); } public SilkLinePushParser(Reader reader) { - this(reader, 1024 * 1024); // 1MB + this(reader, new SilkParserConfig()); // 1MB } - public SilkLinePushParser(Reader reader, int bufferSize) + public SilkLinePushParser(Reader reader, SilkParserConfig config) { + this.config = config; + if (reader.getClass().isAssignableFrom(BufferedReader.class)) buffer = BufferedReader.class.cast(reader); else - buffer = new BufferedReader(reader, bufferSize); + buffer = new BufferedReader(reader, config.bufferSize); lexer = new SilkLexer(); } diff --git a/src/main/java/org/xerial/silk/SilkParser.java b/src/main/java/org/xerial/silk/SilkParser.java index bdc57da..fb99f06 100644 --- a/src/main/java/org/xerial/silk/SilkParser.java +++ b/src/main/java/org/xerial/silk/SilkParser.java @@ -78,13 +78,15 @@ public class SilkParser implements SilkEventHandler { private static Logger _logger = Logger.getLogger(SilkStreamReader.class); - private final SilkLinePushParser parser; + private final SilkLineFastParser parser; private final SilkEnv parseContext; private TreeEventQueue eventQueue = new TreeEventQueue(); private final ArrayDeque readerStack = new ArrayDeque(); private long numReadLine = 0; + private final SilkParserConfig config; + /** * Creates a new reader with the specified reader * @@ -105,7 +107,8 @@ public class SilkParser implements SilkEventHandler */ private SilkParser(Reader input, SilkEnv env) throws IOException { - this.parser = new SilkLinePushParser(input); + this.config = new SilkParserConfig(); + this.parser = new SilkLineFastParser(input); this.parseContext = env; } @@ -120,23 +123,24 @@ public class SilkParser implements SilkEventHandler this(resourcePath, SilkEnv.newEnv()); } - public SilkParser(URL resourcePath, int bufferSize) throws IOException + public SilkParser(URL resourcePath, SilkParserConfig config) throws IOException { - this(resourcePath, SilkEnv.newEnv(), bufferSize); + this(resourcePath, SilkEnv.newEnv(), config); } public SilkParser(URL resource, SilkEnv env) throws IOException { - this(resource, env, 1024 * 1024); + this(resource, env, new SilkParserConfig()); } - public SilkParser(URL resource, SilkEnv env, int bufferSize) throws IOException + public SilkParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException { + this.config = config; String path = resource.toExternalForm(); int fileNamePos = path.lastIndexOf("/"); String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null; - this.parser = new SilkLinePushParser(new InputStreamReader(resource.openStream()), bufferSize); + this.parser = new SilkLineFastParser(new InputStreamReader(resource.openStream()), config); this.parseContext = SilkEnv.newEnv(env, resourceBasePath); } @@ -824,8 +828,7 @@ public class SilkParser implements SilkEventHandler } catch (JSONException e) { - throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", parser.getNumReadLine(), e - .getMessage())); + throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", numReadLine, e.getMessage())); } } diff --git a/src/main/java/org/xerial/silk/SilkParserConfig.java b/src/main/java/org/xerial/silk/SilkParserConfig.java new file mode 100644 index 0000000..566352d --- /dev/null +++ b/src/main/java/org/xerial/silk/SilkParserConfig.java @@ -0,0 +1,39 @@ +/*-------------------------------------------------------------------------- + * Copyright 2009 Taro L. Saito + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *--------------------------------------------------------------------------*/ +//-------------------------------------- +// XerialJ +// +// SilkParserConfig.java +// Since: Jun 3, 2009 5:33:54 PM +// +// $URL$ +// $Author$ +//-------------------------------------- +package org.xerial.silk; + +/** + * SilkParserConfig + * + * @author leo + * + */ +public class SilkParserConfig +{ + public int bufferSize = 1024 * 1024; // 1M + public int numWorkers = 2; + public int numLinesInBlock = 1000; + +} diff --git a/src/main/java/org/xerial/silk/cui/Scan.java b/src/main/java/org/xerial/silk/cui/Scan.java index 9f4282b..08713cb 100644 --- a/src/main/java/org/xerial/silk/cui/Scan.java +++ b/src/main/java/org/xerial/silk/cui/Scan.java @@ -28,12 +28,14 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import org.xerial.lens.ObjectLens; import org.xerial.silk.SilkEvent; import org.xerial.silk.SilkEventHandler; import org.xerial.silk.SilkEventType; import org.xerial.silk.SilkLineFastParser; import org.xerial.silk.SilkLinePushParser; import org.xerial.silk.SilkParser; +import org.xerial.silk.SilkParserConfig; import org.xerial.util.StopWatch; import org.xerial.util.log.Logger; import org.xerial.util.opt.Argument; @@ -65,25 +67,42 @@ public class Scan implements SilkCommand @Option(symbol = "b", longName = "buffer", description = "buffer size in MB (default = 1)") private int bufferSizeInMB = 1; + @Option(symbol = "n", longName = "thread", description = "num workder threads") + private int numThreads = 2; + + @Option(symbol = "c", longName = "lines", description = "num assigned lines for each worker threads") + private int numLines = 1000; + private void reportReadSpeed(double time, long fileSize) { double speedInMBS = fileSize / 1024 / 1024 / time; - _logger.info(String.format("time=%.2f, %3.2f MB/s", time, speedInMBS)); + _logger.info(String.format("\ntime=%.2f, %3.2f MB/s", time, speedInMBS)); } + private void reportLinesPerSec(double time, long lineCount) + { + double speed = lineCount / time; + System.err.print(String.format("time=%5.2f line=%,10d %,10.0f lines/s\r", time, lineCount, speed)); + } + public void execute() throws Exception { File f = new File(inputSilkFile); final long fileSize = f.length(); - int bufferSize = bufferSizeInMB * 1024 * 1024; + final SilkParserConfig config = new SilkParserConfig(); + config.bufferSize = bufferSizeInMB * 1024 * 1024; + config.numWorkers = numThreads; + config.numLinesInBlock = numLines; + + _logger.info("config: " + ObjectLens.toJSON(config)); switch (mode) { case NODE: { - SilkParser parser = new SilkParser(f.toURL(), bufferSize); + SilkParser parser = new SilkParser(f.toURL(), config); parser.parse(new TreeEventHandlerBase() { @@ -104,7 +123,7 @@ public class Scan implements SilkCommand { double time = timer.getElapsedTime(); double speed = count / time; - _logger.info(String.format("node=%,15d time=%5.2f %,10.0f nodes/s", count, time, speed)); + System.err.print(String.format("node=%,15d time=%5.2f %,10.0f nodes/s\r", count, time, speed)); } } @@ -115,9 +134,8 @@ public class Scan implements SilkCommand double time = timer.getElapsedTime(); double speedPerNode = ((double) count) / time; double speedInMBS = fileSize / 1024 / 1024 / time; - _logger - .info(String - .format("time=%.2f %,10.0f nodes/s, %3.2f MB/s", time, speedPerNode, speedInMBS)); + _logger.info(String.format("\ntime=%.2f %,10.0f nodes/s, %3.2f MB/s", time, speedPerNode, + speedInMBS)); } }); @@ -125,7 +143,7 @@ public class Scan implements SilkCommand } case LINE: { - SilkLinePushParser parser = new SilkLinePushParser(f.toURL(), bufferSize); + SilkLinePushParser parser = new SilkLinePushParser(f.toURL(), config); parser.parse(new SilkEventHandler() { int lineCount = 0; @@ -142,9 +160,7 @@ public class Scan implements SilkCommand lineCount++; if (lineCount % 100000 == 0) { - double time = timer.getElapsedTime(); - double speed = lineCount / time; - _logger.info(String.format("time=%5.2f line=%,10d %,10.0f lines/s", time, lineCount, speed)); + reportLinesPerSec(timer.getElapsedTime(), lineCount); } } @@ -154,7 +170,7 @@ public class Scan implements SilkCommand } case FASTLINE: { - SilkLineFastParser parser = new SilkLineFastParser(f.toURL(), bufferSize); + SilkLineFastParser parser = new SilkLineFastParser(f.toURL(), config); parser.parse(new SilkEventHandler() { int lineCount = 0; @@ -171,9 +187,7 @@ public class Scan implements SilkCommand lineCount++; if (lineCount % 100000 == 0) { - double time = timer.getElapsedTime(); - double speed = lineCount / time; - _logger.info(String.format("time=%5.2f line=%,10d %,10.0f lines/s", time, lineCount, speed)); + reportLinesPerSec(timer.getElapsedTime(), lineCount); } } @@ -183,7 +197,7 @@ public class Scan implements SilkCommand } case READONLY: { - BufferedReader reader = new BufferedReader(new FileReader(f), bufferSize); + BufferedReader reader = new BufferedReader(new FileReader(f), config.bufferSize); String line; int lineCount = 0; @@ -194,9 +208,7 @@ public class Scan implements SilkCommand lineCount++; if (lineCount % 100000 == 0) { - double time = timer.getElapsedTime(); - double speed = lineCount / time; - _logger.info(String.format("time=%5.2f, line=%,10d, %,10.0f lines/s", time, lineCount, speed)); + reportLinesPerSec(timer.getElapsedTime(), lineCount); } } diff --git a/src/test/java/org/xerial/silk/SilkStreamReaderTest.java b/src/test/java/org/xerial/silk/SilkStreamReaderTest.java index 45e4e03..328addb 100644 --- a/src/test/java/org/xerial/silk/SilkStreamReaderTest.java +++ b/src/test/java/org/xerial/silk/SilkStreamReaderTest.java @@ -60,9 +60,14 @@ public class SilkStreamReaderTest private static final double largeFileLines = 111965; private static final int numNodes = 5826313; + private SilkParserConfig config; + @Before public void setUp() throws Exception - {} + { + config = new SilkParserConfig(); + config.bufferSize = 1024 * 1024 * 16; // 16MB + } @After public void tearDown() throws Exception @@ -203,12 +208,10 @@ public class SilkStreamReaderTest speedPerNode, speedInMBS, speedPerLine)); } - private final int bufferSize = 1024 * 1024 * 16; - @Test public void maxReadSpeed() throws Exception { - BufferedReader reader = new BufferedReader(new InputStreamReader(largeFile.openStream()), bufferSize); + BufferedReader reader = new BufferedReader(new InputStreamReader(largeFile.openStream()), config.bufferSize); int lineCount = 0; String line = null; StopWatch timer = new StopWatch(); @@ -238,10 +241,10 @@ public class SilkStreamReaderTest int lineCount = 0; StopWatch timer = new StopWatch(); - char[] buf = new char[bufferSize]; + char[] buf = new char[config.bufferSize]; int numBytes = 0; int numReadBytes = 0; - while ((numReadBytes = reader.read(buf, 0, bufferSize)) != -1) + while ((numReadBytes = reader.read(buf, 0, config.bufferSize)) != -1) { numBytes += numReadBytes; } @@ -312,11 +315,10 @@ public class SilkStreamReaderTest int count = 0; - @Ignore @Test public void parserPerformance() throws Exception { - SilkParser parser = new SilkParser(largeFile, bufferSize); + SilkParser parser = new SilkParser(largeFile, config); final StopWatch timer = new StopWatch(); count = 0; @@ -363,7 +365,7 @@ public class SilkStreamReaderTest @Test public void pushParserPerformance() throws Exception { - final SilkLinePushParser reader = new SilkLinePushParser(largeFile, bufferSize); + final SilkLinePushParser reader = new SilkLinePushParser(largeFile, config); final StopWatch timer = new StopWatch(); reader.parse(new SilkEventHandler() { @@ -389,7 +391,7 @@ public class SilkStreamReaderTest @Test public void fastPushParserPerformance() throws Exception { - SilkLineFastParser parser = new SilkLineFastParser(largeFile, bufferSize); + SilkLineFastParser parser = new SilkLineFastParser(largeFile, config); StopWatch timer = new StopWatch(); parser.parse(new SilkEventHandler() {