8 wire "github.com/tendermint/go-wire"
9 "github.com/tendermint/tmlibs/flowrate"
11 "github.com/vapor/consensus"
12 "github.com/vapor/netsync/peers"
13 "github.com/vapor/protocol/bc"
14 "github.com/vapor/protocol/bc/types"
15 "github.com/vapor/test/mock"
// flag is a consensus.ServiceFlag struct field. The enclosing struct header is
// not visible in this excerpt — presumably P2PPeer, whose constructor takes a
// flag of the same type and whose ServiceFlag() accessor returns it; TODO confirm.
21 flag consensus.ServiceFlag
// NewP2PPeer builds a *P2PPeer from a textual address, an id and a service flag.
// Only part of the struct literal is visible in this excerpt; the handling of
// id and flag is elided.
29 func NewP2PPeer(addr, id string, flag consensus.ServiceFlag) *P2PPeer {
// net.ParseIP returns nil when addr is not a valid textual IP; that nil is
// stored in the IPAddr without any check here.
32 ip: &net.IPAddr{IP: net.ParseIP(addr)},
// Unbuffered channel of raw message bytes; postMan ranges over this channel.
34 msgCh: make(chan []byte),
// Addr returns a net.Addr for this peer. The body is elided in this excerpt —
// presumably it returns the ip field populated by NewP2PPeer; TODO confirm.
39 func (p *P2PPeer) Addr() net.Addr {
// ID returns a string identifier for this peer. The body is elided in this
// excerpt — presumably the id handed to NewP2PPeer; TODO confirm.
43 func (p *P2PPeer) ID() string {
// IsLAN reports a boolean for this peer. The body is elided in this excerpt,
// so the criterion (or constant) it returns is not visible here.
47 func (p *P2PPeer) IsLAN() bool {
// ServiceFlag returns a consensus.ServiceFlag for this peer. The body is elided
// in this excerpt — presumably the flag given to NewP2PPeer; TODO confirm.
51 func (p *P2PPeer) ServiceFlag() consensus.ServiceFlag {
// SetConnection receives another *P2PPeer (srcPeer) and a *Manager (node).
// The body is elided in this excerpt — presumably it stores them as p.srcPeer
// and p.remoteNode, the fields TrySend and postMan read; TODO confirm.
55 func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *Manager) {
// TrafficStatus returns two *flowrate.Status values. The body is elided in this
// excerpt, so which values (possibly nil) are returned is not visible here.
60 func (p *P2PPeer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
// TrySend serializes msg with wire.BinaryBytes and, on the path visible here,
// decodes the bytes again with decodeMessage and passes the result to
// p.remoteNode.processMsg together with p.srcPeer. Several lines are elided in
// this excerpt, so the condition guarding this path, any alternative path
// (presumably via msgCh — TODO confirm) and the returned bool are not visible.
// The parameter b is not used in the visible lines.
64 func (p *P2PPeer) TrySend(b byte, msg interface{}) bool {
65 msgBytes := wire.BinaryBytes(msg)
// NOTE(review): the third result of decodeMessage is discarded — presumably an
// error; confirm that ignoring it is intended for this mock.
69 msgType, msg, _ := decodeMessage(msgBytes)
70 p.remoteNode.processMsg(p.srcPeer, msgType, msg)
// setAsync takes a bool. The body is elided in this excerpt — presumably it
// toggles whether messages are delivered inline or through msgCh/postMan;
// TODO confirm.
75 func (p *P2PPeer) setAsync(b bool) {
// postMan receives raw message bytes from p.msgCh until that channel is closed,
// decodes each one with decodeMessage and passes the result to
// p.remoteNode.processMsg together with p.srcPeer.
79 func (p *P2PPeer) postMan() {
// Ranging over a channel blocks between messages and ends only on close.
80 for msgBytes := range p.msgCh {
// NOTE(review): the third result of decodeMessage is discarded — presumably an
// error; confirm that ignoring it is intended for this mock.
81 msgType, msg, _ := decodeMessage(msgBytes)
82 p.remoteNode.processMsg(p.srcPeer, msgType, msg)
// NewPeerSet returns a *PeerSet. The body is elided in this excerpt; mockSync
// passes the result to peers.NewPeerSet.
88 func NewPeerSet() *PeerSet {
// AddBannedPeer is a no-op stub: it ignores its string argument and always
// returns nil.
92 func (ps *PeerSet) AddBannedPeer(string) error { return nil }
// StopPeerGracefully is a no-op stub: it ignores its string argument and does
// nothing.
93 func (ps *PeerSet) StopPeerGracefully(string) {}
// nodes associates each *Manager with a P2PPeer stored by value, so reading an
// entry yields a copy of the struct. Accessed as nw.nodes on *NetWork; the
// enclosing struct header is not visible in this excerpt.
96 nodes map[*Manager]P2PPeer
// NewNetWork returns a pointer to a NetWork initialized with an empty
// map[*Manager]P2PPeer.
99 func NewNetWork() *NetWork {
100 return &NetWork{map[*Manager]P2PPeer{}}
// Register creates a P2PPeer from addr, id and flag via NewP2PPeer and stores
// it in nw.nodes under node. Any entry already present for node is overwritten.
103 func (nw *NetWork) Register(node *Manager, addr, id string, flag consensus.ServiceFlag) {
104 peer := NewP2PPeer(addr, id, flag)
// The struct is dereferenced, so the map holds a copy rather than the pointer.
105 nw.nodes[node] = *peer
// HandsShake looks up the P2PPeer values registered for nodeA and nodeB,
// cross-connects them through SetConnection and returns pointers to both.
// An error is returned when a node has no entry in nw.nodes; the guarding
// conditions are elided in this excerpt (presumably `if !ok`), as are the lines
// between the SetConnection calls and the final return.
108 func (nw *NetWork) HandsShake(nodeA, nodeB *Manager) (*P2PPeer, *P2PPeer, error) {
// The map stores P2PPeer by value, so B2A is a local copy, not the map entry.
109 B2A, ok := nw.nodes[nodeA]
111 return nil, nil, errors.New("can't find nodeA's p2p peer on network")
// Likewise A2B is a local copy of nodeB's entry.
113 A2B, ok := nw.nodes[nodeB]
115 return nil, nil, errors.New("can't find nodeB's p2p peer on network")
// Each copy receives the other copy's address plus the node it was looked up under.
118 A2B.SetConnection(&B2A, nodeB)
119 B2A.SetConnection(&A2B, nodeA)
// NOTE(review): the returned pointers address the local copies; nw.nodes is not
// written back in the visible lines — confirm this is intended.
126 return &B2A, &A2B, nil
// mockBlocks builds a slice of test blocks chained by PreviousBlockHash.
// With a nil startBlock a new block carrying a random Version is created and
// appended as the first element; otherwise indexBlock is set to startBlock
// (startBlock itself is not appended in the visible lines). Blocks are then
// appended while indexBlock.Height < height, each with Height one above
// indexBlock and a random Version. The else header, the step that advances
// indexBlock and the return statement are elided in this excerpt.
129 func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
130 blocks := []*types.Block{}
131 indexBlock := &types.Block{}
132 if startBlock == nil {
// Only Version is set, so Height keeps its zero value.
133 indexBlock = &types.Block{BlockHeader: types.BlockHeader{Version: uint64(rand.Uint32())}}
134 blocks = append(blocks, indexBlock)
136 indexBlock = startBlock
// NOTE(review): this loop terminates only if indexBlock is advanced in the
// elided lines — presumably `indexBlock = block`; TODO confirm.
139 for indexBlock.Height < height {
140 block := &types.Block{
141 BlockHeader: types.BlockHeader{
142 Height: indexBlock.Height + 1,
143 PreviousBlockHash: indexBlock.Hash(),
144 Version: uint64(rand.Uint32()),
147 blocks = append(blocks, block)
// mockSync builds a *Manager backed by a mock chain created from mempool.
// The header of the last element of blocks is set as the chain's best block
// header, and every block is registered by its height. Only two fields of the
// returned Manager (blockKeeper, txSyncCh) are visible in this excerpt.
153 func mockSync(blocks []*types.Block, mempool *mock.Mempool) *Manager {
154 chain := mock.NewChain(mempool)
// The local variable peers shadows the imported peers package from here on.
155 peers := peers.NewPeerSet(NewPeerSet())
// Indexing len(blocks)-1 panics when blocks is empty; callers must pass at
// least one block.
156 chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
157 for _, block := range blocks {
158 chain.SetBlockByHeight(block.Height, block)
163 blockKeeper: newBlockKeeper(chain, peers),
// Unbuffered channel of *txSyncMsg.
166 txSyncCh: make(chan *txSyncMsg),
// mockTxs creates txCount test transactions and returns them both as
// []*types.Tx and as the embedded tx.Tx values in []*bc.Tx, in the same order.
// The declarations of txs/bcTxs, part of the TxData literal and the return
// statement are elided in this excerpt.
170 func mockTxs(txCount int) ([]*types.Tx, []*bc.Tx) {
// One control program from mockControlProgram(60) is shared by every
// transaction's input and output.
173 trueProg := mockControlProgram(60)
174 assetID := bc.AssetID{V0: 9999}
// NOTE(review): a negative txCount wraps to a huge value in the uint64
// conversion, so the loop would effectively never end — callers must pass >= 0.
175 for i := uint64(0); i < uint64(txCount); i++ {
176 tx := types.NewTx(types.TxData{
// The hash argument is derived from the loop index (V0 = i+1), which makes each
// input distinct; i is also passed for two further arguments whose meaning is
// not visible here.
178 Inputs: []*types.TxInput{types.NewSpendInput(nil, bc.Hash{V0: i + 1}, assetID, i, i, trueProg)},
179 Outputs: []*types.TxOutput{types.NewIntraChainOutput(assetID, 1, trueProg)},
181 txs = append(txs, tx)
182 bcTxs = append(bcTxs, tx.Tx)
// mockControlProgram appends length random bytes to cp, one per loop iteration.
// The declaration of cp and the return statement are elided in this excerpt —
// presumably cp starts empty and is returned; TODO confirm.
187 func mockControlProgram(length int) []byte {
189 for i := 0; i < length; i++ {
// Assuming rand is math/rand, Intn(1<<8) yields a value in [0, 256), so the
// conversion to byte loses nothing.
190 cp = append(cp, byte(rand.Intn(1<<8)))