自从2020年进入公司从学生变成社会人以来,接手的第一项工作就是检查并整改公司终端的季度考核指标;
季度考核是公司总部要求的,每季度一次(废话。。),需要管理的终端量在1500台左右;
主要就是检查内网终端各类管理软件(桌面管理、防病毒等等)的安装率与版本合规情况;
各类软件都有总部提供的平台,需要做的就是去各个平台导出数据、Excel汇总计算,
然后看看哪项考核不达标,通知负责终端维护的外包公司去各个地点做维护;
这项工作虽然不难,但是每季度末都要重复地检查确保达标,因此工作量很大,每天至少要花一两个小时;
其实在刚接手的时候就有做一个自动检查系统的想法,奈何当时的我技术力还不够;
并且刚刚进入公司需要学习的东西太多,就暂且搁置了;
22年一季度不是很忙,就利用工作之余的上班时间做出来了;
每天自动出结果发给外包公司,很大程度上减少了季度考核时的工作量;
主要的实现过程大致是三个步骤:
一、利用Python的selenium WEB自动化工具去各个平台下载数据;
二、利用Python的pymysql数据库工具将数据文件导入MySQL数据库;
三、MySQL数据库按照考核标准计算结果,生成结果文件,再通过邮件发送结果文件。
结构图:

一、利用Python的selenium WEB自动化工具去各个平台下载数据
其实用浏览器控制去导数可靠性比较差,并且效率比较低,但是去找各个平台的项目组对接数据库实在太麻烦了,人家也不见得乐意开放数据库给你用;
并且这个工具最多季度末的时候一天跑两三次,并不需要太高的效率,所以就采用了selenium WEB自动化;
selenium的学习参考的是腾讯云社区的教程:Python中Selenium库使用教程详解 - 云+社区 - 腾讯云 (tencent.com);
首先是pip安装selenium,指定国内源加快下载速度:
pip install selenium -i https://pypi.tuna.tsinghua.edu.cn/simple
我用的是谷歌浏览器版本99,需要下载一个对应版本99的驱动程序才能调起浏览器,下载地址:http://chromedriver.storage.googleapis.com/index.html;
使用示例:
from selenium import webdriver
from time import sleep
# 实例化一款浏览器
browser = webdriver.Chrome(executable_path = "chromedriver.exe")
# 对指定的url发起请求
browser.get("https://www.bilibili.com/")
# 设定窗口大小
browser.set_window_size(1600, 900)
# 在页面中寻找元素
element = browser.find_element_by_xpath('/html/body/div[2]/div[1]/div[1]/ul[2]/li[1]/li/div/div/span') # “登录”
# 对元素进行操作
element.click() # 点击
# 延时3秒,等待页面回应
sleep(3)
# 关闭浏览器
browser.quit()
一般浏览器都可以用F12进入开发工具查看页面中的元素,点击1处的工具,再点击2处页面中任意的元素,在3处会跳转并高亮元素在HTML代码中的位置;
我是按照完整xpath来寻找元素的browser.find_element_by_xpath(),也可以使用元素的id、name、标签等等;
点击元素后element.click(),如果有后续操作可以添加一个延时,否则页面可能来不及响应。

1. 切换frame
某些页面会用到frame的结构,比如:
<html lang="en">
<head>
<title>FrameTest</title>
</head>
<body>
<iframe src="a.html" id="frame1" name="myframe"></iframe>
</body>
</html>
在frame中的元素无法被browser.find_element_by_xpath()直接寻找到,需要进行frame的切换:
frame = browser.find_elements_by_tag_name('iframe')[1] # 定义frame
browser.switch_to.frame(frame) # 切换frame
element = browser.find_element_by_xpath('XXXXXX') # 寻找frame中的元素
element.click() # 点击元素
browser.switch_to.default_content() # 退出frame回到主体结构
如果需要回到主体结构继续操作要使用browser.switch_to.default_content()。
2. 切换浏览器窗口
某些地方点击后会新建标签页或者弹出新的浏览器窗口,此时就需要切换窗口去继续操作:
windows = browser.window_handles
browser.switch_to.window(windows[1]) # 打开了新页面,需要切换窗口
element = browser.find_element_by_xpath('XXXXXX')
element.click()
3. 新建标签页
如果不想关闭浏览器,需要打开新的标签页继续操作:
js = "window.open('https://www.douban.com/')"
browser.execute_script(js) # 在新标签页中访问
4. 识别简单的验证码
本来我以为用selenium查找元素一个一个去点是很重复无聊且枯燥的工作;
直到我遇到了一个平台登录的时候需要输入一个图片验证码(其他平台都是只需要账号密码),事情突然变得有趣了起来;
大概的思路是先截取验证码的图片,然后用图片识别文字的工具来识别验证码,再输入到网页验证码框里;
首先要解决把验证码图片截出来的问题,我参考的是selenium验证码识别之局部截图 - 简书 (jianshu.com);
然后选择了一个比较轻量的(因为要放到内网,太大了不好处理)图片识别文字的工具——pytesseract;
pytesseract的安装和使用参考的是Python OCR工具pytesseract详解_测试开发小记的博客-CSDN博客_pytesseract;
因为图片识别验证码是一个不一定成功且准确的事件,所以我在这边使用了try finally结构,重复识别验证码并输入;
我在使用过程中识别正确率很低,一般要重复识别十几次几十次才能通过;
这个工具的使用场景并不需要追求很高的效率,因此只要能通过一次就能满足我的需求;
下面是这部分功能的代码:
from selenium import webdriver
from time import sleep
from PIL import Image
import pytesseract
# 重复尝试识别验证码,失败后刷新重试,成功后寻找不到try部分中第一个元素,跳转到finally部分
try:
r = 0
while True:
# 输入用户名密码,识别验证码,点击“登录”
element = browser.find_element_by_xpath('XXXXXX') # 用户名
element.send_keys('用户名')
element = browser.find_element_by_xpath('XXXXXX') # 密码
element.send_keys('密码')
# 截图识别验证码并输入
browser.save_screenshot('browser.png') # 对网页进行截图
code_png_lel = browser.find_element_by_xpath('XXXXXX') # 验证码
location = code_png_lel.location # 获取验证码元素所在位置
print('location', location)
size = code_png_lel.size # 获取验证码元素大小
print('size', size)
rangle = (int(location['x']), int(location['y']), int(location['x'] + size['width']), int(location['y'] + size['height'])) # 找到验证码在网页截图中的位置
print('rangle', rangle)
i = Image.open('browser.png')
frame = i.crop(rangle) # 按照验证码位置截图
frame.save('code.png')
j = Image.open('code.png')
string = pytesseract.image_to_string(j) # 识别验证码图片中的文字
string = string.replace(' ', '') # 删除识别结果中的空格
print(string)
element = browser.find_element_by_xpath('XXXXXX') # 验证码输入框
element.send_keys(string) # 输入删除空格后的识别结果
element = browser.find_element_by_xpath('XXXXXX') # “登录”
element.click() # 点击登录
browser.refresh() # 刷新页面
r = r + 1
print(r)
# 成功后跳出循环,继续操作
finally:
sleep(5)
element = browser.find_element_by_xpath('XXXXXX')
element.click()
二、利用Python的pymysql数据库工具将数据文件导入MySQL数据库
获取各个平台的多张数据表后,下一步就是将数据导入MySQL数据库中进行计算(其实是两步,“导入”和“计算”);
首先,各个平台导出数据表的格式各不相同,有xlsx,有xls,有csv;
需要把它们全部整合成统一的格式csv,编码转为utf-8;
有些表存在很多无效行,全部导入数据库会浪费很多时间,需要进行预处理;
xlsx或xls转csv:
import pandas
def xlsx_to_csv(xlsx_file_path, csv_file_path):
print('>>文件%s格式转换处理中。。。' % xlsx_file_path)
file_xlsx = pandas.read_excel(xlsx_file_path, index_col=0)
file_xlsx.to_csv(csv_file_path, encoding='utf-8')
print('>>文件%s格式已转换为csv' % xlsx_file_path)
xlsx或xls转csv,并删除无效表头(header = None):
import pandas
def xlsx_to_csv_noheader(xlsx_file_path, csv_file_path):
print('>>文件%s格式转换处理中。。。' % xlsx_file_path)
file_xlsx = pandas.read_excel(xlsx_file_path, index_col=0)
file_xlsx.to_csv(csv_file_path, encoding='utf-8', header = None)
print('>>文件%s格式已转换为csv,并删除了无效表头' % xlsx_file_path)
xlsx或xls转csv,并按照某关键字筛选(apply(lambda a:a == '关键字')):
import pandas
def xlsx_to_csv_bgfxm(xlsx_file_path, csv_file_path):
print('>>文件%s格式转换处理中。。。' % xlsx_file_path)
file_xlsx = pandas.read_excel(xlsx_file_path, index_col=0)
file_xlsx = file_xlsx.loc[file_xlsx['列名'].apply(lambda a:a == '关键字')]
file_xlsx.to_csv(csv_file_path, encoding='utf-8', index = False)
print('>>文件%s格式已转换为csv' % xlsx_file_path)
csv中按行删除某列中的重复值,保留重复值中从上到下的第一条(keep = 'first'):
import pandas
def delete_duplicates(csv_file_path, keyword):
print('>>文件%s删除%s重复记录处理中。。。' % (csv_file_path, keyword))
csv_file = pandas.read_csv(csv_file_path)
csv_file = csv_file.drop_duplicates(subset = [keyword], keep = 'first', inplace = False)
csv_file.to_csv(csv_file_path, encoding='utf-8', index = False)
print('>>文件%s删除%s重复记录已完成' % (csv_file_path, keyword))
import codecs
def ansi2uft8(file):
print('>>文件%s编码转换处理中。。。' % file)
f = codecs.open(file, 'r', 'ansi')
ff = f.read()
file_name = file.split('\\')[-1]
file_path = file.replace(file_name, "")
file_object = codecs.open(file_path + '\\' + file_name, 'w', 'utf-8')
file_object.write(ff)
print('>>文件%s编码已转换为UTF-8' % file)
某些平台导出的csv文件每行末尾都会有一个英文逗号“,”,会导致导入数据库的时候表头出现空字段、每行多一个空值,因此需要删掉:
import os
def droplastcomma(file):
print(">>文件%s末尾','删除处理中。。。" % file)
reader = open(file, 'r', encoding = 'utf8')
write_file = file.strip()[:-4] + '_temp.csv'
writer = open(write_file, 'w', encoding = 'utf8')
rows_raw = reader.readlines()
for row in rows_raw:
row = row.rstrip()[:-1] + '\n'
writer.writelines(row)
reader.close()
writer.close()
os.remove(file)
os.rename(write_file, file)
print(">>文件%s每行末尾','已删除" % file)
接下来就是重头戏,连接数据库并导入csv文件;
首先需要安装一个MySQL数据库,安装完后建立一个空的数据库;
Python调用MySQL的组件为pymysql,以下为实现过程:
import pymysql
# 连接数据库
db = pymysql.connect(
host = '127.0.0.1',
port = 3306,
user = '用户名',
passwd = '密码',
db = '库名',
charset = 'utf8')
# 建立连接游标
cursor=db.cursor()
print('>>已连接数据库,处理中。。。')
# 函数:删除旧表,从csv文件创建新表
def load_csv(csv_file_path, table_name, database, decoding):
print(">>文件%s开始创建表。。。" % csv_file_path)
# 打开文件,创建表和表头
file = open(csv_file_path, 'r', encoding = decoding)
# reader:readline读取csv文件第一行,用于创建表头
reader = file.readline()
b = reader.split(',')
colum = ''
head = ''
# for循环:从csv文件第一行逐个编辑字符串,创建表头字符串
for a in b:
# 去掉每个字符串的"、换行
c = a.strip('"' and '"\n')
# head:用于逐行插入数据时选择表头,中文表头需要加间隔号(1左边的键)
head = head + "`" + c + "`" + ','
# colum:用于创建表时指定表头,中文表头需要加间隔号(1左边的键),创建表时需要加数据类型(varchar(255))
colum = colum + "`" + c + "`" + ' varchar(255),'
# [:-1]去除末尾逗号
colum = colum[:-1]
head = head[:-1]
cursor.execute('use %s' % database)
cursor.execute('set names utf8')
cursor.execute('set character_set_connection=utf8')
cursor.execute('drop table if exists %s' % table_name)
# 创建表,增加id列
create_sql = 'create table if not exists ' + table_name + ' (id INT,' + colum + ')' + ' DEFAULT CHARSET=utf8'
cursor.execute(create_sql)
# while循环:逐行导入csv
# rows_raw:readlines逐行读取csv
rows_raw = file.readlines()
count_rows = len(rows_raw)
i = 0
while i < count_rows:
ii = str(i + 1)
d = rows_raw[i]
d = d.split(',')
rows = ''
for e in d:
# 去掉每个字符串的"、换行、\(\会在insert时报错)
f = e.strip('"')
f = f.strip('"\n')
f = f.strip('\\')
rows = rows + "'" + f + "'" + ','
# rows增加id列数值
rows = "'" + ii + "'," + rows[:-1]
print(rows)
insert_sql = 'insert into %s (`id`,%s) values (%s)' % (table_name, head, rows)
cursor.execute(insert_sql)
cursor.execute('commit')
i = i + 1
print(">>文件%s创建表完成" % csv_file_path)
load_csv('XXX.csv', '表名', '库名', 'UTF-8')
基本思路就是逐行读取csv文件,第一行作为表头,各个字段拼接成一句create table;
后续表中的内容同理,每行逐字段读取,拼接成一句insert into;
每张需要参与计算的数据表都按照此函数导入到数据库中,就可以开始下一步计算了。
三、MySQL数据库按照考核标准计算结果,生成结果文件,再通过邮件发送结果文件
MySQL的计算基本上就是按照考核标准处理多张表中的数据,通过SQL语句实现,因为涉及到公司内部信息比较多,这里就不展示细节了;
可以展示一下实现的框架,通过cursor.execute("")来执行SQL语句,但是不能把分号”;“放在里面,目前没找到解决方法,因此每一句都是分开的:
print('>>开始生成结果表。。。')
## SQL1
cursor.execute("update XXX set `XXX`=replace(`XXX`, 'A', 'B')")
print('>>XXX,已完成')
## SQL2
cursor.execute("delete from XXX where `XXX` != 'XXXXX'")
cursor.execute("delete from XXX where `XXX` = '' and `XXX` = '' and `XXX` = ''")
print('>>XXX,已完成')
下面是结果文件的生成,用xlwt导出结果表,还加入了一个按照内容调整每列宽度的功能,看起来方便些,最后断开数据库连接:
import xlwt
import copy
book = xlwt.Workbook()
def export_excel(table_name, sheet_name):
cursor.execute('select * from %s' % table_name) # 获取表
fields = [field[0] for field in cursor.description] # 获取所有字段名
all_data = cursor.fetchall() # 所有数据
# 写入excel
sheet = book.add_sheet('%s' % sheet_name)
col_num = [0 for x in range(0, len(fields))]
col_list = []
# 获取表头宽度,存入col_list
for col,field in enumerate(fields):
sheet.write(0,col,field)
col_num[col] = len(str(field).encode('gb18030'))
col_list.append(copy.copy(col_num))
row = 1
# 获取每行宽度,存入col_list
for data in all_data:
for col,field in enumerate(data):
sheet.write(row,col,field)
col_num[col] = len(str(field).encode('gb18030'))
col_list.append(copy.copy(col_num))
row += 1
# 函数:获取最适合的列宽
def get_max_col(max_list):
line_list = []
# i表示行,j代表列
for j in range(len(max_list[0])):
line_num = []
for i in range(len(max_list)):
line_num.append(max_list[i][j]) # 将每列的宽度存入line_num
line_list.append(max(line_num)) # 将每列最大宽度存入line_list
return line_list
# 调用函数get_max_col从col_list中获取最适合的列宽,并进行调整
col_max_num = get_max_col(col_list)
for i in range(0, len(col_max_num)):
sheet.col(i).width = 256 * (col_max_num[i] + 2)
print(">>结果表'%s'已导出为'%s'" % (table_name, sheet_name))
export_excel('数据库中表1', '导出表1')
export_excel('数据库中表2', '导出表2')
export_excel('数据库中表3', '导出表3')
book.save('XXX.xls')
print('>>结果文件已生成')
# 提交并断开数据库连接
cursor.execute('commit')
cursor.close()
db.close()
print('>>数据库连接已断开')
最后通过邮件的方式发送检查结果:
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
import smtplib
smtpserver = 'XXX' # SMTP服务器域名
username = 'XXX' # 登录用户名
password = 'XXX' # 登录密码
sender = 'XXX' # 发送者地址
receiver = ['XXX','XXX'] # 接收者地址
subject = '终端考核项检查结果' # 邮件标题
subject = Header(subject, 'utf-8').encode()
# 构建邮件对象
msg = MIMEMultipart('mixed')
msg['Subject'] = subject
msg['From'] = 'XXX'
msg['To'] = ";".join(receiver)
# 构造文字内容
text = "XXXXXX"
text_plain = MIMEText(text, 'plain', 'utf-8')
msg.attach(text_plain)
# 构造附件
sendfile = open('XXX.xls').read()
file_att = MIMEText(sendfile, 'base64', 'utf-8')
file_att["Content-Type"] = 'application/octet-stream'
file_att.add_header('Content-Disposition', 'attachment', filename = 'XXX.xls')
msg.attach(file_att)
# 发送邮件
smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
print('>>结果文件已通过邮件发送')
整个工具写完以后,一共是1000+行。一开始运行需要40分钟,后来优化了平台导出表格的预处理,运行时间缩短到了10分钟;
不得不说Python做自动化的工具是真的很好用,只需要熟悉现成的模块,组合一下,就能实现很强大的功能。
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
为了将Cucumber用于命令行脚本,我按照提供的说明安装了arubagem。它在我的Gemfile中,我可以验证是否安装了正确的版本并且我已经包含了require'aruba/cucumber'在'features/env.rb'中为了确保它能正常工作,我写了以下场景:@announceScenario:Testingcucumber/arubaGivenablankslateThentheoutputfrom"ls-la"shouldcontain"drw"假设事情应该失败。它确实失败了,但失败的原因是错误的:@announceScenario:Testingcucumber/ar
这个问题在这里已经有了答案:Checktoseeifanarrayisalreadysorted?(8个答案)关闭9年前。我只是想知道是否有办法检查数组是否在增加?这是我的解决方案,但我正在寻找更漂亮的方法:n=-1@arr.flatten.each{|e|returnfalseife
我不确定传递给方法的对象的类型是否正确。我可能会将一个字符串传递给一个只能处理整数的函数。某种运行时保证怎么样?我看不到比以下更好的选择:defsomeFixNumMangler(input)raise"wrongtype:integerrequired"unlessinput.class==FixNumother_stuffend有更好的选择吗? 最佳答案 使用Kernel#Integer在使用之前转换输入的方法。当无法以任何合理的方式将输入转换为整数时,它将引发ArgumentError。defmy_method(number)
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
我有一个包含多个键的散列和一个字符串,该字符串不包含散列中的任何键或包含一个键。h={"k1"=>"v1","k2"=>"v2","k3"=>"v3"}s="thisisanexamplestringthatmightoccurwithakeysomewhereinthestringk1(withspecialcharacterslike(^&*$#@!^&&*))"检查s是否包含h中的任何键的最佳方法是什么,如果包含,则返回它包含的键的值?例如,对于上面的h和s的例子,输出应该是v1。编辑:只有字符串是用户定义的。哈希将始终相同。 最佳答案
我需要检查DateTime是否采用有效的ISO8601格式。喜欢:#iso8601?我检查了ruby是否有特定方法,但没有找到。目前我正在使用date.iso8601==date来检查这个。有什么好的方法吗?编辑解释我的环境,并改变问题的范围。因此,我的项目将使用jsapiFullCalendar,这就是我需要iso8601字符串格式的原因。我想知道更好或正确的方法是什么,以正确的格式将日期保存在数据库中,或者让ActiveRecord完成它们的工作并在我需要时间信息时对其进行操作。 最佳答案 我不太明白你的问题。我假设您想检查
我的日期格式如下:"%d-%m-%Y"(例如,今天的日期为07-09-2015),我想看看是不是在过去的七天内。谁能推荐一种方法? 最佳答案 你可以这样做:require"date"Date.today-7 关于ruby-检查日期是否在过去7天内,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/32438063/
如何检查Ruby文件是否是通过“require”或“load”导入的,而不是简单地从命令行执行的?例如:foo.rb的内容:puts"Hello"bar.rb的内容require'foo'输出:$./foo.rbHello$./bar.rbHello基本上,我想调用bar.rb以不执行puts调用。 最佳答案 将foo.rb改为:if__FILE__==$0puts"Hello"end检查__FILE__-当前ruby文件的名称-与$0-正在运行的脚本的名称。 关于ruby-检查是否
我有一个div,它根据表单是否正确提交而改变。我想知道是否可以检查类的特定元素?开始元素看起来像这样。如果输入不正确,添加错误类。 最佳答案 试试这个:browser.div(:id=>"myerrortest").class_name更多信息:http://watir.github.com/watir-webdriver/doc/Watir/HTMLElement.html#class_name-instance_method另一种选择是只查看具有您期望的类的div是否存在browser.div((:id=>"myerrortes