因为对request,cooick等不甚了解,所以选用最简单的selenium爬取
selenium 的特点是所见即所得,爬取到的网页结构和正常加载的一样
配置也很简单,使用driver将谷歌浏览器驱动起来即可
可以看出 id 后面的XXXXXXXX(位数不固定)标识了每个用户,想要更换用户只需要找到对应用户的id即可
可以很容的看出 网页的结构为如下
<iframe>
<html>
<div>.........</div> //为要爬取的内容
</html>
</iframe>

selenium提供了方法有很多,因为有的div的id是随机生成的,class结构也比较复杂;我使用了full xpath的方法,获取方法也比较简单,只需要使用chrome浏览器,
在网页任意位置单击左键 --> 点击检查 --> 选中要获取的元素标签可以是<div> ,<li>,<a>,<span >--> 再次点击左键 --> 选择copy --> 选择copy full xpath

此时会得到如下的串:
是该元素从网页<html>标签下的结构
/html/body/div[3]/div/div[2]/div[1]/div/div/div/div/ul/li[1]/div[2]/div/div[4]/div/div/div[2]/h3/a
通过.text方法即可获取其中的内容
import re
from datetime import datetime,timedelta
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
import pymysql
"""
selenium 模块 爬去动态返回入库系统需要的基本信息
"""
def eye (url):
# 配置谷歌浏览器无界面运行
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--headless')
chrome_options.add_argument('blink-settings=imagesEnabled=false')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_experimental_option("excludeSwitches", ["ignore-certificate-errors",
"enable-automation"])
driver = webdriver.Chrome(chrome_options=chrome_options)
driver.get(url)
driver.implicitly_wait(1) # 显式等待1秒
driver.switch_to.frame('contentFrame') # 切入contentFrame
##使用 fullxpath获取元素的内容
name_session = driver.find_elements_by_xpath('/html/body/div[3]/div/dl/dd/div[1]/div/h2/span[1]')
name = name_session[0].text
dynamic_session = driver.find_elements_by_xpath('/html/body/div[3]/div/dl/dd/ul/li[1]/a/strong')
dynamic = dynamic_session[0].text
addtimes_session = driver.find_elements_by_xpath(
'/html/body/div[3]/div/div[2]/div[1]/div/div/div/div/ul/li/div[2]/div/div[2]/a')
addtimes = []
for item in addtimes_session:
addtimes.append(item.text)
comments_session = driver.find_elements_by_xpath(
'/html/body/div[3]/div/div[2]/div[1]/div/div/div/div/ul/li/div[2]/div/div[3]')
comments = []
for item in comments_session:
comments.append(item.text)
songs_session = driver.find_elements_by_xpath(
'/html/body/div[3]/div/div[2]/div[1]/div/div/div/div/ul/li/div[2]/div/div[4]/div/div/div[2]/h3/a')
songs = []
for item in songs_session:
songs.append(item.text)
singers_session = driver.find_elements_by_xpath(
'/html/body/div[3]/div/div[2]/div[1]/div/div/div/div/ul/li/div[2]/div/div[4]/div/div/div[2]/h4/a')
singers = []
for item in singers_session:
singers.append(item.text)
driver.quit()
return name,dynamic,addtimes,comments,songs,singers
"""
数据持久化模块 将爬去的信息存储到数据库中
"""
def keep(url):
flag= True
email_message = ''
error_message = ''
name, dynamic, addtimes, comments, songs, singers = eye(url)
db = pymysql.connect("XXXX", "root", "XXXXXX.", "数据库名称")
cursor = db.cursor()
cursor.execute("SELECT VERSION()")
data = cursor.fetchone()
print("Database version : %s " % data)
#keep_eye_on_title_on 标题内容存入数据库
# 先检察动态数是否改变(不否认删除一条增加一条的情况,但是我又懒得写)
sql = 'select dynamic from eye_on_title order by `date` desc limit 1 '
try:
cursor.execute(sql)
result = cursor.fetchall()
dynamic_number = result[0][0]
print("查询成功")
except:
print("出错")
if dynamic_number != dynamic:
today = datetime.today()
sql = "insert into eye_on_title (name,dynamic,date) values ( '%s', '%s', '%s')" % (name, dynamic, today)
print(sql)
try:
cursor.execute(sql)
db.commit()
print("保存成功")
if dynamic == 0:
email_message = email_message + "keep_an_eye_on失败,计划暴露或结束请求撤离" + dynamic
else:
email_message = email_message + "提示:动态更新 " + dynamic + "\n"
except:
db.rollback()
print("出错")
##检察更新
for i in range(len(comments)):
songName = re.sub(r'\'', "\\'", songs[i]) # 匹配掉 歌名中的 ' 单引号
sql = "select * from eye_on_timeline where comment = '%s' and song = '%s' " % (comments[i], songName)
print(sql)
cursor.execute(sql)
result = cursor.fetchall()
if result:
print("此条动态已存在")
if i + 1 == len(comments):
error_message = " \n(删除后未添加新的内容) " + today.strftime(
"%m月%d日 %H:%M") # 循环条件左开右闭 所以 i+1 才可以等于 len(comments)取巧写法不好
else:
print("未检测到此条动态,准备写入 ")
##处理时间问题
if addtimes[i] == "刚刚":
print("刚刚")
addtime = (datetime.now() + timedelta(minutes=-1)).strftime("%m月%d日 %H:%M")
sql = "insert into eye_on_timeline (song,singer,`comment`,addtine)values('%s','%s','%s','%s') " % (
songName, singers[i], comments[i], addtime)
elif (addtimes[i])[-3:] == "分钟前":
print('分钟前')
reducetime = (addtimes[i])[:-3]
print(reducetime, "计算时间")
addtime = (datetime.now() + timedelta(minutes=-int(reducetime))).strftime("%m月%d日 %H:%M")
sql = "insert into eye_on_timeline (song,singer,`comment`,addtine)values('%s','%s','%s','%s') " % (
songName, singers[i], comments[i], addtime)
elif (addtimes[i])[:2] == "昨天":
print("昨天")
addtime = (datetime.now() + timedelta(days=-1)).strftime("%m月%d日 %H:%M")
sql = "insert into eye_on_timeline (song,singer,`comment`,addtine)values('%s','%s','%s','%s') " % (
songName, singers[i], comments[i], addtime)
else:
sql = "insert into eye_on_timeline (song,singer,`comment`,addtine)values('%s','%s','%s','%s') " % (
songName, singers[i], comments[i], addtimes[i])
print(sql)
sql_message = ("分享歌曲: " + songs[i]) + (" 歌手: " + singers[i]) + (" 评论: " + comments[i] + "\n")
email_message = email_message + sql_message
try:
cursor.execute(sql)
db.commit()
print("保存成功")
except:
db.rollback()
print("出错")
else:
print("没有更新")
flag = False
db.close()
return flag,email_message ,error_message
"""邮件模块将检测到的更新信息发送到邮箱内提醒"""
def mail (email_message,url,error_message,flag):
if flag == True:
from_addr = 'XXXXXXXX@qq.com' # 邮件发送账号
to_addrs = 'XXXXXXXXX@qq.com' # 接收邮件账号
qqCode = 'XXXXXXX' # 授权码(这个要填自己获取到的)
smtp_server = 'smtp.qq.com' # 固定写死
smtp_port = 465 # 固定端口
# 配置服务器
stmp = smtplib.SMTP_SSL(smtp_server, smtp_port)
stmp.login(from_addr, qqCode)
# 组装发送内容
email_message = email_message + ("点击查看: " + url) + error_message
print(email_message)
message = MIMEText(email_message, 'plain', 'utf-8') # 发送的内容
message['From'] = Header("EYE", 'utf-8') # 发件人
message['To'] = Header("boss", 'utf-8') # 收件人
subject = 'Keep_an_eye_on 计划'
message['Subject'] = Header('Keep_an_eye_on', 'utf-8') # 邮件标题
try:
stmp.sendmail(from_addr, to_addrs, message.as_string())
except Exception as e:
print('邮件发送失败--' + str(e))
print('邮件发送成功')
if __name__ == '__main__':
url = 'https://music.163.com/#/user/event?id=XXXXXXXXX'#所要爬去的网易云动态页面
flag,email_message ,error_message=keep(url)
mail(email_message,url,error_message,flag)
"""整体有三大模块:
1. 爬取模块,使用selenium 爬取网易云的动态上的基本信息
2. 入库模块,使用pymysql 将爬取到的信息存入数据库
3. 邮件模块,使用smtp 将数据发送到用户邮箱以题型
4. 需要添加一个日志模块 保证服务持久运行,报错有据可循
"""
有没有办法在Ruby中动态创建数组?例如,假设我想遍历用户输入的书籍数组:books=gets.chomp用户输入:"TheGreatGatsby,CrimeandPunishment,Dracula,Fahrenheit451,PrideandPrejudice,SenseandSensibility,Slaughterhouse-Five,TheAdventuresofHuckleberryFinn"我把它变成一个数组:books_array=books.split(",")现在,对于用户输入的每一本书,我想用Ruby创建一个数组。伪代码来做到这一点:x=0books_array.
我想在IRB中浏览文件系统并让提示更改以反射(reflect)当前工作目录,但我不知道如何在每个命令后进行提示更新。最终,我想在日常工作中更多地使用IRB,让bash溜走。我在我的.irbrc中试过这个:require'fileutils'includeFileUtilsIRB.conf[:PROMPT][:CUSTOM]={:PROMPT_N=>"\e[1m:\e[m",:PROMPT_I=>"\e[1m#{pwd}>\e[m",:PROMPT_S=>"FOO",:PROMPT_C=>"\e[1m#{pwd}>\e[m",:RETURN=>""}IRB.conf[:PROMPT_MO
首先,我使用的是rails3.1.3和来自master的carrierwavegithub仓库的分支。我使用after_init钩子(Hook)来确定基于属性的字段页面模型实例并为这些字段定义属性访问器将值存储在序列化哈希中(希望它清楚我是什么谈论)。这是我正在做的事情的精简版:classPage省略mount_uploader命令让我可以访问我想要的属性。但是当我安装uploader时出现错误消息说“nil类的未定义新方法”我在源代码中读到有方法read_uploader和扩展模块中的write_uploader。我如何必须覆盖这些来制作mount_uploader命令使用我的“虚拟
我正在尝试动态构建一个多维数组。我想要的基本上是这样的(为简单起见写出来):b=0test=[[]]test[b]这给了我错误:NoMethodError:undefinedmethod`test=[[],[],[]]而且它工作正常,但在我的实际使用中,我不会事先知道需要多少个数组。有一个更好的方法吗?谢谢 最佳答案 不需要像您正在使用的索引变量。只需将每个数组附加到您的test数组:irb>test=[]=>[]irb>test[["a","b","c"]]irb>test[["a","b","c"],["d","e","f"]]
如何只加载map边界内的标记gmaps4rails?当然,在平移和/或缩放后加载新的。与此直接相关的是,如何获取map的当前边界和缩放级别? 最佳答案 我是这样做的,我只在用户完成平移或缩放后替换标记,如果您需要不同的行为,请使用不同的事件监听器:在你看来(index.html.erb):{"zoom"=>15,"auto_adjust"=>false,"detect_location"=>true,"center_on_user"=>true}},false,true)%>在View的底部添加:functiongmaps4rail
如何在对象上调用方法名称的嵌套哈希?例如,给定以下哈希:hash={:a=>{:b=>{:c=>:d}}}我想创建一个方法,给定上面的散列,执行以下操作:object.send(:a).send(:b).send(:c).send(:d)我的想法是我需要从一个未知的关联中获取一个特定的属性(这个方法不知道,但程序员知道)。我希望能够指定一个方法链来以嵌套哈希的形式检索该属性。例如:hash={:manufacturer=>{:addresses=>{:first=>:postal_code}}}car.execute_method_hash(hash)=>90210
我有一个ruby程序,我想接受用户创建的方法,并使用该名称创建一个新方法。我试过这个:defmethod_missing(meth,*args,&block)name=meth.to_sclass我收到以下错误:`define_method':interningemptystring(ArgumentError)in'method_missing'有什么想法吗?谢谢。编辑:我以不同的方式让它工作,但我仍然很好奇如何以这种方式做到这一点。这是我的代码:defmethod_missing(meth,*args,&block)Adder.class_evaldodefine_method
假设我们有A、B、C类。Adefself.inherited(sub)#metaprogramminggoeshere#takeclassthathasjustinheritedclassA#andforfooclassesinjectprepare_foo()as#firstlineofmethodthenrunrestofthecodeenddefprepare_foo#=>prepare_foo()neededhere#somecodeendendBprepare_foo()neededhere#somecodeendend如您所见,我正在尝试将foo_prepare()调用注入
这里我想输出带有动态组名的json而不是单词组@tickets.eachdo|group,v|json.group{json.array!vdo|ticket|json.partial!'tickets/ticket',ticket:ticketend}end@ticket是这样的散列{a:[....],b:[.....]}我想要这样的输出{a:[.....],b:[....]} 最佳答案 感谢@AntarrByrd,这个问题有类似的答案:JBuilderdynamickeysformodelattributes使用上面的逻辑我已经
我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n