grequests併發之小試牛刀

2021-10-14 11:06:40 字數 3148 閱讀 5860

題主為某**鏈金融公司的測試人員,系統有額度開單、鏈單轉讓、鏈單融資功能,這些功能點在使用者序列請求情況下均不會出現金額溢位(如額度 < 所有使用該額度開出的鏈單金額總和)。目前的測試需求是,測試使用者併發情況下,是否出現金額溢位。

python的requests模組只能實現使用者序列請求,那麼有沒有滿足我的測試需求的第三方模組呢?

在網路上搜尋一番,的確已有,就是grequests模組!傳為k神(kenneth reitz)所作,結合requests、gevent模組實現,python友対之讚譽有加。

使用python命令pip install grequests安裝:

pip install grequests
檢視已安裝的grequests模組:

required-by:根據已經嘗鮮的朋友介紹,我要解決的測試需求需要用到grequests.request、grequests.map方法,當然grequests也提供grequests.get、grequests.post等,大家可根據喜好使用。

使用前,先使用help檢視這2個函式的內功心法。可以得知,grequests.request和requests.request的修**法完全一致,而且是非同步的,請看:

help on function request in module grequests:

request(method, url,

**kwargs)

# synonym

grequests.map的修**法呢?請看:

help on function map in module grequests:

map(requests, stream=false, size=none, exception_handler=none, gtimeout=none)

concurrently converts a list of requests to responses.

:param requests: a collection of request objects.

:param stream: if true, the content will not be downloaded immediately.

:param size: specifies the number of requests to make at a time.

if none, no throttling occurs.

:param exception_handler: callback function

, called when exception occured. params: request, exception

:param gtimeout: gevent joinall timeout in seconds.

(note: unrelated to requests timeout)

先測試併發額度開單,**組織如下:

1.初始化乙個空的請求陣列;

2.for迴圈下,根據介面定義,使用同一額度並全額開單(額度全部使用完),使用grequests.request產生非同步請求物件(grequests.asyncrequest object),並加入到請求陣列;

3.使用grequests.map產生併發請求,並把所有響應物件賦值給變數result;

4.獲取result中所有響應成功的標誌(如success),預期響應成功的數目等於1,即只有乙個全額開單成功。

核心**如下:

import pytest

import grequests

class

testconcurrentoutput()

:"""併發開單測試"""

deftest_output_chains_concurrently

(self, user_login, get_host)

: host =

''url = host +

method =

'post'

params =

try:

# 步驟1:定義任務列表task

task =

list()

# 步驟2:構建請求,加入任務列表task

for k in

range(0

,3):

req = grequests.request(method, url,

**params)

# 步驟3:產生併發請求,並把所有響應物件賦值給result

result = grequests.

map(task)

# 列印響應內容

print

(result[0]

.json(

), result[1]

.json(

), result[2]

.json())

# 步驟4:收集所有響應的"是否成功",檢查所有響應中只有乙個成功

success_info =

[item.json()[

'success'

]for item in result]

assert success_info.count(

true)==

1except exception as e:

raise e

if __name__ ==

'__main__'

: pytest.main(

['-s'

,'--cache-clear'

, __file__]

)

下期再說,拜拜!

Python使用grequests併發傳送請求

參考博文 requests是python傳送介面請求非常好用的乙個三方庫,由k神編寫,簡單,方便上手快。但是requests傳送請求是序列的,即阻塞的。傳送完一條請求才能傳送另一條請求。為了提公升測試效率,一般我們需要並行傳送請求。這裡可以使用多執行緒,或者協程,gevent或者aiohttp,然而...

用GREQUESTS實現併發HTTP請求

起因 要用http請求探測服務的有效性,多程序,多執行緒,感覺似乎沒有必要,看看有沒有協程的方案 1.簡單用法 grequests 利用 requests和gevent庫,做了乙個簡單封裝,使用起來非常方便 import grequests import time import requests u...

spring boot admin 2 0小試牛刀

本文主要展示下spring boot admin 2.0版本的新特性 這裡配置admin server的位址 新版前端改用vue.js進行了重構,後端的話,使用event sourcing的原則進行了重構,支援spring5,移除了spring cloud starter依賴,另外使用webclie...