OSDN Git Service

Create ossClient.go (#574)
[bytom/vapor.git] / vendor / github.com / aliyun / aliyun-oss-go-sdk / oss / select_object_type.go
1 package oss
2
3 import (
4         "bytes"
5         "encoding/binary"
6         "fmt"
7         "hash"
8         "hash/crc32"
9         "io"
10         "net/http"
11         "time"
12 )
13
14 // The adapter class for Select object's response.
15 // The response consists of frames. Each frame has the following format:
16
17 // Type  |   Payload Length |  Header Checksum | Payload | Payload Checksum
18
19 // |<4-->|  <--4 bytes------><---4 bytes-------><-n/a-----><--4 bytes--------->
20 // And we have three kind of frames.
21 // Data Frame:
22 // Type:8388609
23 // Payload:   Offset    |    Data
24 //            <-8 bytes>
25
26 // Continuous Frame
27 // Type:8388612
28 // Payload: Offset  (8-bytes)
29
30 // End Frame
31 // Type:8388613
32 // Payload: Offset | total scanned bytes | http status code | error message
33 //     <-- 8bytes--><-----8 bytes--------><---4 bytes-------><---variabe--->
34
35 // SelectObjectResponse defines HTTP response from OSS SelectObject
36 type SelectObjectResponse struct {
37         StatusCode          int
38         Headers             http.Header
39         Body                io.ReadCloser
40         Frame               SelectObjectResult
41         ReadTimeOut         uint
42         ClientCRC32         uint32
43         ServerCRC32         uint32
44         WriterForCheckCrc32 hash.Hash32
45         Finish              bool
46 }
47
48 func (sr *SelectObjectResponse) Read(p []byte) (n int, err error) {
49         n, err = sr.readFrames(p)
50         return
51 }
52
53 // Close http reponse body
54 func (sr *SelectObjectResponse) Close() error {
55         return sr.Body.Close()
56 }
57
58 // PostSelectResult is the request of SelectObject
59 type PostSelectResult struct {
60         Response *SelectObjectResponse
61 }
62
63 // readFrames is read Frame
64 func (sr *SelectObjectResponse) readFrames(p []byte) (int, error) {
65         var nn int
66         var err error
67         var checkValid bool
68         if sr.Frame.OutputRawData == true {
69                 nn, err = sr.Body.Read(p)
70                 return nn, err
71         }
72
73         if sr.Finish {
74                 return 0, io.EOF
75         }
76
77         for {
78                 // if this Frame is Readed, then not reading Header
79                 if sr.Frame.OpenLine != true {
80                         err = sr.analysisHeader()
81                         if err != nil {
82                                 return nn, err
83                         }
84                 }
85
86                 if sr.Frame.FrameType == DataFrameType {
87                         n, err := sr.analysisData(p[nn:])
88                         if err != nil {
89                                 return nn, err
90                         }
91                         nn += n
92
93                         // if this Frame is readed all data, then empty the Frame to read it with next frame
94                         if sr.Frame.ConsumedBytesLength == sr.Frame.PayloadLength-8 {
95                                 checkValid, err = sr.checkPayloadSum()
96                                 if err != nil || !checkValid {
97                                         return nn, fmt.Errorf("%s", err.Error())
98                                 }
99                                 sr.emptyFrame()
100                         }
101
102                         if nn == len(p) {
103                                 return nn, nil
104                         }
105                 } else if sr.Frame.FrameType == ContinuousFrameType {
106                         checkValid, err = sr.checkPayloadSum()
107                         if err != nil || !checkValid {
108                                 return nn, fmt.Errorf("%s", err.Error())
109                         }
110                 } else if sr.Frame.FrameType == EndFrameType {
111                         err = sr.analysisEndFrame()
112                         if err != nil {
113                                 return nn, err
114                         }
115                         checkValid, err = sr.checkPayloadSum()
116                         if checkValid {
117                                 sr.Finish = true
118                         }
119                         return nn, err
120                 } else if sr.Frame.FrameType == MetaEndFrameCSVType {
121                         err = sr.analysisMetaEndFrameCSV()
122                         if err != nil {
123                                 return nn, err
124                         }
125                         checkValid, err = sr.checkPayloadSum()
126                         if checkValid {
127                                 sr.Finish = true
128                         }
129                         return nn, err
130                 } else if sr.Frame.FrameType == MetaEndFrameJSONType {
131                         err = sr.analysisMetaEndFrameJSON()
132                         if err != nil {
133                                 return nn, err
134                         }
135                         checkValid, err = sr.checkPayloadSum()
136                         if checkValid {
137                                 sr.Finish = true
138                         }
139                         return nn, err
140                 }
141         }
142         return nn, nil
143 }
144
145 type chanReadIO struct {
146         readLen int
147         err     error
148 }
149
150 func (sr *SelectObjectResponse) readLen(p []byte, timeOut time.Duration) (int, error) {
151         r := sr.Body
152         ch := make(chan chanReadIO, 1)
153         defer close(ch)
154         go func(p []byte) {
155                 var needReadLength int
156                 readChan := chanReadIO{}
157                 needReadLength = len(p)
158                 for {
159                         n, err := r.Read(p[readChan.readLen:needReadLength])
160                         readChan.readLen += n
161                         if err != nil {
162                                 readChan.err = err
163                                 ch <- readChan
164                                 return
165                         }
166
167                         if readChan.readLen == needReadLength {
168                                 break
169                         }
170                 }
171                 ch <- readChan
172         }(p)
173
174         select {
175         case <-time.After(time.Second * timeOut):
176                 return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", sr.Headers.Get(HTTPHeaderOssRequestID), timeOut, len(p))
177         case result := <-ch:
178                 return result.readLen, result.err
179         }
180 }
181
182 // analysisHeader is reading selectObject response body's header
183 func (sr *SelectObjectResponse) analysisHeader() error {
184         headFrameByte := make([]byte, 20)
185         _, err := sr.readLen(headFrameByte, time.Duration(sr.ReadTimeOut))
186         if err != nil {
187                 return fmt.Errorf("requestId: %s, Read response frame header failure,err:%s", sr.Headers.Get(HTTPHeaderOssRequestID), err.Error())
188         }
189
190         frameTypeByte := headFrameByte[0:4]
191         sr.Frame.Version = frameTypeByte[0]
192         frameTypeByte[0] = 0
193         bytesToInt(frameTypeByte, &sr.Frame.FrameType)
194
195         if sr.Frame.FrameType != DataFrameType && sr.Frame.FrameType != ContinuousFrameType &&
196                 sr.Frame.FrameType != EndFrameType && sr.Frame.FrameType != MetaEndFrameCSVType && sr.Frame.FrameType != MetaEndFrameJSONType {
197                 return fmt.Errorf("requestId: %s, Unexpected frame type: %d", sr.Headers.Get(HTTPHeaderOssRequestID), sr.Frame.FrameType)
198         }
199
200         payloadLengthByte := headFrameByte[4:8]
201         bytesToInt(payloadLengthByte, &sr.Frame.PayloadLength)
202         headCheckSumByte := headFrameByte[8:12]
203         bytesToInt(headCheckSumByte, &sr.Frame.HeaderCheckSum)
204         byteOffset := headFrameByte[12:20]
205         bytesToInt(byteOffset, &sr.Frame.Offset)
206         sr.Frame.OpenLine = true
207
208         err = sr.writerCheckCrc32(byteOffset)
209         return err
210 }
211
212 // analysisData is reading the DataFrameType data of selectObject response body
213 func (sr *SelectObjectResponse) analysisData(p []byte) (int, error) {
214         var needReadLength int32
215         lenP := int32(len(p))
216         restByteLength := sr.Frame.PayloadLength - 8 - sr.Frame.ConsumedBytesLength
217         if lenP <= restByteLength {
218                 needReadLength = lenP
219         } else {
220                 needReadLength = restByteLength
221         }
222         n, err := sr.readLen(p[:needReadLength], time.Duration(sr.ReadTimeOut))
223         if err != nil {
224                 return n, fmt.Errorf("read frame data error,%s", err.Error())
225         }
226         sr.Frame.ConsumedBytesLength += int32(n)
227         err = sr.writerCheckCrc32(p[:n])
228         return n, err
229 }
230
231 // analysisEndFrame is reading the EndFrameType data of selectObject response body
232 func (sr *SelectObjectResponse) analysisEndFrame() error {
233         var eF EndFrame
234         payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
235         _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
236         if err != nil {
237                 return fmt.Errorf("read end frame error:%s", err.Error())
238         }
239         bytesToInt(payLoadBytes[0:8], &eF.TotalScanned)
240         bytesToInt(payLoadBytes[8:12], &eF.HTTPStatusCode)
241         errMsgLength := sr.Frame.PayloadLength - 20
242         eF.ErrorMsg = string(payLoadBytes[12 : errMsgLength+12])
243         sr.Frame.EndFrame.TotalScanned = eF.TotalScanned
244         sr.Frame.EndFrame.HTTPStatusCode = eF.HTTPStatusCode
245         sr.Frame.EndFrame.ErrorMsg = eF.ErrorMsg
246         err = sr.writerCheckCrc32(payLoadBytes)
247         return err
248 }
249
250 // analysisMetaEndFrameCSV is reading the MetaEndFrameCSVType data of selectObject response body
251 func (sr *SelectObjectResponse) analysisMetaEndFrameCSV() error {
252         var mCF MetaEndFrameCSV
253         payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
254         _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
255         if err != nil {
256                 return fmt.Errorf("read meta end csv frame error:%s", err.Error())
257         }
258
259         bytesToInt(payLoadBytes[0:8], &mCF.TotalScanned)
260         bytesToInt(payLoadBytes[8:12], &mCF.Status)
261         bytesToInt(payLoadBytes[12:16], &mCF.SplitsCount)
262         bytesToInt(payLoadBytes[16:24], &mCF.RowsCount)
263         bytesToInt(payLoadBytes[24:28], &mCF.ColumnsCount)
264         errMsgLength := sr.Frame.PayloadLength - 36
265         mCF.ErrorMsg = string(payLoadBytes[28 : errMsgLength+28])
266         sr.Frame.MetaEndFrameCSV.ErrorMsg = mCF.ErrorMsg
267         sr.Frame.MetaEndFrameCSV.TotalScanned = mCF.TotalScanned
268         sr.Frame.MetaEndFrameCSV.Status = mCF.Status
269         sr.Frame.MetaEndFrameCSV.SplitsCount = mCF.SplitsCount
270         sr.Frame.MetaEndFrameCSV.RowsCount = mCF.RowsCount
271         sr.Frame.MetaEndFrameCSV.ColumnsCount = mCF.ColumnsCount
272         err = sr.writerCheckCrc32(payLoadBytes)
273         return err
274 }
275
276 // analysisMetaEndFrameJSON is reading the MetaEndFrameJSONType data of selectObject response body
277 func (sr *SelectObjectResponse) analysisMetaEndFrameJSON() error {
278         var mJF MetaEndFrameJSON
279         payLoadBytes := make([]byte, sr.Frame.PayloadLength-8)
280         _, err := sr.readLen(payLoadBytes, time.Duration(sr.ReadTimeOut))
281         if err != nil {
282                 return fmt.Errorf("read meta end json frame error:%s", err.Error())
283         }
284
285         bytesToInt(payLoadBytes[0:8], &mJF.TotalScanned)
286         bytesToInt(payLoadBytes[8:12], &mJF.Status)
287         bytesToInt(payLoadBytes[12:16], &mJF.SplitsCount)
288         bytesToInt(payLoadBytes[16:24], &mJF.RowsCount)
289         errMsgLength := sr.Frame.PayloadLength - 32
290         mJF.ErrorMsg = string(payLoadBytes[24 : errMsgLength+24])
291         sr.Frame.MetaEndFrameJSON.ErrorMsg = mJF.ErrorMsg
292         sr.Frame.MetaEndFrameJSON.TotalScanned = mJF.TotalScanned
293         sr.Frame.MetaEndFrameJSON.Status = mJF.Status
294         sr.Frame.MetaEndFrameJSON.SplitsCount = mJF.SplitsCount
295         sr.Frame.MetaEndFrameJSON.RowsCount = mJF.RowsCount
296
297         err = sr.writerCheckCrc32(payLoadBytes)
298         return err
299 }
300
301 func (sr *SelectObjectResponse) checkPayloadSum() (bool, error) {
302         payLoadChecksumByte := make([]byte, 4)
303         n, err := sr.readLen(payLoadChecksumByte, time.Duration(sr.ReadTimeOut))
304         if n == 4 {
305                 bytesToInt(payLoadChecksumByte, &sr.Frame.PayloadChecksum)
306                 sr.ServerCRC32 = sr.Frame.PayloadChecksum
307                 sr.ClientCRC32 = sr.WriterForCheckCrc32.Sum32()
308                 if sr.Frame.EnablePayloadCrc == true && sr.ServerCRC32 != 0 && sr.ServerCRC32 != sr.ClientCRC32 {
309                         return false, fmt.Errorf("RequestId: %s, Unexpected frame type: %d, client %d but server %d",
310                                 sr.Headers.Get(HTTPHeaderOssRequestID), sr.Frame.FrameType, sr.ClientCRC32, sr.ServerCRC32)
311                 }
312                 return true, err
313         }
314         return false, fmt.Errorf("RequestId:%s, read checksum error:%s", sr.Headers.Get(HTTPHeaderOssRequestID), err.Error())
315 }
316
317 func (sr *SelectObjectResponse) writerCheckCrc32(p []byte) (err error) {
318         err = nil
319         if sr.Frame.EnablePayloadCrc == true {
320                 _, err = sr.WriterForCheckCrc32.Write(p)
321         }
322         return err
323 }
324
325 // emptyFrame is emptying SelectObjectResponse Frame information
326 func (sr *SelectObjectResponse) emptyFrame() {
327         crcCalc := crc32.NewIEEE()
328         sr.WriterForCheckCrc32 = crcCalc
329         sr.Finish = false
330
331         sr.Frame.ConsumedBytesLength = 0
332         sr.Frame.OpenLine = false
333         sr.Frame.Version = byte(0)
334         sr.Frame.FrameType = 0
335         sr.Frame.PayloadLength = 0
336         sr.Frame.HeaderCheckSum = 0
337         sr.Frame.Offset = 0
338         sr.Frame.Data = ""
339
340         sr.Frame.EndFrame.TotalScanned = 0
341         sr.Frame.EndFrame.HTTPStatusCode = 0
342         sr.Frame.EndFrame.ErrorMsg = ""
343
344         sr.Frame.MetaEndFrameCSV.TotalScanned = 0
345         sr.Frame.MetaEndFrameCSV.Status = 0
346         sr.Frame.MetaEndFrameCSV.SplitsCount = 0
347         sr.Frame.MetaEndFrameCSV.RowsCount = 0
348         sr.Frame.MetaEndFrameCSV.ColumnsCount = 0
349         sr.Frame.MetaEndFrameCSV.ErrorMsg = ""
350
351         sr.Frame.MetaEndFrameJSON.TotalScanned = 0
352         sr.Frame.MetaEndFrameJSON.Status = 0
353         sr.Frame.MetaEndFrameJSON.SplitsCount = 0
354         sr.Frame.MetaEndFrameJSON.RowsCount = 0
355         sr.Frame.MetaEndFrameJSON.ErrorMsg = ""
356
357         sr.Frame.PayloadChecksum = 0
358 }
359
360 // bytesToInt byte's array trans to int
361 func bytesToInt(b []byte, ret interface{}) {
362         binBuf := bytes.NewBuffer(b)
363         binary.Read(binBuf, binary.BigEndian, ret)
364 }