OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / rjeczalik / notify / watcher_inotify.go
1 // Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
2 // Use of this source code is governed by the MIT license that can be
3 // found in the LICENSE file.
4
5 // +build linux
6
7 package notify
8
9 import (
10         "bytes"
11         "errors"
12         "path/filepath"
13         "runtime"
14         "sync"
15         "sync/atomic"
16         "unsafe"
17
18         "golang.org/x/sys/unix"
19 )
20
21 // eventBufferSize defines the size of the buffer given to read(2) function. One
22 // should not depend on this value, since it was arbitrary chosen and may be
23 // changed in the future.
24 const eventBufferSize = 64 * (unix.SizeofInotifyEvent + unix.PathMax + 1)
25
26 // consumersCount defines the number of consumers in producer-consumer based
27 // implementation. Each consumer is run in a separate goroutine and has read
28 // access to watched files map.
29 const consumersCount = 2
30
31 const invalidDescriptor = -1
32
33 // watched is a pair of file path and inotify mask used as a value in
34 // watched files map.
35 type watched struct {
36         path string
37         mask uint32
38 }
39
40 // inotify implements Watcher interface.
41 type inotify struct {
42         sync.RWMutex                       // protects inotify.m map
43         m            map[int32]*watched    // watch descriptor to watched object
44         fd           int32                 // inotify file descriptor
45         pipefd       []int                 // pipe's read and write descriptors
46         epfd         int                   // epoll descriptor
47         epes         []unix.EpollEvent     // epoll events
48         buffer       [eventBufferSize]byte // inotify event buffer
49         wg           sync.WaitGroup        // wait group used to close main loop
50         c            chan<- EventInfo      // event dispatcher channel
51 }
52
53 // NewWatcher creates new non-recursive inotify backed by inotify.
54 func newWatcher(c chan<- EventInfo) watcher {
55         i := &inotify{
56                 m:      make(map[int32]*watched),
57                 fd:     invalidDescriptor,
58                 pipefd: []int{invalidDescriptor, invalidDescriptor},
59                 epfd:   invalidDescriptor,
60                 epes:   make([]unix.EpollEvent, 0),
61                 c:      c,
62         }
63         runtime.SetFinalizer(i, func(i *inotify) {
64                 i.epollclose()
65                 if i.fd != invalidDescriptor {
66                         unix.Close(int(i.fd))
67                 }
68         })
69         return i
70 }
71
72 // Watch implements notify.watcher interface.
73 func (i *inotify) Watch(path string, e Event) error {
74         return i.watch(path, e)
75 }
76
77 // Rewatch implements notify.watcher interface.
78 func (i *inotify) Rewatch(path string, _, newevent Event) error {
79         return i.watch(path, newevent)
80 }
81
82 // watch adds a new watcher to the set of watched objects or modifies the existing
83 // one. If called for the first time, this function initializes inotify filesystem
84 // monitor and starts producer-consumers goroutines.
85 func (i *inotify) watch(path string, e Event) (err error) {
86         if e&^(All|Event(unix.IN_ALL_EVENTS)) != 0 {
87                 return errors.New("notify: unknown event")
88         }
89         if err = i.lazyinit(); err != nil {
90                 return
91         }
92         iwd, err := unix.InotifyAddWatch(int(i.fd), path, encode(e))
93         if err != nil {
94                 return
95         }
96         i.RLock()
97         wd := i.m[int32(iwd)]
98         i.RUnlock()
99         if wd == nil {
100                 i.Lock()
101                 if i.m[int32(iwd)] == nil {
102                         i.m[int32(iwd)] = &watched{path: path, mask: uint32(e)}
103                 }
104                 i.Unlock()
105         } else {
106                 i.Lock()
107                 wd.mask = uint32(e)
108                 i.Unlock()
109         }
110         return nil
111 }
112
113 // lazyinit sets up all required file descriptors and starts 1+consumersCount
114 // goroutines. The producer goroutine blocks until file-system notifications
115 // occur. Then, all events are read from system buffer and sent to consumer
116 // goroutines which construct valid notify events. This method uses
117 // Double-Checked Locking optimization.
118 func (i *inotify) lazyinit() error {
119         if atomic.LoadInt32(&i.fd) == invalidDescriptor {
120                 i.Lock()
121                 defer i.Unlock()
122                 if atomic.LoadInt32(&i.fd) == invalidDescriptor {
123                         fd, err := unix.InotifyInit1(unix.IN_CLOEXEC)
124                         if err != nil {
125                                 return err
126                         }
127                         i.fd = int32(fd)
128                         if err = i.epollinit(); err != nil {
129                                 _, _ = i.epollclose(), unix.Close(int(fd)) // Ignore errors.
130                                 i.fd = invalidDescriptor
131                                 return err
132                         }
133                         esch := make(chan []*event)
134                         go i.loop(esch)
135                         i.wg.Add(consumersCount)
136                         for n := 0; n < consumersCount; n++ {
137                                 go i.send(esch)
138                         }
139                 }
140         }
141         return nil
142 }
143
144 // epollinit opens an epoll file descriptor and creates a pipe which will be
145 // used to wake up the epoll_wait(2) function. Then, file descriptor associated
146 // with inotify event queue and the read end of the pipe are added to epoll set.
147 // Note that `fd` member must be set before this function is called.
148 func (i *inotify) epollinit() (err error) {
149         if i.epfd, err = unix.EpollCreate1(0); err != nil {
150                 return
151         }
152         if err = unix.Pipe(i.pipefd); err != nil {
153                 return
154         }
155         i.epes = []unix.EpollEvent{
156                 {Events: unix.EPOLLIN, Fd: i.fd},
157                 {Events: unix.EPOLLIN, Fd: int32(i.pipefd[0])},
158         }
159         if err = unix.EpollCtl(i.epfd, unix.EPOLL_CTL_ADD, int(i.fd), &i.epes[0]); err != nil {
160                 return
161         }
162         return unix.EpollCtl(i.epfd, unix.EPOLL_CTL_ADD, i.pipefd[0], &i.epes[1])
163 }
164
165 // epollclose closes the file descriptor created by the call to epoll_create(2)
166 // and two file descriptors opened by pipe(2) function.
167 func (i *inotify) epollclose() (err error) {
168         if i.epfd != invalidDescriptor {
169                 if err = unix.Close(i.epfd); err == nil {
170                         i.epfd = invalidDescriptor
171                 }
172         }
173         for n, fd := range i.pipefd {
174                 if fd != invalidDescriptor {
175                         switch e := unix.Close(fd); {
176                         case e != nil && err == nil:
177                                 err = e
178                         case e == nil:
179                                 i.pipefd[n] = invalidDescriptor
180                         }
181                 }
182         }
183         return
184 }
185
186 // loop blocks until either inotify or pipe file descriptor is ready for I/O.
187 // All read operations triggered by filesystem notifications are forwarded to
188 // one of the event's consumers. If pipe fd became ready, loop function closes
189 // all file descriptors opened by lazyinit method and returns afterwards.
190 func (i *inotify) loop(esch chan<- []*event) {
191         epes := make([]unix.EpollEvent, 1)
192         fd := atomic.LoadInt32(&i.fd)
193         for {
194                 switch _, err := unix.EpollWait(i.epfd, epes, -1); err {
195                 case nil:
196                         switch epes[0].Fd {
197                         case fd:
198                                 esch <- i.read()
199                                 epes[0].Fd = 0
200                         case int32(i.pipefd[0]):
201                                 i.Lock()
202                                 defer i.Unlock()
203                                 if err = unix.Close(int(fd)); err != nil && err != unix.EINTR {
204                                         panic("notify: close(2) error " + err.Error())
205                                 }
206                                 atomic.StoreInt32(&i.fd, invalidDescriptor)
207                                 if err = i.epollclose(); err != nil && err != unix.EINTR {
208                                         panic("notify: epollclose error " + err.Error())
209                                 }
210                                 close(esch)
211                                 return
212                         }
213                 case unix.EINTR:
214                         continue
215                 default: // We should never reach this line.
216                         panic("notify: epoll_wait(2) error " + err.Error())
217                 }
218         }
219 }
220
221 // read reads events from an inotify file descriptor. It does not handle errors
222 // returned from read(2) function since they are not critical to watcher logic.
223 func (i *inotify) read() (es []*event) {
224         n, err := unix.Read(int(i.fd), i.buffer[:])
225         if err != nil || n < unix.SizeofInotifyEvent {
226                 return
227         }
228         var sys *unix.InotifyEvent
229         nmin := n - unix.SizeofInotifyEvent
230         for pos, path := 0, ""; pos <= nmin; {
231                 sys = (*unix.InotifyEvent)(unsafe.Pointer(&i.buffer[pos]))
232                 pos += unix.SizeofInotifyEvent
233                 if path = ""; sys.Len > 0 {
234                         endpos := pos + int(sys.Len)
235                         path = string(bytes.TrimRight(i.buffer[pos:endpos], "\x00"))
236                         pos = endpos
237                 }
238                 es = append(es, &event{
239                         sys: unix.InotifyEvent{
240                                 Wd:     sys.Wd,
241                                 Mask:   sys.Mask,
242                                 Cookie: sys.Cookie,
243                         },
244                         path: path,
245                 })
246         }
247         return
248 }
249
250 // send is a consumer function which sends events to event dispatcher channel.
251 // It is run in a separate goroutine in order to not block loop method when
252 // possibly expensive write operations are performed on inotify map.
253 func (i *inotify) send(esch <-chan []*event) {
254         for es := range esch {
255                 for _, e := range i.transform(es) {
256                         if e != nil {
257                                 i.c <- e
258                         }
259                 }
260         }
261         i.wg.Done()
262 }
263
264 // transform prepares events read from inotify file descriptor for sending to
265 // user. It removes invalid events and these which are no longer present in
266 // inotify map. This method may also split one raw event into two different ones
267 // when system-dependent result is required.
268 func (i *inotify) transform(es []*event) []*event {
269         var multi []*event
270         i.RLock()
271         for idx, e := range es {
272                 if e.sys.Mask&(unix.IN_IGNORED|unix.IN_Q_OVERFLOW) != 0 {
273                         es[idx] = nil
274                         continue
275                 }
276                 wd, ok := i.m[e.sys.Wd]
277                 if !ok || e.sys.Mask&encode(Event(wd.mask)) == 0 {
278                         es[idx] = nil
279                         continue
280                 }
281                 if e.path == "" {
282                         e.path = wd.path
283                 } else {
284                         e.path = filepath.Join(wd.path, e.path)
285                 }
286                 multi = append(multi, decode(Event(wd.mask), e))
287                 if e.event == 0 {
288                         es[idx] = nil
289                 }
290         }
291         i.RUnlock()
292         es = append(es, multi...)
293         return es
294 }
295
296 // encode converts notify system-independent events to valid inotify mask
297 // which can be passed to inotify_add_watch(2) function.
298 func encode(e Event) uint32 {
299         if e&Create != 0 {
300                 e = (e ^ Create) | InCreate | InMovedTo
301         }
302         if e&Remove != 0 {
303                 e = (e ^ Remove) | InDelete | InDeleteSelf
304         }
305         if e&Write != 0 {
306                 e = (e ^ Write) | InModify
307         }
308         if e&Rename != 0 {
309                 e = (e ^ Rename) | InMovedFrom | InMoveSelf
310         }
311         return uint32(e)
312 }
313
314 // decode uses internally stored mask to distinguish whether system-independent
315 // or system-dependent event is requested. The first one is created by modifying
316 // `e` argument. decode method sets e.event value to 0 when an event should be
317 // skipped. System-dependent event is set as the function's return value which
318 // can be nil when the event should not be passed on.
319 func decode(mask Event, e *event) (syse *event) {
320         if sysmask := uint32(mask) & e.sys.Mask; sysmask != 0 {
321                 syse = &event{sys: unix.InotifyEvent{
322                         Wd:     e.sys.Wd,
323                         Mask:   e.sys.Mask,
324                         Cookie: e.sys.Cookie,
325                 }, event: Event(sysmask), path: e.path}
326         }
327         imask := encode(mask)
328         switch {
329         case mask&Create != 0 && imask&uint32(InCreate|InMovedTo)&e.sys.Mask != 0:
330                 e.event = Create
331         case mask&Remove != 0 && imask&uint32(InDelete|InDeleteSelf)&e.sys.Mask != 0:
332                 e.event = Remove
333         case mask&Write != 0 && imask&uint32(InModify)&e.sys.Mask != 0:
334                 e.event = Write
335         case mask&Rename != 0 && imask&uint32(InMovedFrom|InMoveSelf)&e.sys.Mask != 0:
336                 e.event = Rename
337         default:
338                 e.event = 0
339         }
340         return
341 }
342
343 // Unwatch implements notify.watcher interface. It looks for watch descriptor
344 // related to registered path and if found, calls inotify_rm_watch(2) function.
345 // This method is allowed to return EINVAL error when concurrently requested to
346 // delete identical path.
347 func (i *inotify) Unwatch(path string) (err error) {
348         iwd := int32(invalidDescriptor)
349         i.RLock()
350         for iwdkey, wd := range i.m {
351                 if wd.path == path {
352                         iwd = iwdkey
353                         break
354                 }
355         }
356         i.RUnlock()
357         if iwd == invalidDescriptor {
358                 return errors.New("notify: path " + path + " is already watched")
359         }
360         fd := atomic.LoadInt32(&i.fd)
361         if _, err = unix.InotifyRmWatch(int(fd), uint32(iwd)); err != nil {
362                 return
363         }
364         i.Lock()
365         delete(i.m, iwd)
366         i.Unlock()
367         return nil
368 }
369
370 // Close implements notify.watcher interface. It removes all existing watch
371 // descriptors and wakes up producer goroutine by sending data to the write end
372 // of the pipe. The function waits for a signal from producer which means that
373 // all operations on current monitoring instance are done.
374 func (i *inotify) Close() (err error) {
375         i.Lock()
376         if fd := atomic.LoadInt32(&i.fd); fd == invalidDescriptor {
377                 i.Unlock()
378                 return nil
379         }
380         for iwd := range i.m {
381                 if _, e := unix.InotifyRmWatch(int(i.fd), uint32(iwd)); e != nil && err == nil {
382                         err = e
383                 }
384                 delete(i.m, iwd)
385         }
386         switch _, errwrite := unix.Write(i.pipefd[1], []byte{0x00}); {
387         case errwrite != nil && err == nil:
388                 err = errwrite
389                 fallthrough
390         case errwrite != nil:
391                 i.Unlock()
392         default:
393                 i.Unlock()
394                 i.wg.Wait()
395         }
396         return
397 }