9 wire "github.com/tendermint/go-wire"
10 "github.com/tendermint/tmlibs/flowrate"
12 "github.com/vapor/consensus"
13 "github.com/vapor/protocol/bc"
14 "github.com/vapor/protocol/bc/types"
15 "github.com/vapor/test/mock"
21 flag consensus.ServiceFlag
24 remoteNode *SyncManager
29 func NewP2PPeer(addr, id string, flag consensus.ServiceFlag) *P2PPeer {
32 ip: &net.IPAddr{IP: net.ParseIP(addr)},
34 msgCh: make(chan []byte),
39 func (p *P2PPeer) Addr() net.Addr {
43 func (p *P2PPeer) ID() string {
47 func (p *P2PPeer) ServiceFlag() consensus.ServiceFlag {
51 func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *SyncManager) {
56 func (p *P2PPeer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
60 func (p *P2PPeer) TrySend(b byte, msg interface{}) bool {
61 msgBytes := wire.BinaryBytes(msg)
65 msgType, msg, _ := DecodeMessage(msgBytes)
66 p.remoteNode.processMsg(p.srcPeer, msgType, msg)
71 func (p *P2PPeer) setAsync(b bool) {
75 func (p *P2PPeer) postMan() {
76 for msgBytes := range p.msgCh {
77 msgType, msg, _ := DecodeMessage(msgBytes)
78 p.remoteNode.processMsg(p.srcPeer, msgType, msg)
84 func NewPeerSet() *PeerSet {
88 func (ps *PeerSet) AddBannedPeer(string) error { return nil }
89 func (ps *PeerSet) StopPeerGracefully(string) {}
92 nodes map[*SyncManager]P2PPeer
95 func NewNetWork() *NetWork {
96 return &NetWork{map[*SyncManager]P2PPeer{}}
99 func (nw *NetWork) Register(node *SyncManager, addr, id string, flag consensus.ServiceFlag) {
100 peer := NewP2PPeer(addr, id, flag)
101 nw.nodes[node] = *peer
104 func (nw *NetWork) HandsShake(nodeA, nodeB *SyncManager) (*P2PPeer, *P2PPeer, error) {
105 B2A, ok := nw.nodes[nodeA]
107 return nil, nil, errors.New("can't find nodeA's p2p peer on network")
109 A2B, ok := nw.nodes[nodeB]
111 return nil, nil, errors.New("can't find nodeB's p2p peer on network")
114 A2B.SetConnection(&B2A, nodeB)
115 B2A.SetConnection(&A2B, nodeA)
117 nodeA.handleStatusRequestMsg(&A2B)
118 nodeB.handleStatusRequestMsg(&B2A)
122 return &B2A, &A2B, nil
125 func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
126 blocks := []*types.Block{}
127 indexBlock := &types.Block{}
128 if startBlock == nil {
129 indexBlock = &types.Block{BlockHeader: types.BlockHeader{}}
130 blocks = append(blocks, indexBlock)
132 indexBlock = startBlock
135 for indexBlock.Height < height {
136 block := &types.Block{
137 BlockHeader: types.BlockHeader{
138 Height: indexBlock.Height + 1,
139 PreviousBlockHash: indexBlock.Hash(),
142 blocks = append(blocks, block)
148 func mockSync(blocks []*types.Block) *SyncManager {
149 chain := mock.NewChain()
150 peers := newPeerSet(NewPeerSet())
151 chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
152 for _, block := range blocks {
153 chain.SetBlockByHeight(block.Height, block)
156 genesis, _ := chain.GetHeaderByHeight(0)
158 genesisHash: genesis.Hash(),
160 blockKeeper: newBlockKeeper(chain, peers),
165 func mockTxs(txCount int) ([]*types.Tx, []*bc.Tx) {
168 for i := 0; i < txCount; i++ {
169 trueProg := mockControlProgram(60)
170 assetID := bc.ComputeAssetID(trueProg, 1, &bc.EmptyStringHash)
171 now := []byte(time.Now().String())
172 issuanceInp := types.NewIssuanceInput(now, 1, trueProg, nil, nil)
173 tx := types.NewTx(types.TxData{
175 Inputs: []*types.TxInput{issuanceInp},
176 Outputs: []*types.TxOutput{types.NewTxOutput(assetID, 1, trueProg)},
178 txs = append(txs, tx)
179 bcTxs = append(bcTxs, tx.Tx)
184 func mockControlProgram(length int) []byte {
186 for i := 0; i < length; i++ {
187 cp = append(cp, byte(rand.Intn(1<<8)))