18 . "github.com/tendermint/tmlibs/common"
22 groupCheckDuration = 5000 * time.Millisecond
23 defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
24 defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
25 maxFilesToRemove = 4 // needs to be greater than 1
29 You can open a Group to keep restrictions on an AutoFile, like
30 the maximum size of each chunk, and/or the total amount of bytes
33 The first file to be written in the Group.Dir is the head file.
38 Once the Head file reaches the size limit, it will be rotated.
41 - <HeadPath>.000 // First rolled file
42 - <HeadPath> // New head path, starts empty.
43 // The implicit index is 001.
45 As more files are written, the index numbers grow...
48 - <HeadPath>.000 // First rolled file
49 - <HeadPath>.001 // Second rolled file
51 - <HeadPath> // New head path
53 The Group can also be used to binary-search for some line,
54 assuming that marker lines are written occasionally.
60 Head *AutoFile // The head AutoFile to write to
62 Dir string // Directory that contains .Head
67 minIndex int // Includes head
68 maxIndex int // Includes head, where Head will move to
70 // TODO: When we start deleting files, we need to start tracking GroupReaders
71 // and their dependencies.
74 func OpenGroup(headPath string) (g *Group, err error) {
76 dir := path.Dir(headPath)
77 head, err := OpenAutoFile(headPath)
83 ID: "group:" + head.ID,
85 headBuf: bufio.NewWriterSize(head, 4096*10),
87 ticker: time.NewTicker(groupCheckDuration),
88 headSizeLimit: defaultHeadSizeLimit,
89 totalSizeLimit: defaultTotalSizeLimit,
93 g.BaseService = *NewBaseService(nil, "Group", g)
95 gInfo := g.readGroupInfo()
96 g.minIndex = gInfo.MinIndex
97 g.maxIndex = gInfo.MaxIndex
101 func (g *Group) OnStart() error {
102 g.BaseService.OnStart()
107 // NOTE: g.Head must be closed separately
108 func (g *Group) OnStop() {
109 g.BaseService.OnStop()
113 // SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
114 func (g *Group) SetHeadSizeLimit(limit int64) {
116 g.headSizeLimit = limit
120 // HeadSizeLimit returns the current head size limit.
121 func (g *Group) HeadSizeLimit() int64 {
124 return g.headSizeLimit
127 // SetTotalSizeLimit allows you to overwrite default total size limit of the
129 func (g *Group) SetTotalSizeLimit(limit int64) {
131 g.totalSizeLimit = limit
135 // TotalSizeLimit returns total size limit of the group.
136 func (g *Group) TotalSizeLimit() int64 {
139 return g.totalSizeLimit
142 // MaxIndex returns index of the last file in the group.
143 func (g *Group) MaxIndex() int {
149 // MinIndex returns index of the first file in the group.
150 func (g *Group) MinIndex() int {
156 // Write writes the contents of p into the current head of the group. It
157 // returns the number of bytes written. If nn < len(p), it also returns an
158 // error explaining why the write is short.
159 // NOTE: Writes are buffered so they don't write synchronously
160 // TODO: Make it halt if space is unavailable
161 func (g *Group) Write(p []byte) (nn int, err error) {
164 return g.headBuf.Write(p)
167 // WriteLine writes line into the current head of the group. It also appends "\n".
168 // NOTE: Writes are buffered so they don't write synchronously
169 // TODO: Make it halt if space is unavailable
170 func (g *Group) WriteLine(line string) error {
173 _, err := g.headBuf.Write([]byte(line + "\n"))
177 // Flush writes any buffered data to the underlying file and commits the
178 // current content of the file to stable storage.
179 func (g *Group) Flush() error {
182 err := g.headBuf.Flush()
189 func (g *Group) processTicks() {
191 _, ok := <-g.ticker.C
195 g.checkHeadSizeLimit()
196 g.checkTotalSizeLimit()
201 func (g *Group) stopTicker() {
205 // NOTE: this function is called manually in tests.
206 func (g *Group) checkHeadSizeLimit() {
207 limit := g.HeadSizeLimit()
211 size, err := g.Head.Size()
220 func (g *Group) checkTotalSizeLimit() {
221 limit := g.TotalSizeLimit()
226 gInfo := g.readGroupInfo()
227 totalSize := gInfo.TotalSize
228 for i := 0; i < maxFilesToRemove; i++ {
229 index := gInfo.MinIndex + i
230 if totalSize < limit {
233 if index == gInfo.MaxIndex {
234 // Special degenerate case, just do nothing.
235 log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
238 pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
239 fileInfo, err := os.Stat(pathToRemove)
241 log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
244 err = os.Remove(pathToRemove)
249 totalSize -= fileInfo.Size()
253 // RotateFile causes group to close the current head and assign it some index.
254 // Note it does not create a new head.
255 func (g *Group) RotateFile() {
259 headPath := g.Head.Path
261 if err := g.Head.closeFile(); err != nil {
265 indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
266 if err := os.Rename(headPath, indexPath); err != nil {
273 // NewReader returns a new group reader.
274 // CONTRACT: Caller must close the returned GroupReader.
275 func (g *Group) NewReader(index int) (*GroupReader, error) {
276 r := newGroupReader(g)
277 err := r.SetIndex(index)
285 // SearchFunc classifies a scanned line: it returns -1 if line comes after, 0 if found, 1 if line comes before; a non-nil error reports a line that could not be evaluated (MakeSimpleSearchFunc returns -1 together with such an error).
286 type SearchFunc func(line string) (int, error)
288 // Searches for the right file in Group, then returns a GroupReader to start
290 // Returns true if an exact match was found, otherwise returns the next greater
291 // line that starts with prefix.
292 // CONTRACT: Caller must close the returned GroupReader
293 func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
295 minIndex, maxIndex := g.minIndex, g.maxIndex
297 // Now minIndex/maxIndex may change meanwhile,
298 // but it shouldn't be a big deal
299 // (maybe we'll want to limit scanUntil though)
302 curIndex := (minIndex + maxIndex + 1) / 2
304 // Base case, when there's only 1 choice left.
305 if minIndex == maxIndex {
306 r, err := g.NewReader(maxIndex)
308 return nil, false, err
310 match, err := scanUntil(r, prefix, cmp)
313 return nil, false, err
319 // Read starting roughly at the middle file,
320 // until we find line that has prefix.
321 r, err := g.NewReader(curIndex)
323 return nil, false, err
325 foundIndex, line, err := scanNext(r, prefix)
328 return nil, false, err
331 // Compare this line to our search query.
332 val, err := cmp(line)
334 return nil, false, err
337 // Line will come later
338 minIndex = foundIndex
340 // Stroke of luck, found the line
341 r, err := g.NewReader(foundIndex)
343 return nil, false, err
345 match, err := scanUntil(r, prefix, cmp)
347 panic("Expected match to be true")
351 return nil, false, err
357 maxIndex = curIndex - 1
363 // Scans and returns the first line that starts with 'prefix'
364 // Consumes line and returns it.
365 func scanNext(r *GroupReader, prefix string) (int, string, error) {
367 line, err := r.ReadLine()
371 if !strings.HasPrefix(line, prefix) {
374 index := r.CurIndex()
375 return index, line, nil
379 // Returns true iff an exact match was found.
380 // Pushes line, does not consume it.
381 func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
383 line, err := r.ReadLine()
387 if !strings.HasPrefix(line, prefix) {
390 val, err := cmp(line)
406 // Searches backwards for the last line in Group with prefix.
407 // Scans each file forward until the end to find the last match.
408 func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
410 minIndex, maxIndex := g.minIndex, g.maxIndex
413 r, err := g.NewReader(maxIndex)
415 return "", false, err
419 // Open files from the back and read
421 for i := maxIndex; i >= minIndex; i-- {
424 return "", false, err
426 // Scan each line and test whether line matches
428 line, err := r.ReadLine()
431 return match, found, nil
435 } else if err != nil {
436 return "", false, err
438 if strings.HasPrefix(line, prefix) {
442 if r.CurIndex() > i {
444 return match, found, nil
455 // GroupInfo holds information about the group.
456 type GroupInfo struct {
457 MinIndex int // index of the first file in the group, including head
458 MaxIndex int // index of the last file in the group, including head
459 TotalSize int64 // total size of the group
460 HeadSize int64 // size of the head
463 // Returns info after scanning all files in g.Head's dir.
464 func (g *Group) ReadGroupInfo() GroupInfo {
467 return g.readGroupInfo()
470 // Index includes the head.
471 // CONTRACT: caller should have called g.mtx.Lock
472 func (g *Group) readGroupInfo() GroupInfo {
473 groupDir := filepath.Dir(g.Head.Path)
474 headBase := filepath.Base(g.Head.Path)
475 var minIndex, maxIndex int = -1, -1
476 var totalSize, headSize int64 = 0, 0
478 dir, err := os.Open(groupDir)
483 fiz, err := dir.Readdir(0)
488 // For each file in the directory, filter by pattern
489 for _, fileInfo := range fiz {
490 if fileInfo.Name() == headBase {
491 fileSize := fileInfo.Size()
492 totalSize += fileSize
495 } else if strings.HasPrefix(fileInfo.Name(), headBase) {
496 fileSize := fileInfo.Size()
497 totalSize += fileSize
498 indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
499 submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
500 if len(submatch) != 0 {
502 fileIndex, err := strconv.Atoi(string(submatch[1]))
506 if maxIndex < fileIndex {
509 if minIndex == -1 || fileIndex < minIndex {
516 // Now account for the head.
518 // If there were no numbered files,
519 // then the head is index 0.
520 minIndex, maxIndex = 0, 0
522 // Otherwise, the head file is 1 greater
525 return GroupInfo{minIndex, maxIndex, totalSize, headSize}
528 func filePathForIndex(headPath string, index int, maxIndex int) string {
529 if index == maxIndex {
532 return fmt.Sprintf("%v.%03d", headPath, index)
536 //--------------------------------------------------------------------------------
538 // GroupReader provides an interface for reading from a Group.
539 type GroupReader struct {
544 curReader *bufio.Reader
548 func newGroupReader(g *Group) *GroupReader {
558 // Close closes the GroupReader by closing the cursor file.
559 func (gr *GroupReader) Close() error {
561 defer gr.mtx.Unlock()
563 if gr.curReader != nil {
564 err := gr.curFile.Close()
575 // Read implements io.Reader, reading bytes from the current Reader
576 // incrementing index until enough bytes are read.
577 func (gr *GroupReader) Read(p []byte) (n int, err error) {
580 return 0, errors.New("given empty slice")
584 defer gr.mtx.Unlock()
586 // Open file if not open yet
587 if gr.curReader == nil {
588 if err = gr.openFile(gr.curIndex); err != nil {
593 // Iterate over files until enough bytes are read
596 nn, err = gr.curReader.Read(p[n:])
601 } else { // Open the next file
602 if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
606 } else if err != nil {
608 } else if nn == 0 { // empty file
614 // ReadLine reads a line (without delimiter).
615 // It simply returns io.EOF if no new line is found.
616 func (gr *GroupReader) ReadLine() (string, error) {
618 defer gr.mtx.Unlock()
621 if gr.curLine != nil {
622 line := string(gr.curLine)
627 // Open file if not open yet
628 if gr.curReader == nil {
629 err := gr.openFile(gr.curIndex)
635 // Iterate over files until line is found
636 var linePrefix string
638 bytesRead, err := gr.curReader.ReadBytes('\n')
640 // Open the next file
641 if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
644 if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
645 return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
647 linePrefix += string(bytesRead)
650 } else if err != nil {
653 return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
657 // If index > gr.Group.maxIndex, returns io.EOF
658 // CONTRACT: caller should hold gr.mtx
659 func (gr *GroupReader) openFile(index int) error {
661 // Lock on Group to ensure that head doesn't move in the meanwhile.
663 defer gr.Group.mtx.Unlock()
665 if index > gr.Group.maxIndex {
669 curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
670 curFile, err := os.Open(curFilePath)
674 curReader := bufio.NewReader(curFile)
677 if gr.curFile != nil {
678 gr.curFile.Close() // TODO return error?
682 gr.curReader = curReader
687 // PushLine makes the given line the current one, so the next time somebody
688 // calls ReadLine, this line will be returned.
689 // panics if called twice without calling ReadLine.
690 func (gr *GroupReader) PushLine(line string) {
692 defer gr.mtx.Unlock()
694 if gr.curLine == nil {
695 gr.curLine = []byte(line)
697 panic("PushLine failed, already have line")
701 // CurIndex returns cursor's file index.
702 func (gr *GroupReader) CurIndex() int {
704 defer gr.mtx.Unlock()
708 // SetIndex sets the cursor's file index to index by opening a file at this
710 func (gr *GroupReader) SetIndex(index int) error {
712 defer gr.mtx.Unlock()
713 return gr.openFile(index)
716 //--------------------------------------------------------------------------------
718 // A simple SearchFunc that assumes that the marker is of form
720 // For example, if prefix is '#HEIGHT:', the markers are expected to be of the form:
726 func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
727 return func(line string) (int, error) {
728 if !strings.HasPrefix(line, prefix) {
729 return -1, errors.New(Fmt("Marker line did not have prefix: %v", prefix))
731 i, err := strconv.Atoi(line[len(prefix):])
733 return -1, errors.New(Fmt("Failed to parse marker line: %v", err.Error()))
737 } else if target == i {