14 // The adapter class for Select object's response.
15 // The response consists of frames. Each frame has the following format:
17 // Type | Payload Length | Header Checksum | Payload | Payload Checksum
19 // |<4-->| <--4 bytes------><---4 bytes-------><-n/a-----><--4 bytes--------->
20 // And we have three kind of frames.
23 // Payload: Offset | Data
28 // Payload: Offset (8-bytes)
32 // Payload: Offset | total scanned bytes | http status code | error message
33 // <-- 8bytes--><-----8 bytes--------><---4 bytes-------><---variabe--->
35 // SelectObjectResponse defines HTTP response from OSS SelectObject
36 type SelectObjectResponse struct {
40 Frame SelectObjectResult
44 WriterForCheckCrc32 hash.Hash32
48 func (sr *SelectObjectResponse) Read(p []byte) (n int, err error) {
49 n, err = sr.readFrames(p)
53 // Close http reponse body
54 func (sr *SelectObjectResponse) Close() error {
55 return sr.Body.Close()
58 // PostSelectResult is the request of SelectObject
59 type PostSelectResult struct {
60 Response *SelectObjectResponse
63 // readFrames is read Frame
64 func (sr *SelectObjectResponse) readFrames(p []byte) (int, error) {
68 if sr.Frame.OutputRawData == true {
69 nn, err = sr.Body.Read(p)
78 // if this Frame is Readed, then not reading Header
79 if sr.Frame.OpenLine != true {
80 err = sr.analysisHeader()
86 if sr.Frame.FrameType == DataFrameType {
87 n, err := sr.analysisData(p[nn:])
93 // if this Frame is readed all data, then empty the Frame to read it with next frame
94 if sr.Frame.ConsumedBytesLength == sr.Frame.PayloadLength-8 {
95 checkValid, err = sr.checkPayloadSum()
96 if err != nil || !checkValid {
97 return nn, fmt.Errorf("%s", err.Error())
105 } else if sr.Frame.FrameType == ContinuousFrameType {
106 checkValid, err = sr.checkPayloadSum()
107 if err != nil || !checkValid {
108 return nn, fmt.Errorf("%s", err.Error())
110 } else if sr.Frame.FrameType == EndFrameType {
111 err = sr.analysisEndFrame()
115 checkValid, err = sr.checkPayloadSum()
120 } else if sr.Frame.FrameType == MetaEndFrameCSVType {
121 err = sr.analysisMetaEndFrameCSV()
125 checkValid, err = sr.checkPayloadSum()
130 } else if sr.Frame.FrameType == MetaEndFrameJSONType {
131 err = sr.analysisMetaEndFrameJSON()
135 checkValid, err = sr.checkPayloadSum()
145 type chanReadIO struct {
150 func (sr *SelectObjectResponse) readLen(p []byte, timeOut time.Duration) (int, error) {
152 ch := make(chan chanReadIO, 1)
155 var needReadLength int
156 readChan := chanReadIO{}
157 needReadLength = len(p)
159 n, err := r.Read(p[readChan.readLen:needReadLength])
160 readChan.readLen += n
167 if readChan.readLen == needReadLength {
175 case <-time.After(time.Second * timeOut):
176 return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", sr.Headers.Get(HTTPHeaderOssRequestID), timeOut, len(p))
178 return result.readLen, result.err
182 // analysisHeader is reading selectObject response body's header
183 func (sr *SelectObjectResponse) analysisHeader() error {
184 headFrameByte := make([]byte, 20)
185 _, err := sr.readLen(headFrameByte, time.Duration(sr.ReadTimeOut))
187 return fmt.Errorf("requestId: %s, Read response frame header failure,err:%s", sr.Headers.Get(HTTPHeaderOssRequestID), err.Error())
190 frameTypeByte := headFrameByte[0:4]
191 sr.Frame.Version = frameTypeByte[0]
193 bytesToInt(frameTypeByte, &sr.Frame.FrameType)
195 if sr.Frame.FrameType != DataFrameType && sr.Frame.FrameType != ContinuousFrameType &&
196 sr.Frame.FrameType != EndFrameType && sr.Frame.FrameType != MetaEndFrameCSVType && sr.Frame.FrameType != MetaEndFrameJSONType {
197 return fmt.Errorf("requestId: %s, Unexpected frame type: %d", sr.Headers.Get(HTTPHeaderOssRequestID), sr.Frame.FrameType)
200 payloadLengthByte := headFrameByte[4:8]
201 bytesToInt(payloadLengthByte, &sr.Frame.PayloadLength)
202 headCheckSumByte := headFrameByte[8:12]
203 bytesToInt(headCheckSumByte, &sr.Frame.HeaderCheckSum)
204 byteOffset := headFrameByte[12:20]
205 bytesToInt(byteOffset, &sr.Frame.Offset)
206 sr.Frame.OpenLine = true
208 err = sr.writerCheckCrc32(byteOffset)
212 // analysisData is reading the DataFrameType data of selectObject response body
213 func (sr *SelectObjectResponse) analysisData(p []byte) (int, error) {
214 var needReadLength int32
215 lenP := int32(len(p))
216 restByteLength := sr.Frame.PayloadLength - 8 - sr.Frame.ConsumedBytesLength
217 if lenP <= restByteLength {
218 needReadLength = lenP
220 needReadLength = restByteLength
222 n, err := sr.readLen(p[:needReadLength], time.Duration(sr.ReadTimeOut))
224 return n, fmt.Errorf("read frame data error,%s", err.Error())
226 sr.Frame.ConsumedBytesLength += int32(n)
227 err = sr.writerCheckCrc32(p[:n])
231 // analysisEndFrame is reading the EndFrameType data of selectObject response body
232 func (sr *SelectObjectResponse) analysisEndFrame() error {
234 payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
235 _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
237 return fmt.Errorf("read end frame error:%s", err.Error())
239 bytesToInt(payLoadBytes[0:8], &eF.TotalScanned)
240 bytesToInt(payLoadBytes[8:12], &eF.HTTPStatusCode)
241 errMsgLength := sr.Frame.PayloadLength - 20
242 eF.ErrorMsg = string(payLoadBytes[12 : errMsgLength+12])
243 sr.Frame.EndFrame.TotalScanned = eF.TotalScanned
244 sr.Frame.EndFrame.HTTPStatusCode = eF.HTTPStatusCode
245 sr.Frame.EndFrame.ErrorMsg = eF.ErrorMsg
246 err = sr.writerCheckCrc32(payLoadBytes)
250 // analysisMetaEndFrameCSV is reading the MetaEndFrameCSVType data of selectObject response body
251 func (sr *SelectObjectResponse) analysisMetaEndFrameCSV() error {
252 var mCF MetaEndFrameCSV
253 payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
254 _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
256 return fmt.Errorf("read meta end csv frame error:%s", err.Error())
259 bytesToInt(payLoadBytes[0:8], &mCF.TotalScanned)
260 bytesToInt(payLoadBytes[8:12], &mCF.Status)
261 bytesToInt(payLoadBytes[12:16], &mCF.SplitsCount)
262 bytesToInt(payLoadBytes[16:24], &mCF.RowsCount)
263 bytesToInt(payLoadBytes[24:28], &mCF.ColumnsCount)
264 errMsgLength := sr.Frame.PayloadLength - 36
265 mCF.ErrorMsg = string(payLoadBytes[28 : errMsgLength+28])
266 sr.Frame.MetaEndFrameCSV.ErrorMsg = mCF.ErrorMsg
267 sr.Frame.MetaEndFrameCSV.TotalScanned = mCF.TotalScanned
268 sr.Frame.MetaEndFrameCSV.Status = mCF.Status
269 sr.Frame.MetaEndFrameCSV.SplitsCount = mCF.SplitsCount
270 sr.Frame.MetaEndFrameCSV.RowsCount = mCF.RowsCount
271 sr.Frame.MetaEndFrameCSV.ColumnsCount = mCF.ColumnsCount
272 err = sr.writerCheckCrc32(payLoadBytes)
276 // analysisMetaEndFrameJSON is reading the MetaEndFrameJSONType data of selectObject response body
277 func (sr *SelectObjectResponse) analysisMetaEndFrameJSON() error {
278 var mJF MetaEndFrameJSON
279 payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
280 _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
282 return fmt.Errorf("read meta end json frame error:%s", err.Error())
285 bytesToInt(payLoadBytes[0:8], &mJF.TotalScanned)
286 bytesToInt(payLoadBytes[8:12], &mJF.Status)
287 bytesToInt(payLoadBytes[12:16], &mJF.SplitsCount)
288 bytesToInt(payLoadBytes[16:24], &mJF.RowsCount)
289 errMsgLength := sr.Frame.PayloadLength - 32
290 mJF.ErrorMsg = string(payLoadBytes[24 : errMsgLength+24])
291 sr.Frame.MetaEndFrameJSON.ErrorMsg = mJF.ErrorMsg
292 sr.Frame.MetaEndFrameJSON.TotalScanned = mJF.TotalScanned
293 sr.Frame.MetaEndFrameJSON.Status = mJF.Status
294 sr.Frame.MetaEndFrameJSON.SplitsCount = mJF.SplitsCount
295 sr.Frame.MetaEndFrameJSON.RowsCount = mJF.RowsCount
297 err = sr.writerCheckCrc32(payLoadBytes)
301 func (sr *SelectObjectResponse) checkPayloadSum() (bool, error) {
302 payLoadChecksumByte := make([]byte, 4)
303 n, err := sr.readLen(payLoadChecksumByte, time.Duration(sr.ReadTimeOut))
305 bytesToInt(payLoadChecksumByte, &sr.Frame.PayloadChecksum)
306 sr.ServerCRC32 = sr.Frame.PayloadChecksum
307 sr.ClientCRC32 = sr.WriterForCheckCrc32.Sum32()
308 if sr.Frame.EnablePayloadCrc == true && sr.ServerCRC32 != 0 && sr.ServerCRC32 != sr.ClientCRC32 {
309 return false, fmt.Errorf("RequestId: %s, Unexpected frame type: %d, client %d but server %d",
310 sr.Headers.Get(HTTPHeaderOssRequestID), sr.Frame.FrameType, sr.ClientCRC32, sr.ServerCRC32)
314 return false, fmt.Errorf("RequestId:%s, read checksum error:%s", sr.Headers.Get(HTTPHeaderOssRequestID), err.Error())
317 func (sr *SelectObjectResponse) writerCheckCrc32(p []byte) (err error) {
319 if sr.Frame.EnablePayloadCrc == true {
320 _, err = sr.WriterForCheckCrc32.Write(p)
325 // emptyFrame is emptying SelectObjectResponse Frame information
326 func (sr *SelectObjectResponse) emptyFrame() {
327 crcCalc := crc32.NewIEEE()
328 sr.WriterForCheckCrc32 = crcCalc
331 sr.Frame.ConsumedBytesLength = 0
332 sr.Frame.OpenLine = false
333 sr.Frame.Version = byte(0)
334 sr.Frame.FrameType = 0
335 sr.Frame.PayloadLength = 0
336 sr.Frame.HeaderCheckSum = 0
340 sr.Frame.EndFrame.TotalScanned = 0
341 sr.Frame.EndFrame.HTTPStatusCode = 0
342 sr.Frame.EndFrame.ErrorMsg = ""
344 sr.Frame.MetaEndFrameCSV.TotalScanned = 0
345 sr.Frame.MetaEndFrameCSV.Status = 0
346 sr.Frame.MetaEndFrameCSV.SplitsCount = 0
347 sr.Frame.MetaEndFrameCSV.RowsCount = 0
348 sr.Frame.MetaEndFrameCSV.ColumnsCount = 0
349 sr.Frame.MetaEndFrameCSV.ErrorMsg = ""
351 sr.Frame.MetaEndFrameJSON.TotalScanned = 0
352 sr.Frame.MetaEndFrameJSON.Status = 0
353 sr.Frame.MetaEndFrameJSON.SplitsCount = 0
354 sr.Frame.MetaEndFrameJSON.RowsCount = 0
355 sr.Frame.MetaEndFrameJSON.ErrorMsg = ""
357 sr.Frame.PayloadChecksum = 0
360 // bytesToInt byte's array trans to int
361 func bytesToInt(b []byte, ret interface{}) {
362 binBuf := bytes.NewBuffer(b)
363 binary.Read(binBuf, binary.BigEndian, ret)