23 // Conn defines the OSS connection used to build, sign and send requests
// signKeyList lists the query-parameter names that isParamSign scans when
// deciding which parameters become part of the signed sub-resource.
// Every key appears exactly once; the duplicated "comp" entry was removed.
var signKeyList = []string{
	"acl", "uploads", "location", "cors",
	"logging", "website", "referer", "lifecycle",
	"delete", "append", "tagging", "objectMeta",
	"uploadId", "partNumber", "security-token",
	"position", "img", "style", "styleName",
	"replication", "replicationProgress",
	"replicationLocation", "cname", "bucketInfo",
	"comp", "qos", "live", "status", "vod",
	"startTime", "endTime", "symlink",
	"x-oss-process", "response-content-type", "x-oss-traffic-limit",
	"response-content-language", "response-expires",
	"response-cache-control", "response-content-disposition",
	"response-content-encoding", "udf", "udfName", "udfImage",
	"udfId", "udfImageDesc", "udfApplication",
	"udfApplicationLog", "restore", "callback", "callback-var", "qosInfo",
	"policy", "stat", "encryption", "versions", "versioning", "versionId", "requestPayment",
	"x-oss-request-payer", "sequential",
	"inventory", "inventoryId", "continuation-token", "asyncFetch",
	"worm", "wormId", "wormExtend",
}
50 // init initializes Conn
// It builds a transport with newTransport, optionally points it at the proxy
// described by config, wraps it in an http.Client and, when
// config.RedirectEnabled is false, passes that client to disableHTTPRedirect.
// NOTE(review): error handling and the statements that store the results on
// conn are not visible in this listing — confirm against the full source.
51 func (conn *Conn) init(config *Config, urlMaker *urlMaker, client *http.Client) error {
54 transport := newTransport(conn, config)
// Proxy support: parse the configured proxy address.
// NOTE(review): this check reads conn.config while the rest of the function
// reads the config parameter.
57 if conn.config.IsUseProxy {
58 proxyURL, err := url.Parse(config.ProxyHost)
// Authenticated proxy: embed the credentials in the proxy URL.
62 if config.IsAuthProxy {
63 if config.ProxyPassword != "" {
64 proxyURL.User = url.UserPassword(config.ProxyUser, config.ProxyPassword)
// Presumably the password-less alternative: user name only.
66 proxyURL.User = url.User(config.ProxyUser)
69 transport.Proxy = http.ProxyURL(proxyURL)
// The client parameter is overwritten with a client built on the new transport.
71 client = &http.Client{Transport: transport}
72 if !config.RedirectEnabled {
73 disableHTTPRedirect(client)
84 // Do sends request and returns the response
85 func (conn Conn) Do(method, bucketName, objectName string, params map[string]interface{}, headers map[string]string,
86 data io.Reader, initCRC uint64, listener ProgressListener) (*Response, error) {
87 urlParams := conn.getURLParams(params)
88 subResource := conn.getSubResource(params)
89 uri := conn.url.getURL(bucketName, objectName, urlParams)
90 resource := conn.getResource(bucketName, objectName, subResource)
91 return conn.doRequest(method, uri, resource, headers, data, initCRC, listener)
94 // DoURL sends the request with signed URL and returns the response result.
// The URL is parsed with url.ParseRequestURI. The visible code sets the Host
// and User-Agent headers plus an optional Proxy-Authorization header, sends
// the request through conn.client and passes the response to handleResponse.
// Progress events are published to listener at start, on failure and on
// completion.
95 func (conn Conn) DoURL(method HTTPMethod, signedURL string, headers map[string]string,
96 data io.Reader, initCRC uint64, listener ProgressListener) (*Response, error) {
97 // Get URI from signedURL
98 uri, err := url.ParseRequestURI(signedURL)
// The HTTP method name is upper-cased.
103 m := strings.ToUpper(string(method))
104 req := &http.Request{
110 Header: make(http.Header),
// tracker.completedBytes is what the failure/completion events below report.
114 tracker := &readerTracker{completedBytes: 0}
// handleBody prepares the request body; it returns a file handle and a hash.
115 fd, crc := conn.handleBody(req, data, initCRC, listener, tracker)
// Basic credentials for an authenticating proxy.
123 if conn.config.IsAuthProxy {
124 auth := conn.config.ProxyUser + ":" + conn.config.ProxyPassword
125 basic := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
126 req.Header.Set("Proxy-Authorization", basic)
129 req.Header.Set(HTTPHeaderHost, req.Host)
130 req.Header.Set(HTTPHeaderUserAgent, conn.config.UserAgent)
// Caller-supplied headers (loop body not visible here; presumably copied onto req).
133 for k, v := range headers {
// Announce the start of the transfer.
139 event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength, 0)
140 publishProgress(listener, event)
142 if conn.config.LogLevel >= Debug {
143 conn.LoggerHTTPReq(req)
146 resp, err := conn.client.Do(req)
// Failure path: report the bytes transferred so far and log the error.
149 event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength, 0)
150 publishProgress(listener, event)
151 conn.config.WriteLog(Debug, "[Resp:%p]http error:%s\n", req, err.Error())
155 if conn.config.LogLevel >= Debug {
156 //print out http resp
157 conn.LoggerHTTPResp(req, resp)
160 // Transfer completed
161 event = newProgressEvent(TransferCompletedEvent, tracker.completedBytes, req.ContentLength, 0)
162 publishProgress(listener, event)
164 return conn.handleResponse(resp, crc)
// getURLParams serializes params into a URL query string. Keys are escaped
// with url.QueryEscape; a non-nil value is appended as "=value" with the
// value query-escaped and "+" rewritten to "%20". A nil value adds no
// "=value" part. Values must be strings: the unchecked type assertion panics
// on any other type.
// NOTE(review): key ordering and the separator between pairs are handled on
// lines not visible in this listing.
167 func (conn Conn) getURLParams(params map[string]interface{}) string {
// Collect the keys into a slice before serializing.
169 keys := make([]string, 0, len(params))
170 for k := range params {
171 keys = append(keys, k)
177 for _, k := range keys {
181 buf.WriteString(url.QueryEscape(k))
182 if params[k] != nil {
183 buf.WriteString("=" + strings.Replace(url.QueryEscape(params[k].(string)), "+", "%20", -1))
// getSubResource builds the sub-resource part of the string to sign from
// params. With AuthV2 every parameter is considered: the key is query-escaped
// and a value that is neither nil nor "" is stored escaped, with "+"
// rewritten to "%20". Otherwise only keys accepted by isParamSign are
// considered, and key and value are used unescaped.
// Values must be strings: the type assertions panic on any other type.
// NOTE(review): how keys are ordered and written is on lines not visible in
// this listing.
190 func (conn Conn) getSubResource(params map[string]interface{}) string {
192 keys := make([]string, 0, len(params))
// signParams holds the value for every key that has one.
193 signParams := make(map[string]string)
194 for k := range params {
195 if conn.config.AuthVersion == AuthV2 {
196 encodedKey := url.QueryEscape(k)
197 keys = append(keys, encodedKey)
198 if params[k] != nil && params[k] != "" {
199 signParams[encodedKey] = strings.Replace(url.QueryEscape(params[k].(string)), "+", "%20", -1)
201 } else if conn.isParamSign(k) {
202 keys = append(keys, k)
203 if params[k] != nil {
204 signParams[k] = params[k].(string)
212 for _, k := range keys {
// Only keys recorded in signParams get an "=value" suffix.
217 if _, ok := signParams[k]; ok {
218 buf.WriteString("=" + signParams[k])
// isParamSign walks signKeyList looking for paramKey.
// NOTE(review): the comparison and return statements are not visible in this
// listing; presumably it reports whether paramKey is one of the listed keys.
224 func (conn Conn) isParamSign(paramKey string) bool {
225 for _, k := range signKeyList {
233 // getResource gets canonicalized resource
234 func (conn Conn) getResource(bucketName, objectName, subResource string) string {
235 if subResource != "" {
236 subResource = "?" + subResource
238 if bucketName == "" {
239 if conn.config.AuthVersion == AuthV2 {
240 return url.QueryEscape("/") + subResource
242 return fmt.Sprintf("/%s%s", bucketName, subResource)
244 if conn.config.AuthVersion == AuthV2 {
245 return url.QueryEscape("/"+bucketName+"/") + strings.Replace(url.QueryEscape(objectName), "+", "%20", -1) + subResource
247 return fmt.Sprintf("/%s/%s%s", bucketName, objectName, subResource)
// doRequest builds an HTTP request for uri, sets the Date, Host, User-Agent
// and (when credentials carry one) security-token headers, signs it with
// signHeader using canonicalizedResource, sends it through conn.client and
// passes the response to handleResponse. Progress events are published to
// listener at start, on failure and on completion.
250 func (conn Conn) doRequest(method string, uri *url.URL, canonicalizedResource string, headers map[string]string,
251 data io.Reader, initCRC uint64, listener ProgressListener) (*Response, error) {
252 method = strings.ToUpper(method)
253 req := &http.Request{
259 Header: make(http.Header),
// tracker.completedBytes is what the failure/completion events below report.
263 tracker := &readerTracker{completedBytes: 0}
// handleBody prepares the request body; it returns a file handle and a hash.
264 fd, crc := conn.handleBody(req, data, initCRC, listener, tracker)
// Basic credentials for an authenticating proxy.
272 if conn.config.IsAuthProxy {
273 auth := conn.config.ProxyUser + ":" + conn.config.ProxyPassword
274 basic := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
275 req.Header.Set("Proxy-Authorization", basic)
// Date header: current UTC time in http.TimeFormat.
278 date := time.Now().UTC().Format(http.TimeFormat)
279 req.Header.Set(HTTPHeaderDate, date)
280 req.Header.Set(HTTPHeaderHost, req.Host)
281 req.Header.Set(HTTPHeaderUserAgent, conn.config.UserAgent)
// Forward the security token when the credentials provide one.
283 akIf := conn.config.GetCredentials()
284 if akIf.GetSecurityToken() != "" {
285 req.Header.Set(HTTPHeaderOssSecurityToken, akIf.GetSecurityToken())
// Caller-supplied headers (loop body not visible here; presumably copied onto req).
289 for k, v := range headers {
// Sign after the headers above have been set.
294 conn.signHeader(req, canonicalizedResource)
// Announce the start of the transfer.
297 event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength, 0)
298 publishProgress(listener, event)
300 if conn.config.LogLevel >= Debug {
301 conn.LoggerHTTPReq(req)
304 resp, err := conn.client.Do(req)
// Failure path: report the bytes transferred so far and log the error.
308 event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength, 0)
309 publishProgress(listener, event)
310 conn.config.WriteLog(Debug, "[Resp:%p]http error:%s\n", req, err.Error())
314 if conn.config.LogLevel >= Debug {
315 //print out http resp
316 conn.LoggerHTTPResp(req, resp)
319 // Transfer completed
320 event = newProgressEvent(TransferCompletedEvent, tracker.completedBytes, req.ContentLength, 0)
321 publishProgress(listener, event)
323 return conn.handleResponse(resp, crc)
// signURL returns a signed URL for bucketName/objectName that carries the
// expiration time and signature as query parameters. It builds a throwaway
// http.Request only to compute the signature; nothing is sent.
// Note that the caller's params map is modified in place (security token,
// expiry, access key ID and signature entries are added).
326 func (conn Conn) signURL(method HTTPMethod, bucketName, objectName string, expiration int64, params map[string]interface{}, headers map[string]string) string {
327 akIf := conn.config.GetCredentials()
328 if akIf.GetSecurityToken() != "" {
329 params[HTTPParamSecurityToken] = akIf.GetSecurityToken()
332 m := strings.ToUpper(string(method))
333 req := &http.Request{
335 Header: make(http.Header),
338 if conn.config.IsAuthProxy {
339 auth := conn.config.ProxyUser + ":" + conn.config.ProxyPassword
340 basic := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
341 req.Header.Set("Proxy-Authorization", basic)
// The Date header carries the expiration value (as a decimal string), not the current time.
344 req.Header.Set(HTTPHeaderDate, strconv.FormatInt(expiration, 10))
345 req.Header.Set(HTTPHeaderUserAgent, conn.config.UserAgent)
// Caller-supplied headers (loop body not visible here; presumably copied onto req).
348 for k, v := range headers {
// V2: these parameters are added before the sub-resource is computed below.
353 if conn.config.AuthVersion == AuthV2 {
354 params[HTTPParamSignatureVersion] = "OSS2"
355 params[HTTPParamExpiresV2] = strconv.FormatInt(expiration, 10)
// NOTE(review): V2 reads conn.config.AccessKeyID directly whereas V1 below
// uses akIf.GetAccessKeyID() — confirm the two agree for every credentials source.
356 params[HTTPParamAccessKeyIDV2] = conn.config.AccessKeyID
357 additionalList, _ := conn.getAdditionalHeaderKeys(req)
358 if len(additionalList) > 0 {
359 params[HTTPParamAdditionalHeadersV2] = strings.Join(additionalList, ";")
363 subResource := conn.getSubResource(params)
364 canonicalizedResource := conn.getResource(bucketName, objectName, subResource)
365 signedStr := conn.getSignedStr(req, canonicalizedResource, akIf.GetAccessKeySecret())
// V1: expiry and access key ID are added only after the signature was computed.
367 if conn.config.AuthVersion == AuthV1 {
368 params[HTTPParamExpires] = strconv.FormatInt(expiration, 10)
369 params[HTTPParamAccessKeyID] = akIf.GetAccessKeyID()
370 params[HTTPParamSignature] = signedStr
371 } else if conn.config.AuthVersion == AuthV2 {
372 params[HTTPParamSignatureV2] = signedStr
374 urlParams := conn.getURLParams(params)
375 return conn.url.getSignURL(bucketName, objectName, urlParams)
378 func (conn Conn) signRtmpURL(bucketName, channelName, playlistName string, expiration int64) string {
379 params := map[string]interface{}{}
380 if playlistName != "" {
381 params[HTTPParamPlaylistName] = playlistName
383 expireStr := strconv.FormatInt(expiration, 10)
384 params[HTTPParamExpires] = expireStr
386 akIf := conn.config.GetCredentials()
387 if akIf.GetAccessKeyID() != "" {
388 params[HTTPParamAccessKeyID] = akIf.GetAccessKeyID()
389 if akIf.GetSecurityToken() != "" {
390 params[HTTPParamSecurityToken] = akIf.GetSecurityToken()
392 signedStr := conn.getRtmpSignedStr(bucketName, channelName, playlistName, expiration, akIf.GetAccessKeySecret(), params)
393 params[HTTPParamSignature] = signedStr
396 urlParams := conn.getURLParams(params)
397 return conn.url.getSignRtmpURL(bucketName, channelName, urlParams)
400 // handleBody handles request body
// It sets req.ContentLength and the Content-Length header, optionally
// computes a Content-MD5 via calcMD5, optionally tees the body through a CRC
// hash, and wraps the reader in a LimitSpeedReader when isUploadLimitReq
// reports true.
// Returns (*os.File, hash.Hash64); presumably the file produced by calcMD5
// (if any) and the CRC hash — TODO confirm, the return statement is not
// visible in this listing.
401 func (conn Conn) handleBody(req *http.Request, body io.Reader, initCRC uint64,
402 listener ProgressListener, tracker *readerTracker) (*os.File, hash.Hash64) {
406 readerLen, err := GetReaderLen(reader)
408 req.ContentLength = readerLen
410 req.Header.Set(HTTPHeaderContentLength, strconv.FormatInt(req.ContentLength, 10))
// MD5 is computed only when enabled and the request has no Content-MD5 header yet.
413 if body != nil && conn.config.IsEnableMD5 && req.Header.Get(HTTPHeaderContentMD5) == "" {
// NOTE(review): the error returned by calcMD5 is discarded here.
415 reader, md5, file, _ = calcMD5(body, req.ContentLength, conn.config.MD5Threshold)
416 req.Header.Set(HTTPHeaderContentMD5, md5)
// CRC: tee the body through the hash; TeeReader also receives listener and tracker.
420 if reader != nil && conn.config.IsEnableCRC {
421 crc = NewCRC(CrcTable(), initCRC)
422 reader = TeeReader(reader, crc, req.ContentLength, listener, tracker)
// Readers that are not already an io.ReadCloser get a no-op Close.
426 rc, ok := reader.(io.ReadCloser)
427 if !ok && reader != nil {
428 rc = ioutil.NopCloser(reader)
// Rate-limited uploads go through a LimitSpeedReader using the configured limiter.
431 if conn.isUploadLimitReq(req) {
432 limitReader := &LimitSpeedReader{
434 ossLimiter: conn.config.UploadLimiter,
436 req.Body = limitReader
443 // isUploadLimitReq: judge limit upload speed or not
// The visible checks are: whether a limit is configured (UploadLimitSpeed is
// non-zero and UploadLimiter is non-nil), whether the method is something
// other than GET, DELETE or HEAD, and whether ContentLength is positive.
// NOTE(review): the return statements are not visible in this listing.
444 func (conn Conn) isUploadLimitReq(req *http.Request) bool {
445 if conn.config.UploadLimitSpeed == 0 || conn.config.UploadLimiter == nil {
449 if req.Method != "GET" && req.Method != "DELETE" && req.Method != "HEAD" {
450 if req.ContentLength > 0 {
// tryGetFileSize takes an open file and returns an int64.
// NOTE(review): the body is not visible in this listing; presumably it
// returns the size of f in bytes — confirm, including its behavior on error.
457 func tryGetFileSize(f *os.File) int64 {
462 // handleResponse handles response
// Status codes 400-505 are treated as errors: the body is read and, when
// non-empty, decoded with serviceErrFromXML. Status codes 300-307 produce a
// generic error. For other responses the server CRC header is parsed when
// CRC checking is enabled and a hash was supplied.
// NOTE(review): status codes above 505 and the code 308 fall outside both
// ranges — confirm that is intended. Return statements are not visible here.
463 func (conn Conn) handleResponse(resp *http.Response, crc hash.Hash64) (*Response, error) {
467 statusCode := resp.StatusCode
468 if statusCode >= 400 && statusCode <= 505 {
469 // 4xx and 5xx indicate that the operation has error occurred
471 respBody, err := readResponseBody(resp)
// Empty error body: only the status code and request ID are available.
476 if len(respBody) == 0 {
478 StatusCode: statusCode,
479 RequestID: resp.Header.Get(HTTPHeaderOssRequestID),
482 // Response contains storage service error object, unmarshal
483 srvErr, errIn := serviceErrFromXML(respBody, resp.StatusCode,
484 resp.Header.Get(HTTPHeaderOssRequestID))
485 if errIn != nil { // error unmarshaling the error response
486 err = fmt.Errorf("oss: service returned invalid response body, status = %s, RequestId = %s", resp.Status, resp.Header.Get(HTTPHeaderOssRequestID))
493 StatusCode: resp.StatusCode,
494 Headers: resp.Header,
495 Body: ioutil.NopCloser(bytes.NewReader(respBody)), // restore the body
497 } else if statusCode >= 300 && statusCode <= 307 {
498 // OSS use 3xx, but response has no body
499 err := fmt.Errorf("oss: service returned %d,%s", resp.StatusCode, resp.Status)
501 StatusCode: resp.StatusCode,
502 Headers: resp.Header,
507 if conn.config.IsEnableCRC && crc != nil {
// NOTE(review): a parse failure is ignored here, leaving whatever value srvCRC held.
510 srvCRC, _ = strconv.ParseUint(resp.Header.Get(HTTPHeaderOssCRC64), 10, 64)
514 StatusCode: resp.StatusCode,
515 Headers: resp.Header,
522 // LoggerHTTPReq Print the header information of the http request
// The log line contains the method, URL host, path, raw query and every
// header, tab-separated, and is written at Debug level through
// conn.config.WriteLog. The request pointer (%p) tags the entry.
523 func (conn Conn) LoggerHTTPReq(req *http.Request) {
524 var logBuffer bytes.Buffer
525 logBuffer.WriteString(fmt.Sprintf("[Req:%p]Method:%s\t", req, req.Method))
526 logBuffer.WriteString(fmt.Sprintf("Host:%s\t", req.URL.Host))
527 logBuffer.WriteString(fmt.Sprintf("Path:%s\t", req.URL.Path))
528 logBuffer.WriteString(fmt.Sprintf("Query:%s\t", req.URL.RawQuery))
529 logBuffer.WriteString(fmt.Sprintf("Header info:"))
// Each header is emitted as "\tName:values".
531 for k, v := range req.Header {
532 var valueBuffer bytes.Buffer
533 for j := 0; j < len(v); j++ {
// Space between values (the guarding condition is not visible in this listing).
535 valueBuffer.WriteString(" ")
537 valueBuffer.WriteString(v[j])
539 logBuffer.WriteString(fmt.Sprintf("\t%s:%s", k, valueBuffer.String()))
541 conn.config.WriteLog(Debug, "%s\n", logBuffer.String())
544 // LoggerHTTPResp Print Response to http request
// The log line contains the status code and every response header and is
// written at Debug level through conn.config.WriteLog. It is tagged with the
// request pointer (%p), the same tag format LoggerHTTPReq uses.
545 func (conn Conn) LoggerHTTPResp(req *http.Request, resp *http.Response) {
546 var logBuffer bytes.Buffer
547 logBuffer.WriteString(fmt.Sprintf("[Resp:%p]StatusCode:%d\t", req, resp.StatusCode))
548 logBuffer.WriteString(fmt.Sprintf("Header info:"))
// Each header is emitted as "\tName:values".
549 for k, v := range resp.Header {
550 var valueBuffer bytes.Buffer
551 for j := 0; j < len(v); j++ {
// Space between values (the guarding condition is not visible in this listing).
553 valueBuffer.WriteString(" ")
555 valueBuffer.WriteString(v[j])
557 logBuffer.WriteString(fmt.Sprintf("\t%s:%s", k, valueBuffer.String()))
559 conn.config.WriteLog(Debug, "%s\n", logBuffer.String())
// calcMD5 reads body, computes a base64-encoded digest (b64) and returns a
// reader positioned at the start of the same data. When contentLen is 0 or
// exceeds md5Threshold the data is spooled to a temporary file, which is
// returned as tempFile; otherwise it is buffered in memory.
// NOTE(review): the results of io.Copy, Seek and ioutil.ReadAll are ignored,
// so a short read would go unnoticed. os.SEEK_SET is deprecated in favour of
// io.SeekStart. Hash construction is on lines not visible in this listing.
562 func calcMD5(body io.Reader, contentLen, md5Threshold int64) (reader io.Reader, b64 string, tempFile *os.File, err error) {
// contentLen == 0 also takes the temp-file path, not only large bodies.
563 if contentLen == 0 || contentLen > md5Threshold {
564 // Huge body, use temporary file
565 tempFile, err = ioutil.TempFile(os.TempDir(), TempFilePrefix)
// Spool the body to disk, then rewind to hash it.
567 io.Copy(tempFile, body)
568 tempFile.Seek(0, os.SEEK_SET)
570 io.Copy(md5, tempFile)
572 b64 = base64.StdEncoding.EncodeToString(sum[:])
// Rewind again after hashing.
573 tempFile.Seek(0, os.SEEK_SET)
577 // Small body, use memory
578 buf, _ := ioutil.ReadAll(body)
580 b64 = base64.StdEncoding.EncodeToString(sum[:])
581 reader = bytes.NewReader(buf)
// readResponseBody reads resp.Body with ioutil.ReadAll and closes it on return.
// NOTE(review): how the read error is post-processed and returned is on
// lines not visible in this listing.
586 func readResponseBody(resp *http.Response) ([]byte, error) {
587 defer resp.Body.Close()
588 out, err := ioutil.ReadAll(resp.Body)
595 func serviceErrFromXML(body []byte, statusCode int, requestID string) (ServiceError, error) {
596 var storageErr ServiceError
598 if err := xml.Unmarshal(body, &storageErr); err != nil {
599 return storageErr, err
602 storageErr.StatusCode = statusCode
603 storageErr.RequestID = requestID
604 storageErr.RawMessage = string(body)
605 return storageErr, nil
// xmlUnmarshal reads all of body and decodes it as XML into v.
// NOTE(review): handling of the read error is on lines not visible in this listing.
608 func xmlUnmarshal(body io.Reader, v interface{}) error {
609 data, err := ioutil.ReadAll(body)
613 return xml.Unmarshal(data, v)
// jsonUnmarshal reads all of body and decodes it as JSON into v.
// NOTE(review): handling of the read error is on lines not visible in this listing.
616 func jsonUnmarshal(body io.Reader, v interface{}) error {
617 data, err := ioutil.ReadAll(body)
621 return json.Unmarshal(data, v)
624 // timeoutConn handles HTTP timeout
// It carries two durations used by its Read and Write methods to move the
// deadlines of a wrapped connection (the field holding that connection is
// not visible in this listing).
625 type timeoutConn struct {
// Deadline span applied immediately before each Read or Write.
627 timeout time.Duration
// Read-deadline span set at construction and re-armed after each Read or Write.
628 longTimeout time.Duration
// newTimeoutConn wraps conn in a timeoutConn. Before returning it sets the
// read deadline of conn to now+longTimeout; the error from SetReadDeadline
// is not checked.
631 func newTimeoutConn(conn net.Conn, timeout time.Duration, longTimeout time.Duration) *timeoutConn {
632 conn.SetReadDeadline(time.Now().Add(longTimeout))
636 longTimeout: longTimeout,
// Read sets the read deadline to now+timeout, reads into b from the wrapped
// connection, then re-arms the read deadline to now+longTimeout.
// Errors from SetReadDeadline are not checked.
640 func (c *timeoutConn) Read(b []byte) (n int, err error) {
641 c.SetReadDeadline(time.Now().Add(c.timeout))
642 n, err = c.conn.Read(b)
643 c.SetReadDeadline(time.Now().Add(c.longTimeout))
// Write sets the write deadline to now+timeout, writes b to the wrapped
// connection, then re-arms the read deadline to now+longTimeout.
// Errors from the deadline setters are not checked.
// NOTE(review): after the write it is the read deadline, not the write
// deadline, that is extended — confirm this is intentional.
647 func (c *timeoutConn) Write(b []byte) (n int, err error) {
648 c.SetWriteDeadline(time.Now().Add(c.timeout))
649 n, err = c.conn.Write(b)
650 c.SetReadDeadline(time.Now().Add(c.longTimeout))
// Close closes the wrapped connection and returns its error.
654 func (c *timeoutConn) Close() error {
655 return c.conn.Close()
// LocalAddr returns the local address reported by the wrapped connection.
658 func (c *timeoutConn) LocalAddr() net.Addr {
659 return c.conn.LocalAddr()
// RemoteAddr returns the remote address reported by the wrapped connection.
662 func (c *timeoutConn) RemoteAddr() net.Addr {
663 return c.conn.RemoteAddr()
// SetDeadline forwards t to the wrapped connection's SetDeadline.
666 func (c *timeoutConn) SetDeadline(t time.Time) error {
667 return c.conn.SetDeadline(t)
// SetReadDeadline forwards t to the wrapped connection's SetReadDeadline.
670 func (c *timeoutConn) SetReadDeadline(t time.Time) error {
671 return c.conn.SetReadDeadline(t)
// SetWriteDeadline forwards t to the wrapped connection's SetWriteDeadline.
674 func (c *timeoutConn) SetWriteDeadline(t time.Time) error {
675 return c.conn.SetWriteDeadline(t)
678 // UrlMaker builds URL and resource
// urlMaker holds the endpoint settings (scheme, host and addressing type)
// that its methods use to build request URLs and signed URLs.
685 type urlMaker struct {
686 Scheme string // HTTP or HTTPS
687 NetLoc string // Host or IP
688 Type int // 1 CNAME, 2 IP, 3 ALIYUN
689 IsProxy bool // Proxy
692 // Init parses endpoint
// It strips an "http://" or "https://" prefix into NetLoc, parses the
// rebuilt URL, splits host and port, removes IPv6 brackets and probes the
// host with net.ParseIP before assigning um.Type.
// NOTE(review): the Scheme assignments, error handling and the conditions
// that select um.Type are not visible in this listing.
693 func (um *urlMaker) Init(endpoint string, isCname bool, isProxy bool) error {
694 if strings.HasPrefix(endpoint, "http://") {
696 um.NetLoc = endpoint[len("http://"):]
697 } else if strings.HasPrefix(endpoint, "https://") {
699 um.NetLoc = endpoint[len("https://"):]
705 //use url.Parse() to get real host
706 strUrl := um.Scheme + "://" + um.NetLoc
// NOTE(review): the local variable url shadows the net/url package from here on.
707 url, err := url.Parse(strUrl)
713 host, _, err := net.SplitHostPort(um.NetLoc)
// Strip the brackets of an IPv6 literal such as "[::1]".
// NOTE(review): host[0] panics on an empty host unless a guard on an unseen
// line prevents it — confirm.
716 if host[0] == '[' && host[len(host)-1] == ']' {
717 host = host[1 : len(host)-1]
721 ip := net.ParseIP(host)
725 um.Type = urlTypeCname
727 um.Type = urlTypeAliyun
// getURL builds the request URL for bucket/object from the host and path
// returned by buildURL; one of the two formats appends "?params".
// NOTE(review): the error from url.ParseRequestURI is discarded, and the
// condition choosing between the two formats is not visible in this listing.
735 func (um urlMaker) getURL(bucket, object, params string) *url.URL {
736 host, path := um.buildURL(bucket, object)
739 addr = fmt.Sprintf("%s://%s%s", um.Scheme, host, path)
741 addr = fmt.Sprintf("%s://%s%s?%s", um.Scheme, host, path, params)
743 uri, _ := url.ParseRequestURI(addr)
747 // getSignURL gets sign URL
748 func (um urlMaker) getSignURL(bucket, object, params string) string {
749 host, path := um.buildURL(bucket, object)
750 return fmt.Sprintf("%s://%s%s?%s", um.Scheme, host, path, params)
753 // getSignRtmpURL Build Sign Rtmp URL
754 func (um urlMaker) getSignRtmpURL(bucket, channelName, params string) string {
755 host, path := um.buildURL(bucket, "live")
757 channelName = url.QueryEscape(channelName)
758 channelName = strings.Replace(channelName, "+", "%20", -1)
760 return fmt.Sprintf("rtmp://%s%s/%s?%s", host, path, channelName, params)
763 // buildURL builds URL
764 func (um urlMaker) buildURL(bucket, object string) (string, string) {
768 object = url.QueryEscape(object)
769 object = strings.Replace(object, "+", "%20", -1)
771 if um.Type == urlTypeCname {
774 } else if um.Type == urlTypeIP {
780 path = fmt.Sprintf("/%s/%s", bucket, object)
787 host = bucket + "." + um.NetLoc