OSDN Git Service

opt mov performance
authorshenao78 <shenao.78@163.com>
Tue, 4 Feb 2020 13:44:10 +0000 (21:44 +0800)
committershenao78 <shenao.78@163.com>
Tue, 4 Feb 2020 13:44:10 +0000 (21:44 +0800)
application/mov/match/order_book.go
application/mov/mov_core.go

index ec4e5dc..ce78803 100644 (file)
@@ -2,6 +2,7 @@ package match
 
 import (
        "sort"
+       "sync"
 
        "github.com/bytom/vapor/application/mov/common"
        "github.com/bytom/vapor/application/mov/database"
@@ -12,21 +13,21 @@ import (
 type OrderBook struct {
        movStore database.MovStore
        // key of tradePair -> []order
-       dbOrders map[string][]*common.Order
+       dbOrders *sync.Map
        // key of tradePair -> iterator
        orderIterators map[string]*database.OrderIterator
 
        // key of tradePair -> []order
-       arrivalAddOrders map[string][]*common.Order
+       arrivalAddOrders *sync.Map
        // key of order -> order
-       arrivalDelOrders map[string]*common.Order
+       arrivalDelOrders *sync.Map
 }
 
 // NewOrderBook create a new OrderBook object
 func NewOrderBook(movStore database.MovStore, arrivalAddOrders, arrivalDelOrders []*common.Order) *OrderBook {
        return &OrderBook{
                movStore:       movStore,
-               dbOrders:       make(map[string][]*common.Order),
+               dbOrders:       &sync.Map{},
                orderIterators: make(map[string]*database.OrderIterator),
 
                arrivalAddOrders: arrangeArrivalAddOrders(arrivalAddOrders),
@@ -37,29 +38,30 @@ func NewOrderBook(movStore database.MovStore, arrivalAddOrders, arrivalDelOrders
 // AddOrder add the in memory temp order to order table
 func (o *OrderBook) AddOrder(order *common.Order) error {
        tradePairKey := order.TradePair().Key()
-       orders := o.arrivalAddOrders[tradePairKey]
+       orders := o.getArrivalAddOrders(tradePairKey)
        if len(orders) > 0 && order.Cmp(orders[len(orders)-1]) > 0 {
                return errors.New("rate of order must less than the min order in order table")
        }
 
-       o.arrivalAddOrders[tradePairKey] = append(orders, order)
+       orders = append(orders, order)
+       o.arrivalAddOrders.Store(tradePairKey, orders)
        return nil
 }
 
 // PeekOrder return the next lowest order of given trade pair
 func (o *OrderBook) PeekOrder(tradePair *common.TradePair) *common.Order {
-       if len(o.dbOrders[tradePair.Key()]) == 0 {
+       if len(o.getDBOrders(tradePair.Key())) == 0 {
                o.extendDBOrders(tradePair)
        }
 
        var nextOrder *common.Order
-       orders := o.dbOrders[tradePair.Key()]
+       orders := o.getDBOrders(tradePair.Key())
        if len(orders) != 0 {
                nextOrder = orders[len(orders)-1]
        }
 
-       if nextOrder != nil && o.arrivalDelOrders[nextOrder.Key()] != nil {
-               o.dbOrders[tradePair.Key()] = orders[0 : len(orders)-1]
+       if nextOrder != nil && o.getArrivalDelOrders(nextOrder.Key()) != nil {
+               o.dbOrders.Store(tradePair.Key(), orders[0:len(orders)-1])
                return o.PeekOrder(tradePair)
        }
 
@@ -91,14 +93,14 @@ func (o *OrderBook) PopOrder(tradePair *common.TradePair) {
                return
        }
 
-       orders := o.dbOrders[tradePair.Key()]
+       orders := o.getDBOrders(tradePair.Key())
        if len(orders) != 0 && orders[len(orders)-1].Key() == order.Key() {
-               o.dbOrders[tradePair.Key()] = orders[0 : len(orders)-1]
+               o.dbOrders.Store(tradePair.Key(), orders[0 : len(orders)-1])
        }
 
-       arrivalOrders := o.arrivalAddOrders[tradePair.Key()]
+       arrivalOrders := o.getArrivalAddOrders(tradePair.Key())
        if len(arrivalOrders) != 0 && arrivalOrders[len(arrivalOrders)-1].Key() == order.Key() {
-               o.arrivalAddOrders[tradePair.Key()] = arrivalOrders[0 : len(arrivalOrders)-1]
+               o.arrivalAddOrders.Store(tradePair.Key(), arrivalOrders[0 : len(arrivalOrders)-1])
        }
 }
 
@@ -111,22 +113,46 @@ func (o *OrderBook) PopOrders(tradePairs []*common.TradePair) []*common.Order {
        return orders
 }
 
-func arrangeArrivalAddOrders(orders []*common.Order) map[string][]*common.Order {
-       arrivalAddOrderMap := make(map[string][]*common.Order)
+func (o *OrderBook) getDBOrders(tradePairKey string) []*common.Order {
+       if orders, ok := o.dbOrders.Load(tradePairKey); ok {
+               return orders.([]*common.Order)
+       }
+       return []*common.Order{}
+}
+
+func (o *OrderBook) getArrivalAddOrders(tradePairKey string) []*common.Order {
+       if orders, ok := o.arrivalAddOrders.Load(tradePairKey); ok {
+               return orders.([]*common.Order)
+       }
+       return []*common.Order{}
+}
+
+func (o *OrderBook) getArrivalDelOrders(orderKey string) *common.Order {
+       if order, ok := o.arrivalDelOrders.Load(orderKey); ok {
+               return order.(*common.Order)
+       }
+       return nil
+}
+
+func arrangeArrivalAddOrders(orders []*common.Order) *sync.Map {
+       orderMap := make(map[string][]*common.Order)
        for _, order := range orders {
-               arrivalAddOrderMap[order.TradePair().Key()] = append(arrivalAddOrderMap[order.TradePair().Key()], order)
+               orderMap[order.TradePair().Key()] = append(orderMap[order.TradePair().Key()], order)
        }
 
-       for _, orders := range arrivalAddOrderMap {
+       arrivalOrderMap := &sync.Map{}
+       for key, orders := range orderMap {
                sort.Sort(sort.Reverse(common.OrderSlice(orders)))
+               arrivalOrderMap.Store(key, orders)
+
        }
-       return arrivalAddOrderMap
+       return arrivalOrderMap
 }
 
-func arrangeArrivalDelOrders(orders []*common.Order) map[string]*common.Order {
-       arrivalDelOrderMap := make(map[string]*common.Order)
+func arrangeArrivalDelOrders(orders []*common.Order) *sync.Map {
+       arrivalDelOrderMap := &sync.Map{}
        for _, order := range orders {
-               arrivalDelOrderMap[order.Key()] = order
+               arrivalDelOrderMap.Store(order.Key(), order)
        }
        return arrivalDelOrderMap
 }
@@ -139,13 +165,15 @@ func (o *OrderBook) extendDBOrders(tradePair *common.TradePair) {
        }
 
        nextOrders := iterator.NextBatch()
+       orders := o.getDBOrders(tradePair.Key())
        for i := len(nextOrders) - 1; i >= 0; i-- {
-               o.dbOrders[tradePair.Key()] = append(o.dbOrders[tradePair.Key()], nextOrders[i])
+               orders = append(orders, nextOrders[i])
        }
+       o.dbOrders.Store(tradePair.Key(), orders)
 }
 
 func (o *OrderBook) peekArrivalOrder(tradePair *common.TradePair) *common.Order {
-       if arrivalAddOrders := o.arrivalAddOrders[tradePair.Key()]; len(arrivalAddOrders) > 0 {
+       if arrivalAddOrders := o.getArrivalAddOrders(tradePair.Key()); len(arrivalAddOrders) > 0 {
                return arrivalAddOrders[len(arrivalAddOrders)-1]
        }
        return nil
index f242e7f..b550a24 100644 (file)
@@ -1,6 +1,8 @@
 package mov
 
 import (
+       "math"
+       "runtime"
        "sync"
 
        "github.com/bytom/vapor/application/mov/common"
@@ -86,15 +88,18 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block
        resultCh := make(chan *matchTxResult, 1)
        closeCh := make(chan struct{})
 
+       var wg sync.WaitGroup
        tradePairs := loadTradePairs(m.movStore)
        matchEngine := match.NewEngine(orderBook, maxFeeRate, nodeProgram)
 
-       var wg sync.WaitGroup
-       for _, tradePair := range tradePairs {
+       workerNum := runtime.NumCPU()
+       jobs := splitMatchTxJobs(workerNum, tradePairs)
+
+       for _, job := range jobs {
                wg.Add(1)
-               go matchTxWorker(matchEngine, tradePair, processCh, closeCh, &wg)
+               go matchTxWorker(matchEngine, job, processCh, closeCh, &wg)
        }
-       go matchTxCollector(gasLeft, isTimeout, len(tradePairs), processCh, resultCh, closeCh)
+       go matchTxCollector(gasLeft, isTimeout, len(jobs), processCh, resultCh, closeCh)
 
        wg.Wait()
        result := <-resultCh
@@ -476,7 +481,7 @@ type matchTxResult struct {
        err        error
 }
 
-func matchTxCollector(gasLeft int64, isTimeout func() bool, lenOfTradePairs int, processCh <-chan interface{}, resultCh chan *matchTxResult, closeCh chan<- struct{}) {
+func matchTxCollector(gasLeft int64, isTimeout func() bool, numOfJobs int, processCh <-chan interface{}, resultCh chan *matchTxResult, closeCh chan<- struct{}) {
        var matchedTxs []*types.Tx
        var completed int
 
@@ -487,7 +492,7 @@ Loop:
                        break
                }
 
-               if completed == lenOfTradePairs {
+               if completed == numOfJobs {
                        break
                }
 
@@ -515,28 +520,36 @@ Loop:
        resultCh <- &matchTxResult{matchedTxs: matchedTxs}
 }
 
-func matchTxWorker(engine *match.Engine, tradePair *common.TradePair, processCh chan<- interface{}, closeCh <-chan struct{}, wg *sync.WaitGroup) {
-       for {
+func matchTxWorker(engine *match.Engine, tradePairs []*common.TradePair, processCh chan<- interface{}, closeCh <-chan struct{}, wg *sync.WaitGroup) {
+       dispatchData := func(data interface{}, err error) bool {
+               select {
+               case <-closeCh:
+                       wg.Done()
+                       return true
+               case processCh <- data:
+                       if err != nil || data == nil {
+                               wg.Done()
+                               return true
+                       }
+                       return false
+               }
+       }
+
+       for _, tradePair := range tradePairs {
                var data interface{}
                var err error
-               if engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
+               for engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
                        data, err = engine.NextMatchedTx(tradePair, tradePair.Reverse())
                        if err != nil {
                                data = err
                        }
-               }
 
-               select {
-               case <-closeCh:
-                       wg.Done()
-                       return
-               case processCh <- data:
-                       if data == nil || err != nil {
-                               wg.Done()
+                       if done := dispatchData(data, err); done {
                                return
                        }
                }
        }
+       dispatchData(nil, nil)
 }
 
 func mergeOrders(addOrderMap, deleteOrderMap map[string]*common.Order) ([]*common.Order, []*common.Order) {
@@ -554,3 +567,21 @@ func mergeOrders(addOrderMap, deleteOrderMap map[string]*common.Order) ([]*commo
        }
        return addOrders, deleteOrders
 }
+
+func splitMatchTxJobs(workerNum int, tradePairs []*common.TradePair) [][]*common.TradePair {
+       sizeOfJobs := len(tradePairs)
+       numOfJobsPerWorker := int(math.Ceil(float64(sizeOfJobs) / float64(workerNum)))
+       var splitJobs [][]*common.TradePair
+       for i := 0; i < workerNum; i++ {
+               begin := i * numOfJobsPerWorker
+               end := (i + 1) * numOfJobsPerWorker
+               if end > sizeOfJobs {
+                       end = sizeOfJobs
+               }
+               splitJobs = append(splitJobs, tradePairs[begin:end])
+               if end >= sizeOfJobs {
+                       break
+               }
+       }
+       return splitJobs
+}