123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854 |
- # wpa_supplicant D-Bus old interface tests
- # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
- #
- # This software may be distributed under the terms of the BSD license.
- # See README for more details.
- import logging
- logger = logging.getLogger()
- try:
- import gobject
- import dbus
- dbus_imported = True
- except ImportError:
- dbus_imported = False
- import hostapd
- from utils import HwsimSkip
- from test_dbus import TestDbus, alloc_fail_dbus, start_ap
- WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
- WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
- WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
- WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
- WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
- def prepare_dbus(dev):
- if not dbus_imported:
- raise HwsimSkip("No dbus module available")
- try:
- from dbus.mainloop.glib import DBusGMainLoop
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- bus = dbus.SystemBus()
- wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
- wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
- path = wpas.getInterface(dev.ifname)
- if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
- return (bus,wpas_obj,path,if_obj)
- except Exception, e:
- raise HwsimSkip("Could not connect to D-Bus: %s" % e)
- class TestDbusOldWps(TestDbus):
- def __init__(self, bus):
- TestDbus.__init__(self, bus)
- self.event_ok = False
- def __enter__(self):
- gobject.timeout_add(1, self.run_wps)
- gobject.timeout_add(15000, self.timeout)
- self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
- self.loop.run()
- return self
- def wpsCred(self, cred):
- logger.debug("wpsCred: " + str(cred))
- self.event_ok = True
- self.loop.quit()
- def success(self):
- return self.event_ok
- def test_dbus_old(dev, apdev):
- """The old D-Bus interface"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
- logger.debug("capabilities(): " + str(res))
- if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
- raise Exception("Unexpected capabilities")
- res2 = if_obj.capabilities(dbus.Boolean(True),
- dbus_interface=WPAS_DBUS_OLD_IFACE)
- logger.debug("capabilities(strict): " + str(res2))
- res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
- logger.debug("State: " + res)
- res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
- if res != 0:
- raise Exception("Unexpected scanning: " + str(res))
- if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
- for t in [ dbus.UInt32(123), "foo" ]:
- try:
- if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid setAPScan() accepted")
- except dbus.exceptions.DBusException, e:
- if "InvalidOptions" not in str(e):
- raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
- for p in [ path + "/Networks/12345",
- path + "/Networks/foo" ]:
- obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
- try:
- obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- raise Exception("Invalid disable() accepted")
- except dbus.exceptions.DBusException, e:
- if "InvalidNetwork" not in str(e):
- raise Exception("Unexpected error message for invalid disable: " + str(e))
- for p in [ path + "/BSSIDs/foo",
- path + "/BSSIDs/001122334455"]:
- obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
- try:
- obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
- raise Exception("Invalid properties() accepted")
- except dbus.exceptions.DBusException, e:
- if "InvalidBSSID" not in str(e):
- raise Exception("Unexpected error message for invalid properties: " + str(e))
- def test_dbus_old_scan(dev, apdev):
- """The old D-Bus interface - scanning"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
- params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
- params['wpa'] = '3'
- hapd2 = hostapd.add_ap(apdev[1], params)
- class TestDbusScan(TestDbus):
- def __init__(self, bus):
- TestDbus.__init__(self, bus)
- self.scan_completed = False
- def __enter__(self):
- gobject.timeout_add(1, self.run_scan)
- gobject.timeout_add(7000, self.timeout)
- self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
- "ScanResultsAvailable")
- self.loop.run()
- return self
- def scanDone(self):
- logger.debug("scanDone")
- self.scan_completed = True
- self.loop.quit()
- def run_scan(self, *args):
- logger.debug("run_scan")
- if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
- raise Exception("Failed to trigger scan")
- return False
- def success(self):
- return self.scan_completed
- with TestDbusScan(bus) as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
- if len(res) != 2:
- raise Exception("Unexpected number of scan results: " + str(res))
- for i in range(2):
- logger.debug("Scan result BSS path: " + res[i])
- bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
- bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
- byte_arrays=True)
- logger.debug("BSS: " + str(bss))
- obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
- try:
- bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
- raise Exception("Unknown BSSID method accepted")
- except Exception, e:
- logger.debug("Unknown BSSID method exception: " + str(e))
- if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
- raise Exception("Failed to issue flush(0)")
- res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
- if len(res) != 0:
- raise Exception("Unexpected BSS entry after flush")
- if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
- raise Exception("Failed to issue flush(1)")
- try:
- if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid flush arguments accepted")
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid flush: " + str(e))
- try:
- bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
- byte_arrays=True)
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
- raise Exception("Unexpected error message for invalid BSS: " + str(e))
- def test_dbus_old_debug(dev, apdev):
- """The old D-Bus interface - debug"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
- try:
- wpas.setDebugParams(123)
- raise Exception("Invalid setDebugParams accepted")
- except dbus.exceptions.DBusException, e:
- if "InvalidOptions" not in str(e):
- raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
- try:
- wpas.setDebugParams(123, True, True)
- raise Exception("Invalid setDebugParams accepted")
- except dbus.exceptions.DBusException, e:
- if "InvalidOptions" not in str(e):
- raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
- wpas.setDebugParams(1, True, True)
- dev[0].request("LOG_LEVEL MSGDUMP")
- def test_dbus_old_smartcard(dev, apdev):
- """The old D-Bus interface - smartcard"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- params = dbus.Dictionary(signature='sv')
- if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
- params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
- 'pkcs11_engine_path': "foobar2",
- 'pkcs11_module_path': "foobar3",
- 'foo': 'bar' },
- signature='sv')
- params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
- 'foo': 'bar' },
- signature='sv')
- params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
- 'foo2': 'bar' },
- signature='sv')
- params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
- 'foo3': 'bar' },
- signature='sv')
- tests = [ 1, params, params2, params3, params4 ]
- for t in tests:
- try:
- if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid setSmartcardModules accepted: " + str(t))
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
- def test_dbus_old_smartcard_oom(dev, apdev):
- """The old D-Bus interface - smartcard (OOM)"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- for arg in [ 'opensc_engine_path', 'pkcs11_engine_path', 'pkcs11_module_path' ]:
- with alloc_fail_dbus(dev[0], 1,
- "=wpas_dbus_iface_set_smartcard_modules",
- "setSmartcardModules",
- "InvalidOptions"):
- params = dbus.Dictionary({ arg : "foo", }, signature='sv')
- if_obj.setSmartcardModules(params,
- dbus_interface=WPAS_DBUS_OLD_IFACE)
- with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_smartcard_modules",
- "setSmartcardModules", "InvalidOptions"):
- params = dbus.Dictionary({ arg : "foo", }, signature='sv')
- if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
- def test_dbus_old_interface(dev, apdev):
- """The old D-Bus interface - interface get/add/remove"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
- tests = [ (123, "InvalidOptions"),
- ("foo", "InvalidInterface") ]
- for (ifname,err) in tests:
- try:
- wpas.getInterface(ifname)
- raise Exception("Invalid getInterface accepted")
- except dbus.exceptions.DBusException, e:
- if err not in str(e):
- raise Exception("Unexpected error message for invalid getInterface: " + str(e))
- params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
- wpas.addInterface("lo", params)
- path = wpas.getInterface("lo")
- logger.debug("New interface path: " + str(path))
- wpas.removeInterface(path)
- try:
- wpas.removeInterface(path)
- raise Exception("Invalid removeInterface() accepted")
- except dbus.exceptions.DBusException, e:
- if "InvalidInterface" not in str(e):
- raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
- params1 = dbus.Dictionary({ 'driver': 'foo',
- 'driver-params': 'foo',
- 'config-file': 'foo',
- 'bridge-ifname': 'foo' },
- signature='sv')
- params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
- tests = [ (123, None, "InvalidOptions"),
- ("", None, "InvalidOptions"),
- ("foo", None, "AddError"),
- ("foo", params1, "AddError"),
- ("foo", params2, "InvalidOptions"),
- ("foo", 1234, "InvalidOptions"),
- (dev[0].ifname, None, "ExistsError" ) ]
- for (ifname,params,err) in tests:
- try:
- if params is None:
- wpas.addInterface(ifname)
- else:
- wpas.addInterface(ifname, params)
- raise Exception("Invalid addInterface accepted: " + str(params))
- except dbus.exceptions.DBusException, e:
- if err not in str(e):
- raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
- try:
- wpas.removeInterface(123)
- raise Exception("Invalid removeInterface accepted")
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
- def test_dbus_old_interface_oom(dev, apdev):
- """The old D-Bus interface - interface get/add/remove (OOM)"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
- with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_global_add_interface",
- "addInterface", "InvalidOptions"):
- params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
- wpas.addInterface("lo", params)
- for arg in [ "driver", "driver-params", "config-file", "bridge-ifname" ]:
- with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_global_add_interface",
- "addInterface", "InvalidOptions"):
- params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
- wpas.addInterface("lo", params)
- def test_dbus_old_blob(dev, apdev):
- """The old D-Bus interface - blob operations"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
- param2 = dbus.Dictionary({ 'blob3': "foo" })
- param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
- signature='sv')
- tests = [ (1, "InvalidOptions"),
- (param1, "InvalidOptions"),
- (param2, "InvalidOptions"),
- (param3, "InvalidOptions") ]
- for (arg,err) in tests:
- try:
- if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid setBlobs() accepted: " + str(arg))
- except dbus.exceptions.DBusException, e:
- logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
- if err not in str(e):
- raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
- tests = [ (["foo"], "RemoveError: Error removing blob"),
- ([""], "RemoveError: Invalid blob name"),
- ([1], "InvalidOptions"),
- ("foo", "InvalidOptions") ]
- for (arg,err) in tests:
- try:
- if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid removeBlobs() accepted: " + str(arg))
- except dbus.exceptions.DBusException, e:
- logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
- if err not in str(e):
- raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
- blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
- 'blob2': dbus.ByteArray([ 1, 2 ]) },
- signature='sv')
- if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
- if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
- def test_dbus_old_blob_oom(dev, apdev):
- """The old D-Bus interface - blob operations (OOM)"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
- 'blob2': dbus.ByteArray([ 1, 2 ]) },
- signature='sv')
- with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_blobs", "setBlobs",
- "AddError: Not enough memory to add blob"):
- if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
- with alloc_fail_dbus(dev[0], 2, "=wpas_dbus_iface_set_blobs", "setBlobs",
- "AddError: Not enough memory to add blob data"):
- if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
- with alloc_fail_dbus(dev[0], 3, "=wpas_dbus_iface_set_blobs", "setBlobs",
- "AddError: Error adding blob"):
- if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
- with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_decompose_object_path;wpas_iface_message_handler",
- "setBlobs",
- "InvalidInterface: wpa_supplicant knows nothing about this interface"):
- if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
- def test_dbus_old_connect(dev, apdev):
- """The old D-Bus interface - add a network and connect"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- ssid = "test-wpa2-psk"
- passphrase = 'qwertyuiop'
- params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
- hapd = hostapd.add_ap(apdev[0], params)
- for p in [ "/no/where/to/be/found",
- path + "/Networks/12345",
- path + "/Networks/foo",
- "/fi/epitest/hostap/WPASupplicant/Interfaces",
- "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
- obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
- try:
- if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid removeNetwork accepted: " + p)
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
- raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
- try:
- if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid removeNetwork accepted")
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
- try:
- if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid removeNetwork accepted")
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
- raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
- tests = [ (path, "InvalidNetwork"),
- (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
- "InvalidInterface"),
- (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
- "InvalidNetwork"),
- (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
- "InvalidNetwork"),
- (1, "InvalidOptions") ]
- for t,err in tests:
- try:
- if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid selectNetwork accepted: " + str(t))
- except dbus.exceptions.DBusException, e:
- if err not in str(e):
- raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
- npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- if not npath.startswith(WPAS_DBUS_OLD_PATH):
- raise Exception("Unexpected addNetwork result: " + path)
- netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
- tests = [ 123,
- dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
- for t in tests:
- try:
- netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- raise Exception("Invalid set() accepted: " + str(t))
- except dbus.exceptions.DBusException, e:
- if "InvalidOptions" not in str(e):
- raise Exception("Unexpected error message for invalid set: " + str(e))
- params = dbus.Dictionary({ 'ssid': ssid,
- 'key_mgmt': 'WPA-PSK',
- 'psk': passphrase,
- 'identity': dbus.ByteArray([ 1, 2 ]),
- 'priority': dbus.Int32(0),
- 'scan_freq': dbus.UInt32(2412) },
- signature='sv')
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- id = int(dev[0].list_networks()[0]['id'])
- val = dev[0].get_network(id, "scan_freq")
- if val != "2412":
- raise Exception("Invalid scan_freq value: " + str(val))
- params = dbus.Dictionary({ 'scan_freq': "2412 2432",
- 'freq_list': "2412 2417 2432" },
- signature='sv')
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- val = dev[0].get_network(id, "scan_freq")
- if val != "2412 2432":
- raise Exception("Invalid scan_freq value (2): " + str(val))
- val = dev[0].get_network(id, "freq_list")
- if val != "2412 2417 2432":
- raise Exception("Invalid freq_list value: " + str(val))
- if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
- class TestDbusConnect(TestDbus):
- def __init__(self, bus):
- TestDbus.__init__(self, bus)
- self.state = 0
- def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
- self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
- "ScanResultsAvailable")
- self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
- "StateChange")
- self.loop.run()
- return self
- def scanDone(self):
- logger.debug("scanDone")
- def stateChange(self, new, old):
- logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
- if new == "COMPLETED":
- if self.state == 0:
- self.state = 1
- self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- elif self.state == 2:
- self.state = 3
- if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
- elif self.state == 4:
- self.state = 5
- if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
- elif self.state == 6:
- self.state = 7
- if_obj.removeNetwork(self.path,
- dbus_interface=WPAS_DBUS_OLD_IFACE)
- try:
- if_obj.removeNetwork(self.path,
- dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid removeNetwork accepted")
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
- raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
- self.loop.quit()
- elif new == "DISCONNECTED":
- if self.state == 1:
- self.state = 2
- self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- elif self.state == 3:
- self.state = 4
- if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- elif self.state == 5:
- self.state = 6
- if_obj.selectNetwork(self.path,
- dbus_interface=WPAS_DBUS_OLD_IFACE)
- def run_connect(self, *args):
- logger.debug("run_connect")
- path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
- netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- params = dbus.Dictionary({ 'ssid': ssid,
- 'key_mgmt': 'WPA-PSK',
- 'psk': passphrase,
- 'scan_freq': 2412 },
- signature='sv')
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- self.path = path
- self.netw_obj = netw_obj
- return False
- def success(self):
- return self.state == 7
- with TestDbusConnect(bus) as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- if len(dev[0].list_networks()) != 0:
- raise Exception("Unexpected network")
- def test_dbus_old_connect_eap(dev, apdev):
- """The old D-Bus interface - add an EAP network and connect"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- ssid = "test-wpa2-eap"
- params = hostapd.wpa2_eap_params(ssid=ssid)
- hapd = hostapd.add_ap(apdev[0], params)
- class TestDbusConnect(TestDbus):
- def __init__(self, bus):
- TestDbus.__init__(self, bus)
- self.connected = False
- self.certification_received = False
- def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
- self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
- "StateChange")
- self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
- "Certification")
- self.loop.run()
- return self
- def stateChange(self, new, old):
- logger.debug("stateChange: %s --> %s" % (old, new))
- if new == "COMPLETED":
- self.connected = True
- self.loop.quit()
- def certification(self, depth, subject, hash, cert_hex):
- logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
- self.certification_received = True
- def run_connect(self, *args):
- logger.debug("run_connect")
- path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
- params = dbus.Dictionary({ 'ssid': ssid,
- 'key_mgmt': 'WPA-EAP',
- 'eap': 'TTLS',
- 'anonymous_identity': 'ttls',
- 'identity': 'pap user',
- 'ca_cert': 'auth_serv/ca.pem',
- 'phase2': 'auth=PAP',
- 'password': 'password',
- 'scan_freq': 2412 },
- signature='sv')
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- self.path = path
- self.netw_obj = netw_obj
- return False
- def success(self):
- return self.connected and self.certification_received
- with TestDbusConnect(bus) as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- def test_dbus_old_network_set(dev, apdev):
- """The old D-Bus interface and network set method"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
- netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- params = dbus.Dictionary({ 'priority': dbus.UInt64(1) }, signature='sv')
- try:
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- raise Exception("set succeeded with unexpected type")
- except dbus.exceptions.DBusException, e:
- if "InvalidOptions" not in str(e):
- raise Exception("Unexpected error message for unexpected type: " + str(e))
- def test_dbus_old_wps_pbc(dev, apdev):
- """The old D-Bus interface and WPS/PBC"""
- try:
- _test_dbus_old_wps_pbc(dev, apdev)
- finally:
- dev[0].request("SET wps_cred_processing 0")
- def _test_dbus_old_wps_pbc(dev, apdev):
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- dev[0].flush_scan_cache()
- hapd = start_ap(apdev[0])
- hapd.request("WPS_PBC")
- bssid = apdev[0]['bssid']
- dev[0].scan_for_bss(bssid, freq="2412")
- dev[0].request("SET wps_cred_processing 2")
- for arg in [ 123, "123" ]:
- try:
- if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
- class TestDbusWps(TestDbusOldWps):
- def __init__(self, bus, pbc_param):
- TestDbusOldWps.__init__(self, bus)
- self.pbc_param = pbc_param
- def run_wps(self, *args):
- logger.debug("run_wps: pbc_param=" + self.pbc_param)
- if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
- return False
- with TestDbusWps(bus, "any") as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
- if len(res) != 1:
- raise Exception("Unexpected number of scan results: " + str(res))
- for i in range(1):
- logger.debug("Scan result BSS path: " + res[i])
- bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
- bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
- byte_arrays=True)
- logger.debug("BSS: " + str(bss))
- dev[0].wait_connected(timeout=10)
- dev[0].request("DISCONNECT")
- dev[0].wait_disconnected(timeout=10)
- dev[0].request("FLUSH")
- hapd.request("WPS_PBC")
- dev[0].scan_for_bss(bssid, freq="2412")
- with TestDbusWps(bus, bssid) as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- dev[0].wait_connected(timeout=10)
- dev[0].request("DISCONNECT")
- dev[0].wait_disconnected(timeout=10)
- hapd.disable()
- dev[0].flush_scan_cache()
- def test_dbus_old_wps_pin(dev, apdev):
- """The old D-Bus interface and WPS/PIN"""
- try:
- _test_dbus_old_wps_pin(dev, apdev)
- finally:
- dev[0].request("SET wps_cred_processing 0")
- def _test_dbus_old_wps_pin(dev, apdev):
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- hapd = start_ap(apdev[0])
- hapd.request("WPS_PIN any 12345670")
- bssid = apdev[0]['bssid']
- dev[0].scan_for_bss(bssid, freq="2412")
- dev[0].request("SET wps_cred_processing 2")
- for arg in [ (123, "12345670"),
- ("123", "12345670") ]:
- try:
- if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
- class TestDbusWps(TestDbusOldWps):
- def __init__(self, bus, bssid, pin):
- TestDbusOldWps.__init__(self, bus)
- self.bssid = bssid
- self.pin = pin
- def run_wps(self, *args):
- logger.debug("run_wps %s %s" % (self.bssid, self.pin))
- pin = if_obj.wpsPin(self.bssid, self.pin,
- dbus_interface=WPAS_DBUS_OLD_IFACE)
- if len(self.pin) == 0:
- h = hostapd.Hostapd(apdev[0]['ifname'])
- h.request("WPS_PIN any " + pin)
- return False
- with TestDbusWps(bus, bssid, "12345670") as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- dev[0].wait_connected(timeout=10)
- dev[0].request("DISCONNECT")
- dev[0].wait_disconnected(timeout=10)
- dev[0].request("FLUSH")
- dev[0].scan_for_bss(bssid, freq="2412")
- with TestDbusWps(bus, "any", "") as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- def test_dbus_old_wps_reg(dev, apdev):
- """The old D-Bus interface and WPS/Registar"""
- try:
- _test_dbus_old_wps_reg(dev, apdev)
- finally:
- dev[0].request("SET wps_cred_processing 0")
- def _test_dbus_old_wps_reg(dev, apdev):
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- hapd = start_ap(apdev[0])
- bssid = apdev[0]['bssid']
- dev[0].scan_for_bss(bssid, freq="2412")
- dev[0].request("SET wps_cred_processing 2")
- for arg in [ (123, "12345670"),
- ("123", "12345670") ]:
- try:
- if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
- raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
- except dbus.exceptions.DBusException, e:
- if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
- raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
- class TestDbusWps(TestDbusOldWps):
- def run_wps(self, *args):
- logger.debug("run_wps")
- if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
- return False
- with TestDbusWps(bus) as t:
- if not t.success():
- raise Exception("Expected signals not seen")
- dev[0].wait_connected(timeout=10)
- def test_dbus_old_wps_oom(dev, apdev):
- """The old D-Bus interface and WPS (OOM)"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- bssid = apdev[0]['bssid']
- with alloc_fail_dbus(dev[0], 1,
- "=wpa_config_add_network;wpas_dbus_iface_wps_pbc",
- "wpsPbc",
- "WpsPbcError: Could not start PBC negotiation"):
- if_obj.wpsPbc("any", dbus_interface=WPAS_DBUS_OLD_IFACE)
- with alloc_fail_dbus(dev[0], 1,
- "=wpa_config_add_network;wpas_dbus_iface_wps_pin",
- "wpsPin", "WpsPinError: Could not init PIN"):
- if_obj.wpsPin("any", "", dbus_interface=WPAS_DBUS_OLD_IFACE)
- with alloc_fail_dbus(dev[0], 1,
- "=wpa_config_add_network;wpas_dbus_iface_wps_reg",
- "wpsReg",
- "WpsRegError: Could not request credentials"):
- if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
- def test_dbus_old_network_set_oom(dev, apdev):
- """The old D-Bus interface and network set method (OOM)"""
- (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
- with alloc_fail_dbus(dev[0], 1,
- "=wpa_config_add_network;wpas_dbus_iface_add_network",
- "addNetwork",
- "AddNetworkError: wpa_supplicant could not add"):
- if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
- netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
- netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
- with alloc_fail_dbus(dev[0], 1,
- "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_network",
- "set", "InvalidOptions"):
- params = dbus.Dictionary({ 'ssid': "foo" }, signature='sv')
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
- tests = [ { 'identity': dbus.ByteArray([ 1, 2 ]) },
- { 'scan_freq': dbus.UInt32(2412) },
- { 'priority': dbus.Int32(0) },
- { 'identity': "user" },
- { 'eap': "TLS" }]
- for arg in tests:
- with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_network",
- "set", "InvalidOptions"):
- params = dbus.Dictionary(arg, signature='sv')
- netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
|