OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / tendermint / abci / server / socket_server.go
1 package server
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "net"
8         "sync"
9
10         "github.com/tendermint/abci/types"
11         cmn "github.com/tendermint/tmlibs/common"
12 )
13
14 // var maxNumberConnections = 2
15
16 type SocketServer struct {
17         cmn.BaseService
18
19         proto    string
20         addr     string
21         listener net.Listener
22
23         connsMtx   sync.Mutex
24         conns      map[int]net.Conn
25         nextConnID int
26
27         appMtx sync.Mutex
28         app    types.Application
29 }
30
31 func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
32         proto, addr := cmn.ProtocolAndAddress(protoAddr)
33         s := &SocketServer{
34                 proto:    proto,
35                 addr:     addr,
36                 listener: nil,
37                 app:      app,
38                 conns:    make(map[int]net.Conn),
39         }
40         s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
41         return s
42 }
43
44 func (s *SocketServer) OnStart() error {
45         if err := s.BaseService.OnStart(); err != nil {
46                 return err
47         }
48         ln, err := net.Listen(s.proto, s.addr)
49         if err != nil {
50                 return err
51         }
52         s.listener = ln
53         go s.acceptConnectionsRoutine()
54         return nil
55 }
56
57 func (s *SocketServer) OnStop() {
58         s.BaseService.OnStop()
59         s.listener.Close()
60
61         s.connsMtx.Lock()
62         for id, conn := range s.conns {
63                 delete(s.conns, id)
64                 conn.Close()
65         }
66         s.connsMtx.Unlock()
67 }
68
69 func (s *SocketServer) addConn(conn net.Conn) int {
70         s.connsMtx.Lock()
71         defer s.connsMtx.Unlock()
72
73         connID := s.nextConnID
74         s.nextConnID++
75         s.conns[connID] = conn
76
77         return connID
78 }
79
80 // deletes conn even if close errs
81 func (s *SocketServer) rmConn(connID int, conn net.Conn) error {
82         s.connsMtx.Lock()
83         defer s.connsMtx.Unlock()
84
85         delete(s.conns, connID)
86         return conn.Close()
87 }
88
89 func (s *SocketServer) acceptConnectionsRoutine() {
90         // semaphore := make(chan struct{}, maxNumberConnections)
91
92         for {
93                 // semaphore <- struct{}{}
94
95                 // Accept a connection
96                 s.Logger.Info("Waiting for new connection...")
97                 conn, err := s.listener.Accept()
98                 if err != nil {
99                         if !s.IsRunning() {
100                                 return // Ignore error from listener closing.
101                         }
102                         s.Logger.Error("Failed to accept connection: " + err.Error())
103                 } else {
104                         s.Logger.Info("Accepted a new connection")
105                 }
106
107                 connID := s.addConn(conn)
108
109                 closeConn := make(chan error, 2)              // Push to signal connection closed
110                 responses := make(chan *types.Response, 1000) // A channel to buffer responses
111
112                 // Read requests from conn and deal with them
113                 go s.handleRequests(closeConn, conn, responses)
114                 // Pull responses from 'responses' and write them to conn.
115                 go s.handleResponses(closeConn, responses, conn)
116
117                 go func() {
118                         // Wait until signal to close connection
119                         errClose := <-closeConn
120                         if err == io.EOF {
121                                 s.Logger.Error("Connection was closed by client")
122                         } else if errClose != nil {
123                                 s.Logger.Error("Connection error", "error", errClose)
124                         } else {
125                                 // never happens
126                                 s.Logger.Error("Connection was closed.")
127                         }
128
129                         // Close the connection
130                         err := s.rmConn(connID, conn)
131                         if err != nil {
132                                 s.Logger.Error("Error in closing connection", "error", err)
133                         }
134
135                         // <-semaphore
136                 }()
137         }
138 }
139
140 // Read requests from conn and deal with them
141 func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
142         var count int
143         var bufReader = bufio.NewReader(conn)
144         for {
145
146                 var req = &types.Request{}
147                 err := types.ReadMessage(bufReader, req)
148                 if err != nil {
149                         if err == io.EOF {
150                                 closeConn <- err
151                         } else {
152                                 closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
153                         }
154                         return
155                 }
156                 s.appMtx.Lock()
157                 count++
158                 s.handleRequest(req, responses)
159                 s.appMtx.Unlock()
160         }
161 }
162
163 func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
164         switch r := req.Value.(type) {
165         case *types.Request_Echo:
166                 responses <- types.ToResponseEcho(r.Echo.Message)
167         case *types.Request_Flush:
168                 responses <- types.ToResponseFlush()
169         case *types.Request_Info:
170                 resInfo := s.app.Info(*r.Info)
171                 responses <- types.ToResponseInfo(resInfo)
172         case *types.Request_SetOption:
173                 so := r.SetOption
174                 logStr := s.app.SetOption(so.Key, so.Value)
175                 responses <- types.ToResponseSetOption(logStr)
176         case *types.Request_DeliverTx:
177                 res := s.app.DeliverTx(r.DeliverTx.Tx)
178                 responses <- types.ToResponseDeliverTx(res.Code, res.Data, res.Log)
179         case *types.Request_CheckTx:
180                 res := s.app.CheckTx(r.CheckTx.Tx)
181                 responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
182         case *types.Request_Commit:
183                 res := s.app.Commit()
184                 responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
185         case *types.Request_Query:
186                 resQuery := s.app.Query(*r.Query)
187                 responses <- types.ToResponseQuery(resQuery)
188         case *types.Request_InitChain:
189                 s.app.InitChain(*r.InitChain)
190                 responses <- types.ToResponseInitChain()
191         case *types.Request_BeginBlock:
192                 s.app.BeginBlock(*r.BeginBlock)
193                 responses <- types.ToResponseBeginBlock()
194         case *types.Request_EndBlock:
195                 resEndBlock := s.app.EndBlock(r.EndBlock.Height)
196                 responses <- types.ToResponseEndBlock(resEndBlock)
197         default:
198                 responses <- types.ToResponseException("Unknown request")
199         }
200 }
201
202 // Pull responses from 'responses' and write them to conn.
203 func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
204         var count int
205         var bufWriter = bufio.NewWriter(conn)
206         for {
207                 var res = <-responses
208                 err := types.WriteMessage(res, bufWriter)
209                 if err != nil {
210                         closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
211                         return
212                 }
213                 if _, ok := res.Value.(*types.Response_Flush); ok {
214                         err = bufWriter.Flush()
215                         if err != nil {
216                                 closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
217                                 return
218                         }
219                 }
220                 count++
221         }
222 }