OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / tmlibs / autofile / group.go
1 package autofile
2
3 import (
4         "bufio"
5         "errors"
6         "fmt"
7         "io"
8         "log"
9         "os"
10         "path"
11         "path/filepath"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "time"
17
18         . "github.com/tendermint/tmlibs/common"
19 )
20
21 const (
22         groupCheckDuration    = 5000 * time.Millisecond
23         defaultHeadSizeLimit  = 10 * 1024 * 1024       // 10MB
24         defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
25         maxFilesToRemove      = 4                      // needs to be greater than 1
26 )
27
28 /*
29 You can open a Group to keep restrictions on an AutoFile, like
30 the maximum size of each chunk, and/or the total amount of bytes
31 stored in the group.
32
33 The first file to be written in the Group.Dir is the head file.
34
35         Dir/
36         - <HeadPath>
37
38 Once the Head file reaches the size limit, it will be rotated.
39
40         Dir/
41         - <HeadPath>.000   // First rolled file
42         - <HeadPath>       // New head path, starts empty.
43                                                                                  // The implicit index is 001.
44
45 As more files are written, the index numbers grow...
46
47         Dir/
48         - <HeadPath>.000   // First rolled file
49         - <HeadPath>.001   // Second rolled file
50         - ...
51         - <HeadPath>       // New head path
52
53 The Group can also be used to binary-search for some line,
54 assuming that marker lines are written occasionally.
55 */
56 type Group struct {
57         BaseService
58
59         ID             string
60         Head           *AutoFile // The head AutoFile to write to
61         headBuf        *bufio.Writer
62         Dir            string // Directory that contains .Head
63         ticker         *time.Ticker
64         mtx            sync.Mutex
65         headSizeLimit  int64
66         totalSizeLimit int64
67         minIndex       int // Includes head
68         maxIndex       int // Includes head, where Head will move to
69
70         // TODO: When we start deleting files, we need to start tracking GroupReaders
71         // and their dependencies.
72 }
73
74 func OpenGroup(headPath string) (g *Group, err error) {
75
76         dir := path.Dir(headPath)
77         head, err := OpenAutoFile(headPath)
78         if err != nil {
79                 return nil, err
80         }
81
82         g = &Group{
83                 ID:             "group:" + head.ID,
84                 Head:           head,
85                 headBuf:        bufio.NewWriterSize(head, 4096*10),
86                 Dir:            dir,
87                 ticker:         time.NewTicker(groupCheckDuration),
88                 headSizeLimit:  defaultHeadSizeLimit,
89                 totalSizeLimit: defaultTotalSizeLimit,
90                 minIndex:       0,
91                 maxIndex:       0,
92         }
93         g.BaseService = *NewBaseService(nil, "Group", g)
94
95         gInfo := g.readGroupInfo()
96         g.minIndex = gInfo.MinIndex
97         g.maxIndex = gInfo.MaxIndex
98         return
99 }
100
101 func (g *Group) OnStart() error {
102         g.BaseService.OnStart()
103         go g.processTicks()
104         return nil
105 }
106
107 // NOTE: g.Head must be closed separately
108 func (g *Group) OnStop() {
109         g.BaseService.OnStop()
110         g.ticker.Stop()
111 }
112
113 // SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
114 func (g *Group) SetHeadSizeLimit(limit int64) {
115         g.mtx.Lock()
116         g.headSizeLimit = limit
117         g.mtx.Unlock()
118 }
119
120 // HeadSizeLimit returns the current head size limit.
121 func (g *Group) HeadSizeLimit() int64 {
122         g.mtx.Lock()
123         defer g.mtx.Unlock()
124         return g.headSizeLimit
125 }
126
127 // SetTotalSizeLimit allows you to overwrite default total size limit of the
128 // group - 1GB.
129 func (g *Group) SetTotalSizeLimit(limit int64) {
130         g.mtx.Lock()
131         g.totalSizeLimit = limit
132         g.mtx.Unlock()
133 }
134
135 // TotalSizeLimit returns total size limit of the group.
136 func (g *Group) TotalSizeLimit() int64 {
137         g.mtx.Lock()
138         defer g.mtx.Unlock()
139         return g.totalSizeLimit
140 }
141
142 // MaxIndex returns index of the last file in the group.
143 func (g *Group) MaxIndex() int {
144         g.mtx.Lock()
145         defer g.mtx.Unlock()
146         return g.maxIndex
147 }
148
149 // MinIndex returns index of the first file in the group.
150 func (g *Group) MinIndex() int {
151         g.mtx.Lock()
152         defer g.mtx.Unlock()
153         return g.minIndex
154 }
155
156 // Write writes the contents of p into the current head of the group. It
157 // returns the number of bytes written. If nn < len(p), it also returns an
158 // error explaining why the write is short.
159 // NOTE: Writes are buffered so they don't write synchronously
160 // TODO: Make it halt if space is unavailable
161 func (g *Group) Write(p []byte) (nn int, err error) {
162         g.mtx.Lock()
163         defer g.mtx.Unlock()
164         return g.headBuf.Write(p)
165 }
166
167 // WriteLine writes line into the current head of the group. It also appends "\n".
168 // NOTE: Writes are buffered so they don't write synchronously
169 // TODO: Make it halt if space is unavailable
170 func (g *Group) WriteLine(line string) error {
171         g.mtx.Lock()
172         defer g.mtx.Unlock()
173         _, err := g.headBuf.Write([]byte(line + "\n"))
174         return err
175 }
176
177 // Flush writes any buffered data to the underlying file and commits the
178 // current content of the file to stable storage.
179 func (g *Group) Flush() error {
180         g.mtx.Lock()
181         defer g.mtx.Unlock()
182         err := g.headBuf.Flush()
183         if err == nil {
184                 err = g.Head.Sync()
185         }
186         return err
187 }
188
189 func (g *Group) processTicks() {
190         for {
191                 _, ok := <-g.ticker.C
192                 if !ok {
193                         return // Done.
194                 }
195                 g.checkHeadSizeLimit()
196                 g.checkTotalSizeLimit()
197         }
198 }
199
200 // NOTE: for testing
201 func (g *Group) stopTicker() {
202         g.ticker.Stop()
203 }
204
205 // NOTE: this function is called manually in tests.
206 func (g *Group) checkHeadSizeLimit() {
207         limit := g.HeadSizeLimit()
208         if limit == 0 {
209                 return
210         }
211         size, err := g.Head.Size()
212         if err != nil {
213                 panic(err)
214         }
215         if size >= limit {
216                 g.RotateFile()
217         }
218 }
219
220 func (g *Group) checkTotalSizeLimit() {
221         limit := g.TotalSizeLimit()
222         if limit == 0 {
223                 return
224         }
225
226         gInfo := g.readGroupInfo()
227         totalSize := gInfo.TotalSize
228         for i := 0; i < maxFilesToRemove; i++ {
229                 index := gInfo.MinIndex + i
230                 if totalSize < limit {
231                         return
232                 }
233                 if index == gInfo.MaxIndex {
234                         // Special degenerate case, just do nothing.
235                         log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
236                         return
237                 }
238                 pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
239                 fileInfo, err := os.Stat(pathToRemove)
240                 if err != nil {
241                         log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
242                         continue
243                 }
244                 err = os.Remove(pathToRemove)
245                 if err != nil {
246                         log.Println(err)
247                         return
248                 }
249                 totalSize -= fileInfo.Size()
250         }
251 }
252
253 // RotateFile causes group to close the current head and assign it some index.
254 // Note it does not create a new head.
255 func (g *Group) RotateFile() {
256         g.mtx.Lock()
257         defer g.mtx.Unlock()
258
259         headPath := g.Head.Path
260
261         if err := g.Head.closeFile(); err != nil {
262                 panic(err)
263         }
264
265         indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
266         if err := os.Rename(headPath, indexPath); err != nil {
267                 panic(err)
268         }
269
270         g.maxIndex += 1
271 }
272
273 // NewReader returns a new group reader.
274 // CONTRACT: Caller must close the returned GroupReader.
275 func (g *Group) NewReader(index int) (*GroupReader, error) {
276         r := newGroupReader(g)
277         err := r.SetIndex(index)
278         if err != nil {
279                 return nil, err
280         } else {
281                 return r, nil
282         }
283 }
284
285 // Returns -1 if line comes after, 0 if found, 1 if line comes before.
286 type SearchFunc func(line string) (int, error)
287
288 // Searches for the right file in Group, then returns a GroupReader to start
289 // streaming lines.
290 // Returns true if an exact match was found, otherwise returns the next greater
291 // line that starts with prefix.
292 // CONTRACT: Caller must close the returned GroupReader
293 func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
294         g.mtx.Lock()
295         minIndex, maxIndex := g.minIndex, g.maxIndex
296         g.mtx.Unlock()
297         // Now minIndex/maxIndex may change meanwhile,
298         // but it shouldn't be a big deal
299         // (maybe we'll want to limit scanUntil though)
300
301         for {
302                 curIndex := (minIndex + maxIndex + 1) / 2
303
304                 // Base case, when there's only 1 choice left.
305                 if minIndex == maxIndex {
306                         r, err := g.NewReader(maxIndex)
307                         if err != nil {
308                                 return nil, false, err
309                         }
310                         match, err := scanUntil(r, prefix, cmp)
311                         if err != nil {
312                                 r.Close()
313                                 return nil, false, err
314                         } else {
315                                 return r, match, err
316                         }
317                 }
318
319                 // Read starting roughly at the middle file,
320                 // until we find line that has prefix.
321                 r, err := g.NewReader(curIndex)
322                 if err != nil {
323                         return nil, false, err
324                 }
325                 foundIndex, line, err := scanNext(r, prefix)
326                 r.Close()
327                 if err != nil {
328                         return nil, false, err
329                 }
330
331                 // Compare this line to our search query.
332                 val, err := cmp(line)
333                 if err != nil {
334                         return nil, false, err
335                 }
336                 if val < 0 {
337                         // Line will come later
338                         minIndex = foundIndex
339                 } else if val == 0 {
340                         // Stroke of luck, found the line
341                         r, err := g.NewReader(foundIndex)
342                         if err != nil {
343                                 return nil, false, err
344                         }
345                         match, err := scanUntil(r, prefix, cmp)
346                         if !match {
347                                 panic("Expected match to be true")
348                         }
349                         if err != nil {
350                                 r.Close()
351                                 return nil, false, err
352                         } else {
353                                 return r, true, err
354                         }
355                 } else {
356                         // We passed it
357                         maxIndex = curIndex - 1
358                 }
359         }
360
361 }
362
363 // Scans and returns the first line that starts with 'prefix'
364 // Consumes line and returns it.
365 func scanNext(r *GroupReader, prefix string) (int, string, error) {
366         for {
367                 line, err := r.ReadLine()
368                 if err != nil {
369                         return 0, "", err
370                 }
371                 if !strings.HasPrefix(line, prefix) {
372                         continue
373                 }
374                 index := r.CurIndex()
375                 return index, line, nil
376         }
377 }
378
379 // Returns true iff an exact match was found.
380 // Pushes line, does not consume it.
381 func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
382         for {
383                 line, err := r.ReadLine()
384                 if err != nil {
385                         return false, err
386                 }
387                 if !strings.HasPrefix(line, prefix) {
388                         continue
389                 }
390                 val, err := cmp(line)
391                 if err != nil {
392                         return false, err
393                 }
394                 if val < 0 {
395                         continue
396                 } else if val == 0 {
397                         r.PushLine(line)
398                         return true, nil
399                 } else {
400                         r.PushLine(line)
401                         return false, nil
402                 }
403         }
404 }
405
406 // Searches backwards for the last line in Group with prefix.
407 // Scans each file forward until the end to find the last match.
408 func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
409         g.mtx.Lock()
410         minIndex, maxIndex := g.minIndex, g.maxIndex
411         g.mtx.Unlock()
412
413         r, err := g.NewReader(maxIndex)
414         if err != nil {
415                 return "", false, err
416         }
417         defer r.Close()
418
419         // Open files from the back and read
420 GROUP_LOOP:
421         for i := maxIndex; i >= minIndex; i-- {
422                 err := r.SetIndex(i)
423                 if err != nil {
424                         return "", false, err
425                 }
426                 // Scan each line and test whether line matches
427                 for {
428                         line, err := r.ReadLine()
429                         if err == io.EOF {
430                                 if found {
431                                         return match, found, nil
432                                 } else {
433                                         continue GROUP_LOOP
434                                 }
435                         } else if err != nil {
436                                 return "", false, err
437                         }
438                         if strings.HasPrefix(line, prefix) {
439                                 match = line
440                                 found = true
441                         }
442                         if r.CurIndex() > i {
443                                 if found {
444                                         return match, found, nil
445                                 } else {
446                                         continue GROUP_LOOP
447                                 }
448                         }
449                 }
450         }
451
452         return
453 }
454
455 // GroupInfo holds information about the group.
456 type GroupInfo struct {
457         MinIndex  int   // index of the first file in the group, including head
458         MaxIndex  int   // index of the last file in the group, including head
459         TotalSize int64 // total size of the group
460         HeadSize  int64 // size of the head
461 }
462
463 // Returns info after scanning all files in g.Head's dir.
464 func (g *Group) ReadGroupInfo() GroupInfo {
465         g.mtx.Lock()
466         defer g.mtx.Unlock()
467         return g.readGroupInfo()
468 }
469
470 // Index includes the head.
471 // CONTRACT: caller should have called g.mtx.Lock
472 func (g *Group) readGroupInfo() GroupInfo {
473         groupDir := filepath.Dir(g.Head.Path)
474         headBase := filepath.Base(g.Head.Path)
475         var minIndex, maxIndex int = -1, -1
476         var totalSize, headSize int64 = 0, 0
477
478         dir, err := os.Open(groupDir)
479         if err != nil {
480                 panic(err)
481         }
482         defer dir.Close()
483         fiz, err := dir.Readdir(0)
484         if err != nil {
485                 panic(err)
486         }
487
488         // For each file in the directory, filter by pattern
489         for _, fileInfo := range fiz {
490                 if fileInfo.Name() == headBase {
491                         fileSize := fileInfo.Size()
492                         totalSize += fileSize
493                         headSize = fileSize
494                         continue
495                 } else if strings.HasPrefix(fileInfo.Name(), headBase) {
496                         fileSize := fileInfo.Size()
497                         totalSize += fileSize
498                         indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
499                         submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
500                         if len(submatch) != 0 {
501                                 // Matches
502                                 fileIndex, err := strconv.Atoi(string(submatch[1]))
503                                 if err != nil {
504                                         panic(err)
505                                 }
506                                 if maxIndex < fileIndex {
507                                         maxIndex = fileIndex
508                                 }
509                                 if minIndex == -1 || fileIndex < minIndex {
510                                         minIndex = fileIndex
511                                 }
512                         }
513                 }
514         }
515
516         // Now account for the head.
517         if minIndex == -1 {
518                 // If there were no numbered files,
519                 // then the head is index 0.
520                 minIndex, maxIndex = 0, 0
521         } else {
522                 // Otherwise, the head file is 1 greater
523                 maxIndex += 1
524         }
525         return GroupInfo{minIndex, maxIndex, totalSize, headSize}
526 }
527
528 func filePathForIndex(headPath string, index int, maxIndex int) string {
529         if index == maxIndex {
530                 return headPath
531         } else {
532                 return fmt.Sprintf("%v.%03d", headPath, index)
533         }
534 }
535
536 //--------------------------------------------------------------------------------
537
538 // GroupReader provides an interface for reading from a Group.
539 type GroupReader struct {
540         *Group
541         mtx       sync.Mutex
542         curIndex  int
543         curFile   *os.File
544         curReader *bufio.Reader
545         curLine   []byte
546 }
547
548 func newGroupReader(g *Group) *GroupReader {
549         return &GroupReader{
550                 Group:     g,
551                 curIndex:  0,
552                 curFile:   nil,
553                 curReader: nil,
554                 curLine:   nil,
555         }
556 }
557
558 // Close closes the GroupReader by closing the cursor file.
559 func (gr *GroupReader) Close() error {
560         gr.mtx.Lock()
561         defer gr.mtx.Unlock()
562
563         if gr.curReader != nil {
564                 err := gr.curFile.Close()
565                 gr.curIndex = 0
566                 gr.curReader = nil
567                 gr.curFile = nil
568                 gr.curLine = nil
569                 return err
570         } else {
571                 return nil
572         }
573 }
574
575 // Read implements io.Reader, reading bytes from the current Reader
576 // incrementing index until enough bytes are read.
577 func (gr *GroupReader) Read(p []byte) (n int, err error) {
578         lenP := len(p)
579         if lenP == 0 {
580                 return 0, errors.New("given empty slice")
581         }
582
583         gr.mtx.Lock()
584         defer gr.mtx.Unlock()
585
586         // Open file if not open yet
587         if gr.curReader == nil {
588                 if err = gr.openFile(gr.curIndex); err != nil {
589                         return 0, err
590                 }
591         }
592
593         // Iterate over files until enough bytes are read
594         var nn int
595         for {
596                 nn, err = gr.curReader.Read(p[n:])
597                 n += nn
598                 if err == io.EOF {
599                         if n >= lenP {
600                                 return n, nil
601                         } else { // Open the next file
602                                 if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
603                                         return n, err1
604                                 }
605                         }
606                 } else if err != nil {
607                         return n, err
608                 } else if nn == 0 { // empty file
609                         return n, err
610                 }
611         }
612 }
613
614 // ReadLine reads a line (without delimiter).
615 // just return io.EOF if no new lines found.
616 func (gr *GroupReader) ReadLine() (string, error) {
617         gr.mtx.Lock()
618         defer gr.mtx.Unlock()
619
620         // From PushLine
621         if gr.curLine != nil {
622                 line := string(gr.curLine)
623                 gr.curLine = nil
624                 return line, nil
625         }
626
627         // Open file if not open yet
628         if gr.curReader == nil {
629                 err := gr.openFile(gr.curIndex)
630                 if err != nil {
631                         return "", err
632                 }
633         }
634
635         // Iterate over files until line is found
636         var linePrefix string
637         for {
638                 bytesRead, err := gr.curReader.ReadBytes('\n')
639                 if err == io.EOF {
640                         // Open the next file
641                         if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
642                                 return "", err1
643                         }
644                         if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
645                                 return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
646                         } else {
647                                 linePrefix += string(bytesRead)
648                                 continue
649                         }
650                 } else if err != nil {
651                         return "", err
652                 }
653                 return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
654         }
655 }
656
657 // IF index > gr.Group.maxIndex, returns io.EOF
658 // CONTRACT: caller should hold gr.mtx
659 func (gr *GroupReader) openFile(index int) error {
660
661         // Lock on Group to ensure that head doesn't move in the meanwhile.
662         gr.Group.mtx.Lock()
663         defer gr.Group.mtx.Unlock()
664
665         if index > gr.Group.maxIndex {
666                 return io.EOF
667         }
668
669         curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
670         curFile, err := os.Open(curFilePath)
671         if err != nil {
672                 return err
673         }
674         curReader := bufio.NewReader(curFile)
675
676         // Update gr.cur*
677         if gr.curFile != nil {
678                 gr.curFile.Close() // TODO return error?
679         }
680         gr.curIndex = index
681         gr.curFile = curFile
682         gr.curReader = curReader
683         gr.curLine = nil
684         return nil
685 }
686
687 // PushLine makes the given line the current one, so the next time somebody
688 // calls ReadLine, this line will be returned.
689 // panics if called twice without calling ReadLine.
690 func (gr *GroupReader) PushLine(line string) {
691         gr.mtx.Lock()
692         defer gr.mtx.Unlock()
693
694         if gr.curLine == nil {
695                 gr.curLine = []byte(line)
696         } else {
697                 panic("PushLine failed, already have line")
698         }
699 }
700
701 // CurIndex returns cursor's file index.
702 func (gr *GroupReader) CurIndex() int {
703         gr.mtx.Lock()
704         defer gr.mtx.Unlock()
705         return gr.curIndex
706 }
707
708 // SetIndex sets the cursor's file index to index by opening a file at this
709 // position.
710 func (gr *GroupReader) SetIndex(index int) error {
711         gr.mtx.Lock()
712         defer gr.mtx.Unlock()
713         return gr.openFile(index)
714 }
715
716 //--------------------------------------------------------------------------------
717
718 // A simple SearchFunc that assumes that the marker is of form
719 // <prefix><number>.
720 // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
721 //
722 // #HEIGHT:1
723 // ...
724 // #HEIGHT:2
725 // ...
726 func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
727         return func(line string) (int, error) {
728                 if !strings.HasPrefix(line, prefix) {
729                         return -1, errors.New(Fmt("Marker line did not have prefix: %v", prefix))
730                 }
731                 i, err := strconv.Atoi(line[len(prefix):])
732                 if err != nil {
733                         return -1, errors.New(Fmt("Failed to parse marker line: %v", err.Error()))
734                 }
735                 if target < i {
736                         return 1, nil
737                 } else if target == i {
738                         return 0, nil
739                 } else {
740                         return -1, nil
741                 }
742         }
743 }