OSDN Git Service

feat: add node discovery and status check (#374)
authorHAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
Tue, 3 Sep 2019 10:56:05 +0000 (18:56 +0800)
committerPaladz <yzhu101@uottawa.ca>
Tue, 3 Sep 2019 10:56:05 +0000 (18:56 +0800)
* clean up

* fix

* add

* wip

* Revert "wip"

This reverts commit 83076e5f7fb08a396786de89d073d0469c72f7ae.

* dododo

* do

* fk

* clean up

* fix netID

* rename

* clean up

* update config

* fix typos

* don't stop

* change

* fix

* add

* fix

* config networkID

* can dial peer

* add comments

* doing

* collect discv

* clean

* add

* host to ip

* clean

* more clean

* refactor

* clean

* for test

* skip

* add best_height for liveness

* minor

* clean

* Revert "clean"

This reverts commit 676391a8b3411ba66fa50841825650bb147199cb.

* clean

* add back seeds

* add

* clean

* clean

* update seeds

* rename

* rename

* rename

* dd

* fix

* refactor

* ??

* fix

* update todo

* ???

* add peers for reactors

* update

* update

* add mock

* more mock

* mock

* add

* disconnect

* refactor

* ....

* why disconnect?

* fk

* not panic anymore

* fix

* fix

* clean up

* fk

* fk

* clean up

* clean up

* clean

* rename

* clean

* rename

* clean

* clean yo

* rename

* add todo

* fk

* ???

* clean up

* fix

* clean

* fix

* fix

* fk

* fix deadlock

* add

* clean

* folder

* fix

* add comments

* clean

* add

* clean up

* rename

* add moniker

* update

* update todo

* fk

* fix

* fix

* add ping

* fix

* move around

* fk

* remove locks

* fk

* fk

* Revert "fk"

This reverts commit a24d9d6d08105f3db232c0a1c54e2a466ce779fd.

* dododo

* beter

* fk

* try_err....

* fix AvgLantencyMS

* try

* ???

* fl

* fix

* ???

* clean up

* clean

* clean up

* ???

* use dbTX

* clean

* rename

* add

* init gin

* wip

* fix config

* add todos

* add

* add locks

* init discvWg

* add locks for conn

* clean up

* init processDialResult

* mv nodeMap

* rollback upsert

* clean

* fix deadlock

* clean

* fix join

* dododo

* wip

* fix

* add avgLatency

* fix height

* fix

* fix dir

* clean up

* fix rm

* clean

* clean up

* refactot

* fix itme

* fox

* fix order

* LatestDailyUptimeMinutes

* clean

* fix CI

* fix ci

* clean up

* minor

* change check_frequency

* init s.ListNodes

* rename node.host to node.ip

* mv bestHeight

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318683697

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318680795

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318674582

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318677500

* fix log.Debug

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318669877

* fix for log

* fix log

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318673448

* fix

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r318681813

* fix

* refactor

* clean up

* clean up

* ix

* clean

* update

* clean up

* fix ListNodes

* update

* update

* updare

* add status

* fix status

* clean up

* add offset

* clean up

* init rm NodeLiveness

* fix for status

* minor fix for status

* fix

* clean

* fix

* clean up

* check

* fix

* fix

* clean up

* ???

* ip&port

* clean

* fk

* updare

* fk

* fix

* ...

* clean

* peerLisr

* clean

* clean

* fix

* minro

* fix

* fix bestHeight?

* fix nanoseconds

* fix rtt

* rename policy config

* fix status

* fix

* add RequiredRttMS

* clean

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r319836748

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r319840951

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r319840078

* fix for https://github.com/Bytom/vapor/pull/374#discussion_r319835254

* fix

* fix

* rename

* mv timestamp type def

* fix for timestamp

* rollback

32 files changed:
.gitignore
Makefile
cmd/precognitive/main.go [new file with mode: 0644]
crypto/ed25519/ed25519.go
docs/precognitive/README.md [new file with mode: 0644]
docs/precognitive/config_example.json [new file with mode: 0644]
docs/precognitive/sql_dump/precognitive_schema.sql [new file with mode: 0644]
netsync/chainmgr/tool_test.go
netsync/consensusmgr/handle_test.go
netsync/peers/peer.go
netsync/peers/peer_test.go
netsync/sync_manager.go
p2p/peer.go
p2p/switch.go
p2p/test_util.go
test/mock/chain.go
toolbar/common/types.go [moved from toolbar/federation/types/types.go with 96% similarity]
toolbar/federation/database/orm/asset.go
toolbar/federation/database/orm/chain.go
toolbar/federation/database/orm/cross_transaction.go
toolbar/federation/database/orm/cross_transaction_req.go
toolbar/precognitive/api/handler.go [new file with mode: 0644]
toolbar/precognitive/api/server.go [new file with mode: 0644]
toolbar/precognitive/common/const.go [new file with mode: 0644]
toolbar/precognitive/config/config.go [new file with mode: 0644]
toolbar/precognitive/database/orm/node.go [new file with mode: 0644]
toolbar/precognitive/database/orm/node_liveness.go [new file with mode: 0644]
toolbar/precognitive/monitor/connection.go [new file with mode: 0644]
toolbar/precognitive/monitor/discover.go [new file with mode: 0644]
toolbar/precognitive/monitor/mock.go [new file with mode: 0644]
toolbar/precognitive/monitor/monitor.go [new file with mode: 0644]
toolbar/precognitive/monitor/stats.go [new file with mode: 0644]

index 3d50b94..d74d7d5 100644 (file)
@@ -21,6 +21,7 @@ _cgo_export.*
 _testmain.go
 
 cmd/fedd/fedd
+cmd/precognitive/precognitive
 cmd/vapord/vapord
 cmd/vapord/.vapord
 cmd/vaporcli/vaporcli
index 8c85059..9575331 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,10 @@ fedd:
        @echo "Building fedd to cmd/fedd/fedd"
        @go build $(BUILD_FLAGS) -o cmd/fedd/fedd cmd/fedd/main.go
 
+precognitive:
+       @echo "Building precognitive to cmd/precognitive/precognitive"
+       @go build $(BUILD_FLAGS) -o cmd/precognitive/precognitive cmd/precognitive/main.go
+
 vapord:
        @echo "Building vapord to cmd/vapord/vapord"
        @go build $(BUILD_FLAGS) -o cmd/vapord/vapord cmd/vapord/main.go
diff --git a/cmd/precognitive/main.go b/cmd/precognitive/main.go
new file mode 100644 (file)
index 0000000..07913be
--- /dev/null
@@ -0,0 +1,28 @@
+package main
+
+import (
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/toolbar/common"
+       "github.com/vapor/toolbar/precognitive/api"
+       "github.com/vapor/toolbar/precognitive/config"
+       "github.com/vapor/toolbar/precognitive/monitor"
+)
+
+func main() {
+       cfg := config.NewConfig()
+       db, err := common.NewMySQLDB(cfg.MySQLConfig)
+       if err != nil {
+               log.WithField("err", err).Panic("initialize mysql db error")
+       }
+
+       go monitor.NewMonitor(cfg, db).Run()
+       go api.NewApiServer(cfg, db).Run()
+
+       // keep the main func running in case of terminating goroutines
+       var wg sync.WaitGroup
+       wg.Add(1)
+       wg.Wait()
+}
index 530b08b..aa22b5b 100644 (file)
@@ -36,6 +36,10 @@ type PublicKey []byte
 // PrivateKey is the type of Ed25519 private keys. It implements crypto.Signer.
 type PrivateKey []byte
 
+func (pub PublicKey) String() string {
+       return hex.EncodeToString(pub)
+}
+
 // Public returns the PublicKey corresponding to priv.
 func (priv PrivateKey) Public() PublicKey {
        publicKey := make([]byte, PublicKeySize)
diff --git a/docs/precognitive/README.md b/docs/precognitive/README.md
new file mode 100644 (file)
index 0000000..76839e1
--- /dev/null
@@ -0,0 +1,57 @@
+# Precognitive
+
+Keep monitoring (leader & candidate) consensus nodes status in vapor network.
+
+## Init
+
+### Database Schema
+[precognitive_schema.sql](./sql_dump/precognitive_schema.sql)
+
+### Config
+run with [config_example.json](docs/precognitive/config_example.json)
+```
+go run cmd/precognitive/main.go docs/precognitive/config_example.json
+```
+
+## API
+
++ [/list-nodes](#list-nodes)
+
+### /list-nodes
+
+__method:__ POST
+
+```
+curl -X POST 127.0.0.1:3009/api/v1/list-nodes -d '{}'
+```
+
+__example response:__
+```
+{
+    [
+        {
+            "alias": "cobo",
+            "public_key": "b928e46bb01e834fdf167185e31b15de7cc257af8bbdf17f9c7fefd5bb97b306d048b6bc0da2097152c1c2ff38333c756a543adbba7030a447dcc776b8ac64ef",
+            "host": "vapornode.cobo.com",
+            "port": 123,
+            "best_height": 1023,
+            "lantency_ms": 300,
+            "active_minutes": 4096,
+            "status": "healthy"
+        },
+        {
+            "alias": "matpool",
+            "public_key": "0f8669abbd3cc0a167156188e428f940088d5b2f36bb3449df71d2bdc5e077814ea3f68628eef279ed435f51ee26cff00f8bd28fabfd500bedb2a9e369f5c825",
+            "host": "vapornode.matpool.io",
+            "port": 321,
+            "best_height": 1024,
+            "lantency_ms": 299,
+            "active_minutes": 4097,
+            "status": "healthy"
+        }
+    ] 
+}
+```
+
+
+### /get-node-statistics
\ No newline at end of file
diff --git a/docs/precognitive/config_example.json b/docs/precognitive/config_example.json
new file mode 100644 (file)
index 0000000..6068a99
--- /dev/null
@@ -0,0 +1,42 @@
+{
+    "api" : {
+        "listening_port" : 3009
+    }, 
+    "network_id": 10817814959495988245,
+    "mysql" : {
+        "connection" : {
+            "host": "127.0.0.1",
+            "port": 3306,
+            "username": "root",
+            "password": "toor",
+            "database": "precognitive"
+        },
+        "log_mode" : true
+    },
+    "check_frequency_minutes" : 30,
+    "policy" : {
+        "required_rtt_ms" : 2000
+    },
+    "seeds" : [
+        {
+            "xpub" : "f2767279cd01ed8793808e0542a18958e1a2f3a6b6fe5328ec79596a022bc6f085951a98a631917563f86bb91db9159dd2969ff9d690fc12b250baff2b6f6a1d",
+            "ip": "47.103.79.68",
+            "port": 56656
+        },
+        {
+            "xpub" : "c785deb76af14e918ea05eeab863288c48b16d1816621adf74ada868e8e3246780bf5206dc5a0e3efef0c119978e11ed75eca9d3b50846e37c55cf8d56717c4f",
+            "ip": "47.103.13.86",
+            "port": 56656
+        },
+        {
+            "xpub" : "22bc19ec65d4ee524c5130575ddff041e712dbb415740eae314fd3359aa3978319384cd3ded8c4125ca2774716d7285268ebf1d85091eef8e7ad03077857e7ab",
+            "ip": "47.102.193.119",
+            "port": 56656
+        },
+        {
+            "xpub" : "214c0e6827346e9fee1056c4c8b96cefd67b75ed1dead59e4e4e3eee8c1fe095dbe7a7fb61bb23b4ab66cde2a1c04466b8d3e8efa21cf7eee064c70fb1525b14",
+            "ip": "47.103.17.22",
+            "port": 56656
+        }
+    ]
+}
\ No newline at end of file
diff --git a/docs/precognitive/sql_dump/precognitive_schema.sql b/docs/precognitive/sql_dump/precognitive_schema.sql
new file mode 100644 (file)
index 0000000..2e0bf27
--- /dev/null
@@ -0,0 +1,56 @@
+/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
+/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
+/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
+/*!40101 SET NAMES utf8 */;
+/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
+/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
+/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
+
+CREATE SCHEMA IF NOT EXISTS `precognitive`;
+DROP DATABASE `precognitive`;
+CREATE SCHEMA `precognitive`;
+
+USE `precognitive`;
+
+# Dump of table nodes
+# ------------------------------------------------------------
+
+CREATE TABLE `nodes` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `alias` varchar(128) NOT NULL DEFAULT '',
+  `xpub` char(128) NOT NULL DEFAULT '',
+  `public_key` char(64) NOT NULL DEFAULT '',
+  `ip` varchar(128) NOT NULL DEFAULT '',
+  `port` smallint unsigned NOT NULL DEFAULT '0',
+  `best_height` int(11) DEFAULT '0',
+  `avg_rtt_ms` int(11) DEFAULT NULL,
+  `latest_daily_uptime_minutes` int(11) DEFAULT '0',
+  `status` tinyint(1) NOT NULL DEFAULT '0',
+  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `address` (`ip`,`port`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+LOCK TABLES `nodes` WRITE;
+UNLOCK TABLES;
+
+
+# Dump of table node_livenesses
+# ------------------------------------------------------------
+
+CREATE TABLE `node_livenesses` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `node_id` int(11) NOT NULL,
+  `ping_times` int(11) DEFAULT '0',
+  `pong_times` int(11) DEFAULT '0',
+  `best_height` int(11) DEFAULT '0',
+  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  CONSTRAINT `node_livenesses_ibfk_1` FOREIGN KEY (`node_id`) REFERENCES `nodes` (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+LOCK TABLES `node_livenesses` WRITE;
+UNLOCK TABLES;
+
index 1905e11..9e4bf63 100644 (file)
@@ -51,6 +51,10 @@ func (p *P2PPeer) IsLAN() bool {
        return false
 }
 
+func (p *P2PPeer) Moniker() string {
+       return ""
+}
+
 func (p *P2PPeer) RemoteAddrHost() string {
        return ""
 }
index d5e1569..832b677 100644 (file)
@@ -28,6 +28,10 @@ func (p *p2peer) ID() string {
        return ""
 }
 
+func (p *p2peer) Moniker() string {
+       return ""
+}
+
 func (p *p2peer) RemoteAddrHost() string {
        return ""
 }
index e0c52f5..dea5a24 100644 (file)
@@ -35,6 +35,7 @@ var (
 
 //BasePeer is the interface for connection level peer
 type BasePeer interface {
+       Moniker() string
        Addr() net.Addr
        ID() string
        RemoteAddrHost() string
@@ -61,6 +62,7 @@ type BroadcastMsg interface {
 // PeerInfo indicate peer status snap
 type PeerInfo struct {
        ID                  string `json:"peer_id"`
+       Moniker             string `json:"moniker"`
        RemoteAddr          string `json:"remote_addr"`
        Height              uint64 `json:"height"`
        Ping                string `json:"ping"`
@@ -169,6 +171,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo {
 
        return &PeerInfo{
                ID:                  p.ID(),
+               Moniker:             p.BasePeer.Moniker(),
                RemoteAddr:          p.Addr().String(),
                Height:              p.bestHeight,
                Ping:                ping.String(),
index 27a1625..514d269 100644 (file)
@@ -38,6 +38,10 @@ func (bp *basePeer) ID() string {
        return bp.id
 }
 
+func (bp *basePeer) Moniker() string {
+       return ""
+}
+
 func (bp *basePeer) RemoteAddrHost() string {
        switch bp.ID() {
        case peer1ID:
index 88cf9d4..307d6c3 100644 (file)
@@ -57,7 +57,7 @@ type SyncManager struct {
 
 // NewSyncManager create sync manager and set switch.
 func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher, fastSyncDB dbm.DB) (*SyncManager, error) {
-       sw, err := p2p.NewSwitch(config)
+       sw, err := p2p.NewSwitchMaybeDiscover(config)
        if err != nil {
                return nil, err
        }
index 856d1e9..fbb663b 100644 (file)
@@ -194,6 +194,11 @@ func (p *Peer) IsLAN() bool {
        return p.isLAN
 }
 
+// Moniker returns peer's moniker.
+func (p *Peer) Moniker() string {
+       return p.NodeInfo.Moniker
+}
+
 // PubKey returns peer's public key.
 func (p *Peer) PubKey() string {
        return p.conn.(*connection.SecretConnection).RemotePubKey().String()
index d3daa7c..075498f 100644 (file)
@@ -78,8 +78,8 @@ type Switch struct {
        security     Security
 }
 
-// NewSwitch create a new Switch and set discover.
-func NewSwitch(config *cfg.Config) (*Switch, error) {
+// NewSwitchMaybeDiscover create a new Switch and set discover.
+func NewSwitchMaybeDiscover(config *cfg.Config) (*Switch, error) {
        var err error
        var l Listener
        var listenAddr string
@@ -109,11 +109,11 @@ func NewSwitch(config *cfg.Config) (*Switch, error) {
                }
        }
 
-       return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
+       return NewSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
 }
 
 // newSwitch creates a new Switch with the given config.
-func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
+func NewSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
        sw := &Switch{
                Config:       config,
                peerConfig:   DefaultPeerConfig(config.P2P),
@@ -135,6 +135,26 @@ func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, p
        return sw, nil
 }
 
+func (sw *Switch) GetDiscv() discv {
+       return sw.discv
+}
+
+func (sw *Switch) GetNodeInfo() *NodeInfo {
+       return sw.nodeInfo
+}
+
+func (sw *Switch) GetPeers() *PeerSet {
+       return sw.peers
+}
+
+func (sw *Switch) GetReactors() map[string]Reactor {
+       return sw.reactors
+}
+
+func (sw *Switch) GetSecurity() Security {
+       return sw.security
+}
+
 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
 func (sw *Switch) OnStart() error {
        for _, reactor := range sw.reactors {
@@ -352,7 +372,7 @@ func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
        for i := 0; i < len(lanPeer.IP); i++ {
                addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
        }
-       sw.dialPeers(addresses)
+       sw.DialPeers(addresses)
 }
 
 func (sw *Switch) connectLANPeersRoutine() {
@@ -392,7 +412,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
                        break
                }
 
-               // disconnect if we alrady have MaxNumPeers
+               // disconnect if we already have MaxNumPeers
                if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
                        if err := inConn.Close(); err != nil {
                                log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
@@ -416,7 +436,7 @@ func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
        wg.Done()
 }
 
-func (sw *Switch) dialPeers(addresses []*NetAddress) {
+func (sw *Switch) DialPeers(addresses []*NetAddress) {
        connectedPeers := make(map[string]struct{})
        for _, peer := range sw.Peers().List() {
                connectedPeers[peer.RemoteAddrHost()] = struct{}{}
@@ -452,7 +472,7 @@ func (sw *Switch) ensureKeepConnectPeers() {
                addresses = append(addresses, address)
        }
 
-       sw.dialPeers(addresses)
+       sw.DialPeers(addresses)
 }
 
 func (sw *Switch) ensureOutboundPeers() {
@@ -470,7 +490,7 @@ func (sw *Switch) ensureOutboundPeers() {
                address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
                addresses = append(addresses, address)
        }
-       sw.dialPeers(addresses)
+       sw.DialPeers(addresses)
 }
 
 func (sw *Switch) ensureOutboundPeersRoutine() {
index fa5d631..fc27e4e 100644 (file)
@@ -92,7 +92,7 @@ func MakeSwitch(cfg *cfg.Config, testdb dbm.DB, privKey signlib.PrivKey, initSwi
        // new switch, add reactors
        l, listenAddr := GetListener(cfg.P2P)
        cfg.P2P.LANDiscover = false
-       sw, err := newSwitch(cfg, new(mockDiscv), nil, l, privKey, listenAddr, 0)
+       sw, err := NewSwitch(cfg, new(mockDiscv), nil, l, privKey, listenAddr, 0)
        if err != nil {
                log.Errorf("create switch error: %s", err)
                return nil
index 69ef7c4..82fc7dc 100644 (file)
@@ -28,10 +28,11 @@ type Chain struct {
 
 func NewChain(mempool *Mempool) *Chain {
        return &Chain{
-               heightMap:   map[uint64]*types.Block{},
-               blockMap:    map[bc.Hash]*types.Block{},
-               prevOrphans: make(map[bc.Hash]*types.Block),
-               mempool:     mempool,
+               bestBlockHeader: &types.BlockHeader{},
+               heightMap:       map[uint64]*types.Block{},
+               blockMap:        map[bc.Hash]*types.Block{},
+               prevOrphans:     make(map[bc.Hash]*types.Block),
+               mempool:         mempool,
        }
 }
 
@@ -147,6 +148,11 @@ func (c *Chain) ProcessBlock(block *types.Block) (bool, error) {
        return false, nil
 }
 
+// TODO:
+func (c *Chain) ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error {
+       return nil
+}
+
 func (c *Chain) SetBestBlockHeader(header *types.BlockHeader) {
        c.bestBlockHeader = header
 }
similarity index 96%
rename from toolbar/federation/types/types.go
rename to toolbar/common/types.go
index e3f8f48..18e3b0e 100644 (file)
@@ -1,4 +1,4 @@
-package types
+package common
 
 import (
        "fmt"
index 61c0287..1d39d49 100644 (file)
@@ -1,15 +1,15 @@
 package orm
 
 import (
-       "github.com/vapor/toolbar/federation/types"
+       "github.com/vapor/toolbar/common"
 )
 
 type Asset struct {
-       ID              uint64          `gorm:"primary_key;foreignkey:ID" json:"-"`
-       AssetID         string          `json:"asset_id"`
-       IssuanceProgram string          `json:"-"`
-       VMVersion       uint64          `json:"-"`
-       Definition      string          `json:"-"`
-       CreatedAt       types.Timestamp `json:"-"`
-       UpdatedAt       types.Timestamp `json:"-"`
+       ID              uint64           `gorm:"primary_key;foreignkey:ID" json:"-"`
+       AssetID         string           `json:"asset_id"`
+       IssuanceProgram string           `json:"-"`
+       VMVersion       uint64           `json:"-"`
+       Definition      string           `json:"-"`
+       CreatedAt       common.Timestamp `json:"-"`
+       UpdatedAt       common.Timestamp `json:"-"`
 }
index afbdbe8..92656eb 100644 (file)
@@ -1,14 +1,14 @@
 package orm
 
 import (
-       "github.com/vapor/toolbar/federation/types"
+       "github.com/vapor/toolbar/common"
 )
 
 type Chain struct {
-       ID          uint64          `gorm:"primary_key" json:"-"`
-       Name        string          `json:"name"`
-       BlockHeight uint64          `json:"block_height"`
-       BlockHash   string          `json:"block_hash"`
-       CreatedAt   types.Timestamp `json:"-"`
-       UpdatedAt   types.Timestamp `json:"-"`
+       ID          uint64           `gorm:"primary_key" json:"-"`
+       Name        string           `json:"name"`
+       BlockHeight uint64           `json:"block_height"`
+       BlockHash   string           `json:"block_hash"`
+       CreatedAt   common.Timestamp `json:"-"`
+       UpdatedAt   common.Timestamp `json:"-"`
 }
index 9d2eb29..6c424db 100644 (file)
@@ -5,8 +5,8 @@ import (
        "encoding/json"
 
        "github.com/vapor/errors"
-       "github.com/vapor/toolbar/federation/common"
-       "github.com/vapor/toolbar/federation/types"
+       "github.com/vapor/toolbar/common"
+       fedCommon "github.com/vapor/toolbar/federation/common"
 )
 
 type CrossTransaction struct {
@@ -25,8 +25,8 @@ type CrossTransaction struct {
        DestTxIndex          sql.NullInt64  `sql:"default:null"`
        DestTxHash           sql.NullString `sql:"default:null"`
        Status               uint8
-       CreatedAt            types.Timestamp
-       UpdatedAt            types.Timestamp
+       CreatedAt            common.Timestamp
+       UpdatedAt            common.Timestamp
 
        Chain *Chain `gorm:"foreignkey:ChainID"`
        Reqs  []*CrossTransactionReq
@@ -35,10 +35,10 @@ type CrossTransaction struct {
 func (c *CrossTransaction) MarshalJSON() ([]byte, error) {
        var status string
        switch c.Status {
-       case common.CrossTxPendingStatus:
-               status = common.CrossTxPendingStatusLabel
-       case common.CrossTxCompletedStatus:
-               status = common.CrossTxCompletedStatusLabel
+       case fedCommon.CrossTxPendingStatus:
+               status = fedCommon.CrossTxPendingStatusLabel
+       case fedCommon.CrossTxCompletedStatus:
+               status = fedCommon.CrossTxCompletedStatusLabel
        default:
                return nil, errors.New("unknown cross-chain tx status")
        }
index 907cd3f..2719561 100644 (file)
@@ -1,20 +1,20 @@
 package orm
 
 import (
-       "github.com/vapor/toolbar/federation/types"
+       "github.com/vapor/toolbar/common"
 )
 
 type CrossTransactionReq struct {
-       ID                 uint64          `gorm:"primary_key" json:"-"`
-       CrossTransactionID uint64          `json:"-"`
-       SourcePos          uint64          `json:"-"`
-       AssetID            uint64          `json:"-"`
-       AssetAmount        uint64          `json:"amount"`
-       Script             string          `json:"-"`
-       FromAddress        string          `json:"from_address"`
-       ToAddress          string          `json:"to_address"`
-       CreatedAt          types.Timestamp `json:"-"`
-       UpdatedAt          types.Timestamp `json:"-"`
+       ID                 uint64           `gorm:"primary_key" json:"-"`
+       CrossTransactionID uint64           `json:"-"`
+       SourcePos          uint64           `json:"-"`
+       AssetID            uint64           `json:"-"`
+       AssetAmount        uint64           `json:"amount"`
+       Script             string           `json:"-"`
+       FromAddress        string           `json:"from_address"`
+       ToAddress          string           `json:"to_address"`
+       CreatedAt          common.Timestamp `json:"-"`
+       UpdatedAt          common.Timestamp `json:"-"`
 
        CrossTransaction *CrossTransaction `gorm:"foreignkey:CrossTransactionID" json:"-"`
        Asset            *Asset            `json:"asset"`
diff --git a/toolbar/precognitive/api/handler.go b/toolbar/precognitive/api/handler.go
new file mode 100644 (file)
index 0000000..1a85c37
--- /dev/null
@@ -0,0 +1,19 @@
+package api
+
+import (
+       "github.com/gin-gonic/gin"
+
+       "github.com/vapor/toolbar/precognitive/database/orm"
+       serverCommon "github.com/vapor/toolbar/server"
+)
+
+type listNodesReq struct{ serverCommon.Display }
+
+func (s *Server) ListNodes(c *gin.Context, listNodesReq *listNodesReq, query *serverCommon.PaginationQuery) ([]*orm.Node, error) {
+       var ormNodes []*orm.Node
+       if err := s.db.Offset(query.Start).Limit(query.Limit).Find(&ormNodes).Error; err != nil {
+               return nil, err
+       }
+
+       return ormNodes, nil
+}
diff --git a/toolbar/precognitive/api/server.go b/toolbar/precognitive/api/server.go
new file mode 100644 (file)
index 0000000..b4e617d
--- /dev/null
@@ -0,0 +1,43 @@
+package api
+
+import (
+       "fmt"
+
+       "github.com/gin-gonic/gin"
+       "github.com/jinzhu/gorm"
+
+       "github.com/vapor/toolbar/precognitive/config"
+       serverCommon "github.com/vapor/toolbar/server"
+)
+
+type Server struct {
+       cfg    *config.Config
+       db     *gorm.DB
+       engine *gin.Engine
+}
+
+func NewApiServer(cfg *config.Config, db *gorm.DB) *Server {
+       server := &Server{
+               cfg: cfg,
+               db:  db,
+       }
+       if cfg.API.IsReleaseMode {
+               gin.SetMode(gin.ReleaseMode)
+       }
+       server.setupRouter()
+       return server
+}
+
+func (s *Server) setupRouter() {
+       r := gin.Default()
+       r.Use(serverCommon.Middleware(s))
+
+       v1 := r.Group("/api/v1")
+       v1.POST("/list-nodes", serverCommon.HandlerMiddleware(s.ListNodes))
+
+       s.engine = r
+}
+
+func (s *Server) Run() {
+       s.engine.Run(fmt.Sprintf(":%d", s.cfg.API.ListeningPort))
+}
diff --git a/toolbar/precognitive/common/const.go b/toolbar/precognitive/common/const.go
new file mode 100644 (file)
index 0000000..3cbf5de
--- /dev/null
@@ -0,0 +1,17 @@
+package common
+
+const (
+       NodeUnknownStatus uint8 = iota
+       NodeHealthyStatus
+       NodeCongestedStatus
+       NodeOrphanStatus
+       NodeOfflineStatus
+)
+
+var StatusLookupTable = map[uint8]string{
+       NodeUnknownStatus:   "unknown",
+       NodeHealthyStatus:   "healthy",
+       NodeCongestedStatus: "congested",
+       NodeOrphanStatus:    "orphan",
+       NodeOfflineStatus:   "offline",
+}
diff --git a/toolbar/precognitive/config/config.go b/toolbar/precognitive/config/config.go
new file mode 100644 (file)
index 0000000..3c5ba3f
--- /dev/null
@@ -0,0 +1,61 @@
+package config
+
+import (
+       "encoding/json"
+       "os"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/vapor/crypto/ed25519/chainkd"
+
+       "github.com/vapor/toolbar/common"
+)
+
+func NewConfig() *Config {
+       if len(os.Args) <= 1 {
+               log.Fatal("Please setup the config file path")
+       }
+
+       return NewConfigWithPath(os.Args[1])
+}
+
+func NewConfigWithPath(path string) *Config {
+       configFile, err := os.Open(path)
+       if err != nil {
+               log.WithFields(log.Fields{"err": err, "file_path": os.Args[1]}).Fatal("fail to open config file")
+       }
+       defer configFile.Close()
+
+       cfg := &Config{}
+       if err := json.NewDecoder(configFile).Decode(cfg); err != nil {
+               log.WithField("err", err).Fatal("fail to decode config file")
+       }
+
+       return cfg
+}
+
+type Config struct {
+       NetworkID        uint64             `json:"network_id"`
+       MySQLConfig      common.MySQLConfig `json:"mysql"`
+       CheckFreqMinutes uint64             `json:"check_frequency_minutes"`
+       Policy           Policy             `json:"policy"`
+       Nodes            []Node             `json:"seeds"`
+       API              API                `json:"api"`
+}
+
+type Policy struct {
+       Confirmations uint64 `json:"confirmations"`
+       RequiredRttMS uint64 `json:"required_rtt_ms"`
+}
+
+type Node struct {
+       XPub      *chainkd.XPub `json:"xpub"`
+       PublicKey string        `json:"public_key"`
+       IP        string        `json:"ip"`
+       Port      uint16        `json:"port"`
+}
+
+type API struct {
+       ListeningPort uint64 `json:"listening_port"`
+       AccessToken   string `json:"access_token"`
+       IsReleaseMode bool   `json:"is_release_mode"`
+}
diff --git a/toolbar/precognitive/database/orm/node.go b/toolbar/precognitive/database/orm/node.go
new file mode 100644 (file)
index 0000000..addf7b1
--- /dev/null
@@ -0,0 +1,59 @@
+package orm
+
+import (
+       "database/sql"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "time"
+
+       "github.com/vapor/toolbar/common"
+       precogCommon "github.com/vapor/toolbar/precognitive/common"
+)
+
+type Node struct {
+       ID                       uint16 `gorm:"primary_key"`
+       Alias                    string
+       Xpub                     string
+       PublicKey                string
+       IP                       string
+       Port                     uint16
+       BestHeight               uint64
+       AvgRttMS                 sql.NullInt64
+       LatestDailyUptimeMinutes uint64
+       Status                   uint8
+       CreatedAt                time.Time
+       UpdatedAt                time.Time
+}
+
+func (n *Node) MarshalJSON() ([]byte, error) {
+       status, ok := precogCommon.StatusLookupTable[n.Status]
+       if !ok {
+               return nil, errors.New("fail to look up status")
+       }
+
+       avgRttMS := uint64(0)
+       if n.AvgRttMS.Valid {
+               avgRttMS = uint64(n.AvgRttMS.Int64)
+       }
+
+       return json.Marshal(&struct {
+               Alias                    string           `json:"alias"`
+               PublicKey                string           `json:"publickey"`
+               Address                  string           `json:"address"`
+               BestHeight               uint64           `json:"best_height"`
+               AvgRttMS                 uint64           `json:"avg_rtt_ms"`
+               LatestDailyUptimeMinutes uint64           `json:"latest_daily_uptime_minutes"`
+               Status                   string           `json:"status"`
+               UpdatedAt                common.Timestamp `json:"updated_at"`
+       }{
+               Alias:                    n.Alias,
+               PublicKey:                n.PublicKey,
+               Address:                  fmt.Sprintf("%s:%d", n.IP, n.Port),
+               BestHeight:               n.BestHeight,
+               AvgRttMS:                 avgRttMS,
+               LatestDailyUptimeMinutes: n.LatestDailyUptimeMinutes,
+               Status:    status,
+               UpdatedAt: common.Timestamp(n.UpdatedAt),
+       })
+}
diff --git a/toolbar/precognitive/database/orm/node_liveness.go b/toolbar/precognitive/database/orm/node_liveness.go
new file mode 100644 (file)
index 0000000..307f8ba
--- /dev/null
@@ -0,0 +1,17 @@
+package orm
+
+import (
+       "time"
+)
+
+type NodeLiveness struct {
+       ID         uint64 `gorm:"primary_key"`
+       NodeID     uint16
+       PingTimes  uint64
+       PongTimes  uint64
+       BestHeight uint64
+       CreatedAt  time.Time
+       UpdatedAt  time.Time
+
+       Node *Node `gorm:"foreignkey:NodeID"`
+}
diff --git a/toolbar/precognitive/monitor/connection.go b/toolbar/precognitive/monitor/connection.go
new file mode 100644 (file)
index 0000000..6072377
--- /dev/null
@@ -0,0 +1,68 @@
+package monitor
+
+import (
+       "net"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/p2p"
+       "github.com/vapor/toolbar/precognitive/database/orm"
+)
+
+func (m *monitor) connectionRoutine() {
+       ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqMinutes) * time.Minute)
+       for ; true; <-ticker.C {
+               if err := m.dialNodes(); err != nil {
+                       log.WithFields(log.Fields{"err": err}).Error("dialNodes")
+               }
+       }
+}
+
+func (m *monitor) dialNodes() error {
+       log.Info("Start to reconnect to nodes...")
+       var nodes []*orm.Node
+       if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+               return err
+       }
+
+       addresses := make([]*p2p.NetAddress, 0)
+       for i := 0; i < len(nodes); i++ {
+               address := p2p.NewNetAddressIPPort(net.ParseIP(nodes[i].IP), nodes[i].Port)
+               addresses = append(addresses, address)
+       }
+
+       // connected peers will be skipped in switch.DialPeers()
+       m.sw.DialPeers(addresses)
+       log.Info("DialPeers done.")
+       peerList := m.sw.GetPeers().List()
+       m.processDialResults(peerList)
+       m.checkStatus(peerList)
+       return nil
+}
+
+func (m *monitor) checkStatus(peerList []*p2p.Peer) {
+       for _, peer := range peerList {
+               peer.Start()
+               m.peers.AddPeer(peer)
+       }
+       log.WithFields(log.Fields{"num": len(m.sw.GetPeers().List()), "peers": m.sw.GetPeers().List()}).Info("connected peers")
+
+       for _, peerInfo := range m.peers.GetPeerInfos() {
+               if peerInfo.Height > m.bestHeightSeen {
+                       m.bestHeightSeen = peerInfo.Height
+               }
+       }
+       log.WithFields(log.Fields{"bestHeight": m.bestHeightSeen}).Info("peersInfo")
+       m.processPeerInfos(m.peers.GetPeerInfos())
+
+       for _, peer := range peerList {
+               p := m.peers.GetPeer(peer.ID())
+               if p == nil {
+                       continue
+               }
+
+               m.peers.RemovePeer(p.ID())
+       }
+       log.Info("Disonnect all peers.")
+}
diff --git a/toolbar/precognitive/monitor/discover.go b/toolbar/precognitive/monitor/discover.go
new file mode 100644 (file)
index 0000000..6eb8781
--- /dev/null
@@ -0,0 +1,43 @@
+package monitor
+
+import (
+       "fmt"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/p2p/discover/dht"
+       "github.com/vapor/toolbar/precognitive/config"
+)
+
+var (
+       nodesToDiscv = 150
+       discvFreqSec = 60
+)
+
+func (m *monitor) discoveryRoutine() {
+       discvMap := make(map[string]*dht.Node)
+       ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
+       for range ticker.C {
+               nodes := make([]*dht.Node, nodesToDiscv)
+               num := m.sw.GetDiscv().ReadRandomNodes(nodes)
+               for _, node := range nodes[:num] {
+                       address := fmt.Sprintf("%s:%d", node.IP.String(), node.TCP)
+                       if n, ok := discvMap[address]; ok && n.String() == node.String() {
+                               continue
+                       }
+
+                       log.WithFields(log.Fields{"new node": node}).Info("discover")
+
+                       if err := m.upsertNode(&config.Node{
+                               PublicKey: node.ID.String(),
+                               IP:        node.IP.String(),
+                               Port:      node.TCP,
+                       }); err != nil {
+                               log.WithFields(log.Fields{"node": node, "err": err}).Error("upsertNode")
+                       } else {
+                               discvMap[address] = node
+                       }
+               }
+       }
+}
diff --git a/toolbar/precognitive/monitor/mock.go b/toolbar/precognitive/monitor/mock.go
new file mode 100644 (file)
index 0000000..31a8411
--- /dev/null
@@ -0,0 +1,28 @@
+package monitor
+
+import (
+       "github.com/vapor/protocol/bc/types"
+       "github.com/vapor/test/mock"
+)
+
+func mockChainAndPool() (*mock.Chain, *mock.Mempool, error) {
+       txPool := &mock.Mempool{}
+       mockChain := mock.NewChain(txPool)
+       genesisBlock, err := getGenesisBlock()
+       if err != nil {
+               return nil, nil, err
+       }
+
+       mockChain.SetBlockByHeight(genesisBlock.BlockHeader.Height, genesisBlock)
+       mockChain.SetBestBlockHeader(&genesisBlock.BlockHeader)
+       return mockChain, txPool, nil
+}
+
+func getGenesisBlock() (*types.Block, error) {
+       genesisBlock := &types.Block{}
+       if err := genesisBlock.UnmarshalText([]byte("030100000000000000000000000000000000000000000000000000000000000000000082bfe3f4bf2d4052415e796436f587fac94677b20f027e910b70e2c220c411c0e87c37e0e1cc2ec9c377e5192668bc0a367e4a4764f11e7c725ecced1d7b6a492974fab1b6d5bc01000107010001012402220020f86826d640810eb08a2bfb706e0092273e05e9a7d3d71f9d53f4f6cc2e3d6c6a0001013b0039ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff00011600148c9d063ff74ee6d9ffa88d83aeb038068366c4c400")); err != nil {
+               return nil, err
+       }
+
+       return genesisBlock, nil
+}
diff --git a/toolbar/precognitive/monitor/monitor.go b/toolbar/precognitive/monitor/monitor.go
new file mode 100644 (file)
index 0000000..8a838ae
--- /dev/null
@@ -0,0 +1,141 @@
+package monitor
+
+import (
+       "fmt"
+       "os"
+       "os/user"
+       "path"
+       "strings"
+
+       "github.com/jinzhu/gorm"
+       log "github.com/sirupsen/logrus"
+
+       vaporCfg "github.com/vapor/config"
+       "github.com/vapor/crypto/ed25519/chainkd"
+       dbm "github.com/vapor/database/leveldb"
+       "github.com/vapor/event"
+       "github.com/vapor/netsync/chainmgr"
+       "github.com/vapor/netsync/consensusmgr"
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p"
+       "github.com/vapor/p2p/discover/dht"
+       "github.com/vapor/p2p/discover/mdns"
+       "github.com/vapor/p2p/signlib"
+       "github.com/vapor/test/mock"
+       "github.com/vapor/toolbar/precognitive/config"
+)
+
+type monitor struct {
+       cfg            *config.Config
+       db             *gorm.DB
+       nodeCfg        *vaporCfg.Config
+       sw             *p2p.Switch
+       privKey        chainkd.XPrv
+       chain          *mock.Chain
+       txPool         *mock.Mempool
+       bestHeightSeen uint64
+       peers          *peers.PeerSet
+}
+
+func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
+       dbPath, err := makePath()
+       if err != nil {
+               log.WithFields(log.Fields{"err": err}).Fatal("makePath")
+       }
+
+       nodeCfg := &vaporCfg.Config{
+               BaseConfig: vaporCfg.DefaultBaseConfig(),
+               P2P:        vaporCfg.DefaultP2PConfig(),
+               Federation: vaporCfg.DefaultFederationConfig(),
+       }
+       nodeCfg.DBPath = dbPath
+       nodeCfg.ChainID = "mainnet"
+       privKey, err := signlib.NewPrivKey()
+       if err != nil {
+               log.WithFields(log.Fields{"err": err}).Fatal("NewPrivKey")
+       }
+
+       chain, txPool, err := mockChainAndPool()
+       if err != nil {
+               log.WithFields(log.Fields{"err": err}).Fatal("mockChainAndPool")
+       }
+
+       return &monitor{
+               cfg:            cfg,
+               db:             db,
+               nodeCfg:        nodeCfg,
+               privKey:        privKey.(chainkd.XPrv),
+               chain:          chain,
+               txPool:         txPool,
+               bestHeightSeen: uint64(0),
+       }
+}
+
+func makePath() (string, error) {
+       usr, err := user.Current()
+       if err != nil {
+               return "", err
+       }
+
+       dataPath := path.Join(usr.HomeDir, "/.vapor_precog")
+       if err := os.MkdirAll(dataPath, os.ModePerm); err != nil {
+               return "", err
+       }
+
+       return dataPath, nil
+}
+
+func (m *monitor) Run() {
+       if err := m.makeSwitch(); err != nil {
+               log.WithFields(log.Fields{"err": err}).Fatal("makeSwitch")
+       }
+
+       go m.discoveryRoutine()
+       go m.connectionRoutine()
+}
+
+func (m *monitor) makeSwitch() error {
+       var seeds []string
+       for _, node := range m.cfg.Nodes {
+               seeds = append(seeds, fmt.Sprintf("%s:%d", node.IP, node.Port))
+       }
+       m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
+
+       l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
+       discv, err := dht.NewDiscover(m.nodeCfg, m.privKey, l.ExternalAddress().Port, m.cfg.NetworkID)
+       if err != nil {
+               return err
+       }
+
+       // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
+       lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
+       m.sw, err = p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
+       if err != nil {
+               return err
+       }
+
+       m.peers = peers.NewPeerSet(m.sw)
+       return m.prepareReactors()
+}
+
+func (m *monitor) prepareReactors() error {
+       dispatcher := event.NewDispatcher()
+       // add ConsensusReactor for consensusChannel
+       _ = consensusmgr.NewManager(m.sw, m.chain, m.peers, dispatcher)
+       fastSyncDB := dbm.NewDB("fastsync", m.nodeCfg.DBBackend, m.nodeCfg.DBDir())
+       // add ProtocolReactor to handle msgs
+       if _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, m.peers, fastSyncDB); err != nil {
+               return err
+       }
+
+       for label, reactor := range m.sw.GetReactors() {
+               log.WithFields(log.Fields{"label": label, "reactor": reactor}).Debug("start reactor")
+               if _, err := reactor.Start(); err != nil {
+                       return err
+               }
+       }
+
+       m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo())
+       m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers())
+       return m.sw.GetSecurity().Start()
+}
diff --git a/toolbar/precognitive/monitor/stats.go b/toolbar/precognitive/monitor/stats.go
new file mode 100644 (file)
index 0000000..3538f61
--- /dev/null
@@ -0,0 +1,184 @@
+package monitor
+
+import (
+       "database/sql"
+       "fmt"
+       "net"
+       "strconv"
+       "time"
+
+       "github.com/jinzhu/gorm"
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p"
+       "github.com/vapor/toolbar/precognitive/common"
+       "github.com/vapor/toolbar/precognitive/config"
+       "github.com/vapor/toolbar/precognitive/database/orm"
+)
+
+func (m *monitor) upsertNode(node *config.Node) error {
+       ormNode := &orm.Node{
+               IP:   node.IP,
+               Port: node.Port,
+       }
+       if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+               return err
+       }
+
+       ormNode.PublicKey = node.PublicKey
+       if node.XPub != nil {
+               ormNode.Xpub = node.XPub.String()
+               ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
+       }
+       return m.db.Save(ormNode).Error
+}
+
+func parseRemoteAddr(remoteAddr string) (string, uint16, error) {
+       host, portStr, err := net.SplitHostPort(remoteAddr)
+       if err != nil {
+               return "", 0, err
+       }
+
+       port, err := strconv.Atoi(portStr)
+       if err != nil {
+               return "", 0, err
+       }
+
+       return host, uint16(port), nil
+}
+
+func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
+       var ormNodes []*orm.Node
+       if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
+               return err
+       }
+
+       addressMap := make(map[string]*orm.Node, len(ormNodes))
+       for _, ormNode := range ormNodes {
+               addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
+       }
+
+       connMap := make(map[string]bool, len(ormNodes))
+       // connected peers
+       for _, peer := range peerList {
+               connMap[peer.RemoteAddr] = true
+               if err := m.processConnectedPeer(addressMap[peer.RemoteAddr]); err != nil {
+                       log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr, "err": err}).Error("processConnectedPeer")
+               }
+       }
+
+       // offline peers
+       for _, ormNode := range ormNodes {
+               if _, ok := connMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)]; ok {
+                       continue
+               }
+
+               if err := m.processOfflinePeer(ormNode); err != nil {
+                       log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
+               }
+       }
+
+       return nil
+}
+
+func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
+       ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
+       err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
+       if err != nil && err != gorm.ErrRecordNotFound {
+               return err
+       }
+
+       ormNodeLiveness.PongTimes += 1
+       if ormNode.Status == common.NodeOfflineStatus {
+               ormNode.Status = common.NodeUnknownStatus
+       }
+       ormNodeLiveness.Node = ormNode
+       return m.db.Save(ormNodeLiveness).Error
+}
+
+func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
+       ormNode.Status = common.NodeOfflineStatus
+       return m.db.Save(ormNode).Error
+}
+
+func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
+       for _, peerInfo := range peerInfos {
+               dbTx := m.db.Begin()
+               if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
+                       log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
+                       dbTx.Rollback()
+               } else {
+                       dbTx.Commit()
+               }
+       }
+}
+
+func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
+       ip, port, err := parseRemoteAddr(peerInfo.RemoteAddr)
+       if err != nil {
+               return err
+       }
+
+       ormNode := &orm.Node{IP: ip, Port: uint16(port)}
+       if err := dbTx.Where(ormNode).First(ormNode).Error; err != nil {
+               return err
+       }
+
+       if ormNode.Status == common.NodeOfflineStatus {
+               return fmt.Errorf("node %s:%d status error", ormNode.IP, ormNode.Port)
+       }
+
+       log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
+       ping, err := time.ParseDuration(peerInfo.Ping)
+       if err != nil {
+               return err
+       }
+
+       now := time.Now()
+       yesterday := now.Add(-24 * time.Hour)
+       var ormNodeLivenesses []*orm.NodeLiveness
+       if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
+               Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
+               Order(fmt.Sprintf("created_at %s", "DESC")).Find(&ormNodeLivenesses).Error; err != nil {
+               return err
+       }
+
+       // update latest liveness
+       latestLiveness := ormNodeLivenesses[0]
+       rttMS := ping.Nanoseconds() / 1000000
+       if rttMS > 0 && uint64(rttMS) <= m.cfg.Policy.RequiredRttMS {
+               ormNode.Status = common.NodeHealthyStatus
+       } else if uint64(rttMS) > m.cfg.Policy.RequiredRttMS {
+               ormNode.Status = common.NodeCongestedStatus
+       }
+       if rttMS != 0 {
+               ormNode.AvgRttMS = sql.NullInt64{
+                       Int64: (ormNode.AvgRttMS.Int64*int64(latestLiveness.PongTimes) + rttMS) / int64(latestLiveness.PongTimes+1),
+                       Valid: true,
+               }
+       }
+       latestLiveness.PongTimes += 1
+       if peerInfo.Height != 0 {
+               latestLiveness.BestHeight = peerInfo.Height
+               ormNode.BestHeight = peerInfo.Height
+       }
+       if err := dbTx.Save(latestLiveness).Error; err != nil {
+               return err
+       }
+
+       // calc LatestDailyUptimeMinutes
+       total := 0 * time.Minute
+       ormNodeLivenesses[0].UpdatedAt = now
+       for _, ormNodeLiveness := range ormNodeLivenesses {
+               if ormNodeLiveness.CreatedAt.Before(yesterday) {
+                       ormNodeLiveness.CreatedAt = yesterday
+               }
+
+               total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
+       }
+       ormNode.LatestDailyUptimeMinutes = uint64(total.Minutes())
+       ormNode.Alias = peerInfo.Moniker
+       ormNode.Xpub = peerInfo.ID
+       return dbTx.Save(ormNode).Error
+}