👨💻博客主页:i新木优子👀
🎉欢迎关注🔍点赞👍收藏⭐留言📝
🧚♂️寄语:成功的秘诀就是每天都比别人多努力一点👣
✨有任何疑问欢迎评论探讨
先声明一下:免费的代理稳定性都不高,即使经过层层筛选有些可能还是不能用,就像矮子里拔高的,即使已经是矮子里最高的,可是还是改变不了是矮子的本质
在做任何事情之前我们都需要先思考,要如何实现?需要用到什么?等等一系列的问题都要想清楚,要先将思路理清了,做起事来才能事半功倍
🎯下面是我做这个项目的思路,可能并不是很好,有更好的想法欢迎留言讨论
代理IP池:
自身:
zset有一个特性,他有一个分值(score),我们可以通过控制分值的高低就可以将稳定性高的IP取出来,从而提高免费IP的可用性对外:
思路理清了,接下来就是如何写程序了
采集:写爬虫抓取IP,将IP存储到Redis
校验:从Redis中取出IP,用IP简单发送一个请求,如果可以正常返回,证明该IP可用
提供:写api接口,将可用的IP提供给用户
如果我们按照单线程去完成上面的步骤,就有局限性,只有每次将IP提供给用户,才可以继续采集IP,而我们希望的是这三个步骤互不影响,不管采集、校验还是给用户提供IP,都应该是一直进行,在提供IP的时候也可以继续采集、校验
三个独立的程序,我们就可以用多进程
下图就是IP代理池的模型:

仔细观察上图,三个操作都用到了Redis,所以就先写Redis涉及到的各种操作,再写其他三个功能就可以游刃有余了
1️⃣Redis的各种操作
# redis的各种操作
from redis import Redis
from settings import *
class ProxyRedis:
# 连接redis
def __init__(self):
self.red = Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
decode_responses=True
)
# 存储ip
def add_proxy_ip(self, ip):
# 判断是否有ip
if not self.red.zscore(REDIS_KEY, ip):
self.red.zadd(REDIS_KEY, {ip: DEFAULT_SCORE})
print("采集到了IP地址了", ip)
else:
print("采集到了IP地址了", ip, "但是已经存在")
# 查询所有ip
def get_all_proxy(self):
return self.red.zrange(REDIS_KEY, 0, -1)
# 将分值拉满
def set_max_score(self, ip):
self.red.zadd(REDIS_KEY, {ip: MAX_SCORE})
# 降低分值
def reduce_score(self, ip):
# 查询分值
score = self.red.zscore(REDIS_KEY, ip)
# 如果有分值,扣分
if score > 0:
self.red.zincrby(REDIS_KEY, -10, ip)
else: # 分值没有则删除
self.red.zrem(REDIS_KEY, ip)
# 查询可用ip
def get_avail_proxy(self):
lis = []
ips = self.red.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE, 0, -1)
if ips:
lis.append(ips)
return lis
else:
ips = self.red.zrangebyscore(REDIS_KEY, DEFAULT_SCORE + 1, MAX_SCORE - 1, 0, -1)
if ips:
lis.append(ips)
return lis
else:
print("没有可用ip")
return None
2️⃣采集IP
这里我爬取了三个网站,当然感觉不够用的自己还可以加
快代理:https://www.kuaidaili.com/free/intr/1/
高可用全球免费代理IP库:https://ip.jiangxianli.com/?page=1
66免费代理网:http://www.66ip.cn/areaindex_1/1.html
爬取这些网站很简单,基本都没有什么反爬,页面也都差不多,直接用xpath解析就可以得到想要的IP
# 代理IP的采集
from proxy_redis import ProxyRedis
import requests
from lxml import etree
import time
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
}
# 采集快代理
def get_kuai_ip(red):
url = "https://www.kuaidaili.com/free/intr/1/"
resp = requests.get(url, headers=headers)
tree = etree.HTML(resp.text)
trs = tree.xpath("//table/tbody/tr")
for tr in trs:
ip = tr.xpath("./td[1]/text()") # ip地址
port = tr.xpath("./td[2]/text()") # 端口
if not ip:
continue
ip = ip[0]
port = port[0]
proxy_ip = ip + ":" + port
red.add_proxy_ip(proxy_ip) # 增加ip地址
# 采集66免费代理网
def get_66_ip(red):
url = "http://www.66ip.cn/areaindex_1/1.html"
resp = requests.get(url, headers=headers)
tree = etree.HTML(resp.text)
trs = tree.xpath("//table//tr")[1:]
for tr in trs:
ip = tr.xpath("./td[1]/text()") # ip地址
port = tr.xpath("./td[2]/text()") # 端口
if not ip:
continue
ip = ip[0]
port = port[0]
proxy_ip = ip + ":" + port
red.add_proxy_ip(proxy_ip) # 增加ip地址
# 采集高可用全球免费代理IP库
def get_quan_ip(red):
url = "https://ip.jiangxianli.com/?page=1"
resp = requests.get(url, headers=headers)
tree = etree.HTML(resp.text)
trs = tree.xpath("//table//tr")
for tr in trs:
ip = tr.xpath("./td[1]/text()") # ip地址
port = tr.xpath("./td[2]/text()") # 端口
if not ip:
continue
ip = ip[0]
port = port[0]
proxy_ip = ip + ":" + port
red.add_proxy_ip(proxy_ip) # 增加ip地址
def run():
red = ProxyRedis() # 创建redis存储
while True:
try:
get_kuai_ip(red) # 采集快代理
get_66_ip(red) # 采集66免费代理
get_quan_ip(red) # 采集全球免费ip代理库
except:
print("出错了")
time.sleep(60) # 每分钟跑一次
if __name__ == '__main__':
run()
3️⃣校验IP可用性
这里如果我们采集的IP比较多的话,用单线程就比较慢了,所以为了提高效率,这里我采用协程# 代理IP的验证
from proxy_redis import ProxyRedis
from settings import *
import asyncio
import aiohttp
import time
async def verify_one(ip, sem, red):
print(f"开始检测{ip}")
timeout = aiohttp.ClientTimeout(total=10) # 设置超时时间,超过10秒就报错
try:
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get("http://www.baidu.com/", proxy="http://" + ip, timeout=timeout) as resp: # 简单发送一个请求
page_source = await resp.text()
if resp.status in [200, 302]: # 验证状态码
# 将分值拉满
red.set_max_score(ip)
print(f"检测到{ip}是可用的")
else:
red.reduce_score(ip)
print(f"检测到{ip}是不可用的, 扣10分")
except Exception as E:
print("ip检验时出错了", E)
red.reduce_score(ip)
print(f"检测到{ip}是不可用的, 扣10分")
async def main(red):
# 查询全部ip
all_proxy = red.get_all_proxy()
sem = asyncio.Semaphore(SEM_COUNT) # 控制并发量
tasks = []
for ip in all_proxy:
tasks.append(asyncio.create_task(verify_one(ip, sem, red)))
if tasks:
await asyncio.wait(tasks)
def run():
red = ProxyRedis()
time.sleep(10)
while True:
try:
asyncio.run(main(red))
time.sleep(100)
except Exception as e:
print("校验时报错了", e)
time.sleep(100)
if __name__ == '__main__':
run()
4️⃣提供api
http://xxx.xxx.xxx.xxx:xxxx/get_proxy就可获取到IPpip install sanic
pip install sanic_cors # 防止出现跨域的模块
# 代理的IP的api接口
from proxy_redis import ProxyRedis
from sanic import Sanic, json
from sanic_cors import CORS
# 1. 创建app
app = Sanic("ip")
# 2. 解决跨域
CORS(app)
red = ProxyRedis()
# 3. 准备处理http请求的函数
@app.route("/get_proxy") # 路由配置
def dispose(rep):
ip_list = red.get_avail_proxy()
return json({"ip": ip_list}) # 返回给客户端
def run():
app.run(host="127.0.0.1", port=5800)
if __name__ == '__main__':
run()
5️⃣启动采集IP、校验IP、提供api
将三个功能串在一起,每一个功能开一个进程
from ip_api import run as api_run
from ip_collection import run as col_run
from ip_verify import run as ver_run
from multiprocessing import Process
def run():
# 启动三个进程
p1 = Process(target=api_run)
p2 = Process(target=col_run)
p3 = Process(target=ver_run)
p1.start()
p2.start()
p3.start()
if __name__ == '__main__':
run()
下面代码是代理IP池的配置文件,想要修改参数的直接修改配置文件中的就行
# 配置文件
# proxy_redis
# redis主机ip地址
REDIS_HOST = "127.0.0.1"
# redis端口号
REDIS_PORT = 6379
# redis数据库编号
REDIS_DB = 2
# redis的密码
REDIS_PASSWORD = "123456"
# redis的key
REDIS_KEY = "proxy_ip"
# 默认的ip分值
DEFAULT_SCORE = 50
# 满分
MAX_SCORE = 100
# ip_verify
# 一次检测ip的数量
SEM_COUNT = 30
6️⃣到这里我们的IP代理池就已经完成了



我们可以看到程序可以正常执行
然后去看一下我们的Redis中是否有IP

我们访问http://127.0.0.1:5800/get_proxy检测用户是否可以拿到IP

7️⃣检验IP代理池中的IP是否可用
免费IP代理池已经搭建好了,接下来就从IP代理池中取出来IP,检测IP是否可以使用
我们的IP有很多,使用这些IP最好的方法是将存放IP的列表进行循环,每拿一个IP访问一次或多次就换一个IP在访问,所以就需要写一个生成器
import requests
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
}
def get_proxy():
url = "http://127.0.0.1:5800/get_proxy"
resp = requests.get(url, headers=headers)
ips = resp.json()
for ip in ips["ip"][0]:
yield ip # 生成器
def spider():
url = "http://www.baidu.com/"
while True:
try:
proxy_ip = next(gen)
proxy = {
"http:": "http:" + proxy_ip,
"https:": "http:" + proxy_ip
}
resp = requests.get(url, proxies=proxy, headers=headers)
resp.encoding = "utf-8"
return resp.text
except:
print("代理失效了")
if __name__ == '__main__':
gen = get_proxy()
page_source = spider()
print(page_source)
可以拿到页面源代码表示我们的代理IP可用

我有一个存储主机名的Ruby数组server_names。如果我打印出来,它看起来像这样:["hostname.abc.com","hostname2.abc.com","hostname3.abc.com"]相当标准。我想要做的是获取这些服务器的IP(可能将它们存储在另一个变量中)。看起来IPSocket类可以做到这一点,但我不确定如何使用IPSocket类遍历它。如果它只是尝试像这样打印出IP:server_names.eachdo|name|IPSocket::getaddress(name)pnameend它提示我没有提供服务器名称。这是语法问题还是我没有正确使用类?输出:ge
作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐
我是Ruby的新手。我试过查看在线文档,但没有找到任何有效的方法。我想在以下HTTP请求botget_response()和get()中包含一个用户代理。有人可以指出我正确的方向吗?#PreliminarycheckthatProggitisupcheck=Net::HTTP.get_response(URI.parse(proggit_url))ifcheck.code!="200"puts"ErrorcontactingProggit"returnend#Attempttogetthejsonresponse=Net::HTTP.get(URI.parse(proggit_url)
有人知道如何将capybarapoltergeist的用户代理覆盖到移动用户代理以进行测试吗?我发现了一些有关为seleniumwebdriver配置它的信息:http://blog.plataformatec.com.br/2011/03/configuring-user-agents-with-capybara-selenium-webdriver/这在capybara闹鬼中怎么可能? 最佳答案 请参阅poltergeistgithub页面上的链接:https://github.com/teampoltergeist/polte
我正在使用Ruby/Mechanize编写一个“自动填写表格”应用程序。它几乎可以工作。我可以使用精彩CharlesWeb代理以查看服务器和我的Firefox浏览器之间的交换。现在我想使用Charles查看服务器和我的应用程序之间的交换。Charles在端口8888上代理。假设服务器位于https://my.host.com。.一件不起作用的事情是:@agent||=Mechanize.newdo|agent|agent.set_proxy("my.host.com",8888)end这会导致Net::HTTP::Persistent::Error:...lib/net/http/pe
我希望访问我机器上的所有HTTP流量(我的Windows机器-不是服务器)。据我了解,拥有一个本地代理是所有流量路线的必经之路。我一直在谷歌搜索但未能找到任何资源(关于Ruby)来帮助我。非常感谢任何提示或链接。 最佳答案 WEBrick中有一个HTTP代理(Rubystdlib的一部分)和here's一个实现示例。如果你喜欢生活在边缘,还有em-proxy伊利亚·格里戈里克。这postIlya暗示它似乎确实需要一些调整来解决您的问题。 关于ruby-如何捕获所有HTTP流量(本地代理)
我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!
我正在使用Net::FTPruby库连接到FTP服务器并下载文件。一切正常,但现在我需要使用出站代理,因为他们的防火墙将IP地址列入白名单,并且我正在使用Heroku来托管该站点。我正在试用新的Proximo附加组件,它看起来很有前途,但我无法让Net::FTP使用它。我在Net::FTPdocs中看到以下内容:connect(host,port=FTP_PORT)EstablishesanFTPconnectiontohost,optionallyoverridingthedefaultport.IftheenvironmentvariableSOCKS_SERVERisset,
我想使用nokogiri和mechanize自动化一个计时网络客户端。我需要通过代理服务器连接,但问题是,我不知道所述代理服务器的用户名和密码。我想获取存储在计算机上的此代理的缓存凭据..例如,在c#中你可以使用:stringproxyUri=proxy.GetProxy(requests.RequestUri).ToString();requests.UseDefaultCredentials=true;requests.Proxy=newWebProxy(proxyUri,false);requests.Proxy.Credentials=System.Net.Credential
Nginx在生产中的重要性通常基于它为慢速客户端提供服务的能力;在RESTfulAPI的设置中,它似乎是生产堆栈的一个不必要的层,尤其是Puma(不像广泛使用的unicorn可以处理nginx工作)。Pumacanallowmultipleslowclientstoconnectwithoutrequiringaworkertobeblockedontherequesttransaction.Becauseofthis,Pumahandlesslowclientsgracefully.HerokurecommendsPumaforuseinscenarioswhereyouexpect