test_dbus_old.py 36 KB


  1. # wpa_supplicant D-Bus old interface tests
  2. # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import logging
  7. logger = logging.getLogger()
  8. try:
  9. import gobject
  10. import dbus
  11. dbus_imported = True
  12. except ImportError:
  13. dbus_imported = False
  14. import hostapd
  15. from utils import HwsimSkip
  16. from test_dbus import TestDbus, alloc_fail_dbus, start_ap
  17. WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
  18. WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
  19. WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
  20. WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
  21. WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
  22. def prepare_dbus(dev):
  23. if not dbus_imported:
  24. raise HwsimSkip("No dbus module available")
  25. try:
  26. from dbus.mainloop.glib import DBusGMainLoop
  27. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  28. bus = dbus.SystemBus()
  29. wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
  30. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  31. path = wpas.getInterface(dev.ifname)
  32. if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  33. return (bus,wpas_obj,path,if_obj)
  34. except Exception, e:
  35. raise HwsimSkip("Could not connect to D-Bus: %s" % e)
  36. class TestDbusOldWps(TestDbus):
  37. def __init__(self, bus):
  38. TestDbus.__init__(self, bus)
  39. self.event_ok = False
  40. def __enter__(self):
  41. gobject.timeout_add(1, self.run_wps)
  42. gobject.timeout_add(15000, self.timeout)
  43. self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
  44. self.loop.run()
  45. return self
  46. def wpsCred(self, cred):
  47. logger.debug("wpsCred: " + str(cred))
  48. self.event_ok = True
  49. self.loop.quit()
  50. def success(self):
  51. return self.event_ok
  52. def test_dbus_old(dev, apdev):
  53. """The old D-Bus interface"""
  54. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  55. res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
  56. logger.debug("capabilities(): " + str(res))
  57. if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
  58. raise Exception("Unexpected capabilities")
  59. res2 = if_obj.capabilities(dbus.Boolean(True),
  60. dbus_interface=WPAS_DBUS_OLD_IFACE)
  61. logger.debug("capabilities(strict): " + str(res2))
  62. res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
  63. logger.debug("State: " + res)
  64. res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
  65. if res != 0:
  66. raise Exception("Unexpected scanning: " + str(res))
  67. if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
  68. for t in [ dbus.UInt32(123), "foo" ]:
  69. try:
  70. if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  71. raise Exception("Invalid setAPScan() accepted")
  72. except dbus.exceptions.DBusException, e:
  73. if "InvalidOptions" not in str(e):
  74. raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
  75. for p in [ path + "/Networks/12345",
  76. path + "/Networks/foo" ]:
  77. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  78. try:
  79. obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  80. raise Exception("Invalid disable() accepted")
  81. except dbus.exceptions.DBusException, e:
  82. if "InvalidNetwork" not in str(e):
  83. raise Exception("Unexpected error message for invalid disable: " + str(e))
  84. for p in [ path + "/BSSIDs/foo",
  85. path + "/BSSIDs/001122334455"]:
  86. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  87. try:
  88. obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
  89. raise Exception("Invalid properties() accepted")
  90. except dbus.exceptions.DBusException, e:
  91. if "InvalidBSSID" not in str(e):
  92. raise Exception("Unexpected error message for invalid properties: " + str(e))
  93. def test_dbus_old_scan(dev, apdev):
  94. """The old D-Bus interface - scanning"""
  95. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  96. hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
  97. params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
  98. params['wpa'] = '3'
  99. hapd2 = hostapd.add_ap(apdev[1], params)
  100. class TestDbusScan(TestDbus):
  101. def __init__(self, bus):
  102. TestDbus.__init__(self, bus)
  103. self.scan_completed = False
  104. def __enter__(self):
  105. gobject.timeout_add(1, self.run_scan)
  106. gobject.timeout_add(7000, self.timeout)
  107. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  108. "ScanResultsAvailable")
  109. self.loop.run()
  110. return self
  111. def scanDone(self):
  112. logger.debug("scanDone")
  113. self.scan_completed = True
  114. self.loop.quit()
  115. def run_scan(self, *args):
  116. logger.debug("run_scan")
  117. if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
  118. raise Exception("Failed to trigger scan")
  119. return False
  120. def success(self):
  121. return self.scan_completed
  122. with TestDbusScan(bus) as t:
  123. if not t.success():
  124. raise Exception("Expected signals not seen")
  125. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  126. if len(res) != 2:
  127. raise Exception("Unexpected number of scan results: " + str(res))
  128. for i in range(2):
  129. logger.debug("Scan result BSS path: " + res[i])
  130. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  131. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  132. byte_arrays=True)
  133. logger.debug("BSS: " + str(bss))
  134. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
  135. try:
  136. bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
  137. raise Exception("Unknown BSSID method accepted")
  138. except Exception, e:
  139. logger.debug("Unknown BSSID method exception: " + str(e))
  140. if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
  141. raise Exception("Failed to issue flush(0)")
  142. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  143. if len(res) != 0:
  144. raise Exception("Unexpected BSS entry after flush")
  145. if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
  146. raise Exception("Failed to issue flush(1)")
  147. try:
  148. if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  149. raise Exception("Invalid flush arguments accepted")
  150. except dbus.exceptions.DBusException, e:
  151. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  152. raise Exception("Unexpected error message for invalid flush: " + str(e))
  153. try:
  154. bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  155. byte_arrays=True)
  156. except dbus.exceptions.DBusException, e:
  157. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
  158. raise Exception("Unexpected error message for invalid BSS: " + str(e))
  159. def test_dbus_old_debug(dev, apdev):
  160. """The old D-Bus interface - debug"""
  161. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  162. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  163. try:
  164. wpas.setDebugParams(123)
  165. raise Exception("Invalid setDebugParams accepted")
  166. except dbus.exceptions.DBusException, e:
  167. if "InvalidOptions" not in str(e):
  168. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  169. try:
  170. wpas.setDebugParams(123, True, True)
  171. raise Exception("Invalid setDebugParams accepted")
  172. except dbus.exceptions.DBusException, e:
  173. if "InvalidOptions" not in str(e):
  174. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  175. wpas.setDebugParams(1, True, True)
  176. dev[0].request("LOG_LEVEL MSGDUMP")
  177. def test_dbus_old_smartcard(dev, apdev):
  178. """The old D-Bus interface - smartcard"""
  179. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  180. params = dbus.Dictionary(signature='sv')
  181. if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
  182. params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
  183. 'pkcs11_engine_path': "foobar2",
  184. 'pkcs11_module_path': "foobar3",
  185. 'foo': 'bar' },
  186. signature='sv')
  187. params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
  188. 'foo': 'bar' },
  189. signature='sv')
  190. params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
  191. 'foo2': 'bar' },
  192. signature='sv')
  193. params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
  194. 'foo3': 'bar' },
  195. signature='sv')
  196. tests = [ 1, params, params2, params3, params4 ]
  197. for t in tests:
  198. try:
  199. if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  200. raise Exception("Invalid setSmartcardModules accepted: " + str(t))
  201. except dbus.exceptions.DBusException, e:
  202. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  203. raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
  204. def test_dbus_old_smartcard_oom(dev, apdev):
  205. """The old D-Bus interface - smartcard (OOM)"""
  206. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  207. for arg in [ 'opensc_engine_path', 'pkcs11_engine_path', 'pkcs11_module_path' ]:
  208. with alloc_fail_dbus(dev[0], 1,
  209. "=wpas_dbus_iface_set_smartcard_modules",
  210. "setSmartcardModules",
  211. "InvalidOptions"):
  212. params = dbus.Dictionary({ arg : "foo", }, signature='sv')
  213. if_obj.setSmartcardModules(params,
  214. dbus_interface=WPAS_DBUS_OLD_IFACE)
  215. with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_smartcard_modules",
  216. "setSmartcardModules", "InvalidOptions"):
  217. params = dbus.Dictionary({ arg : "foo", }, signature='sv')
  218. if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
  219. def test_dbus_old_interface(dev, apdev):
  220. """The old D-Bus interface - interface get/add/remove"""
  221. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  222. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  223. tests = [ (123, "InvalidOptions"),
  224. ("foo", "InvalidInterface") ]
  225. for (ifname,err) in tests:
  226. try:
  227. wpas.getInterface(ifname)
  228. raise Exception("Invalid getInterface accepted")
  229. except dbus.exceptions.DBusException, e:
  230. if err not in str(e):
  231. raise Exception("Unexpected error message for invalid getInterface: " + str(e))
  232. params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
  233. wpas.addInterface("lo", params)
  234. path = wpas.getInterface("lo")
  235. logger.debug("New interface path: " + str(path))
  236. wpas.removeInterface(path)
  237. try:
  238. wpas.removeInterface(path)
  239. raise Exception("Invalid removeInterface() accepted")
  240. except dbus.exceptions.DBusException, e:
  241. if "InvalidInterface" not in str(e):
  242. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  243. params1 = dbus.Dictionary({ 'driver': 'foo',
  244. 'driver-params': 'foo',
  245. 'config-file': 'foo',
  246. 'bridge-ifname': 'foo' },
  247. signature='sv')
  248. params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
  249. tests = [ (123, None, "InvalidOptions"),
  250. ("", None, "InvalidOptions"),
  251. ("foo", None, "AddError"),
  252. ("foo", params1, "AddError"),
  253. ("foo", params2, "InvalidOptions"),
  254. ("foo", 1234, "InvalidOptions"),
  255. (dev[0].ifname, None, "ExistsError" ) ]
  256. for (ifname,params,err) in tests:
  257. try:
  258. if params is None:
  259. wpas.addInterface(ifname)
  260. else:
  261. wpas.addInterface(ifname, params)
  262. raise Exception("Invalid addInterface accepted: " + str(params))
  263. except dbus.exceptions.DBusException, e:
  264. if err not in str(e):
  265. raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
  266. try:
  267. wpas.removeInterface(123)
  268. raise Exception("Invalid removeInterface accepted")
  269. except dbus.exceptions.DBusException, e:
  270. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  271. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  272. def test_dbus_old_interface_oom(dev, apdev):
  273. """The old D-Bus interface - interface get/add/remove (OOM)"""
  274. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  275. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  276. with alloc_fail_dbus(dev[0], 1, "=_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_global_add_interface",
  277. "addInterface", "InvalidOptions"):
  278. params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
  279. wpas.addInterface("lo", params)
  280. for arg in [ "driver", "driver-params", "config-file", "bridge-ifname" ]:
  281. with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_global_add_interface",
  282. "addInterface", "InvalidOptions"):
  283. params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
  284. wpas.addInterface("lo", params)
  285. def test_dbus_old_blob(dev, apdev):
  286. """The old D-Bus interface - blob operations"""
  287. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  288. param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
  289. param2 = dbus.Dictionary({ 'blob3': "foo" })
  290. param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
  291. signature='sv')
  292. tests = [ (1, "InvalidOptions"),
  293. (param1, "InvalidOptions"),
  294. (param2, "InvalidOptions"),
  295. (param3, "InvalidOptions") ]
  296. for (arg,err) in tests:
  297. try:
  298. if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  299. raise Exception("Invalid setBlobs() accepted: " + str(arg))
  300. except dbus.exceptions.DBusException, e:
  301. logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
  302. if err not in str(e):
  303. raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
  304. tests = [ (["foo"], "RemoveError: Error removing blob"),
  305. ([""], "RemoveError: Invalid blob name"),
  306. ([1], "InvalidOptions"),
  307. ("foo", "InvalidOptions") ]
  308. for (arg,err) in tests:
  309. try:
  310. if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  311. raise Exception("Invalid removeBlobs() accepted: " + str(arg))
  312. except dbus.exceptions.DBusException, e:
  313. logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
  314. if err not in str(e):
  315. raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
  316. blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
  317. 'blob2': dbus.ByteArray([ 1, 2 ]) },
  318. signature='sv')
  319. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  320. if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
  321. def test_dbus_old_blob_oom(dev, apdev):
  322. """The old D-Bus interface - blob operations (OOM)"""
  323. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  324. blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
  325. 'blob2': dbus.ByteArray([ 1, 2 ]) },
  326. signature='sv')
  327. with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_blobs", "setBlobs",
  328. "AddError: Not enough memory to add blob"):
  329. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  330. with alloc_fail_dbus(dev[0], 2, "=wpas_dbus_iface_set_blobs", "setBlobs",
  331. "AddError: Not enough memory to add blob data"):
  332. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  333. with alloc_fail_dbus(dev[0], 3, "=wpas_dbus_iface_set_blobs", "setBlobs",
  334. "AddError: Error adding blob"):
  335. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  336. with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_decompose_object_path;wpas_iface_message_handler",
  337. "setBlobs",
  338. "InvalidInterface: wpa_supplicant knows nothing about this interface"):
  339. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  340. def test_dbus_old_connect(dev, apdev):
  341. """The old D-Bus interface - add a network and connect"""
  342. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  343. ssid = "test-wpa2-psk"
  344. passphrase = 'qwertyuiop'
  345. params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
  346. hapd = hostapd.add_ap(apdev[0], params)
  347. for p in [ "/no/where/to/be/found",
  348. path + "/Networks/12345",
  349. path + "/Networks/foo",
  350. "/fi/epitest/hostap/WPASupplicant/Interfaces",
  351. "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
  352. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  353. try:
  354. if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
  355. raise Exception("Invalid removeNetwork accepted: " + p)
  356. except dbus.exceptions.DBusException, e:
  357. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  358. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  359. try:
  360. if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  361. raise Exception("Invalid removeNetwork accepted")
  362. except dbus.exceptions.DBusException, e:
  363. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  364. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  365. try:
  366. if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
  367. raise Exception("Invalid removeNetwork accepted")
  368. except dbus.exceptions.DBusException, e:
  369. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  370. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  371. tests = [ (path, "InvalidNetwork"),
  372. (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
  373. "InvalidInterface"),
  374. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
  375. "InvalidNetwork"),
  376. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
  377. "InvalidNetwork"),
  378. (1, "InvalidOptions") ]
  379. for t,err in tests:
  380. try:
  381. if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  382. raise Exception("Invalid selectNetwork accepted: " + str(t))
  383. except dbus.exceptions.DBusException, e:
  384. if err not in str(e):
  385. raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
  386. npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  387. if not npath.startswith(WPAS_DBUS_OLD_PATH):
  388. raise Exception("Unexpected addNetwork result: " + path)
  389. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
  390. tests = [ 123,
  391. dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
  392. for t in tests:
  393. try:
  394. netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  395. raise Exception("Invalid set() accepted: " + str(t))
  396. except dbus.exceptions.DBusException, e:
  397. if "InvalidOptions" not in str(e):
  398. raise Exception("Unexpected error message for invalid set: " + str(e))
  399. params = dbus.Dictionary({ 'ssid': ssid,
  400. 'key_mgmt': 'WPA-PSK',
  401. 'psk': passphrase,
  402. 'identity': dbus.ByteArray([ 1, 2 ]),
  403. 'priority': dbus.Int32(0),
  404. 'scan_freq': dbus.UInt32(2412) },
  405. signature='sv')
  406. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  407. id = int(dev[0].list_networks()[0]['id'])
  408. val = dev[0].get_network(id, "scan_freq")
  409. if val != "2412":
  410. raise Exception("Invalid scan_freq value: " + str(val))
  411. params = dbus.Dictionary({ 'scan_freq': "2412 2432",
  412. 'freq_list': "2412 2417 2432" },
  413. signature='sv')
  414. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  415. val = dev[0].get_network(id, "scan_freq")
  416. if val != "2412 2432":
  417. raise Exception("Invalid scan_freq value (2): " + str(val))
  418. val = dev[0].get_network(id, "freq_list")
  419. if val != "2412 2417 2432":
  420. raise Exception("Invalid freq_list value: " + str(val))
  421. if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
  422. class TestDbusConnect(TestDbus):
  423. def __init__(self, bus):
  424. TestDbus.__init__(self, bus)
  425. self.state = 0
  426. def __enter__(self):
  427. gobject.timeout_add(1, self.run_connect)
  428. gobject.timeout_add(15000, self.timeout)
  429. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  430. "ScanResultsAvailable")
  431. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  432. "StateChange")
  433. self.loop.run()
  434. return self
  435. def scanDone(self):
  436. logger.debug("scanDone")
  437. def stateChange(self, new, old):
  438. logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
  439. if new == "COMPLETED":
  440. if self.state == 0:
  441. self.state = 1
  442. self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  443. elif self.state == 2:
  444. self.state = 3
  445. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  446. elif self.state == 4:
  447. self.state = 5
  448. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  449. elif self.state == 6:
  450. self.state = 7
  451. if_obj.removeNetwork(self.path,
  452. dbus_interface=WPAS_DBUS_OLD_IFACE)
  453. try:
  454. if_obj.removeNetwork(self.path,
  455. dbus_interface=WPAS_DBUS_OLD_IFACE)
  456. raise Exception("Invalid removeNetwork accepted")
  457. except dbus.exceptions.DBusException, e:
  458. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  459. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  460. self.loop.quit()
  461. elif new == "DISCONNECTED":
  462. if self.state == 1:
  463. self.state = 2
  464. self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  465. elif self.state == 3:
  466. self.state = 4
  467. if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  468. elif self.state == 5:
  469. self.state = 6
  470. if_obj.selectNetwork(self.path,
  471. dbus_interface=WPAS_DBUS_OLD_IFACE)
  472. def run_connect(self, *args):
  473. logger.debug("run_connect")
  474. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  475. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  476. netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  477. params = dbus.Dictionary({ 'ssid': ssid,
  478. 'key_mgmt': 'WPA-PSK',
  479. 'psk': passphrase,
  480. 'scan_freq': 2412 },
  481. signature='sv')
  482. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  483. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  484. self.path = path
  485. self.netw_obj = netw_obj
  486. return False
  487. def success(self):
  488. return self.state == 7
  489. with TestDbusConnect(bus) as t:
  490. if not t.success():
  491. raise Exception("Expected signals not seen")
  492. if len(dev[0].list_networks()) != 0:
  493. raise Exception("Unexpected network")
  494. def test_dbus_old_connect_eap(dev, apdev):
  495. """The old D-Bus interface - add an EAP network and connect"""
  496. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  497. ssid = "test-wpa2-eap"
  498. params = hostapd.wpa2_eap_params(ssid=ssid)
  499. hapd = hostapd.add_ap(apdev[0], params)
  500. class TestDbusConnect(TestDbus):
  501. def __init__(self, bus):
  502. TestDbus.__init__(self, bus)
  503. self.connected = False
  504. self.certification_received = False
  505. def __enter__(self):
  506. gobject.timeout_add(1, self.run_connect)
  507. gobject.timeout_add(15000, self.timeout)
  508. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  509. "StateChange")
  510. self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
  511. "Certification")
  512. self.loop.run()
  513. return self
  514. def stateChange(self, new, old):
  515. logger.debug("stateChange: %s --> %s" % (old, new))
  516. if new == "COMPLETED":
  517. self.connected = True
  518. self.loop.quit()
  519. def certification(self, depth, subject, hash, cert_hex):
  520. logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
  521. self.certification_received = True
  522. def run_connect(self, *args):
  523. logger.debug("run_connect")
  524. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  525. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  526. params = dbus.Dictionary({ 'ssid': ssid,
  527. 'key_mgmt': 'WPA-EAP',
  528. 'eap': 'TTLS',
  529. 'anonymous_identity': 'ttls',
  530. 'identity': 'pap user',
  531. 'ca_cert': 'auth_serv/ca.pem',
  532. 'phase2': 'auth=PAP',
  533. 'password': 'password',
  534. 'scan_freq': 2412 },
  535. signature='sv')
  536. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  537. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  538. self.path = path
  539. self.netw_obj = netw_obj
  540. return False
  541. def success(self):
  542. return self.connected and self.certification_received
  543. with TestDbusConnect(bus) as t:
  544. if not t.success():
  545. raise Exception("Expected signals not seen")
  546. def test_dbus_old_network_set(dev, apdev):
  547. """The old D-Bus interface and network set method"""
  548. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  549. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  550. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  551. netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  552. params = dbus.Dictionary({ 'priority': dbus.UInt64(1) }, signature='sv')
  553. try:
  554. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  555. raise Exception("set succeeded with unexpected type")
  556. except dbus.exceptions.DBusException, e:
  557. if "InvalidOptions" not in str(e):
  558. raise Exception("Unexpected error message for unexpected type: " + str(e))
  559. def test_dbus_old_wps_pbc(dev, apdev):
  560. """The old D-Bus interface and WPS/PBC"""
  561. try:
  562. _test_dbus_old_wps_pbc(dev, apdev)
  563. finally:
  564. dev[0].request("SET wps_cred_processing 0")
  565. def _test_dbus_old_wps_pbc(dev, apdev):
  566. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  567. dev[0].flush_scan_cache()
  568. hapd = start_ap(apdev[0])
  569. hapd.request("WPS_PBC")
  570. bssid = apdev[0]['bssid']
  571. dev[0].scan_for_bss(bssid, freq="2412")
  572. dev[0].request("SET wps_cred_processing 2")
  573. for arg in [ 123, "123" ]:
  574. try:
  575. if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  576. raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
  577. except dbus.exceptions.DBusException, e:
  578. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  579. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  580. class TestDbusWps(TestDbusOldWps):
  581. def __init__(self, bus, pbc_param):
  582. TestDbusOldWps.__init__(self, bus)
  583. self.pbc_param = pbc_param
  584. def run_wps(self, *args):
  585. logger.debug("run_wps: pbc_param=" + self.pbc_param)
  586. if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
  587. return False
  588. with TestDbusWps(bus, "any") as t:
  589. if not t.success():
  590. raise Exception("Expected signals not seen")
  591. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  592. if len(res) != 1:
  593. raise Exception("Unexpected number of scan results: " + str(res))
  594. for i in range(1):
  595. logger.debug("Scan result BSS path: " + res[i])
  596. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  597. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  598. byte_arrays=True)
  599. logger.debug("BSS: " + str(bss))
  600. dev[0].wait_connected(timeout=10)
  601. dev[0].request("DISCONNECT")
  602. dev[0].wait_disconnected(timeout=10)
  603. dev[0].request("FLUSH")
  604. hapd.request("WPS_PBC")
  605. dev[0].scan_for_bss(bssid, freq="2412")
  606. with TestDbusWps(bus, bssid) as t:
  607. if not t.success():
  608. raise Exception("Expected signals not seen")
  609. dev[0].wait_connected(timeout=10)
  610. dev[0].request("DISCONNECT")
  611. dev[0].wait_disconnected(timeout=10)
  612. hapd.disable()
  613. dev[0].flush_scan_cache()
  614. def test_dbus_old_wps_pin(dev, apdev):
  615. """The old D-Bus interface and WPS/PIN"""
  616. try:
  617. _test_dbus_old_wps_pin(dev, apdev)
  618. finally:
  619. dev[0].request("SET wps_cred_processing 0")
  620. def _test_dbus_old_wps_pin(dev, apdev):
  621. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  622. hapd = start_ap(apdev[0])
  623. hapd.request("WPS_PIN any 12345670")
  624. bssid = apdev[0]['bssid']
  625. dev[0].scan_for_bss(bssid, freq="2412")
  626. dev[0].request("SET wps_cred_processing 2")
  627. for arg in [ (123, "12345670"),
  628. ("123", "12345670") ]:
  629. try:
  630. if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  631. raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
  632. except dbus.exceptions.DBusException, e:
  633. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  634. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  635. class TestDbusWps(TestDbusOldWps):
  636. def __init__(self, bus, bssid, pin):
  637. TestDbusOldWps.__init__(self, bus)
  638. self.bssid = bssid
  639. self.pin = pin
  640. def run_wps(self, *args):
  641. logger.debug("run_wps %s %s" % (self.bssid, self.pin))
  642. pin = if_obj.wpsPin(self.bssid, self.pin,
  643. dbus_interface=WPAS_DBUS_OLD_IFACE)
  644. if len(self.pin) == 0:
  645. h = hostapd.Hostapd(apdev[0]['ifname'])
  646. h.request("WPS_PIN any " + pin)
  647. return False
  648. with TestDbusWps(bus, bssid, "12345670") as t:
  649. if not t.success():
  650. raise Exception("Expected signals not seen")
  651. dev[0].wait_connected(timeout=10)
  652. dev[0].request("DISCONNECT")
  653. dev[0].wait_disconnected(timeout=10)
  654. dev[0].request("FLUSH")
  655. dev[0].scan_for_bss(bssid, freq="2412")
  656. with TestDbusWps(bus, "any", "") as t:
  657. if not t.success():
  658. raise Exception("Expected signals not seen")
  659. def test_dbus_old_wps_reg(dev, apdev):
  660. """The old D-Bus interface and WPS/Registar"""
  661. try:
  662. _test_dbus_old_wps_reg(dev, apdev)
  663. finally:
  664. dev[0].request("SET wps_cred_processing 0")
  665. def _test_dbus_old_wps_reg(dev, apdev):
  666. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  667. hapd = start_ap(apdev[0])
  668. bssid = apdev[0]['bssid']
  669. dev[0].scan_for_bss(bssid, freq="2412")
  670. dev[0].request("SET wps_cred_processing 2")
  671. for arg in [ (123, "12345670"),
  672. ("123", "12345670") ]:
  673. try:
  674. if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  675. raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
  676. except dbus.exceptions.DBusException, e:
  677. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  678. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  679. class TestDbusWps(TestDbusOldWps):
  680. def run_wps(self, *args):
  681. logger.debug("run_wps")
  682. if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
  683. return False
  684. with TestDbusWps(bus) as t:
  685. if not t.success():
  686. raise Exception("Expected signals not seen")
  687. dev[0].wait_connected(timeout=10)
  688. def test_dbus_old_wps_oom(dev, apdev):
  689. """The old D-Bus interface and WPS (OOM)"""
  690. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  691. bssid = apdev[0]['bssid']
  692. with alloc_fail_dbus(dev[0], 1,
  693. "=wpa_config_add_network;wpas_dbus_iface_wps_pbc",
  694. "wpsPbc",
  695. "WpsPbcError: Could not start PBC negotiation"):
  696. if_obj.wpsPbc("any", dbus_interface=WPAS_DBUS_OLD_IFACE)
  697. with alloc_fail_dbus(dev[0], 1,
  698. "=wpa_config_add_network;wpas_dbus_iface_wps_pin",
  699. "wpsPin", "WpsPinError: Could not init PIN"):
  700. if_obj.wpsPin("any", "", dbus_interface=WPAS_DBUS_OLD_IFACE)
  701. with alloc_fail_dbus(dev[0], 1,
  702. "=wpa_config_add_network;wpas_dbus_iface_wps_reg",
  703. "wpsReg",
  704. "WpsRegError: Could not request credentials"):
  705. if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
  706. def test_dbus_old_network_set_oom(dev, apdev):
  707. """The old D-Bus interface and network set method (OOM)"""
  708. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  709. with alloc_fail_dbus(dev[0], 1,
  710. "=wpa_config_add_network;wpas_dbus_iface_add_network",
  711. "addNetwork",
  712. "AddNetworkError: wpa_supplicant could not add"):
  713. if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  714. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  715. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  716. netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  717. with alloc_fail_dbus(dev[0], 1,
  718. "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_iface_set_network",
  719. "set", "InvalidOptions"):
  720. params = dbus.Dictionary({ 'ssid': "foo" }, signature='sv')
  721. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  722. tests = [ { 'identity': dbus.ByteArray([ 1, 2 ]) },
  723. { 'scan_freq': dbus.UInt32(2412) },
  724. { 'priority': dbus.Int32(0) },
  725. { 'identity': "user" },
  726. { 'eap': "TLS" }]
  727. for arg in tests:
  728. with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_iface_set_network",
  729. "set", "InvalidOptions"):
  730. params = dbus.Dictionary(arg, signature='sv')
  731. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)