--- /dev/null
+package event
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+)
+
+type testEvent int
+
+func TestSubCloseUnsub(t *testing.T) {
+ // the point of this test is **not** to panic
+ var mux Dispatcher
+ mux.Stop()
+ sub, _ := mux.Subscribe(int(0))
+ sub.Unsubscribe()
+}
+
+func TestSub(t *testing.T) {
+ mux := NewDispatcher()
+ defer mux.Stop()
+
+ sub, _ := mux.Subscribe(testEvent(0))
+ go func() {
+ if err := mux.Post(testEvent(5)); err != nil {
+ t.Errorf("Post returned unexpected error: %v", err)
+ }
+ }()
+ ev := <-sub.Chan()
+
+ if ev.Data.(testEvent) != testEvent(5) {
+ t.Errorf("Got %v (%T), expected event %v (%T)",
+ ev, ev, testEvent(5), testEvent(5))
+ }
+}
+
+func TestMuxErrorAfterStop(t *testing.T) {
+ mux := NewDispatcher()
+ mux.Stop()
+
+ sub, _ := mux.Subscribe(testEvent(0))
+ if _, isopen := <-sub.Chan(); isopen {
+ t.Errorf("subscription channel was not closed")
+ }
+ if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
+ t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
+ }
+}
+
+func TestUnsubscribeUnblockPost(t *testing.T) {
+ mux := NewDispatcher()
+ defer mux.Stop()
+
+ sub, _ := mux.Subscribe(testEvent(0))
+ unblocked := make(chan bool)
+ go func() {
+ mux.Post(testEvent(5))
+ unblocked <- true
+ }()
+
+ select {
+ case <-unblocked:
+ t.Errorf("Post returned before Unsubscribe")
+ default:
+ sub.Unsubscribe()
+ <-unblocked
+ }
+}
+
+func TestSubscribeDuplicateType(t *testing.T) {
+ mux := NewDispatcher()
+ if _, err := mux.Subscribe(testEvent(1), testEvent(2)); err != ErrDuplicateSubscribe {
+ t.Fatal("Subscribe didn't error for duplicate type")
+ }
+}
+
+func TestMuxConcurrent(t *testing.T) {
+ rand.Seed(time.Now().Unix())
+ mux := NewDispatcher()
+ defer mux.Stop()
+
+ recv := make(chan int)
+ poster := func() {
+ for {
+ err := mux.Post(testEvent(0))
+ if err != nil {
+ return
+ }
+ }
+ }
+ sub := func(i int) {
+ time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
+ sub, _ := mux.Subscribe(testEvent(0))
+ <-sub.Chan()
+ sub.Unsubscribe()
+ recv <- i
+ }
+
+ go poster()
+ go poster()
+ go poster()
+ nsubs := 1000
+ for i := 0; i < nsubs; i++ {
+ go sub(i)
+ }
+
+ // wait until everyone has been served
+ counts := make(map[int]int, nsubs)
+ for i := 0; i < nsubs; i++ {
+ counts[<-recv]++
+ }
+ for i, count := range counts {
+ if count != 1 {
+ t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
+ }
+ }
+}
+
+func emptySubscriber(mux *Dispatcher) {
+ s, _ := mux.Subscribe(testEvent(0))
+ go func() {
+ for range s.Chan() {
+ }
+ }()
+}
+
+func BenchmarkPost1000(b *testing.B) {
+ var (
+ mux = NewDispatcher()
+ subscribed, done sync.WaitGroup
+ nsubs = 1000
+ )
+ subscribed.Add(nsubs)
+ done.Add(nsubs)
+ for i := 0; i < nsubs; i++ {
+ go func() {
+ s, _ := mux.Subscribe(testEvent(0))
+ subscribed.Done()
+ for range s.Chan() {
+ }
+ done.Done()
+ }()
+ }
+ subscribed.Wait()
+
+ // The actual benchmark.
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ mux.Post(testEvent(0))
+ }
+
+ b.StopTimer()
+ mux.Stop()
+ done.Wait()
+}
+
+func BenchmarkPostConcurrent(b *testing.B) {
+ var mux = NewDispatcher()
+ defer mux.Stop()
+ emptySubscriber(mux)
+ emptySubscriber(mux)
+ emptySubscriber(mux)
+
+ var wg sync.WaitGroup
+ poster := func() {
+ for i := 0; i < b.N; i++ {
+ mux.Post(testEvent(0))
+ }
+ wg.Done()
+ }
+ wg.Add(5)
+ for i := 0; i < 5; i++ {
+ go poster()
+ }
+ wg.Wait()
+}
+
+// for comparison
+func BenchmarkChanSend(b *testing.B) {
+ c := make(chan interface{})
+ closed := make(chan struct{})
+ go func() {
+ for range c {
+ }
+ }()
+
+ for i := 0; i < b.N; i++ {
+ select {
+ case c <- i:
+ case <-closed:
+ }
+ }
+}