test_wnm.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. # WNM tests
  2. # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import binascii
  7. import struct
  8. import time
  9. import logging
  10. logger = logging.getLogger()
  11. import hostapd
  12. from wlantest import Wlantest
  13. def test_wnm_bss_transition_mgmt(dev, apdev):
  14. """WNM BSS Transition Management"""
  15. params = { "ssid": "test-wnm",
  16. "time_advertisement": "2",
  17. "time_zone": "EST5",
  18. "wnm_sleep_mode": "1",
  19. "bss_transition": "1" }
  20. hostapd.add_ap(apdev[0]['ifname'], params)
  21. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  22. dev[0].request("WNM_BSS_QUERY 0")
  23. def test_wnm_disassoc_imminent(dev, apdev):
  24. """WNM Disassociation Imminent"""
  25. params = { "ssid": "test-wnm",
  26. "time_advertisement": "2",
  27. "time_zone": "EST5",
  28. "wnm_sleep_mode": "1",
  29. "bss_transition": "1" }
  30. hostapd.add_ap(apdev[0]['ifname'], params)
  31. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  32. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  33. addr = dev[0].p2p_interface_addr()
  34. hapd.request("DISASSOC_IMMINENT " + addr + " 10")
  35. ev = dev[0].wait_event(["WNM: Disassociation Imminent"])
  36. if ev is None:
  37. raise Exception("Timeout while waiting for disassociation imminent")
  38. if "Disassociation Timer 10" not in ev:
  39. raise Exception("Unexpected disassociation imminent contents")
  40. ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
  41. if ev is None:
  42. raise Exception("Timeout while waiting for re-connection scan")
  43. def test_wnm_ess_disassoc_imminent(dev, apdev):
  44. """WNM ESS Disassociation Imminent"""
  45. params = { "ssid": "test-wnm",
  46. "time_advertisement": "2",
  47. "time_zone": "EST5",
  48. "wnm_sleep_mode": "1",
  49. "bss_transition": "1" }
  50. hostapd.add_ap(apdev[0]['ifname'], params)
  51. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  52. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  53. addr = dev[0].p2p_interface_addr()
  54. hapd.request("ESS_DISASSOC " + addr + " 10 http://example.com/session-info")
  55. ev = dev[0].wait_event(["ESS-DISASSOC-IMMINENT"])
  56. if ev is None:
  57. raise Exception("Timeout while waiting for ESS disassociation imminent")
  58. if "0 1024 http://example.com/session-info" not in ev:
  59. raise Exception("Unexpected ESS disassociation imminent message contents")
  60. ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
  61. if ev is None:
  62. raise Exception("Timeout while waiting for re-connection scan")
  63. def test_wnm_ess_disassoc_imminent_pmf(dev, apdev):
  64. """WNM ESS Disassociation Imminent"""
  65. params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
  66. params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
  67. params["ieee80211w"] = "2";
  68. params["bss_transition"] = "1"
  69. hostapd.add_ap(apdev[0]['ifname'], params)
  70. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  71. dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
  72. key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
  73. addr = dev[0].p2p_interface_addr()
  74. hapd.request("ESS_DISASSOC " + addr + " 10 http://example.com/session-info")
  75. ev = dev[0].wait_event(["ESS-DISASSOC-IMMINENT"])
  76. if ev is None:
  77. raise Exception("Timeout while waiting for ESS disassociation imminent")
  78. if "1 1024 http://example.com/session-info" not in ev:
  79. raise Exception("Unexpected ESS disassociation imminent message contents")
  80. ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
  81. if ev is None:
  82. raise Exception("Timeout while waiting for re-connection scan")
  83. def check_wnm_sleep_mode_enter_exit(hapd, dev):
  84. addr = dev.p2p_interface_addr()
  85. sta = hapd.get_sta(addr)
  86. if "[WNM_SLEEP_MODE]" in sta['flags']:
  87. raise Exception("Station unexpectedly in WNM-Sleep Mode")
  88. logger.info("Going to WNM Sleep Mode")
  89. dev.request("WNM_SLEEP enter")
  90. time.sleep(0.5)
  91. sta = hapd.get_sta(addr)
  92. if "[WNM_SLEEP_MODE]" not in sta['flags']:
  93. raise Exception("Station failed to enter WNM-Sleep Mode")
  94. logger.info("Waking up from WNM Sleep Mode")
  95. dev.request("WNM_SLEEP exit")
  96. time.sleep(0.5)
  97. sta = hapd.get_sta(addr)
  98. if "[WNM_SLEEP_MODE]" in sta['flags']:
  99. raise Exception("Station failed to exit WNM-Sleep Mode")
  100. def test_wnm_sleep_mode_open(dev, apdev):
  101. """WNM Sleep Mode - open"""
  102. params = { "ssid": "test-wnm",
  103. "time_advertisement": "2",
  104. "time_zone": "EST5",
  105. "wnm_sleep_mode": "1",
  106. "bss_transition": "1" }
  107. hostapd.add_ap(apdev[0]['ifname'], params)
  108. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  109. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  110. check_wnm_sleep_mode_enter_exit(hapd, dev[0])
  111. def test_wnm_sleep_mode_rsn(dev, apdev):
  112. """WNM Sleep Mode - RSN"""
  113. params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
  114. params["time_advertisement"] = "2"
  115. params["time_zone"] = "EST5"
  116. params["wnm_sleep_mode"] = "1"
  117. params["bss_transition"] = "1"
  118. hostapd.add_ap(apdev[0]['ifname'], params)
  119. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  120. dev[0].connect("test-wnm-rsn", psk="12345678", scan_freq="2412")
  121. check_wnm_sleep_mode_enter_exit(hapd, dev[0])
  122. def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
  123. """WNM Sleep Mode - RSN with PMF"""
  124. wt = Wlantest()
  125. wt.flush()
  126. wt.add_passphrase("12345678")
  127. params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
  128. params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
  129. params["ieee80211w"] = "2";
  130. params["time_advertisement"] = "2"
  131. params["time_zone"] = "EST5"
  132. params["wnm_sleep_mode"] = "1"
  133. params["bss_transition"] = "1"
  134. hostapd.add_ap(apdev[0]['ifname'], params)
  135. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  136. dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
  137. key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
  138. check_wnm_sleep_mode_enter_exit(hapd, dev[0])
  139. MGMT_SUBTYPE_ACTION = 13
  140. ACTION_CATEG_WNM = 10
  141. WNM_ACT_BSS_TM_REQ = 7
  142. WNM_ACT_BSS_TM_RESP = 8
  143. def bss_tm_req(dst, src, dialog_token=1, req_mode=0, disassoc_timer=0,
  144. validity_interval=1):
  145. msg = {}
  146. msg['fc'] = MGMT_SUBTYPE_ACTION << 4
  147. msg['da'] = dst
  148. msg['sa'] = src
  149. msg['bssid'] = src
  150. msg['payload'] = struct.pack("<BBBBHB",
  151. ACTION_CATEG_WNM, WNM_ACT_BSS_TM_REQ,
  152. dialog_token, req_mode, disassoc_timer,
  153. validity_interval)
  154. return msg
  155. def rx_bss_tm_resp(hapd, expect_dialog=None, expect_status=None):
  156. for i in range(0, 100):
  157. resp = hapd.mgmt_rx()
  158. if resp is None:
  159. raise Exception("No BSS TM Response received")
  160. if resp['subtype'] == MGMT_SUBTYPE_ACTION:
  161. break
  162. if i == 99:
  163. raise Exception("Not an Action frame")
  164. payload = resp['payload']
  165. if len(payload) < 2 + 3:
  166. raise Exception("Too short payload")
  167. (category, action) = struct.unpack('BB', payload[0:2])
  168. if category != ACTION_CATEG_WNM or action != WNM_ACT_BSS_TM_RESP:
  169. raise Exception("Not a BSS TM Response")
  170. pos = payload[2:]
  171. (dialog, status, bss_term_delay) = struct.unpack('BBB', pos[0:3])
  172. resp['dialog'] = dialog
  173. resp['status'] = status
  174. resp['bss_term_delay'] = bss_term_delay
  175. pos = pos[3:]
  176. if len(pos) >= 6 and status == 0:
  177. resp['target_bssid'] = binascii.hexlify(pos[0:6])
  178. pos = pos[6:]
  179. resp['candidates'] = pos
  180. if expect_dialog is not None and dialog != expect_dialog:
  181. raise Exception("Unexpected dialog token")
  182. if expect_status is not None and status != expect_status:
  183. raise Exception("Unexpected status code %d" % status)
  184. return resp
  185. def except_ack(hapd):
  186. ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
  187. if ev is None:
  188. raise Exception("Missing TX status")
  189. if "ok=1" not in ev:
  190. raise Exception("Action frame not acknowledged")
  191. def test_wnm_bss_tm_req(dev, apdev):
  192. """BSS Transition Management Request"""
  193. params = { "ssid": "test-wnm", "bss_transition": "1" }
  194. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  195. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  196. hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
  197. hapd.set("ext_mgmt_frame_handling", "1")
  198. # truncated BSS TM Request
  199. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  200. req_mode=0x08)
  201. req['payload'] = struct.pack("<BBBBH",
  202. ACTION_CATEG_WNM, WNM_ACT_BSS_TM_REQ,
  203. 1, 0, 0)
  204. hapd.mgmt_tx(req)
  205. except_ack(hapd)
  206. # no disassociation and no candidate list
  207. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  208. dialog_token=2)
  209. hapd.mgmt_tx(req)
  210. resp = rx_bss_tm_resp(hapd, expect_dialog=2, expect_status=1)
  211. # truncated BSS Termination Duration
  212. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  213. req_mode=0x08)
  214. hapd.mgmt_tx(req)
  215. except_ack(hapd)
  216. # BSS Termination Duration with TSF=0 and Duration=10
  217. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  218. req_mode=0x08, dialog_token=3)
  219. req['payload'] += struct.pack("<BBQH", 4, 10, 0, 10)
  220. hapd.mgmt_tx(req)
  221. resp = rx_bss_tm_resp(hapd, expect_dialog=3, expect_status=1)
  222. # truncated Session Information URL
  223. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  224. req_mode=0x10)
  225. hapd.mgmt_tx(req)
  226. except_ack(hapd)
  227. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  228. req_mode=0x10)
  229. req['payload'] += struct.pack("<BBB", 3, 65, 66)
  230. hapd.mgmt_tx(req)
  231. except_ack(hapd)
  232. # Session Information URL
  233. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  234. req_mode=0x10, dialog_token=4)
  235. req['payload'] += struct.pack("<BBB", 2, 65, 66)
  236. hapd.mgmt_tx(req)
  237. resp = rx_bss_tm_resp(hapd, expect_dialog=4, expect_status=0)
  238. # Preferred Candidate List without any entries
  239. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  240. req_mode=0x01, dialog_token=5)
  241. hapd.mgmt_tx(req)
  242. resp = rx_bss_tm_resp(hapd, expect_dialog=5, expect_status=1)
  243. # Preferred Candidate List with a truncated entry
  244. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  245. req_mode=0x01)
  246. req['payload'] += struct.pack("<BB", 52, 1)
  247. hapd.mgmt_tx(req)
  248. except_ack(hapd)
  249. # Preferred Candidate List with a too short entry
  250. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  251. req_mode=0x01, dialog_token=6)
  252. req['payload'] += struct.pack("<BB", 52, 0)
  253. hapd.mgmt_tx(req)
  254. resp = rx_bss_tm_resp(hapd, expect_dialog=6, expect_status=1)
  255. # Preferred Candidate List with a non-matching entry
  256. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  257. req_mode=0x01, dialog_token=6)
  258. req['payload'] += struct.pack("<BB6BLBBB", 52, 13,
  259. 1, 2, 3, 4, 5, 6,
  260. 0, 81, 1, 7)
  261. hapd.mgmt_tx(req)
  262. resp = rx_bss_tm_resp(hapd, expect_dialog=6, expect_status=1)
  263. # Preferred Candidate List with a truncated subelement
  264. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  265. req_mode=0x01, dialog_token=7)
  266. req['payload'] += struct.pack("<BB6BLBBBBB", 52, 13 + 2,
  267. 1, 2, 3, 4, 5, 6,
  268. 0, 81, 1, 7,
  269. 1, 1)
  270. hapd.mgmt_tx(req)
  271. resp = rx_bss_tm_resp(hapd, expect_dialog=7, expect_status=1)
  272. # Preferred Candidate List with lots of invalid optional subelements
  273. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  274. req_mode=0x01, dialog_token=8)
  275. subelems = struct.pack("<BBHB", 1, 3, 0, 100)
  276. subelems += struct.pack("<BBB", 2, 1, 65)
  277. subelems += struct.pack("<BB", 3, 0)
  278. subelems += struct.pack("<BBQB", 4, 9, 0, 10)
  279. subelems += struct.pack("<BBHLB", 5, 7, 0, 0, 0)
  280. subelems += struct.pack("<BB", 66, 0)
  281. subelems += struct.pack("<BBBBBB", 70, 4, 0, 0, 0, 0)
  282. subelems += struct.pack("<BB", 71, 0)
  283. req['payload'] += struct.pack("<BB6BLBBB", 52, 13 + len(subelems),
  284. 1, 2, 3, 4, 5, 6,
  285. 0, 81, 1, 7) + subelems
  286. hapd.mgmt_tx(req)
  287. resp = rx_bss_tm_resp(hapd, expect_dialog=8, expect_status=1)
  288. # Preferred Candidate List with lots of valid optional subelements (twice)
  289. req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
  290. req_mode=0x01, dialog_token=8)
  291. # TSF Information
  292. subelems = struct.pack("<BBHH", 1, 4, 0, 100)
  293. # Condensed Country String
  294. subelems += struct.pack("<BBBB", 2, 2, 65, 66)
  295. # BSS Transition Candidate Preference
  296. subelems += struct.pack("<BBB", 3, 1, 100)
  297. # BSS Termination Duration
  298. subelems += struct.pack("<BBQH", 4, 10, 0, 10)
  299. # Bearing
  300. subelems += struct.pack("<BBHLH", 5, 8, 0, 0, 0)
  301. # Measurement Pilot Transmission
  302. subelems += struct.pack("<BBBBB", 66, 3, 0, 0, 0)
  303. # RM Enabled Capabilities
  304. subelems += struct.pack("<BBBBBBB", 70, 5, 0, 0, 0, 0, 0)
  305. # Multiple BSSID
  306. subelems += struct.pack("<BBBB", 71, 2, 0, 0)
  307. req['payload'] += struct.pack("<BB6BLBBB", 52, 13 + len(subelems) * 2,
  308. 1, 2, 3, 4, 5, 6,
  309. 0, 81, 1, 7) + subelems + subelems
  310. hapd.mgmt_tx(req)
  311. resp = rx_bss_tm_resp(hapd, expect_dialog=8, expect_status=1)