OSDN Git Service

Merge pull request #935 from Bytom/dev
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "strings"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-crypto"
8         "github.com/tendermint/go-wire"
9         cmn "github.com/tendermint/tmlibs/common"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         cfg "github.com/bytom/config"
13         "github.com/bytom/p2p"
14         core "github.com/bytom/protocol"
15         "github.com/bytom/protocol/bc"
16         "github.com/bytom/version"
17 )
18
19 //SyncManager Sync Manager is responsible for the business layer information synchronization
20 type SyncManager struct {
21         networkID uint64
22         sw        *p2p.Switch
23         addrBook  *p2p.AddrBook // known peers
24
25         privKey     crypto.PrivKeyEd25519 // local node's p2p key
26         chain       *core.Chain
27         txPool      *core.TxPool
28         fetcher     *Fetcher
29         blockKeeper *blockKeeper
30         peers       *peerSet
31
32         newBlockCh    chan *bc.Hash
33         newPeerCh     chan struct{}
34         txSyncCh      chan *txsync
35         dropPeerCh    chan *string
36         quitSync      chan struct{}
37         config        *cfg.Config
38         synchronising int32
39 }
40
41 //NewSyncManager create a sync manager
42 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
43         // Create the protocol manager with the base fields
44         manager := &SyncManager{
45                 txPool:     txPool,
46                 chain:      chain,
47                 privKey:    crypto.GenPrivKeyEd25519(),
48                 config:     config,
49                 quitSync:   make(chan struct{}),
50                 newBlockCh: newBlockCh,
51                 newPeerCh:  make(chan struct{}),
52                 txSyncCh:   make(chan *txsync),
53                 dropPeerCh: make(chan *string, maxQuitReq),
54                 peers:      newPeerSet(),
55         }
56
57         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
58         manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
59
60         manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
61         manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
62
63         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
64         manager.sw.AddReactor("PROTOCOL", protocolReactor)
65
66         // Create & add listener
67         var listenerStatus bool
68         var l p2p.Listener
69         if !config.VaultMode {
70                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
71                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
72                 manager.sw.AddListener(l)
73         }
74         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
75         manager.sw.SetNodePrivKey(manager.privKey)
76         // Optionally, start the pex reactor
77         //var addrBook *p2p.AddrBook
78         if config.P2P.PexReactor {
79                 manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
80                 pexReactor := p2p.NewPEXReactor(manager.addrBook, manager.sw)
81                 manager.sw.AddReactor("PEX", pexReactor)
82         }
83
84         return manager, nil
85 }
86
// protocolAndAddress splits a listen address of the form "proto://addr" at
// the first "://" and returns the protocol and the remainder. When the input
// contains no "://", the protocol defaults to "tcp" and the whole input is
// returned as the address.
func protocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"
	if i := strings.Index(listenAddr, sep); i >= 0 {
		return listenAddr[:i], listenAddr[i+len(sep):]
	}
	return "tcp", listenAddr
}
96
97 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
98         nodeInfo := &p2p.NodeInfo{
99                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
100                 Moniker: sm.config.Moniker,
101                 Network: sm.config.ChainID,
102                 Version: version.Version,
103                 Other: []string{
104                         cmn.Fmt("wire_version=%v", wire.Version),
105                         cmn.Fmt("p2p_version=%v", p2p.Version),
106                 },
107         }
108
109         if !sm.sw.IsListening() {
110                 return nodeInfo
111         }
112
113         p2pListener := sm.sw.Listeners()[0]
114
115         // We assume that the rpcListener has the same ExternalAddress.
116         // This is probably true because both P2P and RPC listeners use UPnP,
117         // except of course if the rpc is only bound to localhost
118         if listenerStatus {
119                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
120         } else {
121                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
122         }
123         return nodeInfo
124 }
125
126 func (sm *SyncManager) netStart() error {
127         // Start the switch
128         _, err := sm.sw.Start()
129         if err != nil {
130                 return err
131         }
132
133         // If seeds exist, add them to the address book and dial out
134         if sm.config.P2P.Seeds != "" {
135                 // dial out
136                 seeds := strings.Split(sm.config.P2P.Seeds, ",")
137                 if err := sm.DialSeeds(seeds); err != nil {
138                         return err
139                 }
140         }
141         return nil
142 }
143
144 //Start start sync manager service
145 func (sm *SyncManager) Start() {
146         go sm.netStart()
147         // broadcast transactions
148         go sm.txBroadcastLoop()
149
150         // broadcast mined blocks
151         go sm.minedBroadcastLoop()
152
153         // start sync handlers
154         go sm.syncer()
155
156         go sm.txsyncLoop()
157 }
158
159 //Stop stop sync manager
160 func (sm *SyncManager) Stop() {
161         close(sm.quitSync)
162         sm.sw.Stop()
163 }
164
165 func (sm *SyncManager) txBroadcastLoop() {
166         newTxCh := sm.txPool.GetNewTxCh()
167         for {
168                 select {
169                 case newTx := <-newTxCh:
170                         peers, err := sm.peers.BroadcastTx(newTx)
171                         if err != nil {
172                                 log.Errorf("Broadcast new tx error. %v", err)
173                                 return
174                         }
175                         for _, smPeer := range peers {
176                                 if smPeer == nil {
177                                         continue
178                                 }
179                                 swPeer := smPeer.getPeer()
180                                 log.Info("Tx broadcast error. Stop Peer.")
181                                 sm.sw.StopPeerGracefully(swPeer)
182                         }
183                 case <-sm.quitSync:
184                         return
185                 }
186         }
187 }
188
189 func (sm *SyncManager) minedBroadcastLoop() {
190         for {
191                 select {
192                 case blockHash := <-sm.newBlockCh:
193                         block, err := sm.chain.GetBlockByHash(blockHash)
194                         if err != nil {
195                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
196                                 return
197                         }
198                         peers, err := sm.peers.BroadcastMinedBlock(block)
199                         if err != nil {
200                                 log.Errorf("Broadcast mine block error. %v", err)
201                                 return
202                         }
203                         for _, smPeer := range peers {
204                                 if smPeer == nil {
205                                         continue
206                                 }
207                                 swPeer := smPeer.getPeer()
208                                 log.Info("New mined block broadcast error. Stop Peer.")
209                                 sm.sw.StopPeerGracefully(swPeer)
210                         }
211                 case <-sm.quitSync:
212                         return
213                 }
214         }
215 }
216
217 //NodeInfo get P2P peer node info
218 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
219         return sm.sw.NodeInfo()
220 }
221
222 //BlockKeeper get block keeper
223 func (sm *SyncManager) BlockKeeper() *blockKeeper {
224         return sm.blockKeeper
225 }
226
227 //Peers get sync manager peer set
228 func (sm *SyncManager) Peers() *peerSet {
229         return sm.peers
230 }
231
232 //DialSeeds dial seed peers
233 func (sm *SyncManager) DialSeeds(seeds []string) error {
234         return sm.sw.DialSeeds(sm.addrBook, seeds)
235 }
236
237 //Switch get sync manager switch
238 func (sm *SyncManager) Switch() *p2p.Switch {
239         return sm.sw
240 }