123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- # Copyright (c) 2019-2020, Mathy Vanhoef <mathy.vanhoef@nyu.edu>
- #
- # This code may be distributed under the terms of the BSD license.
- # See README for more details.
- from scapy.all import *
- from Crypto.Cipher import AES
- from datetime import datetime
- import binascii
- #### Constants ####
- IEEE_TLV_TYPE_SSID = 0
- IEEE_TLV_TYPE_CHANNEL = 3
- IEEE_TLV_TYPE_RSN = 48
- IEEE_TLV_TYPE_CSA = 37
- IEEE_TLV_TYPE_FT = 55
- IEEE_TLV_TYPE_VENDOR = 221
- WLAN_REASON_DISASSOC_DUE_TO_INACTIVITY = 4
- WLAN_REASON_CLASS2_FRAME_FROM_NONAUTH_STA = 6
- WLAN_REASON_CLASS3_FRAME_FROM_NONASSOC_STA = 7
- #TODO: Not sure if really needed...
- IEEE80211_RADIOTAP_RATE = (1 << 2)
- IEEE80211_RADIOTAP_CHANNEL = (1 << 3)
- IEEE80211_RADIOTAP_TX_FLAGS = (1 << 15)
- IEEE80211_RADIOTAP_DATA_RETRIES = (1 << 17)
- #### Basic output and logging functionality ####
- ALL, DEBUG, INFO, STATUS, WARNING, ERROR = range(6)
- COLORCODES = { "gray" : "\033[0;37m",
- "green" : "\033[0;32m",
- "orange": "\033[0;33m",
- "red" : "\033[0;31m" }
- global_log_level = INFO
- def log(level, msg, color=None, showtime=True):
- if level < global_log_level: return
- if level == DEBUG and color is None: color="gray"
- if level == WARNING and color is None: color="orange"
- if level == ERROR and color is None: color="red"
- msg = (datetime.now().strftime('[%H:%M:%S] ') if showtime else " "*11) + COLORCODES.get(color, "") + msg + "\033[1;0m"
- print(msg)
- def change_log_level(delta):
- global global_log_level
- global_log_level += delta
- #### Back-wards compatibility with older scapy
- if not "Dot11FCS" in locals():
- class Dot11FCS():
- pass
- if not "Dot11Encrypted" in locals():
- class Dot11Encrypted():
- pass
- class Dot11CCMP():
- pass
- class Dot11TKIP():
- pass
- #### Linux ####
- def get_device_driver(iface):
- path = "/sys/class/net/%s/device/driver" % iface
- try:
- output = subprocess.check_output(["readlink", "-f", path])
- return output.decode('utf-8').strip().split("/")[-1]
- except:
- return None
- #### Utility ####
- def get_mac_address(interface):
- return open("/sys/class/net/%s/address" % interface).read().strip()
- def addr2bin(addr):
- return binascii.a2b_hex(addr.replace(':', ''))
- def get_channel(iface):
- output = str(subprocess.check_output(["iw", iface, "info"]))
- p = re.compile("channel (\d+)")
- m = p.search(output)
- if m == None: return None
- return int(m.group(1))
- def get_channel(iface):
- output = str(subprocess.check_output(["iw", iface, "info"]))
- p = re.compile("channel (\d+)")
- m = p.search(output)
- if m == None:
- return None
- return int(m.group(1))
- def set_channel(iface, channel):
- subprocess.check_output(["iw", iface, "set", "channel", str(channel)])
- def set_macaddress(iface, macaddr):
- # macchanger throws an error if the interface already has the given MAC address
- if get_macaddress(iface) != macaddr:
- subprocess.check_output(["ifconfig", iface, "down"])
- subprocess.check_output(["macchanger", "-m", macaddr, iface])
- def get_macaddress(iface):
- """This works even for interfaces in monitor mode."""
- s = get_if_raw_hwaddr(iface)[1]
- return ("%02x:" * 6)[:-1] % tuple(orb(x) for x in s)
- def get_iface_type(iface):
- output = str(subprocess.check_output(["iw", iface, "info"]))
- p = re.compile("type (\w+)")
- return str(p.search(output).group(1))
- def set_monitor_mode(iface, up=True, mtu=1500):
- # Note: we let the user put the device in monitor mode, such that they can control optional
- # parameters such as "iw wlan0 set monitor active" for devices that support it.
- if get_iface_type(iface) != "monitor":
- # Some kernels (Debian jessie - 3.16.0-4-amd64) don't properly add the monitor interface. The following ugly
- # sequence of commands assures the virtual interface is properly registered as a 802.11 monitor interface.
- subprocess.check_output(["ifconfig", iface, "down"])
- subprocess.check_output(["iw", iface, "set", "type", "monitor"])
- time.sleep(0.5)
- subprocess.check_output(["iw", iface, "set", "type", "monitor"])
- if up:
- subprocess.check_output(["ifconfig", iface, "up"])
- subprocess.check_output(["ifconfig", iface, "mtu", str(mtu)])
- def rawmac(addr):
- return bytes.fromhex(addr.replace(':', ''))
- def set_amsdu(p):
- if "A_MSDU_Present" in [field.name for field in Dot11QoS.fields_desc]:
- p.A_MSDU_Present = 1
- else:
- p.Reserved = 1
- def is_amsdu(p):
- if "A_MSDU_Present" in [field.name for field in Dot11QoS.fields_desc]:
- return p.A_MSDU_Present == 1
- else:
- return p.Reserved == 1
- #### Packet Processing Functions ####
- class DHCP_sock(DHCP_am):
- def __init__(self, **kwargs):
- self.sock = kwargs.pop("sock")
- self.server_ip = kwargs["gw"]
- super(DHCP_sock, self).__init__(**kwargs)
- def prealloc_ip(self, clientmac, ip=None):
- """Allocate an IP for the client before it send DHCP requests"""
- if clientmac not in self.leases:
- if ip == None:
- ip = self.pool.pop()
- self.leases[clientmac] = ip
- return self.leases[clientmac]
- def make_reply(self, req):
- rep = super(DHCP_sock, self).make_reply(req)
- # Fix scapy bug: set broadcast IP if required
- if rep is not None and BOOTP in req and IP in rep:
- if req[BOOTP].flags & 0x8000 != 0 and req[BOOTP].giaddr == '0.0.0.0' and req[BOOTP].ciaddr == '0.0.0.0':
- rep[IP].dst = "255.255.255.255"
- # Explicitly set source IP if requested
- if not self.server_ip is None:
- rep[IP].src = self.server_ip
- return rep
- def send_reply(self, reply):
- self.sock.send(reply, **self.optsend)
- def print_reply(self, req, reply):
- log(STATUS, "%s: DHCP reply %s to %s" % (reply.getlayer(Ether).dst, reply.getlayer(BOOTP).yiaddr, reply.dst), color="green")
- def remove_client(self, clientmac):
- clientip = self.leases[clientmac]
- self.pool.append(clientip)
- del self.leases[clientmac]
- class ARP_sock(ARP_am):
- def __init__(self, **kwargs):
- self.sock = kwargs.pop("sock")
- super(ARP_am, self).__init__(**kwargs)
- def send_reply(self, reply):
- self.sock.send(reply, **self.optsend)
- def print_reply(self, req, reply):
- log(STATUS, "%s: ARP: %s ==> %s on %s" % (reply.getlayer(Ether).dst, req.summary(), reply.summary(), self.iff))
- #### Packet Processing Functions ####
- # Compatibility with older Scapy versions
- if not "ORDER" in scapy.layers.dot11._rt_txflags:
- scapy.layers.dot11._rt_txflags.append("ORDER")
- class MonitorSocket(L2Socket):
- def __init__(self, iface, dumpfile=None, detect_injected=False, **kwargs):
- super(MonitorSocket, self).__init__(iface, **kwargs)
- self.pcap = None
- if dumpfile:
- self.pcap = PcapWriter("%s.%s.pcap" % (dumpfile, self.iface), append=False, sync=True)
- self.detect_injected = detect_injected
- self.default_rate = None
- def set_channel(self, channel):
- subprocess.check_output(["iw", self.iface, "set", "channel", str(channel)])
- def attach_filter(self, bpf):
- log(DEBUG, "Attaching filter to %s: <%s>" % (self.iface, bpf))
- attach_filter(self.ins, bpf, self.iface)
- def set_default_rate(self, rate):
- self.default_rate = rate
- def send(self, p, rate=None):
- # Hack: set the More Data flag so we can detect injected frames (and so clients stay awake longer)
- if self.detect_injected:
- p.FCfield |= 0x20
- # Control data rate injected frames
- if rate is None and self.default_rate is None:
- rtap = RadioTap(present="TXFlags", TXFlags="NOSEQ+ORDER")
- else:
- use_rate = rate if rate != None else self.default_rate
- rtap = RadioTap(present="TXFlags+Rate", Rate=use_rate, TXFlags="NOSEQ+ORDER")
- L2Socket.send(self, rtap/p)
- if self.pcap: self.pcap.write(RadioTap()/p)
- def _strip_fcs(self, p):
- """
- Scapy may throw exceptions when handling malformed short frames,
- so we need to catch these exceptions and just ignore these frames.
- This in particular happened with short malformed beacons.
- """
- try:
- return Dot11(raw(p[Dot11FCS])[:-4])
- except:
- return None
- def _detect_and_strip_fcs(self, p):
- # Older scapy can't handle the optional Frame Check Sequence (FCS) field automatically
- if p[RadioTap].present & 2 != 0 and not Dot11FCS in p:
- rawframe = raw(p[RadioTap])
- pos = 8
- while orb(rawframe[pos - 1]) & 0x80 != 0: pos += 4
- # If the TSFT field is present, it must be 8-bytes aligned
- if p[RadioTap].present & 1 != 0:
- pos += (8 - (pos % 8))
- pos += 8
- # Remove FCS if present
- if orb(rawframe[pos]) & 0x10 != 0:
- return self._strip_fcs(p)
- return p[Dot11]
- def recv(self, x=MTU, reflected=False):
- p = L2Socket.recv(self, x)
- if p == None or not (Dot11 in p or Dot11FCS in p):
- return None
- if self.pcap:
- self.pcap.write(p)
- # Hack: ignore frames that we just injected and are echoed back by the kernel
- if self.detect_injected and p.FCfield & 0x20 != 0:
- return None
- # Ignore reflection of injected frames. These have a small RadioTap header.
- if not reflected and p[RadioTap].len < 13:
- return None
- # Strip the FCS if present, and drop the RadioTap header
- if Dot11FCS in p:
- return self._strip_fcs(p)
- else:
- return self._detect_and_strip_fcs(p)
- def close(self):
- if self.pcap: self.pcap.close()
- super(MonitorSocket, self).close()
- # For backwards compatibility
- class MitmSocket(MonitorSocket):
- pass
- def dot11_get_seqnum(p):
- return p.SC >> 4
- def dot11_is_encrypted_data(p):
- # All these different cases are explicitly tested to handle older scapy versions
- return (p.FCfield & 0x40) or Dot11CCMP in p or Dot11TKIP in p or Dot11WEP in p or Dot11Encrypted in p
- def payload_to_iv(payload):
- iv0 = payload[0]
- iv1 = payload[1]
- wepdata = payload[4:8]
- # FIXME: Only CCMP is supported (TKIP uses a different IV structure)
- return orb(iv0) + (orb(iv1) << 8) + (struct.unpack(">I", wepdata)[0] << 16)
- def dot11_get_iv(p):
- """
- Assume it's a CCMP frame. Old scapy can't handle Extended IVs.
- This code only works for CCMP frames.
- """
- if Dot11CCMP in p:
- payload = raw(p[Dot11CCMP])
- return payload_to_iv(payload)
- elif Dot11TKIP in p:
- # Scapy uses a heuristic to differentiate CCMP/TKIP and this may be wrong.
- # So even when we get a Dot11TKIP frame, we should treat it like a Dot11CCMP frame.
- payload = raw(p[Dot11TKIP])
- return payload_to_iv(payload)
- if Dot11CCMP in p:
- payload = raw(p[Dot11CCMP])
- return payload_to_iv(payload)
- elif Dot11TKIP in p:
- payload = raw(p[Dot11TKIP])
- return payload_to_iv(payload)
- elif Dot11Encrypted in p:
- payload = raw(p[Dot11Encrypted])
- return payload_to_iv(payload)
- elif Dot11WEP in p:
- wep = p[Dot11WEP]
- if wep.keyid & 32:
- # FIXME: Only CCMP is supported (TKIP uses a different IV structure)
- return orb(wep.iv[0]) + (orb(wep.iv[1]) << 8) + (struct.unpack(">I", wep.wepdata[:4])[0] << 16)
- else:
- return orb(wep.iv[0]) + (orb(wep.iv[1]) << 8) + (orb(wep.iv[2]) << 16)
- elif p.FCfield & 0x40:
- return payload_to_iv(p[Raw].load)
- else:
- return None
- def dot11_get_priority(p):
- if not Dot11QoS in p: return 0
- return p[Dot11QoS].TID
- #### Crypto functions and util ####
- def get_ccmp_keyid(p):
- if Dot11WEP in p:
- return p.keyid
- return p.key_id
- def get_ccmp_payload(p):
- if Dot11WEP in p:
- # Extract encrypted payload:
- # - Skip extended IV (4 bytes in total)
- # - Exclude first 4 bytes of the CCMP MIC (note that last 4 are saved in the WEP ICV field)
- return raw(p.wepdata[4:-4])
- elif Dot11CCMP in p:
- return p[Dot11CCMP].data
- elif Dot11TKIP in p:
- return p[Dot11TKIP].data
- elif Dot11Encrypted in p:
- return p[Dot11Encrypted].data
- elif Raw in p:
- return p[Raw].load
- else:
- return None
- class IvInfo():
- def __init__(self, p):
- self.iv = dot11_get_iv(p)
- self.seq = dot11_get_seqnum(p)
- self.time = p.time
- def is_reused(self, p):
- """Return true if frame p reuses an IV and if p is not a retransmitted frame"""
- iv = dot11_get_iv(p)
- seq = dot11_get_seqnum(p)
- return self.iv == iv and self.seq != seq and p.time >= self.time + 1
- class IvCollection():
- def __init__(self):
- self.ivs = dict() # maps IV values to IvInfo objects
- def reset(self):
- self.ivs = dict()
- def track_used_iv(self, p):
- iv = dot11_get_iv(p)
- self.ivs[iv] = IvInfo(p)
- def is_iv_reused(self, p):
- """Returns True if this is an *observed* IV reuse and not just a retransmission"""
- iv = dot11_get_iv(p)
- return iv in self.ivs and self.ivs[iv].is_reused(p)
- def is_new_iv(self, p):
- """Returns True if the IV in this frame is higher than all previously observed ones"""
- iv = dot11_get_iv(p)
- if len(self.ivs) == 0: return True
- return iv > max(self.ivs.keys())
- def create_fragments(header, data, num_frags):
- # This special case is useful so scapy keeps the full "interpretation" of the frame
- # instead of afterwards treating/displaying the payload as just raw data.
- if num_frags == 1: return [header/data]
- data = raw(data)
- fragments = []
- fragsize = (len(data) + num_frags - 1) // num_frags
- for i in range(num_frags):
- frag = header.copy()
- frag.SC |= i
- if i < num_frags - 1:
- frag.FCfield |= Dot11(FCfield="MF").FCfield
- payload = data[fragsize * i : fragsize * (i + 1)]
- frag = frag/Raw(payload)
- fragments.append(frag)
- return fragments
- def get_element(el, id):
- if not Dot11Elt in el: return None
- el = el[Dot11Elt]
- while not el is None:
- if el.ID == id:
- return el
- el = el.payload
- return None
- def get_ssid(beacon):
- if not (Dot11 in beacon or Dot11FCS in beacon): return
- if Dot11Elt not in beacon: return
- if beacon[Dot11].type != 0 and beacon[Dot11].subtype != 8: return
- el = get_element(beacon, IEEE_TLV_TYPE_SSID)
- return el.info.decode()
- def is_from_sta(p, macaddr):
- if not (Dot11 in p or Dot11FCS in p):
- return False
- if p.addr1 != macaddr and p.addr2 != macaddr:
- return False
- return True
- def get_bss(iface, clientmac, timeout=20):
- ps = sniff(count=1, timeout=timeout, lfilter=lambda p: is_from_sta(p, clientmac), iface=iface)
- if len(ps) == 0:
- return None
- return ps[0].addr1 if ps[0].addr1 != clientmac else ps[0].addr2
- def create_msdu_subframe(src, dst, payload, last=False):
- length = len(payload)
- p = Ether(dst=dst, src=src, type=length)
- payload = raw(payload)
- total_length = len(p) + len(payload)
- padding = ""
- if not last and total_length % 4 != 0:
- padding = b"\x00" * (4 - (total_length % 4))
- return p / payload / Raw(padding)
- def find_network(iface, ssid):
- ps = sniff(count=1, timeout=0.3, lfilter=lambda p: get_ssid(p) == ssid, iface=iface)
- if ps is None or len(ps) < 1:
- log(STATUS, "Searching for target network on other channels")
- for chan in [1, 6, 11, 3, 8, 2, 7, 4, 10, 5, 9, 12, 13]:
- set_channel(iface, chan)
- log(DEBUG, "Listening on channel %d" % chan)
- ps = sniff(count=1, timeout=0.3, lfilter=lambda p: get_ssid(p) == ssid, iface=iface)
- if ps and len(ps) >= 1: break
- if ps and len(ps) >= 1:
- # Even though we capture the beacon we might still be on another channel,
- # so it's important to explicitly switch to the correct channel.
- actual_chan = orb(get_element(ps[0], IEEE_TLV_TYPE_CHANNEL).info)
- set_channel(iface, actual_chan)
- # Return the beacon that we captured
- return ps[0]
- return None
|