test_dbus_old.py 29 KB


  1. # wpa_supplicant D-Bus old interface tests
  2. # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import gobject
  7. import logging
  8. logger = logging.getLogger()
  9. try:
  10. import dbus
  11. dbus_imported = True
  12. except ImportError:
  13. dbus_imported = False
  14. import hostapd
  15. from test_dbus import TestDbus, start_ap
  16. WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
  17. WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
  18. WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
  19. WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
  20. WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
  21. def prepare_dbus(dev):
  22. if not dbus_imported:
  23. logger.info("No dbus module available")
  24. raise Exception("hwsim-SKIP")
  25. try:
  26. from dbus.mainloop.glib import DBusGMainLoop
  27. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  28. bus = dbus.SystemBus()
  29. wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
  30. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  31. path = wpas.getInterface(dev.ifname)
  32. if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  33. return (bus,wpas_obj,path,if_obj)
  34. except Exception, e:
  35. logger.info("No D-Bus support available: " + str(e))
  36. raise Exception("hwsim-SKIP")
  37. class TestDbusOldWps(TestDbus):
  38. def __init__(self, bus):
  39. TestDbus.__init__(self, bus)
  40. self.event_ok = False
  41. def __enter__(self):
  42. gobject.timeout_add(1, self.run_wps)
  43. gobject.timeout_add(15000, self.timeout)
  44. self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
  45. self.loop.run()
  46. return self
  47. def wpsCred(self, cred):
  48. logger.debug("wpsCred: " + str(cred))
  49. self.event_ok = True
  50. self.loop.quit()
  51. def success(self):
  52. return self.event_ok
  53. def test_dbus_old(dev, apdev):
  54. """The old D-Bus interface"""
  55. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  56. res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
  57. logger.debug("capabilities(): " + str(res))
  58. if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
  59. raise Exception("Unexpected capabilities")
  60. res2 = if_obj.capabilities(dbus.Boolean(True),
  61. dbus_interface=WPAS_DBUS_OLD_IFACE)
  62. logger.debug("capabilities(strict): " + str(res2))
  63. res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
  64. logger.debug("State: " + res)
  65. res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
  66. if res != 0:
  67. raise Exception("Unexpected scanning: " + str(res))
  68. if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
  69. for t in [ dbus.UInt32(123), "foo" ]:
  70. try:
  71. if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  72. raise Exception("Invalid setAPScan() accepted")
  73. except dbus.exceptions.DBusException, e:
  74. if "InvalidOptions" not in str(e):
  75. raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
  76. for p in [ path + "/Networks/12345",
  77. path + "/Networks/foo" ]:
  78. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  79. try:
  80. obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  81. raise Exception("Invalid disable() accepted")
  82. except dbus.exceptions.DBusException, e:
  83. if "InvalidNetwork" not in str(e):
  84. raise Exception("Unexpected error message for invalid disable: " + str(e))
  85. for p in [ path + "/BSSIDs/foo",
  86. path + "/BSSIDs/001122334455"]:
  87. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  88. try:
  89. obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
  90. raise Exception("Invalid properties() accepted")
  91. except dbus.exceptions.DBusException, e:
  92. if "InvalidBSSID" not in str(e):
  93. raise Exception("Unexpected error message for invalid properties: " + str(e))
  94. def test_dbus_old_scan(dev, apdev):
  95. """The old D-Bus interface - scanning"""
  96. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  97. hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
  98. params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
  99. params['wpa'] = '3'
  100. hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
  101. class TestDbusScan(TestDbus):
  102. def __init__(self, bus):
  103. TestDbus.__init__(self, bus)
  104. self.scan_completed = False
  105. def __enter__(self):
  106. gobject.timeout_add(1, self.run_scan)
  107. gobject.timeout_add(7000, self.timeout)
  108. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  109. "ScanResultsAvailable")
  110. self.loop.run()
  111. return self
  112. def scanDone(self):
  113. logger.debug("scanDone")
  114. self.scan_completed = True
  115. self.loop.quit()
  116. def run_scan(self, *args):
  117. logger.debug("run_scan")
  118. if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
  119. raise Exception("Failed to trigger scan")
  120. return False
  121. def success(self):
  122. return self.scan_completed
  123. with TestDbusScan(bus) as t:
  124. if not t.success():
  125. raise Exception("Expected signals not seen")
  126. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  127. if len(res) != 2:
  128. raise Exception("Unexpected number of scan results: " + str(res))
  129. for i in range(2):
  130. logger.debug("Scan result BSS path: " + res[i])
  131. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  132. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  133. byte_arrays=True)
  134. logger.debug("BSS: " + str(bss))
  135. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
  136. try:
  137. bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
  138. raise Exception("Unknown BSSID method accepted")
  139. except Exception, e:
  140. logger.debug("Unknown BSSID method exception: " + str(e))
  141. if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
  142. raise Exception("Failed to issue flush(0)")
  143. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  144. if len(res) != 0:
  145. raise Exception("Unexpected BSS entry after flush")
  146. if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
  147. raise Exception("Failed to issue flush(1)")
  148. try:
  149. if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  150. raise Exception("Invalid flush arguments accepted")
  151. except dbus.exceptions.DBusException, e:
  152. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  153. raise Exception("Unexpected error message for invalid flush: " + str(e))
  154. try:
  155. bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  156. byte_arrays=True)
  157. except dbus.exceptions.DBusException, e:
  158. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
  159. raise Exception("Unexpected error message for invalid BSS: " + str(e))
  160. def test_dbus_old_debug(dev, apdev):
  161. """The old D-Bus interface - debug"""
  162. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  163. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  164. try:
  165. wpas.setDebugParams(123)
  166. raise Exception("Invalid setDebugParams accepted")
  167. except dbus.exceptions.DBusException, e:
  168. if "InvalidOptions" not in str(e):
  169. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  170. try:
  171. wpas.setDebugParams(123, True, True)
  172. raise Exception("Invalid setDebugParams accepted")
  173. except dbus.exceptions.DBusException, e:
  174. if "InvalidOptions" not in str(e):
  175. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  176. wpas.setDebugParams(1, True, True)
  177. dev[0].request("LOG_LEVEL MSGDUMP")
  178. def test_dbus_old_smartcard(dev, apdev):
  179. """The old D-Bus interface - smartcard"""
  180. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  181. params = dbus.Dictionary(signature='sv')
  182. if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
  183. params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
  184. 'pkcs11_engine_path': "foobar2",
  185. 'pkcs11_module_path': "foobar3",
  186. 'foo': 'bar' },
  187. signature='sv')
  188. params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
  189. 'foo': 'bar' },
  190. signature='sv')
  191. params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
  192. 'foo2': 'bar' },
  193. signature='sv')
  194. params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
  195. 'foo3': 'bar' },
  196. signature='sv')
  197. tests = [ 1, params, params2, params3, params4 ]
  198. for t in tests:
  199. try:
  200. if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  201. raise Exception("Invalid setSmartcardModules accepted: " + str(t))
  202. except dbus.exceptions.DBusException, e:
  203. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  204. raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
  205. def test_dbus_old_interface(dev, apdev):
  206. """The old D-Bus interface - interface get/add/remove"""
  207. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  208. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  209. tests = [ (123, "InvalidOptions"),
  210. ("foo", "InvalidInterface") ]
  211. for (ifname,err) in tests:
  212. try:
  213. wpas.getInterface(ifname)
  214. raise Exception("Invalid getInterface accepted")
  215. except dbus.exceptions.DBusException, e:
  216. if err not in str(e):
  217. raise Exception("Unexpected error message for invalid getInterface: " + str(e))
  218. params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
  219. wpas.addInterface("lo", params)
  220. path = wpas.getInterface("lo")
  221. logger.debug("New interface path: " + str(path))
  222. wpas.removeInterface(path)
  223. try:
  224. wpas.removeInterface(path)
  225. raise Exception("Invalid removeInterface() accepted")
  226. except dbus.exceptions.DBusException, e:
  227. if "InvalidInterface" not in str(e):
  228. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  229. params1 = dbus.Dictionary({ 'driver': 'foo',
  230. 'driver-params': 'foo',
  231. 'config-file': 'foo',
  232. 'bridge-ifname': 'foo' },
  233. signature='sv')
  234. params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
  235. tests = [ (123, None, "InvalidOptions"),
  236. ("", None, "InvalidOptions"),
  237. ("foo", None, "AddError"),
  238. ("foo", params1, "AddError"),
  239. ("foo", params2, "InvalidOptions"),
  240. ("foo", 1234, "InvalidOptions"),
  241. (dev[0].ifname, None, "ExistsError" ) ]
  242. for (ifname,params,err) in tests:
  243. try:
  244. if params is None:
  245. wpas.addInterface(ifname)
  246. else:
  247. wpas.addInterface(ifname, params)
  248. raise Exception("Invalid addInterface accepted: " + str(params))
  249. except dbus.exceptions.DBusException, e:
  250. if err not in str(e):
  251. raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
  252. try:
  253. wpas.removeInterface(123)
  254. raise Exception("Invalid removeInterface accepted")
  255. except dbus.exceptions.DBusException, e:
  256. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  257. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  258. def test_dbus_old_blob(dev, apdev):
  259. """The old D-Bus interface - blob operations"""
  260. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  261. param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
  262. param2 = dbus.Dictionary({ 'blob3': "foo" })
  263. param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
  264. signature='sv')
  265. tests = [ (1, "InvalidOptions"),
  266. (param1, "InvalidOptions"),
  267. (param2, "InvalidOptions"),
  268. (param3, "InvalidOptions") ]
  269. for (arg,err) in tests:
  270. try:
  271. if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  272. raise Exception("Invalid setBlobs() accepted: " + str(arg))
  273. except dbus.exceptions.DBusException, e:
  274. logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
  275. if err not in str(e):
  276. raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
  277. tests = [ (["foo"], "RemoveError: Error removing blob"),
  278. ([""], "RemoveError: Invalid blob name"),
  279. ([1], "InvalidOptions"),
  280. ("foo", "InvalidOptions") ]
  281. for (arg,err) in tests:
  282. try:
  283. if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  284. raise Exception("Invalid removeBlobs() accepted: " + str(arg))
  285. except dbus.exceptions.DBusException, e:
  286. logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
  287. if err not in str(e):
  288. raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
  289. blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
  290. 'blob2': dbus.ByteArray([ 1, 2 ]) },
  291. signature='sv')
  292. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  293. if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
  294. def test_dbus_old_connect(dev, apdev):
  295. """The old D-Bus interface - add a network and connect"""
  296. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  297. ssid = "test-wpa2-psk"
  298. passphrase = 'qwertyuiop'
  299. params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
  300. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  301. for p in [ "/no/where/to/be/found",
  302. path + "/Networks/12345",
  303. path + "/Networks/foo",
  304. "/fi/epitest/hostap/WPASupplicant/Interfaces",
  305. "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
  306. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  307. try:
  308. if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
  309. raise Exception("Invalid removeNetwork accepted: " + p)
  310. except dbus.exceptions.DBusException, e:
  311. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  312. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  313. try:
  314. if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  315. raise Exception("Invalid removeNetwork accepted")
  316. except dbus.exceptions.DBusException, e:
  317. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  318. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  319. try:
  320. if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
  321. raise Exception("Invalid removeNetwork accepted")
  322. except dbus.exceptions.DBusException, e:
  323. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  324. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  325. tests = [ (path, "InvalidNetwork"),
  326. (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
  327. "InvalidInterface"),
  328. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
  329. "InvalidNetwork"),
  330. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
  331. "InvalidNetwork"),
  332. (1, "InvalidOptions") ]
  333. for t,err in tests:
  334. try:
  335. if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  336. raise Exception("Invalid selectNetwork accepted: " + str(t))
  337. except dbus.exceptions.DBusException, e:
  338. if err not in str(e):
  339. raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
  340. npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  341. if not npath.startswith(WPAS_DBUS_OLD_PATH):
  342. raise Exception("Unexpected addNetwork result: " + path)
  343. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
  344. tests = [ 123,
  345. dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
  346. for t in tests:
  347. try:
  348. netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  349. raise Exception("Invalid set() accepted: " + str(t))
  350. except dbus.exceptions.DBusException, e:
  351. if "InvalidOptions" not in str(e):
  352. raise Exception("Unexpected error message for invalid set: " + str(e))
  353. params = dbus.Dictionary({ 'ssid': ssid,
  354. 'key_mgmt': 'WPA-PSK',
  355. 'psk': passphrase,
  356. 'identity': dbus.ByteArray([ 1, 2 ]),
  357. 'priority': dbus.Int32(0),
  358. 'scan_freq': dbus.UInt32(2412) },
  359. signature='sv')
  360. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  361. if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
  362. class TestDbusConnect(TestDbus):
  363. def __init__(self, bus):
  364. TestDbus.__init__(self, bus)
  365. self.state = 0
  366. def __enter__(self):
  367. gobject.timeout_add(1, self.run_connect)
  368. gobject.timeout_add(15000, self.timeout)
  369. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  370. "ScanResultsAvailable")
  371. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  372. "StateChange")
  373. self.loop.run()
  374. return self
  375. def scanDone(self):
  376. logger.debug("scanDone")
  377. def stateChange(self, new, old):
  378. logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
  379. if new == "COMPLETED":
  380. if self.state == 0:
  381. self.state = 1
  382. self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  383. elif self.state == 2:
  384. self.state = 3
  385. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  386. elif self.state == 4:
  387. self.state = 5
  388. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  389. elif self.state == 6:
  390. self.state = 7
  391. if_obj.removeNetwork(self.path,
  392. dbus_interface=WPAS_DBUS_OLD_IFACE)
  393. try:
  394. if_obj.removeNetwork(self.path,
  395. dbus_interface=WPAS_DBUS_OLD_IFACE)
  396. raise Exception("Invalid removeNetwork accepted")
  397. except dbus.exceptions.DBusException, e:
  398. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  399. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  400. self.loop.quit()
  401. elif new == "DISCONNECTED":
  402. if self.state == 1:
  403. self.state = 2
  404. self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  405. elif self.state == 3:
  406. self.state = 4
  407. if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  408. elif self.state == 5:
  409. self.state = 6
  410. if_obj.selectNetwork(self.path,
  411. dbus_interface=WPAS_DBUS_OLD_IFACE)
  412. def run_connect(self, *args):
  413. logger.debug("run_connect")
  414. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  415. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  416. netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  417. params = dbus.Dictionary({ 'ssid': ssid,
  418. 'key_mgmt': 'WPA-PSK',
  419. 'psk': passphrase,
  420. 'scan_freq': 2412 },
  421. signature='sv')
  422. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  423. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  424. self.path = path
  425. self.netw_obj = netw_obj
  426. return False
  427. def success(self):
  428. return self.state == 7
  429. with TestDbusConnect(bus) as t:
  430. if not t.success():
  431. raise Exception("Expected signals not seen")
  432. if len(dev[0].list_networks()) != 0:
  433. raise Exception("Unexpected network")
  434. def test_dbus_old_connect_eap(dev, apdev):
  435. """The old D-Bus interface - add an EAP network and connect"""
  436. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  437. ssid = "test-wpa2-eap"
  438. params = hostapd.wpa2_eap_params(ssid=ssid)
  439. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  440. class TestDbusConnect(TestDbus):
  441. def __init__(self, bus):
  442. TestDbus.__init__(self, bus)
  443. self.connected = False
  444. self.certification_received = False
  445. def __enter__(self):
  446. gobject.timeout_add(1, self.run_connect)
  447. gobject.timeout_add(15000, self.timeout)
  448. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  449. "StateChange")
  450. self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
  451. "Certification")
  452. self.loop.run()
  453. return self
  454. def stateChange(self, new, old):
  455. logger.debug("stateChange: %s --> %s" % (old, new))
  456. if new == "COMPLETED":
  457. self.connected = True
  458. self.loop.quit()
  459. def certification(self, depth, subject, hash, cert_hex):
  460. logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
  461. self.certification_received = True
  462. def run_connect(self, *args):
  463. logger.debug("run_connect")
  464. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  465. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  466. params = dbus.Dictionary({ 'ssid': ssid,
  467. 'key_mgmt': 'WPA-EAP',
  468. 'eap': 'TTLS',
  469. 'anonymous_identity': 'ttls',
  470. 'identity': 'pap user',
  471. 'ca_cert': 'auth_serv/ca.pem',
  472. 'phase2': 'auth=PAP',
  473. 'password': 'password',
  474. 'scan_freq': 2412 },
  475. signature='sv')
  476. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  477. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  478. self.path = path
  479. self.netw_obj = netw_obj
  480. return False
  481. def success(self):
  482. return self.connected and self.certification_received
  483. with TestDbusConnect(bus) as t:
  484. if not t.success():
  485. raise Exception("Expected signals not seen")
  486. def test_dbus_old_wps_pbc(dev, apdev):
  487. """The old D-Bus interface and WPS/PBC"""
  488. try:
  489. return _test_dbus_old_wps_pbc(dev, apdev)
  490. finally:
  491. dev[0].request("SET wps_cred_processing 0")
  492. def _test_dbus_old_wps_pbc(dev, apdev):
  493. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  494. hapd = start_ap(apdev[0])
  495. hapd.request("WPS_PBC")
  496. bssid = apdev[0]['bssid']
  497. dev[0].scan_for_bss(bssid, freq="2412")
  498. dev[0].request("SET wps_cred_processing 2")
  499. for arg in [ 123, "123" ]:
  500. try:
  501. if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  502. raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
  503. except dbus.exceptions.DBusException, e:
  504. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  505. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  506. class TestDbusWps(TestDbusOldWps):
  507. def __init__(self, bus, pbc_param):
  508. TestDbusOldWps.__init__(self, bus)
  509. self.pbc_param = pbc_param
  510. def run_wps(self, *args):
  511. logger.debug("run_wps: pbc_param=" + self.pbc_param)
  512. if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
  513. return False
  514. with TestDbusWps(bus, "any") as t:
  515. if not t.success():
  516. raise Exception("Expected signals not seen")
  517. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  518. if len(res) != 1:
  519. raise Exception("Unexpected number of scan results: " + str(res))
  520. for i in range(1):
  521. logger.debug("Scan result BSS path: " + res[i])
  522. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  523. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  524. byte_arrays=True)
  525. logger.debug("BSS: " + str(bss))
  526. dev[0].wait_connected(timeout=10)
  527. dev[0].request("DISCONNECT")
  528. dev[0].wait_disconnected(timeout=10)
  529. dev[0].request("FLUSH")
  530. hapd.request("WPS_PBC")
  531. dev[0].scan_for_bss(bssid, freq="2412")
  532. with TestDbusWps(bus, bssid) as t:
  533. if not t.success():
  534. raise Exception("Expected signals not seen")
  535. dev[0].wait_connected(timeout=10)
  536. dev[0].request("DISCONNECT")
  537. dev[0].wait_disconnected(timeout=10)
  538. hapd.disable()
  539. dev[0].flush_scan_cache()
  540. def test_dbus_old_wps_pin(dev, apdev):
  541. """The old D-Bus interface and WPS/PIN"""
  542. try:
  543. return _test_dbus_old_wps_pin(dev, apdev)
  544. finally:
  545. dev[0].request("SET wps_cred_processing 0")
  546. def _test_dbus_old_wps_pin(dev, apdev):
  547. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  548. hapd = start_ap(apdev[0])
  549. hapd.request("WPS_PIN any 12345670")
  550. bssid = apdev[0]['bssid']
  551. dev[0].scan_for_bss(bssid, freq="2412")
  552. dev[0].request("SET wps_cred_processing 2")
  553. for arg in [ (123, "12345670"),
  554. ("123", "12345670") ]:
  555. try:
  556. if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  557. raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
  558. except dbus.exceptions.DBusException, e:
  559. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  560. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  561. class TestDbusWps(TestDbusOldWps):
  562. def __init__(self, bus, bssid, pin):
  563. TestDbusOldWps.__init__(self, bus)
  564. self.bssid = bssid
  565. self.pin = pin
  566. def run_wps(self, *args):
  567. logger.debug("run_wps %s %s" % (self.bssid, self.pin))
  568. pin = if_obj.wpsPin(self.bssid, self.pin,
  569. dbus_interface=WPAS_DBUS_OLD_IFACE)
  570. if len(self.pin) == 0:
  571. h = hostapd.Hostapd(apdev[0]['ifname'])
  572. h.request("WPS_PIN any " + pin)
  573. return False
  574. with TestDbusWps(bus, bssid, "12345670") as t:
  575. if not t.success():
  576. raise Exception("Expected signals not seen")
  577. dev[0].wait_connected(timeout=10)
  578. dev[0].request("DISCONNECT")
  579. dev[0].wait_disconnected(timeout=10)
  580. dev[0].request("FLUSH")
  581. dev[0].scan_for_bss(bssid, freq="2412")
  582. with TestDbusWps(bus, "any", "") as t:
  583. if not t.success():
  584. raise Exception("Expected signals not seen")
  585. def test_dbus_old_wps_reg(dev, apdev):
  586. """The old D-Bus interface and WPS/Registar"""
  587. try:
  588. return _test_dbus_old_wps_reg(dev, apdev)
  589. finally:
  590. dev[0].request("SET wps_cred_processing 0")
  591. def _test_dbus_old_wps_reg(dev, apdev):
  592. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  593. hapd = start_ap(apdev[0])
  594. bssid = apdev[0]['bssid']
  595. dev[0].scan_for_bss(bssid, freq="2412")
  596. dev[0].request("SET wps_cred_processing 2")
  597. for arg in [ (123, "12345670"),
  598. ("123", "12345670") ]:
  599. try:
  600. if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  601. raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
  602. except dbus.exceptions.DBusException, e:
  603. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  604. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  605. class TestDbusWps(TestDbusOldWps):
  606. def run_wps(self, *args):
  607. logger.debug("run_wps")
  608. if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
  609. return False
  610. with TestDbusWps(bus) as t:
  611. if not t.success():
  612. raise Exception("Expected signals not seen")
  613. dev[0].wait_connected(timeout=10)