2 Pub-Sub in go with event caching
9 . "github.com/tendermint/tmlibs/common"
12 // Generic event data can be typed and registered with tendermint/go-wire
13 // via concrete implementation of this interface
14 type EventData interface {
18 // reactors and other modules should export
19 // this interface to become eventable
20 type Eventable interface {
21 SetEventSwitch(evsw EventSwitch)
24 // an event switch or cache implements fireable
25 type Fireable interface {
26 FireEvent(event string, data EventData)
29 type EventSwitch interface {
33 AddListenerForEvent(listenerID, event string, cb EventCallback)
34 RemoveListenerForEvent(event string, listenerID string)
35 RemoveListener(listenerID string)
38 type eventSwitch struct {
42 eventCells map[string]*eventCell
43 listeners map[string]*eventListener
46 func NewEventSwitch() EventSwitch {
47 evsw := &eventSwitch{}
48 evsw.BaseService = *NewBaseService(nil, "EventSwitch", evsw)
52 func (evsw *eventSwitch) OnStart() error {
53 evsw.BaseService.OnStart()
54 evsw.eventCells = make(map[string]*eventCell)
55 evsw.listeners = make(map[string]*eventListener)
59 func (evsw *eventSwitch) OnStop() {
61 defer evsw.mtx.Unlock()
62 evsw.BaseService.OnStop()
67 func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) {
68 // Get/Create eventCell and listener
70 eventCell := evsw.eventCells[event]
72 eventCell = newEventCell()
73 evsw.eventCells[event] = eventCell
75 listener := evsw.listeners[listenerID]
77 listener = newEventListener(listenerID)
78 evsw.listeners[listenerID] = listener
82 // Add event and listener
83 eventCell.AddListener(listenerID, cb)
84 listener.AddEvent(event)
87 func (evsw *eventSwitch) RemoveListener(listenerID string) {
88 // Get and remove listener
90 listener := evsw.listeners[listenerID]
97 delete(evsw.listeners, listenerID)
100 // Remove callback for each event.
101 listener.SetRemoved()
102 for _, event := range listener.GetEvents() {
103 evsw.RemoveListenerForEvent(event, listenerID)
107 func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
110 eventCell := evsw.eventCells[event]
113 if eventCell == nil {
117 // Remove listenerID from eventCell
118 numListeners := eventCell.RemoveListener(listenerID)
120 // Maybe garbage collect eventCell.
121 if numListeners == 0 {
122 // Lock again and double check.
123 evsw.mtx.Lock() // OUTER LOCK
124 eventCell.mtx.Lock() // INNER LOCK
125 if len(eventCell.listeners) == 0 {
126 delete(evsw.eventCells, event)
128 eventCell.mtx.Unlock() // INNER LOCK
129 evsw.mtx.Unlock() // OUTER LOCK
133 func (evsw *eventSwitch) FireEvent(event string, data EventData) {
136 eventCell := evsw.eventCells[event]
139 if eventCell == nil {
143 // Fire event for all listeners in eventCell
144 eventCell.FireEvent(data)
147 //-----------------------------------------------------------------------------
149 // eventCell handles keeping track of listener callbacks for a given event.
150 type eventCell struct {
152 listeners map[string]EventCallback
155 func newEventCell() *eventCell {
157 listeners: make(map[string]EventCallback),
161 func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
163 cell.listeners[listenerID] = cb
167 func (cell *eventCell) RemoveListener(listenerID string) int {
169 delete(cell.listeners, listenerID)
170 numListeners := len(cell.listeners)
175 func (cell *eventCell) FireEvent(data EventData) {
177 for _, listener := range cell.listeners {
183 //-----------------------------------------------------------------------------
185 type EventCallback func(data EventData)
187 type eventListener struct {
195 func newEventListener(id string) *eventListener {
196 return &eventListener{
203 func (evl *eventListener) AddEvent(event string) {
205 defer evl.mtx.Unlock()
210 evl.events = append(evl.events, event)
213 func (evl *eventListener) GetEvents() []string {
215 defer evl.mtx.RUnlock()
217 events := make([]string, len(evl.events))
218 copy(events, evl.events)
222 func (evl *eventListener) SetRemoved() {
224 defer evl.mtx.Unlock()