OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / event / event_test.go
diff --git a/event/event_test.go b/event/event_test.go
new file mode 100644 (file)
index 0000000..916d40a
--- /dev/null
@@ -0,0 +1,194 @@
+package event
+
+import (
+       "math/rand"
+       "sync"
+       "testing"
+       "time"
+)
+
+type testEvent int
+
+func TestSubCloseUnsub(t *testing.T) {
+       // the point of this test is **not** to panic
+       var mux Dispatcher
+       mux.Stop()
+       sub, _ := mux.Subscribe(int(0))
+       sub.Unsubscribe()
+}
+
+func TestSub(t *testing.T) {
+       mux := NewDispatcher()
+       defer mux.Stop()
+
+       sub, _ := mux.Subscribe(testEvent(0))
+       go func() {
+               if err := mux.Post(testEvent(5)); err != nil {
+                       t.Errorf("Post returned unexpected error: %v", err)
+               }
+       }()
+       ev := <-sub.Chan()
+
+       if ev.Data.(testEvent) != testEvent(5) {
+               t.Errorf("Got %v (%T), expected event %v (%T)",
+                       ev, ev, testEvent(5), testEvent(5))
+       }
+}
+
+func TestMuxErrorAfterStop(t *testing.T) {
+       mux := NewDispatcher()
+       mux.Stop()
+
+       sub, _ := mux.Subscribe(testEvent(0))
+       if _, isopen := <-sub.Chan(); isopen {
+               t.Errorf("subscription channel was not closed")
+       }
+       if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
+               t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
+       }
+}
+
+func TestUnsubscribeUnblockPost(t *testing.T) {
+       mux := NewDispatcher()
+       defer mux.Stop()
+
+       sub, _ := mux.Subscribe(testEvent(0))
+       unblocked := make(chan bool)
+       go func() {
+               mux.Post(testEvent(5))
+               unblocked <- true
+       }()
+
+       select {
+       case <-unblocked:
+               t.Errorf("Post returned before Unsubscribe")
+       default:
+               sub.Unsubscribe()
+               <-unblocked
+       }
+}
+
+func TestSubscribeDuplicateType(t *testing.T) {
+       mux := NewDispatcher()
+       if _, err := mux.Subscribe(testEvent(1), testEvent(2)); err != ErrDuplicateSubscribe {
+               t.Fatal("Subscribe didn't error for duplicate type")
+       }
+}
+
+func TestMuxConcurrent(t *testing.T) {
+       rand.Seed(time.Now().Unix())
+       mux := NewDispatcher()
+       defer mux.Stop()
+
+       recv := make(chan int)
+       poster := func() {
+               for {
+                       err := mux.Post(testEvent(0))
+                       if err != nil {
+                               return
+                       }
+               }
+       }
+       sub := func(i int) {
+               time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
+               sub, _ := mux.Subscribe(testEvent(0))
+               <-sub.Chan()
+               sub.Unsubscribe()
+               recv <- i
+       }
+
+       go poster()
+       go poster()
+       go poster()
+       nsubs := 1000
+       for i := 0; i < nsubs; i++ {
+               go sub(i)
+       }
+
+       // wait until everyone has been served
+       counts := make(map[int]int, nsubs)
+       for i := 0; i < nsubs; i++ {
+               counts[<-recv]++
+       }
+       for i, count := range counts {
+               if count != 1 {
+                       t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
+               }
+       }
+}
+
+func emptySubscriber(mux *Dispatcher) {
+       s, _ := mux.Subscribe(testEvent(0))
+       go func() {
+               for range s.Chan() {
+               }
+       }()
+}
+
+func BenchmarkPost1000(b *testing.B) {
+       var (
+               mux              = NewDispatcher()
+               subscribed, done sync.WaitGroup
+               nsubs            = 1000
+       )
+       subscribed.Add(nsubs)
+       done.Add(nsubs)
+       for i := 0; i < nsubs; i++ {
+               go func() {
+                       s, _ := mux.Subscribe(testEvent(0))
+                       subscribed.Done()
+                       for range s.Chan() {
+                       }
+                       done.Done()
+               }()
+       }
+       subscribed.Wait()
+
+       // The actual benchmark.
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               mux.Post(testEvent(0))
+       }
+
+       b.StopTimer()
+       mux.Stop()
+       done.Wait()
+}
+
+func BenchmarkPostConcurrent(b *testing.B) {
+       var mux = NewDispatcher()
+       defer mux.Stop()
+       emptySubscriber(mux)
+       emptySubscriber(mux)
+       emptySubscriber(mux)
+
+       var wg sync.WaitGroup
+       poster := func() {
+               for i := 0; i < b.N; i++ {
+                       mux.Post(testEvent(0))
+               }
+               wg.Done()
+       }
+       wg.Add(5)
+       for i := 0; i < 5; i++ {
+               go poster()
+       }
+       wg.Wait()
+}
+
+// for comparison
+func BenchmarkChanSend(b *testing.B) {
+       c := make(chan interface{})
+       closed := make(chan struct{})
+       go func() {
+               for range c {
+               }
+       }()
+
+       for i := 0; i < b.N; i++ {
+               select {
+               case c <- i:
+               case <-closed:
+               }
+       }
+}