From 78058dccd6766b55cf325cf4b4eb2c3d8f77328e Mon Sep 17 00:00:00 2001 From: HAOYUatHZ Date: Tue, 20 Aug 2019 16:35:05 +0800 Subject: [PATCH] try --- toolbar/precog/monitor/connection.go | 7 +++--- toolbar/precog/monitor/discover.go | 4 ++-- toolbar/precog/monitor/monitor.go | 46 ++++++++++++++++++------------------ toolbar/precog/monitor/stats.go | 27 ++++++++++++++------- 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/toolbar/precog/monitor/connection.go b/toolbar/precog/monitor/connection.go index a440d776..791fb8b0 100644 --- a/toolbar/precog/monitor/connection.go +++ b/toolbar/precog/monitor/connection.go @@ -11,7 +11,7 @@ import ( func (m *monitor) connectNodesRoutine() { // TODO: change name? - ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds+15) * time.Second) + ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second) for ; true; <-ticker.C { if err := m.dialNodes(); err != nil { log.Error(err) @@ -20,7 +20,7 @@ func (m *monitor) connectNodesRoutine() { } func (m *monitor) dialNodes() error { - m.Lock() + // m.Lock() log.Info("Start to reconnect to nodes...") var nodes []*orm.Node if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil { @@ -46,6 +46,7 @@ func (m *monitor) dialNodes() error { // connected peers will be skipped in switch.DialPeers() m.sw.DialPeers(addresses) log.Info("DialPeers done.") - m.Unlock() + m.checkStatusCh <- struct{}{} + // m.Unlock() return nil } diff --git a/toolbar/precog/monitor/discover.go b/toolbar/precog/monitor/discover.go index 199eec0a..e01833ed 100644 --- a/toolbar/precog/monitor/discover.go +++ b/toolbar/precog/monitor/discover.go @@ -33,7 +33,7 @@ func (m *monitor) collectDiscoveredNodes() { continue } log.Info("discover new node: ", node) - m.Lock() + // m.Lock() if err := m.upSertNode(&config.Node{ PublicKey: node.ID.String(), @@ -45,6 +45,6 @@ func (m *monitor) collectDiscoveredNodes() { log.Error(err) } - m.Unlock() + // m.Unlock() } } diff --git a/toolbar/precog/monitor/monitor.go b/toolbar/precog/monitor/monitor.go index 81b0c1f8..59a5558d 100644 --- a/toolbar/precog/monitor/monitor.go +++ b/toolbar/precog/monitor/monitor.go @@ -7,7 +7,7 @@ import ( "os/user" "strings" "sync" - "time" + // "time" "github.com/jinzhu/gorm" log "github.com/sirupsen/logrus" @@ -30,21 +30,21 @@ import ( type monitor struct { *sync.RWMutex - cfg *config.Config - db *gorm.DB - nodeCfg *vaporCfg.Config - sw *p2p.Switch - discvCh chan *dht.Node - privKey chainkd.XPrv - chain *mock.Chain - txPool *mock.Mempool - connected uint32 + cfg *config.Config + db *gorm.DB + nodeCfg *vaporCfg.Config + sw *p2p.Switch + discvCh chan *dht.Node + privKey chainkd.XPrv + chain *mock.Chain + txPool *mock.Mempool + checkStatusCh chan struct{} } // TODO: set myself as SPV? func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor { //TODO: for test - cfg.CheckFreqSeconds = 15 + cfg.CheckFreqSeconds = 180 dbPath, err := makePath() if err != nil { @@ -70,14 +70,15 @@ func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor { } return &monitor{ - RWMutex: &sync.RWMutex{}, - cfg: cfg, - db: db, - nodeCfg: nodeCfg, - discvCh: discvCh, - privKey: privKey.(chainkd.XPrv), - chain: chain, - txPool: txPool, + RWMutex: &sync.RWMutex{}, + cfg: cfg, + db: db, + nodeCfg: nodeCfg, + discvCh: discvCh, + privKey: privKey.(chainkd.XPrv), + chain: chain, + txPool: txPool, + checkStatusCh: make(chan struct{}, 1), } } @@ -175,9 +176,8 @@ func (m *monitor) checkStatusRoutine() { } bestHeight := uint64(0) - ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second) - for range ticker.C { - m.Lock() + for range m.checkStatusCh { + // m.Lock() log.Info("connected peers: ", m.sw.GetPeers().List()) for _, peer := range m.sw.GetPeers().List() { @@ -220,6 +220,6 @@ func (m *monitor) checkStatusRoutine() { peers.RemovePeer(p.ID()) } log.Info("Disonnect all peers.") - m.Unlock() + // m.Unlock() } } diff --git a/toolbar/precog/monitor/stats.go b/toolbar/precog/monitor/stats.go index 5f80c83b..808940e7 100644 --- a/toolbar/precog/monitor/stats.go +++ b/toolbar/precog/monitor/stats.go @@ -56,13 +56,7 @@ func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error { } ormNode := &orm.Node{} - if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}). - UpdateColumn(&orm.Node{ - Alias: peerInfo.Moniker, - Xpub: peerInfo.ID, - BestHeight: peerInfo.Height, - // LatestDailyUptimeMinutes uint64 - }).First(ormNode).Error; err != nil { + if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil { return err } @@ -72,6 +66,7 @@ func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error { log.Debugf("Parse ping time err: %v", err) } + // TODO: preload? ormNodeLiveness := &orm.NodeLiveness{ NodeID: ormNode.ID, BestHeight: ormNode.BestHeight, @@ -79,9 +74,23 @@ func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error { // PingTimes uint64 // PongTimes uint64 } - return m.db.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus). + if err := m.db.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus). UpdateColumn(&orm.NodeLiveness{ BestHeight: ormNodeLiveness.BestHeight, AvgLantencyMS: ormNodeLiveness.AvgLantencyMS, - }).FirstOrCreate(ormNodeLiveness).Error + }).FirstOrCreate(ormNodeLiveness).Error; err != nil { + return err + } + + if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}). + UpdateColumn(&orm.Node{ + Alias: peerInfo.Moniker, + Xpub: peerInfo.ID, + BestHeight: peerInfo.Height, + // LatestDailyUptimeMinutes uint64 + }).First(ormNode).Error; err != nil { + return err + } + + return nil } -- 2.11.0