// OssSelectCsvSuite is the receiver type for the CSV select-object tests in
// this file. Its methods read s.client and s.bucket; the field declarations
// themselves are not visible in this excerpt.
13 type OssSelectCsvSuite struct {
// Pass an instance of the suite to Suite; the result is discarded.
// NOTE(review): presumably this is the test framework's registration hook —
// confirm against the file's imports.
18 var _ = Suite(&OssSelectCsvSuite{})
// SetUpSuite is the suite-level setup hook. It builds a client from
// endpoint/accessID/accessKey, creates the bucket named bucketName, and
// fetches a handle to that bucket. The lines that store the client and bucket
// on the suite are not visible in this excerpt.
20 func (s *OssSelectCsvSuite) SetUpSuite(c *C) {
21 client, err := New(endpoint, accessID, accessKey)
// Log level is Error; the trailing "Debug" records the alternative setting.
24 s.client.Config.LogLevel = Error // Debug
25 // s.client.Config.Timeout = 5
26 err = s.client.CreateBucket(bucketName)
28 bucket, err := s.client.Bucket(bucketName)
32 testLogger.Println("test select csv started")
// TearDownSuite is the suite-level cleanup hook. It lists the objects in the
// suite's bucket, deletes each listed object, and finally deletes the bucket
// named bucketName.
35 func (s *OssSelectCsvSuite) TearDownSuite(c *C) {
// marker is the listing cursor; its initial value is set on a line not shown
// here. NOTE(review): the cursor update below suggests an enclosing paging
// loop — confirm in the full file.
39 lor, err := s.bucket.ListObjects(marker)
41 for _, object := range lor.Objects {
42 err = s.bucket.DeleteObject(object.Key)
// Move the cursor to the position reported by the last listing.
45 marker = Marker(lor.NextMarker)
// Delete the bucket itself once the objects have been removed.
51 err := s.client.DeleteBucket(bucketName)
54 testLogger.Println("test select csv completed")
// SetUpTest is the per-test setup hook; it logs the name of the test that is
// about to start.
57 func (s *OssSelectCsvSuite) SetUpTest(c *C) {
58 testLogger.Println("test func", c.TestName(), "start")
// TearDownTest is the per-test teardown hook; it logs the test name followed
// by "succeed".
// NOTE(review): nothing visible here checks the test outcome before logging
// "succeed" — confirm the hook is only reached for passing tests.
61 func (s *OssSelectCsvSuite) TearDownTest(c *C) {
62 testLogger.Println("test func", c.TestName(), "succeed")
65 // TestCreateSelectCsvObjectMeta uploads the sample CSV, creates its select
// metadata twice (first with a default request, then with explicit CSV
// delimiters), and checks each time that the reported RowsCount equals the
// line count readCsvLine returns for the local file. The object is deleted at
// the end.
66 func (s *OssSelectCsvSuite) TestCreateSelectCsvObjectMeta(c *C) {
67 key := "sample_data.csv"
68 localCsvFile := "../sample/sample_data.csv"
69 err := s.bucket.PutObjectFromFile(key, localCsvFile)
71 csvMeta := CsvMetaRequest{}
// bo is declared on a line not shown in this excerpt.
73 csvMeta.OverwriteIfExists = &bo
74 res, err := s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
76 l, err := readCsvLine(localCsvFile)
78 c.Assert(res.RowsCount, Equals, int64(l))
// Second pass: same request, now with record/field/quote characters spelled out.
81 csvMeta.OverwriteIfExists = &bo
82 csvMeta.InputSerialization.CSV.RecordDelimiter = "\n"
83 csvMeta.InputSerialization.CSV.FieldDelimiter = ","
84 csvMeta.InputSerialization.CSV.QuoteCharacter = "\""
85 res, err = s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
87 c.Assert(res.RowsCount, Equals, int64(l))
89 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectIsEmpty uploads the sample CSV, selects four columns from
// rows whose CityName is not the empty string, and compares the result against
// what readCsvIsEmpty produces from the local file. The select output is
// consumed in three pieces (512 bytes, 3 bytes, then the remainder) and
// re-joined for the comparison.
93 func (s *OssSelectCsvSuite) TestSelectCsvObjectIsEmpty(c *C) {
94 key := "sample_data.csv"
95 localCsvFile := "../sample/sample_data.csv"
96 err := s.bucket.PutObjectFromFile(key, localCsvFile)
98 csvMeta := CsvMetaRequest{}
99 _, err = s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
101 selReq := SelectRequest{}
102 selReq.Expression = "select Year, StateAbbr, CityName, PopulationCount from ossobject where CityName != ''"
103 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
105 body, err := s.bucket.SelectObject(key, selReq)
109 p := make([]byte, 512)
110 n, err := body.Read(p)
// NOTE(review): this requires a single Read to fill all 512 bytes; io.Reader
// allows short reads, so confirm the select reader guarantees a full buffer.
112 c.Assert(n, Equals, 512)
113 p1 := make([]byte, 3)
// The byte count of this read is discarded, yet all 3 bytes of p1 are used in
// the comparison below.
114 _, err = body.Read(p1)
116 rets, err := ioutil.ReadAll(body)
118 str, err := readCsvIsEmpty(localCsvFile)
120 c.Assert(string(p)+string(p1)+string(rets), Equals, str)
122 err = s.bucket.DeleteObject(key)
// TestSelectObjectIntoFile uploads the sample CSV, creates its select metadata
// and checks the row count, then runs "select * from ossobject" into the local
// file sample_data_out.csv and asserts that file's bytes equal the original
// local CSV. The output file and the remote object are removed at the end.
126 func (s *OssSelectCsvSuite) TestSelectObjectIntoFile(c *C) {
128 key := "sample_data.csv"
129 localCsvFile := "../sample/sample_data.csv"
130 err := s.bucket.PutObjectFromFile(key, localCsvFile)
132 csvMeta := CsvMetaRequest{
133 InputSerialization: InputSerialization {
135 RecordDelimiter: "\n",
137 QuoteCharacter: "\"",
// bo is declared on a line not shown in this excerpt.
140 OverwriteIfExists: &bo,
142 res, err := s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
144 l, err := readCsvLine(localCsvFile)
146 c.Assert(res.RowsCount, Equals, int64(l))
// FileHeaderInfo is "None" here, unlike the "Use" setting in the column-name
// queries elsewhere in this file.
148 selReq := SelectRequest{
149 Expression:"select * from ossobject",
150 InputSerializationSelect: InputSerializationSelect {
151 CsvBodyInput :CSVSelectInput{
152 FileHeaderInfo: "None",
153 CommentCharacter: "#",
154 RecordDelimiter: "\n",
161 outfile := "sample_data_out.csv"
162 err = s.bucket.SelectObjectIntoFile(key, outfile, selReq)
// NOTE(review): no Close for fd1/fd2 is visible in this excerpt — confirm both
// files are closed before os.Remove(outfile) runs.
165 fd1, err := os.Open(outfile)
168 fd2, err := os.Open(localCsvFile)
171 str1, err := ioutil.ReadAll(fd1)
173 str2 ,err := ioutil.ReadAll(fd2)
175 c.Assert(string(str1), Equals, string(str2))
177 err = os.Remove(outfile)
179 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectRange uploads the sample CSV, creates its select metadata,
// and runs a four-column select with the CSV input Range set to "0-2". The
// output must equal readCsvRange(localCsvFile, 0, 2).
183 func(s *OssSelectCsvSuite) TestSelectCsvObjectRange(c *C) {
184 key := "sample_data.csv"
185 localCsvFile := "../sample/sample_data.csv"
186 err := s.bucket.PutObjectFromFile(key, localCsvFile)
188 csvMeta := CsvMetaRequest{}
189 _,err = s.bucket.CreateSelectCsvObjectMeta(key, csvMeta)
191 selReq := SelectRequest{}
192 selReq.Expression = "select Year,StateAbbr, CityName, Short_Question_Text from ossobject"
193 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
// The "0-2" range string here pairs with the (0, 2) arguments passed to the
// local reference helper below.
194 selReq.InputSerializationSelect.CsvBodyInput.Range = "0-2"
195 body, err := s.bucket.SelectObject(key, selReq)
198 rets, err := ioutil.ReadAll(body)
200 str,err := readCsvRange(localCsvFile, 0, 2)
202 c.Assert(string(rets), Equals, str)
204 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectLike uploads the sample CSV and runs a select whose WHERE
// clause uses a LIKE pattern on the Measure column. The output must equal what
// readCsvLike produces from the local file.
208 func(s *OssSelectCsvSuite) TestSelectCsvObjectLike(c *C) {
209 key := "sample_data.csv"
210 localCsvFile := "../sample/sample_data.csv"
211 err := s.bucket.PutObjectFromFile(key, localCsvFile)
213 selReq := SelectRequest{}
214 selReq.Expression = "select Year, StateAbbr, CityName, Short_Question_Text from ossobject where Measure like '%blood pressure%Years'"
215 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
216 ret,err := s.bucket.SelectObject(key, selReq)
219 ts, err := ioutil.ReadAll(ret)
221 str, err := readCsvLike(localCsvFile)
223 c.Assert(string(ts), Equals, str)
225 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectIntAggregation uploads the sample CSV and selects
// avg/max/min of year cast to int, restricted to rows where year = 2015. With
// that filter all three aggregates are expected to be 2015, so the output is
// compared to the literal "2015,2015,2015\n".
229 func(s *OssSelectCsvSuite) TestSelectCsvObjectIntAggregation(c *C) {
230 key := "sample_data.csv"
231 localCsvFile := "../sample/sample_data.csv"
232 err := s.bucket.PutObjectFromFile(key, localCsvFile)
234 selReq := SelectRequest{}
235 selReq.Expression = `select avg(cast(year as int)), max(cast(year as int)), min(cast(year as int)) from ossobject where year = 2015`
236 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
237 ret,err := s.bucket.SelectObject(key, selReq)
240 ts, err := ioutil.ReadAll(ret)
243 c.Assert(string(ts), Equals, "2015,2015,2015\n")
245 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectFloatAggregation uploads the sample CSV and selects
// avg/max/sum of data_value cast to double. The service output and the values
// from readCsvFloatAgg are both re-formatted the same way before comparison,
// so the check does not depend on the service's own number formatting.
249 func(s *OssSelectCsvSuite) TestSelectCsvObjectFloatAggregation(c *C) {
250 key := "sample_data.csv"
251 localCsvFile := "../sample/sample_data.csv"
252 err := s.bucket.PutObjectFromFile(key, localCsvFile)
254 selReq := SelectRequest{}
255 selReq.Expression = `select avg(cast(data_value as double)), max(cast(data_value as double)), sum(cast(data_value as double)) from ossobject`
256 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
257 ret,err := s.bucket.SelectObject(key, selReq)
260 ts, err := ioutil.ReadAll(ret)
264 avg, max, sum , err := readCsvFloatAgg(localCsvFile)
// Expected side: 5 decimals, bitSize 32 (i.e. rounded as a float32 would be),
// each value followed by a comma.
267 s1 := strconv.FormatFloat(avg, 'f', 5, 32) + ","
268 s1 += strconv.FormatFloat(max, 'f', 5, 32) + ","
269 s1 += strconv.FormatFloat(sum, 'f', 5, 32) + ","
// Actual side: strR and retS are declared on lines not shown in this excerpt.
// The last byte of strR is dropped before splitting (presumably a trailing
// newline — confirm), and each field is parsed and formatted like s1.
271 for _, v := range strings.Split(strR[:len(strR)-1], ",") {
272 vv, err := strconv.ParseFloat(v, 64)
274 retS += strconv.FormatFloat(vv, 'f', 5, 32) + ","
276 c.Assert(s1, Equals, retS)
278 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectConcat uploads the sample CSV and runs a select whose
// WHERE clause compares the concatenation (data_value || data_value_unit) with
// '14.8%'. The output must equal what readCsvConcat produces from the local
// file.
282 func(s *OssSelectCsvSuite) TestSelectCsvObjectConcat(c *C) {
283 key := "sample_data.csv"
284 localCsvFile := "../sample/sample_data.csv"
285 err := s.bucket.PutObjectFromFile(key, localCsvFile)
287 selReq := SelectRequest{}
288 selReq.Expression = `select Year,StateAbbr, CityName, Short_Question_Text from ossobject where (data_value || data_value_unit) = '14.8%'`
289 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
290 ret,err := s.bucket.SelectObject(key, selReq)
293 ts, err := ioutil.ReadAll(ret)
296 str, err := readCsvConcat(localCsvFile)
298 c.Assert(string(ts), Equals, str)
300 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectComplicateConcat uploads the sample CSV and runs a
// multi-line select that mixes numeric comparisons, equality tests, and a LIKE
// pattern joined with and/or (no parentheses in the visible part of the query).
// The output must equal what readCsvComplicateCondition produces from the
// local file.
304 func (s *OssSelectCsvSuite) TestSelectCsvObjectComplicateConcat(c *C) {
305 key := "sample_data.csv"
306 localCsvFile := "../sample/sample_data.csv"
307 err := s.bucket.PutObjectFromFile(key, localCsvFile)
309 selReq := SelectRequest{}
// The query is a raw string spanning several lines; parts of it (including the
// select/from/where keywords) fall on lines not shown in this excerpt.
310 selReq.Expression = `
312 Year,StateAbbr, CityName, Short_Question_Text, data_value,
313 data_value_unit, category, high_confidence_limit
317 data_value > 14.8 and
318 data_value_unit = '%' or
319 Measure like '%18 Years' and
320 Category = 'Unhealthy Behaviors' or
321 high_confidence_limit > 70.0 `
323 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
324 ret,err := s.bucket.SelectObject(key, selReq)
327 ts, err := ioutil.ReadAll(ret)
330 str, err := readCsvComplicateCondition(localCsvFile)
332 c.Assert(string(ts), Equals, str)
334 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectInvalidSql uploads the sample CSV and reuses one
// SelectRequest (FileHeaderInfo "Use") for a series of expressions. For the
// first six, SelectObject itself must return a non-nil error. The last
// expression is issued and its body read; the assertions on that final case
// are on lines not shown in this excerpt.
338 func (s *OssSelectCsvSuite) TestSelectCsvObjectInvalidSql(c *C) {
339 key := "sample_data.csv"
340 localCsvFile := "../sample/sample_data.csv"
341 err := s.bucket.PutObjectFromFile(key, localCsvFile)
343 selReq := SelectRequest{}
// Aggregate function inside a WHERE clause.
344 selReq.Expression = `select * from ossobject where avg(cast(year as int)) > 2016`
345 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
346 _, err = s.bucket.SelectObject(key, selReq)
347 c.Assert(err, NotNil)
// Empty expression.
349 selReq.Expression = ``
350 _, err = s.bucket.SelectObject(key, selReq)
351 c.Assert(err, NotNil)
// Concatenation used directly in the select list.
353 selReq.Expression = `select year || CityName from ossobject`
354 _, err = s.bucket.SelectObject(key, selReq)
355 c.Assert(err, NotNil)
// GROUP BY clause.
357 selReq.Expression = `select * from ossobject group by CityName`
358 _, err = s.bucket.SelectObject(key, selReq)
359 c.Assert(err, NotNil)
// ORDER BY clause.
361 selReq.Expression = `select * from ossobject order by _1`
362 _, err = s.bucket.SelectObject(key, selReq)
363 c.Assert(err, NotNil)
// JOIN between two sources.
365 selReq.Expression = `select * from ossobject oss join s3object s3 on oss.CityName = s3.CityName`
366 _, err = s.bucket.SelectObject(key, selReq)
367 c.Assert(err, NotNil)
// Positional column reference while FileHeaderInfo is still "Use"; here the
// response body is read rather than only checking SelectObject's error.
369 selReq.Expression = `select _1 from ossobject`
370 ret, err := s.bucket.SelectObject(key, selReq)
373 _, err = ioutil.ReadAll(ret)
376 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectWithOutputDelimiters uploads the one-line object
// "abc,def\n" and selects both columns with the output record delimiter set to
// "\r\n" and the output field delimiter set to "|". The result must be
// "abc|def\r\n".
380 func (s *OssSelectCsvSuite) TestSelectCsvObjectWithOutputDelimiters(c *C) {
381 key := "sample_data.csv"
382 content := "abc,def\n"
383 err := s.bucket.PutObject(key, strings.NewReader(content))
385 selReq := SelectRequest{}
386 selReq.Expression = `select _1, _2 from ossobject `
387 selReq.OutputSerializationSelect.CsvBodyOutput.RecordDelimiter = "\r\n"
388 selReq.OutputSerializationSelect.CsvBodyOutput.FieldDelimiter = "|"
390 ret,err := s.bucket.SelectObject(key, selReq)
393 ts, err := ioutil.ReadAll(ret)
395 c.Assert(string(ts), Equals, "abc|def\r\n")
397 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectWithCrc uploads the one-line object "abc,def\n" and runs
// "select * from ossobject" with EnablePayloadCrc pointing at bo. The output
// must equal the uploaded content.
401 func (s *OssSelectCsvSuite) TestSelectCsvObjectWithCrc(c *C) {
402 key := "sample_data.csv"
403 content := "abc,def\n"
404 err := s.bucket.PutObject(key, strings.NewReader(content))
406 selReq := SelectRequest{}
407 selReq.Expression = `select * from ossobject`
// bo is declared on a line not shown in this excerpt.
409 selReq.OutputSerializationSelect.EnablePayloadCrc = &bo
411 ret,err := s.bucket.SelectObject(key, selReq)
414 ts, err := ioutil.ReadAll(ret)
416 c.Assert(string(ts), Equals, content)
418 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectWithSkipPartialData uploads a two-record object whose
// second record ("efg") has only one field, then selects columns _1 and _2 with
// SkipPartialDataRecord pointing at bo. Only the complete record is expected
// in the output: "abc,def\n".
422 func (s *OssSelectCsvSuite) TestSelectCsvObjectWithSkipPartialData(c *C) {
423 key := "sample_data.csv"
424 content := "abc,def\nefg\n"
425 err := s.bucket.PutObject(key, strings.NewReader(content))
427 selReq := SelectRequest{}
428 selReq.Expression = `select _1, _2 from ossobject`
// bo is declared on a line not shown in this excerpt.
430 selReq.SelectOptions.SkipPartialDataRecord = &bo
431 ret,err := s.bucket.SelectObject(key, selReq)
434 ts, err := ioutil.ReadAll(ret)
436 c.Assert(string(ts), Equals, "abc,def\n")
438 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectWithOutputRaw uploads the one-line object "abc,def\n" and
// selects column _1 with OutputRawData pointing at bo. The output must be
// "abc\n".
442 func (s *OssSelectCsvSuite) TestSelectCsvObjectWithOutputRaw(c *C) {
443 key := "sample_data.csv"
444 content := "abc,def\n"
445 err := s.bucket.PutObject(key, strings.NewReader(content))
447 selReq := SelectRequest{}
448 selReq.Expression = `select _1 from ossobject`
// bo is declared on a line not shown in this excerpt.
450 selReq.OutputSerializationSelect.OutputRawData = &bo
452 ret,err := s.bucket.SelectObject(key, selReq)
455 ts, err := ioutil.ReadAll(ret)
457 c.Assert(string(ts), Equals, "abc\n")
459 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectWithKeepColumns uploads the one-line object "abc,def\n"
// and selects only column _1 with KeepAllColumns pointing at bo. The expected
// output "abc,\n" still contains the second column's position, left empty.
463 func (s *OssSelectCsvSuite) TestSelectCsvObjectWithKeepColumns(c *C) {
464 key := "sample_data.csv"
465 content := "abc,def\n"
466 err := s.bucket.PutObject(key, strings.NewReader(content))
468 selReq := SelectRequest{}
469 selReq.Expression = `select _1 from ossobject`
// bo is declared on a line not shown in this excerpt.
471 selReq.OutputSerializationSelect.KeepAllColumns = &bo
473 ret,err := s.bucket.SelectObject(key, selReq)
476 ts, err := ioutil.ReadAll(ret)
478 c.Assert(string(ts), Equals, "abc,\n")
480 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectWithOutputHeader uploads a CSV with a header row
// ("name,job") and one data row, then selects the name column with
// FileHeaderInfo "Use" and OutputHeader pointing at bo. The expected output
// includes the header line: "name\nabc\n".
484 func (s *OssSelectCsvSuite) TestSelectCsvObjectWithOutputHeader(c *C) {
485 key := "sample_data.csv"
486 content := "name,job\nabc,def\n"
487 err := s.bucket.PutObject(key, strings.NewReader(content))
489 selReq := SelectRequest{}
490 selReq.Expression = `select name from ossobject`
// bo is declared on a line not shown in this excerpt.
492 selReq.OutputSerializationSelect.OutputHeader = &bo
493 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
495 ret,err := s.bucket.SelectObject(key, selReq)
498 ts, err := ioutil.ReadAll(ret)
500 c.Assert(string(ts), Equals, "name\nabc\n")
502 err = s.bucket.DeleteObject(key)
// TestSelectCsvObjectRead exercises the select reader's Read with different
// buffer sizes. It uploads a header row plus one data row, selects the name
// column with OutputHeader and EnablePayloadCrc both pointing at bo, and
// expects the 9 bytes "name\nabc\n" in total. Each case issues the same request
// again, does one Read of a chosen size, then drains the rest with
// ioutil.ReadAll and checks both parts.
506 func (s *OssSelectCsvSuite) TestSelectCsvObjectRead(c *C) {
507 key := "sample_data.csv"
508 content := "name,job\nabc,def\n"
509 err := s.bucket.PutObject(key, strings.NewReader(content))
511 selReq := SelectRequest{}
512 selReq.Expression = `select name from ossobject`
// bo is declared on a line not shown in this excerpt.
514 selReq.OutputSerializationSelect.OutputHeader = &bo
515 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
516 selReq.OutputSerializationSelect.EnablePayloadCrc = &bo
518 ret,err := s.bucket.SelectObject(key, selReq)
522 // case 1: read length > data length
// p is reused by every case; only the sub-slice passed to Read varies.
523 p := make([]byte, 512)
524 n, err := ret.Read(p[:20])
// io.EOF from Read is tolerated in every case; the body of this check is on
// lines not shown in this excerpt.
525 if err != nil && err != io.EOF {
528 c.Assert(string(p[:n]), Equals, "name\nabc\n")
529 ts, err := ioutil.ReadAll(ret)
531 c.Assert(string(ts), Equals, "")
533 // case 2: read length = data length
534 ret,err = s.bucket.SelectObject(key, selReq)
537 n, err = ret.Read(p[:9])
538 if err != nil && err != io.EOF {
541 c.Assert(string(p[:n]), Equals, "name\nabc\n")
542 ts, err = ioutil.ReadAll(ret)
544 c.Assert(string(ts), Equals, "")
546 // case 3: read length > one frame length and read length < two frame, (this data = 2 * frame length)
547 ret,err = s.bucket.SelectObject(key, selReq)
550 n, err = ret.Read(p[:7])
551 if err != nil && err != io.EOF {
554 c.Assert(string(p[:n]), Equals, "name\nab")
555 ts, err = ioutil.ReadAll(ret)
557 c.Assert(string(ts), Equals, "c\n")
559 // case 4: read length = a frame length (this data = 2 * frame length)
560 ret,err = s.bucket.SelectObject(key, selReq)
563 n, err = ret.Read(p[:5])
564 if err != nil && err != io.EOF {
567 c.Assert(string(p[:n]), Equals, "name\n")
568 ts, err = ioutil.ReadAll(ret)
570 c.Assert(string(ts), Equals, "abc\n")
572 // case 5: read length < a frame length (this data = 2 * frame length)
573 ret,err = s.bucket.SelectObject(key, selReq)
576 n, err = ret.Read(p[:3])
577 if err != nil && err != io.EOF {
580 c.Assert(string(p[:n]), Equals, "nam")
581 ts, err = ioutil.ReadAll(ret)
583 c.Assert(string(ts), Equals, "e\nabc\n")
585 err = s.bucket.DeleteObject(key)
589 // OssSelectProgressListener is the progress listener passed to SelectObject
// via Progress(...) in TestSelectCsvObjectConcatProgress. Its fields, if any,
// are not visible in this excerpt.
590 type OssSelectProgressListener struct {
592 // ProgressChanged handles a progress event by logging one line per event
// type: a plain message for TransferStartedEvent, and a message including
// event.ConsumedBytes for the data, completed, and failed events. No other
// handling is visible in this excerpt.
593 func (listener *OssSelectProgressListener) ProgressChanged(event *ProgressEvent) {
594 switch event.EventType {
595 case TransferStartedEvent:
596 testLogger.Printf("Transfer Started.\n")
597 case TransferDataEvent:
598 testLogger.Printf("Transfer Data, This time consumedBytes: %d \n", event.ConsumedBytes)
599 case TransferCompletedEvent:
600 testLogger.Printf("Transfer Completed, This time consumedBytes: %d.\n", event.ConsumedBytes)
601 case TransferFailedEvent:
602 testLogger.Printf("Transfer Failed, This time consumedBytes: %d.\n", event.ConsumedBytes)
// TestSelectCsvObjectConcatProgress runs the same concatenation query as
// TestSelectCsvObjectConcat, but passes a Progress option carrying an
// OssSelectProgressListener to SelectObject. The output must still equal what
// readCsvConcat produces from the local file.
607 func(s *OssSelectCsvSuite) TestSelectCsvObjectConcatProgress(c *C) {
608 key := "sample_data.csv"
609 localCsvFile := "../sample/sample_data.csv"
610 err := s.bucket.PutObjectFromFile(key, localCsvFile)
612 selReq := SelectRequest{}
613 selReq.Expression = `select Year,StateAbbr, CityName, Short_Question_Text from ossobject where (data_value || data_value_unit) = '14.8%'`
614 selReq.InputSerializationSelect.CsvBodyInput.FileHeaderInfo = "Use"
// The listener only logs events; the assertion below checks the data, not the
// progress callbacks.
615 ret,err := s.bucket.SelectObject(key, selReq, Progress(&OssSelectProgressListener{}))
618 ts, err := ioutil.ReadAll(ret)
621 str, err := readCsvConcat(localCsvFile)
623 c.Assert(string(ts), Equals, str)
625 err = s.bucket.DeleteObject(key)