python tornado 專案框架抽離

2021-09-30 00:00:47 字數 4702 閱讀 5431

1.主函式入口

#!../venv/bin/python3

])# 全域性變數字典

# Register a command-line option named ``global_dict``; the article uses it
# as a process-wide dictionary of shared values.
# Fixed: the boolean literal is ``True`` (``true`` raises NameError).
tornado.options.define('global_dict', type=dict, default={}, multiple=True)

# Parse the options given on the command line.
tornado.options.parse_command_line()

# NOTE(review): ``server`` is created in lines that are missing from this
# listing -- presumably a tornado.httpserver.HTTPServer wrapping the
# application; confirm against the full file.
server.bind(8888)

# NOTE(review): if ``server`` is a Tornado HTTPServer, start(0) forks one
# worker process per detected CPU core -- confirm.
server.start(0)

# Fixed: the event-loop class is ``IOLoop``; ``tornado.ioloop.ioloop`` does
# not exist and raises AttributeError.
tornado.ioloop.IOLoop.current().start()

2.構建返回體及驗證介面
# -*- coding: utf-8 -*-

# 設定返回體和跨域請求

rsp_data = ,

'code': 0,

'message': 'success',

"requestid": str(uuid.uuid4())

}# 引數獲取

try:

if self.request.headers['content-type'].startswith("multipart/form-data"):

print('request header%s', str(str(self.request.headers).split('\n')))

params = json.loads(self.get_argument("params"))

inte***ce_name = self.get_argument('action')

else:

req_data = tornado.escape.json_decode(self.request.body)

# route to the corresponding handler by action name

print('request header%s body[%s]', str(str(self.request.headers).split('\n')), str(req_data))

inte***ce_name = req_data['action']

params = req_data['params']

except exception:

print('request format error: %s', traceback.format_exc())

self._rsp_data(rsp_data, 10005, 'invalid request body')

self.write(tornado.escape.json_encode(rsp_data))

return

# 許可權驗證

try:

# 如果介面不存在,直接返回

if inte***ce_name not in inte***ce_post_all.inte***ce_route.keys():

rsp_data['code'] = 10004

rsp_data['message'] = 'invalid inte***ce name'

self.write(tornado.escape.json_encode(rsp_data))

return

except jwt.exceptions.expiredsignature as e:

self._rsp_data(rsp_data, 4001, str(e))

self.write(tornado.escape.json_encode(rsp_data))

return

except jwt.exceptions.pyjwterror as e:

self._rsp_data(rsp_data, 4002, str(e))

self.write(tornado.escape.json_encode(rsp_data))

return

except exception as e:

print('unknown error occur when invoke api[%s], error type[%s], error: %s',

inte***ce_name,

str(type(e)),

traceback.format_exc())

self._rsp_data(rsp_data, 4004, str(e))

self.write(tornado.escape.json_encode(rsp_data))

return

# action對映

try:

action_handler = inte***ce_post_all.inte***ce_route_all[inte***ce_name](params)

data = action_handler.process()

rsp_data['data'] = data

except exception as e:

print('unknown error occur when invoke api[%s], error type[%s], error: %s',

inte***ce_name,

str(type(e)),

traceback.format_exc())

rsp_data['code'] = 10001

rsp_data['message'] = 'unknown error'

finally:

# lists not accepted for security reasons;

# rsp_data must be dict or unicode type or bytes

print('response data[%s]', tornado.escape.json_encode(rsp_data))

rsp_data = tornado.escape.json_encode(rsp_data)

self.write(rsp_data)

def _rsp_data(self, rsp_data, code, message, data=None):
    """Fill the response dictionary in place and log its JSON form.

    Args:
        rsp_data: Response dictionary to update; it is mutated, not copied.
        code: Value stored under the ``'code'`` key.
        message: Value stored under the ``'message'`` key.
        data: Optional payload stored under the ``'data'`` key. When it is
            ``None`` the ``'data'`` key is left untouched.

    Returns:
        None. The caller keeps using the same ``rsp_data`` object.
    """
    rsp_data['code'] = code
    rsp_data['message'] = message

    # Compare against None explicitly so that legitimate falsy payloads
    # (empty list, empty dict, 0) are still returned to the client.
    if data is not None:
        rsp_data['data'] = data

    # Interpolate the placeholder; passing the value as a second print()
    # argument would emit the literal "%s" followed by the JSON text.
    print('response data[%s]' % tornado.escape.json_encode(rsp_data))

def options(self, *args, **kwargs):
    """Log that an OPTIONS request reached this handler.

    NOTE(review): presumably this is the Tornado RequestHandler hook for
    HTTP OPTIONS (e.g. CORS preflight) requests; the enclosing class is not
    visible here -- confirm. No headers or status are set by this method.

    Args:
        *args: Positional arguments; unused.
        **kwargs: Keyword arguments; unused.
    """
    print('option method occur')

3.介面集合
class addsoftwaredefinition(object):
    """Example action handler: constructed with params, run via process().

    This is a stub meant to be filled in; it ignores its input and
    produces no result.
    """

    def __init__(self, params):
        """Accept the request parameters.

        Args:
            params: Parameters handed to the handler; currently unused.
        """
        pass

    def process(self):
        """Run the action.

        Returns:
            None; the stub performs no work.
        """
        return

inte***ce_route =

inte***ce_route_all = dict(inte***ce_route)

上述3個py檔案放在一個資料夾裡面即可執行,下面是請求體。自己可以定義介面,來體驗下。
post請求

url:127.0.0.1:8888/wf/price

請求體:

}返回結果:

, "requestid": "cbda039a-250c-4859-a799-48bb4a4a72b5",

"version": "1",

"message": "success",

"componenttime": "tcequotedprice"

}

python tornado 框架使用 (1)

1 日誌系統 common logging.py usr bin env python coding utf 8 import logging import logging.config import os from unipath import path logging.config.fileco...

python tornado非同步效能測試

測試兩個介面 coding utf 8 import time import tornado.web import tornado.gen import tornado.ioloop from tornado.concurrent import run on executor from concur...

python tornado實現簡單的檔案上傳功能

在web應用開發的功能中,檔案的上傳是經常會使用到的功能,本文就是利用python tornado框架,實現了一個簡單的檔案上傳功能 tornado.httputil.httpfile物件三個屬性 1.filename檔名 2.body檔案內部實際內容 3.type檔案的型別 defget self...