// synchronise tries to sync up our local block chain with a remote peer.
func (sm *SyncManager) synchronise() {
+ log.Debug("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
// Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
log.Info("Synchronising ...")
return
}
defer atomic.StoreInt32(&sm.synchronising, 0)
+ for len(sm.dropPeerCh) > 0 {
+ <-sm.dropPeerCh
+ }
peer, bestHeight := sm.peers.BestPeer()
// Short circuit if no peers are available
if peer == nil {
return
}
- if bestHeight > sm.chain.Height() {
+
+ if ok := sm.Switch().Peers().Has(peer.Key); !ok {
+ log.Info("Peer disconnected")
+ sm.sw.StopPeerGracefully(peer)
+ return
+ }
+
+ if bestHeight > sm.chain.BestBlockHeight() {
+ log.Info("sync peer:", peer.Addr(), " height:", bestHeight)
sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
}
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
- delete(pending, s.p.Key)
+ delete(pending, s.p.swPeer.Key)
}
// Send the pack in the background.
log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
for {
select {
case s := <-sm.txSyncCh:
- pending[s.p.Key] = s
+ pending[s.p.swPeer.Key] = s
if !sending {
send(s)
}
// Stop tracking peers that cause send failures.
if err != nil {
log.Info("Transaction send failed", "err", err)
- delete(pending, pack.p.Key)
+ delete(pending, pack.p.swPeer.Key)
}
// Schedule the next send.
if s := pick(); s != nil {