path: root/external/poky/meta/lib/oeqa/utils
diff options
Diffstat (limited to 'external/poky/meta/lib/oeqa/utils')
19 files changed, 3212 insertions, 0 deletions
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..d38a3230
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,103 @@
+# Enable other layers to have modules in the same named directory
+from pkgutil import extend_path
+__path__ = extend_path(__path__, __name__)
+# Borrowed from CalledProcessError
+class CommandError(Exception):
+ def __init__(self, retcode, cmd, output = None):
+ self.retcode = retcode
+ self.cmd = cmd
+ self.output = output
+ def __str__(self):
+ return "Command '%s' returned non-zero exit status %d with output: %s" % (self.cmd, self.retcode, self.output)
+def avoid_paths_in_environ(paths):
+ """
+ Searches for every path in os.environ['PATH']
+ if found remove it.
+ Returns new PATH without avoided PATHs.
+ """
+ import os
+ new_path = ''
+ for p in os.environ['PATH'].split(':'):
+ avoid = False
+ for pa in paths:
+ if pa in p:
+ avoid = True
+ break
+ if avoid:
+ continue
+ new_path = new_path + p + ':'
+ new_path = new_path[:-1]
+ return new_path
+def make_logger_bitbake_compatible(logger):
+ import logging
+ """
+ Bitbake logger redifines debug() in order to
+ set a level within debug, this breaks compatibility
+ with vainilla logging, so we neeed to redifine debug()
+ method again also add info() method with INFO + 1 level.
+ """
+ def _bitbake_log_debug(*args, **kwargs):
+ lvl = logging.DEBUG
+ if isinstance(args[0], int):
+ lvl = args[0]
+ msg = args[1]
+ args = args[2:]
+ else:
+ msg = args[0]
+ args = args[1:]
+ logger.log(lvl, msg, *args, **kwargs)
+ def _bitbake_log_info(msg, *args, **kwargs):
+ logger.log(logging.INFO + 1, msg, *args, **kwargs)
+ logger.debug = _bitbake_log_debug
+ = _bitbake_log_info
+ return logger
+def load_test_components(logger, executor):
+ import sys
+ import os
+ import importlib
+ from oeqa.core.context import OETestContextExecutor
+ components = {}
+ for path in sys.path:
+ base_dir = os.path.join(path, 'oeqa')
+ if os.path.exists(base_dir) and os.path.isdir(base_dir):
+ for file in os.listdir(base_dir):
+ comp_name = file
+ comp_context = os.path.join(base_dir, file, '')
+ if os.path.exists(comp_context):
+ comp_plugin = importlib.import_module('oeqa.%s.%s' % \
+ (comp_name, 'context'))
+ try:
+ if not issubclass(comp_plugin._executor_class,
+ OETestContextExecutor):
+ raise TypeError("Component %s in %s, _executor_class "\
+ "isn't derived from OETestContextExecutor."\
+ % (comp_name, comp_context))
+ if comp_plugin._executor_class._script_executor \
+ != executor:
+ continue
+ components[comp_name] = comp_plugin._executor_class()
+ except AttributeError:
+ raise AttributeError("Component %s in %s don't have "\
+ "_executor_class defined." % (comp_name, comp_context))
+ return components
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..01a803ab
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,61 @@
+# Copyright (C) 2013-2016 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# Provides a class for automating build tests for projects
+import os
+import re
+import subprocess
+import shutil
+import tempfile
+from abc import ABCMeta, abstractmethod
+class BuildProject(metaclass=ABCMeta):
+ def __init__(self, uri, foldername=None, tmpdir=None, dl_dir=None):
+ self.uri = uri
+ self.archive = os.path.basename(uri)
+ if not tmpdir:
+ self.tempdirobj = tempfile.TemporaryDirectory(prefix='buildproject-')
+ tmpdir =
+ self.localarchive = os.path.join(tmpdir, self.archive)
+ self.dl_dir = dl_dir
+ if foldername:
+ self.fname = foldername
+ else:
+ self.fname = re.sub(r'\.tar\.bz2$|\.tar\.gz$|\.tar\.xz$', '', self.archive)
+ self.needclean = False
+ # Download self.archive to self.localarchive
+ def _download_archive(self):
+ self.needclean = True
+ if self.dl_dir and os.path.exists(os.path.join(self.dl_dir, self.archive)):
+ shutil.copyfile(os.path.join(self.dl_dir, self.archive), self.localarchive)
+ return
+ cmd = "wget -O %s %s" % (self.localarchive, self.uri)
+ subprocess.check_output(cmd, shell=True)
+ # This method should provide a way to run a command in the desired environment.
+ @abstractmethod
+ def _run(self, cmd):
+ pass
+ # The timeout parameter of is set to 0 to make the ssh command
+ # run with no timeout.
+ def run_configure(self, configure_args='', extra_cmds=''):
+ return self._run('cd %s; gnu-configize; %s ./configure %s' % (self.targetdir, extra_cmds, configure_args))
+ def run_make(self, make_args=''):
+ return self._run('cd %s; make %s' % (self.targetdir, make_args))
+ def run_install(self, install_args=''):
+ return self._run('cd %s; make install %s' % (self.targetdir, install_args))
+ def clean(self):
+ if not self.needclean:
+ return
+ self._run('rm -rf %s' % self.targetdir)
+ subprocess.check_call('rm -f %s' % self.localarchive, shell=True)
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..2e6a2289
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,364 @@
+# Copyright (c) 2013-2014 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# This module is mainly used by scripts/oe-selftest and modules under meta/oeqa/selftest
+# It provides a class and methods for running commands on the host in a convienent way for tests.
+import os
+import sys
+import signal
+import subprocess
+import threading
+import time
+import logging
+from oeqa.utils import CommandError
+from oeqa.utils import ftools
+import re
+import contextlib
+# Export test doesn't require bb
+ import bb
+except ImportError:
+ pass
+class Command(object):
+ def __init__(self, command, bg=False, timeout=None, data=None, output_log=None, **options):
+ self.defaultopts = {
+ "stdout": subprocess.PIPE,
+ "stderr": subprocess.STDOUT,
+ "stdin": None,
+ "shell": False,
+ "bufsize": -1,
+ }
+ self.cmd = command
+ = bg
+ self.timeout = timeout
+ = data
+ self.options = dict(self.defaultopts)
+ if isinstance(self.cmd, str):
+ self.options["shell"] = True
+ if
+ self.options['stdin'] = subprocess.PIPE
+ self.options.update(options)
+ self.status = None
+ # We collect chunks of output before joining them at the end.
+ self._output_chunks = []
+ self._error_chunks = []
+ self.output = None
+ self.error = None
+ self.threads = []
+ self.output_log = output_log
+ self.log = logging.getLogger("utils.commands")
+ def run(self):
+ self.process = subprocess.Popen(self.cmd, **self.options)
+ def readThread(output, stream, logfunc):
+ if logfunc:
+ for line in stream:
+ output.append(line)
+ logfunc(line.decode("utf-8", errors='replace').rstrip())
+ else:
+ output.append(
+ def readStderrThread():
+ readThread(self._error_chunks, self.process.stderr, self.output_log.error if self.output_log else None)
+ def readStdoutThread():
+ readThread(self._output_chunks, self.process.stdout, if self.output_log else None)
+ def writeThread():
+ try:
+ self.process.stdin.write(
+ self.process.stdin.close()
+ except OSError as ex:
+ # It's not an error when the command does not consume all
+ # of our data. subprocess.communicate() also ignores that.
+ if ex.errno != EPIPE:
+ raise
+ # We write in a separate thread because then we can read
+ # without worrying about deadlocks. The additional thread is
+ # expected to terminate by itself and we mark it as a daemon,
+ # so even it should happen to not terminate for whatever
+ # reason, the main process will still exit, which will then
+ # kill the write thread.
+ if
+ threading.Thread(target=writeThread, daemon=True).start()
+ if self.process.stderr:
+ thread = threading.Thread(target=readStderrThread)
+ thread.start()
+ self.threads.append(thread)
+ if self.output_log:
+'Running: %s' % self.cmd)
+ thread = threading.Thread(target=readStdoutThread)
+ thread.start()
+ self.threads.append(thread)
+ self.log.debug("Running command '%s'" % self.cmd)
+ if not
+ if self.timeout is None:
+ for thread in self.threads:
+ thread.join()
+ else:
+ deadline = time.time() + self.timeout
+ for thread in self.threads:
+ timeout = deadline - time.time()
+ if timeout < 0:
+ timeout = 0
+ thread.join(timeout)
+ self.stop()
+ def stop(self):
+ for thread in self.threads:
+ if thread.isAlive():
+ self.process.terminate()
+ # let's give it more time to terminate gracefully before killing it
+ thread.join(5)
+ if thread.isAlive():
+ self.process.kill()
+ thread.join()
+ def finalize_output(data):
+ if not data:
+ data = ""
+ else:
+ data = b"".join(data)
+ data = data.decode("utf-8", errors='replace').rstrip()
+ return data
+ self.output = finalize_output(self._output_chunks)
+ self._output_chunks = None
+ # self.error used to be a byte string earlier, probably unintentionally.
+ # Now it is a normal string, just like self.output.
+ self.error = finalize_output(self._error_chunks)
+ self._error_chunks = None
+ # At this point we know that the process has closed stdout/stderr, so
+ # it is safe and necessary to wait for the actual process completion.
+ self.status = self.process.wait()
+ self.process.stdout.close()
+ if self.process.stderr:
+ self.process.stderr.close()
+ self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status))
+ # logging the complete output is insane
+ # bitbake -e output is really big
+ # and makes the log file useless
+ if self.status:
+ lout = "\n".join(self.output.splitlines()[-20:])
+ self.log.debug("Last 20 lines:\n%s" % lout)
+class Result(object):
+ pass
+def runCmd(command, ignore_status=False, timeout=None, assert_error=True,
+ native_sysroot=None, limit_exc_output=0, output_log=None, **options):
+ result = Result()
+ if native_sysroot:
+ extra_paths = "%s/sbin:%s/usr/sbin:%s/usr/bin" % \
+ (native_sysroot, native_sysroot, native_sysroot)
+ nenv = dict(options.get('env', os.environ))
+ nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '')
+ options['env'] = nenv
+ cmd = Command(command, timeout=timeout, output_log=output_log, **options)
+ result.command = command
+ result.status = cmd.status
+ result.output = cmd.output
+ result.error = cmd.error
+ =
+ if result.status and not ignore_status:
+ exc_output = result.output
+ if limit_exc_output > 0:
+ split = result.output.splitlines()
+ if len(split) > limit_exc_output:
+ exc_output = "\n... (last %d lines of output)\n" % limit_exc_output + \
+ '\n'.join(split[-limit_exc_output:])
+ if assert_error:
+ raise AssertionError("Command '%s' returned non-zero exit status %d:\n%s" % (command, result.status, exc_output))
+ else:
+ raise CommandError(result.status, command, exc_output)
+ return result
+def bitbake(command, ignore_status=False, timeout=None, postconfig=None, output_log=None, **options):
+ if postconfig:
+ postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf')
+ ftools.write_file(postconfig_file, postconfig)
+ extra_args = "-R %s" % postconfig_file
+ else:
+ extra_args = ""
+ if isinstance(command, str):
+ cmd = "bitbake " + extra_args + " " + command
+ else:
+ cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]
+ try:
+ return runCmd(cmd, ignore_status, timeout, output_log=output_log, **options)
+ finally:
+ if postconfig:
+ os.remove(postconfig_file)
+def get_bb_env(target=None, postconfig=None):
+ if target:
+ return bitbake("-e %s" % target, postconfig=postconfig).output
+ else:
+ return bitbake("-e", postconfig=postconfig).output
+def get_bb_vars(variables=None, target=None, postconfig=None):
+ """Get values of multiple bitbake variables"""
+ bbenv = get_bb_env(target, postconfig=postconfig)
+ if variables is not None:
+ variables = list(variables)
+ var_re = re.compile(r'^(export )?(?P<var>\w+(_.*)?)="(?P<value>.*)"$')
+ unset_re = re.compile(r'^unset (?P<var>\w+)$')
+ lastline = None
+ values = {}
+ for line in bbenv.splitlines():
+ match = var_re.match(line)
+ val = None
+ if match:
+ val ='value')
+ else:
+ match = unset_re.match(line)
+ if match:
+ # Handle [unexport] variables
+ if lastline.startswith('# "'):
+ val = lastline.split('"')[1]
+ if val:
+ var ='var')
+ if variables is None:
+ values[var] = val
+ else:
+ if var in variables:
+ values[var] = val
+ variables.remove(var)
+ # Stop after all required variables have been found
+ if not variables:
+ break
+ lastline = line
+ if variables:
+ # Fill in missing values
+ for var in variables:
+ values[var] = None
+ return values
+def get_bb_var(var, target=None, postconfig=None):
+ return get_bb_vars([var], target, postconfig)[var]
+def get_test_layer():
+ layers = get_bb_var("BBLAYERS").split()
+ testlayer = None
+ for l in layers:
+ if '~' in l:
+ l = os.path.expanduser(l)
+ if "/meta-selftest" in l and os.path.isdir(l):
+ testlayer = l
+ break
+ return testlayer
+def create_temp_layer(templayerdir, templayername, priority=999, recipepathspec='recipes-*/*'):
+ os.makedirs(os.path.join(templayerdir, 'conf'))
+ with open(os.path.join(templayerdir, 'conf', 'layer.conf'), 'w') as f:
+ f.write('BBPATH .= ":${LAYERDIR}"\n')
+ f.write('BBFILES += "${LAYERDIR}/%s/*.bb \\' % recipepathspec)
+ f.write(' ${LAYERDIR}/%s/*.bbappend"\n' % recipepathspec)
+ f.write('BBFILE_COLLECTIONS += "%s"\n' % templayername)
+ f.write('BBFILE_PATTERN_%s = "^${LAYERDIR}/"\n' % templayername)
+ f.write('BBFILE_PRIORITY_%s = "%d"\n' % (templayername, priority))
+ f.write('BBFILE_PATTERN_IGNORE_EMPTY_%s = "1"\n' % templayername)
+ f.write('LAYERSERIES_COMPAT_%s = "${LAYERSERIES_COMPAT_core}"\n' % templayername)
+def runqemu(pn, ssh=True, runqemuparams='', image_fstype=None, launch_cmd=None, qemuparams=None, overrides={}, discard_writes=True):
+ """
+ launch_cmd means directly run the command, don't need set rootfs or env vars.
+ """
+ import bb.tinfoil
+ import
+ # Need a non-'BitBake' logger to capture the runner output
+ targetlogger = logging.getLogger('TargetRunner')
+ targetlogger.setLevel(logging.DEBUG)
+ handler = logging.StreamHandler(sys.stdout)
+ targetlogger.addHandler(handler)
+ tinfoil = bb.tinfoil.Tinfoil()
+ tinfoil.prepare(config_only=False, quiet=True)
+ try:
+ tinfoil.logger.setLevel(logging.WARNING)
+ import oeqa.targetcontrol
+ tinfoil.config_data.setVar("TEST_LOG_DIR", "${WORKDIR}/testimage")
+ tinfoil.config_data.setVar("TEST_QEMUBOOT_TIMEOUT", "1000")
+ # Tell QemuTarget() whether need find rootfs/kernel or not
+ if launch_cmd:
+ tinfoil.config_data.setVar("FIND_ROOTFS", '0')
+ else:
+ tinfoil.config_data.setVar("FIND_ROOTFS", '1')
+ recipedata = tinfoil.parse_recipe(pn)
+ for key, value in overrides.items():
+ recipedata.setVar(key, value)
+ logdir = recipedata.getVar("TEST_LOG_DIR")
+ qemu = oeqa.targetcontrol.QemuTarget(recipedata, targetlogger, image_fstype)
+ finally:
+ # We need to shut down tinfoil early here in case we actually want
+ # to run tinfoil-using utilities with the running QEMU instance.
+ # Luckily QemuTarget doesn't need it after the constructor.
+ tinfoil.shutdown()
+ try:
+ qemu.deploy()
+ try:
+ qemu.start(params=qemuparams, ssh=ssh, runqemuparams=runqemuparams, launch_cmd=launch_cmd, discard_writes=discard_writes)
+ except
+ msg = 'Failed to start QEMU - see the logs in %s' % logdir
+ if os.path.exists(qemu.qemurunnerlog):
+ with open(qemu.qemurunnerlog, 'r') as f:
+ msg = msg + "Qemurunner log output from %s:\n%s" % (qemu.qemurunnerlog,
+ raise Exception(msg)
+ yield qemu
+ finally:
+ targetlogger.removeHandler(handler)
+ try:
+ qemu.stop()
+ except:
+ pass
+def updateEnv(env_file):
+ """
+ Source a file and update environment.
+ """
+ cmd = ". %s; env -0" % env_file
+ result = runCmd(cmd)
+ for line in result.output.split("\0"):
+ (key, _, value) = line.partition("=")
+ os.environ[key] = value
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..d8768969
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,295 @@
+# Copyright (C) 2013 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# Some custom decorators that can be used by unittests
+# Most useful is skipUnlessPassed which can be used for
+# creating dependecies between two test methods.
+import os
+import logging
+import sys
+import unittest
+import threading
+import signal
+from functools import wraps
+#get the "result" object from one of the upper frames provided that one of these upper frames is a frame
+class getResults(object):
+ def __init__(self):
+ #dynamically determine the frame and use it to get the name of the test method
+ ident = threading.current_thread().ident
+ upperf = sys._current_frames()[ident]
+ while (upperf.f_globals['__name__'] != ''):
+ upperf = upperf.f_back
+ def handleList(items):
+ ret = []
+ # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())
+ for i in items:
+ s = i[0].id()
+ #Handle the _ErrorHolder objects from skipModule failures
+ if "setUpModule (" in s:
+ ret.append(s.replace("setUpModule (", "").replace(")",""))
+ else:
+ ret.append(s)
+ # Append also the test without the full path
+ testname = s.split('.')[-1]
+ if testname:
+ ret.append(testname)
+ return ret
+ self.faillist = handleList(upperf.f_locals['result'].failures)
+ self.errorlist = handleList(upperf.f_locals['result'].errors)
+ self.skiplist = handleList(upperf.f_locals['result'].skipped)
+ def getFailList(self):
+ return self.faillist
+ def getErrorList(self):
+ return self.errorlist
+ def getSkipList(self):
+ return self.skiplist
+class skipIfFailure(object):
+ def __init__(self,testcase):
+ self.testcase = testcase
+ def __call__(self,f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in (res.getFailList() or res.getErrorList()):
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ return wrapped_f
+class skipIfSkipped(object):
+ def __init__(self,testcase):
+ self.testcase = testcase
+ def __call__(self,f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in res.getSkipList():
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ return wrapped_f
+class skipUnlessPassed(object):
+ def __init__(self,testcase):
+ self.testcase = testcase
+ def __call__(self,f):
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ res = getResults()
+ if self.testcase in res.getSkipList() or \
+ self.testcase in res.getFailList() or \
+ self.testcase in res.getErrorList():
+ raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
+ return f(*args, **kwargs)
+ wrapped_f.__name__ = f.__name__
+ wrapped_f._depends_on = self.testcase
+ return wrapped_f
+class testcase(object):
+ def __init__(self, test_case):
+ self.test_case = test_case
+ def __call__(self, func):
+ @wraps(func)
+ def wrapped_f(*args, **kwargs):
+ return func(*args, **kwargs)
+ wrapped_f.test_case = self.test_case
+ wrapped_f.__name__ = func.__name__
+ return wrapped_f
+class NoParsingFilter(logging.Filter):
+ def filter(self, record):
+ return record.levelno == 100
+import inspect
+def LogResults(original_class):
+ orig_method =
+ from time import strftime, gmtime
+ caller = os.path.basename(sys.argv[0])
+ timestamp = strftime('%Y%m%d%H%M%S',gmtime())
+ logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log')
+ linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log')
+ def get_class_that_defined_method(meth):
+ if inspect.ismethod(meth):
+ for cls in inspect.getmro(meth.__self__.__class__):
+ if cls.__dict__.get(meth.__name__) is meth:
+ return cls
+ meth = meth.__func__ # fallback to __qualname__ parsing
+ if inspect.isfunction(meth):
+ cls = getattr(inspect.getmodule(meth),
+ meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0])
+ if isinstance(cls, type):
+ return cls
+ return None
+ #rewrite the run method of unittest.TestCase to add testcase logging
+ def run(self, result, *args, **kws):
+ orig_method(self, result, *args, **kws)
+ passed = True
+ testMethod = getattr(self, self._testMethodName)
+ #if test case is decorated then use it's number, else use it's name
+ try:
+ test_case = testMethod.test_case
+ except AttributeError:
+ test_case = self._testMethodName
+ class_name = str(get_class_that_defined_method(testMethod)).split("'")[1]
+ #create custom logging level for filtering.
+ custom_log_level = 100
+ logging.addLevelName(custom_log_level, 'RESULTS')
+ def results(self, message, *args, **kws):
+ if self.isEnabledFor(custom_log_level):
+ self.log(custom_log_level, message, *args, **kws)
+ logging.Logger.results = results
+ logging.basicConfig(filename=logfile,
+ filemode='w',
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt='%H:%M:%S',
+ level=custom_log_level)
+ for handler in logging.root.handlers:
+ handler.addFilter(NoParsingFilter())
+ local_log = logging.getLogger(caller)
+ #check status of tests and record it
+ tcid =
+ for (name, msg) in result.errors:
+ if tcid ==
+ local_log.results("Testcase "+str(test_case)+": ERROR")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.failures:
+ if tcid ==
+ local_log.results("Testcase "+str(test_case)+": FAILED")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.skipped:
+ if tcid ==
+ local_log.results("Testcase "+str(test_case)+": SKIPPED")
+ passed = False
+ if passed:
+ local_log.results("Testcase "+str(test_case)+": PASSED")
+ # XXX: In order to avoid race condition when test if exists the linkfile
+ # use bb.utils.lock, the best solution is to create a unique name for the
+ # link file.
+ try:
+ import bb
+ has_bb = True
+ lockfilename = linkfile + '.lock'
+ except ImportError:
+ has_bb = False
+ if has_bb:
+ lf = bb.utils.lockfile(lockfilename, block=True)
+ # Create symlink to the current log
+ if os.path.lexists(linkfile):
+ os.remove(linkfile)
+ os.symlink(logfile, linkfile)
+ if has_bb:
+ bb.utils.unlockfile(lf)
+ = run
+ return original_class
+class TimeOut(BaseException):
+ pass
+def timeout(seconds):
+ def decorator(fn):
+ if hasattr(signal, 'alarm'):
+ @wraps(fn)
+ def wrapped_f(*args, **kw):
+ current_frame = sys._getframe()
+ def raiseTimeOut(signal, frame):
+ if frame is not current_frame:
+ raise TimeOut('%s seconds' % seconds)
+ prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
+ try:
+ signal.alarm(seconds)
+ return fn(*args, **kw)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ return wrapped_f
+ else:
+ return fn
+ return decorator
+__tag_prefix = "tag__"
+def tag(*args, **kwargs):
+ """Decorator that adds attributes to classes or functions
+ for use with the Attribute (-a) plugin.
+ """
+ def wrap_ob(ob):
+ for name in args:
+ setattr(ob, __tag_prefix + name, True)
+ for name, value in kwargs.items():
+ setattr(ob, __tag_prefix + name, value)
+ return ob
+ return wrap_ob
+def gettag(obj, key, default=None):
+ key = __tag_prefix + key
+ if not isinstance(obj, unittest.TestCase):
+ return getattr(obj, key, default)
+ tc_method = getattr(obj, obj._testMethodName)
+ ret = getattr(tc_method, key, getattr(obj, key, default))
+ return ret
+def getAllTags(obj):
+ def __gettags(o):
+ r = {k[len(__tag_prefix):]:getattr(o,k) for k in dir(o) if k.startswith(__tag_prefix)}
+ return r
+ if not isinstance(obj, unittest.TestCase):
+ return __gettags(obj)
+ tc_method = getattr(obj, obj._testMethodName)
+ ret = __gettags(obj)
+ ret.update(__gettags(tc_method))
+ return ret
+def timeout_handler(seconds):
+ def decorator(fn):
+ if hasattr(signal, 'alarm'):
+ @wraps(fn)
+ def wrapped_f(self, *args, **kw):
+ current_frame = sys._getframe()
+ def raiseTimeOut(signal, frame):
+ if frame is not current_frame:
+ try:
+ raise TimeOut('%s seconds' % seconds)
+ except:
+ raise TimeOut('%s seconds' % seconds)
+ prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
+ try:
+ signal.alarm(seconds)
+ return fn(self, *args, **kw)
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ return wrapped_f
+ else:
+ return fn
+ return decorator
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..79c22b75
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,91 @@
+import os
+import sys
+import errno
+import datetime
+import itertools
+from .commands import runCmd
+class BaseDumper(object):
+ """ Base class to dump commands from host/target """
+ def __init__(self, cmds, parent_dir):
+ self.cmds = []
+ # Some testing doesn't inherit testimage, so it is needed
+ # to set some defaults.
+ self.parent_dir = parent_dir
+ dft_cmds = """ top -bn1
+ iostat -x -z -N -d -p ALL 20 2
+ ps -ef
+ free
+ df
+ memstat
+ dmesg
+ ip -s link
+ netstat -an"""
+ if not cmds:
+ cmds = dft_cmds
+ for cmd in cmds.split('\n'):
+ cmd = cmd.lstrip()
+ if not cmd or cmd[0] == '#':
+ continue
+ self.cmds.append(cmd)
+ def create_dir(self, dir_suffix):
+ dump_subdir = ("%s_%s" % (
+ dir_suffix))
+ dump_dir = os.path.join(self.parent_dir, dump_subdir)
+ try:
+ os.makedirs(dump_dir)
+ except OSError as err:
+ if err.errno != errno.EEXIST:
+ raise err
+ self.dump_dir = dump_dir
+ def _write_dump(self, command, output):
+ if isinstance(self, HostDumper):
+ prefix = "host"
+ elif isinstance(self, TargetDumper):
+ prefix = "target"
+ else:
+ prefix = "unknown"
+ for i in itertools.count():
+ filename = "%s_%02d_%s" % (prefix, i, command)
+ fullname = os.path.join(self.dump_dir, filename)
+ if not os.path.exists(fullname):
+ break
+ with open(fullname, 'w') as dump_file:
+ dump_file.write(output)
+class HostDumper(BaseDumper):
+ """ Class to get dumps from the host running the tests """
+ def __init__(self, cmds, parent_dir):
+ super(HostDumper, self).__init__(cmds, parent_dir)
+ def dump_host(self, dump_dir=""):
+ if dump_dir:
+ self.dump_dir = dump_dir
+ for cmd in self.cmds:
+ result = runCmd(cmd, ignore_status=True)
+ self._write_dump(cmd.split()[0], result.output)
+class TargetDumper(BaseDumper):
+ """ Class to get dumps from target, it only works with QemuRunner """
+ def __init__(self, cmds, parent_dir, runner):
+ super(TargetDumper, self).__init__(cmds, parent_dir)
+ self.runner = runner
+ def dump_target(self, dump_dir=""):
+ if dump_dir:
+ self.dump_dir = dump_dir
+ for cmd in self.cmds:
+ # We can continue with the testing if serial commands fail
+ try:
+ (status, output) = self.runner.run_serial(cmd)
+ self._write_dump(cmd.split()[0], output)
+ except:
+ print("Tried to dump info from target but "
+ "serial console failed")
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..a7233d4c
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,46 @@
+import os
+import re
+import errno
+def write_file(path, data):
+ # In case data is None, return immediately
+ if data is None:
+ return
+ wdata = data.rstrip() + "\n"
+ with open(path, "w") as f:
+ f.write(wdata)
+def append_file(path, data):
+ # In case data is None, return immediately
+ if data is None:
+ return
+ wdata = data.rstrip() + "\n"
+ with open(path, "a") as f:
+ f.write(wdata)
+def read_file(path):
+ data = None
+ with open(path) as f:
+ data =
+ return data
+def remove_from_file(path, data):
+ # In case data is None, return immediately
+ if data is None:
+ return
+ try:
+ rdata = read_file(path)
+ except IOError as e:
+ # if file does not exit, just quit, otherwise raise an exception
+ if e.errno == errno.ENOENT:
+ return
+ else:
+ raise
+ contents = rdata.strip().splitlines()
+ for r in data.strip().splitlines():
+ try:
+ contents.remove(r)
+ except ValueError:
+ pass
+ write_file(path, "\n".join(contents))
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..757e3f0c
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,80 @@
+# Copyright (C) 2016 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+"""Git repository interactions"""
+import os
+from oeqa.utils.commands import runCmd
+class GitError(Exception):
+ """Git error handling"""
+ pass
+class GitRepo(object):
+ """Class representing a Git repository clone"""
+ def __init__(self, path, is_topdir=False):
+ git_dir = self._run_git_cmd_at(['rev-parse', '--git-dir'], path)
+ git_dir = git_dir if os.path.isabs(git_dir) else os.path.join(path, git_dir)
+ self.git_dir = os.path.realpath(git_dir)
+ if self._run_git_cmd_at(['rev-parse', '--is-bare-repository'], path) == 'true':
+ self.bare = True
+ self.top_dir = self.git_dir
+ else:
+ self.bare = False
+ self.top_dir = self._run_git_cmd_at(['rev-parse', '--show-toplevel'],
+ path)
+ realpath = os.path.realpath(path)
+ if is_topdir and realpath != self.top_dir:
+ raise GitError("{} is not a Git top directory".format(realpath))
+ @staticmethod
+ def _run_git_cmd_at(git_args, cwd, **kwargs):
+ """Run git command at a specified directory"""
+ git_cmd = 'git ' if isinstance(git_args, str) else ['git']
+ git_cmd += git_args
+ ret = runCmd(git_cmd, ignore_status=True, cwd=cwd, **kwargs)
+ if ret.status:
+ cmd_str = git_cmd if isinstance(git_cmd, str) \
+ else ' '.join(git_cmd)
+ raise GitError("'{}' failed with exit code {}: {}".format(
+ cmd_str, ret.status, ret.output))
+ return ret.output.strip()
+ @staticmethod
+ def init(path, bare=False):
+ """Initialize a new Git repository"""
+ cmd = ['init']
+ if bare:
+ cmd.append('--bare')
+ GitRepo._run_git_cmd_at(cmd, cwd=path)
+ return GitRepo(path, is_topdir=True)
+ def run_cmd(self, git_args, env_update=None):
+ """Run Git command"""
+ env = None
+ if env_update:
+ env = os.environ.copy()
+ env.update(env_update)
+ return self._run_git_cmd_at(git_args, self.top_dir, env=env)
+ def rev_parse(self, revision):
+ """Do git rev-parse"""
+ try:
+ return self.run_cmd(['rev-parse', '--verify', revision])
+ except GitError:
+ # Revision does not exist
+ return None
+ def get_current_branch(self):
+ """Get current branch"""
+ try:
+ # Strip 11 chars, i.e. 'refs/heads' from the beginning
+ return self.run_cmd(['symbolic-ref', 'HEAD'])[11:]
+ except GitError:
+ return None
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..9520b2e1
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,244 @@
+# Helper functions for committing data to git and pushing upstream
+# Copyright (c) 2017, Intel Corporation.
+# Copyright (c) 2019, Linux Foundation
+# This program is free software; you can redistribute it and/or modify it
+# under the terms and conditions of the GNU General Public License,
+# version 2, as published by the Free Software Foundation.
+# This program is distributed in the hope it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+import os
+import re
+import sys
+from operator import attrgetter
+from collections import namedtuple
+from oeqa.utils.git import GitRepo, GitError
+class ArchiveError(Exception):
+ """Internal error handling of this script"""
+def format_str(string, fields):
+ """Format string using the given fields (dict)"""
+ try:
+ return string.format(**fields)
+ except KeyError as err:
+ raise ArchiveError("Unable to expand string '{}': unknown field {} "
+ "(valid fields are: {})".format(
+ string, err, ', '.join(sorted(fields.keys()))))
+def init_git_repo(path, no_create, bare, log):
+ """Initialize local Git repository"""
+ path = os.path.abspath(path)
+ if os.path.isfile(path):
+ raise ArchiveError("Invalid Git repo at {}: path exists but is not a "
+ "directory".format(path))
+ if not os.path.isdir(path) or not os.listdir(path):
+ if no_create:
+ raise ArchiveError("No git repo at {}, refusing to create "
+ "one".format(path))
+ if not os.path.isdir(path):
+ try:
+ os.mkdir(path)
+ except (FileNotFoundError, PermissionError) as err:
+ raise ArchiveError("Failed to mkdir {}: {}".format(path, err))
+ if not os.listdir(path):
+"Initializing a new Git repo at %s", path)
+ repo = GitRepo.init(path, bare)
+ try:
+ repo = GitRepo(path, is_topdir=True)
+ except GitError:
+ raise ArchiveError("Non-empty directory that is not a Git repository "
+ "at {}\nPlease specify an existing Git repository, "
+ "an empty directory or a non-existing directory "
+ "path.".format(path))
+ return repo
+def git_commit_data(repo, data_dir, branch, message, exclude, notes, log):
+ """Commit data into a Git repository"""
+"Committing data into to branch %s", branch)
+ tmp_index = os.path.join(repo.git_dir, 'index.oe-git-archive')
+ try:
+ # Create new tree object from the data
+ env_update = {'GIT_INDEX_FILE': tmp_index,
+ 'GIT_WORK_TREE': os.path.abspath(data_dir)}
+ repo.run_cmd('add .', env_update)
+ # Remove files that are excluded
+ if exclude:
+ repo.run_cmd(['rm', '--cached'] + [f for f in exclude], env_update)
+ tree = repo.run_cmd('write-tree', env_update)
+ # Create new commit object from the tree
+ parent = repo.rev_parse(branch)
+ if not parent:
+ parent = repo.rev_parse("origin/" + branch)
+ git_cmd = ['commit-tree', tree, '-m', message]
+ if parent:
+ git_cmd += ['-p', parent]
+ commit = repo.run_cmd(git_cmd, env_update)
+ # Create git notes
+ for ref, filename in notes:
+ ref = ref.format(branch_name=branch)
+ repo.run_cmd(['notes', '--ref', ref, 'add',
+ '-F', os.path.abspath(filename), commit])
+ # Update branch head
+ git_cmd = ['update-ref', 'refs/heads/' + branch, commit]
+ repo.run_cmd(git_cmd)
+ # Update current HEAD, if we're on branch 'branch'
+ if not repo.bare and repo.get_current_branch() == branch:
+"Updating %s HEAD to latest commit", repo.top_dir)
+ repo.run_cmd('reset --hard')
+ return commit
+ finally:
+ if os.path.exists(tmp_index):
+ os.unlink(tmp_index)
+def expand_tag_strings(repo, name_pattern, msg_subj_pattern, msg_body_pattern,
+ keywords):
+ """Generate tag name and message, with support for running id number"""
+ keyws = keywords.copy()
+ # Tag number is handled specially: if not defined, we autoincrement it
+ if 'tag_number' not in keyws:
+ # Fill in all other fields than 'tag_number'
+ keyws['tag_number'] = '{tag_number}'
+ tag_re = format_str(name_pattern, keyws)
+ # Replace parentheses for proper regex matching
+ tag_re = tag_re.replace('(', '\(').replace(')', '\)') + '$'
+ # Inject regex group pattern for 'tag_number'
+ tag_re = tag_re.format(tag_number='(?P<tag_number>[0-9]{1,5})')
+ keyws['tag_number'] = 0
+ for existing_tag in repo.run_cmd('tag').splitlines():
+ match = re.match(tag_re, existing_tag)
+ if match and int('tag_number')) >= keyws['tag_number']:
+ keyws['tag_number'] = int('tag_number')) + 1
+ tag_name = format_str(name_pattern, keyws)
+ msg_subj= format_str(msg_subj_pattern.strip(), keyws)
+ msg_body = format_str(msg_body_pattern, keyws)
+ return tag_name, msg_subj + '\n\n' + msg_body
+def gitarchive(data_dir, git_dir, no_create, bare, commit_msg_subject, commit_msg_body, branch_name, no_tag, tagname, tag_msg_subject, tag_msg_body, exclude, notes, push, keywords, log):
+ if not os.path.isdir(data_dir):
+ raise ArchiveError("Not a directory: {}".format(data_dir))
+ data_repo = init_git_repo(git_dir, no_create, bare, log)
+ # Expand strings early in order to avoid getting into inconsistent
+ # state (e.g. no tag even if data was committed)
+ commit_msg = format_str(commit_msg_subject.strip(), keywords)
+ commit_msg += '\n\n' + format_str(commit_msg_body, keywords)
+ branch_name = format_str(branch_name, keywords)
+ tag_name = None
+ if not no_tag and tagname:
+ tag_name, tag_msg = expand_tag_strings(data_repo, tagname,
+ tag_msg_subject,
+ tag_msg_body, keywords)
+ # Commit data
+ commit = git_commit_data(data_repo, data_dir, branch_name,
+ commit_msg, exclude, notes, log)
+ # Create tag
+ if tag_name:
+"Creating tag %s", tag_name)
+ data_repo.run_cmd(['tag', '-a', '-m', tag_msg, tag_name, commit])
+ # Push data to remote
+ if push:
+ cmd = ['push', '--tags']
+ # If no remote is given we push with the default settings from
+ # gitconfig
+ if push is not True:
+ notes_refs = ['refs/notes/' + ref.format(branch_name=branch_name)
+ for ref, _ in notes]
+ cmd.extend([push, branch_name] + notes_refs)
+"Pushing data to remote")
+ data_repo.run_cmd(cmd)
+# Container class for tester revisions
+TestedRev = namedtuple('TestedRev', 'commit commit_number tags')
+def get_test_runs(log, repo, tag_name, **kwargs):
+ """Get a sorted list of test runs, matching given pattern"""
+ # First, get field names from the tag name pattern
+ field_names = [ for m in re.finditer(r'{(\w+)}', tag_name)]
+ undef_fields = [f for f in field_names if f not in kwargs.keys()]
+ # Fields for formatting tag name pattern
+ str_fields = dict([(f, '*') for f in field_names])
+ str_fields.update(kwargs)
+ # Get a list of all matching tags
+ tag_pattern = tag_name.format(**str_fields)
+ tags = repo.run_cmd(['tag', '-l', tag_pattern]).splitlines()
+ log.debug("Found %d tags matching pattern '%s'", len(tags), tag_pattern)
+ # Parse undefined fields from tag names
+ str_fields = dict([(f, r'(?P<{}>[\w\-.()]+)'.format(f)) for f in field_names])
+ str_fields['branch'] = r'(?P<branch>[\w\-.()/]+)'
+ str_fields['commit'] = '(?P<commit>[0-9a-f]{7,40})'
+ str_fields['commit_number'] = '(?P<commit_number>[0-9]{1,7})'
+ str_fields['tag_number'] = '(?P<tag_number>[0-9]{1,5})'
+ # escape parenthesis in fields in order to not messa up the regexp
+ fixed_fields = dict([(k, v.replace('(', r'\(').replace(')', r'\)')) for k, v in kwargs.items()])
+ str_fields.update(fixed_fields)
+ tag_re = re.compile(tag_name.format(**str_fields))
+ # Parse fields from tags
+ revs = []
+ for tag in tags:
+ m = tag_re.match(tag)
+ groups = m.groupdict()
+ revs.append([groups[f] for f in undef_fields] + [tag])
+ # Return field names and a sorted list of revs
+ return undef_fields, sorted(revs)
+def get_test_revs(log, repo, tag_name, **kwargs):
+ """Get list of all tested revisions"""
+ fields, runs = get_test_runs(log, repo, tag_name, **kwargs)
+ revs = {}
+ commit_i = fields.index('commit')
+ commit_num_i = fields.index('commit_number')
+ for run in runs:
+ commit = run[commit_i]
+ commit_num = run[commit_num_i]
+ tag = run[-1]
+ if not commit in revs:
+ revs[commit] = TestedRev(commit, commit_num, [tag])
+ else:
+ assert commit_num == revs[commit].commit_number, "Commit numbers do not match"
+ revs[commit].tags.append(tag)
+ # Return in sorted table
+ revs = sorted(revs.values(), key=attrgetter('commit_number'))
+ log.debug("Found %d tested revisions:\n %s", len(revs),
+ "\n ".join(['{} ({})'.format(rev.commit_number, rev.commit) for rev in revs]))
+ return revs
+def rev_find(revs, attr, val):
+ """Search from a list of TestedRev"""
+ for i, rev in enumerate(revs):
+ if getattr(rev, attr) == val:
+ return i
+ raise ValueError("Unable to find '{}' value '{}'".format(attr, val))
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..a48d4994
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,57 @@
+import http.server
+import multiprocessing
+import os
+import traceback
+import signal
+from socketserver import ThreadingMixIn
+class HTTPServer(ThreadingMixIn, http.server.HTTPServer):
+ def server_start(self, root_dir, logger):
+ os.chdir(root_dir)
+ self.serve_forever()
+class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
+ def log_message(self, format_str, *args):
+ pass
+class HTTPService(object):
+ def __init__(self, root_dir, host='', logger=None):
+ self.root_dir = root_dir
+ = host
+ self.port = 0
+ self.logger = logger
+ def start(self):
+ if not os.path.exists(self.root_dir):
+"Not starting HTTPService for directory %s which doesn't exist" % (self.root_dir))
+ return
+ self.server = HTTPServer((, self.port), HTTPRequestHandler)
+ if self.port == 0:
+ self.port = self.server.server_port
+ self.process = multiprocessing.Process(target=self.server.server_start, args=[self.root_dir, self.logger])
+ # The signal handler from testimage.bbclass can cause deadlocks here
+ # if the HTTPServer is terminated before it can restore the standard
+ #signal behaviour
+ orig = signal.getsignal(signal.SIGTERM)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ self.process.start()
+ signal.signal(signal.SIGTERM, orig)
+ if self.logger:
+"Started HTTPService on %s:%s" % (, self.port))
+ def stop(self):
+ if hasattr(self, "server"):
+ self.server.server_close()
+ if hasattr(self, "process"):
+ self.process.terminate()
+ self.process.join()
+ if self.logger:
+"Stopped HTTPService on %s:%s" % (, self.port))
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..32fde14a
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+import sys
+import os
+import re
+# A parser that can be used to identify weather a line is a test result or a section statement.
+class PtestParser(object):
+ def __init__(self):
+ self.results = {}
+ self.sections = {}
+ def parse(self, logfile):
+ test_regex = {}
+ test_regex['PASSED'] = re.compile(r"^PASS:(.+)")
+ test_regex['FAILED'] = re.compile(r"^FAIL:(.+)")
+ test_regex['SKIPPED'] = re.compile(r"^SKIP:(.+)")
+ section_regex = {}
+ section_regex['begin'] = re.compile(r"^BEGIN: .*/(.+)/ptest")
+ section_regex['end'] = re.compile(r"^END: .*/(.+)/ptest")
+ section_regex['duration'] = re.compile(r"^DURATION: (.+)")
+ section_regex['exitcode'] = re.compile(r"^ERROR: Exit status is (.+)")
+ section_regex['timeout'] = re.compile(r"^TIMEOUT: .*/(.+)/ptest")
+ def newsection():
+ return { 'name': "No-section", 'log': "" }
+ current_section = newsection()
+ with open(logfile, errors='replace') as f:
+ for line in f:
+ result = section_regex['begin'].search(line)
+ if result:
+ current_section['name'] =
+ continue
+ result = section_regex['end'].search(line)
+ if result:
+ if current_section['name'] !=
+ bb.warn("Ptest END log section mismatch %s vs. %s" % (current_section['name'],
+ if current_section['name'] in self.sections:
+ bb.warn("Ptest duplicate section for %s" % (current_section['name']))
+ self.sections[current_section['name']] = current_section
+ del self.sections[current_section['name']]['name']
+ current_section = newsection()
+ continue
+ result = section_regex['timeout'].search(line)
+ if result:
+ if current_section['name'] !=
+ bb.warn("Ptest TIMEOUT log section mismatch %s vs. %s" % (current_section['name'],
+ current_section['timeout'] = True
+ continue
+ for t in ['duration', 'exitcode']:
+ result = section_regex[t].search(line)
+ if result:
+ current_section[t] =
+ continue
+ current_section['log'] = current_section['log'] + line
+ for t in test_regex:
+ result = test_regex[t].search(line)
+ if result:
+ if current_section['name'] not in self.results:
+ self.results[current_section['name']] = {}
+ self.results[current_section['name']][] = t
+ return self.results, self.sections
+ # Log the results as files. The file name is the section name and the contents are the tests in that section.
+ def results_as_files(self, target_dir):
+ if not os.path.exists(target_dir):
+ raise Exception("Target directory does not exist: %s" % target_dir)
+ for section in self.results:
+ prefix = 'No-section'
+ if section:
+ prefix = section
+ section_file = os.path.join(target_dir, prefix)
+ # purge the file contents if it exists
+ with open(section_file, 'w') as f:
+ for test_name in sorted(self.results[section]):
+ status = self.results[section][test_name]
+ f.write(status + ": " + test_name + "\n")
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..b7def772
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,121 @@
+# Copyright (C) 2016 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# Functions to get metadata from the testing host used
+# for analytics of test results.
+from collections import OrderedDict
+from import MutableMapping
+from xml.dom.minidom import parseString
+from xml.etree.ElementTree import Element, tostring
+from oe.lsb import get_os_release
+from oeqa.utils.commands import runCmd, get_bb_vars
+def metadata_from_bb():
+ """ Returns test's metadata as OrderedDict.
+ Data will be gathered using bitbake -e thanks to get_bb_vars.
+ """
+ metadata_config_vars = ('MACHINE', 'BB_NUMBER_THREADS', 'PARALLEL_MAKE')
+ info_dict = OrderedDict()
+ hostname = runCmd('hostname')
+ info_dict['hostname'] = hostname.output
+ data_dict = get_bb_vars()
+ # Distro information
+ info_dict['distro'] = {'id': data_dict['DISTRO'],
+ 'version_id': data_dict['DISTRO_VERSION'],
+ 'pretty_name': '%s %s' % (data_dict['DISTRO'], data_dict['DISTRO_VERSION'])}
+ # Host distro information
+ os_release = get_os_release()
+ if os_release:
+ info_dict['host_distro'] = OrderedDict()
+ for key in ('ID', 'VERSION_ID', 'PRETTY_NAME'):
+ if key in os_release:
+ info_dict['host_distro'][key.lower()] = os_release[key]
+ info_dict['layers'] = get_layers(data_dict['BBLAYERS'])
+ info_dict['bitbake'] = git_rev_info(os.path.dirname(bb.__file__))
+ info_dict['config'] = OrderedDict()
+ for var in sorted(metadata_config_vars):
+ info_dict['config'][var] = data_dict[var]
+ return info_dict
+def metadata_from_data_store(d):
+ """ Returns test's metadata as OrderedDict.
+ Data will be collected from the provided data store.
+ """
+ # TODO: Getting metadata from the data store would
+ # be useful when running within bitbake.
+ pass
+def git_rev_info(path):
+ """Get git revision information as a dict"""
+ info = OrderedDict()
+ try:
+ from git import Repo, InvalidGitRepositoryError, NoSuchPathError
+ except ImportError:
+ import subprocess
+ try:
+ info['branch'] = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=path).decode('utf-8').strip()
+ except subprocess.CalledProcessError:
+ pass
+ try:
+ info['commit'] = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=path).decode('utf-8').strip()
+ except subprocess.CalledProcessError:
+ pass
+ return info
+ try:
+ repo = Repo(path, search_parent_directories=True)
+ except (InvalidGitRepositoryError, NoSuchPathError):
+ return info
+ info['commit'] = repo.head.commit.hexsha
+ info['commit_count'] = repo.head.commit.count()
+ try:
+ info['branch'] =
+ except TypeError:
+ info['branch'] = '(nobranch)'
+ return info
+def get_layers(layers):
+ """Returns layer information in dict format"""
+ layer_dict = OrderedDict()
+ for layer in layers.split():
+ layer_name = os.path.basename(layer)
+ layer_dict[layer_name] = git_rev_info(layer)
+ return layer_dict
+def write_metadata_file(file_path, metadata):
+ """ Writes metadata to a XML file in directory. """
+ xml = dict_to_XML('metadata', metadata)
+ xml_doc = parseString(tostring(xml).decode('UTF-8'))
+ with open(file_path, 'w') as f:
+ f.write(xml_doc.toprettyxml())
+def dict_to_XML(tag, dictionary, **kwargs):
+ """ Return XML element converting dicts recursively. """
+ elem = Element(tag, **kwargs)
+ for key, val in dictionary.items():
+ if tag == 'layers':
+ child = (dict_to_XML('layer', val, name=key))
+ elif isinstance(val, MutableMapping):
+ child = (dict_to_XML(key, val))
+ else:
+ if tag == 'config':
+ child = Element('variable', name=key)
+ else:
+ child = Element(key)
+ child.text = str(val)
+ elem.append(child)
+ return elem
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..2768f6c5
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,8 @@
+import socket
+def get_free_port():
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('', 0))
+ addr = s.getsockname()
+ s.close()
+ return addr[1]
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..1495f873
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,213 @@
+import os
+import json
+import shutil
+from oeqa.core.utils.test import getCaseFile, getCaseMethod
+def get_package_manager(d, root_path):
+ """
+ Returns an OE package manager that can install packages in root_path.
+ """
+ from oe.package_manager import RpmPM, OpkgPM, DpkgPM
+ pkg_class = d.getVar("IMAGE_PKGTYPE")
+ if pkg_class == "rpm":
+ pm = RpmPM(d,
+ root_path,
+ d.getVar('TARGET_VENDOR'),
+ filterbydependencies=False)
+ pm.create_configs()
+ elif pkg_class == "ipk":
+ pm = OpkgPM(d,
+ root_path,
+ d.getVar("IPKGCONF_TARGET"),
+ filterbydependencies=False)
+ elif pkg_class == "deb":
+ pm = DpkgPM(d,
+ root_path,
+ d.getVar('PACKAGE_ARCHS'),
+ d.getVar('DPKG_ARCH'),
+ filterbydependencies=False)
+ pm.write_index()
+ pm.update()
+ return pm
+def find_packages_to_extract(test_suite):
+ """
+ Returns packages to extract required by runtime tests.
+ """
+ from oeqa.core.utils.test import getSuiteCasesFiles
+ needed_packages = {}
+ files = getSuiteCasesFiles(test_suite)
+ for f in set(files):
+ json_file = _get_json_file(f)
+ if json_file:
+ needed_packages.update(_get_needed_packages(json_file))
+ return needed_packages
+def _get_json_file(module_path):
+ """
+ Returns the path of the JSON file for a module, empty if doesn't exitst.
+ """
+ json_file = '%s.json' % module_path.rsplit('.', 1)[0]
+ if os.path.isfile(module_path) and os.path.isfile(json_file):
+ return json_file
+ else:
+ return ''
+def _get_needed_packages(json_file, test=None):
+ """
+ Returns a dict with needed packages based on a JSON file.
+ If a test is specified it will return the dict just for that test.
+ """
+ needed_packages = {}
+ with open(json_file) as f:
+ test_packages = json.load(f)
+ for key,value in test_packages.items():
+ needed_packages[key] = value
+ if test:
+ if test in needed_packages:
+ needed_packages = needed_packages[test]
+ else:
+ needed_packages = {}
+ return needed_packages
+def extract_packages(d, needed_packages):
+ """
+ Extract packages that will be needed during runtime.
+ """
+ import bb
+ import oe.path
+ extracted_path = d.getVar('TEST_EXTRACTED_DIR')
+ for key,value in needed_packages.items():
+ packages = ()
+ if isinstance(value, dict):
+ packages = (value, )
+ elif isinstance(value, list):
+ packages = value
+ else:
+ bb.fatal('Failed to process needed packages for %s; '
+ 'Value must be a dict or list' % key)
+ for package in packages:
+ pkg = package['pkg']
+ rm = package.get('rm', False)
+ extract = package.get('extract', True)
+ if extract:
+ #logger.debug(1, 'Extracting %s' % pkg)
+ dst_dir = os.path.join(extracted_path, pkg)
+ # Same package used for more than one test,
+ # don't need to extract again.
+ if os.path.exists(dst_dir):
+ continue
+ # Extract package and copy it to TEST_EXTRACTED_DIR
+ pkg_dir = _extract_in_tmpdir(d, pkg)
+ oe.path.copytree(pkg_dir, dst_dir)
+ shutil.rmtree(pkg_dir)
+ else:
+ #logger.debug(1, 'Copying %s' % pkg)
+ _copy_package(d, pkg)
+def _extract_in_tmpdir(d, pkg):
+ """"
+ Returns path to a temp directory where the package was
+ extracted without dependencies.
+ """
+ from oeqa.utils.package_manager import get_package_manager
+ pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
+ pm = get_package_manager(d, pkg_path)
+ extract_dir = pm.extract(pkg)
+ shutil.rmtree(pkg_path)
+ return extract_dir
+def _copy_package(d, pkg):
+ """
+ Copy the RPM, DEB or IPK package to dst_dir
+ """
+ from oeqa.utils.package_manager import get_package_manager
+ pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
+ dst_dir = d.getVar('TEST_PACKAGED_DIR')
+ pm = get_package_manager(d, pkg_path)
+ pkg_info = pm.package_info(pkg)
+ file_path = pkg_info[pkg]['filepath']
+ shutil.copy2(file_path, dst_dir)
+ shutil.rmtree(pkg_path)
+def install_package(test_case):
+ """
+ Installs package in DUT if required.
+ """
+ needed_packages = test_needs_package(test_case)
+ if needed_packages:
+ _install_uninstall_packages(needed_packages, test_case, True)
+def uninstall_package(test_case):
+ """
+ Uninstalls package in DUT if required.
+ """
+ needed_packages = test_needs_package(test_case)
+ if needed_packages:
+ _install_uninstall_packages(needed_packages, test_case, False)
+def test_needs_package(test_case):
+ """
+ Checks if a test case requires to install/uninstall packages.
+ """
+ test_file = getCaseFile(test_case)
+ json_file = _get_json_file(test_file)
+ if json_file:
+ test_method = getCaseMethod(test_case)
+ needed_packages = _get_needed_packages(json_file, test_method)
+ if needed_packages:
+ return needed_packages
+ return None
+def _install_uninstall_packages(needed_packages, test_case, install=True):
+ """
+ Install/Uninstall packages in the DUT without using a package manager
+ """
+ if isinstance(needed_packages, dict):
+ packages = [needed_packages]
+ elif isinstance(needed_packages, list):
+ packages = needed_packages
+ for package in packages:
+ pkg = package['pkg']
+ rm = package.get('rm', False)
+ extract = package.get('extract', True)
+ src_dir = os.path.join(, pkg)
+ # Install package
+ if install and extract:
+, '/')
+ # Uninstall package
+ elif not install and rm:
+, '/')
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..49564f9a
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,601 @@
+# Copyright (C) 2013 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# This module provides a class for starting qemu images using runqemu.
+# It's used by testimage.bbclass.
+import subprocess
+import os
+import sys
+import time
+import signal
+import re
+import socket
+import select
+import errno
+import string
+import threading
+import codecs
+import logging
+from oeqa.utils.dump import HostDumper
+# Get Unicode non printable control chars
+control_range = list(range(0,32))+list(range(127,160))
+control_chars = [chr(x) for x in control_range
+ if chr(x) not in string.printable]
+re_control_char = re.compile('[%s]' % re.escape("".join(control_chars)))
+class QemuRunner:
+ def __init__(self, machine, rootfs, display, tmpdir, deploy_dir_image, logfile, boottime, dump_dir, dump_host_cmds, use_kvm, logger):
+ # Popen object for runqemu
+ self.runqemu = None
+ # pid of the qemu process that runqemu will start
+ self.qemupid = None
+ # target ip - from the command line or runqemu output
+ self.ip = None
+ # host ip - where qemu is running
+ self.server_ip = None
+ # target ip netmask
+ self.netmask = None
+ self.machine = machine
+ self.rootfs = rootfs
+ self.display = display
+ self.tmpdir = tmpdir
+ self.deploy_dir_image = deploy_dir_image
+ self.logfile = logfile
+ self.boottime = boottime
+ self.logged = False
+ self.thread = None
+ self.use_kvm = use_kvm
+ self.msg = ''
+ self.runqemutime = 120
+ self.qemu_pidfile = 'pidfile_'+str(os.getpid())
+ self.host_dumper = HostDumper(dump_host_cmds, dump_dir)
+ self.logger = logger
+ def create_socket(self):
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ sock.bind(("",0))
+ sock.listen(2)
+ port = sock.getsockname()[1]
+ self.logger.debug("Created listening socket for qemu serial console on:" % port)
+ return (sock, port)
+ except socket.error:
+ sock.close()
+ raise
+ def log(self, msg):
+ if self.logfile:
+ # It is needed to sanitize the data received from qemu
+ # because is possible to have control characters
+ msg = msg.decode("utf-8", errors='ignore')
+ msg = re_control_char.sub('', msg)
+ self.msg += msg
+ with, "a", encoding="utf-8") as f:
+ f.write("%s" % msg)
+ def getOutput(self, o):
+ import fcntl
+ fl = fcntl.fcntl(o, fcntl.F_GETFL)
+ fcntl.fcntl(o, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+ return, 1000000).decode("utf-8")
+ def handleSIGCHLD(self, signum, frame):
+ if self.runqemu and self.runqemu.poll():
+ if self.runqemu.returncode:
+ self.logger.debug('runqemu exited with code %d' % self.runqemu.returncode)
+ self.logger.debug("Output from runqemu:\n%s" % self.getOutput(self.runqemu.stdout))
+ self.stop()
+ self._dump_host()
+ raise SystemExit
+ def start(self, qemuparams = None, get_ip = True, extra_bootparams = None, runqemuparams='', launch_cmd=None, discard_writes=True):
+ env = os.environ.copy()
+ if self.display:
+ env["DISPLAY"] = self.display
+ # Set this flag so that Qemu doesn't do any grabs as SDL grabs
+ # interact badly with screensavers.
+ env["QEMU_DONT_GRAB"] = "1"
+ if not os.path.exists(self.rootfs):
+ self.logger.error("Invalid rootfs %s" % self.rootfs)
+ return False
+ if not os.path.exists(self.tmpdir):
+ self.logger.error("Invalid TMPDIR path %s" % self.tmpdir)
+ return False
+ else:
+ env["OE_TMPDIR"] = self.tmpdir
+ if not os.path.exists(self.deploy_dir_image):
+ self.logger.error("Invalid DEPLOY_DIR_IMAGE path %s" % self.deploy_dir_image)
+ return False
+ else:
+ env["DEPLOY_DIR_IMAGE"] = self.deploy_dir_image
+ if not launch_cmd:
+ launch_cmd = 'runqemu %s %s ' % ('snapshot' if discard_writes else '', runqemuparams)
+ if self.use_kvm:
+ self.logger.debug('Using kvm for runqemu')
+ launch_cmd += ' kvm'
+ else:
+ self.logger.debug('Not using kvm for runqemu')
+ if not self.display:
+ launch_cmd += ' nographic'
+ launch_cmd += ' %s %s' % (self.machine, self.rootfs)
+ return self.launch(launch_cmd, qemuparams=qemuparams, get_ip=get_ip, extra_bootparams=extra_bootparams, env=env)
+ def launch(self, launch_cmd, get_ip = True, qemuparams = None, extra_bootparams = None, env = None):
+ try:
+ self.threadsock, threadport = self.create_socket()
+ self.server_socket, self.serverport = self.create_socket()
+ except socket.error as msg:
+ self.logger.error("Failed to create listening socket: %s" % msg[1])
+ return False
+ bootparams = 'console=tty1 console=ttyS0,115200n8 printk.time=1'
+ if extra_bootparams:
+ bootparams = bootparams + ' ' + extra_bootparams
+ # Ask QEMU to store the QEMU process PID in file, this way we don't have to parse running processes
+ # and analyze descendents in order to determine it.
+ if os.path.exists(self.qemu_pidfile):
+ os.remove(self.qemu_pidfile)
+ self.qemuparams = 'bootparams="{0}" qemuparams="-serial tcp:{1} -pidfile {2}"'.format(bootparams, threadport, self.qemu_pidfile)
+ if qemuparams:
+ self.qemuparams = self.qemuparams[:-1] + " " + qemuparams + " " + '\"'
+ launch_cmd += ' tcpserial=%s %s' % (self.serverport, self.qemuparams)
+ self.origchldhandler = signal.getsignal(signal.SIGCHLD)
+ signal.signal(signal.SIGCHLD, self.handleSIGCHLD)
+ self.logger.debug('launchcmd=%s'%(launch_cmd))
+ # FIXME: We pass in stdin=subprocess.PIPE here to work around stty
+ # blocking at the end of the runqemu script when using this within
+ # oe-selftest (this makes stty error out immediately). There ought
+ # to be a proper fix but this will suffice for now.
+ self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env)
+ output = self.runqemu.stdout
+ #
+ # We need the preexec_fn above so that all runqemu processes can easily be killed
+ # (by killing their process group). This presents a problem if this controlling
+ # process itself is killed however since those processes don't notice the death
+ # of the parent and merrily continue on.
+ #
+ # Rather than hack runqemu to deal with this, we add something here instead.
+ # Basically we fork off another process which holds an open pipe to the parent
+ # and also is setpgrp. If/when the pipe sees EOF from the parent dieing, it kills
+ # the process group. This is like pctrl's PDEATHSIG but for a process group
+ # rather than a single process.
+ #
+ r, w = os.pipe()
+ self.monitorpid = os.fork()
+ if self.monitorpid:
+ os.close(r)
+ self.monitorpipe = os.fdopen(w, "w")
+ else:
+ # child process
+ os.setpgrp()
+ os.close(w)
+ r = os.fdopen(r)
+ x =
+ os.killpg(os.getpgid(, signal.SIGTERM)
+ sys.exit(0)
+ self.logger.debug("runqemu started, pid is %s" %
+ self.logger.debug("waiting at most %s seconds for qemu pid (%s)" %
+ (self.runqemutime, time.strftime("%D %H:%M:%S")))
+ endtime = time.time() + self.runqemutime
+ while not self.is_alive() and time.time() < endtime:
+ if self.runqemu.poll():
+ if self.runqemu.returncode:
+ # No point waiting any longer
+ self.logger.debug('runqemu exited with code %d' % self.runqemu.returncode)
+ self._dump_host()
+ self.logger.debug("Output from runqemu:\n%s" % self.getOutput(output))
+ self.stop()
+ return False
+ time.sleep(0.5)
+ if not self.is_alive():
+ self.logger.error("Qemu pid didn't appear in %s seconds (%s)" %
+ (self.runqemutime, time.strftime("%D %H:%M:%S")))
+ # Dump all processes to help us to figure out what is going on...
+ ps = subprocess.Popen(['ps', 'axww', '-o', 'pid,ppid,command '], stdout=subprocess.PIPE).communicate()[0]
+ processes = ps.decode("utf-8")
+ self.logger.debug("Running processes:\n%s" % processes)
+ self._dump_host()
+ op = self.getOutput(output)
+ self.stop()
+ if op:
+ self.logger.error("Output from runqemu:\n%s" % op)
+ else:
+ self.logger.error("No output from runqemu.\n")
+ return False
+ # We are alive: qemu is running
+ out = self.getOutput(output)
+ netconf = False # network configuration is not required by default
+ self.logger.debug("qemu started in %s seconds - qemu procces pid is %s (%s)" %
+ (time.time() - (endtime - self.runqemutime),
+ self.qemupid, time.strftime("%D %H:%M:%S")))
+ if get_ip:
+ cmdline = ''
+ with open('/proc/%s/cmdline' % self.qemupid) as p:
+ cmdline =
+ # It is needed to sanitize the data received
+ # because is possible to have control characters
+ cmdline = re_control_char.sub(' ', cmdline)
+ try:
+ ips = re.findall(r"((?:[0-9]{1,3}\.){3}[0-9]{1,3})", cmdline.split("ip=")[1])
+ self.ip = ips[0]
+ self.server_ip = ips[1]
+ self.logger.debug("qemu cmdline used:\n{}".format(cmdline))
+ except (IndexError, ValueError):
+ # Try to get network configuration from runqemu output
+ match = re.match(r'.*Network configuration: ([0-9.]+)::([0-9.]+):([0-9.]+)$.*',
+ out, re.MULTILINE|re.DOTALL)
+ if match:
+ self.ip, self.server_ip, self.netmask = match.groups()
+ # network configuration is required as we couldn't get it
+ # from the runqemu command line, so qemu doesn't run kernel
+ # and guest networking is not configured
+ netconf = True
+ else:
+ self.logger.error("Couldn't get ip from qemu command line and runqemu output! "
+ "Here is the qemu command line used:\n%s\n"
+ "and output from runqemu:\n%s" % (cmdline, out))
+ self._dump_host()
+ self.stop()
+ return False
+ self.logger.debug("Target IP: %s" % self.ip)
+ self.logger.debug("Server IP: %s" % self.server_ip)
+ self.thread = LoggingThread(self.log, self.threadsock, self.logger)
+ self.thread.start()
+ if not self.thread.connection_established.wait(self.boottime):
+ self.logger.error("Didn't receive a console connection from qemu. "
+ "Here is the qemu command line used:\n%s\nand "
+ "output from runqemu:\n%s" % (cmdline, out))
+ self.stop_thread()
+ return False
+ self.logger.debug("Output from runqemu:\n%s", out)
+ self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
+ (self.boottime, time.strftime("%D %H:%M:%S")))
+ endtime = time.time() + self.boottime
+ socklist = [self.server_socket]
+ reachedlogin = False
+ stopread = False
+ qemusock = None
+ bootlog = b''
+ data = b''
+ while time.time() < endtime and not stopread:
+ try:
+ sread, swrite, serror =, [], [], 5)
+ except InterruptedError:
+ continue
+ for sock in sread:
+ if sock is self.server_socket:
+ qemusock, addr = self.server_socket.accept()
+ qemusock.setblocking(0)
+ socklist.append(qemusock)
+ socklist.remove(self.server_socket)
+ self.logger.debug("Connection from %s:%s" % addr)
+ else:
+ data = data + sock.recv(1024)
+ if data:
+ bootlog += data
+ data = b''
+ if b' login:' in bootlog:
+ self.server_socket = qemusock
+ stopread = True
+ reachedlogin = True
+ self.logger.debug("Reached login banner in %s seconds (%s)" %
+ (time.time() - (endtime - self.boottime),
+ time.strftime("%D %H:%M:%S")))
+ else:
+ # no need to check if reachedlogin unless we support multiple connections
+ self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
+ time.strftime("%D %H:%M:%S"))
+ socklist.remove(sock)
+ sock.close()
+ stopread = True
+ if not reachedlogin:
+ if time.time() >= endtime:
+ self.logger.debug("Target didn't reach login banner in %d seconds (%s)" %
+ (self.boottime, time.strftime("%D %H:%M:%S")))
+ tail = lambda l: "\n".join(l.splitlines()[-25:])
+ bootlog = bootlog.decode("utf-8")
+ # in case bootlog is empty, use tail qemu log store at self.msg
+ lines = tail(bootlog if bootlog else self.msg)
+ self.logger.debug("Last 25 lines of text:\n%s" % lines)
+ self.logger.debug("Check full boot log: %s" % self.logfile)
+ self._dump_host()
+ self.stop()
+ return False
+ # If we are not able to login the tests can continue
+ try:
+ (status, output) = self.run_serial("root\n", raw=True)
+ if"root@[a-zA-Z0-9\-]+:~#", output):
+ self.logged = True
+ self.logger.debug("Logged as root in serial console")
+ if netconf:
+ # configure guest networking
+ cmd = "ifconfig eth0 %s netmask %s up\n" % (self.ip, self.netmask)
+ output = self.run_serial(cmd, raw=True)[1]
+ if"root@[a-zA-Z0-9\-]+:~#", output):
+ self.logger.debug("configured ip address %s", self.ip)
+ else:
+ self.logger.debug("Couldn't configure guest networking")
+ else:
+ self.logger.debug("Couldn't login into serial console"
+ " as root using blank password")
+ self.logger.debug("The output:\n%s" % output)
+ except:
+ self.logger.debug("Serial console failed while trying to login")
+ return True
+ def stop(self):
+ if hasattr(self, "origchldhandler"):
+ signal.signal(signal.SIGCHLD, self.origchldhandler)
+ self.stop_thread()
+ self.stop_qemu_system()
+ if self.runqemu:
+ if hasattr(self, "monitorpid"):
+ os.kill(self.monitorpid, signal.SIGKILL)
+ self.logger.debug("Sending SIGTERM to runqemu")
+ try:
+ os.killpg(os.getpgid(, signal.SIGTERM)
+ except OSError as e:
+ if e.errno != errno.ESRCH:
+ raise
+ endtime = time.time() + self.runqemutime
+ while self.runqemu.poll() is None and time.time() < endtime:
+ time.sleep(1)
+ if self.runqemu.poll() is None:
+ self.logger.debug("Sending SIGKILL to runqemu")
+ os.killpg(os.getpgid(, signal.SIGKILL)
+ self.runqemu.stdin.close()
+ self.runqemu.stdout.close()
+ self.runqemu = None
+ if hasattr(self, 'server_socket') and self.server_socket:
+ self.server_socket.close()
+ self.server_socket = None
+ if hasattr(self, 'threadsock') and self.threadsock:
+ self.threadsock.close()
+ self.threadsock = None
+ self.qemupid = None
+ self.ip = None
+ if os.path.exists(self.qemu_pidfile):
+ os.remove(self.qemu_pidfile)
+ if self.monitorpipe:
+ self.monitorpipe.close()
+ def stop_qemu_system(self):
+ if self.qemupid:
+ try:
+ # qemu-system behaves well and a SIGTERM is enough
+ os.kill(self.qemupid, signal.SIGTERM)
+ except ProcessLookupError as e:
+ self.logger.warning('qemu-system ended unexpectedly')
+ def stop_thread(self):
+ if self.thread and self.thread.is_alive():
+ self.thread.stop()
+ self.thread.join()
+ def restart(self, qemuparams = None):
+ self.logger.debug("Restarting qemu process")
+ if self.runqemu.poll() is None:
+ self.stop()
+ if self.start(qemuparams):
+ return True
+ return False
+ def is_alive(self):
+ if not self.runqemu or self.runqemu.poll() is not None:
+ return False
+ if os.path.isfile(self.qemu_pidfile):
+ f = open(self.qemu_pidfile, 'r')
+ qemu_pid =
+ f.close()
+ qemupid = int(qemu_pid)
+ if os.path.exists("/proc/" + str(qemupid)):
+ self.qemupid = qemupid
+ return True
+ return False
+ def run_serial(self, command, raw=False, timeout=60):
+ # We assume target system have echo to get command status
+ if not raw:
+ command = "%s; echo $?\n" % command
+ data = ''
+ status = 0
+ self.server_socket.sendall(command.encode('utf-8'))
+ start = time.time()
+ end = start + timeout
+ while True:
+ now = time.time()
+ if now >= end:
+ data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout
+ break
+ try:
+ sread, _, _ =[self.server_socket],[],[], end - now)
+ except InterruptedError:
+ continue
+ if sread:
+ answer = self.server_socket.recv(1024)
+ if answer:
+ data += answer.decode('utf-8')
+ # Search the prompt to stop
+ if"[a-zA-Z0-9]+@[a-zA-Z0-9\-]+:~#", data):
+ break
+ else:
+ raise Exception("No data on serial console socket")
+ if data:
+ if raw:
+ status = 1
+ else:
+ # Remove first line (command line) and last line (prompt)
+ data = data[data.find('$?\r\n')+4:data.rfind('\r\n')]
+ index = data.rfind('\r\n')
+ if index == -1:
+ status_cmd = data
+ data = ""
+ else:
+ status_cmd = data[index+2:]
+ data = data[:index]
+ if (status_cmd == "0"):
+ status = 1
+ return (status, str(data))
+ def _dump_host(self):
+ self.host_dumper.create_dir("qemu")
+ self.logger.warning("Qemu ended unexpectedly, dump data from host"
+ " is in %s" % self.host_dumper.dump_dir)
+ self.host_dumper.dump_host()
+# This class is for reading data from a socket and passing it to logfunc
+# to be processed. It's completely event driven and has a straightforward
+# event loop. The mechanism for stopping the thread is a simple pipe which
+# will wake up the poll and allow for tearing everything down.
+class LoggingThread(threading.Thread):
+ def __init__(self, logfunc, sock, logger):
+ self.connection_established = threading.Event()
+ self.serversock = sock
+ self.logfunc = logfunc
+ self.logger = logger
+ self.readsock = None
+ self.running = False
+ self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
+ self.readevents = select.POLLIN | select.POLLPRI
+ threading.Thread.__init__(self, target=self.threadtarget)
+ def threadtarget(self):
+ try:
+ self.eventloop()
+ finally:
+ self.teardown()
+ def run(self):
+ self.logger.debug("Starting logging thread")
+ self.readpipe, self.writepipe = os.pipe()
+ def stop(self):
+ self.logger.debug("Stopping logging thread")
+ if self.running:
+ os.write(self.writepipe, bytes("stop", "utf-8"))
+ def teardown(self):
+ self.logger.debug("Tearing down logging thread")
+ self.close_socket(self.serversock)
+ if self.readsock is not None:
+ self.close_socket(self.readsock)
+ self.close_ignore_error(self.readpipe)
+ self.close_ignore_error(self.writepipe)
+ self.running = False
+ def eventloop(self):
+ poll = select.poll()
+ event_read_mask = self.errorevents | self.readevents
+ poll.register(self.serversock.fileno())
+ poll.register(self.readpipe, event_read_mask)
+ breakout = False
+ self.running = True
+ self.logger.debug("Starting thread event loop")
+ while not breakout:
+ events = poll.poll()
+ for event in events:
+ # An error occurred, bail out
+ if event[1] & self.errorevents:
+ raise Exception(self.stringify_event(event[1]))
+ # Event to stop the thread
+ if self.readpipe == event[0]:
+ self.logger.debug("Stop event received")
+ breakout = True
+ break
+ # A connection request was received
+ elif self.serversock.fileno() == event[0]:
+ self.logger.debug("Connection request received")
+ self.readsock, _ = self.serversock.accept()
+ self.readsock.setblocking(0)
+ poll.unregister(self.serversock.fileno())
+ poll.register(self.readsock.fileno(), event_read_mask)
+ self.logger.debug("Setting connection established event")
+ self.connection_established.set()
+ # Actual data to be logged
+ elif self.readsock.fileno() == event[0]:
+ data = self.recv(1024)
+ self.logfunc(data)
+ # Since the socket is non-blocking make sure to honor EAGAIN
+ def recv(self, count):
+ try:
+ data = self.readsock.recv(count)
+ except socket.error as e:
+ if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
+ return ''
+ else:
+ raise
+ if data is None:
+ raise Exception("No data on read ready socket")
+ elif not data:
+ # This actually means an orderly shutdown
+ # happened. But for this code it counts as an
+ # error since the connection shouldn't go away
+ # until qemu exits.
+ raise Exception("Console connection closed unexpectedly")
+ return data
+ def stringify_event(self, event):
+ val = ''
+ if select.POLLERR == event:
+ val = 'POLLER'
+ elif select.POLLHUP == event:
+ val = 'POLLHUP'
+ elif select.POLLNVAL == event:
+ val = 'POLLNVAL'
+ return val
+ def close_socket(self, sock):
+ sock.shutdown(socket.SHUT_RDWR)
+ sock.close()
+ def close_ignore_error(self, fd):
+ try:
+ os.close(fd)
+ except OSError:
+ pass
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..5aa99d06
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,176 @@
+# Copyright (C) 2015 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# This module provides a class for starting qemu images of poky tiny.
+# It's used by testimage.bbclass.
+import subprocess
+import os
+import time
+import signal
+import re
+import socket
+import select
+import bb
+from .qemurunner import QemuRunner
+class QemuTinyRunner(QemuRunner):
+ def __init__(self, machine, rootfs, display, tmpdir, deploy_dir_image, logfile, kernel, boottime, logger):
+ # Popen object for runqemu
+ self.runqemu = None
+ # pid of the qemu process that runqemu will start
+ self.qemupid = None
+ # target ip - from the command line
+ self.ip = None
+ # host ip - where qemu is running
+ self.server_ip = None
+ self.machine = machine
+ self.rootfs = rootfs
+ self.display = display
+ self.tmpdir = tmpdir
+ self.deploy_dir_image = deploy_dir_image
+ self.logfile = logfile
+ self.boottime = boottime
+ self.runqemutime = 60
+ self.socketfile = "console.sock"
+ self.server_socket = None
+ self.kernel = kernel
+ self.logger = logger
+ def create_socket(self):
+ tries = 3
+ while tries > 0:
+ try:
+ self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.server_socket.connect(self.socketfile)
+ bb.note("Created listening socket for qemu serial console.")
+ tries = 0
+ except socket.error as msg:
+ self.server_socket.close()
+ bb.fatal("Failed to create listening socket.")
+ tries -= 1
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s" % msg)
+ def start(self, qemuparams = None, ssh=True, extra_bootparams=None, runqemuparams='', discard_writes=True):
+ if self.display:
+ os.environ["DISPLAY"] = self.display
+ else:
+ bb.error("To start qemu I need a X desktop, please set DISPLAY correctly (e.g. DISPLAY=:1)")
+ return False
+ if not os.path.exists(self.rootfs):
+ bb.error("Invalid rootfs %s" % self.rootfs)
+ return False
+ if not os.path.exists(self.tmpdir):
+ bb.error("Invalid TMPDIR path %s" % self.tmpdir)
+ return False
+ else:
+ os.environ["OE_TMPDIR"] = self.tmpdir
+ if not os.path.exists(self.deploy_dir_image):
+ bb.error("Invalid DEPLOY_DIR_IMAGE path %s" % self.deploy_dir_image)
+ return False
+ else:
+ os.environ["DEPLOY_DIR_IMAGE"] = self.deploy_dir_image
+ # Set this flag so that Qemu doesn't do any grabs as SDL grabs interact
+ # badly with screensavers.
+ os.environ["QEMU_DONT_GRAB"] = "1"
+ self.qemuparams = '--append "root=/dev/ram0 console=ttyS0" -nographic -serial unix:%s,server,nowait' % self.socketfile
+ launch_cmd = 'qemu-system-i386 -kernel %s -initrd %s %s' % (self.kernel, self.rootfs, self.qemuparams)
+ self.runqemu = subprocess.Popen(launch_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,preexec_fn=os.setpgrp)
+ bb.note("runqemu started, pid is %s" %
+ bb.note("waiting at most %s seconds for qemu pid" % self.runqemutime)
+ endtime = time.time() + self.runqemutime
+ while not self.is_alive() and time.time() < endtime:
+ time.sleep(1)
+ if self.is_alive():
+ bb.note("qemu started - qemu procces pid is %s" % self.qemupid)
+ self.create_socket()
+ else:
+ bb.note("Qemu pid didn't appeared in %s seconds" % self.runqemutime)
+ output = self.runqemu.stdout
+ self.stop()
+ bb.note("Output from runqemu:\n%s" %"utf-8"))
+ return False
+ return self.is_alive()
+ def run_serial(self, command, timeout=60):
+ self.server_socket.sendall(command+'\n')
+ data = ''
+ status = 0
+ stopread = False
+ endtime = time.time()+timeout
+ while time.time()<endtime and not stopread:
+ try:
+ sread, _, _ =[self.server_socket],[],[],1)
+ except InterruptedError:
+ continue
+ for sock in sread:
+ answer = sock.recv(1024)
+ if answer:
+ data += answer
+ else:
+ sock.close()
+ stopread = True
+ if not data:
+ status = 1
+ if not stopread:
+ data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout
+ return (status, str(data))
+ def find_child(self,parent_pid):
+ #
+ # Walk the process tree from the process specified looking for a qemu-system. Return its [pid'cmd]
+ #
+ ps = subprocess.Popen(['ps', 'axww', '-o', 'pid,ppid,command'], stdout=subprocess.PIPE).communicate()[0]
+ processes = ps.decode("utf-8").split('\n')
+ nfields = len(processes[0].split()) - 1
+ pids = {}
+ commands = {}
+ for row in processes[1:]:
+ data = row.split(None, nfields)
+ if len(data) != 3:
+ continue
+ if data[1] not in pids:
+ pids[data[1]] = []
+ pids[data[1]].append(data[0])
+ commands[data[0]] = data[2]
+ if parent_pid not in pids:
+ return []
+ parents = []
+ newparents = pids[parent_pid]
+ while newparents:
+ next = []
+ for p in newparents:
+ if p in pids:
+ for n in pids[p]:
+ if n not in parents and n not in next:
+ next.append(n)
+ if p not in parents:
+ parents.append(p)
+ newparents = next
+ #print("Children matching %s:" % str(parents))
+ for p in parents:
+ # Need to be careful here since runqemu runs "ldd qemu-system-xxxx"
+ # Also, old versions of ldd (2.11) run "LD_XXXX qemu-system-xxxx"
+ basecmd = commands[p].split()[0]
+ basecmd = os.path.basename(basecmd)
+ if "qemu-system" in basecmd and "-serial unix" in commands[p]:
+ return [int(p),commands[p]]
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..d292893c
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# Provides a class for setting up ssh connections,
+# running commands and copying files to/from a target.
+# It's used by testimage.bbclass and tests in lib/oeqa/runtime.
+import subprocess
+import time
+import os
+import select
+class SSHProcess(object):
+ def __init__(self, **options):
+ self.defaultopts = {
+ "stdout": subprocess.PIPE,
+ "stderr": subprocess.STDOUT,
+ "stdin": None,
+ "shell": False,
+ "bufsize": -1,
+ "preexec_fn": os.setsid,
+ }
+ self.options = dict(self.defaultopts)
+ self.options.update(options)
+ self.status = None
+ self.output = None
+ self.process = None
+ self.starttime = None
+ self.logfile = None
+ # Unset DISPLAY which means we won't trigger SSH_ASKPASS
+ env = os.environ.copy()
+ if "DISPLAY" in env:
+ del env['DISPLAY']
+ self.options['env'] = env
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s" % msg)
+ def _run(self, command, timeout=None, logfile=None):
+ self.logfile = logfile
+ self.starttime = time.time()
+ output = ''
+ self.process = subprocess.Popen(command, **self.options)
+ if timeout:
+ endtime = self.starttime + timeout
+ eof = False
+ while time.time() < endtime and not eof:
+ try:
+ if[self.process.stdout], [], [], 5)[0] != []:
+ data =, 1024)
+ if not data:
+ self.process.stdout.close()
+ eof = True
+ else:
+ data = data.decode("utf-8")
+ output += data
+ self.log(data)
+ endtime = time.time() + timeout
+ except InterruptedError:
+ continue
+ # process hasn't returned yet
+ if not eof:
+ self.process.terminate()
+ time.sleep(5)
+ try:
+ self.process.kill()
+ except OSError:
+ pass
+ lastline = "\nProcess killed - no output for %d seconds. Total running time: %d seconds." % (timeout, time.time() - self.starttime)
+ self.log(lastline)
+ output += lastline
+ else:
+ output = self.process.communicate()[0]
+ self.log(output.rstrip())
+ self.status = self.process.wait()
+ self.output = output.rstrip()
+ def run(self, command, timeout=None, logfile=None):
+ try:
+ self._run(command, timeout, logfile)
+ except:
+ # Need to guard against a SystemExit or other exception occuring whilst running
+ # and ensure we don't leave a process behind.
+ if self.process.poll() is None:
+ self.process.kill()
+ self.status = self.process.wait()
+ raise
+ return (self.status, self.output)
+class SSHControl(object):
+ def __init__(self, ip, logfile=None, timeout=300, user='root', port=None):
+ self.ip = ip
+ self.defaulttimeout = timeout
+ self.ignore_status = True
+ self.logfile = logfile
+ self.user = user
+ self.ssh_options = [
+ '-o', 'UserKnownHostsFile=/dev/null',
+ '-o', 'StrictHostKeyChecking=no',
+ '-o', 'LogLevel=ERROR'
+ ]
+ self.ssh = ['ssh', '-l', self.user ] + self.ssh_options
+ self.scp = ['scp'] + self.ssh_options
+ if port:
+ self.ssh = self.ssh + [ '-p', port ]
+ self.scp = self.scp + [ '-P', port ]
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s\n" % msg)
+ def _internal_run(self, command, timeout=None, ignore_status = True):
+ self.log("[Running]$ %s" % " ".join(command))
+ proc = SSHProcess()
+ status, output =, timeout, logfile=self.logfile)
+ self.log("[Command returned '%d' after %.2f seconds]" % (status, time.time() - proc.starttime))
+ if status and not ignore_status:
+ raise AssertionError("Command '%s' returned non-zero exit status %d:\n%s" % (command, status, output))
+ return (status, output)
+ def run(self, command, timeout=None):
+ """
+ command - ssh command to run
+ timeout=<val> - kill command if there is no output after <val> seconds
+ timeout=None - kill command if there is no output after a default value seconds
+ timeout=0 - no timeout, let command run until it returns
+ """
+ command = self.ssh + [self.ip, 'export PATH=/usr/sbin:/sbin:/usr/bin:/bin; ' + command]
+ if timeout is None:
+ return self._internal_run(command, self.defaulttimeout, self.ignore_status)
+ if timeout == 0:
+ return self._internal_run(command, None, self.ignore_status)
+ return self._internal_run(command, timeout, self.ignore_status)
+ def copy_to(self, localpath, remotepath):
+ if os.path.islink(localpath):
+ localpath = os.path.dirname(localpath) + "/" + os.readlink(localpath)
+ command = self.scp + [localpath, '%s@%s:%s' % (self.user, self.ip, remotepath)]
+ return self._internal_run(command, ignore_status=False)
+ def copy_from(self, remotepath, localpath):
+ command = self.scp + ['%s@%s:%s' % (self.user, self.ip, remotepath), localpath]
+ return self._internal_run(command, ignore_status=False)
+ def copy_dir_to(self, localpath, remotepath):
+ """
+ Copy recursively localpath directory to remotepath in target.
+ """
+ for root, dirs, files in os.walk(localpath):
+ # Create directories in the target as needed
+ for d in dirs:
+ tmp_dir = os.path.join(root, d).replace(localpath, "")
+ new_dir = os.path.join(remotepath, tmp_dir.lstrip("/"))
+ cmd = "mkdir -p %s" % new_dir
+ # Copy files into the target
+ for f in files:
+ tmp_file = os.path.join(root, f).replace(localpath, "")
+ dst_file = os.path.join(remotepath, tmp_file.lstrip("/"))
+ src_file = os.path.join(root, f)
+ self.copy_to(src_file, dst_file)
+ def delete_files(self, remotepath, files):
+ """
+ Delete files in target's remote path.
+ """
+ cmd = "rm"
+ if not isinstance(files, list):
+ files = [files]
+ for f in files:
+ cmd = "%s %s" % (cmd, os.path.join(remotepath, f))
+ def delete_dir(self, remotepath):
+ """
+ Delete remotepath directory in target.
+ """
+ cmd = "rmdir %s" % remotepath
+ def delete_dir_structure(self, localpath, remotepath):
+ """
+ Delete recursively localpath structure directory in target's remotepath.
+ This function is very usefult to delete a package that is installed in
+ the DUT and the host running the test has such package extracted in tmp
+ directory.
+ Example:
+ pwd: /home/user/tmp
+ tree: .
+ └── work
+ ├── dir1
+ │   └── file1
+ └── dir2
+ localpath = "/home/user/tmp" and remotepath = "/home/user"
+ With the above variables this function will try to delete the
+ directory in the DUT in this order:
+ /home/user/work/dir1/file1
+ /home/user/work/dir1 (if dir is empty)
+ /home/user/work/dir2 (if dir is empty)
+ /home/user/work (if dir is empty)
+ """
+ for root, dirs, files in os.walk(localpath, topdown=False):
+ # Delete files first
+ tmpdir = os.path.join(root).replace(localpath, "")
+ remotedir = os.path.join(remotepath, tmpdir.lstrip("/"))
+ self.delete_files(remotedir, files)
+ # Remove dirs if empty
+ for d in dirs:
+ tmpdir = os.path.join(root, d).replace(localpath, "")
+ remotedir = os.path.join(remotepath, tmpdir.lstrip("/"))
+ self.delete_dir(remotepath)
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..1f7d11b5
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,19 @@
+import subprocess
+class OETestCalledProcessError(subprocess.CalledProcessError):
+ def __str__(self):
+ def strify(o):
+ if isinstance(o, bytes):
+ return o.decode("utf-8", errors="replace")
+ else:
+ return o
+ s = "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
+ if hasattr(self, "output") and self.output:
+ s = s + "\nStandard Output: " + strify(self.output)
+ if hasattr(self, "stderr") and self.stderr:
+ s = s + "\nStandard Error: " + strify(self.stderr)
+ return s
+def errors_have_output():
+ subprocess.CalledProcessError = OETestCalledProcessError
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..b8db7b2a
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,140 @@
+# Copyright (C) 2013 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# Provides a class for automating build tests for projects
+import os
+import re
+import bb.utils
+import subprocess
+import tempfile
+from abc import ABCMeta, abstractmethod
+class BuildProject(metaclass=ABCMeta):
+ def __init__(self, d, uri, foldername=None, tmpdir=None):
+ self.d = d
+ self.uri = uri
+ self.archive = os.path.basename(uri)
+ if not tmpdir:
+ tmpdir = self.d.getVar('WORKDIR')
+ if not tmpdir:
+ self.tempdirobj = tempfile.TemporaryDirectory(prefix='buildproject-')
+ tmpdir =
+ self.localarchive = os.path.join(tmpdir, self.archive)
+ if foldername:
+ self.fname = foldername
+ else:
+ self.fname = re.sub(r'\.tar\.bz2$|\.tar\.gz$|\.tar\.xz$', '', self.archive)
+ # Download self.archive to self.localarchive
+ def _download_archive(self):
+ dl_dir = self.d.getVar("DL_DIR")
+ if dl_dir and os.path.exists(os.path.join(dl_dir, self.archive)):
+ bb.utils.copyfile(os.path.join(dl_dir, self.archive), self.localarchive)
+ return
+ exportvars = ['HTTP_PROXY', 'http_proxy',
+ 'HTTPS_PROXY', 'https_proxy',
+ 'FTP_PROXY', 'ftp_proxy',
+ 'FTPS_PROXY', 'ftps_proxy',
+ 'NO_PROXY', 'no_proxy',
+ 'ALL_PROXY', 'all_proxy',
+ cmd = ''
+ for var in exportvars:
+ val = self.d.getVar(var)
+ if val:
+ cmd = 'export ' + var + '=\"%s\"; %s' % (val, cmd)
+ cmd = cmd + "wget -O %s %s" % (self.localarchive, self.uri)
+ subprocess.check_output(cmd, shell=True)
+ # This method should provide a way to run a command in the desired environment.
+ @abstractmethod
+ def _run(self, cmd):
+ pass
+ # The timeout parameter of is set to 0 to make the ssh command
+ # run with no timeout.
+ def run_configure(self, configure_args='', extra_cmds=''):
+ return self._run('cd %s; %s ./configure %s' % (self.targetdir, extra_cmds, configure_args))
+ def run_make(self, make_args=''):
+ return self._run('cd %s; make %s' % (self.targetdir, make_args))
+ def run_install(self, install_args=''):
+ return self._run('cd %s; make install %s' % (self.targetdir, install_args))
+ def clean(self):
+ self._run('rm -rf %s' % self.targetdir)
+ subprocess.check_call('rm -f %s' % self.localarchive, shell=True)
+ pass
+class TargetBuildProject(BuildProject):
+ def __init__(self, target, d, uri, foldername=None):
+ = target
+ self.targetdir = "~/"
+ BuildProject.__init__(self, d, uri, foldername)
+ def download_archive(self):
+ self._download_archive()
+ (status, output) =, self.targetdir)
+ if status != 0:
+ raise Exception("Failed to copy archive to target, output: %s" % output)
+ (status, output) ='tar xf %s%s -C %s' % (self.targetdir, self.archive, self.targetdir))
+ if status != 0:
+ raise Exception("Failed to extract archive, output: %s" % output)
+ #Change targetdir to project folder
+ self.targetdir = self.targetdir + self.fname
+ # The timeout parameter of is set to 0 to make the ssh command
+ # run with no timeout.
+ def _run(self, cmd):
+ return, 0)[0]
+class SDKBuildProject(BuildProject):
+ def __init__(self, testpath, sdkenv, d, uri, foldername=None):
+ self.sdkenv = sdkenv
+ self.testdir = testpath
+ self.targetdir = testpath
+ bb.utils.mkdirhier(testpath)
+ self.datetime = d.getVar('DATETIME')
+ self.testlogdir = d.getVar("TEST_LOG_DIR")
+ bb.utils.mkdirhier(self.testlogdir)
+ self.logfile = os.path.join(self.testlogdir, "sdk_target_log.%s" % self.datetime)
+ BuildProject.__init__(self, d, uri, foldername, tmpdir=testpath)
+ def download_archive(self):
+ self._download_archive()
+ cmd = 'tar xf %s%s -C %s' % (self.targetdir, self.archive, self.targetdir)
+ subprocess.check_output(cmd, shell=True)
+ #Change targetdir to project folder
+ self.targetdir = os.path.join(self.targetdir, self.fname)
+ def run_configure(self, configure_args='', extra_cmds=' gnu-configize; '):
+ return super(SDKBuildProject, self).run_configure(configure_args=(configure_args or '$CONFIGURE_FLAGS'), extra_cmds=extra_cmds)
+ def run_install(self, install_args=''):
+ return super(SDKBuildProject, self).run_install(install_args=(install_args or "DESTDIR=%s/../install" % self.targetdir))
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s\n" % msg)
+ def _run(self, cmd):
+ self.log("Running . %s; " % self.sdkenv + cmd)
+ return subprocess.check_call(". %s; " % self.sdkenv + cmd, shell=True)
diff --git a/external/poky/meta/lib/oeqa/utils/ b/external/poky/meta/lib/oeqa/utils/
new file mode 100644
index 00000000..be2a2110
--- /dev/null
+++ b/external/poky/meta/lib/oeqa/utils/
@@ -0,0 +1,263 @@
+# Copyright (C) 2015 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+# Provides functions to help with exporting binaries obtained from built targets
+import os, re, glob as g, shutil as sh,sys
+from time import sleep
+from .commands import runCmd
+from difflib import SequenceMatcher as SM
+ import bb
+except ImportError:
+ class my_log():
+ def __init__(self):
+ pass
+ def plain(self, msg):
+ if msg:
+ print(msg)
+ def warn(self, msg):
+ if msg:
+ print("WARNING: " + msg)
+ def fatal(self, msg):
+ if msg:
+ print("FATAL:" + msg)
+ sys.exit(1)
+ bb = my_log()
+def determine_if_poky_env():
+ """
+ used to determine if we are inside the poky env or not. Usefull for remote machine where poky is not present
+ """
+ check_env = True if ("/scripts" and "/bitbake/bin") in os.getenv("PATH") else False
+ return check_env
+def get_dest_folder(tune_features, folder_list):
+ """
+ Function to determine what rpm deploy dir to choose for a given architecture based on TUNE_FEATURES
+ """
+ features_list = tune_features.split(" ")
+ features_list.reverse()
+ features_list = "_".join(features_list)
+ match_rate = 0
+ best_match = None
+ for folder in folder_list:
+ curr_match_rate = SM(None, folder, features_list).ratio()
+ if curr_match_rate > match_rate:
+ match_rate = curr_match_rate
+ best_match = folder
+ return best_match
+def process_binaries(d, params):
+ param_list = params
+ export_env = d.getVar("TEST_EXPORT_ONLY")
+ def extract_binary(pth_to_pkg, dest_pth=None):
+ cpio_command = runCmd("which cpio")
+ rpm2cpio_command = runCmd("ls /usr/bin/rpm2cpio")
+ if (cpio_command.status != 0) and (rpm2cpio_command.status != 0):
+ bb.fatal("Either \"rpm2cpio\" or \"cpio\" tools are not available on your system."
+ "All binaries extraction processes will not be available, crashing all related tests."
+ "Please install them according to your OS recommendations") # will exit here
+ if dest_pth:
+ os.chdir(dest_pth)
+ else:
+ os.chdir("%s" % os.sep)# this is for native package
+ extract_bin_command = runCmd("%s %s | %s -idm" % (rpm2cpio_command.output, pth_to_pkg, cpio_command.output)) # semi-hardcoded because of a bug on poky's rpm2cpio
+ return extract_bin_command
+ if determine_if_poky_env(): # machine with poky environment
+ exportpath = d.getVar("TEST_EXPORT_DIR") if export_env else d.getVar("DEPLOY_DIR")
+ rpm_deploy_dir = d.getVar("DEPLOY_DIR_RPM")
+ arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(rpm_deploy_dir))
+ arch_rpm_dir = os.path.join(rpm_deploy_dir, arch)
+ extracted_bin_dir = os.path.join(exportpath,"binaries", arch, "extracted_binaries")
+ packaged_bin_dir = os.path.join(exportpath,"binaries", arch, "packaged_binaries")
+ # creating necessary directory structure in case testing is done in poky env.
+ if export_env == "0":
+ if not os.path.exists(extracted_bin_dir): bb.utils.mkdirhier(extracted_bin_dir)
+ if not os.path.exists(packaged_bin_dir): bb.utils.mkdirhier(packaged_bin_dir)
+ if param_list[3] == "native":
+ if export_env == "1": #this is a native package and we only need to copy it. no need for extraction
+ native_rpm_dir = os.path.join(rpm_deploy_dir, get_dest_folder("{} nativesdk".format(d.getVar("BUILD_SYS")), os.listdir(rpm_deploy_dir)))
+ native_rpm_file_list = [item for item in os.listdir(native_rpm_dir) if"nativesdk-" + param_list[0] + "-([0-9]+\.*)", item)]
+ if not native_rpm_file_list:
+ bb.warn("Couldn't find any version of {} native package. Related tests will most probably fail.".format(param_list[0]))
+ return ""
+ for item in native_rpm_file_list:# will copy all versions of package. Used version will be selected on remote machine
+ bb.plain("Copying native package file: %s" % item)
+ sh.copy(os.path.join(rpm_deploy_dir, native_rpm_dir, item), os.path.join(d.getVar("TEST_EXPORT_DIR"), "binaries", "native"))
+ else: # nothing to do here; running tests under bitbake, so we asume native binaries are in sysroots dir.
+ if param_list[1] or param_list[4]:
+ bb.warn("Native binary %s %s%s. Running tests under bitbake environment. Version can't be checked except when the test itself does it"
+ " and binary can't be removed."%(param_list[0],"has assigned ver. " + param_list[1] if param_list[1] else "",
+ ", is marked for removal" if param_list[4] else ""))
+ else:# the package is target aka DUT intended and it is either required to be delivered in an extracted form or in a packaged version
+ target_rpm_file_list = [item for item in os.listdir(arch_rpm_dir) if[0] + "-([0-9]+\.*)", item)]
+ if not target_rpm_file_list:
+ bb.warn("Couldn't find any version of target package %s. Please ensure it was built. "
+ "Related tests will probably fail." % param_list[0])
+ return ""
+ if param_list[2] == "rpm": # binary should be deployed as rpm; (other, .deb, .ipk? ; in the near future)
+ for item in target_rpm_file_list: # copying all related rpm packages. "Intuition" reasons, someone may need other versions too. Deciding later on version
+ bb.plain("Copying target specific packaged file: %s" % item)
+ sh.copy(os.path.join(arch_rpm_dir, item), packaged_bin_dir)
+ return "copied"
+ else: # it is required to extract the binary
+ if param_list[1]: # the package is versioned
+ for item in target_rpm_file_list:
+ if re.match(".*-{}-.*\.rpm".format(param_list[1]), item):
+ destination = os.path.join(extracted_bin_dir,param_list[0], param_list[1])
+ bb.utils.mkdirhier(destination)
+ extract_binary(os.path.join(arch_rpm_dir, item), destination)
+ break
+ else:
+ bb.warn("Couldn't find the desired version %s for target binary %s. Related test cases will probably fail." % (param_list[1], param_list[0]))
+ return ""
+ return "extracted"
+ else: # no version provided, just extract one binary
+ destination = os.path.join(extracted_bin_dir,param_list[0],
+".*-([0-9]+\.[0-9]+)-.*rpm", target_rpm_file_list[0]).group(1))
+ bb.utils.mkdirhier(destination)
+ extract_binary(os.path.join(arch_rpm_dir, target_rpm_file_list[0]), destination)
+ return "extracted"
+ else: # remote machine
+ binaries_path = os.getenv("bin_dir")# in order to know where the binaries are, bin_dir is set as env. variable
+ if param_list[3] == "native": #need to extract the native pkg here
+ native_rpm_dir = os.path.join(binaries_path, "native")
+ native_rpm_file_list = os.listdir(native_rpm_dir)
+ for item in native_rpm_file_list:
+ if param_list[1] and re.match("nativesdk-{}-{}-.*\.rpm".format(param_list[0], param_list[1]), item): # native package has version
+ extract_binary(os.path.join(native_rpm_dir, item))
+ break
+ else:# just copy any related native binary
+ found_version = re.match("nativesdk-{}-([0-9]+\.[0-9]+)-".format(param_list[0]), item).group(1)
+ if found_version:
+ extract_binary(os.path.join(native_rpm_dir, item))
+ else:
+ bb.warn("Couldn't find native package %s%s. Related test cases will be influenced." %
+ (param_list[0], " with version " + param_list[1] if param_list[1] else ""))
+ return
+ else: # this is for target device
+ if param_list[2] == "rpm":
+ return "No need to extract, this is an .rpm file"
+ arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(binaries_path))
+ extracted_bin_path = os.path.join(binaries_path, arch, "extracted_binaries")
+ extracted_bin_list = [item for item in os.listdir(extracted_bin_path)]
+ packaged_bin_path = os.path.join(binaries_path, arch, "packaged_binaries")
+ packaged_bin_file_list = os.listdir(packaged_bin_path)
+ # see if the package is already in the extracted ones; maybe it was deployed when exported the env.
+ if os.path.exists(os.path.join(extracted_bin_path, param_list[0], param_list[1] if param_list[1] else "")):
+ return "binary %s is already extracted" % param_list[0]
+ else: # we need to search for it in the packaged binaries directory. It may have been shipped after export
+ for item in packaged_bin_file_list:
+ if param_list[1]:
+ if re.match("%s-%s.*rpm" % (param_list[0], param_list[1]), item): # package with version
+ if not os.path.exists(os.path.join(extracted_bin_path, param_list[0],param_list[1])):
+ os.makedirs(os.path.join(extracted_bin_path, param_list[0], param_list[1]))
+ extract_binary(os.path.join(packaged_bin_path, item), os.path.join(extracted_bin_path, param_list[0],param_list[1]))
+ bb.plain("Using {} for {}".format(os.path.join(packaged_bin_path, item), param_list[0]))
+ break
+ else:
+ if re.match("%s-.*rpm" % param_list[0], item):
+ found_version = re.match(".*-([0-9]+\.[0-9]+)-", item).group(1)
+ if not os.path.exists(os.path.join(extracted_bin_path, param_list[0], found_version)):
+ os.makedirs(os.path.join(extracted_bin_path, param_list[0], found_version))
+ bb.plain("Used ver. %s for %s" % (found_version, param_list[0]))
+ extract_binary(os.path.join(packaged_bin_path, item), os.path.join(extracted_bin_path, param_list[0], found_version))
+ break
+ else:
+ bb.warn("Couldn't find target package %s%s. Please ensure it is available "
+ "in either of these directories: extracted_binaries or packaged_binaries. "
+ "Related tests will probably fail." % (param_list[0], " with version " + param_list[1] if param_list[1] else ""))
+ return
+ return "Binary %s extracted successfully." % param_list[0]
+def files_to_copy(base_dir):
+ """
+ Produces a list of files relative to the base dir path sent as param
+ :return: the list of relative path files
+ """
+ files_list = []
+ dir_list = [base_dir]
+ count = 1
+ dir_count = 1
+ while (dir_count == 1 or dir_count != count):
+ count = dir_count
+ for dir in dir_list:
+ for item in os.listdir(dir):
+ if os.path.isdir(os.path.join(dir, item)) and os.path.join(dir, item) not in dir_list:
+ dir_list.append(os.path.join(dir, item))
+ dir_count = len(dir_list)
+ elif os.path.join(dir, item) not in files_list and os.path.isfile(os.path.join(dir, item)):
+ files_list.append(os.path.join(dir, item))
+ return files_list
+def send_bin_to_DUT(d,params):
+ from oeqa.oetest import oeRuntimeTest
+ param_list = params
+ cleanup_list = list()
+ bins_dir = os.path.join(d.getVar("TEST_EXPORT_DIR"), "binaries") if determine_if_poky_env() \
+ else os.getenv("bin_dir")
+ arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(bins_dir))
+ arch_rpms_dir = os.path.join(bins_dir, arch, "packaged_binaries")
+ extracted_bin_dir = os.path.join(bins_dir, arch, "extracted_binaries", param_list[0])
+ def send_extracted_binary():
+ bin_local_dir = os.path.join(extracted_bin_dir, param_list[1] if param_list[1] else os.listdir(extracted_bin_dir)[0])
+ for item in files_to_copy(bin_local_dir):
+ split_path = item.split(bin_local_dir)[1]
+ path_on_DUT = split_path if split_path[0] is "/" else "/" + split_path # create the path as on DUT; eg. /usr/bin/bin_file
+ (status, output) =, path_on_DUT)
+ if status != 0:
+ bb.warn("Failed to copy %s binary file %s on the remote target: %s" %
+ (param_list[0], "ver. " + param_list[1] if param_list[1] else "", d.getVar("MACHINE")))
+ return
+ if param_list[4] == "rm":
+ cleanup_list.append(path_on_DUT)
+ return cleanup_list
+ def send_rpm(remote_path): # if it is not required to have an extracted binary, but to send an .rpm file
+ rpm_to_send = ""
+ for item in os.listdir(arch_rpms_dir):
+ if param_list[1] and re.match("%s-%s-.*rpm"%(param_list[0], param_list[1]), item):
+ rpm_to_send = item
+ break
+ elif re.match("%s-[0-9]+\.[0-9]+-.*rpm" % param_list[0], item):
+ rpm_to_send = item
+ break
+ else:
+ bb.warn("No rpm package found for %s %s in .rpm files dir %s. Skipping deployment." %
+ (param_list[0], "ver. " + param_list[1] if param_list[1] else "", rpms_file_dir) )
+ return
+ (status, output) =, rpm_to_send), remote_path)
+ if status != 0:
+ bb.warn("Failed to copy %s on the remote target: %s" %(param_list[0], d.getVar("MACHINE")))
+ return
+ if param_list[4] == "rm":
+ cleanup_list.append(os.path.join(remote_path, rpm_to_send))
+ return cleanup_list
+ if param_list[2] == "rpm": # send an .rpm file
+ return send_rpm("/home/root") # rpms will be sent on home dir of remote machine
+ else:
+ return send_extracted_binary()
+def rm_bin(removal_list): # need to know both if the binary is sent archived and the path where it is sent if archived
+ from oeqa.oetest import oeRuntimeTest
+ for item in removal_list:
+ (status,output) ="rm " + item)
+ if status != 0:
+ bb.warn("Failed to remove: %s. Please ensure connection with the target device is up and running and "
+ "you have the needed rights." % item)