OSDN Git Service

netsync add test case (#365)
[bytom/vapor.git] / netsync / sync_manager.go
1 package netsync
2
3 import (
4         "errors"
5
6         "github.com/sirupsen/logrus"
7
8         "github.com/vapor/config"
9         "github.com/vapor/consensus"
10         dbm "github.com/vapor/database/leveldb"
11         "github.com/vapor/event"
12         "github.com/vapor/netsync/chainmgr"
13         "github.com/vapor/netsync/consensusmgr"
14         "github.com/vapor/netsync/peers"
15         "github.com/vapor/p2p"
16         "github.com/vapor/protocol"
17 )
18
19 const (
20         logModule = "netsync"
21 )
22
23 var (
24         errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
25 )
26
27 // ChainMgr is the interface for p2p chain message sync manager.
28 type ChainMgr interface {
29         Start() error
30         IsCaughtUp() bool
31         Stop()
32 }
33
34 // ConsensusMgr is the interface for consensus message sync manager.
35 type ConsensusMgr interface {
36         Start() error
37         Stop()
38 }
39
40 // Switch is the interface for p2p switch.
41 type Switch interface {
42         Start() (bool, error)
43         Stop() bool
44         IsListening() bool
45         DialPeerWithAddress(addr *p2p.NetAddress) error
46         Peers() *p2p.PeerSet
47 }
48
49 //SyncManager Sync Manager is responsible for the business layer information synchronization
50 type SyncManager struct {
51         config       *config.Config
52         sw           Switch
53         chainMgr     ChainMgr
54         consensusMgr ConsensusMgr
55         peers        *peers.PeerSet
56 }
57
58 // NewSyncManager create sync manager and set switch.
59 func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher, fastSyncDB dbm.DB) (*SyncManager, error) {
60         sw, err := p2p.NewSwitch(config)
61         if err != nil {
62                 return nil, err
63         }
64         peers := peers.NewPeerSet(sw)
65
66         chainManger, err := chainmgr.NewManager(config, sw, chain, txPool, dispatcher, peers, fastSyncDB)
67         if err != nil {
68                 return nil, err
69         }
70         consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers)
71         return &SyncManager{
72                 config:       config,
73                 sw:           sw,
74                 chainMgr:     chainManger,
75                 consensusMgr: consensusMgr,
76                 peers:        peers,
77         }, nil
78 }
79
80 // Start message sync manager service.
81 func (sm *SyncManager) Start() error {
82         if _, err := sm.sw.Start(); err != nil {
83                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed start switch")
84                 return err
85         }
86
87         if err := sm.chainMgr.Start(); err != nil {
88                 return err
89         }
90
91         return sm.consensusMgr.Start()
92 }
93
94 // Stop message sync manager service.
95 func (sm *SyncManager) Stop() {
96         sm.chainMgr.Stop()
97         sm.consensusMgr.Stop()
98         if !sm.config.VaultMode {
99                 sm.sw.Stop()
100         }
101
102 }
103
104 // IsListening check if the vapord service port is open?
105 func (sm *SyncManager) IsListening() bool {
106         if sm.config.VaultMode {
107                 return false
108         }
109         return sm.sw.IsListening()
110
111 }
112
113 //IsCaughtUp check wheather the peer finish the sync
114 func (sm *SyncManager) IsCaughtUp() bool {
115         return sm.chainMgr.IsCaughtUp()
116 }
117
118 // PeerCount count the number of connected peers.
119 func (sm *SyncManager) PeerCount() int {
120         if sm.config.VaultMode {
121                 return 0
122         }
123         return len(sm.sw.Peers().List())
124 }
125
126 // GetNetwork get the type of network.
127 func (sm *SyncManager) GetNetwork() string {
128         return sm.config.ChainID
129 }
130
131 // BestPeer fine the peer with the highest height from the connected peers.
132 func (sm *SyncManager) BestPeer() *peers.PeerInfo {
133         bestPeer := sm.peers.BestPeer(consensus.SFFullNode)
134         if bestPeer != nil {
135                 return bestPeer.GetPeerInfo()
136         }
137         return nil
138 }
139
140 // DialPeerWithAddress dial the peer and establish a connection.
141 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
142         if sm.config.VaultMode {
143                 return errVaultModeDialPeer
144         }
145
146         return sm.sw.DialPeerWithAddress(addr)
147 }
148
149 //GetPeerInfos return peer info of all connected peers.
150 func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo {
151         return sm.peers.GetPeerInfos()
152 }
153
154 //StopPeer try to stop peer by given ID
155 func (sm *SyncManager) StopPeer(peerID string) error {
156         if peer := sm.peers.GetPeer(peerID); peer == nil {
157                 return errors.New("peerId not exist")
158         }
159         sm.peers.RemovePeer(peerID)
160         return nil
161 }