hwsim_utils.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
# hwsim testing utilities
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
  6. import os
  7. import subprocess
  8. import time
  9. import logging
  10. logger = logging.getLogger()
  11. from wpasupplicant import WpaSupplicant
  12. def test_connectivity_run(ifname1, ifname2, dscp=None, tos=None, max_tries=1):
  13. if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"):
  14. hwsim_test = "../../mac80211_hwsim/tools/hwsim_test"
  15. else:
  16. hwsim_test = "hwsim_test"
  17. cmd = ["sudo",
  18. hwsim_test,
  19. ifname1,
  20. ifname2]
  21. if dscp:
  22. cmd.append('-D')
  23. cmd.append(str(dscp))
  24. elif tos:
  25. cmd.append('-t')
  26. cmd.append(str(tos))
  27. success = False
  28. for i in range(0, max_tries):
  29. try:
  30. s = subprocess.check_output(cmd)
  31. logger.debug(s)
  32. success = True
  33. break
  34. except subprocess.CalledProcessError, e:
  35. logger.info("hwsim failed: " + str(e.returncode))
  36. logger.info(e.output)
  37. if i + 1 < max_tries:
  38. time.sleep(1)
  39. if not success:
  40. raise Exception("hwsim_test failed")
  41. def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False):
  42. addr1 = dev1.own_addr()
  43. if not dev1group and isinstance(dev1, WpaSupplicant):
  44. addr1 = dev1.get_driver_status_field('addr')
  45. addr2 = dev2.own_addr()
  46. if not dev2group and isinstance(dev2, WpaSupplicant):
  47. addr2 = dev2.get_driver_status_field('addr')
  48. try:
  49. cmd = "DATA_TEST_CONFIG 1"
  50. if dev1group:
  51. res = dev1.group_request(cmd)
  52. else:
  53. res = dev1.request(cmd)
  54. if "OK" not in res:
  55. raise Exception("Failed to enable data test functionality")
  56. if dev2group:
  57. res = dev2.group_request(cmd)
  58. else:
  59. res = dev2.request(cmd)
  60. if "OK" not in res:
  61. raise Exception("Failed to enable data test functionality")
  62. cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
  63. if dev1group:
  64. dev1.group_request(cmd)
  65. else:
  66. dev1.request(cmd)
  67. if dev2group:
  68. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
  69. else:
  70. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
  71. if ev is None:
  72. raise Exception("dev1->dev2 unicast data delivery failed")
  73. if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
  74. raise Exception("Unexpected dev1->dev2 unicast data result")
  75. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
  76. if dev1group:
  77. dev1.group_request(cmd)
  78. else:
  79. dev1.request(cmd)
  80. if dev2group:
  81. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
  82. else:
  83. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
  84. if ev is None:
  85. raise Exception("dev1->dev2 broadcast data delivery failed")
  86. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
  87. raise Exception("Unexpected dev1->dev2 broadcast data result")
  88. cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
  89. if dev2group:
  90. dev2.group_request(cmd)
  91. else:
  92. dev2.request(cmd)
  93. if dev1group:
  94. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
  95. else:
  96. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
  97. if ev is None:
  98. raise Exception("dev2->dev1 unicast data delivery failed")
  99. if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
  100. raise Exception("Unexpected dev2->dev1 unicast data result")
  101. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
  102. if dev2group:
  103. dev2.group_request(cmd)
  104. else:
  105. dev2.request(cmd)
  106. if dev1group:
  107. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
  108. else:
  109. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
  110. if ev is None:
  111. raise Exception("dev2->dev1 broadcast data delivery failed")
  112. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
  113. raise Exception("Unexpected dev2->dev1 broadcast data result")
  114. finally:
  115. if dev1group:
  116. dev1.group_request("DATA_TEST_CONFIG 0")
  117. else:
  118. dev1.request("DATA_TEST_CONFIG 0")
  119. if dev2group:
  120. dev2.group_request("DATA_TEST_CONFIG 0")
  121. else:
  122. dev2.request("DATA_TEST_CONFIG 0")
  123. def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1, dev1group=False, dev2group=False):
  124. if dscp:
  125. tos = dscp << 2
  126. if not tos:
  127. tos = 0
  128. success = False
  129. last_err = None
  130. for i in range(0, max_tries):
  131. try:
  132. run_connectivity_test(dev1, dev2, tos, dev1group, dev2group)
  133. success = True
  134. break
  135. except Exception, e:
  136. last_err = e
  137. if i + 1 < max_tries:
  138. time.sleep(1)
  139. if not success:
  140. raise Exception(last_err)
  141. def test_connectivity_iface(dev1, ifname, dscp=None, tos=None, max_tries=1):
  142. test_connectivity_run(dev1.ifname, ifname, dscp=dscp, tos=tos,
  143. max_tries=max_tries)
  144. def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
  145. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
  146. def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
  147. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
  148. def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
  149. test_connectivity(dev1, dev2, dscp, tos)