import (
"sort"
+ "sync"
"github.com/bytom/vapor/application/mov/common"
"github.com/bytom/vapor/application/mov/database"
type OrderBook struct {
movStore database.MovStore
// key of tradePair -> []order
- dbOrders map[string][]*common.Order
+ dbOrders *sync.Map
// key of tradePair -> iterator
orderIterators map[string]*database.OrderIterator
// key of tradePair -> []order
- arrivalAddOrders map[string][]*common.Order
+ arrivalAddOrders *sync.Map
// key of order -> order
- arrivalDelOrders map[string]*common.Order
+ arrivalDelOrders *sync.Map
}
// NewOrderBook create a new OrderBook object
func NewOrderBook(movStore database.MovStore, arrivalAddOrders, arrivalDelOrders []*common.Order) *OrderBook {
return &OrderBook{
movStore: movStore,
- dbOrders: make(map[string][]*common.Order),
+ dbOrders: &sync.Map{},
orderIterators: make(map[string]*database.OrderIterator),
arrivalAddOrders: arrangeArrivalAddOrders(arrivalAddOrders),
// AddOrder add the in memory temp order to order table
func (o *OrderBook) AddOrder(order *common.Order) error {
tradePairKey := order.TradePair().Key()
- orders := o.arrivalAddOrders[tradePairKey]
+ orders := o.getArrivalAddOrders(tradePairKey)
if len(orders) > 0 && order.Cmp(orders[len(orders)-1]) > 0 {
return errors.New("rate of order must less than the min order in order table")
}
- o.arrivalAddOrders[tradePairKey] = append(orders, order)
+ orders = append(orders, order)
+ o.arrivalAddOrders.Store(tradePairKey, orders)
return nil
}
// PeekOrder return the next lowest order of given trade pair
func (o *OrderBook) PeekOrder(tradePair *common.TradePair) *common.Order {
- if len(o.dbOrders[tradePair.Key()]) == 0 {
+ if len(o.getDBOrders(tradePair.Key())) == 0 {
o.extendDBOrders(tradePair)
}
var nextOrder *common.Order
- orders := o.dbOrders[tradePair.Key()]
+ orders := o.getDBOrders(tradePair.Key())
if len(orders) != 0 {
nextOrder = orders[len(orders)-1]
}
- if nextOrder != nil && o.arrivalDelOrders[nextOrder.Key()] != nil {
- o.dbOrders[tradePair.Key()] = orders[0 : len(orders)-1]
+ if nextOrder != nil && o.getArrivalDelOrders(nextOrder.Key()) != nil {
+ o.dbOrders.Store(tradePair.Key(), orders[0:len(orders)-1])
return o.PeekOrder(tradePair)
}
return
}
- orders := o.dbOrders[tradePair.Key()]
+ orders := o.getDBOrders(tradePair.Key())
if len(orders) != 0 && orders[len(orders)-1].Key() == order.Key() {
- o.dbOrders[tradePair.Key()] = orders[0 : len(orders)-1]
+ o.dbOrders.Store(tradePair.Key(), orders[0 : len(orders)-1])
}
- arrivalOrders := o.arrivalAddOrders[tradePair.Key()]
+ arrivalOrders := o.getArrivalAddOrders(tradePair.Key())
if len(arrivalOrders) != 0 && arrivalOrders[len(arrivalOrders)-1].Key() == order.Key() {
- o.arrivalAddOrders[tradePair.Key()] = arrivalOrders[0 : len(arrivalOrders)-1]
+ o.arrivalAddOrders.Store(tradePair.Key(), arrivalOrders[0 : len(arrivalOrders)-1])
}
}
return orders
}
-func arrangeArrivalAddOrders(orders []*common.Order) map[string][]*common.Order {
- arrivalAddOrderMap := make(map[string][]*common.Order)
+func (o *OrderBook) getDBOrders(tradePairKey string) []*common.Order {
+ if orders, ok := o.dbOrders.Load(tradePairKey); ok {
+ return orders.([]*common.Order)
+ }
+ return []*common.Order{}
+}
+
+func (o *OrderBook) getArrivalAddOrders(tradePairKey string) []*common.Order {
+ if orders, ok := o.arrivalAddOrders.Load(tradePairKey); ok {
+ return orders.([]*common.Order)
+ }
+ return []*common.Order{}
+}
+
+func (o *OrderBook) getArrivalDelOrders(orderKey string) *common.Order {
+ if order, ok := o.arrivalDelOrders.Load(orderKey); ok {
+ return order.(*common.Order)
+ }
+ return nil
+}
+
+func arrangeArrivalAddOrders(orders []*common.Order) *sync.Map {
+ orderMap := make(map[string][]*common.Order)
for _, order := range orders {
- arrivalAddOrderMap[order.TradePair().Key()] = append(arrivalAddOrderMap[order.TradePair().Key()], order)
+ orderMap[order.TradePair().Key()] = append(orderMap[order.TradePair().Key()], order)
}
- for _, orders := range arrivalAddOrderMap {
+ arrivalOrderMap := &sync.Map{}
+ for key, orders := range orderMap {
sort.Sort(sort.Reverse(common.OrderSlice(orders)))
+ arrivalOrderMap.Store(key, orders)
+
}
- return arrivalAddOrderMap
+ return arrivalOrderMap
}
-func arrangeArrivalDelOrders(orders []*common.Order) map[string]*common.Order {
- arrivalDelOrderMap := make(map[string]*common.Order)
+func arrangeArrivalDelOrders(orders []*common.Order) *sync.Map {
+ arrivalDelOrderMap := &sync.Map{}
for _, order := range orders {
- arrivalDelOrderMap[order.Key()] = order
+ arrivalDelOrderMap.Store(order.Key(), order)
}
return arrivalDelOrderMap
}
}
nextOrders := iterator.NextBatch()
+ orders := o.getDBOrders(tradePair.Key())
for i := len(nextOrders) - 1; i >= 0; i-- {
- o.dbOrders[tradePair.Key()] = append(o.dbOrders[tradePair.Key()], nextOrders[i])
+ orders = append(orders, nextOrders[i])
}
+ o.dbOrders.Store(tradePair.Key(), orders)
}
func (o *OrderBook) peekArrivalOrder(tradePair *common.TradePair) *common.Order {
- if arrivalAddOrders := o.arrivalAddOrders[tradePair.Key()]; len(arrivalAddOrders) > 0 {
+ if arrivalAddOrders := o.getArrivalAddOrders(tradePair.Key()); len(arrivalAddOrders) > 0 {
return arrivalAddOrders[len(arrivalAddOrders)-1]
}
return nil
package mov
import (
+ "math"
+ "runtime"
"sync"
"github.com/bytom/vapor/application/mov/common"
resultCh := make(chan *matchTxResult, 1)
closeCh := make(chan struct{})
+ var wg sync.WaitGroup
tradePairs := loadTradePairs(m.movStore)
matchEngine := match.NewEngine(orderBook, maxFeeRate, nodeProgram)
- var wg sync.WaitGroup
- for _, tradePair := range tradePairs {
+ workerNum := runtime.NumCPU()
+ jobs := splitMatchTxJobs(workerNum, tradePairs)
+
+ for _, job := range jobs {
wg.Add(1)
- go matchTxWorker(matchEngine, tradePair, processCh, closeCh, &wg)
+ go matchTxWorker(matchEngine, job, processCh, closeCh, &wg)
}
- go matchTxCollector(gasLeft, isTimeout, len(tradePairs), processCh, resultCh, closeCh)
+ go matchTxCollector(gasLeft, isTimeout, len(jobs), processCh, resultCh, closeCh)
wg.Wait()
result := <-resultCh
err error
}
-func matchTxCollector(gasLeft int64, isTimeout func() bool, lenOfTradePairs int, processCh <-chan interface{}, resultCh chan *matchTxResult, closeCh chan<- struct{}) {
+func matchTxCollector(gasLeft int64, isTimeout func() bool, numOfJobs int, processCh <-chan interface{}, resultCh chan *matchTxResult, closeCh chan<- struct{}) {
var matchedTxs []*types.Tx
var completed int
break
}
- if completed == lenOfTradePairs {
+ if completed == numOfJobs {
break
}
resultCh <- &matchTxResult{matchedTxs: matchedTxs}
}
-func matchTxWorker(engine *match.Engine, tradePair *common.TradePair, processCh chan<- interface{}, closeCh <-chan struct{}, wg *sync.WaitGroup) {
- for {
+func matchTxWorker(engine *match.Engine, tradePairs []*common.TradePair, processCh chan<- interface{}, closeCh <-chan struct{}, wg *sync.WaitGroup) {
+ dispatchData := func(data interface{}, err error) bool {
+ select {
+ case <-closeCh:
+ wg.Done()
+ return true
+ case processCh <- data:
+ if err != nil || data == nil {
+ wg.Done()
+ return true
+ }
+ return false
+ }
+ }
+
+ for _, tradePair := range tradePairs {
var data interface{}
var err error
- if engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
+ for engine.HasMatchedTx(tradePair, tradePair.Reverse()) {
data, err = engine.NextMatchedTx(tradePair, tradePair.Reverse())
if err != nil {
data = err
}
- }
- select {
- case <-closeCh:
- wg.Done()
- return
- case processCh <- data:
- if data == nil || err != nil {
- wg.Done()
+ if done := dispatchData(data, err); done {
return
}
}
}
+ dispatchData(nil, nil)
}
func mergeOrders(addOrderMap, deleteOrderMap map[string]*common.Order) ([]*common.Order, []*common.Order) {
}
return addOrders, deleteOrders
}
+
+func splitMatchTxJobs(workerNum int, tradePairs []*common.TradePair) [][]*common.TradePair {
+ sizeOfJobs := len(tradePairs)
+ numOfJobsPerWorker := int(math.Ceil(float64(sizeOfJobs) / float64(workerNum)))
+ var splitJobs [][]*common.TradePair
+ for i := 0; i < workerNum; i++ {
+ begin := i * numOfJobsPerWorker
+ end := (i + 1) * numOfJobsPerWorker
+ if end > sizeOfJobs {
+ end = sizeOfJobs
+ }
+ splitJobs = append(splitJobs, tradePairs[begin:end])
+ if end >= sizeOfJobs {
+ break
+ }
+ }
+ return splitJobs
+}