草庐IT

python - aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore 用于限制并发连接数

coder 2023-08-13 原文

我想我想学习新的 python async await 语法,更具体地说是 asyncio 模块,方法是制作一个允许您一次下载多个资源的简单脚本。

但现在我卡住了。

在研究过程中,我遇到了两个限制并发请求数量的选项:

  1. 将 aiohttp.TCPConnector(带有 limit 参数)传递给 aiohttp.ClientSession 或
  2. 使用 asyncio.Semaphore。

如果您只想限制并发连接数,是否有首选选项或者它们可以互换使用? 两者在性能方面(大致)相等吗?

而且两者似乎都有 100 个并发连接/操作的默认值。如果我只使用限制为 500 的信号量,aiohttp 内部会隐式地将我锁定为 100 个并发连接吗?

这对我来说都是非常新的和不清楚的。请随时指出我的任何误解或我的代码中的缺陷。

这是我目前包含两个选项的代码(我应该删除哪个?):

奖励问题:

  1. 我如何处理(最好重试 x 次)抛出错误的 coros?
  2. coro 完成后立即保存返回数据(通知我的 DataHandler)的最佳方法是什么?我不希望最后都保存下来,因为我可以尽快开始处理结果。

s

import asyncio
from tqdm import tqdm
import uvloop as uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

        self.concurrent_connections = concurrent_connections
        self.silent = silent

        self.data_handler = data_handler or DummyDataHandler()

        self.sending_bar = None
        self.receiving_bar = None

        asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.semaphore = asyncio.Semaphore(concurrent_connections)

    async def fetch(self, session, url):
        # This is option 1: The semaphore, limiting the number of concurrent coros,
        # thereby limiting the number of concurrent requests.
        with (await self.semaphore):
            async with session.get(url) as response:
                # Bonus Question 1: What is the best way to retry a request that failed?
                resp_task = asyncio.ensure_future(response.read())
                self.sending_bar.update(1)
                resp = await resp_task

                await  response.release()
                if not self.silent:
                    self.receiving_bar.update(1)
                return resp

    async def batch_download(self, urls, auth=None):
        # This is option 2: Limiting the number of open connections directly via the TCPConnector
        conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
        async with ClientSession(connector=conn, auth=auth) as session:
            await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

    async def download_and_save(self, session, url):
        content_task = asyncio.ensure_future(self.fetch(session, url))
        content = await content_task
        # Bonus Question 2: This is blocking, I know. Should this be wrapped in another coro
        # or should I use something like asyncio.as_completed in the download function?
        self.data_handler.take(origin_url=url, data=content)

    def download(self, urls, auth=None):
        if isinstance(auth, tuple):
            auth = BasicAuth(*auth)
        print('Running on concurrency level {}'.format(self.concurrent_connections))
        self.sending_bar = tqdm(urls, total=len(urls), desc='Sent    ', unit='requests')
        self.sending_bar.update(0)

        self.receiving_bar = tqdm(urls, total=len(urls), desc='Reveived', unit='requests')
        self.receiving_bar.update(0)

        tasks = self.batch_download(urls, auth)
        self.loop.run_until_complete(tasks)
        return self.data_handler.done()


### call like so ###

URL_PATTERN = 'https://www.example.com/{}.html'

def gen_url(lower=0, upper=None):
    for i in range(lower, upper):
        yield URL_PATTERN.format(i)   

ad = AsyncDownloader(concurrent_connections=30)
data = ad.download([g for g in gen_url(upper=1000)])

最佳答案

有首选方案吗?

是的,见下文:

aiohttp 内部会隐式地将我锁定为 100 个并发连接吗?

是的,默认值 100 会锁定您,除非您指定另一个限制。 您可以在此处的源代码中看到它:https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L1084

它们在性能方面(大致)相等吗?

否(但性能差异应该可以忽略不计),因为 aiohttp.TCPConnector 无论如何都会检查可用连接,无论它是否被信号量包围,在这里使用信号量都是不必要的开销。

我如何处理(最好重试 x 次)抛出错误的 coros?

我不认为有这样做的标准方法,但一种解决方案是将您的调用包装在这样的方法中:

async def retry_requests(...):
    for i in range(5):
        try:
            return (await session.get(...)
        except aiohttp.ClientResponseError:
            pass

关于python - aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore 用于限制并发连接数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45757393/

有关python - aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore 用于限制并发连接数的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  3. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  4. ruby - inverse_of 是否适用于 has_many? - 2

    当我使用has_one时,它​​工作得很好,但在has_many上却不行。在这里您可以看到object_id不同,因为它运行了另一个SQL来再次获取它。ruby-1.9.2-p290:001>e=Employee.create(name:'rafael',active:false)ruby-1.9.2-p290:002>b=Badge.create(number:1,employee:e)ruby-1.9.2-p290:003>a=Address.create(street:"123MarketSt",city:"SanDiego",employee:e)ruby-1.9.2-p290

  5. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  6. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  7. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  8. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  9. 「Python|Selenium|场景案例」如何定位iframe中的元素? - 2

    本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决

  10. python ffmpeg 使用 pyav 转换 一组图像 到 视频 - 2

    2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p

随机推荐