草庐IT

python - 在python中使用 'multiprocessing'包的最佳实践

coder 2023-08-18 原文

我正在尝试使用 multiprocessing模块在 python .我有以下示例代码,它在 ipython 笔记本中执行时没有任何错误。但是我看到每次执行笔记本中的代码块时,都会在后台产生额外的 python 进程。

import multiprocessing as mp

def f(x):
    print "Hello World ", mp.current_process()
    return 1

pool = mp.Pool(3)

data = range(0,10)
pool.map(f, data)

而当我将相同的内容保存在普通的 .py 文件中并执行时,我遇到了错误并且必须终止终端以停止执行程序。

我通过 if __name__ == '__main__': 纠正了这个问题以及在此之下创建池并使用 pool.close()关闭游泳池。

我很想知道在使用 multiprocessing 时应该遵循哪些最佳实践以及相关功能,如 map , apply , apply_async等等?我计划使用此模块并行读取文件,并希望将其应用于少数 ML 算法以加快进程。

最佳答案

概述、架构和一些实用技巧

根据我自己(也是有限的)经验,我可以分享以下有关多处理如何工作以及如何使用它的见解。我没有发现 python.org 手册的描述性或图形性很强,所以我阅读了代码。对于每个有相同印象的人......这是我迄今为止可以弥补的:

一般良好/最佳实践提示

  • 一般实现方法:
  • 减少数据大小的测试驱动:如果崩溃或计算
  • ,您不想怀疑几分钟
  • 逐步和时间分析:
  • 首先,在没有多处理的情况下实现和调试
  • 接下来,在没有多个进程的情况下实现和调试单进程,分析时间和比较开销
  • 接下来,增加进程编号和配置文件时间以识别任何 GIL 问题和等待时间。
  • 简单的 Process es 或它们的列表对于针对少数函数运行一对一功能 2-process 很有用。
  • Pool s 处理可批处理工作负载(高级任务/命令)在一组 Process es(进程池)之间的分布。
  • Pool 用于处理器绑定(bind)(具有可批处理输入/输出的高处理器负载)和 pool.ThreadPool 用于 IO 绑定(bind)(具有单独输入/输出的低处理器负载)任务。
  • 对于 Process es、Pool s、Thread s 和 ThreadPool s 之间的数据传输,使用 queues.Queue 和子类(如果结果顺序很重要)或 Pipe s 与 PipeConnection s 的 1 对 1 映射。
  • 共享不同类型的变量( BaseProxy , Namespace s, Queue s, Pool s 或用于设置同步对象,如 Barrier/Lock/RLock/Sempaphore/Condition s, Manager s, GIL s 或 Manager s 在不同的进程之间使用 x7108 类
  • 如果无法避免 GIL s,请使用 Pipe 处理它们,并尝试将密集计算过程与 Queue 相关计算(例如复杂数据结构中的解析等)分开,然后与 Pool s 或共享 Pool s 连接。
  • 使用多个 Pool() 可以为不同的任务分配不同数量的进程。否则只需使用多个映射或应用方法调用实现一个 Pool.(star)map_async()
  • 建立在彼此中间结果的顺序并行计算任务可以使用单个 Pool.(star)map() 和多个 ApplyResult()ApplyResult().ready()/.wait()/.get()/.successful() 进行计算。为了使任务彼此同步,映射函数返回的 import multiprocessing 实例及其方法 _current_process = MainProcess() 是正确的选择。

  • 架构和流程
  • BaseProcess 运行时, target 被初始化,它是 args 的一个子类,但没有 kwargs_paraent_pidProcessmultiprocessing ,基本上是所有其他 pool.ThreadPool 的句柄对象,这些 Pool 已经在 .o 内核中运行。
  • Pool._task_handler 是 Pool 的模拟 API,它可能也共享类似的架构
  • Pool._worker_handler 基于 3 个守护线程 Pool._result_handlerqueue.Queue()Pool._taskqueue ,它们与 1 个内部 SimpleQueue Pool._inqueue 和 2 个内部 Pool._outqueue Pool._cacheApplyResults 连接。
  • Pool.appy_async()/_map_async() 是一个字典,其中包含来自所有 ApplyResults._job 和 submethod 调用的 job_counter() 和子类实例,以及来自 key 的全局 ApplyResult 作为 Pool
  • Pool._cachePool.apply_async()/._map_async() 的子类可以在 Pool.map() 和作为 Pool.map_async() 和子方法的返回中找到。
  • Pool.map() == Pool.map_async().get()ApplyResult() 之间的区别在于 Queue 强制/锁定主进程以等待所有结果被计算并存储在返回对象 SimpleQueues in 中。
  • Pool.taskqueue/Pool.apply_async()/.map_async() 池`:
  • Pool._task_handler :管道 Pool._inqueue/etc 的高级作业。从 apply-method 到 Pool._task_handler 的任务批次。
  • Pool._pool.Process(target=worker, ...) :将作业作为批处理?迭代器?从 Pool._outqueuePool._pool.Process(target=worker, ...)
  • Pool._worker_handler :将结果从 Pool._result_handler (由 _set() 初始化)传送到 ApplyResultPool._cache[self._job] 再次将它们放入缓存在 ApplyResult 中的 func 中。
  • 如果目标 ApplyResult() 有返回对象,
  • queues.JoinableQueue 会将结果保存为列表。否则 queues.Queue 只是同步方法的句柄,即结果状态调用方法。
  • 为了连接进程和线程,按以下顺序提供了从高级到简单功能的 4 个类: SimpleQueuePipePipeConnectionPipe/PipeConnection
    ojit_code 只是一个返回 2 个实际 ojit_code 类实例的方法。

  • 一些代码示例

    import logging
    import multiprocessing as mp
    import random
    import time
    import numpy as np
    from copy import deepcopy
    
    MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]
    
    mp.log_to_stderr(level=logging.INFO)  # mp.log_to_strerr(level=logging.DEBUG)
    logger = mp.get_logger()
    logger.setLevel(level=logging.INFO)  # mp.setLevel(level=logging.DEBUG)
    
    
    def secs2hms(seconds, num_decimals=4):
        hms_time = [*(*divmod(divmod(int(seconds), 60)[0], 60), divmod(int(seconds), 60)[1])]
        if hasattr(seconds, '__round__'):
            hms_time[-1] += seconds.__round__(num_decimals) - int(seconds)
        return hms_time
    
    
    class Timer():
        def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
            self.time_name = time_name
            self.output_method = get_log_method(method_name=log_method_name)
            self.time_format = time_format
            self.hms_decimals = hms_decimals
            self.start_time = time.time()
    
        def start(self):
            raise RuntimeError('Timer was already started at initialization.')
    
        def stop(self, *args):
            seconds_time = time.time() - self.start_time
            time_name = self.time_name.format(*args)
            if self.time_format == 'hms':
                hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
                hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
                self.output_method('{} = {}'.format(time_name, hms_time))
            else:
                self.output_method('{} = {}sec'.format(time_name, seconds_time))
            self._delete_timer()
    
        def _delete_timer(self):
            del self
    
    
    def get_log_method(method_name):
        if method_name == 'debug':
            log_method = logger.debug
        elif method_name == 'info':
            log_method = logger.info
        else:
            log_method = print
        return log_method
    
    
    def _generate_random_array(shape):
        return np.array([[[random.randint(0, 1000)
                           for _ in range(shape[2])]
                          for _ in range(shape[1])]
                         for _ in range(shape[0])])
    
    
    def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
        log_method = get_log_method(method_name=log_method_name)
        array = _generate_random_array(shape=shape)
        log_method('{}: sending `array   through `pipe_in`'.format(log_name))
        pipe_in.send(array)
    
    
    def random_array(shape, log_method_name='print', log_name='RANDOM'):
        log_method = get_log_method(method_name=log_method_name)
        assert len(shape) == 3
        array = _generate_random_array(shape=shape)
        log_method('{}: append `array` to `shared_array`'.format(log_name))
        # for dataset_name in ['train', 'valid']:
        #     shared_arrays[dataset_name].append(array)
        return array
    
    
    def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
        log_method = get_log_method(method_name=log_method_name)
        assert len(shape) == 3
        array = _generate_random_array(shape=shape)
        log_method('{}: append `array` to `shared_array`'.format(log_name))
        shared_arrays.append(array)
    
    
    def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
        log_method = get_log_method(method_name=log_method_name)
        log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
        assert len(shape) == 3
        array = _generate_random_array(shape=shape)
        log_method('{}: appendind `array` to `shared_array` with currently len(nested_shared_array[\'{}\']) = {}'.format(
            log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
        nested_shared_arrays[dataset_name].append(array)
    
    
    def nested_dict_list_deepcopy(nested_shared_arrays):
        """No hierachical switching between mp.manager.BaseProxy and unshared elements"""
        nested_unshared_arrays = dict()
        for key, shared_list in nested_shared_arrays.items():
            nested_unshared_arrays[key] = deepcopy(shared_list)
        return nested_unshared_arrays
    
    
    def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
        log_method = get_log_method(method_name=log_method_name)
        log_method('ARRAY_STATE: type(arrays) = {}'.format(type(arrays)))
        try:
            if hasattr(arrays, '__len__'):
                log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
                if len(arrays) < 20:
                    for idx, array in enumerate(arrays):
                        log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
                        if hasattr(array, 'shape'):
                            log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
                        else:
                            log_method('{}: arrays[{}] has not `shape` attribute'.format(log_name, idx))
            else:
                log_method('{}: array has no `__len__` method'.format(log_name))
    
        except BrokenPipeError as error_msg:
            log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))
    
    
    def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
        log_method = get_log_method(method_name=log_method_name)
        log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
        for key, arrays in nested_arrays.items():
            log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)
    
    
    if __name__ == '__main__':
        log_method = logger.info
        # log_method cannot be pickled in map_async, therefore an extra log_method_name string is implemented to hand
        # through
        log_method_name = 'info'
        num_samples = 100
        num_processes = 1  # len(MODEL_INPUTS)  #
        array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]
    
    
        def stdout_some_newlines(num_lines=2, sleep_time=1):
            print(''.join(num_lines * ['\n']))
            time.sleep(sleep_time)
    
        # Pool with results from `func` with `return` received from `AsyncResult`(=`ApplyResult`)
        # `AsyncResult` also used for process synchronization, e.g. waiting for processes to finish
        log_method('MAIN: setting up `Pool.map_async` with `return`ing `func`')
        async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
                                   log_method=log_method)
        # Pool with variable return
        setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
                                 log_method=log_method)
        with mp.Pool(processes=num_processes) as pool:
            setup_pool_timer.stop()
            arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
            getted_arrays = arrays.get()
            async_return_timer.stop()
            # Logging array state inside the `pool` context manager
            log_method('MAIN: arrays from `pool.map_async() return` with in the `pool`\'s context manager:')
            log_arrays_state(arrays=arrays, log_method_name=log_method_name)
            log_method('MAIN: arrays.get() from `pool.map_async() return` with in the `pool`\'s context manager:')
            log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
        # Logging array state outside the `pool` context manager
        log_method('MAIN: arrays from `pool.map_async() return` outside the `pool`\'s context manager:')
        log_arrays_state(arrays=arrays, log_method_name=log_method_name)
        log_method('MAIN: arrays.get() from `pool.map_async() return` outside the `pool`\'s context manager:')
        log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
        del pool, arrays, getted_arrays
        stdout_some_newlines()
    
    
        # Functionality of `np.Process().is_alive()
        log_method('IS_ALIVE: testing funcktionality of flag `mp.Process().is_alive()` w.r.t. process status')
        p = mp.Process(target=lambda x: x ** 2, args=(10,))
        log_method('IS_ALIVE: after intializing, before starting: {}'.format(p.is_alive()))
        p.start()
        log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
        time.sleep(5)
        log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
        p.join()
        log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
        p.terminate()
        del p
        stdout_some_newlines()
    
        # Pool with `func` `return`ing results directly to the reuslt handler from `mp.Pool().starmap_async()` of type
        # `AsyncResults()`
        log_method(
            'MAIN: Pool.map() is not tested explicitly because is equivalent to `Pool.map() == Pool.map_async().get()')
        stdout_some_newlines()
    
    
        # Pool with results assigned to shared variable & `AsyncResult` only used for process synchronization but
        # not for result receiving
    
        log_method(
            'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
            'variable')
        async_shared_timer = Timer(
            time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
            log_method=log_method)
        setup_shared_variable_timer = Timer(time_name='TIMEE_INIT: time to set up shared variable', log_method=log_method)
        with mp.Manager() as sync_manager:
            shared_arrays = sync_manager.list()
            setup_shared_variable_timer.stop()
            async_return_timer = Timer(
                time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
                log_method=log_method)
            setup_pool_timer = Timer(
                time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
                log_method=log_method)
            with mp.Pool(processes=num_processes) as pool:
                setup_pool_timer.stop()
                async_result = pool.starmap_async(
                    func=random_shared_array,
                    iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
                log_method('MAIN: async_result.ready() befor async.wait() = {}'.format(async_result.ready()))
                async_result.wait()
                log_method('MAIN: async_result.ready() after async.wait() = {}'.format(async_result.ready()))
                log_method('MAIN: asyn_result.sucessful() after async.wait() = {}'.format(async_result.successful()))
                async_return_timer.stop()
    
                copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
                unshared_arrays = deepcopy(shared_arrays)
                copy_timer.stop()
                async_shared_timer.stop()
                log_method('MAIN: shared_arrays from `pool.map_async()` within `sync_manager` context manager:')
                log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
                log_method(
                    'MAIN: unshared_arrays = deepcopy(shared_arrays) from `pool.map_async()` within `sync_manager`\'s '
                    'context manager:')
                log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
    
        log_method('MAIN: shared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
        log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
        log_method('MAIN: unshared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
        log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
        del sync_manager, shared_arrays, async_result, pool, unshared_arrays
        stdout_some_newlines()
    
        # Same as above just with pipe instead of `shared_arrays`
        log_method('MAIN: separate process outputting to `mp.Pipe()`')
        process_pipe_timer = Timer(time_name='TIMER_PIPE: time for `random_pipe_array` outputting through a `mp.Pipe()')
        arrays = list()
        pipe_in, pipe_out = mp.Pipe()
        # initialize processes
        processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
                     array_shapes]
        # Start processes
        for process in processes:
            process.start()
        # Collect piped arrays form pipe and append them to `arrays`
        while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
            log_method(
                'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
                    len(arrays)))
            arrays.append(pipe_out.recv())
        # join processes
        for process in processes:
            process.join()
        process_pipe_timer.stop()
        log_arrays_state(arrays=arrays, log_method_name=log_method_name)
        pipe_in.close()
        pipe_out.close()
        del arrays, pipe_in, pipe_out, processes, process
        stdout_some_newlines()
    
        # Nested shared dict/list/arrays
        log_method('MAIN: `random_nested_arrays` with nested shared `mp.Manager().dict()` and `mp.Manager().list()`s')
        nested_timer = Timer(time_name='TIMER_NESTED: time for `random_nested_arrays()`')
        with mp.Manager() as sync_manager:
            nested_shared_arrays = sync_manager.dict()
            nested_shared_arrays['train'] = sync_manager.list()
            nested_shared_arrays['valid'] = sync_manager.list()
            with mp.Pool(processes=num_processes) as pool:
                nested_results = pool.starmap_async(func=random_nested_array,
                                                    iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
                                                              for dataset_name in nested_shared_arrays.keys()
                                                              for shape in array_shapes])
                nested_results.wait()
                unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
                nested_timer.stop()
        log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
        del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
        stdout_some_newlines()
    
        # List of processes targeted directly to their functions one by one
        log_method(
            'MAIN: separate process outputting to shared `mp.Manager.list()` with process handles maintained in list()')
        log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1=processes-to-tasks'
                   ' relations or asynchronous single tasks calculations.')
        processes_timer = Timer(
            time_name='TIMER_PROCESS: time for `random_shared_arrays` with separate {} processes'.format(num_processes),
            log_method=log_method)
        with mp.Manager() as sync_manager:
            shared_arrays = sync_manager.list()
            # Initialize processes
            processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
                         for shape in array_shapes]
            # Start processes
            for process in processes:
                process.start()
            processes_timer.stop()
            # Join processes = wait for processes to finish
            for process in processes:
                process.join()
            unshared_process_arrays = deepcopy(shared_arrays)
            processes_timer.stop()
        log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
        del sync_manager, shared_arrays, unshared_process_arrays, processes, process
        stdout_some_newlines()
    

    关于python - 在python中使用 'multiprocessing'包的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22595639/

    有关python - 在python中使用 'multiprocessing'包的最佳实践的更多相关文章

    1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

      我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

    2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

      我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

    3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

      类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

    4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

      很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

    5. ruby - 在 Ruby 中使用匿名模块 - 2

      假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

    6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

      我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

    7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

      关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

    8. ruby-on-rails - rails : "missing partial" when calling 'render' in RSpec test - 2

      我正在尝试测试是否存在表单。我是Rails新手。我的new.html.erb_spec.rb文件的内容是:require'spec_helper'describe"messages/new.html.erb"doit"shouldrendertheform"dorender'/messages/new.html.erb'reponse.shouldhave_form_putting_to(@message)with_submit_buttonendendView本身,new.html.erb,有代码:当我运行rspec时,它失败了:1)messages/new.html.erbshou

    9. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

      我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

    10. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

      我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

    随机推荐