草庐IT

HuggingFace——Accelerate的使用

Charon_HN 2023-08-13 原文

Overview

🤗 Accelerate is a library that enables the same PyTorch code to be run across any distributed configuration by adding just four lines of code! In short, training and inference at scale made simple, efficient and adaptable.

Demo

# + 代表使用accelerate的增加语句;- 代表去掉
+ from accelerate import Accelerator
 from transformers import AdamW, AutoModelForSequenceClassification, get_scheduler

+ accelerator = Accelerator()

 model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2)
 optimizer = AdamW(model.parameters(), lr=3e-5)

- device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
- model.to(device)

+ train_dataloader, eval_dataloader, model, optimizer = accelerator.prepare(
+     train_dataloader, eval_dataloader, model, optimizer
+ )

 num_epochs = 3
 num_training_steps = num_epochs * len(train_dataloader)
 lr_scheduler = get_scheduler(
     "linear",
     optimizer=optimizer,
     num_warmup_steps=0,
     num_training_steps=num_training_steps
 )

 progress_bar = tqdm(range(num_training_steps))

 model.train()
 for epoch in range(num_epochs):
     for batch in train_dataloader:
-         batch = {k: v.to(device) for k, v in batch.items()}
         outputs = model(**batch)
         loss = outputs.loss
-         loss.backward()
+         accelerator.backward(loss)

         optimizer.step()
         lr_scheduler.step()
         optimizer.zero_grad()
         progress_bar.update(1)

如果简单来说,就是添加了一个accelerate来控制分布式训练,其中了loss的backward变成了accelerate.backward(loss)

Installation & Configuration

安装和配置参考官网即可,其中配置的过程是需要在终端Terminal上通过回答一系列问题,然后自动生成一个名为default_config的yaml文件,并保存在根目录.catch/huggingface/accelerate目录下。

配置完成之后可以使用accelerate env [--config_file] [config_file_name]来验证配置文件是否是Valid。

默认配置文件内容:

- `Accelerate` version: 0.11.0.dev0
- Platform: Linux-5.10.0-15-cloud-amd64-x86_64-with-debian-11.3
- Python version: 3.7.12
- Numpy version: 1.19.5
- PyTorch version (GPU?): 1.12.0+cu102 (True)
- `Accelerate` default config:
        - compute_environment: LOCAL_MACHINE
        - distributed_type: MULTI_GPU
        - mixed_precision: no
        - use_cpu: False
        - num_processes: 2
        - machine_rank: 0
        - num_machines: 1
        - main_process_ip: None
        - main_process_port: None
        - main_training_function: main
        - deepspeed_config: {}
        - fsdp_config: {}

下面就是主要的四种command:

Quicktour

Main use

  1. 首先是先导入accelerate的包:

    from accelerate import Accelerator
    accelerator = Accelerator()
    

    这一个配置需要写在整个training script的前面,因为这是对于distributed training十分重要。

  2. 如果原先的代码中有 .to(device).cuda(),那么就去掉,accelerator是可以自动处理的。如果非要使用 .to(device) ,那么就需要用accelerator.device来代替。

    如果想要完全手动配置device的情况,可以在初始化模型的时候,仅仅需要传入 device_placement=False 参数即可。

    如果在初始化的时候使用了自动化配置,后面还想用到.to(device).cuda()的话,也是可以的(就跟原来的使用方法一样),只不过此时的方法是根据通过prepare()方法之后的model来获取;或者通过Accelerator的属性来获取。

  3. 将所有与训练有关的object(包括optimizer、model、dataloader、scheduler)传入prepare()方法中。

    需要说明的有以下几点:

    • 在训练过程中dataloader是共享在GPUs/TPUs cores上,也就是说,每一个device是获得了整个dataloader中的不相同的一部分。而且 The random states of all processes will be synchronized at the beginning of each iteration through your dataloader, to make sure the data is shuffled the same way (if you decided to use shuffle=True or any kind of random sampler).【也就是说如果在pytorch的Dataloader中设置shuffle=True或设置其他的random sample,那么在每一次迭代的时候,所有进程的随机状态会以相同的shuffled形式进行同步】。
    • 实际处理的batch_size大小实际上是在script中设置的batch_size * 设备数量(这其实与pytorch的 DDP分布式训练方法一样)。举个例子,比如在script中设置的batch_size是16,其中有四块GPU可以使用,那么此时整个训练过程中共有16 * 4 个batch参与训练,但是还是以大小为16的batch参与训练的,只不过有16 * 4个batch数据被加入到内存中来。
      • 当然,如果在初始化accelerate的过程中,给定参数split_batches=True就可以保证相同整个训练过程中的总得batch size大小是script中设置的。对于这个参数,官网是这样定义的:split_batches (bool, optional, defaults to False) — Whether or not the accelerator should split the batches yielded by the dataloaders across the devices. If True the actual batch size used will be the same on any kind of distributed processes, but it must be a round multiple of the num_processes you are using. If False, actual batch size used will be the one set in your script multiplied by the number of processes.
    • 如果想要记录或者查看数据集的情况,要在prepare()方法之后。
      • 当然,prepare()方法也是可以在不同的需求下使用的,比如在验证数据集上。如果想要分布式验证(distrubuted evaluation)的话,可以把valuation dataloader也加入到prepare()方法中来。
  4. 用accelerate.backward(loss)代替loss.backward()即可。

至此,就可以依托于accelerate来通过使用不同的分布式训练工具(比如pytorch的torchrun或者accelerate的launch等)来实现训练了。

Distributed evaluation

这里主要讲一讲如何分布式评估在验证集上的效果。方法也很简单,只需要单独将validation的dataloader传入prepare()

方法中即可:

validation_dataloader = accelerator.prepare(validation_dataloader)

因为是分布式训练,每一个device只会看到数据集中的一部分,因此需要平均最后的结果,也就是说需要汇总所有的结果然后求一个均值。这里就可以使用 gather_for_metrics()方法:

for inputs, targets in validation_dataloader:
    predictions = model(inputs)
    # Gather all predictions and targets
    all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
    # Example of use with a *Datasets.Metric*
    metric.add_batch(all_predictions, all_targets)

这里还有几点需要强调:

  • 在实际过程中,数据可能在分发的时候,会出现重复的现象(Some data at the end of the dataset may be duplicated so the batch can be divided equally among all workers. )。为此 gather_for_metrics()方法可以在汇总结果的时候,自动将重复的数据结果删除。
    • 当然,这里还有手动的办法不去删除重复的结果,即用 gather() 方法,我们可以看到两者的不同点:
    • gather_for_metrics():Gathers tensor and potentially drops duplicates in the last batch if on a distributed system. Should be used for gathering the inputs and targets for metric calculation.
    • gather():The gathered tensor(s). Note that the first dimension of the result is num_processes multiplied by the first dimension of the input tensors. Gather the values in tensor across all processes and concatenate them on the first dimension. Useful to regroup the predictions from all processes when doing evaluation. Note: This gather happens in all processes.
  • 在处理NLP任务中需要注意:gather()gather_for_metrics() 方法都是需要处理相同tensor,即在每一个进程中size of tensor都必须是相同的。 如果在padding的过程中是这样动态策略来padding的——取一个min-batch中最长的作为整个batch中的长度,那么就需要调用pad_across_processes() 方法来padding所有的tensor达到所有tensor中的统一长度。

Launching your distributed script

分布式训练的方法就不赘述了,训练方法很简单,需要详细了解的是一些参数:Check out the Launch tutorial for more information about launching your scripts.

在Launch tutorial中主要有下面的三个部分内容:

  • Using accelerate launch

    • If you are familiar with launching scripts in PyTorch yourself such as with torchrun, you can still do this. It is not required to use accelerate launch.

    • 默认的运行时需要用到配置文件的,但是也是可以不使用配置文件的形式来运行,我们可以通过命令行的形式来运行,比如多GPU训练的demo:

      accelerate launch --multi_gpu {script_name.py} {--arg1} {--arg2} ...
      

      所有的CLI参数说明都在这里

  • Why you should always use accelerate config

  • Custom Configurations

其他问题

如何只在某一进程中处理

针对某些任务(在单服务上),比如下载数据或者记录日志等需要在某一个进程中进行即可。那么此时就可以用下面的方法:

if accelerator.is_local_main_process:
    # Is executed once per server(Once on a single server)

如果是使用了tqdm库的话,也是需要特殊处理的:

from tqdm.auto import tqdm

progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)

其中,如果对于某些任务(在多服务上),比如上传模型到model hub中,那么就需要用下面的方法:

if accelerator.is_main_process:
    # Is executed once only(Only ever once across all servers)

is_local_main_process和is_main_process就跟Pytorch的分布式训练中的LOCAL_RANK和RANK的区别,这里就不展开讨论了。

简单说就是,如果是单物理主机,那么就用 is_local_main_process 方法就够用的了。

还有一个可能会用到的小trick就是:For printing statements you only want executed once per machine, you can just replace the print function by accelerator.print.(也就是仅主进程为0的进程才输出)。

保存或加载模型

保存训练好的模型需要分两步完成:

  1. 使用 accelerator.wait_for_everyone()方法;

  2. 使用accelerator.unwrap_model(model)方法:

    总结来说如下:

    accelerator.wait_for_everyone()
    unwrapped_model = accelerator.unwrap_model(model)
    accelerator.save(unwrapped_model.state_dict(), filename)
    

    那么这个unwrap_model到底在干嘛呢,我查看了他们的源码,其中有下面这一段

    def extract_model_from_parallel(model):
        """
        Extract a model from its distributed containers.
        Args:
            model (`torch.nn.Module`): The model to extract.
        Returns:
            `torch.nn.Module`: The extracted model.
        """
        options = (torch.nn.parallel.DistributedDataParallel, torch.nn.DataParallel)
        if is_deepspeed_available():
            options += (DeepSpeedEngine,)
    
        while isinstance(model, options):
            model = model.module
        return model
    

    考虑一个简单的情况,如果我们没有使用DeepSpeed的方法,那么就是执行while循环的那一句,也就是model=model.module

    熟悉Pytorch DDP的都知道,分布式训练的时候,模型的保存总是在key中多一个module字段(具体的原因也不清楚)。因此这一步就是将这个module字段给去掉,要不然对于不熟悉DDP的同学来说,在将权重load进模型的时候就会出现字段不匹配的情况。

模型加载也是一样的处理方法,这里就直接放代码了:

unwrapped_model = accelerator.unwrap_model(model) # 如果是先调用了prepare()方法的话,这一步必须加
unwrapped_model.load_state_dict(torch.load(filename))

⚠️:如果你是使用prepare()方法之后的话,那么模型加载权重的时候是需要用accelerator.unwrap_model方法的。其他情况下,问题不大。

保存或加载训练的整个状态

这里说的整个训练过程状态是指保存/加载 训练模型过程中的model、optimizer、random generators和LR schedulers。详情参考文档吧,一般情况也不常用到(PS:主要是懒)

同步等待

我们知道,在pytoch中,有torch.distributed.barrier这个方法来实现多卡训练的进程等待。那么应用在什么地方呢?在比如我们想要在单进程中进行预测或者验证,那么其他的进程就必须要等待执行任务的进程结束之后,才可以进行下一轮迭代,那么为了实现这个等待,就需要写下面的代码:

accelerator.wait_for_everyone()

这个方法跟torch.distributed.barrier的含义是一样的,都是需要所有的进程执行了这句话之后才可以进行下一步。

这里我遇到了一个大问题,就是比如我们想要做单一设备上的预测,不通过分布式的方法来预测,那么通过accelerator.wait_for_everyone()或者torch.distributed.barrier两个方法时,就会出现所有进程卡住,虽然此时的GPU占用率是100%,但是没有任何程序在正常运行,不过在我找了好多博客之后,终于在Github上发现了类似的问题Using torch.distributed.barrier() makes the whole code hang

这里的方法简单来说,就是将原来的model(**input)用model.module(**input)来代替即可。至于为什么咱也不清楚😭。

同样还有其他方法——使用no_grad()方法:

# validate the model
if gpu==0 :
  with torch.no_grad():
    model.eval()
    for data, target in valid_loader:
      if torch.cuda.is_available:
        data, target = data.cuda(), target.cuda()
        output = model(data)
        loss = criterion(output, target)
        valid_loss += loss.item()*data.size(0)

那么知道了这个之后,我们就可以用accelerator.unwrap_model(model)方法就可以完成正常的barrier了。

梯度裁剪

在Pytorch中使用到的梯度裁剪方法是torch.nn.utils.clip_grad_norm_ or torch.nn.utils.clip_grad_value_,那么这里就对应使用 clipgrad_norm() and clipgrad_value() 两个方法就可以了。

其实跟用原生的Pytorch方法也差不多,这里以torch.nn.utils.clip_grad_norm_为例,可以看到其源码的实现也就是用了torch.nn.utils.clip_grad_norm_

def clip_grad_norm_(self, parameters, max_norm, norm_type=2):
        """
        Should be used in place of `torch.nn.utils.clip_grad_norm_`.
        Example:
        ```python
        >>> from accelerate import Accelerator
        >>> accelerator = Accelerator(gradient_accumulation_steps=2)
        >>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
        >>> for (input, target) in dataloader:
        ...     optimizer.zero_grad()
        ...     output = model(input)
        ...     loss = loss_func(output, target)
        ...     accelerator.backward(loss)
        ...     if accelerator.sync_gradients:
        ...         accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
        ...     optimizer.step()
        ```
        """
        if self.distributed_type == DistributedType.FSDP:
            self.unscale_gradients()
            parameters = [p for p in parameters]
            for model in self._models:
                if parameters == [p for p in model.parameters()]:
                    model.clip_grad_norm_(max_norm, norm_type)
                    return
        elif self.distributed_type == DistributedType.DEEPSPEED:
            # `accelerator.backward(loss)` is doing that automatically. Therefore, it's implementation is not needed
            return
        self.unscale_gradients()
        torch.nn.utils.clip_grad_norm_(parameters, max_norm, norm_type=norm_type)

混合精度训练

混合精度训练在 🤗 Accelerate框架的加持下使用起来也是非常简单的:

with accelerator.autocast():
    loss = complex_loss_function(outputs, target):

这里还有一个情况需要说明:混合精度训练的时候,在训练的开始或者其他的不确定的时候,会发生gradient skip。这是因为 “because of the dynamic loss scaling strategy, there are points during training where the gradients have overflown, and the loss scaling factor is reduced to avoid this happening again at the next step.

如果发生了这一情况,那么就需要手动更新LR scheduler。一般情况下不更新也是可以的,但是如果训练集很小或者说模型对scheduler的初始化的LR很敏感,那么就需要手动更新LR scheduler了:

if not accelerator.optimizer_step_was_skipped:
    lr_scheduler.step()

梯度累计

使用 accumulate()方法指定 gradient_accumulation_steps参数即可实现梯度累计。

accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)

for input, label in training_dataloader:
    with accelerator.accumulate(model):
        predictions = model(input)
        loss = loss_function(predictions, label)
        accelerator.backward(loss)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

随机种子设定

使用 utils.set_seed()方法固定种子

from accelerate import set_seed

set_seed(42)

内部机制

以上的内容就足够用于日常的炼丹了,但是这里还是想简单得聊聊Accelerate到底从头到尾做了什么呢?

  1. 首先Accelerate会先去分析给定的参数情况,所有的信息都被存储在了AcceleratorState中。

  2. 然后就是调用prepare()方法,该方法会做下面的三件事情:

    • wraps your model(s) in the container adapted for the distributed setup;

    • wraps your optimizer(s) in a AcceleratedOptimizer

    • creates a new version of your dataloader(s) in a DataLoaderShard.

      当model和optimizer被wrap的时候,dataloader会重新构建。这是因为Pytorch的问题,因为Pytorch要根据num_processes来确定新的batch_size的大小是多少。

      这里随便举一个Pytorch的使用便可以知道上面在说什么:

      # 分布式数据集
      train_sampler = DistributedSampler(train_dataset)
      train_loader = torch.utils.data.DataLoader(train_dataset, sampler=train_sampler, batch_size=batch_size)  # 注意这里的batch_size是每个GPU上batch_size
      
      # 分布式模型
      model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank, find_unused_parameters=True)
      

有关HuggingFace——Accelerate的使用的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐