10 "github.com/tendermint/abci/types"
11 cmn "github.com/tendermint/tmlibs/common"
14 // var maxNumberConnections = 2
16 type SocketServer struct {
24 conns map[int]net.Conn
31 func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
32 proto, addr := cmn.ProtocolAndAddress(protoAddr)
38 conns: make(map[int]net.Conn),
40 s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
44 func (s *SocketServer) OnStart() error {
45 if err := s.BaseService.OnStart(); err != nil {
48 ln, err := net.Listen(s.proto, s.addr)
53 go s.acceptConnectionsRoutine()
57 func (s *SocketServer) OnStop() {
58 s.BaseService.OnStop()
62 for id, conn := range s.conns {
69 func (s *SocketServer) addConn(conn net.Conn) int {
71 defer s.connsMtx.Unlock()
73 connID := s.nextConnID
75 s.conns[connID] = conn
80 // deletes conn even if close errs
81 func (s *SocketServer) rmConn(connID int, conn net.Conn) error {
83 defer s.connsMtx.Unlock()
85 delete(s.conns, connID)
89 func (s *SocketServer) acceptConnectionsRoutine() {
90 // semaphore := make(chan struct{}, maxNumberConnections)
93 // semaphore <- struct{}{}
95 // Accept a connection
96 s.Logger.Info("Waiting for new connection...")
97 conn, err := s.listener.Accept()
100 return // Ignore error from listener closing.
102 s.Logger.Error("Failed to accept connection: " + err.Error())
104 s.Logger.Info("Accepted a new connection")
107 connID := s.addConn(conn)
109 closeConn := make(chan error, 2) // Push to signal connection closed
110 responses := make(chan *types.Response, 1000) // A channel to buffer responses
112 // Read requests from conn and deal with them
113 go s.handleRequests(closeConn, conn, responses)
114 // Pull responses from 'responses' and write them to conn.
115 go s.handleResponses(closeConn, responses, conn)
118 // Wait until signal to close connection
119 errClose := <-closeConn
121 s.Logger.Error("Connection was closed by client")
122 } else if errClose != nil {
123 s.Logger.Error("Connection error", "error", errClose)
126 s.Logger.Error("Connection was closed.")
129 // Close the connection
130 err := s.rmConn(connID, conn)
132 s.Logger.Error("Error in closing connection", "error", err)
140 // Read requests from conn and deal with them
141 func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
143 var bufReader = bufio.NewReader(conn)
146 var req = &types.Request{}
147 err := types.ReadMessage(bufReader, req)
152 closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
158 s.handleRequest(req, responses)
163 func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
164 switch r := req.Value.(type) {
165 case *types.Request_Echo:
166 responses <- types.ToResponseEcho(r.Echo.Message)
167 case *types.Request_Flush:
168 responses <- types.ToResponseFlush()
169 case *types.Request_Info:
170 resInfo := s.app.Info(*r.Info)
171 responses <- types.ToResponseInfo(resInfo)
172 case *types.Request_SetOption:
174 logStr := s.app.SetOption(so.Key, so.Value)
175 responses <- types.ToResponseSetOption(logStr)
176 case *types.Request_DeliverTx:
177 res := s.app.DeliverTx(r.DeliverTx.Tx)
178 responses <- types.ToResponseDeliverTx(res.Code, res.Data, res.Log)
179 case *types.Request_CheckTx:
180 res := s.app.CheckTx(r.CheckTx.Tx)
181 responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
182 case *types.Request_Commit:
183 res := s.app.Commit()
184 responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
185 case *types.Request_Query:
186 resQuery := s.app.Query(*r.Query)
187 responses <- types.ToResponseQuery(resQuery)
188 case *types.Request_InitChain:
189 s.app.InitChain(*r.InitChain)
190 responses <- types.ToResponseInitChain()
191 case *types.Request_BeginBlock:
192 s.app.BeginBlock(*r.BeginBlock)
193 responses <- types.ToResponseBeginBlock()
194 case *types.Request_EndBlock:
195 resEndBlock := s.app.EndBlock(r.EndBlock.Height)
196 responses <- types.ToResponseEndBlock(resEndBlock)
198 responses <- types.ToResponseException("Unknown request")
202 // Pull responses from 'responses' and write them to conn.
203 func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
205 var bufWriter = bufio.NewWriter(conn)
207 var res = <-responses
208 err := types.WriteMessage(res, bufWriter)
210 closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
213 if _, ok := res.Value.(*types.Response_Flush); ok {
214 err = bufWriter.Flush()
216 closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())