OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / rjeczalik / notify / watcher_trigger.go
1 // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
2 // Use of this source code is governed by the MIT license that can be
3 // found in the LICENSE file.
4
5 // +build darwin,kqueue dragonfly freebsd netbsd openbsd solaris
6
7 // watcher_trigger is used for FEN and kqueue which behave similarly:
8 // only files and dirs can be watched directly, but not files inside dirs.
9 // As a result Create events have to be generated by implementation when
10 // after Write event is returned for watched dir, it is rescanned and Create
11 // event is returned for new files and these are automatically added
12 // to watchlist. In case of removal of watched directory, native system returns
13 // events for all files, but for Rename, they also need to be generated.
14 // As a result native system works as something like trigger for rescan,
15 // but contains additional data about dir in which changes occurred. For files
16 // detailed data is returned.
17 // Usage of watcher_trigger requires:
18 // - trigger implementation,
19 // - encode func,
20 // - not2nat, nat2not maps.
21 // Required manual operations on filesystem can lead to loss of precision.
22
23 package notify
24
25 import (
26         "os"
27         "path/filepath"
28         "strings"
29         "sync"
30         "syscall"
31 )
32
33 // trigger is to be implemented by platform implementation like FEN or kqueue.
34 type trigger interface {
35         // Close closes watcher's main native file descriptor.
36         Close() error
37         // Stop waiting for new events.
38         Stop() error
39         // Create new instance of watched.
40         NewWatched(string, os.FileInfo) (*watched, error)
41         // Record internally new *watched instance.
42         Record(*watched)
43         // Del removes internal copy of *watched instance.
44         Del(*watched)
45         // Watched returns *watched instance and native events for native type.
46         Watched(interface{}) (*watched, int64, error)
47         // Init initializes native watcher call.
48         Init() error
49         // Watch starts watching provided file/dir.
50         Watch(os.FileInfo, *watched, int64) error
51         // Unwatch stops watching provided file/dir.
52         Unwatch(*watched) error
53         // Wait for new events.
54         Wait() (interface{}, error)
55         // IsStop checks if Wait finished because of request watcher's stop.
56         IsStop(n interface{}, err error) bool
57 }
58
59 // encode Event to native representation. Implementation is to be provided by
60 // platform specific implementation.
61 var encode func(Event, bool) int64
62
63 var (
64         // nat2not matches native events to notify's ones. To be initialized by
65         // platform dependent implementation.
66         nat2not map[Event]Event
67         // not2nat matches notify's events to native ones. To be initialized by
68         // platform dependent implementation.
69         not2nat map[Event]Event
70 )
71
72 // trg is a main structure implementing watcher.
73 type trg struct {
74         sync.Mutex
75         // s is a channel used to stop monitoring.
76         s chan struct{}
77         // c is a channel used to pass events further.
78         c chan<- EventInfo
79         // pthLkp is a data structure mapping file names with data about watching
80         // represented by them files/directories.
81         pthLkp map[string]*watched
82         // t is a platform dependent implementation of trigger.
83         t trigger
84 }
85
86 // newWatcher returns new watcher's implementation.
87 func newWatcher(c chan<- EventInfo) watcher {
88         t := &trg{
89                 s:      make(chan struct{}, 1),
90                 pthLkp: make(map[string]*watched, 0),
91                 c:      c,
92         }
93         t.t = newTrigger(t.pthLkp)
94         if err := t.t.Init(); err != nil {
95                 panic(err)
96         }
97         go t.monitor()
98         return t
99 }
100
101 // Close implements watcher.
102 func (t *trg) Close() (err error) {
103         t.Lock()
104         if err = t.t.Stop(); err != nil {
105                 t.Unlock()
106                 return
107         }
108         <-t.s
109         var e error
110         for _, w := range t.pthLkp {
111                 if e = t.unwatch(w.p, w.fi); e != nil {
112                         dbgprintf("trg: unwatch %q failed: %q\n", w.p, e)
113                         err = nonil(err, e)
114                 }
115         }
116         if e = t.t.Close(); e != nil {
117                 dbgprintf("trg: closing native watch failed: %q\n", e)
118                 err = nonil(err, e)
119         }
120         t.Unlock()
121         return
122 }
123
124 // send reported events one by one through chan.
125 func (t *trg) send(evn []event) {
126         for i := range evn {
127                 t.c <- &evn[i]
128         }
129 }
130
131 // singlewatch starts to watch given p file/directory.
132 func (t *trg) singlewatch(p string, e Event, direct mode, fi os.FileInfo) (err error) {
133         w, ok := t.pthLkp[p]
134         if !ok {
135                 if w, err = t.t.NewWatched(p, fi); err != nil {
136                         return
137                 }
138         }
139         switch direct {
140         case dir:
141                 w.eDir |= e
142         case ndir:
143                 w.eNonDir |= e
144         case both:
145                 w.eDir |= e
146                 w.eNonDir |= e
147         }
148         if err = t.t.Watch(fi, w, encode(w.eDir|w.eNonDir, fi.IsDir())); err != nil {
149                 return
150         }
151         if !ok {
152                 t.t.Record(w)
153                 return nil
154         }
155         return errAlreadyWatched
156 }
157
158 // decode converts event received from native to notify.Event
159 // representation taking into account requested events (w).
160 func decode(o int64, w Event) (e Event) {
161         for f, n := range nat2not {
162                 if o&int64(f) != 0 {
163                         if w&f != 0 {
164                                 e |= f
165                         }
166                         if w&n != 0 {
167                                 e |= n
168                         }
169                 }
170         }
171
172         return
173 }
174
175 func (t *trg) watch(p string, e Event, fi os.FileInfo) error {
176         if err := t.singlewatch(p, e, dir, fi); err != nil {
177                 if err != errAlreadyWatched {
178                         return nil
179                 }
180         }
181         if fi.IsDir() {
182                 err := t.walk(p, func(fi os.FileInfo) (err error) {
183                         if err = t.singlewatch(filepath.Join(p, fi.Name()), e, ndir,
184                                 fi); err != nil {
185                                 if err != errAlreadyWatched {
186                                         return
187                                 }
188                         }
189                         return nil
190                 })
191                 if err != nil {
192                         return err
193                 }
194         }
195         return nil
196 }
197
198 // walk runs f func on each file/dir from p directory.
199 func (t *trg) walk(p string, fn func(os.FileInfo) error) error {
200         fp, err := os.Open(p)
201         if err != nil {
202                 return err
203         }
204         ls, err := fp.Readdir(0)
205         fp.Close()
206         if err != nil {
207                 return err
208         }
209         for i := range ls {
210                 if err := fn(ls[i]); err != nil {
211                         return err
212                 }
213         }
214         return nil
215 }
216
217 func (t *trg) unwatch(p string, fi os.FileInfo) error {
218         if fi.IsDir() {
219                 err := t.walk(p, func(fi os.FileInfo) error {
220                         err := t.singleunwatch(filepath.Join(p, fi.Name()), ndir)
221                         if err != errNotWatched {
222                                 return err
223                         }
224                         return nil
225                 })
226                 if err != nil {
227                         return err
228                 }
229         }
230         return t.singleunwatch(p, dir)
231 }
232
233 // Watch implements Watcher interface.
234 func (t *trg) Watch(p string, e Event) error {
235         fi, err := os.Stat(p)
236         if err != nil {
237                 return err
238         }
239         t.Lock()
240         err = t.watch(p, e, fi)
241         t.Unlock()
242         return err
243 }
244
245 // Unwatch implements Watcher interface.
246 func (t *trg) Unwatch(p string) error {
247         fi, err := os.Stat(p)
248         if err != nil {
249                 return err
250         }
251         t.Lock()
252         err = t.unwatch(p, fi)
253         t.Unlock()
254         return err
255 }
256
257 // Rewatch implements Watcher interface.
258 //
259 // TODO(rjeczalik): This is a naive hack. Rewrite might help.
260 func (t *trg) Rewatch(p string, _, e Event) error {
261         fi, err := os.Stat(p)
262         if err != nil {
263                 return err
264         }
265         t.Lock()
266         if err = t.unwatch(p, fi); err == nil {
267                 // TODO(rjeczalik): If watch fails then we leave trigger in inconsistent
268                 // state. Handle? Panic? Native version of rewatch?
269                 err = t.watch(p, e, fi)
270         }
271         t.Unlock()
272         return nil
273 }
274
275 func (*trg) file(w *watched, n interface{}, e Event) (evn []event) {
276         evn = append(evn, event{w.p, e, w.fi.IsDir(), n})
277         return
278 }
279
280 func (t *trg) dir(w *watched, n interface{}, e, ge Event) (evn []event) {
281         // If it's dir and delete we have to send it and continue, because
282         // other processing relies on opening (in this case not existing) dir.
283         // Events for contents of this dir are reported by native impl.
284         // However events for rename must be generated for all monitored files
285         // inside of moved directory, because native impl does not report it independently
286         // for each file descriptor being moved in result of move action on
287         // parent directory.
288         if (ge & (not2nat[Rename] | not2nat[Remove])) != 0 {
289                 // Write is reported also for Remove on directory. Because of that
290                 // we have to filter it out explicitly.
291                 evn = append(evn, event{w.p, e & ^Write & ^not2nat[Write], true, n})
292                 if ge&not2nat[Rename] != 0 {
293                         for p := range t.pthLkp {
294                                 if strings.HasPrefix(p, w.p+string(os.PathSeparator)) {
295                                         if err := t.singleunwatch(p, both); err != nil && err != errNotWatched &&
296                                                 !os.IsNotExist(err) {
297                                                 dbgprintf("trg: failed stop watching moved file (%q): %q\n",
298                                                         p, err)
299                                         }
300                                         if (w.eDir|w.eNonDir)&(not2nat[Rename]|Rename) != 0 {
301                                                 evn = append(evn, event{
302                                                         p, (w.eDir | w.eNonDir) & e &^ Write &^ not2nat[Write],
303                                                         w.fi.IsDir(), nil,
304                                                 })
305                                         }
306                                 }
307                         }
308                 }
309                 t.t.Del(w)
310                 return
311         }
312         if (ge & not2nat[Write]) != 0 {
313                 switch err := t.walk(w.p, func(fi os.FileInfo) error {
314                         p := filepath.Join(w.p, fi.Name())
315                         switch err := t.singlewatch(p, w.eDir, ndir, fi); {
316                         case os.IsNotExist(err) && ((w.eDir & Remove) != 0):
317                                 evn = append(evn, event{p, Remove, fi.IsDir(), n})
318                         case err == errAlreadyWatched:
319                         case err != nil:
320                                 dbgprintf("trg: watching %q failed: %q", p, err)
321                         case (w.eDir & Create) != 0:
322                                 evn = append(evn, event{p, Create, fi.IsDir(), n})
323                         default:
324                         }
325                         return nil
326                 }); {
327                 case os.IsNotExist(err):
328                         return
329                 case err != nil:
330                         dbgprintf("trg: dir processing failed: %q", err)
331                 default:
332                 }
333         }
334         return
335 }
336
337 type mode uint
338
339 const (
340         dir mode = iota
341         ndir
342         both
343 )
344
345 // unwatch stops watching p file/directory.
346 func (t *trg) singleunwatch(p string, direct mode) error {
347         w, ok := t.pthLkp[p]
348         if !ok {
349                 return errNotWatched
350         }
351         switch direct {
352         case dir:
353                 w.eDir = 0
354         case ndir:
355                 w.eNonDir = 0
356         case both:
357                 w.eDir, w.eNonDir = 0, 0
358         }
359         if err := t.t.Unwatch(w); err != nil {
360                 return err
361         }
362         if w.eNonDir|w.eDir != 0 {
363                 mod := dir
364                 if w.eNonDir == 0 {
365                         mod = ndir
366                 }
367                 if err := t.singlewatch(p, w.eNonDir|w.eDir, mod,
368                         w.fi); err != nil && err != errAlreadyWatched {
369                         return err
370                 }
371         } else {
372                 t.t.Del(w)
373         }
374         return nil
375 }
376
377 func (t *trg) monitor() {
378         var (
379                 n   interface{}
380                 err error
381         )
382         for {
383                 switch n, err = t.t.Wait(); {
384                 case err == syscall.EINTR:
385                 case t.t.IsStop(n, err):
386                         t.s <- struct{}{}
387                         return
388                 case err != nil:
389                         dbgprintf("trg: failed to read events: %q\n", err)
390                 default:
391                         t.send(t.process(n))
392                 }
393         }
394 }
395
396 // process event returned by native call.
397 func (t *trg) process(n interface{}) (evn []event) {
398         t.Lock()
399         w, ge, err := t.t.Watched(n)
400         if err != nil {
401                 t.Unlock()
402                 dbgprintf("trg: %v event lookup failed: %q", Event(ge), err)
403                 return
404         }
405
406         e := decode(ge, w.eDir|w.eNonDir)
407         if ge&int64(not2nat[Remove]|not2nat[Rename]) == 0 {
408                 switch fi, err := os.Stat(w.p); {
409                 case err != nil:
410                 default:
411                         if err = t.t.Watch(fi, w, encode(w.eDir|w.eNonDir, fi.IsDir())); err != nil {
412                                 dbgprintf("trg: %q is no longer watched: %q", w.p, err)
413                                 t.t.Del(w)
414                         }
415                 }
416         }
417         if e == Event(0) && (!w.fi.IsDir() || (ge&int64(not2nat[Write])) == 0) {
418                 t.Unlock()
419                 return
420         }
421
422         if w.fi.IsDir() {
423                 evn = append(evn, t.dir(w, n, e, Event(ge))...)
424         } else {
425                 evn = append(evn, t.file(w, n, e)...)
426         }
427         if Event(ge)&(not2nat[Remove]|not2nat[Rename]) != 0 {
428                 t.t.Del(w)
429         }
430         t.Unlock()
431         return
432 }