_testmain.go
cmd/fedd/fedd
+cmd/precognitive/precognitive
cmd/vapord/vapord
cmd/vapord/.vapord
cmd/vaporcli/vaporcli
@echo "Building fedd to cmd/fedd/fedd"
@go build $(BUILD_FLAGS) -o cmd/fedd/fedd cmd/fedd/main.go
+# Build the precognitive monitoring daemon into cmd/precognitive/.
+precognitive:
+	@echo "Building precognitive to cmd/precognitive/precognitive"
+	@go build $(BUILD_FLAGS) -o cmd/precognitive/precognitive cmd/precognitive/main.go
+
vapord:
@echo "Building vapord to cmd/vapord/vapord"
@go build $(BUILD_FLAGS) -o cmd/vapord/vapord cmd/vapord/main.go
--- /dev/null
+package main
+
+import (
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/toolbar/common"
+ "github.com/vapor/toolbar/precognitive/api"
+ "github.com/vapor/toolbar/precognitive/config"
+ "github.com/vapor/toolbar/precognitive/monitor"
+)
+
+// main boots the precognitive service: it loads the JSON config named by
+// the first command-line argument, opens the shared MySQL connection, then
+// starts the p2p monitor and the HTTP API server in their own goroutines.
+func main() {
+	cfg := config.NewConfig()
+	db, err := common.NewMySQLDB(cfg.MySQLConfig)
+	if err != nil {
+		log.WithField("err", err).Panic("initialize mysql db error")
+	}
+
+	go monitor.NewMonitor(cfg, db).Run()
+	go api.NewApiServer(cfg, db).Run()
+
+	// keep the main func running in case of terminating goroutines
+	// (the WaitGroup is never Done()-ed, so Wait blocks forever)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	wg.Wait()
+}
// PrivateKey is the type of Ed25519 private keys. It implements crypto.Signer.
type PrivateKey []byte
+// String returns the hex-encoded form of the public key.
+func (pub PublicKey) String() string {
+	return hex.EncodeToString(pub)
+}
+
// Public returns the PublicKey corresponding to priv.
func (priv PrivateKey) Public() PublicKey {
publicKey := make([]byte, PublicKeySize)
--- /dev/null
+# Precognitive
+
+Keep monitoring (leader & candidate) consensus nodes status in vapor network.
+
+## Init
+
+### Database Schema
+[precognitive_schema.sql](./sql_dump/precognitive_schema.sql)
+
+### Config
+run with [config_example.json](docs/precognitive/config_example.json)
+```
+go run cmd/precognitive/main.go docs/precognitive/config_example.json
+```
+
+## API
+
++ [/list-nodes](#list-nodes)
+
+### /list-nodes
+
+__method:__ POST
+
+```
+curl -X POST 127.0.0.1:3009/api/v1/list-nodes -d '{}'
+```
+
+__example response:__
+```
+{
+ [
+ {
+ "alias": "cobo",
+ "public_key": "b928e46bb01e834fdf167185e31b15de7cc257af8bbdf17f9c7fefd5bb97b306d048b6bc0da2097152c1c2ff38333c756a543adbba7030a447dcc776b8ac64ef",
+ "host": "vapornode.cobo.com",
+ "port": 123,
+ "best_height": 1023,
+        "latency_ms": 300,
+ "active_minutes": 4096,
+ "status": "healthy"
+ },
+ {
+ "alias": "matpool",
+ "public_key": "0f8669abbd3cc0a167156188e428f940088d5b2f36bb3449df71d2bdc5e077814ea3f68628eef279ed435f51ee26cff00f8bd28fabfd500bedb2a9e369f5c825",
+ "host": "vapornode.matpool.io",
+ "port": 321,
+ "best_height": 1024,
+        "latency_ms": 299,
+ "active_minutes": 4097,
+ "status": "healthy"
+ }
+ ]
+}
+```
+
+
+### /get-node-statistics
\ No newline at end of file
--- /dev/null
+{
+ "api" : {
+ "listening_port" : 3009
+ },
+ "network_id": 10817814959495988245,
+ "mysql" : {
+ "connection" : {
+ "host": "127.0.0.1",
+ "port": 3306,
+ "username": "root",
+ "password": "toor",
+ "database": "precognitive"
+ },
+ "log_mode" : true
+ },
+ "check_frequency_minutes" : 30,
+ "policy" : {
+ "required_rtt_ms" : 2000
+ },
+ "seeds" : [
+ {
+ "xpub" : "f2767279cd01ed8793808e0542a18958e1a2f3a6b6fe5328ec79596a022bc6f085951a98a631917563f86bb91db9159dd2969ff9d690fc12b250baff2b6f6a1d",
+ "ip": "47.103.79.68",
+ "port": 56656
+ },
+ {
+ "xpub" : "c785deb76af14e918ea05eeab863288c48b16d1816621adf74ada868e8e3246780bf5206dc5a0e3efef0c119978e11ed75eca9d3b50846e37c55cf8d56717c4f",
+ "ip": "47.103.13.86",
+ "port": 56656
+ },
+ {
+ "xpub" : "22bc19ec65d4ee524c5130575ddff041e712dbb415740eae314fd3359aa3978319384cd3ded8c4125ca2774716d7285268ebf1d85091eef8e7ad03077857e7ab",
+ "ip": "47.102.193.119",
+ "port": 56656
+ },
+ {
+ "xpub" : "214c0e6827346e9fee1056c4c8b96cefd67b75ed1dead59e4e4e3eee8c1fe095dbe7a7fb61bb23b4ab66cde2a1c04466b8d3e8efa21cf7eee064c70fb1525b14",
+ "ip": "47.103.17.22",
+ "port": 56656
+ }
+ ]
+}
\ No newline at end of file
--- /dev/null
+/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
+/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
+/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
+/*!40101 SET NAMES utf8 */;
+/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
+/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
+/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
+
+# Reset the schema from scratch: the IF NOT EXISTS create guarantees the
+# DROP below cannot fail on a brand-new server; the final CREATE leaves an
+# empty schema to load into.
+CREATE SCHEMA IF NOT EXISTS `precognitive`;
+DROP DATABASE `precognitive`;
+CREATE SCHEMA `precognitive`;
+
+USE `precognitive`;
+
+# Dump of table nodes
+# ------------------------------------------------------------
+
+# One row per monitored consensus node; a node is identified by its
+# (ip, port) pair (unique key `address`).
+CREATE TABLE `nodes` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `alias` varchar(128) NOT NULL DEFAULT '',
+  `xpub` char(128) NOT NULL DEFAULT '',
+  `public_key` char(64) NOT NULL DEFAULT '',
+  `ip` varchar(128) NOT NULL DEFAULT '',
+  `port` smallint unsigned NOT NULL DEFAULT '0',
+  `best_height` int(11) DEFAULT '0',
+  `avg_rtt_ms` int(11) DEFAULT NULL,
+  `latest_daily_uptime_minutes` int(11) DEFAULT '0',
+  `status` tinyint(1) NOT NULL DEFAULT '0',
+  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `address` (`ip`,`port`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+LOCK TABLES `nodes` WRITE;
+UNLOCK TABLES;
+
+
+# Dump of table node_livenesses
+# ------------------------------------------------------------
+
+# Ping/pong bookkeeping rows per node; node_id references nodes.id.
+CREATE TABLE `node_livenesses` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `node_id` int(11) NOT NULL,
+  `ping_times` int(11) DEFAULT '0',
+  `pong_times` int(11) DEFAULT '0',
+  `best_height` int(11) DEFAULT '0',
+  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  CONSTRAINT `node_livenesses_ibfk_1` FOREIGN KEY (`node_id`) REFERENCES `nodes` (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+LOCK TABLES `node_livenesses` WRITE;
+UNLOCK TABLES;
+
return false
}
+// Moniker satisfies the BasePeer interface; this stub has no moniker.
+func (p *P2PPeer) Moniker() string {
+	return ""
+}
+
func (p *P2PPeer) RemoteAddrHost() string {
return ""
}
return ""
}
+// Moniker satisfies the BasePeer interface; this stub has no moniker.
+func (p *p2peer) Moniker() string {
+	return ""
+}
+
func (p *p2peer) RemoteAddrHost() string {
return ""
}
//BasePeer is the interface for connection level peer
type BasePeer interface {
+ Moniker() string
Addr() net.Addr
ID() string
RemoteAddrHost() string
// PeerInfo indicate peer status snap
type PeerInfo struct {
ID string `json:"peer_id"`
+ Moniker string `json:"moniker"`
RemoteAddr string `json:"remote_addr"`
Height uint64 `json:"height"`
Ping string `json:"ping"`
return &PeerInfo{
ID: p.ID(),
+ Moniker: p.BasePeer.Moniker(),
RemoteAddr: p.Addr().String(),
Height: p.bestHeight,
Ping: ping.String(),
return bp.id
}
+// Moniker satisfies the BasePeer interface; this stub has no moniker.
+func (bp *basePeer) Moniker() string {
+	return ""
+}
+
func (bp *basePeer) RemoteAddrHost() string {
switch bp.ID() {
case peer1ID:
// NewSyncManager create sync manager and set switch.
func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protocol.TxPool, dispatcher *event.Dispatcher, fastSyncDB dbm.DB) (*SyncManager, error) {
- sw, err := p2p.NewSwitch(config)
+ sw, err := p2p.NewSwitchMaybeDiscover(config)
if err != nil {
return nil, err
}
return p.isLAN
}
+// Moniker returns the peer's self-reported human-readable name, taken from
+// the NodeInfo exchanged during the handshake.
+func (p *Peer) Moniker() string {
+	return p.NodeInfo.Moniker
+}
+
// PubKey returns peer's public key.
func (p *Peer) PubKey() string {
return p.conn.(*connection.SecretConnection).RemotePubKey().String()
security Security
}
-// NewSwitch create a new Switch and set discover.
-func NewSwitch(config *cfg.Config) (*Switch, error) {
+// NewSwitchMaybeDiscover create a new Switch and set discover.
+func NewSwitchMaybeDiscover(config *cfg.Config) (*Switch, error) {
var err error
var l Listener
var listenAddr string
}
}
- return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
+ return NewSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
}
// newSwitch creates a new Switch with the given config.
-func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
+func NewSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
sw := &Switch{
Config: config,
peerConfig: DefaultPeerConfig(config.P2P),
return sw, nil
}
+// GetDiscv returns the switch's DHT discover instance.
+func (sw *Switch) GetDiscv() discv {
+	return sw.discv
+}
+
+// GetNodeInfo returns the local node's handshake info.
+func (sw *Switch) GetNodeInfo() *NodeInfo {
+	return sw.nodeInfo
+}
+
+// GetPeers returns the switch's current peer set.
+func (sw *Switch) GetPeers() *PeerSet {
+	return sw.peers
+}
+
+// GetReactors returns the registered reactors keyed by label.
+func (sw *Switch) GetReactors() map[string]Reactor {
+	return sw.reactors
+}
+
+// GetSecurity returns the switch's security module.
+func (sw *Switch) GetSecurity() Security {
+	return sw.security
+}
+
+
// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
for _, reactor := range sw.reactors {
for i := 0; i < len(lanPeer.IP); i++ {
addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
}
- sw.dialPeers(addresses)
+ sw.DialPeers(addresses)
}
func (sw *Switch) connectLANPeersRoutine() {
break
}
- // disconnect if we alrady have MaxNumPeers
+ // disconnect if we already have MaxNumPeers
if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
if err := inConn.Close(); err != nil {
log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
wg.Done()
}
-func (sw *Switch) dialPeers(addresses []*NetAddress) {
+func (sw *Switch) DialPeers(addresses []*NetAddress) {
connectedPeers := make(map[string]struct{})
for _, peer := range sw.Peers().List() {
connectedPeers[peer.RemoteAddrHost()] = struct{}{}
addresses = append(addresses, address)
}
- sw.dialPeers(addresses)
+ sw.DialPeers(addresses)
}
func (sw *Switch) ensureOutboundPeers() {
address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
addresses = append(addresses, address)
}
- sw.dialPeers(addresses)
+ sw.DialPeers(addresses)
}
func (sw *Switch) ensureOutboundPeersRoutine() {
// new switch, add reactors
l, listenAddr := GetListener(cfg.P2P)
cfg.P2P.LANDiscover = false
- sw, err := newSwitch(cfg, new(mockDiscv), nil, l, privKey, listenAddr, 0)
+ sw, err := NewSwitch(cfg, new(mockDiscv), nil, l, privKey, listenAddr, 0)
if err != nil {
log.Errorf("create switch error: %s", err)
return nil
func NewChain(mempool *Mempool) *Chain {
return &Chain{
- heightMap: map[uint64]*types.Block{},
- blockMap: map[bc.Hash]*types.Block{},
- prevOrphans: make(map[bc.Hash]*types.Block),
- mempool: mempool,
+ bestBlockHeader: &types.BlockHeader{},
+ heightMap: map[uint64]*types.Block{},
+ blockMap: map[bc.Hash]*types.Block{},
+ prevOrphans: make(map[bc.Hash]*types.Block),
+ mempool: mempool,
}
}
return false, nil
}
+// ProcessBlockSignature is a no-op placeholder on the mock chain: it
+// accepts any signature without validating or recording it.
+// TODO: implement signature handling if tests come to depend on it.
+func (c *Chain) ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error {
+	return nil
+}
+
func (c *Chain) SetBestBlockHeader(header *types.BlockHeader) {
c.bestBlockHeader = header
}
-package types
+package common
import (
"fmt"
package orm
import (
- "github.com/vapor/toolbar/federation/types"
+ "github.com/vapor/toolbar/common"
)
type Asset struct {
- ID uint64 `gorm:"primary_key;foreignkey:ID" json:"-"`
- AssetID string `json:"asset_id"`
- IssuanceProgram string `json:"-"`
- VMVersion uint64 `json:"-"`
- Definition string `json:"-"`
- CreatedAt types.Timestamp `json:"-"`
- UpdatedAt types.Timestamp `json:"-"`
+ ID uint64 `gorm:"primary_key;foreignkey:ID" json:"-"`
+ AssetID string `json:"asset_id"`
+ IssuanceProgram string `json:"-"`
+ VMVersion uint64 `json:"-"`
+ Definition string `json:"-"`
+ CreatedAt common.Timestamp `json:"-"`
+ UpdatedAt common.Timestamp `json:"-"`
}
package orm
import (
- "github.com/vapor/toolbar/federation/types"
+ "github.com/vapor/toolbar/common"
)
type Chain struct {
- ID uint64 `gorm:"primary_key" json:"-"`
- Name string `json:"name"`
- BlockHeight uint64 `json:"block_height"`
- BlockHash string `json:"block_hash"`
- CreatedAt types.Timestamp `json:"-"`
- UpdatedAt types.Timestamp `json:"-"`
+ ID uint64 `gorm:"primary_key" json:"-"`
+ Name string `json:"name"`
+ BlockHeight uint64 `json:"block_height"`
+ BlockHash string `json:"block_hash"`
+ CreatedAt common.Timestamp `json:"-"`
+ UpdatedAt common.Timestamp `json:"-"`
}
"encoding/json"
"github.com/vapor/errors"
- "github.com/vapor/toolbar/federation/common"
- "github.com/vapor/toolbar/federation/types"
+ "github.com/vapor/toolbar/common"
+ fedCommon "github.com/vapor/toolbar/federation/common"
)
type CrossTransaction struct {
DestTxIndex sql.NullInt64 `sql:"default:null"`
DestTxHash sql.NullString `sql:"default:null"`
Status uint8
- CreatedAt types.Timestamp
- UpdatedAt types.Timestamp
+ CreatedAt common.Timestamp
+ UpdatedAt common.Timestamp
Chain *Chain `gorm:"foreignkey:ChainID"`
Reqs []*CrossTransactionReq
func (c *CrossTransaction) MarshalJSON() ([]byte, error) {
var status string
switch c.Status {
- case common.CrossTxPendingStatus:
- status = common.CrossTxPendingStatusLabel
- case common.CrossTxCompletedStatus:
- status = common.CrossTxCompletedStatusLabel
+ case fedCommon.CrossTxPendingStatus:
+ status = fedCommon.CrossTxPendingStatusLabel
+ case fedCommon.CrossTxCompletedStatus:
+ status = fedCommon.CrossTxCompletedStatusLabel
default:
return nil, errors.New("unknown cross-chain tx status")
}
package orm
import (
- "github.com/vapor/toolbar/federation/types"
+ "github.com/vapor/toolbar/common"
)
type CrossTransactionReq struct {
- ID uint64 `gorm:"primary_key" json:"-"`
- CrossTransactionID uint64 `json:"-"`
- SourcePos uint64 `json:"-"`
- AssetID uint64 `json:"-"`
- AssetAmount uint64 `json:"amount"`
- Script string `json:"-"`
- FromAddress string `json:"from_address"`
- ToAddress string `json:"to_address"`
- CreatedAt types.Timestamp `json:"-"`
- UpdatedAt types.Timestamp `json:"-"`
+ ID uint64 `gorm:"primary_key" json:"-"`
+ CrossTransactionID uint64 `json:"-"`
+ SourcePos uint64 `json:"-"`
+ AssetID uint64 `json:"-"`
+ AssetAmount uint64 `json:"amount"`
+ Script string `json:"-"`
+ FromAddress string `json:"from_address"`
+ ToAddress string `json:"to_address"`
+ CreatedAt common.Timestamp `json:"-"`
+ UpdatedAt common.Timestamp `json:"-"`
CrossTransaction *CrossTransaction `gorm:"foreignkey:CrossTransactionID" json:"-"`
Asset *Asset `json:"asset"`
--- /dev/null
+package api
+
+import (
+ "github.com/gin-gonic/gin"
+
+ "github.com/vapor/toolbar/precognitive/database/orm"
+ serverCommon "github.com/vapor/toolbar/server"
+)
+
+// listNodesReq is the (currently empty) request body for /list-nodes.
+type listNodesReq struct{ serverCommon.Display }
+
+// ListNodes returns one page of monitored nodes from MySQL, honoring the
+// caller-supplied pagination offset and limit.
+func (s *Server) ListNodes(c *gin.Context, listNodesReq *listNodesReq, query *serverCommon.PaginationQuery) ([]*orm.Node, error) {
+	var ormNodes []*orm.Node
+	if err := s.db.Offset(query.Start).Limit(query.Limit).Find(&ormNodes).Error; err != nil {
+		return nil, err
+	}
+
+	return ormNodes, nil
+}
--- /dev/null
+package api
+
+import (
+ "fmt"
+
+ "github.com/gin-gonic/gin"
+ "github.com/jinzhu/gorm"
+
+ "github.com/vapor/toolbar/precognitive/config"
+ serverCommon "github.com/vapor/toolbar/server"
+)
+
+// Server bundles the HTTP API's dependencies: service config, the shared
+// gorm DB handle, and the gin engine with routes installed.
+type Server struct {
+	cfg    *config.Config
+	db     *gorm.DB
+	engine *gin.Engine
+}
+
+// NewApiServer wires up a Server: it stores the config/DB handles, switches
+// gin to release mode when configured, and registers all routes.
+func NewApiServer(cfg *config.Config, db *gorm.DB) *Server {
+	server := &Server{
+		cfg: cfg,
+		db:  db,
+	}
+	if cfg.API.IsReleaseMode {
+		gin.SetMode(gin.ReleaseMode)
+	}
+	server.setupRouter()
+	return server
+}
+
+// setupRouter installs the shared middleware and the /api/v1 endpoints.
+func (s *Server) setupRouter() {
+	r := gin.Default()
+	r.Use(serverCommon.Middleware(s))
+
+	v1 := r.Group("/api/v1")
+	v1.POST("/list-nodes", serverCommon.HandlerMiddleware(s.ListNodes))
+
+	s.engine = r
+}
+
+// Run blocks, serving HTTP on the configured listening port.
+func (s *Server) Run() {
+	s.engine.Run(fmt.Sprintf(":%d", s.cfg.API.ListeningPort))
+}
--- /dev/null
+package common
+
+// Node status codes persisted in the nodes.status column.
+const (
+	NodeUnknownStatus uint8 = iota
+	NodeHealthyStatus
+	NodeCongestedStatus
+	NodeOrphanStatus
+	NodeOfflineStatus
+)
+
+// StatusLookupTable maps a status code to the label exposed by the API.
+var StatusLookupTable = map[uint8]string{
+	NodeUnknownStatus:   "unknown",
+	NodeHealthyStatus:   "healthy",
+	NodeCongestedStatus: "congested",
+	NodeOrphanStatus:    "orphan",
+	NodeOfflineStatus:   "offline",
+}
--- /dev/null
+package config
+
+import (
+ "encoding/json"
+ "os"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/vapor/crypto/ed25519/chainkd"
+
+ "github.com/vapor/toolbar/common"
+)
+
+// NewConfig loads the service config from the file path given as the first
+// command-line argument; it exits fatally when no path is supplied.
+func NewConfig() *Config {
+	if len(os.Args) <= 1 {
+		log.Fatal("Please setup the config file path")
+	}
+
+	return NewConfigWithPath(os.Args[1])
+}
+
+// NewConfigWithPath decodes the JSON config file at path. Any open or
+// decode failure is fatal, so a non-nil *Config is always returned.
+func NewConfigWithPath(path string) *Config {
+	configFile, err := os.Open(path)
+	if err != nil {
+		// log the path actually opened, not os.Args[1]: this function may be
+		// called with an arbitrary path, and indexing os.Args here could even
+		// panic when the process was started without arguments
+		log.WithFields(log.Fields{"err": err, "file_path": path}).Fatal("fail to open config file")
+	}
+	defer configFile.Close()
+
+	cfg := &Config{}
+	if err := json.NewDecoder(configFile).Decode(cfg); err != nil {
+		log.WithField("err", err).Fatal("fail to decode config file")
+	}
+
+	return cfg
+}
+
+// Config is the top-level JSON configuration for the precognitive service.
+type Config struct {
+	NetworkID        uint64             `json:"network_id"`
+	MySQLConfig      common.MySQLConfig `json:"mysql"`
+	CheckFreqMinutes uint64             `json:"check_frequency_minutes"`
+	Policy           Policy             `json:"policy"`
+	Nodes            []Node             `json:"seeds"`
+	API              API                `json:"api"`
+}
+
+// Policy holds the node-health evaluation thresholds.
+type Policy struct {
+	Confirmations uint64 `json:"confirmations"`
+	RequiredRttMS uint64 `json:"required_rtt_ms"`
+}
+
+// Node describes a node to monitor. XPub is optional: discovery-found
+// nodes carry only a raw PublicKey.
+type Node struct {
+	XPub      *chainkd.XPub `json:"xpub"`
+	PublicKey string        `json:"public_key"`
+	IP        string        `json:"ip"`
+	Port      uint16        `json:"port"`
+}
+
+// API configures the HTTP server.
+type API struct {
+	ListeningPort uint64 `json:"listening_port"`
+	AccessToken   string `json:"access_token"`
+	IsReleaseMode bool   `json:"is_release_mode"`
+}
--- /dev/null
+package orm
+
+import (
+ "database/sql"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/vapor/toolbar/common"
+ precogCommon "github.com/vapor/toolbar/precognitive/common"
+)
+
+// Node is the persisted record of a monitored consensus node (table
+// `nodes`), uniquely identified by its (IP, Port) pair.
+type Node struct {
+	ID                       uint16 `gorm:"primary_key"`
+	Alias                    string
+	Xpub                     string
+	PublicKey                string
+	IP                       string
+	Port                     uint16
+	BestHeight               uint64
+	AvgRttMS                 sql.NullInt64 // null until a round-trip is measured
+	LatestDailyUptimeMinutes uint64
+	Status                   uint8 // see precognitive/common status codes
+	CreatedAt                time.Time
+	UpdatedAt                time.Time
+}
+
+// MarshalJSON renders a node for the API: the status code becomes its
+// label, the nullable average RTT collapses to 0 when unset, and IP/Port
+// are joined into a single "host:port" address string.
+func (n *Node) MarshalJSON() ([]byte, error) {
+	status, ok := precogCommon.StatusLookupTable[n.Status]
+	if !ok {
+		return nil, errors.New("fail to look up status")
+	}
+
+	avgRttMS := uint64(0)
+	if n.AvgRttMS.Valid {
+		avgRttMS = uint64(n.AvgRttMS.Int64)
+	}
+
+	return json.Marshal(&struct {
+		Alias                    string           `json:"alias"`
+		PublicKey                string           `json:"public_key"`
+		Address                  string           `json:"address"`
+		BestHeight               uint64           `json:"best_height"`
+		AvgRttMS                 uint64           `json:"avg_rtt_ms"`
+		LatestDailyUptimeMinutes uint64           `json:"latest_daily_uptime_minutes"`
+		Status                   string           `json:"status"`
+		UpdatedAt                common.Timestamp `json:"updated_at"`
+	}{
+		Alias:                    n.Alias,
+		PublicKey:                n.PublicKey,
+		Address:                  fmt.Sprintf("%s:%d", n.IP, n.Port),
+		BestHeight:               n.BestHeight,
+		AvgRttMS:                 avgRttMS,
+		LatestDailyUptimeMinutes: n.LatestDailyUptimeMinutes,
+		Status:                   status,
+		UpdatedAt:                common.Timestamp(n.UpdatedAt),
+	})
+}
--- /dev/null
+package orm
+
+import (
+ "time"
+)
+
+// NodeLiveness accumulates ping/pong counts and the last reported best
+// height for one node (table `node_livenesses`).
+type NodeLiveness struct {
+	ID         uint64 `gorm:"primary_key"`
+	NodeID     uint16
+	PingTimes  uint64
+	PongTimes  uint64
+	BestHeight uint64
+	CreatedAt  time.Time
+	UpdatedAt  time.Time
+
+	Node *Node `gorm:"foreignkey:NodeID"`
+}
--- /dev/null
+package monitor
+
+import (
+ "net"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/p2p"
+ "github.com/vapor/toolbar/precognitive/database/orm"
+)
+
+// connectionRoutine re-dials all known nodes on a fixed schedule. The
+// `for ; true; <-ticker.C` form makes the first pass run immediately,
+// then once per CheckFreqMinutes.
+func (m *monitor) connectionRoutine() {
+	ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqMinutes) * time.Minute)
+	for ; true; <-ticker.C {
+		if err := m.dialNodes(); err != nil {
+			log.WithFields(log.Fields{"err": err}).Error("dialNodes")
+		}
+	}
+}
+
+// dialNodes re-dials every node stored in MySQL, then records the dial
+// results and checks the status of the resulting peer set. Peers that are
+// already connected are skipped inside switch.DialPeers.
+func (m *monitor) dialNodes() error {
+	log.Info("Start to reconnect to nodes...")
+	var nodes []*orm.Node
+	if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+		return err
+	}
+
+	addresses := make([]*p2p.NetAddress, 0)
+	for i := 0; i < len(nodes); i++ {
+		address := p2p.NewNetAddressIPPort(net.ParseIP(nodes[i].IP), nodes[i].Port)
+		addresses = append(addresses, address)
+	}
+
+	// connected peers will be skipped in switch.DialPeers()
+	m.sw.DialPeers(addresses)
+	log.Info("DialPeers done.")
+	peerList := m.sw.GetPeers().List()
+	// the error returned here was silently discarded before; surface it
+	if err := m.processDialResults(peerList); err != nil {
+		log.WithFields(log.Fields{"err": err}).Error("processDialResults")
+	}
+	m.checkStatus(peerList)
+	return nil
+}
+
+// checkStatus starts the freshly-dialed peers, records the highest best
+// height seen across them, persists their peer info snapshots, then drops
+// them from the peer set so the next round starts from a clean slate.
+func (m *monitor) checkStatus(peerList []*p2p.Peer) {
+	for _, peer := range peerList {
+		peer.Start()
+		m.peers.AddPeer(peer)
+	}
+	log.WithFields(log.Fields{"num": len(m.sw.GetPeers().List()), "peers": m.sw.GetPeers().List()}).Info("connected peers")
+
+	for _, peerInfo := range m.peers.GetPeerInfos() {
+		if peerInfo.Height > m.bestHeightSeen {
+			m.bestHeightSeen = peerInfo.Height
+		}
+	}
+	log.WithFields(log.Fields{"bestHeight": m.bestHeightSeen}).Info("peersInfo")
+	m.processPeerInfos(m.peers.GetPeerInfos())
+
+	for _, peer := range peerList {
+		p := m.peers.GetPeer(peer.ID())
+		if p == nil {
+			continue
+		}
+
+		m.peers.RemovePeer(p.ID())
+	}
+	// fixed log message typo ("Disonnect" -> "Disconnect")
+	log.Info("Disconnect all peers.")
+}
--- /dev/null
+package monitor
+
+import (
+ "fmt"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/p2p/discover/dht"
+ "github.com/vapor/toolbar/precognitive/config"
+)
+
+// Discovery tuning: sample up to nodesToDiscv random DHT nodes every
+// discvFreqSec seconds.
+var (
+	nodesToDiscv = 150
+	discvFreqSec = 60
+)
+
+// discoveryRoutine periodically reads random nodes from the switch's DHT
+// table and upserts any address not already recorded with the same node
+// identity; discvMap caches the last node stored per "ip:port" so
+// unchanged entries are skipped.
+func (m *monitor) discoveryRoutine() {
+	discvMap := make(map[string]*dht.Node)
+	ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
+	for range ticker.C {
+		nodes := make([]*dht.Node, nodesToDiscv)
+		num := m.sw.GetDiscv().ReadRandomNodes(nodes)
+		for _, node := range nodes[:num] {
+			address := fmt.Sprintf("%s:%d", node.IP.String(), node.TCP)
+			if n, ok := discvMap[address]; ok && n.String() == node.String() {
+				continue
+			}
+
+			log.WithFields(log.Fields{"new node": node}).Info("discover")
+
+			if err := m.upsertNode(&config.Node{
+				PublicKey: node.ID.String(),
+				IP:        node.IP.String(),
+				Port:      node.TCP,
+			}); err != nil {
+				log.WithFields(log.Fields{"node": node, "err": err}).Error("upsertNode")
+			} else {
+				// only cache on success so failed upserts are retried
+				discvMap[address] = node
+			}
+		}
+	}
+}
--- /dev/null
+package monitor
+
+import (
+ "github.com/vapor/protocol/bc/types"
+ "github.com/vapor/test/mock"
+)
+
+// mockChainAndPool builds the mock chain/mempool pair used by the monitor,
+// seeded with the hard-coded genesis block as both the stored block at its
+// height and the best block header.
+func mockChainAndPool() (*mock.Chain, *mock.Mempool, error) {
+	txPool := &mock.Mempool{}
+	mockChain := mock.NewChain(txPool)
+	genesisBlock, err := getGenesisBlock()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	mockChain.SetBlockByHeight(genesisBlock.BlockHeader.Height, genesisBlock)
+	mockChain.SetBestBlockHeader(&genesisBlock.BlockHeader)
+	return mockChain, txPool, nil
+}
+
+// getGenesisBlock decodes the hard-coded genesis block from its hex dump.
+func getGenesisBlock() (*types.Block, error) {
+	genesisBlock := &types.Block{}
+	if err := genesisBlock.UnmarshalText([]byte("030100000000000000000000000000000000000000000000000000000000000000000082bfe3f4bf2d4052415e796436f587fac94677b20f027e910b70e2c220c411c0e87c37e0e1cc2ec9c377e5192668bc0a367e4a4764f11e7c725ecced1d7b6a492974fab1b6d5bc01000107010001012402220020f86826d640810eb08a2bfb706e0092273e05e9a7d3d71f9d53f4f6cc2e3d6c6a0001013b0039ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff00011600148c9d063ff74ee6d9ffa88d83aeb038068366c4c400")); err != nil {
+		return nil, err
+	}
+
+	return genesisBlock, nil
+}
--- /dev/null
+package monitor
+
+import (
+ "fmt"
+ "os"
+ "os/user"
+ "path"
+ "strings"
+
+ "github.com/jinzhu/gorm"
+ log "github.com/sirupsen/logrus"
+
+ vaporCfg "github.com/vapor/config"
+ "github.com/vapor/crypto/ed25519/chainkd"
+ dbm "github.com/vapor/database/leveldb"
+ "github.com/vapor/event"
+ "github.com/vapor/netsync/chainmgr"
+ "github.com/vapor/netsync/consensusmgr"
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
+ "github.com/vapor/p2p/discover/dht"
+ "github.com/vapor/p2p/discover/mdns"
+ "github.com/vapor/p2p/signlib"
+ "github.com/vapor/test/mock"
+ "github.com/vapor/toolbar/precognitive/config"
+)
+
+// monitor drives the p2p side of precognitive: it runs a stripped-down
+// vapor node (mock chain + real switch) to dial, discover and measure
+// consensus nodes, persisting results through db. sw and peers are
+// populated later, in makeSwitch.
+type monitor struct {
+	cfg            *config.Config
+	db             *gorm.DB
+	nodeCfg        *vaporCfg.Config
+	sw             *p2p.Switch
+	privKey        chainkd.XPrv
+	chain          *mock.Chain
+	txPool         *mock.Mempool
+	bestHeightSeen uint64
+	peers          *peers.PeerSet
+}
+
+// NewMonitor assembles the p2p monitor: an on-disk data directory, a
+// default mainnet node config, a fresh per-run private key, and the mock
+// chain/txpool that satisfy the netsync managers. The switch itself is
+// created later, when Run calls makeSwitch. Any setup failure is fatal.
+func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
+	dbPath, err := makePath()
+	if err != nil {
+		log.WithFields(log.Fields{"err": err}).Fatal("makePath")
+	}
+
+	nodeCfg := &vaporCfg.Config{
+		BaseConfig: vaporCfg.DefaultBaseConfig(),
+		P2P:        vaporCfg.DefaultP2PConfig(),
+		Federation: vaporCfg.DefaultFederationConfig(),
+	}
+	nodeCfg.DBPath = dbPath
+	nodeCfg.ChainID = "mainnet"
+	privKey, err := signlib.NewPrivKey()
+	if err != nil {
+		log.WithFields(log.Fields{"err": err}).Fatal("NewPrivKey")
+	}
+
+	chain, txPool, err := mockChainAndPool()
+	if err != nil {
+		log.WithFields(log.Fields{"err": err}).Fatal("mockChainAndPool")
+	}
+
+	return &monitor{
+		cfg:            cfg,
+		db:             db,
+		nodeCfg:        nodeCfg,
+		privKey:        privKey.(chainkd.XPrv),
+		chain:          chain,
+		txPool:         txPool,
+		bestHeightSeen: uint64(0),
+	}
+}
+
+// makePath ensures ~/.vapor_precog exists and returns it; it is used as
+// the monitor node's database path.
+func makePath() (string, error) {
+	usr, err := user.Current()
+	if err != nil {
+		return "", err
+	}
+
+	dataPath := path.Join(usr.HomeDir, "/.vapor_precog")
+	if err := os.MkdirAll(dataPath, os.ModePerm); err != nil {
+		return "", err
+	}
+
+	return dataPath, nil
+}
+
+// Run builds the p2p switch, then starts the discovery and reconnection
+// loops; it returns immediately, leaving both loops running in goroutines.
+func (m *monitor) Run() {
+	if err := m.makeSwitch(); err != nil {
+		log.WithFields(log.Fields{"err": err}).Fatal("makeSwitch")
+	}
+
+	go m.discoveryRoutine()
+	go m.connectionRoutine()
+}
+
+// makeSwitch builds the p2p switch from the configured seed nodes, a DHT
+// discover on the configured network ID, and a LAN discover (a real
+// instance is required; see note below), then creates the peer set and
+// registers the reactors.
+func (m *monitor) makeSwitch() error {
+	var seeds []string
+	for _, node := range m.cfg.Nodes {
+		seeds = append(seeds, fmt.Sprintf("%s:%d", node.IP, node.Port))
+	}
+	m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
+
+	l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
+	discv, err := dht.NewDiscover(m.nodeCfg, m.privKey, l.ExternalAddress().Port, m.cfg.NetworkID)
+	if err != nil {
+		return err
+	}
+
+	// no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
+	lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
+	m.sw, err = p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
+	if err != nil {
+		return err
+	}
+
+	m.peers = peers.NewPeerSet(m.sw)
+	return m.prepareReactors()
+}
+
+// prepareReactors registers the consensus and chain managers (each adds
+// its reactor to the switch as a side effect of construction), starts
+// every registered reactor, then installs and starts the switch's
+// security filters.
+func (m *monitor) prepareReactors() error {
+	dispatcher := event.NewDispatcher()
+	// add ConsensusReactor for consensusChannel
+	_ = consensusmgr.NewManager(m.sw, m.chain, m.peers, dispatcher)
+	fastSyncDB := dbm.NewDB("fastsync", m.nodeCfg.DBBackend, m.nodeCfg.DBDir())
+	// add ProtocolReactor to handle msgs
+	if _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, m.peers, fastSyncDB); err != nil {
+		return err
+	}
+
+	for label, reactor := range m.sw.GetReactors() {
+		log.WithFields(log.Fields{"label": label, "reactor": reactor}).Debug("start reactor")
+		if _, err := reactor.Start(); err != nil {
+			return err
+		}
+	}
+
+	m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo())
+	m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers())
+	return m.sw.GetSecurity().Start()
+}
--- /dev/null
+package monitor
+
+import (
+ "database/sql"
+ "fmt"
+ "net"
+ "strconv"
+ "time"
+
+ "github.com/jinzhu/gorm"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
+ "github.com/vapor/toolbar/precognitive/common"
+ "github.com/vapor/toolbar/precognitive/config"
+ "github.com/vapor/toolbar/precognitive/database/orm"
+)
+
+// upsertNode writes node into MySQL keyed by (ip, port): an existing row
+// is loaded first (preserving its primary key) so the final Save performs
+// an update rather than inserting a duplicate.
+func (m *monitor) upsertNode(node *config.Node) error {
+	ormNode := &orm.Node{
+		IP:   node.IP,
+		Port: node.Port,
+	}
+	if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+		return err
+	}
+
+	ormNode.PublicKey = node.PublicKey
+	// an explicit xpub (seed nodes from config) overrides the raw public key
+	if node.XPub != nil {
+		ormNode.Xpub = node.XPub.String()
+		ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
+	}
+	return m.db.Save(ormNode).Error
+}
+
+// parseRemoteAddr splits a "host:port" string into its host and numeric
+// port parts.
+// NOTE(review): the port is cast to uint16 without a range check — values
+// above 65535 would wrap silently; confirm inputs are real TCP ports.
+func parseRemoteAddr(remoteAddr string) (string, uint16, error) {
+	host, portStr, err := net.SplitHostPort(remoteAddr)
+	if err != nil {
+		return "", 0, err
+	}
+
+	port, err := strconv.Atoi(portStr)
+	if err != nil {
+		return "", 0, err
+	}
+
+	return host, uint16(port), nil
+}
+
+// processDialResults splits the stored nodes into connected vs offline by
+// comparing against the freshly-dialed peer list, updating liveness rows
+// for the former and marking the latter offline.
+func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
+	var ormNodes []*orm.Node
+	if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
+		return err
+	}
+
+	addressMap := make(map[string]*orm.Node, len(ormNodes))
+	for _, ormNode := range ormNodes {
+		addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
+	}
+
+	connMap := make(map[string]bool, len(ormNodes))
+	// connected peers
+	for _, peer := range peerList {
+		connMap[peer.RemoteAddr] = true
+		// guard the map lookup: a connected peer with no matching DB row
+		// would otherwise pass nil into processConnectedPeer and panic there
+		ormNode, ok := addressMap[peer.RemoteAddr]
+		if !ok {
+			log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr}).Warn("connected peer not found in database")
+			continue
+		}
+		if err := m.processConnectedPeer(ormNode); err != nil {
+			log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr, "err": err}).Error("processConnectedPeer")
+		}
+	}
+
+	// offline peers
+	for _, ormNode := range ormNodes {
+		if _, ok := connMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)]; ok {
+			continue
+		}
+
+		if err := m.processOfflinePeer(ormNode); err != nil {
+			log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
+		}
+	}
+
+	return nil
+}
+
+// processConnectedPeer bumps the pong counter on the node's latest
+// liveness row (a fresh row is created when none exists) and clears an
+// offline status back to unknown.
+func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
+	ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
+	err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
+	if err != nil && err != gorm.ErrRecordNotFound {
+		return err
+	}
+
+	ormNodeLiveness.PongTimes += 1
+	if ormNode.Status == common.NodeOfflineStatus {
+		ormNode.Status = common.NodeUnknownStatus
+	}
+	// attach the node so Save persists the status change alongside the row
+	ormNodeLiveness.Node = ormNode
+	return m.db.Save(ormNodeLiveness).Error
+}
+
+// processOfflinePeer marks a node that did not answer the dial as offline.
+func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
+	ormNode.Status = common.NodeOfflineStatus
+	return m.db.Save(ormNode).Error
+}
+
+// processPeerInfos persists each peer's measurements in its own DB
+// transaction so one failing peer cannot roll back the others.
+func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
+	for _, peerInfo := range peerInfos {
+		dbTx := m.db.Begin()
+		if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
+			log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
+			dbTx.Rollback()
+		} else {
+			dbTx.Commit()
+		}
+	}
+}
+
+// processPeerInfo updates one node from a live peer snapshot inside dbTx:
+// it classifies the node healthy/congested from the measured ping RTT,
+// folds the RTT into a running average weighted by pong count, bumps
+// pong/best-height on the latest liveness row, and recomputes the node's
+// last-24h uptime minutes from its liveness rows' created/updated spans.
+func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
+	ip, port, err := parseRemoteAddr(peerInfo.RemoteAddr)
+	if err != nil {
+		return err
+	}
+
+	ormNode := &orm.Node{IP: ip, Port: uint16(port)}
+	if err := dbTx.Where(ormNode).First(ormNode).Error; err != nil {
+		return err
+	}
+
+	if ormNode.Status == common.NodeOfflineStatus {
+		return fmt.Errorf("node %s:%d status error", ormNode.IP, ormNode.Port)
+	}
+
+	log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
+	ping, err := time.ParseDuration(peerInfo.Ping)
+	if err != nil {
+		return err
+	}
+
+	now := time.Now()
+	yesterday := now.Add(-24 * time.Hour)
+	var ormNodeLivenesses []*orm.NodeLiveness
+	if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
+		Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
+		Order(fmt.Sprintf("created_at %s", "DESC")).Find(&ormNodeLivenesses).Error; err != nil {
+		return err
+	}
+
+	// guard: indexing [0] below would panic when the 24h window is empty
+	if len(ormNodeLivenesses) == 0 {
+		return fmt.Errorf("no liveness record within 24h for node %s:%d", ormNode.IP, ormNode.Port)
+	}
+
+	// update latest liveness
+	latestLiveness := ormNodeLivenesses[0]
+	rttMS := ping.Nanoseconds() / 1000000
+	if rttMS > 0 && uint64(rttMS) <= m.cfg.Policy.RequiredRttMS {
+		ormNode.Status = common.NodeHealthyStatus
+	} else if uint64(rttMS) > m.cfg.Policy.RequiredRttMS {
+		ormNode.Status = common.NodeCongestedStatus
+	}
+	if rttMS != 0 {
+		// running average weighted by the pongs recorded so far
+		ormNode.AvgRttMS = sql.NullInt64{
+			Int64: (ormNode.AvgRttMS.Int64*int64(latestLiveness.PongTimes) + rttMS) / int64(latestLiveness.PongTimes+1),
+			Valid: true,
+		}
+	}
+	latestLiveness.PongTimes += 1
+	if peerInfo.Height != 0 {
+		latestLiveness.BestHeight = peerInfo.Height
+		ormNode.BestHeight = peerInfo.Height
+	}
+	if err := dbTx.Save(latestLiveness).Error; err != nil {
+		return err
+	}
+
+	// calc LatestDailyUptimeMinutes: sum each row's [created, updated]
+	// span, clamped to the last 24 hours
+	total := 0 * time.Minute
+	ormNodeLivenesses[0].UpdatedAt = now
+	for _, ormNodeLiveness := range ormNodeLivenesses {
+		if ormNodeLiveness.CreatedAt.Before(yesterday) {
+			ormNodeLiveness.CreatedAt = yesterday
+		}
+
+		total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
+	}
+	ormNode.LatestDailyUptimeMinutes = uint64(total.Minutes())
+	ormNode.Alias = peerInfo.Moniker
+	ormNode.Xpub = peerInfo.ID
+	return dbTx.Save(ormNode).Error
+}