OSDN Git Service

fk
[bytom/vapor.git] / toolbar / precog / monitor / monitor.go
1 package monitor
2
3 import (
4         // "encoding/binary"
5         // "encoding/hex"
6         "fmt"
7         "io/ioutil"
8         "os"
9         // "os/user"
10         "strings"
11         "time"
12
13         "github.com/jinzhu/gorm"
14         log "github.com/sirupsen/logrus"
15         // dbm "github.com/vapor/database/leveldb"
16
17         vaporCfg "github.com/vapor/config"
18         "github.com/vapor/crypto/ed25519/chainkd"
19         dbm "github.com/vapor/database/leveldb"
20         "github.com/vapor/event"
21         "github.com/vapor/p2p"
22         // conn "github.com/vapor/p2p/connection"
23         "github.com/vapor/netsync/chainmgr"
24         "github.com/vapor/netsync/consensusmgr"
25         "github.com/vapor/netsync/peers"
26         "github.com/vapor/p2p/discover/dht"
27         "github.com/vapor/p2p/discover/mdns"
28         "github.com/vapor/p2p/signlib"
29         "github.com/vapor/test/mock"
30         "github.com/vapor/toolbar/precog/config"
31 )
32
33 type monitor struct {
34         cfg     *config.Config
35         db      *gorm.DB
36         nodeCfg *vaporCfg.Config
37         sw      *p2p.Switch
38         discvCh chan *dht.Node
39         privKey chainkd.XPrv
40         chain   *mock.Chain
41         txPool  *mock.Mempool
42 }
43
44 // TODO: set myself as SPV?
45 func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
46         //TODO: for test
47         cfg.CheckFreqSeconds = 1
48
49         // TODO: fix dir
50         tmpDir, err := ioutil.TempDir(".", "vpPrecog")
51         if err != nil {
52                 log.Fatalf("failed to create temporary data folder: %v", err)
53         }
54
55         nodeCfg := &vaporCfg.Config{
56                 BaseConfig: vaporCfg.DefaultBaseConfig(),
57                 P2P:        vaporCfg.DefaultP2PConfig(),
58                 Federation: vaporCfg.DefaultFederationConfig(),
59         }
60         nodeCfg.DBPath = tmpDir
61         nodeCfg.ChainID = "mainnet"
62         discvCh := make(chan *dht.Node)
63         privKey, err := signlib.NewPrivKey()
64         if err != nil {
65                 log.Fatal(err)
66         }
67
68         chain, txPool, err := mockChainAndPool()
69         if err != nil {
70                 log.Fatal(err)
71         }
72
73         return &monitor{
74                 cfg:     cfg,
75                 db:      db,
76                 nodeCfg: nodeCfg,
77                 discvCh: discvCh,
78                 privKey: privKey.(chainkd.XPrv),
79                 chain:   chain,
80                 txPool:  txPool,
81         }
82 }
83
84 func (m *monitor) Run() {
85         defer os.RemoveAll(m.nodeCfg.DBPath)
86
87         var seeds []string
88         for _, node := range m.cfg.Nodes {
89                 seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
90                 if err := m.upSertNode(&node); err != nil {
91                         log.Error(err)
92                 }
93         }
94         m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
95         if err := m.makeSwitch(); err != nil {
96                 log.Fatal(err)
97         }
98
99         go m.discoveryRoutine()
100         go m.collectDiscoveredNodes()
101         go m.connectNodesRoutine()
102         go m.checkStatusRoutine()
103 }
104
105 func (m *monitor) makeSwitch() error {
106         l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
107         discv, err := dht.NewDiscover(m.nodeCfg, m.privKey, l.ExternalAddress().Port, m.cfg.NetworkID)
108         if err != nil {
109                 return err
110         }
111
112         // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
113         lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
114         sw, err := p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
115         if err != nil {
116                 return err
117         }
118
119         m.sw = sw
120         return nil
121 }
122
123 func (m *monitor) prepareReactors(peers *peers.PeerSet) error {
124         dispatcher := event.NewDispatcher()
125         // add ConsensusReactor for consensusChannel
126         _ = consensusmgr.NewManager(m.sw, m.chain, peers, dispatcher)
127         fastSyncDB := dbm.NewDB("fastsync", m.nodeCfg.DBBackend, m.nodeCfg.DBDir())
128         // add ProtocolReactor to handle msgs
129         _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, peers, fastSyncDB)
130         if err != nil {
131                 return err
132         }
133
134         // TODO: clean up?? only start reactors??
135         m.sw.Start()
136
137         // for label, reactor := range m.sw.GetReactors() {
138         //      log.Debug("start reactor: (%s:%v)", label, reactor)
139         //      if _, err := reactor.Start(); err != nil {
140         //              return
141         //      }
142         // }
143
144         // m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo())
145         // m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers())
146         // if err := m.sw.GetSecurity().Start(); err != nil {
147         //      return
148         // }
149
150         return nil
151 }
152
153 func (m *monitor) checkStatusRoutine() {
154         peers := peers.NewPeerSet(m.sw)
155         if err := m.prepareReactors(peers); err != nil {
156                 log.Fatal(err)
157         }
158
159         ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
160         for ; true; <-ticker.C {
161                 for _, reactor := range m.sw.GetReactors() {
162                         for _, peer := range m.sw.GetPeers().List() {
163                                 log.Debug("AddPeer %v for reactor %v", peer, reactor)
164                                 reactor.AddPeer(peer)
165                         }
166                 }
167
168                 for _, peerInfo := range peers.GetPeerInfos() {
169                         log.Info(peerInfo)
170                 }
171         }
172 }