123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- #!/usr/bin/env python2
- #
- # eapol_test controller
- # Copyright (c) 2015, Jouni Malinen <j@w1.fi>
- #
- # This software may be distributed under the terms of the BSD license.
- # See README for more details.
- import argparse
- import logging
- import os
- import Queue
- import sys
- import threading
- logger = logging.getLogger()
- dir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__))
- sys.path.append(os.path.join(dir, '..', 'wpaspy'))
- import wpaspy
- wpas_ctrl = '/tmp/eapol_test'
- class eapol_test:
- def __init__(self, ifname):
- self.ifname = ifname
- self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
- if "PONG" not in self.ctrl.request("PING"):
- raise Exception("Failed to connect to eapol_test (%s)" % ifname)
- self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname))
- self.mon.attach()
- def add_network(self):
- id = self.request("ADD_NETWORK")
- if "FAIL" in id:
- raise Exception("ADD_NETWORK failed")
- return int(id)
- def remove_network(self, id):
- id = self.request("REMOVE_NETWORK " + str(id))
- if "FAIL" in id:
- raise Exception("REMOVE_NETWORK failed")
- return None
- def set_network(self, id, field, value):
- res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
- if "FAIL" in res:
- raise Exception("SET_NETWORK failed")
- return None
- def set_network_quoted(self, id, field, value):
- res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
- if "FAIL" in res:
- raise Exception("SET_NETWORK failed")
- return None
- def request(self, cmd, timeout=10):
- return self.ctrl.request(cmd, timeout=timeout)
- def wait_event(self, events, timeout=10):
- start = os.times()[4]
- while True:
- while self.mon.pending():
- ev = self.mon.recv()
- logger.debug(self.ifname + ": " + ev)
- for event in events:
- if event in ev:
- return ev
- now = os.times()[4]
- remaining = start + timeout - now
- if remaining <= 0:
- break
- if not self.mon.pending(timeout=remaining):
- break
- return None
- def run(ifname, count, no_fast_reauth, res):
- et = eapol_test(ifname)
- et.request("AP_SCAN 0")
- if no_fast_reauth:
- et.request("SET fast_reauth 0")
- else:
- et.request("SET fast_reauth 1")
- id = et.add_network()
- et.set_network(id, "key_mgmt", "IEEE8021X")
- et.set_network(id, "eapol_flags", "0")
- et.set_network(id, "eap", "TLS")
- et.set_network_quoted(id, "identity", "user")
- et.set_network_quoted(id, "ca_cert", 'ca.pem')
- et.set_network_quoted(id, "client_cert", 'client.pem')
- et.set_network_quoted(id, "private_key", 'client.key')
- et.set_network_quoted(id, "private_key_passwd", 'whatever')
- et.set_network(id, "disabled", "0")
- fail = False
- for i in range(count):
- et.request("REASSOCIATE")
- ev = et.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"])
- if ev is None or "CTRL-EVENT-CONNECTED" not in ev:
- fail = True
- break
- et.remove_network(id)
- if fail:
- res.put("FAIL (%d OK)" % i)
- else:
- res.put("PASS %d" % (i + 1))
- def main():
- parser = argparse.ArgumentParser(description='eapol_test controller')
- parser.add_argument('--ctrl', help='control interface directory')
- parser.add_argument('--num', help='number of processes')
- parser.add_argument('--iter', help='number of iterations')
- parser.add_argument('--no-fast-reauth', action='store_true',
- dest='no_fast_reauth',
- help='disable TLS session resumption')
- args = parser.parse_args()
- num = int(args.num)
- iter = int(args.iter)
- if args.ctrl:
- global wpas_ctrl
- wpas_ctrl = args.ctrl
- t = {}
- res = {}
- for i in range(num):
- res[i] = Queue.Queue()
- t[i] = threading.Thread(target=run, args=(str(i), iter,
- args.no_fast_reauth, res[i]))
- for i in range(num):
- t[i].start()
- for i in range(num):
- t[i].join()
- try:
- results = res[i].get(False)
- except:
- results = "N/A"
- print "%d: %s" % (i, results)
- if __name__ == "__main__":
- main()
|