-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner.py
More file actions
150 lines (104 loc) · 4.06 KB
/
scanner.py
File metadata and controls
150 lines (104 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#coding=utf-8
import socket
import os
import struct
from ctypes import *
import threading
import time
from netaddr import IPNetwork,IPAddress
# 监听的主机
host = "192.168.0.105"
# 扫描的目标子网
subnet = "192.168.0.0/24"
# 自定义的字符串,我们将在ICMP响应中进行核对
magic_message = "PYTHONRULES!"
# 批量发送UDP数据包
def udp_sender(subnet, magic_message):
time.sleep(5)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for ip in IPNetwork(subnet):
try:
sender.sendto(magic_message, ("%s" % ip, 65212))
except:
pass
# IP头定义
class IP(Structure):
_fields_ = [
("ihl", c_ubyte, 4),
("version", c_ubyte, 4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num", c_ubyte),
("sum", c_ushort),
("src", c_ulong),
("dst", c_ulong)
]
def __new__(self, socket_buffer=None):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer=None):
# 协议字段与协议名称对应
self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
# 可读性更强的IP地址
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
# 协议类型
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
self.protocol = str(self.protocol_num)
# ICMP头定义
class ICMP(Structure):
_fields_ = [
("type", c_ubyte),
("code", c_ubyte),
("checksum", c_ushort),
("unused", c_ushort),
("next_hop_mtu", c_ushort)
]
def __new__(self, socket_buffer):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer):
pass
# 下面的代码类似于之前的例子
if os.name == "nt":
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind((host,0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
t = threading.Thread(target=udp_sender,args=(subnet, magic_message))
t.start()
try:
while True:
# 读取数据包
raw_buffer = sniffer.recvfrom(65565)[0]
# 将缓冲区的前20个字节按IP头进行解析
ip_header = IP(raw_buffer[0:20])
# 输出协议和通信双方IP地址
# print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)
# 如果为ICMP,进行处理
if ip_header.protocol == "ICMP":
# 计算ICMP包的起始位置
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + sizeof(ICMP)]
# 解析ICMP数据
icmp_header = ICMP(buf)
# print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)
# 检查类型和代码值是否为3
if icmp_header.code == 3 and icmp_header.type == 3:
# 确认相应的主机在我们的目标子网内
if IPAddress(ip_header.src_address) in IPNetwork(subnet):
# 确认ICMP数据中包含我们发送的自定义字符串
if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
print "Host Up: %s" % ip_header.src_address
# 处理CTRL-C
except KeyboardInterrupt:
# 如果运行在Windows上,关闭混杂模式
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)