import json
import os
import pathlib
import subprocess
import threading

import pytest

from plugins.agl_test_base import AGLBaseTest


class CrashmeBase(AGLBaseTest):
    """Driver for the ``crashme`` stress test.

    Builds the crashme command line from the spec file, runs it with a
    watchdog timer that terminates it after a stress-level dependent
    timeout, and stores the outcome in ``case_info_list``.
    """

    # Accepted values of the STRESS_LEVEL environment variable.
    _STRESS_LEVELS = ("low", "mid", "high")

    # Watchdog timeout in seconds for each stress level: the runtime from
    # the spec file plus 5 per cent of slack.
    _KILLER_ETA_SECONDS = {"low": 945, "mid": 1890, "high": 3780}

    def __init__(self):
        super().__init__(name="crashme")

    def _get_stress_level(self):
        """Return the STRESS_LEVEL env value, or "low" if unset/invalid."""
        level = os.environ.get("STRESS_LEVEL")
        return level if level in self._STRESS_LEVELS else "low"

    def killer(self):
        """Send SIGTERM to every running ``crashme`` process.

        Best effort: output and exit status of ``killall`` are ignored.
        """
        killer_cmd = "killall -15 crashme"
        subprocess.run(killer_cmd, shell=True,
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def setup_killer_eta_timer(self):
        """Return the watchdog timeout in seconds for the current stress level.

        The level comes from the STRESS_LEVEL environment variable and
        defaults to "low" when it is unset or not one of low/mid/high.
        """
        return self._KILLER_ETA_SECONDS[self._get_stress_level()]

    def setup_runtest_params(self):
        """Return the crashme arguments "NBYTES SRAND NTRIES NSUB" as a string.

        Values are read from the ``stress_level_<level>`` entry of the JSON
        spec file located by ``get_spec_path()``.

        Raises:
            OSError: if the spec file cannot be opened.
            KeyError: if the spec lacks the level entry or one of its keys.
        """
        dict_key = "stress_level_" + self._get_stress_level()

        # Context manager so the spec file handle is always released.
        with open(self.get_spec_path(), 'r', encoding="utf-8") as fp:
            spec_dict = json.load(fp)

        level_spec = spec_dict[dict_key]
        return ' '.join(
            str(level_spec[key])
            for key in ('NBYTES', 'SRAND', 'NTRIES', 'NSUB')
        )

    def run_test_fun(self):
        """Run crashme under the watchdog and record the result.

        The filtered crashme output is written to the log file and
        ``case_info_list`` is set to a single ``test_crashme`` entry holding
        the name, the return code as a string, and 'passed' or 'failed'.
        """
        log_file = self.get_logfile()
        cwd_buf = self.get_temp_logdir()
        # os.path.join copes with the work dir having or lacking a trailing
        # separator.
        crashme_bin = os.path.join(self.get_workdir(), 'crashme')
        runtest_param_buf = self.setup_runtest_params()
        # NOTE: the command is a shell pipeline, so the return code seen
        # below is the one reported by the shell for the whole pipeline
        # (normally that of the final grep stage).
        run_test_cmd = crashme_bin + ' ' + runtest_param_buf + \
            ' | grep "^Test complete\\|^exit status\\|' + \
            '^child_kill_count\\|[.]\\{3\\}\\|^Number of distinct cases"'

        # Arm the killer to fire once the expected runtime has elapsed.
        eta_time = self.setup_killer_eta_timer()
        countdown = threading.Timer(eta_time, self.killer)
        countdown.start()
        try:
            with open(log_file, 'w') as log_f:
                run_test_crashme = subprocess.run(run_test_cmd, shell=True,
                                                  cwd=cwd_buf, stdout=log_f,
                                                  stderr=subprocess.DEVNULL)
        finally:
            # Disarm the watchdog; otherwise the pending timer thread keeps
            # the interpreter alive until the ETA and then kills whatever
            # crashme process happens to be running at that point.
            countdown.cancel()

        return_code = run_test_crashme.returncode
        status = 'passed' if return_code == 0 else 'failed'
        self.case_info_list = {
            'test_crashme': ['test_crashme', str(return_code), status]
        }

    def precheck(self):
        """Return a truthy value only if the base precheck passes and the
        crashme binary exists in the work directory."""
        test_file_location = self.get_workdir() + "/crashme"
        path_checker = pathlib.Path(test_file_location)
        return super().precheck() and path_checker.is_file()


@pytest.fixture(scope='module')
def testbase():
    """Module-scoped fixture: run crashme once, then emit the report."""
    # init instance for test
    instance = CrashmeBase()
    # run test scripts
    instance.run_test_fun()

    yield instance

    # package log files and make report file
    instance.log_report()


def precheck():
    """Return the result of ``CrashmeBase.precheck()`` on a fresh instance."""
    instance = CrashmeBase()
    return instance.precheck()


skip_msg = "The current environment does not match the test requirements."
# Explicit comparison kept on purpose: only a result equal to False skips.
pytestmark = pytest.mark.skipif(precheck() == False, reason=skip_msg)  # noqa: E712


@pytest.mark.dangerous
@pytest.mark.order("last")
def test_crashme(testbase: CrashmeBase):
    """Pass when the recorded crashme return code is 0."""
    assert testbase.case_info_list['test_crashme'][1] == '0'


if __name__ == '__main__':
    # pytest.main expects a list of arguments, not a single string.
    pytest.main(["-s", "run_tests"])