草庐IT

Python:实用运维脚本编写(进程/文件/目录操作)

Orion's Blog 2023-03-28 原文

Python在很大程度上可以对shell脚本进行替代。笔者一般单行命令用shell,复杂点的多行操作就直接用Python了。这篇文章就归纳一下Python的一些实用脚本操作。

1. 执行外部程序或命令

我们有以下C语言程序cal.c(已编译为.out文件),该程序负责输入两个命令行参数并打印它们的和。该程序需要用Python去调用C语言程序并检查程序是否正常返回(正常返回会返回 0)。

#include<stdio.h>
#include<stdlib.h>
int main(int argc, char* argv[]){
    int a = atoi(argv[1]);
    int b = atoi(argv[2]);
    int c = a + b;
    printf("%d + %d = %d\n", a, b, c);
    return 0;
}

那么我们可以使用subprocess模块的run函数来spawn一个子进程:

res = subprocess.run(["Python-Lang/cal.out", "1", "2"])
print(res.returncode) 

可以看到控制台打印出进程的返回值0:

1 + 2 = 3
0

当然,如果程序中途被杀死。如我们将下列while.c程序写为下列死循环(已编译为.out文件):

#include<stdio.h>
#include<stdlib.h>
int main(int argc, char* argv[]){
    while(1);
    return 0;
}

我们同样用run函数接收其返回值:

res = subprocess.run("Python-Lang/while.out")
print(res.returncode)

不过我们在程序运行中用shell命令将其终止掉:

(base) orion-orion@MacBook-Pro Python-Lang % ps -a |grep while 
11829 ttys001    0:17.49 Python-Lang/while.out
11891 ttys005    0:00.00 grep while
(base) orion-orion@MacBook-Pro Python-Lang % kill 11829

可以看到控制台打印输出的进程返回值为-15(因为负值-N表示子进程被信号N终止,而kill命令默认的信号是15,该信号会终止进程):

-15

如果程序陷入死循环不能正常终止,我们总不能一直等着吧?此时,我们可以设置超时机制并进行异常捕捉:

try:
    res = subprocess.run(["Python-Lang/while.out"], capture_output=True, timeout=5)
except subprocess.TimeoutExpired as e:
    print(e)

此时会打印输出异常结果:

Command '['Python-Lang/while.out']' timed out after 5 seconds

有时需要获取程序的输出结果,此时可以加上capture_output参数,然后访问返回对象的stdout属性即可:

res = subprocess.run(["netstat", "-a"], capture_output=True)
out_bytes = res.stdout

输出结果是以字节串返回的,如果想以文本形式解读,可以再增加一个解码步骤:

out_text = out_bytes.decode("utf-8")
print(out_text)

可以看到已正常获取文本形式的输出结果:

...
kctl       0      0     33      6 com.apple.netsrc
kctl       0      0     34      6 com.apple.netsrc
kctl       0      0      1      7 com.apple.network.statistics
kctl       0      0      2      7 com.apple.network.statistics
kctl       0      0      3      7 com.apple.network.statistics

(base) orion-orion@MacBook-Pro Learn-Python % 

一般来说,命令的执行不需要依赖底层shell的支持(如sh,bash等),我们提供的字符串列表会直接传递给底层的系统调用,如os.execve()。如果希望命令通过shell来执行,只需要给定参数shell=True并将命令以简单的字符串形式提供即可。比如我们想让Python执行一个涉及管道、I/O重定向或其它复杂的Shell命令时,我们就可以这样写:

out_bytes = subprocess.run("ps -a|wc -l> out", shell=True)

2. 文件和目录操作(命名、删除、拷贝、移动等)

我们想要和文件名称和路径打交道时,为了保证获得最佳的移植性(尤其是需要同时运行与Unix和Windows上时),最好使用os.path中的函数。例如:

import os
file_name = "/Users/orion-orion/Documents/LocalCode/Learn-Python/Python-Lang/test.txt"
print(os.path.basename(file_name)) 
# test.txt
print(os.path.dirname(file_name))
# /Users/orion-orion/Documents/LocalCode/Learn-Python/Python-Lang
print(os.path.split(file_name))
# ('/Users/orion-orion/Documents/LocalCode/Learn-Python/Python-Lang', 'test.txt')
print(os.path.join("/new/dir", os.path.basename(file_name)))
# /new/dir/test.txt
print(os.path.expanduser("~/Documents"))
# /Users/orion-orion/Documents

其中os.path.expanduser当用户或$HOME未知时, 将不做任何操作。如我们这里的$HOME就为/Users/orion-orion

(base) orion-orion@MacBook-Pro ~ % echo $HOME
/Users/orion-orion

如果要删除文件,请用os.remove(在删除前注意先判断文件是否存在):

file_name = "Python-Lang/test.txt"
if os.path.exists(file_name):
    os.remove(file_name)

接下来我们看如何拷贝文件。当然最直接的方法是调用Shell命令:

os.system("cp Python-Lang/test.txt Python-Lang/test2.txt")

当然这不够优雅。如果不像通过调用shell命令来实现,可以使用shutil模块,该模块提供了一系列对文件和文件集合的高阶操作,其中就包括文件拷贝和移动/重命名。这些函数的参数都是字符串,用来提供文件或目录的名称。以下是示例:


src = "Python-Lang/test.txt"
dst = "Python-Lang/test2.txt"

# 对应cp src dst (拷贝文件,存在则覆盖)
shutil.copy(src, dst) 


src = "Python-Lang/sub_dir"
dst = "Python-Lang/sub_dir2"
# 对应cp -R src dst (拷贝整个目录树)
shutil.copytree(src, dst)

src = "Python-Lang/test.txt"
dst = "Python-Lang/sub_dir/test2.txt"
# 对应mv src dst (移动文件,可选择是否重命名)
shutil.move(src, dst)

可以看到,正如注释所言,这些函数的语义和Unix命令类似。如果你对Unix下的文件拷贝/移动等操作不熟悉,可以参见我的博客《Linux:文件解压、复制和移动的若干坑》

默认情况下,如果源文件是一个符号链接,那么目标文件将会是该链接所指向的文件的拷贝。如果只想拷贝符号链接本身,可以提供关键字参数follow_symlinks:

shutil.copy(src, dst, follow_symlinks=True)

如果想在拷贝的目录中保留符号链接,可以这么做:

shutil.copytree(src, dst, symlinks=True)

有时在拷贝整个目录时需要对特定的文件和目录进行忽略,如.pyc这种中间过程字节码。我们可以为copytree提供一个ignore函数,该函数已目录名和文件名做为输入参数,返回一列要忽略的名称做为结果(此处用到字符串对象的.endswith方法,该方法用于获取文件类型):

def ignore_pyc_files(dirname, filenames):
    return [name for name in filenames if name.endswith('pyc')] 


shutil.copytree(src, dst, ignore=ignore_pyc_files)

不过由于忽略文件名这种模式非常常见,已经有一个实用函数ignore_patterns()提供给我们使用了(相关模式使用方法类似.gitignore):

shutil.copytree(src, dst, ignore=shutil.ignore_patterns("*~", "*.pyc"))

注:此处的"*~"模式匹配是文本编辑器(如Vi)产生的以"~"结尾的中间文件。

忽略文件名还常常用在os.listdir()中。比如我们在数据密集型(如机器学习)应用中,需要遍历data目录下的所有数据集文件并加载,但是需要排除.开头的隐藏文件,如.git,否则会出错,此时可采用下列写法:

import os
import os
filenames = [filename for filename in os.listdir("Python-Lang/data") if not filename.startswith(".")] #注意,os.listdir返回的是不带路径的文件名

让我们回到copytree()。用copytree()来拷贝目录时,一个比较棘手的问题是错误处理。比如在拷贝的过程中遇到已经损坏的符号链接,或者由于权限问题导致有些文件无法访问等。对于这种情况,所有遇到的异常会收集到一个列表中并将其归组为一个单独的异常,在操作结束时抛出。示例如下:

import shutil

src = "Python-Lang/sub_dir"
dst = "Python-Lang/sub_dir2"

try:
    shutil.copytree(src, dst)
except shutil.Error as e:
    for src, dst, msg in e.args[0]:
        print(src, dst, msg)

如果提供了ignore_dangling_symlinks=True,那么copytree将会忽略悬垂的符号链接。

更多关于shutil的使用(如记录日志、文件权限等)可参见shutil文档[4]

接下来我们看如何使用os.walk()函数遍历层级目录以搜索文件。只需要将顶层目录提供给它即可。比如下列函数用来查找一个特定的文件名,并将所有匹配结果的绝对路径打印出来:

import os
def findfile(start, name):
    for relpath, dirs, files in os.walk(start):
        if name in files:
            # print(relpath)
            full_path = os.path.abspath(os.path.join(relpath, name))
            print(full_path)

start = "."
name = "test.txt"
findfile(start, name)

可以看到,os.walk可为为我们遍历目录层级,且对于进入的每个目录层级它都返回一个三元组,包含:正在检视的目录的相对路径(相对脚本执行路径),正在检视的目录中包含的所有目录名列表,正在捡视的目录中包含的所有文件名列表。这里的os.path.abspath接受一个可能是相对的路径并将其组成绝对路径的形式。

我们还能够附加地让脚本完成更复杂的功能,如下面这个函数可打印出所有最近有修改过的文件:

import os
import time
def modified_within(start, seconds):
    now = time.time()
    for relpath, dirs, files in os.walk(start):
        for name in files:
            full_path = os.path.join(relpath, name)
            mtime = os.path.getmtime(full_path)
            if mtime > (now - seconds):
                print(full_path)

start = "."
seconds = 60
modified_within(start, 60)

3. 创建和解包归档文件

如果仅仅是想创建或解包归档文件,可以直接使用shutil模块中的高层函数:

import shutil
shutil.make_archive(base_name="data", format="zip", root_dir="Python-Lang/data")
shutil.unpack_archive("data.zip")

其中第二个参数format为期望输出的格式。要获取所支持的归档格式列表,可以使用get_archive_formats()函数:

print(shutil.get_archive_formats())
# [('bztar', "bzip2'ed tar-file"), ('gztar', "gzip'ed tar-file"), ('tar', 'uncompressed tar file'), ('xztar', "xz'ed tar-file"), ('zip', 'ZIP file')]

Python也提供了诸如tarfilezipfilegzip等模块来处理归档格式的底层细节。比如我们想要创建然后解包.zip归档文件,可以这样写:

import zipfile


with zipfile.ZipFile('Python-Lang/data.zip', 'w') as zout:
    zout.write(filename='Python-Lang/data/test1.txt', arcname="test1.txt")
    zout.write(filename='Python-Lang/data/test2.txt', arcname="test2.txt")
with zipfile.ZipFile('Python-Lang/data.zip', 'r') as zin:
    zin.extractall('Python-Lang/data2') #没有则自动创建data2目录

参考

有关Python:实用运维脚本编写(进程/文件/目录操作)的更多相关文章

  1. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  4. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  5. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  6. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

  7. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  8. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  9. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  10. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

随机推荐