因为自学 python 工作中会经常用到淘宝Api调用数据 一直以来后台下载的淘宝Api 都是2.7版本
还是12年 lihao同学编写,一直没有升级
用Python 自带的2to3脚本工具升级后 大部分接口 调用正常
但是上传图片接口 一直提示错误
由于是初学 只能网上找资料了 找了很多资料 都没解决
最后通过国外友人的一篇博客 找到了解决方法
在这里分享一下 希望能帮助其他人 只要替换 base.py 代码就可以 初步测试调用接口 都能成功
# -*- coding: utf-8 -*- """ Created on 2012-7-3 @author: lihao """ try: import httplib except ImportError: import http.client as httplib import urllib import time import hashlib import json import io import top import sys import itertools import mimetypes from urllib.parse import urlencode import codecs ''' 定义一些系统变量 ''' SYSTEM_GENERATE_VERSION = "taobao-sdk-python-20151217" P_APPKEY = "app_key" P_API = "method" P_SESSION = "session" P_ACCESS_TOKEN = "access_token" P_VERSION = "v" P_FORMAT = "format" P_TIMESTAMP = "timestamp" P_SIGN = "sign" P_SIGN_METHOD = "sign_method" P_PARTNER_ID = "partner_id" P_CODE = 'code' P_SUB_CODE = 'sub_code' P_MSG = 'msg' P_SUB_MSG = 'sub_msg' N_REST = '/router/rest' writer = codecs.lookup('utf-8')[3] def sign(secret, parameters): # =========================================================================== # '''签名方法 # @param secret: 签名需要的密钥 # @param parameters: 支持字典和string两种 # ''' # =========================================================================== # 如果parameters 是字典类的话 if hasattr(parameters, "items"): # keys = parameters.keys() keys = list(parameters.keys()) # sudoz: Py3 keys.sort() parameters = "%s%s%s" % (secret, str().join( '%s%s' % (key, parameters[key]) for key in keys), secret) # sign = hashlib.md5(parameters).hexdigest().upper() sign = hashlib.md5(parameters.encode('utf8')).hexdigest().upper() # sudoz: Py3 print(sign) return sign def mixStr(pstr): if (isinstance(pstr, str)): return pstr # elif(isinstance(pstr, unicode)): elif (isinstance(pstr, bytes)): # sudoz: Py3 return ascii(pstr) else: return str(pstr) class FileItem(object): def __init__(self, filename=None, content=None): self.filename = filename self.content = content class MultiPartForm(object): """Accumulate the data to be used when posting a form.""" def __init__(self): self.form_fields = [] self.files = [] self.boundary = "PYTHON_SDK_BOUNDARY" return def get_content_type(self): return 'multipart/form-data; boundary=%s' % self.boundary def add_field(self, name, value): """Add a simple field to the form data.""" self.form_fields.append((name, str(value))) return def add_file(self, fieldname, filename, fileHandle, mimetype=None): """Add a file to be uploaded.""" if mimetype is None: mimetype = mimetypes.guess_type(filename)[ 0] or 'application/octet-stream' self.files.append((fieldname, filename,fileHandle, mimetype)) return # =================================================================================== @classmethod def u(cls, s): if sys.hexversion < 0x03000000 and isinstance(s, str): s = s.decode('utf-8') if sys.hexversion >= 0x03000000 and isinstance(s, bytes): s = s.decode('utf-8') return s def iter(self, fields, files): """ fields is a sequence of (name, value) elements for regular form fields. files is a sequence of (name, filename, file-type) elements for data to be uploaded as files Yield body's chunk as bytes """ encoder = codecs.getencoder('utf-8') for (key, value) in fields: key = self.u(key) yield encoder('--{}\r\n'.format(self.boundary)) yield encoder(self.u('Content-Disposition: form-data; name="{}"\r\n').format(key)) yield encoder('\r\n') if isinstance(value, int) or isinstance(value, float): value = str(value) yield encoder(self.u(value)) yield encoder('\r\n') for (key, filename, fileHander,content_type) in files: key = self.u(key) filename = self.u(filename) yield encoder('--{}\r\n'.format(self.boundary)) yield encoder(self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n').format(key, filename)) print(content_type) yield encoder('Content-Type: {}\r\n'.format(content_type)) yield encoder('\r\n') buff = fileHander.read() yield (buff, len(buff)) yield encoder('\r\n') yield encoder('--{}--\r\n'.format(self.boundary)) #================================================================================= def __bytes__(self): body = io.BytesIO() for chunk, chunk_len in self.iter(self.form_fields, self.files): body.write(chunk) return body.getvalue() class TopException(Exception): # =========================================================================== # 业务异常类 # =========================================================================== def __init__(self): self.errorcode = None self.message = None self.subcode = None self.submsg = None self.application_host = None self.service_host = None def __str__(self, *args, **kwargs): sb = "errorcode=" + mixStr(self.errorcode) + \ " message=" + mixStr(self.message) + \ " subcode=" + mixStr(self.subcode) + \ " submsg=" + mixStr(self.submsg) + \ " application_host=" + mixStr(self.application_host) + \ " service_host=" + mixStr(self.service_host) return sb class RequestException(Exception): # =========================================================================== # 请求连接异常类 # =========================================================================== pass class RestApi(object): # =========================================================================== # Rest api的基类 # =========================================================================== def __init__(self, domain='gw.api.taobao.com', port=80): # ======================================================================= # 初始化基类 # Args @param domain: 请求的域名或者ip # @param port: 请求的端口 # ======================================================================= self.__domain = domain self.__port = port self.__httpmethod = "POST" if (top.getDefaultAppInfo()): self.__app_key = top.getDefaultAppInfo().appkey self.__secret = top.getDefaultAppInfo().secret def get_request_header(self): return { 'Content-type': 'application/x-www-form-urlencoded;charset=UTF-8', "Cache-Control": "no-cache", "Connection": "Keep-Alive", } def set_app_info(self, appinfo): # ======================================================================= # 设置请求的app信息 # @param appinfo: import top # appinfo top.appinfo(appkey,secret) # ======================================================================= self.__app_key = appinfo.appkey self.__secret = appinfo.secret def getapiname(self): return "" def getMultipartParas(self): return [] def getTranslateParas(self): return {} def _check_requst(self): pass def getResponse(self, authrize=None, timeout=30): # ======================================================================= # 获取response结果 # ======================================================================= # connection = httplib.HTTPConnection(self.__domain, self.__port, False, # timeout) connection = httplib.HTTPConnection(self.__domain, self.__port, timeout) # sudoz: Py3 sys_parameters = { P_FORMAT: 'json', P_APPKEY: self.__app_key, P_SIGN_METHOD: "md5", P_VERSION: '2.0', # P_TIMESTAMP: str(long(time.time() * 1000)), P_TIMESTAMP: str(int(time.time() * 1000)), # sudoz: Py3 P_PARTNER_ID: SYSTEM_GENERATE_VERSION, P_API: self.getapiname(), } if authrize is not None: sys_parameters[P_SESSION] = authrize application_parameter = self.getApplicationParameters() sign_parameter = sys_parameters.copy() sign_parameter.update(application_parameter) sys_parameters[P_SIGN] = sign(self.__secret, sign_parameter) connection.connect() header = self.get_request_header() if self.getMultipartParas(): form = MultiPartForm() for key, value in application_parameter.items(): form.add_field(key, value) for key in self.getMultipartParas(): fileitem = getattr(self, key) if fileitem and isinstance(fileitem, FileItem): form.add_file(key, fileitem.filename, fileitem.content) #传入二进制信息 body =bytes(form) header['Content-type'] = form.get_content_type() else: # body = urllib.urlencode(application_parameter) body = urllib.parse.urlencode(application_parameter) # sudoz: Py3 # url = N_REST + "?" + urllib.urlencode(sys_parameters) url = N_REST + "?" + urllib.parse.urlencode(sys_parameters) # sudoz: Py3 connection.request(self.__httpmethod, url, body=body, headers=header) print(connection.host) response = connection.getresponse() if response.status is not 200: raise RequestException('invalid http status ' + str( response.status) + ',detail body:' + response.read()) # result = response.read() result = response.read().decode('utf-8') # sudoz: Py3里JSON只接收unicode jsonobj = json.loads(result) return jsonobj def getApplicationParameters(self): application_parameter = {} # for key, value in self.__dict__.iteritems(): for key, value in self.__dict__.items(): if not key.startswith( "__") and not key in self.getMultipartParas() and not key.startswith( "_RestApi__") and value is not None: if (key.startswith("_")): application_parameter[key[1:]] = value else: application_parameter[key] = value # 查询翻译字典来规避一些关键字属性 translate_parameter = self.getTranslateParas() # for key, value in application_parameter.iteritems(): for key, value in application_parameter.items(): # sudoz: Py3 if key in translate_parameter: application_parameter[translate_parameter[key]] = \ application_parameter[key] del application_parameter[key] return application_parameter
AI 代码解读