前言
本文主要依据官网文档(英文)和源码相关函数用法编写。如有错误,欢迎反馈。
scapy包介绍
scapy包模块主要用来构造或者伪造网络中的各种数据报文。提供了从Ether层、IP层、传输层(UDP/TCP)、数据层各层的数据报文字段的构造方法。也可以用来解析数据报文。并能实现伪造异常报文,网络攻击,探测等功能。
scapy模块还支持构造IPv6报文,802.11无线报文以及蓝牙报文(只支持Linux系统)。
安装
python3.6,scapy最新版本2.4.0,本文根据此版本编写。
用法
本文主要具体通过查看scapy包模块下的相关源码、查看官方文档以及开发工具程序实践,等方式总结scapy包的一些常用的用法。
涉及到更高深或更复杂的用法,请点击这里查看官方文档。
构造常见协议报文
site-packages/scapy/layers下,包含构造各种协议报文的方法。有构造其他协议报文需求的读者建议可以去看这下面的源码。
对于报文类,如果想知道报文类提供了字段选项,可以用ls()方法查看,如下所示
构造ICMP报文
1 2
| str='abdndjafn' pkt=IP(src,dst=192.168.1.1)/ICMP(type=8)/Raw(str)
|
构造DHCP报文
1
| >>> pkt=Ether(src=self.smac,dst="ff:ff:ff:ff:ff:ff")/IP(src=self.sip,dst="255.255.255.255")/UDP(sport=68,dport=67)/BOOTP(chaddr=[mac2str(self.smac)],ciaddr=self.ciaddr,xid=translate_id)/DHCP(options=[("message-type","request"),("hostname",self.hostname),"end",('pad',self.pad)])
|
构造IGMPv2报文
1 2
| from scapy.contrib.igmp import * pkt=Ether(src,dst)/ip(src,dst)/IGMP(type=0x11,mrode=20,gaddr="")/Padding('000000000')
|
构造IGMPv3报文
1 2 3 4 5 6 7
| from scapy.contrib.igmp import * from scapy.contrib.igmp import * a=Ether() b=IP(src,dst) c=IGMPv3(type=0x11,mrcode=10) d=IGMPv3mq(gaddr='',qqic=125,qrv=2,srcaddrs=['192.168.10.1','192.168.11.1']) pkt=a/b/c/d
|
构造数据传输UDP报文
1
| pkt=Ether(src,dst)/ip(src,dst)/UDP(sport,dport)/Raw()
|
构造数据传输TCP报文
1
| >>> pkt=Ether(src,dst)/ip(src,dst)/TCP(sport,dport)/Raw()
|
构造PPPoE报文
1
| >>> pkt=Ether(src,dst)/PPP()
|
报文长度填充
报文长度填充实例如下所示
1 2
| >>> Padding(str='11111111') >>> Raw(load='advd')
|
具体参考源码site-packages/scapy/packet.py
常用工具函数
列出所有scapy中的命令或函数
打印查看某个函数或者类的帮助信息
查看报文字段信息
显示一个报文摘要
展示报文内容
1 2
| >>> pkt.show() >>> pkt.display()
|
显示聚合的数据包(例如,计算好了校验和的)
返回可以生产数据包的Scapy命令
traceroute方法
1
| >>> Traceroute('114.114.114.114')
|
发送报文
发送报文方法的源码路径
1
| site-packages/scapy/sendrecv.py
|
send()方法
三层以上,不能指定网络接口。
当loop=1时,一直发包。以下例子为发10个报文,报文间隔为1s。
1
| send(pkt,loop=0,inter=1,count=10)
|
sendp()方法
工作在2层,发包时必须指定网络接口。其他参数与send()方法一致。
1
| sendp(pkt,iface,loop=0,inter=1,count=5)
|
sendpfast()方法
工作在二层,可以指定网络接口和速率发包。Windows平台需要其他依赖库。
1
| sendpfast(pkt,iface,pps,mbps,loop=0)
|
sr()方法
发送数据包和接收响应,工作在3层(IP和ARP)。该函数返回有回应的数据包和没有回应的数据包。返回的两个列表数据,第一个就是发送的数据包及其应答组成的列表,第二个是无应答数据包组成的列表。
1 2 3
| >>> sr(pkt) >>> ans,unans=_ >>> ans.summary()
|
sr1()方法
与sr类似,工作在3层(IP和ARP)。但它只返回应答发送的分组(或分组集),用来返回一个应答数据包。
1 2 3 4 5 6
| >>> pkt1=IP()/ICMP()/Padding(str) >>> p=sr1(pkt) >>> p.show() >>> pkt=IP(dst=‘192.168.1.1’)/UDP()/DNS(rd=1,qd=DNSQR(qname=‘www.baidu.com’)) >>> ans=sr1(pkt_dns) >>> ans.show()
|
srp()方法
工作在二层(Ether,802.3),返回2个列表数据,第一个为返回结果,第二个无应答数据包组成
1 2 3
| >>> pkt=Ether(dst=“ff:ff:ff:ff:ff:ff”)/ARP(pdst=“192.168.1.0/24”,timeout=2) >>> ans,unans=srp(pkt) >>> ans.show()
|
数据包处理
嗅探报文
1
| sniff(count=0,offline=None,store=True,prn=None,filter,timeout,iface)
|
- count=200 抓到200个报文,即停止嗅探报文
- offline=‘D:\2019H1work\igmp.pcap’ 解析本地cap文件
- prn 为报文处理函数,回调此函数对抓到的报文进行处理
- filter=‘arp or icmp or (udp and src port 68 and dst port 67)’ 过滤条件,伯克利语法
- timeout 抓包时间,默认为None
- store=0 避免将所有的数据包存储在内存。
保存数据报文
1 2
| pkts=sniff(filter=‘arp or icmp’,iface=‘eth1’,timeout=120) wrpcap(‘test.cap’,pkts)
|
读取本地数据报文
有2种方法可以实现读取本地数据包,如下
1 2
| pkts= rdpcap(‘D:\\2019H1work\\igmp.pcap’ ) pkts= sniff(offline=‘D:\\2019H1work\\igmp.pcap’ )
|
数据包内容提取
其实在sniff()嗅探报文或者rdpcap()读取报文的时候,报文已经自动解析完成了。
我们可以直接获取报文不同层的字段值
1 2 3 4 5 6 7
| >>> pkts=sniff(filter=‘igmp’,iface=‘eth1’) >>> type(pkts) >>> pkts.show() >>> pkts[0][IP].show >>> pkts[0][Ether].src >>> pkts[IGMP].show() >>> pkts[IGMP][0][IGMP].show()
|
代码实例
实例一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| import psutil from scapy.config import * from scapy.layers.all import * from scapy.packet import * from scapy.contrib.igmp import * from scapy.sendrecv import * import wmi class IgmpSer(object): def __init__(self): self.sip='192.168.11.1' self.pro_dip='224.0.0.1' self.data_dip='239.192.45.86' self.data_dmac='' self.inter=1 self.iface_name='test' self.iface="Intel(R) 82579LM Gigabit Network Connection" def select_iface(self): name_list=[] ifaces=psutil.net_if_addrs() for k,v in ifaces.items(): for item in v: if item[0] == 2 and not item[1] == '127.0.0.1': iface.append(k) elif item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0': pass return name_list def get_mac_addr(self,iface): mac={} ifaces=psutil.net_if_addrs() for k,v in ifaces.items(): for item in v: if k == iface and item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0': mac=item[1] mac=mac.replace("-",":") return mac def get_iface_desc(self): iface_list=[] w=wmi.WMI() iface=w.Win32_NetworkAdapterConfiguration (IPEnabled=1) for interface in iface: iface_desc=interface.Description iface_list.append(iface_desc) return iface_list def get_multicast_mac(self,ip): mac_s='01:00:5e:' ip_list=ip.split('.')[1:] num_list=list(map(int,ip_list)) i=0 if num_list[0] >= 128: num_list[0]=num_list[0]-128 for v in num_list: str=hex(v) if len(str)==3: str=str.replace("0x","0") num_list[i]=str elif len(str)==4: str=str.replace("0x","") num_list[i]=str i=i+1 mac_end=':'.join(num_list) mac_addr=mac_s+mac_end return mac_addr def general_query(self): srcmac=self.get_mac_addr(self.iface_name) dstmac=self.get_multicast_mac('224.0.0.1') str='00000000000000' a=Ether(src=srcmac,dst=dstmac) b=IP(src=self.sip,dst='224.0.0.1') c=IGMP(type=0x11,gaddr='224.2.2.4') pkt=a/b/c a=Ether(src=src_mac,dst='') b=IP(src='',dst='1.1.1.1') c=IGMP(type=0x11,gaddr='') x=a/b/c x[IGMP].igmpize() sendp(pkt,iface=self.iface,verbose=False) def general_query_speical(self): src_mac=self.get_mac_addr(self.iface) print('src_mac',src_mac) dst_mac=self.get_multicast_mac(self.data_dip) print('dst_mac',dst_mac) a1=Ether(src,dst) b1=IP(src='',dst='224.0.0.1') c1=IGMP(type=0x11,gaddr=self.data_dip) pkt=a1/b1/c1/Padding(str) a=Ether() b=IP(src=self.sip,dst=self.data_dip) c=IGMP(type=0x11,gaddr=self.data_dip) pkt=a/b/c sendp(pkt,iface=self.iface,inter=1,count=4) def udp_stream(self,**kwrgs): dst_mac=self.get_multicast_mac('0.0.0.0') src_mac=self.get_mac_addr(self.iface) print(src_mac) str='0000000000000' a=Ether() b=IP(src=self.sip,dst='0.0.0.0',flags=2) c=UDP(sport=7411,dport=7411) pkt=a/b/c/Raw(load=("AAAAAAAaaaaaaa") + str) sendp(pkt,iface=self.iface,loop=1,inter=2) def ipv4_stream(self): dst_mac=self.get_multicast_mac(self.data_dip) src_mac=self.get_mac_addr(self.iface) str='00000000000000000000000000' a=Ether(src=src_mac,dst=dst_mac) b=IP(src=self.sip,dst=self.data_dip) pkt=a/b/Padding(str) sendp(pkt,iface=self.iface,loop=1,inter=1) def sniff_pkt(self): sniff(filter='',store=0,timeout=20,prn=Packet.summary,iface=self.iface) def parser_pkt(self,pkt): pass if __name__ == '__main__': igmpser=IgmpSer() igmpser.general_query_speical()
|
实例二
工具应用
PyQt和scapy库开发发包工具,请访问我的Github。
技术难点
1、怎么解决在windows系统下选择指定网卡接口进行发包、嗅探的问题
cmd中输入命令wmic ->NIC ,查看网卡接口的Index。sendp(pkt,iface=’eth7’)
2、怎么解决UDP报文长度过大,能自动分片的问题
3、sendpfast()方法发送大流量,报type类型错误的问题
4、怎么解决拉起线程,持续监听报文,解析报文的问题