单进程串行执行的话会非常耗费时间。假设每条用例耗时2s,1000条就需要2000s $\approx$ 33min;还要加上用例加载、测试前/后置套件等耗时;导致测试执行效率会相对低。并行执行;这就是一种分布式场景来缩短测试用例的执行时间,提高效率。分布式执行用例的原则:
项目结构

测试脚本
# test1/test_1.py
import time
def test1_test1():
time.sleep(1)
assert 1 == 1, "1==1"
def test1_test2():
time.sleep(1)
assert 1 == 1, "1==1"
class TestDemo1:
def test_inner_1(self):
time.sleep(1)
assert 1 == 1, "1==1"
class TestDemo2:
def test_inner_2(self):
time.sleep(1)
assert 1 == 1, "1==1"
# test1/inner/test_3.py
import time
def test3_test1():
time.sleep(1)
assert 1 == 1, "1==1"
def test3_test2():
time.sleep(1)
assert 1 == 1, "1==1"
# test2/test_2.py
import time
def test2_test1():
time.sleep(1)
assert 1 == 1, "1==1"
def test2_test2():
time.sleep(1)
assert 1 == 1, "1==1"
# test2/inner/test_3.py
import time
def test4_test1():
time.sleep(1)
assert 1 == 1, "1==1"
def test4_test2():
time.sleep(1)
assert 1 == 1, "1==1"
正常执行:需要8.10s 
安装:
pip install pytest-xdist
多cpu并行执行用例,直接加个-n参数即可,后面num参数就是并行数量,比如num设置为3
pytest -v -n num
参数:
多进程并行执行:耗时2.66s大大的缩短了测试用例的执行时间。

xdist的分布式类似于一主多从的结构,master负责下发命令,控制slave;slave根据master的命令执行特定测试任务。
在xdist中,主是master,从是workers;xdist会产生一个或多个workers,workers都通过master来控制,每个worker相当于一个mini版pytest执行器 。
master不执行测试任务,只对worker收集到的所有用例进行分发;每个worker负责执行测试用例,然后将执行结果反馈给master;由master统计最终测试结果。
master在测试会话(test session)开始前产生一个或多个worker。
实际编译执行测试代码的worker可能是本地机器也可能是远程机器。
每个worker类似一个迷你型的pytest执行器。
worker会执行一个完整的test collection过程。【收集所有测试用例的过程】
然后把测试用例的ids返回给master。【ids表示收集到的测试用例路径】
master不执行任何测试用例。
注意:分布式测试(pytest-xdist)方式执行测试时不会输出测试用例中的print内容,因为master并不执行测试用例。
master接收到所有worker收集的测试用例集之后,master会进行一些完整性检查,以确保所有worker都收集到一样的测试用例集(包括顺序)。
如果检查通过,会将测试用例的ids列表转换成简单的索引列表,每个索引对应一个测试用例的在原来测试集中的位置。
这个方案可行的原因是:所有的节点都保存着相同的测试用例集。
并且使用这种方式可以节省带宽,因为master只需要告知workers需要执行的测试用例对应的索引,而不用告知完整的测试用例信息。
有以下四种分发策略:命令行参数 --dist=mode选项(默认load)
each:master将完整的测试索引列表分发到每个worker,即每个worker都会执行一遍所有的用例。

load:master将大约$\frac{1}{n}$的测试用例以轮询的方式分发到各个worker,剩余的测试用例则会等待worker执行完测试用例以后再分发;每个用例只会被其中一个worker执行一次。

loadfile:master分发用例的策略为按ids中的文件名(test_xx.py或xx_test.py)进行分发,即同一个测试文件中的测试用例只会分发给其中一个worker;具有一定的隔离性。

loadscope:master分发用例对策略为按作用域进行分发,同一个模块下的测试函数或某个测试类中的测试函数会分发给同一个worker来执行;即py文件中无测试类的话(只有测试function)将该模块分发给同一个worker执行,如果有测试类则会将该文件中的测试类只会分发给同一个worker执行,多个类可能分发给多个worker;目前无法自定义分组,按类 class 分组优先于按模块 module 分组。

注意:可以使用pytest_xdist_make_scheduler这个hook来实现自定义测试分发逻辑。
如:想按目录级别来分发测试用例:
from xdist.scheduler import LoadScopeScheduling
class CustomizeScheduler(LoadScopeScheduling):
def _split_scope(self, nodeid):
return nodeid.split("/", 1)[0]
def pytest_xdist_make_scheduler(config, log):
return CustomizeScheduler(config, log)
xdist.scheduler.LoadScopeScheduling并重写_split_scope方法pytest_xdist_make_schedulerpytest -v -n 4 --dist=loadfile

pytest_runtestloop:pytest的默认实现是循环执行所有在test_session这个对象里面收集到的测试用例。pytest_runtest_protocol。pytest_runtest_protocol(item, nextitem)hook的参数要求,为了将nextitem传给hook。pytest_runtest_protocol,因为这时nextitem参数已经可以确定。shutdown信号, 那么就将nextitem参数设为None, 然后执行 pytest_runtest_protocolshutdown信号给所有worker。pytest hooks比如:pytest_runtest_logstart、pytest_runtest_logreport 确保整个测试活动进行正常运作。注意:pytest-xdist 是让每个 worker 进程执行属于自己的测试用例集下的所有测试用例。这意味着在不同进程中,不同的测试用例可能会调用同一个 scope 范围级别较高(例如session)的 fixture,该 fixture 则会被执行多次,这不符合 scope=session 的预期。
pytest-xdist 没有内置的支持来确保会话范围的 fixture 仅执行一次,但是可以通过使用文件锁进行进程间通信来实现;让scope=session 的 fixture 在 test session 中仅执行一次。
示例:需要安装 filelock 包,安装命令pip install filelock
FileLock仅产生一次fixture数据。import pytest
import uuid
from filelock import FileLock
@pytest.fixture(scope="session")
def login(tmp_path_factory, worker_id):
# 代表是单机运行
if worker_id == "master":
token = uuid.uuid4()
print("fixture:请求登录接口,获取token", token)
os.environ['token'] = token
return token
# 分布式运行
# 获取所有子节点共享的临时目录,无需修改【不可删除、修改】
root_tmp_dir = tmp_path_factory.getbasetemp().parent
fn = root_tmp_dir / "data.json"
with FileLock(str(fn) + ".lock"):
if fn.is_file(): # 代表已经有进程执行过该fixture
token = json.loads(fn.read_text())
else: # 代表该fixture第一次被执行
token = uuid.uuid4()
fn.write_text(json.dumps(token))
# 最好将后续需要保留的数据存在某个地方,比如这里是os的环境变量
os.environ['token'] = token
return token
用于并行和并发测试的 pytest 插件
pip install pytest-parallel
--workers=n :多进程运行需要加此参数, n是进程数。默认为1
--tests-per-worker=n :多线程需要添加此参数,n是线程数
如果两个参数都配置了,就是进程并行;每个进程最多n个线程,总线程数:进程数*线程数
【注意】
在windows上进程数永远为1。
需要使用 if name == “main” :在命令行窗口运行测试用例会报错
示例:
import pytest
def test_01():
print('测试用例1操作')
def test_02():
print('测试用例2操作')
def test_03():
print('测试用例3操作')
def test_04():
print('测试用例4操作')
def test_05():
print('测试用例5操作')
def test_06():
print('测试用例6操作')
def test_07():
print('测试用例7操作')
def test_08():
print('测试用例8操作')
if __name__ == "__main__":
pytest.main(["-s", "test_b.py", '--workers=2', '--tests-per-worker=4'])
简而言之,pytest-xdist并行性pytest-parallel是并行性和并发性。
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
我在使用omniauth/openid时遇到了一些麻烦。在尝试进行身份验证时,我在日志中发现了这一点:OpenID::FetchingError:Errorfetchinghttps://www.google.com/accounts/o8/.well-known/host-meta?hd=profiles.google.com%2Fmy_username:undefinedmethod`io'fornil:NilClass重要的是undefinedmethodio'fornil:NilClass来自openid/fetchers.rb,在下面的代码片段中:moduleNetclass
我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当
我有一个围绕一些对象的包装类,我想将这些对象用作散列中的键。包装对象和解包装对象应映射到相同的键。一个简单的例子是这样的:classAattr_reader:xdefinitialize(inner)@inner=innerenddefx;@inner.x;enddef==(other)@inner.x==other.xendenda=A.new(o)#oisjustanyobjectthatallowso.xb=A.new(o)h={a=>5}ph[a]#5ph[b]#nil,shouldbe5ph[o]#nil,shouldbe5我试过==、===、eq?并散列所有无济于事。
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere
我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是
我遵循了教程http://gettingstartedwithchef.com/,第1章。我的运行list是"run_list":["recipe[apt]","recipe[phpap]"]我的phpapRecipe默认Recipeinclude_recipe"apache2"include_recipe"build-essential"include_recipe"openssl"include_recipe"mysql::client"include_recipe"mysql::server"include_recipe"php"include_recipe"php::modul
Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/
我遵循MichaelHartl的“RubyonRails教程:学习Web开发”,并创建了检查用户名和电子邮件长度有效性的测试(名称最多50个字符,电子邮件最多255个字符)。test/helpers/application_helper_test.rb的内容是:require'test_helper'classApplicationHelperTest在运行bundleexecraketest时,所有测试都通过了,但我看到以下消息在最后被标记为错误:ERROR["test_full_title_helper",ApplicationHelperTest,1.820016791]test