# Copyright (c) 2017, Mathy Vanhoef
#
# This code may be distributed under the terms of the BSD license.
# See README for more details.

import struct

from scapy.all import *

#### Packet Processing Functions ####


class MitmSocket(L2Socket):
    """L2Socket wrapper that injects and receives 802.11 frames.

    Outgoing frames get a RadioTap header prepended and the More Data flag
    set. Incoming frames that carry the More Data flag are dropped (they are
    treated as echoes of our own injected frames), and the remaining frames
    are returned as bare Dot11 packets with the FCS removed if it was present.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to L2Socket unchanged."""
        super(MitmSocket, self).__init__(**kwargs)

    def send(self, p):
        """Inject frame `p`, prefixed with an empty RadioTap header.

        Note that this modifies `p` in place: the More Data flag (0x20) is
        set in its frame control field.
        """
        # Hack: set the More Data flag so we can detect injected frames
        # (and so clients stay awake longer)
        p[Dot11].FCfield |= 0x20
        L2Socket.send(self, RadioTap() / p)

    def _strip_fcs(self, p):
        """Return the Dot11 layer of `p`, without the trailing FCS if present.

        Scapy can't handle the optional Frame Check Sequence (FCS) field
        automatically, so the RadioTap Flags field is located by hand: skip
        the 8-byte fixed header plus any extended `present` words, then skip
        the TSFT field if it is present. If the Flags byte has bit 0x10 set,
        the last 4 bytes of the Dot11 frame are dropped and the frame is
        re-parsed.
        """
        radiotap = p[RadioTap]

        # Bit 1 of the present bitmap signals that the Flags field exists
        if radiotap.present & 2 != 0:
            # bytearray yields integers when indexed on both Python 2 and 3
            rawframe = bytearray(bytes(radiotap))

            # Fixed header is 8 bytes (version, pad, length, present). The
            # top bit of each little-endian present word (i.e. the top bit of
            # its last byte) says that another present word follows.
            pos = 8
            while pos <= len(rawframe) and rawframe[pos - 1] & 0x80 != 0:
                pos += 4

            # If the TSFT field is present, it must be 8-bytes aligned.
            # (-pos) % 8 is zero when pos is already aligned; the previous
            # expression 8 - (pos % 8) wrongly skipped 8 extra bytes then.
            if radiotap.present & 1 != 0:
                pos += (-pos) % 8
                pos += 8

            # Remove FCS if present (guard against truncated headers)
            if pos < len(rawframe) and rawframe[pos] & 0x10 != 0:
                return Dot11(bytes(p[Dot11])[:-4])

        return p[Dot11]

    def recv(self, x=MTU):
        """Receive one frame and return its Dot11 layer, or None.

        None is returned when nothing was received, when the packet has no
        Dot11 layer, or when the frame has the More Data flag set (taken to
        be one of our own injected frames echoed back).
        """
        p = L2Socket.recv(self, x)
        if p is None or Dot11 not in p:
            return None

        # Hack: ignore frames that we just injected and are echoed back by the kernel
        if p[Dot11].FCfield & 0x20 != 0:
            return None

        # Strip the FCS if present, and drop the RadioTap header
        return self._strip_fcs(p)

    def close(self):
        """Close the underlying L2Socket."""
        super(MitmSocket, self).close()


def dot11_get_seqnum(p):
    """Return the sequence number of `p`: the SC field without its low 4 bits."""
    return p[Dot11].SC >> 4


def dot11_get_iv(p):
    """Scapy can't handle Extended IVs, so do this properly ourselves (only works for CCMP)

    Returns the IV / packet number of encrypted frame `p` as an integer.
    If `p` has no Dot11WEP layer an error is logged and 0 is returned.

    When the Extended IV bit (0x20 of the key ID byte) is set, the value is
    built from iv[0], iv[1] and the first four bytes of the encrypted data
    (PN0, PN1 and PN2..PN5 of the CCMP header); iv[2] is skipped. Otherwise
    the plain 3-byte IV is used. A frame with the Extended IV bit set but
    fewer than 4 data bytes raises struct.error.
    """
    if Dot11WEP not in p:
        log(ERROR, "INTERNAL ERROR: Requested IV of plaintext frame")
        return 0

    wep = p[Dot11WEP]
    # bytearray yields integers when indexed on both Python 2 and 3
    iv = bytearray(wep.iv)
    if wep.keyid & 32:
        # PN2..PN5 are transmitted least-significant octet first, so they
        # must be decoded as a little-endian integer to keep the resulting
        # packet number monotonic.
        upper = struct.unpack("<I", wep.wepdata[:4])[0]
        return iv[0] + (iv[1] << 8) + (upper << 16)
    return iv[0] + (iv[1] << 8) + (iv[2] << 16)


def get_tlv_value(p, type):
    """Return the `info` of the first Dot11Elt in `p` whose ID equals `type`.

    Returns None if `p` has no Dot11Elt layer or no element matches.
    """
    if Dot11Elt not in p:
        return None
    el = p[Dot11Elt]
    # Information elements are chained as successive payloads
    while isinstance(el, Dot11Elt):
        if el.ID == type:
            return el.info
        el = el.payload
    return None


class IvInfo():
    """Records the IV, sequence number and timestamp of one observed frame."""

    def __init__(self, p):
        self.iv = dot11_get_iv(p)
        self.seq = dot11_get_seqnum(p)
        self.time = p.time

    def is_reused(self, p):
        """Return true if frame p reuses an IV and if p is not a retransmitted frame

        A frame counts as a reuse only if it has the same IV, a different
        sequence number, and a timestamp at least 1 later than the recorded
        frame (presumably seconds, as p.time is a capture timestamp).
        """
        iv = dot11_get_iv(p)
        seq = dot11_get_seqnum(p)
        return self.iv == iv and self.seq != seq and p.time >= self.time + 1


class IvCollection():
    """Tracks the IVs seen so far so that IV reuse can be detected."""

    def __init__(self):
        self.ivs = dict()  # maps IV values to IvInfo objects

    def reset(self):
        """Forget all previously tracked IVs."""
        self.ivs = dict()

    def track_used_iv(self, p):
        """Record the IV of frame `p`, replacing any earlier entry for that IV."""
        # Build the IvInfo once and reuse its IV, so the IV is parsed (and a
        # possible plaintext-frame error is logged) only a single time.
        info = IvInfo(p)
        self.ivs[info.iv] = info

    def is_iv_reused(self, p):
        """Returns True if this is an *observed* IV reuse and not just a retransmission"""
        iv = dot11_get_iv(p)
        return iv in self.ivs and self.ivs[iv].is_reused(p)

    def is_new_iv(self, p):
        """Returns True if the IV in this frame is higher than all previously observed ones"""
        iv = dot11_get_iv(p)
        if not self.ivs:
            return True
        return iv > max(self.ivs)