summaryrefslogtreecommitdiffstats
path: root/external/poky/bitbake/lib/bb/tests
diff options
context:
space:
mode:
authorToshikazuOhiwa <toshikazu_ohiwa@mail.toyota.co.jp>2020-03-30 09:24:26 +0900
committerToshikazuOhiwa <toshikazu_ohiwa@mail.toyota.co.jp>2020-03-30 09:24:26 +0900
commit5b80bfd7bffd4c20d80b7c70a7130529e9a755dd (patch)
treeb4bb18dcd1487dbf1ea8127e5671b7bb2eded033 /external/poky/bitbake/lib/bb/tests
parent706ad73eb02caf8532deaf5d38995bd258725cb8 (diff)
agl-basesystem
Diffstat (limited to 'external/poky/bitbake/lib/bb/tests')
-rw-r--r--external/poky/bitbake/lib/bb/tests/__init__.py0
-rw-r--r--external/poky/bitbake/lib/bb/tests/codeparser.py428
-rw-r--r--external/poky/bitbake/lib/bb/tests/cooker.py83
-rw-r--r--external/poky/bitbake/lib/bb/tests/cow.py136
-rw-r--r--external/poky/bitbake/lib/bb/tests/data.py676
-rw-r--r--external/poky/bitbake/lib/bb/tests/event.py986
-rw-r--r--external/poky/bitbake/lib/bb/tests/fetch.py1838
-rw-r--r--external/poky/bitbake/lib/bb/tests/parse.py189
-rw-r--r--external/poky/bitbake/lib/bb/tests/utils.py607
9 files changed, 4943 insertions, 0 deletions
diff --git a/external/poky/bitbake/lib/bb/tests/__init__.py b/external/poky/bitbake/lib/bb/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/__init__.py
diff --git a/external/poky/bitbake/lib/bb/tests/codeparser.py b/external/poky/bitbake/lib/bb/tests/codeparser.py
new file mode 100644
index 00000000..e30e78c1
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/codeparser.py
@@ -0,0 +1,428 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Test for codeparser.py
+#
+# Copyright (C) 2010 Chris Larson
+# Copyright (C) 2012 Richard Purdie
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import logging
+import bb
+
+logger = logging.getLogger('BitBake.TestCodeParser')
+
+# bb.data references bb.parse but can't directly import due to circular dependencies.
+# Hack around it for now :(
+import bb.parse
+import bb.data
+
+class ReferenceTest(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+
+ def setEmptyVars(self, varlist):
+ for k in varlist:
+ self.d.setVar(k, "")
+
+ def setValues(self, values):
+ for k, v in values.items():
+ self.d.setVar(k, v)
+
+ def assertReferences(self, refs):
+ self.assertEqual(self.references, refs)
+
+ def assertExecs(self, execs):
+ self.assertEqual(self.execs, execs)
+
+ def assertContains(self, contains):
+ self.assertEqual(self.contains, contains)
+
+class VariableReferenceTest(ReferenceTest):
+
+ def parseExpression(self, exp):
+ parsedvar = self.d.expandWithRefs(exp, None)
+ self.references = parsedvar.references
+
+ def test_simple_reference(self):
+ self.setEmptyVars(["FOO"])
+ self.parseExpression("${FOO}")
+ self.assertReferences(set(["FOO"]))
+
+ def test_nested_reference(self):
+ self.setEmptyVars(["BAR"])
+ self.d.setVar("FOO", "BAR")
+ self.parseExpression("${${FOO}}")
+ self.assertReferences(set(["FOO", "BAR"]))
+
+ def test_python_reference(self):
+ self.setEmptyVars(["BAR"])
+ self.parseExpression("${@d.getVar('BAR') + 'foo'}")
+ self.assertReferences(set(["BAR"]))
+
+class ShellReferenceTest(ReferenceTest):
+
+ def parseExpression(self, exp):
+ parsedvar = self.d.expandWithRefs(exp, None)
+ parser = bb.codeparser.ShellParser("ParserTest", logger)
+ parser.parse_shell(parsedvar.value)
+
+ self.references = parsedvar.references
+ self.execs = parser.execs
+
+ def test_quotes_inside_assign(self):
+ self.parseExpression('foo=foo"bar"baz')
+ self.assertReferences(set([]))
+
+ def test_quotes_inside_arg(self):
+ self.parseExpression('sed s#"bar baz"#"alpha beta"#g')
+ self.assertExecs(set(["sed"]))
+
+ def test_arg_continuation(self):
+ self.parseExpression("sed -i -e s,foo,bar,g \\\n *.pc")
+ self.assertExecs(set(["sed"]))
+
+ def test_dollar_in_quoted(self):
+ self.parseExpression('sed -i -e "foo$" *.pc')
+ self.assertExecs(set(["sed"]))
+
+ def test_quotes_inside_arg_continuation(self):
+ self.setEmptyVars(["bindir", "D", "libdir"])
+ self.parseExpression("""
+sed -i -e s#"moc_location=.*$"#"moc_location=${bindir}/moc4"# \\
+-e s#"uic_location=.*$"#"uic_location=${bindir}/uic4"# \\
+${D}${libdir}/pkgconfig/*.pc
+""")
+ self.assertReferences(set(["bindir", "D", "libdir"]))
+
+ def test_assign_subshell_expansion(self):
+ self.parseExpression("foo=$(echo bar)")
+ self.assertExecs(set(["echo"]))
+
+ def test_shell_unexpanded(self):
+ self.setEmptyVars(["QT_BASE_NAME"])
+ self.parseExpression('echo "${QT_BASE_NAME}"')
+ self.assertExecs(set(["echo"]))
+ self.assertReferences(set(["QT_BASE_NAME"]))
+
+ def test_incomplete_varexp_single_quotes(self):
+ self.parseExpression("sed -i -e 's:IP{:I${:g' $pc")
+ self.assertExecs(set(["sed"]))
+
+
+ def test_until(self):
+ self.parseExpression("until false; do echo true; done")
+ self.assertExecs(set(["false", "echo"]))
+ self.assertReferences(set())
+
+ def test_case(self):
+ self.parseExpression("""
+case $foo in
+*)
+bar
+;;
+esac
+""")
+ self.assertExecs(set(["bar"]))
+ self.assertReferences(set())
+
+ def test_assign_exec(self):
+ self.parseExpression("a=b c='foo bar' alpha 1 2 3")
+ self.assertExecs(set(["alpha"]))
+
+ def test_redirect_to_file(self):
+ self.setEmptyVars(["foo"])
+ self.parseExpression("echo foo >${foo}/bar")
+ self.assertExecs(set(["echo"]))
+ self.assertReferences(set(["foo"]))
+
+ def test_heredoc(self):
+ self.setEmptyVars(["theta"])
+ self.parseExpression("""
+cat <<END
+alpha
+beta
+${theta}
+END
+""")
+ self.assertReferences(set(["theta"]))
+
+ def test_redirect_from_heredoc(self):
+ v = ["B", "SHADOW_MAILDIR", "SHADOW_MAILFILE", "SHADOW_UTMPDIR", "SHADOW_LOGDIR", "bindir"]
+ self.setEmptyVars(v)
+ self.parseExpression("""
+cat <<END >${B}/cachedpaths
+shadow_cv_maildir=${SHADOW_MAILDIR}
+shadow_cv_mailfile=${SHADOW_MAILFILE}
+shadow_cv_utmpdir=${SHADOW_UTMPDIR}
+shadow_cv_logdir=${SHADOW_LOGDIR}
+shadow_cv_passwd_dir=${bindir}
+END
+""")
+ self.assertReferences(set(v))
+ self.assertExecs(set(["cat"]))
+
+# def test_incomplete_command_expansion(self):
+# self.assertRaises(reftracker.ShellSyntaxError, reftracker.execs,
+# bbvalue.shparse("cp foo`", self.d), self.d)
+
+# def test_rogue_dollarsign(self):
+# self.setValues({"D" : "/tmp"})
+# self.parseExpression("install -d ${D}$")
+# self.assertReferences(set(["D"]))
+# self.assertExecs(set(["install"]))
+
+
+class PythonReferenceTest(ReferenceTest):
+
+ def setUp(self):
+ self.d = bb.data.init()
+ if hasattr(bb.utils, "_context"):
+ self.context = bb.utils._context
+ else:
+ import builtins
+ self.context = builtins.__dict__
+
+ def parseExpression(self, exp):
+ parsedvar = self.d.expandWithRefs(exp, None)
+ parser = bb.codeparser.PythonParser("ParserTest", logger)
+ parser.parse_python(parsedvar.value)
+
+ self.references = parsedvar.references | parser.references
+ self.execs = parser.execs
+ self.contains = parser.contains
+
+ @staticmethod
+ def indent(value):
+ """Python Snippets have to be indented, python values don't have to
+be. These unit tests are testing snippets."""
+ return " " + value
+
+ def test_getvar_reference(self):
+ self.parseExpression("d.getVar('foo')")
+ self.assertReferences(set(["foo"]))
+ self.assertExecs(set())
+
+ def test_getvar_computed_reference(self):
+ self.parseExpression("d.getVar('f' + 'o' + 'o')")
+ self.assertReferences(set())
+ self.assertExecs(set())
+
+ def test_getvar_exec_reference(self):
+ self.parseExpression("eval('d.getVar(\"foo\")')")
+ self.assertReferences(set())
+ self.assertExecs(set(["eval"]))
+
+ def test_var_reference(self):
+ self.context["foo"] = lambda x: x
+ self.setEmptyVars(["FOO"])
+ self.parseExpression("foo('${FOO}')")
+ self.assertReferences(set(["FOO"]))
+ self.assertExecs(set(["foo"]))
+ del self.context["foo"]
+
+ def test_var_exec(self):
+ for etype in ("func", "task"):
+ self.d.setVar("do_something", "echo 'hi mom! ${FOO}'")
+ self.d.setVarFlag("do_something", etype, True)
+ self.parseExpression("bb.build.exec_func('do_something', d)")
+ self.assertReferences(set([]))
+ self.assertExecs(set(["do_something"]))
+
+ def test_function_reference(self):
+ self.context["testfunc"] = lambda msg: bb.msg.note(1, None, msg)
+ self.d.setVar("FOO", "Hello, World!")
+ self.parseExpression("testfunc('${FOO}')")
+ self.assertReferences(set(["FOO"]))
+ self.assertExecs(set(["testfunc"]))
+ del self.context["testfunc"]
+
+ def test_qualified_function_reference(self):
+ self.parseExpression("time.time()")
+ self.assertExecs(set(["time.time"]))
+
+ def test_qualified_function_reference_2(self):
+ self.parseExpression("os.path.dirname('/foo/bar')")
+ self.assertExecs(set(["os.path.dirname"]))
+
+ def test_qualified_function_reference_nested(self):
+ self.parseExpression("time.strftime('%Y%m%d',time.gmtime())")
+ self.assertExecs(set(["time.strftime", "time.gmtime"]))
+
+ def test_function_reference_chained(self):
+ self.context["testget"] = lambda: "\tstrip me "
+ self.parseExpression("testget().strip()")
+ self.assertExecs(set(["testget"]))
+ del self.context["testget"]
+
+ def test_contains(self):
+ self.parseExpression('bb.utils.contains("TESTVAR", "one", "true", "false", d)')
+ self.assertContains({'TESTVAR': {'one'}})
+
+ def test_contains_multi(self):
+ self.parseExpression('bb.utils.contains("TESTVAR", "one two", "true", "false", d)')
+ self.assertContains({'TESTVAR': {'one two'}})
+
+ def test_contains_any(self):
+ self.parseExpression('bb.utils.contains_any("TESTVAR", "hello", "true", "false", d)')
+ self.assertContains({'TESTVAR': {'hello'}})
+
+ def test_contains_any_multi(self):
+ self.parseExpression('bb.utils.contains_any("TESTVAR", "one two three", "true", "false", d)')
+ self.assertContains({'TESTVAR': {'one', 'two', 'three'}})
+
+ def test_contains_filter(self):
+ self.parseExpression('bb.utils.filter("TESTVAR", "hello there world", d)')
+ self.assertContains({'TESTVAR': {'hello', 'there', 'world'}})
+
+
+class DependencyReferenceTest(ReferenceTest):
+
+ pydata = """
+d.getVar('somevar')
+def test(d):
+ foo = 'bar %s' % 'foo'
+def test2(d):
+ d.getVar(foo)
+ d.getVar('bar', False)
+ test2(d)
+
+def a():
+ \"\"\"some
+ stuff
+ \"\"\"
+ return "heh"
+
+test(d)
+
+d.expand(d.getVar("something", False))
+d.expand("${inexpand} somethingelse")
+d.getVar(a(), False)
+"""
+
+ def test_python(self):
+ self.d.setVar("FOO", self.pydata)
+ self.setEmptyVars(["inexpand", "a", "test2", "test"])
+ self.d.setVarFlags("FOO", {
+ "func": True,
+ "python": True,
+ "lineno": 1,
+ "filename": "example.bb",
+ })
+
+ deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d)
+
+ self.assertEqual(deps, set(["somevar", "bar", "something", "inexpand", "test", "test2", "a"]))
+
+
+ shelldata = """
+foo () {
+bar
+}
+{
+echo baz
+$(heh)
+eval `moo`
+}
+a=b
+c=d
+(
+true && false
+test -f foo
+testval=something
+$testval
+) || aiee
+! inverted
+echo ${somevar}
+
+case foo in
+bar)
+echo bar
+;;
+baz)
+echo baz
+;;
+foo*)
+echo foo
+;;
+esac
+"""
+
+ def test_shell(self):
+ execs = ["bar", "echo", "heh", "moo", "true", "aiee"]
+ self.d.setVar("somevar", "heh")
+ self.d.setVar("inverted", "echo inverted...")
+ self.d.setVarFlag("inverted", "func", True)
+ self.d.setVar("FOO", self.shelldata)
+ self.d.setVarFlags("FOO", {"func": True})
+ self.setEmptyVars(execs)
+
+ deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d)
+
+ self.assertEqual(deps, set(["somevar", "inverted"] + execs))
+
+
+ def test_vardeps(self):
+ self.d.setVar("oe_libinstall", "echo test")
+ self.d.setVar("FOO", "foo=oe_libinstall; eval $foo")
+ self.d.setVarFlag("FOO", "vardeps", "oe_libinstall")
+
+ deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d)
+
+ self.assertEqual(deps, set(["oe_libinstall"]))
+
+ def test_vardeps_expand(self):
+ self.d.setVar("oe_libinstall", "echo test")
+ self.d.setVar("FOO", "foo=oe_libinstall; eval $foo")
+ self.d.setVarFlag("FOO", "vardeps", "${@'oe_libinstall'}")
+
+ deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d)
+
+ self.assertEqual(deps, set(["oe_libinstall"]))
+
+ def test_contains_vardeps(self):
+ expr = '${@bb.utils.filter("TESTVAR", "somevalue anothervalue", d)} \
+ ${@bb.utils.contains("TESTVAR", "testval testval2", "yetanothervalue", "", d)} \
+ ${@bb.utils.contains("TESTVAR", "testval2 testval3", "blah", "", d)} \
+ ${@bb.utils.contains_any("TESTVAR", "testval2 testval3", "lastone", "", d)}'
+ parsedvar = self.d.expandWithRefs(expr, None)
+ # Check contains
+ self.assertEqual(parsedvar.contains, {'TESTVAR': {'testval2 testval3', 'anothervalue', 'somevalue', 'testval testval2', 'testval2', 'testval3'}})
+ # Check dependencies
+ self.d.setVar('ANOTHERVAR', expr)
+ self.d.setVar('TESTVAR', 'anothervalue testval testval2')
+ deps, values = bb.data.build_dependencies("ANOTHERVAR", set(self.d.keys()), set(), set(), self.d)
+ self.assertEqual(sorted(values.splitlines()),
+ sorted([expr,
+ 'TESTVAR{anothervalue} = Set',
+ 'TESTVAR{somevalue} = Unset',
+ 'TESTVAR{testval testval2} = Set',
+ 'TESTVAR{testval2 testval3} = Unset',
+ 'TESTVAR{testval2} = Set',
+ 'TESTVAR{testval3} = Unset'
+ ]))
+ # Check final value
+ self.assertEqual(self.d.getVar('ANOTHERVAR').split(), ['anothervalue', 'yetanothervalue', 'lastone'])
+
+ #Currently no wildcard support
+ #def test_vardeps_wildcards(self):
+ # self.d.setVar("oe_libinstall", "echo test")
+ # self.d.setVar("FOO", "foo=oe_libinstall; eval $foo")
+ # self.d.setVarFlag("FOO", "vardeps", "oe_*")
+ # self.assertEquals(deps, set(["oe_libinstall"]))
+
+
diff --git a/external/poky/bitbake/lib/bb/tests/cooker.py b/external/poky/bitbake/lib/bb/tests/cooker.py
new file mode 100644
index 00000000..2b442365
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/cooker.py
@@ -0,0 +1,83 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Tests for cooker.py
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import tempfile
+import os
+import bb, bb.cooker
+import re
+import logging
+
+# Cooker tests
+class CookerTest(unittest.TestCase):
+ def setUp(self):
+ # At least one variable needs to be set
+ self.d = bb.data.init()
+ topdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testdata/cooker")
+ self.d.setVar('TOPDIR', topdir)
+
+ def test_CookerCollectFiles_sublayers(self):
+ '''Test that a sublayer of an existing layer does not trigger
+ No bb files matched ...'''
+
+ def append_collection(topdir, path, d):
+ collection = path.split('/')[-1]
+ pattern = "^" + topdir + "/" + path + "/"
+ regex = re.compile(pattern)
+ priority = 5
+
+ d.setVar('BBFILE_COLLECTIONS', (d.getVar('BBFILE_COLLECTIONS') or "") + " " + collection)
+ d.setVar('BBFILE_PATTERN_%s' % (collection), pattern)
+ d.setVar('BBFILE_PRIORITY_%s' % (collection), priority)
+
+ return (collection, pattern, regex, priority)
+
+ topdir = self.d.getVar("TOPDIR")
+
+ # Priorities: list of (collection, pattern, regex, priority)
+ bbfile_config_priorities = []
+ # Order is important for this test, shortest to longest is typical failure case
+ bbfile_config_priorities.append( append_collection(topdir, 'first', self.d) )
+ bbfile_config_priorities.append( append_collection(topdir, 'second', self.d) )
+ bbfile_config_priorities.append( append_collection(topdir, 'second/third', self.d) )
+
+ pkgfns = [ topdir + '/first/recipes/sample1_1.0.bb',
+ topdir + '/second/recipes/sample2_1.0.bb',
+ topdir + '/second/third/recipes/sample3_1.0.bb' ]
+
+ class LogHandler(logging.Handler):
+ def __init__(self):
+ logging.Handler.__init__(self)
+ self.logdata = []
+
+ def emit(self, record):
+ self.logdata.append(record.getMessage())
+
+ # Move cooker to use my special logging
+ logger = bb.cooker.logger
+ log_handler = LogHandler()
+ logger.addHandler(log_handler)
+ collection = bb.cooker.CookerCollectFiles(bbfile_config_priorities)
+ collection.collection_priorities(pkgfns, self.d)
+ logger.removeHandler(log_handler)
+
+ # Should be empty (no generated messages)
+ expected = []
+
+ self.assertEqual(log_handler.logdata, expected)
diff --git a/external/poky/bitbake/lib/bb/tests/cow.py b/external/poky/bitbake/lib/bb/tests/cow.py
new file mode 100644
index 00000000..d149d84d
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/cow.py
@@ -0,0 +1,136 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Tests for Copy-on-Write (cow.py)
+#
+# Copyright 2006 Holger Freyther <freyther@handhelds.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import os
+
+class COWTestCase(unittest.TestCase):
+ """
+ Test case for the COW module from mithro
+ """
+
+ def testGetSet(self):
+ """
+ Test and set
+ """
+ from bb.COW import COWDictBase
+ a = COWDictBase.copy()
+
+ self.assertEqual(False, 'a' in a)
+
+ a['a'] = 'a'
+ a['b'] = 'b'
+ self.assertEqual(True, 'a' in a)
+ self.assertEqual(True, 'b' in a)
+ self.assertEqual('a', a['a'] )
+ self.assertEqual('b', a['b'] )
+
+ def testCopyCopy(self):
+ """
+ Test the copy of copies
+ """
+
+ from bb.COW import COWDictBase
+
+ # create two COW dict 'instances'
+ b = COWDictBase.copy()
+ c = COWDictBase.copy()
+
+ # assign some keys to one instance, some keys to another
+ b['a'] = 10
+ b['c'] = 20
+ c['a'] = 30
+
+ # test separation of the two instances
+ self.assertEqual(False, 'c' in c)
+ self.assertEqual(30, c['a'])
+ self.assertEqual(10, b['a'])
+
+ # test copy
+ b_2 = b.copy()
+ c_2 = c.copy()
+
+ self.assertEqual(False, 'c' in c_2)
+ self.assertEqual(10, b_2['a'])
+
+ b_2['d'] = 40
+ self.assertEqual(False, 'd' in c_2)
+ self.assertEqual(True, 'd' in b_2)
+ self.assertEqual(40, b_2['d'])
+ self.assertEqual(False, 'd' in b)
+ self.assertEqual(False, 'd' in c)
+
+ c_2['d'] = 30
+ self.assertEqual(True, 'd' in c_2)
+ self.assertEqual(True, 'd' in b_2)
+ self.assertEqual(30, c_2['d'])
+ self.assertEqual(40, b_2['d'])
+ self.assertEqual(False, 'd' in b)
+ self.assertEqual(False, 'd' in c)
+
+ # test copy of the copy
+ c_3 = c_2.copy()
+ b_3 = b_2.copy()
+ b_3_2 = b_2.copy()
+
+ c_3['e'] = 4711
+ self.assertEqual(4711, c_3['e'])
+ self.assertEqual(False, 'e' in c_2)
+ self.assertEqual(False, 'e' in b_3)
+ self.assertEqual(False, 'e' in b_3_2)
+ self.assertEqual(False, 'e' in b_2)
+
+ b_3['e'] = 'viel'
+ self.assertEqual('viel', b_3['e'])
+ self.assertEqual(4711, c_3['e'])
+ self.assertEqual(False, 'e' in c_2)
+ self.assertEqual(True, 'e' in b_3)
+ self.assertEqual(False, 'e' in b_3_2)
+ self.assertEqual(False, 'e' in b_2)
+
+ def testCow(self):
+ from bb.COW import COWDictBase
+ c = COWDictBase.copy()
+ c['123'] = 1027
+ c['other'] = 4711
+ c['d'] = { 'abc' : 10, 'bcd' : 20 }
+
+ copy = c.copy()
+
+ self.assertEqual(1027, c['123'])
+ self.assertEqual(4711, c['other'])
+ self.assertEqual({'abc':10, 'bcd':20}, c['d'])
+ self.assertEqual(1027, copy['123'])
+ self.assertEqual(4711, copy['other'])
+ self.assertEqual({'abc':10, 'bcd':20}, copy['d'])
+
+ # cow it now
+ copy['123'] = 1028
+ copy['other'] = 4712
+ copy['d']['abc'] = 20
+
+
+ self.assertEqual(1027, c['123'])
+ self.assertEqual(4711, c['other'])
+ self.assertEqual({'abc':10, 'bcd':20}, c['d'])
+ self.assertEqual(1028, copy['123'])
+ self.assertEqual(4712, copy['other'])
+ self.assertEqual({'abc':20, 'bcd':20}, copy['d'])
diff --git a/external/poky/bitbake/lib/bb/tests/data.py b/external/poky/bitbake/lib/bb/tests/data.py
new file mode 100644
index 00000000..db3e2010
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/data.py
@@ -0,0 +1,676 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Tests for the Data Store (data.py/data_smart.py)
+#
+# Copyright (C) 2010 Chris Larson
+# Copyright (C) 2012 Richard Purdie
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import bb
+import bb.data
+import bb.parse
+import logging
+
+class LogRecord():
+ def __enter__(self):
+ logs = []
+ class LogHandler(logging.Handler):
+ def emit(self, record):
+ logs.append(record)
+ logger = logging.getLogger("BitBake")
+ handler = LogHandler()
+ self.handler = handler
+ logger.addHandler(handler)
+ return logs
+ def __exit__(self, type, value, traceback):
+ logger = logging.getLogger("BitBake")
+ logger.removeHandler(self.handler)
+ return
+
+def logContains(item, logs):
+ for l in logs:
+ m = l.getMessage()
+ if item in m:
+ return True
+ return False
+
+class DataExpansions(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d["foo"] = "value_of_foo"
+ self.d["bar"] = "value_of_bar"
+ self.d["value_of_foo"] = "value_of_'value_of_foo'"
+
+ def test_one_var(self):
+ val = self.d.expand("${foo}")
+ self.assertEqual(str(val), "value_of_foo")
+
+ def test_indirect_one_var(self):
+ val = self.d.expand("${${foo}}")
+ self.assertEqual(str(val), "value_of_'value_of_foo'")
+
+ def test_indirect_and_another(self):
+ val = self.d.expand("${${foo}} ${bar}")
+ self.assertEqual(str(val), "value_of_'value_of_foo' value_of_bar")
+
+ def test_python_snippet(self):
+ val = self.d.expand("${@5*12}")
+ self.assertEqual(str(val), "60")
+
+ def test_expand_in_python_snippet(self):
+ val = self.d.expand("${@'boo ' + '${foo}'}")
+ self.assertEqual(str(val), "boo value_of_foo")
+
+ def test_python_snippet_getvar(self):
+ val = self.d.expand("${@d.getVar('foo') + ' ${bar}'}")
+ self.assertEqual(str(val), "value_of_foo value_of_bar")
+
+ def test_python_unexpanded(self):
+ self.d.setVar("bar", "${unsetvar}")
+ val = self.d.expand("${@d.getVar('foo') + ' ${bar}'}")
+ self.assertEqual(str(val), "${@d.getVar('foo') + ' ${unsetvar}'}")
+
+ def test_python_snippet_syntax_error(self):
+ self.d.setVar("FOO", "${@foo = 5}")
+ self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True)
+
+ def test_python_snippet_runtime_error(self):
+ self.d.setVar("FOO", "${@int('test')}")
+ self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True)
+
+ def test_python_snippet_error_path(self):
+ self.d.setVar("FOO", "foo value ${BAR}")
+ self.d.setVar("BAR", "bar value ${@int('test')}")
+ self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True)
+
+ def test_value_containing_value(self):
+ val = self.d.expand("${@d.getVar('foo') + ' ${bar}'}")
+ self.assertEqual(str(val), "value_of_foo value_of_bar")
+
+ def test_reference_undefined_var(self):
+ val = self.d.expand("${undefinedvar} meh")
+ self.assertEqual(str(val), "${undefinedvar} meh")
+
+ def test_double_reference(self):
+ self.d.setVar("BAR", "bar value")
+ self.d.setVar("FOO", "${BAR} foo ${BAR}")
+ val = self.d.getVar("FOO")
+ self.assertEqual(str(val), "bar value foo bar value")
+
+ def test_direct_recursion(self):
+ self.d.setVar("FOO", "${FOO}")
+ self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True)
+
+ def test_indirect_recursion(self):
+ self.d.setVar("FOO", "${BAR}")
+ self.d.setVar("BAR", "${BAZ}")
+ self.d.setVar("BAZ", "${FOO}")
+ self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True)
+
+ def test_recursion_exception(self):
+ self.d.setVar("FOO", "${BAR}")
+ self.d.setVar("BAR", "${${@'FOO'}}")
+ self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True)
+
+ def test_incomplete_varexp_single_quotes(self):
+ self.d.setVar("FOO", "sed -i -e 's:IP{:I${:g' $pc")
+ val = self.d.getVar("FOO")
+ self.assertEqual(str(val), "sed -i -e 's:IP{:I${:g' $pc")
+
+ def test_nonstring(self):
+ self.d.setVar("TEST", 5)
+ val = self.d.getVar("TEST")
+ self.assertEqual(str(val), "5")
+
+ def test_rename(self):
+ self.d.renameVar("foo", "newfoo")
+ self.assertEqual(self.d.getVar("newfoo", False), "value_of_foo")
+ self.assertEqual(self.d.getVar("foo", False), None)
+
+ def test_deletion(self):
+ self.d.delVar("foo")
+ self.assertEqual(self.d.getVar("foo", False), None)
+
+ def test_keys(self):
+ keys = list(self.d.keys())
+ self.assertCountEqual(keys, ['value_of_foo', 'foo', 'bar'])
+
+ def test_keys_deletion(self):
+ newd = bb.data.createCopy(self.d)
+ newd.delVar("bar")
+ keys = list(newd.keys())
+ self.assertCountEqual(keys, ['value_of_foo', 'foo'])
+
+class TestNestedExpansions(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d["foo"] = "foo"
+ self.d["bar"] = "bar"
+ self.d["value_of_foobar"] = "187"
+
+ def test_refs(self):
+ val = self.d.expand("${value_of_${foo}${bar}}")
+ self.assertEqual(str(val), "187")
+
+ #def test_python_refs(self):
+ # val = self.d.expand("${@${@3}**2 + ${@4}**2}")
+ # self.assertEqual(str(val), "25")
+
+ def test_ref_in_python_ref(self):
+ val = self.d.expand("${@'${foo}' + 'bar'}")
+ self.assertEqual(str(val), "foobar")
+
+ def test_python_ref_in_ref(self):
+ val = self.d.expand("${${@'f'+'o'+'o'}}")
+ self.assertEqual(str(val), "foo")
+
+ def test_deep_nesting(self):
+ depth = 100
+ val = self.d.expand("${" * depth + "foo" + "}" * depth)
+ self.assertEqual(str(val), "foo")
+
+ #def test_deep_python_nesting(self):
+ # depth = 50
+ # val = self.d.expand("${@" * depth + "1" + "+1}" * depth)
+ # self.assertEqual(str(val), str(depth + 1))
+
+ def test_mixed(self):
+ val = self.d.expand("${value_of_${@('${foo}'+'bar')[0:3]}${${@'BAR'.lower()}}}")
+ self.assertEqual(str(val), "187")
+
+ def test_runtime(self):
+ val = self.d.expand("${${@'value_of' + '_f'+'o'+'o'+'b'+'a'+'r'}}")
+ self.assertEqual(str(val), "187")
+
+class TestMemoize(unittest.TestCase):
+ def test_memoized(self):
+ d = bb.data.init()
+ d.setVar("FOO", "bar")
+ self.assertTrue(d.getVar("FOO", False) is d.getVar("FOO", False))
+
+ def test_not_memoized(self):
+ d1 = bb.data.init()
+ d2 = bb.data.init()
+ d1.setVar("FOO", "bar")
+ d2.setVar("FOO", "bar2")
+ self.assertTrue(d1.getVar("FOO", False) is not d2.getVar("FOO", False))
+
+ def test_changed_after_memoized(self):
+ d = bb.data.init()
+ d.setVar("foo", "value of foo")
+ self.assertEqual(str(d.getVar("foo", False)), "value of foo")
+ d.setVar("foo", "second value of foo")
+ self.assertEqual(str(d.getVar("foo", False)), "second value of foo")
+
+ def test_same_value(self):
+ d = bb.data.init()
+ d.setVar("foo", "value of")
+ d.setVar("bar", "value of")
+ self.assertEqual(d.getVar("foo", False),
+ d.getVar("bar", False))
+
+class TestConcat(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d.setVar("FOO", "foo")
+ self.d.setVar("VAL", "val")
+ self.d.setVar("BAR", "bar")
+
+ def test_prepend(self):
+ self.d.setVar("TEST", "${VAL}")
+ self.d.prependVar("TEST", "${FOO}:")
+ self.assertEqual(self.d.getVar("TEST"), "foo:val")
+
+ def test_append(self):
+ self.d.setVar("TEST", "${VAL}")
+ self.d.appendVar("TEST", ":${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "val:bar")
+
+ def test_multiple_append(self):
+ self.d.setVar("TEST", "${VAL}")
+ self.d.prependVar("TEST", "${FOO}:")
+ self.d.appendVar("TEST", ":val2")
+ self.d.appendVar("TEST", ":${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "foo:val:val2:bar")
+
+class TestConcatOverride(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d.setVar("FOO", "foo")
+ self.d.setVar("VAL", "val")
+ self.d.setVar("BAR", "bar")
+
+ def test_prepend(self):
+ self.d.setVar("TEST", "${VAL}")
+ self.d.setVar("TEST_prepend", "${FOO}:")
+ self.assertEqual(self.d.getVar("TEST"), "foo:val")
+
+ def test_append(self):
+ self.d.setVar("TEST", "${VAL}")
+ self.d.setVar("TEST_append", ":${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "val:bar")
+
+ def test_multiple_append(self):
+ self.d.setVar("TEST", "${VAL}")
+ self.d.setVar("TEST_prepend", "${FOO}:")
+ self.d.setVar("TEST_append", ":val2")
+ self.d.setVar("TEST_append", ":${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "foo:val:val2:bar")
+
+ def test_append_unset(self):
+ self.d.setVar("TEST_prepend", "${FOO}:")
+ self.d.setVar("TEST_append", ":val2")
+ self.d.setVar("TEST_append", ":${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "foo::val2:bar")
+
+ def test_remove(self):
+ self.d.setVar("TEST", "${VAL} ${BAR}")
+ self.d.setVar("TEST_remove", "val")
+ self.assertEqual(self.d.getVar("TEST"), " bar")
+
+ def test_remove_cleared(self):
+ self.d.setVar("TEST", "${VAL} ${BAR}")
+ self.d.setVar("TEST_remove", "val")
+ self.d.setVar("TEST", "${VAL} ${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "val bar")
+
+ # Ensure the value is unchanged if we have an inactive remove override
+ # (including that whitespace is preserved)
+ def test_remove_inactive_override(self):
+ self.d.setVar("TEST", "${VAL} ${BAR} 123")
+ self.d.setVar("TEST_remove_inactiveoverride", "val")
+ self.assertEqual(self.d.getVar("TEST"), "val bar 123")
+
+ def test_doubleref_remove(self):
+ self.d.setVar("TEST", "${VAL} ${BAR}")
+ self.d.setVar("TEST_remove", "val")
+ self.d.setVar("TEST_TEST", "${TEST} ${TEST}")
+ self.assertEqual(self.d.getVar("TEST_TEST"), " bar bar")
+
+ def test_empty_remove(self):
+ self.d.setVar("TEST", "")
+ self.d.setVar("TEST_remove", "val")
+ self.assertEqual(self.d.getVar("TEST"), "")
+
+ def test_remove_expansion(self):
+ self.d.setVar("BAR", "Z")
+ self.d.setVar("TEST", "${BAR}/X Y")
+ self.d.setVar("TEST_remove", "${BAR}/X")
+ self.assertEqual(self.d.getVar("TEST"), " Y")
+
+ def test_remove_expansion_items(self):
+ self.d.setVar("TEST", "A B C D")
+ self.d.setVar("BAR", "B D")
+ self.d.setVar("TEST_remove", "${BAR}")
+ self.assertEqual(self.d.getVar("TEST"), "A C ")
+
+ def test_remove_preserve_whitespace(self):
+ # When the removal isn't active, the original value should be preserved
+ self.d.setVar("TEST", " A B")
+ self.d.setVar("TEST_remove", "C")
+ self.assertEqual(self.d.getVar("TEST"), " A B")
+
+ def test_remove_preserve_whitespace2(self):
+ # When the removal is active preserve the whitespace
+ self.d.setVar("TEST", " A B")
+ self.d.setVar("TEST_remove", "B")
+ self.assertEqual(self.d.getVar("TEST"), " A ")
+
+class TestOverrides(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d.setVar("OVERRIDES", "foo:bar:local")
+ self.d.setVar("TEST", "testvalue")
+
+ def test_no_override(self):
+ self.assertEqual(self.d.getVar("TEST"), "testvalue")
+
+ def test_one_override(self):
+ self.d.setVar("TEST_bar", "testvalue2")
+ self.assertEqual(self.d.getVar("TEST"), "testvalue2")
+
+ def test_one_override_unset(self):
+ self.d.setVar("TEST2_bar", "testvalue2")
+
+ self.assertEqual(self.d.getVar("TEST2"), "testvalue2")
+ self.assertCountEqual(list(self.d.keys()), ['TEST', 'TEST2', 'OVERRIDES', 'TEST2_bar'])
+
+ def test_multiple_override(self):
+ self.d.setVar("TEST_bar", "testvalue2")
+ self.d.setVar("TEST_local", "testvalue3")
+ self.d.setVar("TEST_foo", "testvalue4")
+ self.assertEqual(self.d.getVar("TEST"), "testvalue3")
+ self.assertCountEqual(list(self.d.keys()), ['TEST', 'TEST_foo', 'OVERRIDES', 'TEST_bar', 'TEST_local'])
+
+ def test_multiple_combined_overrides(self):
+ self.d.setVar("TEST_local_foo_bar", "testvalue3")
+ self.assertEqual(self.d.getVar("TEST"), "testvalue3")
+
+ def test_multiple_overrides_unset(self):
+ self.d.setVar("TEST2_local_foo_bar", "testvalue3")
+ self.assertEqual(self.d.getVar("TEST2"), "testvalue3")
+
+ def test_keyexpansion_override(self):
+ self.d.setVar("LOCAL", "local")
+ self.d.setVar("TEST_bar", "testvalue2")
+ self.d.setVar("TEST_${LOCAL}", "testvalue3")
+ self.d.setVar("TEST_foo", "testvalue4")
+ bb.data.expandKeys(self.d)
+ self.assertEqual(self.d.getVar("TEST"), "testvalue3")
+
+ def test_rename_override(self):
+ self.d.setVar("ALTERNATIVE_ncurses-tools_class-target", "a")
+ self.d.setVar("OVERRIDES", "class-target")
+ self.d.renameVar("ALTERNATIVE_ncurses-tools", "ALTERNATIVE_lib32-ncurses-tools")
+ self.assertEqual(self.d.getVar("ALTERNATIVE_lib32-ncurses-tools"), "a")
+
+ def test_underscore_override(self):
+ self.d.setVar("TEST_bar", "testvalue2")
+ self.d.setVar("TEST_some_val", "testvalue3")
+ self.d.setVar("TEST_foo", "testvalue4")
+ self.d.setVar("OVERRIDES", "foo:bar:some_val")
+ self.assertEqual(self.d.getVar("TEST"), "testvalue3")
+
+ def test_remove_with_override(self):
+ self.d.setVar("TEST_bar", "testvalue2")
+ self.d.setVar("TEST_some_val", "testvalue3 testvalue5")
+ self.d.setVar("TEST_some_val_remove", "testvalue3")
+ self.d.setVar("TEST_foo", "testvalue4")
+ self.d.setVar("OVERRIDES", "foo:bar:some_val")
+ self.assertEqual(self.d.getVar("TEST"), " testvalue5")
+
+
+class TestKeyExpansion(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d.setVar("FOO", "foo")
+ self.d.setVar("BAR", "foo")
+
+ def test_keyexpand(self):
+ self.d.setVar("VAL_${FOO}", "A")
+ self.d.setVar("VAL_${BAR}", "B")
+ with LogRecord() as logs:
+ bb.data.expandKeys(self.d)
+ self.assertTrue(logContains("Variable key VAL_${FOO} (A) replaces original key VAL_foo (B)", logs))
+ self.assertEqual(self.d.getVar("VAL_foo"), "A")
+
+class TestFlags(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d.setVar("foo", "value of foo")
+ self.d.setVarFlag("foo", "flag1", "value of flag1")
+ self.d.setVarFlag("foo", "flag2", "value of flag2")
+
+ def test_setflag(self):
+ self.assertEqual(self.d.getVarFlag("foo", "flag1", False), "value of flag1")
+ self.assertEqual(self.d.getVarFlag("foo", "flag2", False), "value of flag2")
+
+ def test_delflag(self):
+ self.d.delVarFlag("foo", "flag2")
+ self.assertEqual(self.d.getVarFlag("foo", "flag1", False), "value of flag1")
+ self.assertEqual(self.d.getVarFlag("foo", "flag2", False), None)
+
+
+class Contains(unittest.TestCase):
+ def setUp(self):
+ self.d = bb.data.init()
+ self.d.setVar("SOMEFLAG", "a b c")
+
+ def test_contains(self):
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "a", True, False, self.d))
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "b", True, False, self.d))
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "c", True, False, self.d))
+
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "a b", True, False, self.d))
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "b c", True, False, self.d))
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "c a", True, False, self.d))
+
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "a b c", True, False, self.d))
+ self.assertTrue(bb.utils.contains("SOMEFLAG", "c b a", True, False, self.d))
+
+ self.assertFalse(bb.utils.contains("SOMEFLAG", "x", True, False, self.d))
+ self.assertFalse(bb.utils.contains("SOMEFLAG", "a x", True, False, self.d))
+ self.assertFalse(bb.utils.contains("SOMEFLAG", "x c b", True, False, self.d))
+ self.assertFalse(bb.utils.contains("SOMEFLAG", "x c b a", True, False, self.d))
+
+ def test_contains_any(self):
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "a", True, False, self.d))
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "b", True, False, self.d))
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "c", True, False, self.d))
+
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "a b", True, False, self.d))
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "b c", True, False, self.d))
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "c a", True, False, self.d))
+
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "a x", True, False, self.d))
+ self.assertTrue(bb.utils.contains_any("SOMEFLAG", "x c", True, False, self.d))
+
+ self.assertFalse(bb.utils.contains_any("SOMEFLAG", "x", True, False, self.d))
+ self.assertFalse(bb.utils.contains_any("SOMEFLAG", "x y z", True, False, self.d))
+
+
+class TaskHash(unittest.TestCase):
+ def test_taskhashes(self):
+ def gettask_bashhash(taskname, d):
+ tasklist, gendeps, lookupcache = bb.data.generate_dependencies(d)
+ taskdeps, basehash = bb.data.generate_dependency_hash(tasklist, gendeps, lookupcache, set(), "somefile")
+ bb.warn(str(lookupcache))
+ return basehash["somefile." + taskname]
+
+ d = bb.data.init()
+ d.setVar("__BBTASKS", ["mytask"])
+ d.setVar("__exportlist", [])
+ d.setVar("mytask", "${MYCOMMAND}")
+ d.setVar("MYCOMMAND", "${VAR}; foo; bar; exit 0")
+ d.setVar("VAR", "val")
+ orighash = gettask_bashhash("mytask", d)
+
+ # Changing a variable should change the hash
+ d.setVar("VAR", "val2")
+ nexthash = gettask_bashhash("mytask", d)
+ self.assertNotEqual(orighash, nexthash)
+
+ d.setVar("VAR", "val")
+ # Adding an inactive removal shouldn't change the hash
+ d.setVar("BAR", "notbar")
+ d.setVar("MYCOMMAND_remove", "${BAR}")
+ nexthash = gettask_bashhash("mytask", d)
+ self.assertEqual(orighash, nexthash)
+
+ # Adding an active removal should change the hash
+ d.setVar("BAR", "bar;")
+ nexthash = gettask_bashhash("mytask", d)
+ self.assertNotEqual(orighash, nexthash)
+
+ # Setup an inactive contains()
+ d.setVar("VAR", "${@bb.utils.contains('VAR2', 'A', 'val', '', d)}")
+ orighash = gettask_bashhash("mytask", d)
+
+ # Activate the contains() and the hash should change
+ d.setVar("VAR2", "A")
+ nexthash = gettask_bashhash("mytask", d)
+ self.assertNotEqual(orighash, nexthash)
+
+ # The contains should be inactive but even though VAR2 has a
+ # different value the hash should match the original
+ d.setVar("VAR2", "B")
+ nexthash = gettask_bashhash("mytask", d)
+ self.assertEqual(orighash, nexthash)
+
+class Serialize(unittest.TestCase):
+
+ def test_serialize(self):
+ import tempfile
+ import pickle
+ d = bb.data.init()
+ d.enableTracking()
+ d.setVar('HELLO', 'world')
+ d.setVarFlag('HELLO', 'other', 'planet')
+ with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
+ tmpfilename = tmpfile.name
+ pickle.dump(d, tmpfile)
+
+ with open(tmpfilename, 'rb') as f:
+ newd = pickle.load(f)
+
+ os.remove(tmpfilename)
+
+ self.assertEqual(d, newd)
+ self.assertEqual(newd.getVar('HELLO'), 'world')
+ self.assertEqual(newd.getVarFlag('HELLO', 'other'), 'planet')
+
+
+# Remote datastore tests
+# These really only test the interface, since in actual usage we have a
+# tinfoil connector that does everything over RPC, and this doesn't test
+# that.
+
+class TestConnector:
+ d = None
+ def __init__(self, d):
+ self.d = d
+ def getVar(self, name):
+ return self.d._findVar(name)
+ def getKeys(self):
+ return set(self.d.keys())
+ def getVarHistory(self, name):
+ return self.d.varhistory.variable(name)
+ def expandPythonRef(self, varname, expr, d):
+ localdata = self.d.createCopy()
+ for key in d.localkeys():
+ localdata.setVar(d.getVar(key))
+ varparse = bb.data_smart.VariableParse(varname, localdata)
+ return varparse.python_sub(expr)
+ def setVar(self, name, value):
+ self.d.setVar(name, value)
+ def setVarFlag(self, name, flag, value):
+ self.d.setVarFlag(name, flag, value)
+ def delVar(self, name):
+ self.d.delVar(name)
+ return False
+ def delVarFlag(self, name, flag):
+ self.d.delVarFlag(name, flag)
+ return False
+ def renameVar(self, name, newname):
+ self.d.renameVar(name, newname)
+ return False
+
+class Remote(unittest.TestCase):
+ def test_remote(self):
+
+ d1 = bb.data.init()
+ d1.enableTracking()
+ d2 = bb.data.init()
+ d2.enableTracking()
+ connector = TestConnector(d1)
+
+ d2.setVar('_remote_data', connector)
+
+ d1.setVar('HELLO', 'world')
+ d1.setVarFlag('OTHER', 'flagname', 'flagvalue')
+ self.assertEqual(d2.getVar('HELLO'), 'world')
+ self.assertEqual(d2.expand('${HELLO}'), 'world')
+ self.assertEqual(d2.expand('${@d.getVar("HELLO")}'), 'world')
+ self.assertIn('flagname', d2.getVarFlags('OTHER'))
+ self.assertEqual(d2.getVarFlag('OTHER', 'flagname'), 'flagvalue')
+ self.assertEqual(d1.varhistory.variable('HELLO'), d2.varhistory.variable('HELLO'))
+ # Test setVar on client side affects server
+ d2.setVar('HELLO', 'other-world')
+ self.assertEqual(d1.getVar('HELLO'), 'other-world')
+ # Test setVarFlag on client side affects server
+ d2.setVarFlag('HELLO', 'flagname', 'flagvalue')
+ self.assertEqual(d1.getVarFlag('HELLO', 'flagname'), 'flagvalue')
+ # Test client side data is incorporated in python expansion (which is done on server)
+ d2.setVar('FOO', 'bar')
+ self.assertEqual(d2.expand('${@d.getVar("FOO")}'), 'bar')
+ # Test overrides work
+ d1.setVar('FOO_test', 'baz')
+ d1.appendVar('OVERRIDES', ':test')
+ self.assertEqual(d2.getVar('FOO'), 'baz')
+
+
+# Remote equivalents of local test classes
+# Note that these aren't perfect since we only test in one direction
+
+class RemoteDataExpansions(DataExpansions):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1["foo"] = "value_of_foo"
+ self.d1["bar"] = "value_of_bar"
+ self.d1["value_of_foo"] = "value_of_'value_of_foo'"
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
+
+class TestRemoteNestedExpansions(TestNestedExpansions):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1["foo"] = "foo"
+ self.d1["bar"] = "bar"
+ self.d1["value_of_foobar"] = "187"
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
+
+class TestRemoteConcat(TestConcat):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1.setVar("FOO", "foo")
+ self.d1.setVar("VAL", "val")
+ self.d1.setVar("BAR", "bar")
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
+
+class TestRemoteConcatOverride(TestConcatOverride):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1.setVar("FOO", "foo")
+ self.d1.setVar("VAL", "val")
+ self.d1.setVar("BAR", "bar")
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
+
+class TestRemoteOverrides(TestOverrides):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1.setVar("OVERRIDES", "foo:bar:local")
+ self.d1.setVar("TEST", "testvalue")
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
+
+class TestRemoteKeyExpansion(TestKeyExpansion):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1.setVar("FOO", "foo")
+ self.d1.setVar("BAR", "foo")
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
+
+class TestRemoteFlags(TestFlags):
+ def setUp(self):
+ self.d1 = bb.data.init()
+ self.d = bb.data.init()
+ self.d1.setVar("foo", "value of foo")
+ self.d1.setVarFlag("foo", "flag1", "value of flag1")
+ self.d1.setVarFlag("foo", "flag2", "value of flag2")
+ connector = TestConnector(self.d1)
+ self.d.setVar('_remote_data', connector)
diff --git a/external/poky/bitbake/lib/bb/tests/event.py b/external/poky/bitbake/lib/bb/tests/event.py
new file mode 100644
index 00000000..d3a5f626
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/event.py
@@ -0,0 +1,986 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Tests for the Event implementation (event.py)
+#
+# Copyright (C) 2017 Intel Corporation
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import bb
+import logging
+import bb.compat
+import bb.event
+import importlib
+import threading
+import time
+import pickle
+from unittest.mock import Mock
+from unittest.mock import call
+from bb.msg import BBLogFormatter
+
+
+class EventQueueStubBase(object):
+ """ Base class for EventQueueStub classes """
+ def __init__(self):
+ self.event_calls = []
+ return
+
+ def _store_event_data_string(self, event):
+ if isinstance(event, logging.LogRecord):
+ formatter = BBLogFormatter("%(levelname)s: %(message)s")
+ self.event_calls.append(formatter.format(event))
+ else:
+ self.event_calls.append(bb.event.getName(event))
+ return
+
+
+class EventQueueStub(EventQueueStubBase):
+ """ Class used as specification for UI event handler queue stub objects """
+ def __init__(self):
+ super(EventQueueStub, self).__init__()
+
+ def send(self, event):
+ super(EventQueueStub, self)._store_event_data_string(event)
+
+
+class PickleEventQueueStub(EventQueueStubBase):
+ """ Class used as specification for UI event handler queue stub objects
+ with sendpickle method """
+ def __init__(self):
+ super(PickleEventQueueStub, self).__init__()
+
+ def sendpickle(self, pickled_event):
+ event = pickle.loads(pickled_event)
+ super(PickleEventQueueStub, self)._store_event_data_string(event)
+
+
+class UIClientStub(object):
+ """ Class used as specification for UI event handler stub objects """
+ def __init__(self):
+ self.event = None
+
+
+class EventHandlingTest(unittest.TestCase):
+ """ Event handling test class """
+
+
+ def setUp(self):
+ self._test_process = Mock()
+ ui_client1 = UIClientStub()
+ ui_client2 = UIClientStub()
+ self._test_ui1 = Mock(wraps=ui_client1)
+ self._test_ui2 = Mock(wraps=ui_client2)
+ importlib.reload(bb.event)
+
+ def _create_test_handlers(self):
+ """ Method used to create a test handler ordered dictionary """
+ test_handlers = bb.compat.OrderedDict()
+ test_handlers["handler1"] = self._test_process.handler1
+ test_handlers["handler2"] = self._test_process.handler2
+ return test_handlers
+
+ def test_class_handlers(self):
+ """ Test set_class_handlers and get_class_handlers methods """
+ test_handlers = self._create_test_handlers()
+ bb.event.set_class_handlers(test_handlers)
+ self.assertEqual(test_handlers,
+ bb.event.get_class_handlers())
+
+ def test_handlers(self):
+ """ Test set_handlers and get_handlers """
+ test_handlers = self._create_test_handlers()
+ bb.event.set_handlers(test_handlers)
+ self.assertEqual(test_handlers,
+ bb.event.get_handlers())
+
+ def test_clean_class_handlers(self):
+ """ Test clean_class_handlers method """
+ cleanDict = bb.compat.OrderedDict()
+ self.assertEqual(cleanDict,
+ bb.event.clean_class_handlers())
+
+ def test_register(self):
+ """ Test register method for class handlers """
+ result = bb.event.register("handler", self._test_process.handler)
+ self.assertEqual(result, bb.event.Registered)
+ handlers_dict = bb.event.get_class_handlers()
+ self.assertIn("handler", handlers_dict)
+
+ def test_already_registered(self):
+ """ Test detection of an already registed class handler """
+ bb.event.register("handler", self._test_process.handler)
+ handlers_dict = bb.event.get_class_handlers()
+ self.assertIn("handler", handlers_dict)
+ result = bb.event.register("handler", self._test_process.handler)
+ self.assertEqual(result, bb.event.AlreadyRegistered)
+
+ def test_register_from_string(self):
+ """ Test register method receiving code in string """
+ result = bb.event.register("string_handler", " return True")
+ self.assertEqual(result, bb.event.Registered)
+ handlers_dict = bb.event.get_class_handlers()
+ self.assertIn("string_handler", handlers_dict)
+
+ def test_register_with_mask(self):
+ """ Test register method with event masking """
+ mask = ["bb.event.OperationStarted",
+ "bb.event.OperationCompleted"]
+ result = bb.event.register("event_handler",
+ self._test_process.event_handler,
+ mask)
+ self.assertEqual(result, bb.event.Registered)
+ handlers_dict = bb.event.get_class_handlers()
+ self.assertIn("event_handler", handlers_dict)
+
+ def test_remove(self):
+ """ Test remove method for class handlers """
+ test_handlers = self._create_test_handlers()
+ bb.event.set_class_handlers(test_handlers)
+ count = len(test_handlers)
+ bb.event.remove("handler1", None)
+ test_handlers = bb.event.get_class_handlers()
+ self.assertEqual(len(test_handlers), count - 1)
+ with self.assertRaises(KeyError):
+ bb.event.remove("handler1", None)
+
+ def test_execute_handler(self):
+ """ Test execute_handler method for class handlers """
+ mask = ["bb.event.OperationProgress"]
+ result = bb.event.register("event_handler",
+ self._test_process.event_handler,
+ mask)
+ self.assertEqual(result, bb.event.Registered)
+ event = bb.event.OperationProgress(current=10, total=100)
+ bb.event.execute_handler("event_handler",
+ self._test_process.event_handler,
+ event,
+ None)
+ self._test_process.event_handler.assert_called_once_with(event)
+
+ def test_fire_class_handlers(self):
+ """ Test fire_class_handlers method """
+ mask = ["bb.event.OperationStarted"]
+ result = bb.event.register("event_handler1",
+ self._test_process.event_handler1,
+ mask)
+ self.assertEqual(result, bb.event.Registered)
+ result = bb.event.register("event_handler2",
+ self._test_process.event_handler2,
+ "*")
+ self.assertEqual(result, bb.event.Registered)
+ event1 = bb.event.OperationStarted()
+ event2 = bb.event.OperationCompleted(total=123)
+ bb.event.fire_class_handlers(event1, None)
+ bb.event.fire_class_handlers(event2, None)
+ bb.event.fire_class_handlers(event2, None)
+ expected_event_handler1 = [call(event1)]
+ expected_event_handler2 = [call(event1),
+ call(event2),
+ call(event2)]
+ self.assertEqual(self._test_process.event_handler1.call_args_list,
+ expected_event_handler1)
+ self.assertEqual(self._test_process.event_handler2.call_args_list,
+ expected_event_handler2)
+
+ def test_class_handler_filters(self):
+ """ Test filters for class handlers """
+ mask = ["bb.event.OperationStarted"]
+ result = bb.event.register("event_handler1",
+ self._test_process.event_handler1,
+ mask)
+ self.assertEqual(result, bb.event.Registered)
+ result = bb.event.register("event_handler2",
+ self._test_process.event_handler2,
+ "*")
+ self.assertEqual(result, bb.event.Registered)
+ bb.event.set_eventfilter(
+ lambda name, handler, event, d :
+ name == 'event_handler2' and
+ bb.event.getName(event) == "OperationStarted")
+ event1 = bb.event.OperationStarted()
+ event2 = bb.event.OperationCompleted(total=123)
+ bb.event.fire_class_handlers(event1, None)
+ bb.event.fire_class_handlers(event2, None)
+ bb.event.fire_class_handlers(event2, None)
+ expected_event_handler1 = []
+ expected_event_handler2 = [call(event1)]
+ self.assertEqual(self._test_process.event_handler1.call_args_list,
+ expected_event_handler1)
+ self.assertEqual(self._test_process.event_handler2.call_args_list,
+ expected_event_handler2)
+
+ def test_change_handler_event_mapping(self):
+ """ Test changing the event mapping for class handlers """
+ event1 = bb.event.OperationStarted()
+ event2 = bb.event.OperationCompleted(total=123)
+
+ # register handler for all events
+ result = bb.event.register("event_handler1",
+ self._test_process.event_handler1,
+ "*")
+ self.assertEqual(result, bb.event.Registered)
+ bb.event.fire_class_handlers(event1, None)
+ bb.event.fire_class_handlers(event2, None)
+ expected = [call(event1), call(event2)]
+ self.assertEqual(self._test_process.event_handler1.call_args_list,
+ expected)
+
+ # unregister handler and register it only for OperationStarted
+ bb.event.remove("event_handler1",
+ self._test_process.event_handler1)
+ mask = ["bb.event.OperationStarted"]
+ result = bb.event.register("event_handler1",
+ self._test_process.event_handler1,
+ mask)
+ self.assertEqual(result, bb.event.Registered)
+ bb.event.fire_class_handlers(event1, None)
+ bb.event.fire_class_handlers(event2, None)
+ expected = [call(event1), call(event2), call(event1)]
+ self.assertEqual(self._test_process.event_handler1.call_args_list,
+ expected)
+
+ # unregister handler and register it only for OperationCompleted
+ bb.event.remove("event_handler1",
+ self._test_process.event_handler1)
+ mask = ["bb.event.OperationCompleted"]
+ result = bb.event.register("event_handler1",
+ self._test_process.event_handler1,
+ mask)
+ self.assertEqual(result, bb.event.Registered)
+ bb.event.fire_class_handlers(event1, None)
+ bb.event.fire_class_handlers(event2, None)
+ expected = [call(event1), call(event2), call(event1), call(event2)]
+ self.assertEqual(self._test_process.event_handler1.call_args_list,
+ expected)
+
+ def test_register_UIHhandler(self):
+ """ Test register_UIHhandler method """
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+
+ def test_UIHhandler_already_registered(self):
+ """ Test registering an UIHhandler already existing """
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 2)
+
+ def test_unregister_UIHhandler(self):
+ """ Test unregister_UIHhandler method """
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+ result = bb.event.unregister_UIHhandler(1)
+ self.assertIs(result, None)
+
+ def test_fire_ui_handlers(self):
+ """ Test fire_ui_handlers method """
+ self._test_ui1.event = Mock(spec_set=EventQueueStub)
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+ self._test_ui2.event = Mock(spec_set=PickleEventQueueStub)
+ result = bb.event.register_UIHhandler(self._test_ui2, mainui=True)
+ self.assertEqual(result, 2)
+ event1 = bb.event.OperationStarted()
+ bb.event.fire_ui_handlers(event1, None)
+ expected = [call(event1)]
+ self.assertEqual(self._test_ui1.event.send.call_args_list,
+ expected)
+ expected = [call(pickle.dumps(event1))]
+ self.assertEqual(self._test_ui2.event.sendpickle.call_args_list,
+ expected)
+
+ def test_ui_handler_mask_filter(self):
+ """ Test filters for UI handlers """
+ mask = ["bb.event.OperationStarted"]
+ debug_domains = {}
+ self._test_ui1.event = Mock(spec_set=EventQueueStub)
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ bb.event.set_UIHmask(result, logging.INFO, debug_domains, mask)
+ self._test_ui2.event = Mock(spec_set=PickleEventQueueStub)
+ result = bb.event.register_UIHhandler(self._test_ui2, mainui=True)
+ bb.event.set_UIHmask(result, logging.INFO, debug_domains, mask)
+
+ event1 = bb.event.OperationStarted()
+ event2 = bb.event.OperationCompleted(total=1)
+
+ bb.event.fire_ui_handlers(event1, None)
+ bb.event.fire_ui_handlers(event2, None)
+ expected = [call(event1)]
+ self.assertEqual(self._test_ui1.event.send.call_args_list,
+ expected)
+ expected = [call(pickle.dumps(event1))]
+ self.assertEqual(self._test_ui2.event.sendpickle.call_args_list,
+ expected)
+
+ def test_ui_handler_log_filter(self):
+ """ Test log filters for UI handlers """
+ mask = ["*"]
+ debug_domains = {'BitBake.Foo': logging.WARNING}
+
+ self._test_ui1.event = EventQueueStub()
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask)
+ self._test_ui2.event = PickleEventQueueStub()
+ result = bb.event.register_UIHhandler(self._test_ui2, mainui=True)
+ bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask)
+
+ event1 = bb.event.OperationStarted()
+ bb.event.fire_ui_handlers(event1, None) # All events match
+
+ event_log_handler = bb.event.LogHandler()
+ logger = logging.getLogger("BitBake")
+ logger.addHandler(event_log_handler)
+ logger1 = logging.getLogger("BitBake.Foo")
+ logger1.warning("Test warning LogRecord1") # Matches debug_domains level
+ logger1.info("Test info LogRecord") # Filtered out
+ logger2 = logging.getLogger("BitBake.Bar")
+ logger2.error("Test error LogRecord") # Matches filter base level
+ logger2.warning("Test warning LogRecord2") # Filtered out
+ logger.removeHandler(event_log_handler)
+
+ expected = ['OperationStarted',
+ 'WARNING: Test warning LogRecord1',
+ 'ERROR: Test error LogRecord']
+ self.assertEqual(self._test_ui1.event.event_calls, expected)
+ self.assertEqual(self._test_ui2.event.event_calls, expected)
+
+ def test_fire(self):
+ """ Test fire method used to trigger class and ui event handlers """
+ mask = ["bb.event.ConfigParsed"]
+ result = bb.event.register("event_handler1",
+ self._test_process.event_handler1,
+ mask)
+
+ self._test_ui1.event = Mock(spec_set=EventQueueStub)
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+
+ event1 = bb.event.ConfigParsed()
+ bb.event.fire(event1, None)
+ expected = [call(event1)]
+ self.assertEqual(self._test_process.event_handler1.call_args_list,
+ expected)
+ self.assertEqual(self._test_ui1.event.send.call_args_list,
+ expected)
+
+ def test_fire_from_worker(self):
+ """ Test fire_from_worker method """
+ self._test_ui1.event = Mock(spec_set=EventQueueStub)
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+ event1 = bb.event.ConfigParsed()
+ bb.event.fire_from_worker(event1, None)
+ expected = [call(event1)]
+ self.assertEqual(self._test_ui1.event.send.call_args_list,
+ expected)
+
+ def test_worker_fire(self):
+ """ Test the triggering of bb.event.worker_fire callback """
+ bb.event.worker_fire = Mock()
+ event = bb.event.Event()
+ bb.event.fire(event, None)
+ expected = [call(event, None)]
+ self.assertEqual(bb.event.worker_fire.call_args_list, expected)
+
+ def test_print_ui_queue(self):
+ """ Test print_ui_queue method """
+ event1 = bb.event.OperationStarted()
+ event2 = bb.event.OperationCompleted(total=123)
+ bb.event.fire(event1, None)
+ bb.event.fire(event2, None)
+ event_log_handler = bb.event.LogHandler()
+ logger = logging.getLogger("BitBake")
+ logger.addHandler(event_log_handler)
+ logger.info("Test info LogRecord")
+ logger.warning("Test warning LogRecord")
+ with self.assertLogs("BitBake", level="INFO") as cm:
+ bb.event.print_ui_queue()
+ logger.removeHandler(event_log_handler)
+ self.assertEqual(cm.output,
+ ["INFO:BitBake:Test info LogRecord",
+ "WARNING:BitBake:Test warning LogRecord"])
+
+ def _set_threadlock_test_mockups(self):
+ """ Create UI event handler mockups used in enable and disable
+ threadlock tests """
+ def ui1_event_send(event):
+ if type(event) is bb.event.ConfigParsed:
+ self._threadlock_test_calls.append("w1_ui1")
+ if type(event) is bb.event.OperationStarted:
+ self._threadlock_test_calls.append("w2_ui1")
+ time.sleep(2)
+
+ def ui2_event_send(event):
+ if type(event) is bb.event.ConfigParsed:
+ self._threadlock_test_calls.append("w1_ui2")
+ if type(event) is bb.event.OperationStarted:
+ self._threadlock_test_calls.append("w2_ui2")
+ time.sleep(2)
+
+ self._threadlock_test_calls = []
+ self._test_ui1.event = EventQueueStub()
+ self._test_ui1.event.send = ui1_event_send
+ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True)
+ self.assertEqual(result, 1)
+ self._test_ui2.event = EventQueueStub()
+ self._test_ui2.event.send = ui2_event_send
+ result = bb.event.register_UIHhandler(self._test_ui2, mainui=True)
+ self.assertEqual(result, 2)
+
+ def _set_and_run_threadlock_test_workers(self):
+ """ Create and run the workers used to trigger events in enable and
+ disable threadlock tests """
+ worker1 = threading.Thread(target=self._thread_lock_test_worker1)
+ worker2 = threading.Thread(target=self._thread_lock_test_worker2)
+ worker1.start()
+ time.sleep(1)
+ worker2.start()
+ worker1.join()
+ worker2.join()
+
+ def _thread_lock_test_worker1(self):
+ """ First worker used to fire the ConfigParsed event for enable and
+ disable threadlocks tests """
+ bb.event.fire(bb.event.ConfigParsed(), None)
+
+ def _thread_lock_test_worker2(self):
+ """ Second worker used to fire the OperationStarted event for enable
+ and disable threadlocks tests """
+ bb.event.fire(bb.event.OperationStarted(), None)
+
+ def test_enable_threadlock(self):
+ """ Test enable_threadlock method """
+ self._set_threadlock_test_mockups()
+ bb.event.enable_threadlock()
+ self._set_and_run_threadlock_test_workers()
+ # Calls to UI handlers should be in order as all the registered
+ # handlers for the event coming from the first worker should be
+ # called before processing the event from the second worker.
+ self.assertEqual(self._threadlock_test_calls,
+ ["w1_ui1", "w1_ui2", "w2_ui1", "w2_ui2"])
+
+
+ def test_disable_threadlock(self):
+ """ Test disable_threadlock method """
+ self._set_threadlock_test_mockups()
+ bb.event.disable_threadlock()
+ self._set_and_run_threadlock_test_workers()
+ # Calls to UI handlers should be intertwined together. Thanks to the
+ # delay in the registered handlers for the event coming from the first
+ # worker, the event coming from the second worker starts being
+ # processed before finishing handling the first worker event.
+ self.assertEqual(self._threadlock_test_calls,
+ ["w1_ui1", "w2_ui1", "w1_ui2", "w2_ui2"])
+
+
+class EventClassesTest(unittest.TestCase):
+ """ Event classes test class """
+
+ _worker_pid = 54321
+
+ def setUp(self):
+ bb.event.worker_pid = EventClassesTest._worker_pid
+
+ def test_Event(self):
+ """ Test the Event base class """
+ event = bb.event.Event()
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_HeartbeatEvent(self):
+ """ Test the HeartbeatEvent class """
+ time = 10
+ event = bb.event.HeartbeatEvent(time)
+ self.assertEqual(event.time, time)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_OperationStarted(self):
+ """ Test OperationStarted event class """
+ msg = "Foo Bar"
+ event = bb.event.OperationStarted(msg)
+ self.assertEqual(event.msg, msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_OperationCompleted(self):
+ """ Test OperationCompleted event class """
+ msg = "Foo Bar"
+ total = 123
+ event = bb.event.OperationCompleted(total, msg)
+ self.assertEqual(event.msg, msg)
+ self.assertEqual(event.total, total)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_OperationProgress(self):
+ """ Test OperationProgress event class """
+ msg = "Foo Bar"
+ total = 123
+ current = 111
+ event = bb.event.OperationProgress(current, total, msg)
+ self.assertEqual(event.msg, msg + ": %s/%s" % (current, total))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ConfigParsed(self):
+ """ Test the ConfigParsed class """
+ event = bb.event.ConfigParsed()
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_MultiConfigParsed(self):
+ """ Test MultiConfigParsed event class """
+ mcdata = {"foobar": "Foo Bar"}
+ event = bb.event.MultiConfigParsed(mcdata)
+ self.assertEqual(event.mcdata, mcdata)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_RecipeEvent(self):
+ """ Test RecipeEvent event base class """
+ callback = lambda a: 2 * a
+ event = bb.event.RecipeEvent(callback)
+ self.assertEqual(event.fn(1), callback(1))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_RecipePreFinalise(self):
+ """ Test RecipePreFinalise event class """
+ callback = lambda a: 2 * a
+ event = bb.event.RecipePreFinalise(callback)
+ self.assertEqual(event.fn(1), callback(1))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_RecipeTaskPreProcess(self):
+ """ Test RecipeTaskPreProcess event class """
+ callback = lambda a: 2 * a
+ tasklist = [("foobar", callback)]
+ event = bb.event.RecipeTaskPreProcess(callback, tasklist)
+ self.assertEqual(event.fn(1), callback(1))
+ self.assertEqual(event.tasklist, tasklist)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_RecipeParsed(self):
+ """ Test RecipeParsed event base class """
+ callback = lambda a: 2 * a
+ event = bb.event.RecipeParsed(callback)
+ self.assertEqual(event.fn(1), callback(1))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_StampUpdate(self):
+ targets = ["foo", "bar"]
+ stampfns = [lambda:"foobar"]
+ event = bb.event.StampUpdate(targets, stampfns)
+ self.assertEqual(event.targets, targets)
+ self.assertEqual(event.stampPrefix, stampfns)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_BuildBase(self):
+ """ Test base class for bitbake build events """
+ name = "foo"
+ pkgs = ["bar"]
+ failures = 123
+ event = bb.event.BuildBase(name, pkgs, failures)
+ self.assertEqual(event.name, name)
+ self.assertEqual(event.pkgs, pkgs)
+ self.assertEqual(event.getFailures(), failures)
+ name = event.name = "bar"
+ pkgs = event.pkgs = ["foo"]
+ self.assertEqual(event.name, name)
+ self.assertEqual(event.pkgs, pkgs)
+ self.assertEqual(event.getFailures(), failures)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_BuildInit(self):
+ """ Test class for bitbake build invocation events """
+ event = bb.event.BuildInit()
+ self.assertEqual(event.name, None)
+ self.assertEqual(event.pkgs, [])
+ self.assertEqual(event.getFailures(), 0)
+ name = event.name = "bar"
+ pkgs = event.pkgs = ["foo"]
+ self.assertEqual(event.name, name)
+ self.assertEqual(event.pkgs, pkgs)
+ self.assertEqual(event.getFailures(), 0)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_BuildStarted(self):
+ """ Test class for build started events """
+ name = "foo"
+ pkgs = ["bar"]
+ failures = 123
+ event = bb.event.BuildStarted(name, pkgs, failures)
+ self.assertEqual(event.name, name)
+ self.assertEqual(event.pkgs, pkgs)
+ self.assertEqual(event.getFailures(), failures)
+ self.assertEqual(event.msg, "Building Started")
+ name = event.name = "bar"
+ pkgs = event.pkgs = ["foo"]
+ msg = event.msg = "foobar"
+ self.assertEqual(event.name, name)
+ self.assertEqual(event.pkgs, pkgs)
+ self.assertEqual(event.getFailures(), failures)
+ self.assertEqual(event.msg, msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_BuildCompleted(self):
+ """ Test class for build completed events """
+ total = 1000
+ name = "foo"
+ pkgs = ["bar"]
+ failures = 123
+ interrupted = 1
+ event = bb.event.BuildCompleted(total, name, pkgs, failures,
+ interrupted)
+ self.assertEqual(event.name, name)
+ self.assertEqual(event.pkgs, pkgs)
+ self.assertEqual(event.getFailures(), failures)
+ self.assertEqual(event.msg, "Building Failed")
+ event2 = bb.event.BuildCompleted(total, name, pkgs)
+ self.assertEqual(event2.name, name)
+ self.assertEqual(event2.pkgs, pkgs)
+ self.assertEqual(event2.getFailures(), 0)
+ self.assertEqual(event2.msg, "Building Succeeded")
+ self.assertEqual(event2.pid, EventClassesTest._worker_pid)
+
+ def test_DiskFull(self):
+ """ Test DiskFull event class """
+ dev = "/dev/foo"
+ type = "ext4"
+ freespace = "104M"
+ mountpoint = "/"
+ event = bb.event.DiskFull(dev, type, freespace, mountpoint)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_MonitorDiskEvent(self):
+ """ Test MonitorDiskEvent class """
+ available_bytes = 10000000
+ free_bytes = 90000000
+ total_bytes = 1000000000
+ du = bb.event.DiskUsageSample(available_bytes, free_bytes,
+ total_bytes)
+ event = bb.event.MonitorDiskEvent(du)
+ self.assertEqual(event.disk_usage.available_bytes, available_bytes)
+ self.assertEqual(event.disk_usage.free_bytes, free_bytes)
+ self.assertEqual(event.disk_usage.total_bytes, total_bytes)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_NoProvider(self):
+ """ Test NoProvider event class """
+ item = "foobar"
+ event1 = bb.event.NoProvider(item)
+ self.assertEqual(event1.getItem(), item)
+ self.assertEqual(event1.isRuntime(), False)
+ self.assertEqual(str(event1), "Nothing PROVIDES 'foobar'")
+ runtime = True
+ dependees = ["foo", "bar"]
+ reasons = None
+ close_matches = ["foibar", "footbar"]
+ event2 = bb.event.NoProvider(item, runtime, dependees, reasons,
+ close_matches)
+ self.assertEqual(event2.isRuntime(), True)
+ expected = ("Nothing RPROVIDES 'foobar' (but foo, bar RDEPENDS"
+ " on or otherwise requires it). Close matches:\n"
+ " foibar\n"
+ " footbar")
+ self.assertEqual(str(event2), expected)
+ reasons = ["Item does not exist on database"]
+ close_matches = ["foibar", "footbar"]
+ event3 = bb.event.NoProvider(item, runtime, dependees, reasons,
+ close_matches)
+ expected = ("Nothing RPROVIDES 'foobar' (but foo, bar RDEPENDS"
+ " on or otherwise requires it)\n"
+ "Item does not exist on database")
+ self.assertEqual(str(event3), expected)
+ self.assertEqual(event3.pid, EventClassesTest._worker_pid)
+
+ def test_MultipleProviders(self):
+ """ Test MultipleProviders event class """
+ item = "foobar"
+ candidates = ["foobarv1", "foobars"]
+ event1 = bb.event.MultipleProviders(item, candidates)
+ self.assertEqual(event1.isRuntime(), False)
+ self.assertEqual(event1.getItem(), item)
+ self.assertEqual(event1.getCandidates(), candidates)
+ expected = ("Multiple providers are available for foobar (foobarv1,"
+ " foobars)\n"
+ "Consider defining a PREFERRED_PROVIDER entry to match "
+ "foobar")
+ self.assertEqual(str(event1), expected)
+ runtime = True
+ event2 = bb.event.MultipleProviders(item, candidates, runtime)
+ self.assertEqual(event2.isRuntime(), runtime)
+ expected = ("Multiple providers are available for runtime foobar "
+ "(foobarv1, foobars)\n"
+ "Consider defining a PREFERRED_RPROVIDER entry to match "
+ "foobar")
+ self.assertEqual(str(event2), expected)
+ self.assertEqual(event2.pid, EventClassesTest._worker_pid)
+
+ def test_ParseStarted(self):
+ """ Test ParseStarted event class """
+ total = 123
+ event = bb.event.ParseStarted(total)
+ self.assertEqual(event.msg, "Recipe parsing Started")
+ self.assertEqual(event.total, total)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ParseCompleted(self):
+ """ Test ParseCompleted event class """
+ cached = 10
+ parsed = 13
+ skipped = 7
+ virtuals = 2
+ masked = 1
+ errors = 0
+ total = 23
+ event = bb.event.ParseCompleted(cached, parsed, skipped, masked,
+ virtuals, errors, total)
+ self.assertEqual(event.msg, "Recipe parsing Completed")
+ expected = [cached, parsed, skipped, virtuals, masked, errors,
+ cached + parsed, total]
+ actual = [event.cached, event.parsed, event.skipped, event.virtuals,
+ event.masked, event.errors, event.sofar, event.total]
+ self.assertEqual(str(actual), str(expected))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ParseProgress(self):
+ """ Test ParseProgress event class """
+ current = 10
+ total = 100
+ event = bb.event.ParseProgress(current, total)
+ self.assertEqual(event.msg,
+ "Recipe parsing" + ": %s/%s" % (current, total))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_CacheLoadStarted(self):
+ """ Test CacheLoadStarted event class """
+ total = 123
+ event = bb.event.CacheLoadStarted(total)
+ self.assertEqual(event.msg, "Loading cache Started")
+ self.assertEqual(event.total, total)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_CacheLoadProgress(self):
+ """ Test CacheLoadProgress event class """
+ current = 10
+ total = 100
+ event = bb.event.CacheLoadProgress(current, total)
+ self.assertEqual(event.msg,
+ "Loading cache" + ": %s/%s" % (current, total))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_CacheLoadCompleted(self):
+ """ Test CacheLoadCompleted event class """
+ total = 23
+ num_entries = 12
+ event = bb.event.CacheLoadCompleted(total, num_entries)
+ self.assertEqual(event.msg, "Loading cache Completed")
+ expected = [total, num_entries]
+ actual = [event.total, event.num_entries]
+ self.assertEqual(str(actual), str(expected))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_TreeDataPreparationStarted(self):
+ """ Test TreeDataPreparationStarted event class """
+ event = bb.event.TreeDataPreparationStarted()
+ self.assertEqual(event.msg, "Preparing tree data Started")
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_TreeDataPreparationProgress(self):
+ """ Test TreeDataPreparationProgress event class """
+ current = 10
+ total = 100
+ event = bb.event.TreeDataPreparationProgress(current, total)
+ self.assertEqual(event.msg,
+ "Preparing tree data" + ": %s/%s" % (current, total))
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_TreeDataPreparationCompleted(self):
+ """ Test TreeDataPreparationCompleted event class """
+ total = 23
+ event = bb.event.TreeDataPreparationCompleted(total)
+ self.assertEqual(event.msg, "Preparing tree data Completed")
+ self.assertEqual(event.total, total)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_DepTreeGenerated(self):
+ """ Test DepTreeGenerated event class """
+ depgraph = Mock()
+ event = bb.event.DepTreeGenerated(depgraph)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_TargetsTreeGenerated(self):
+ """ Test TargetsTreeGenerated event class """
+ model = Mock()
+ event = bb.event.TargetsTreeGenerated(model)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ReachableStamps(self):
+ """ Test ReachableStamps event class """
+ stamps = [Mock(), Mock()]
+ event = bb.event.ReachableStamps(stamps)
+ self.assertEqual(event.stamps, stamps)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_FilesMatchingFound(self):
+ """ Test FilesMatchingFound event class """
+ pattern = "foo.*bar"
+ matches = ["foobar"]
+ event = bb.event.FilesMatchingFound(pattern, matches)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ConfigFilesFound(self):
+ """ Test ConfigFilesFound event class """
+ variable = "FOO_BAR"
+ values = ["foo", "bar"]
+ event = bb.event.ConfigFilesFound(variable, values)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ConfigFilePathFound(self):
+ """ Test ConfigFilePathFound event class """
+ path = "/foo/bar"
+ event = bb.event.ConfigFilePathFound(path)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_message_classes(self):
+ """ Test message event classes """
+ msg = "foobar foo bar"
+ event = bb.event.MsgBase(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+ event = bb.event.MsgDebug(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+ event = bb.event.MsgNote(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+ event = bb.event.MsgWarn(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+ event = bb.event.MsgError(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+ event = bb.event.MsgFatal(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+ event = bb.event.MsgPlain(msg)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_LogExecTTY(self):
+ """ Test LogExecTTY event class """
+ msg = "foo bar"
+ prog = "foo.sh"
+ sleep_delay = 10
+ retries = 3
+ event = bb.event.LogExecTTY(msg, prog, sleep_delay, retries)
+ self.assertEqual(event.msg, msg)
+ self.assertEqual(event.prog, prog)
+ self.assertEqual(event.sleep_delay, sleep_delay)
+ self.assertEqual(event.retries, retries)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def _throw_zero_division_exception(self):
+ a = 1 / 0
+ return
+
+ def _worker_handler(self, event, d):
+ self._returned_event = event
+ return
+
+ def test_LogHandler(self):
+ """ Test LogHandler class """
+ logger = logging.getLogger("TestEventClasses")
+ logger.propagate = False
+ handler = bb.event.LogHandler(logging.INFO)
+ logger.addHandler(handler)
+ bb.event.worker_fire = self._worker_handler
+ try:
+ self._throw_zero_division_exception()
+ except ZeroDivisionError as ex:
+ logger.exception(ex)
+ event = self._returned_event
+ try:
+ pe = pickle.dumps(event)
+ newevent = pickle.loads(pe)
+ except:
+ self.fail('Logged event is not serializable')
+ self.assertEqual(event.taskpid, EventClassesTest._worker_pid)
+
+ def test_MetadataEvent(self):
+ """ Test MetadataEvent class """
+ eventtype = "footype"
+ eventdata = {"foo": "bar"}
+ event = bb.event.MetadataEvent(eventtype, eventdata)
+ self.assertEqual(event.type, eventtype)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ProcessStarted(self):
+ """ Test ProcessStarted class """
+ processname = "foo"
+ total = 9783128974
+ event = bb.event.ProcessStarted(processname, total)
+ self.assertEqual(event.processname, processname)
+ self.assertEqual(event.total, total)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ProcessProgress(self):
+ """ Test ProcessProgress class """
+ processname = "foo"
+ progress = 243224
+ event = bb.event.ProcessProgress(processname, progress)
+ self.assertEqual(event.processname, processname)
+ self.assertEqual(event.progress, progress)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_ProcessFinished(self):
+ """ Test ProcessFinished class """
+ processname = "foo"
+ total = 1242342344
+ event = bb.event.ProcessFinished(processname)
+ self.assertEqual(event.processname, processname)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_SanityCheck(self):
+ """ Test SanityCheck class """
+ event1 = bb.event.SanityCheck()
+ self.assertEqual(event1.generateevents, True)
+ self.assertEqual(event1.pid, EventClassesTest._worker_pid)
+ generateevents = False
+ event2 = bb.event.SanityCheck(generateevents)
+ self.assertEqual(event2.generateevents, generateevents)
+ self.assertEqual(event2.pid, EventClassesTest._worker_pid)
+
+ def test_SanityCheckPassed(self):
+ """ Test SanityCheckPassed class """
+ event = bb.event.SanityCheckPassed()
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
+
+ def test_SanityCheckFailed(self):
+ """ Test SanityCheckFailed class """
+ msg = "The sanity test failed."
+ event1 = bb.event.SanityCheckFailed(msg)
+ self.assertEqual(event1.pid, EventClassesTest._worker_pid)
+ network_error = True
+ event2 = bb.event.SanityCheckFailed(msg, network_error)
+ self.assertEqual(event2.pid, EventClassesTest._worker_pid)
+
+ def test_network_event_classes(self):
+ """ Test network event classes """
+ event1 = bb.event.NetworkTest()
+ generateevents = False
+ self.assertEqual(event1.pid, EventClassesTest._worker_pid)
+ event2 = bb.event.NetworkTest(generateevents)
+ self.assertEqual(event2.pid, EventClassesTest._worker_pid)
+ event3 = bb.event.NetworkTestPassed()
+ self.assertEqual(event3.pid, EventClassesTest._worker_pid)
+ event4 = bb.event.NetworkTestFailed()
+ self.assertEqual(event4.pid, EventClassesTest._worker_pid)
+
+ def test_FindSigInfoResult(self):
+ """ Test FindSigInfoResult event class """
+ result = [Mock()]
+ event = bb.event.FindSigInfoResult(result)
+ self.assertEqual(event.result, result)
+ self.assertEqual(event.pid, EventClassesTest._worker_pid)
diff --git a/external/poky/bitbake/lib/bb/tests/fetch.py b/external/poky/bitbake/lib/bb/tests/fetch.py
new file mode 100644
index 00000000..9c71207f
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/fetch.py
@@ -0,0 +1,1838 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Tests for the Fetcher (fetch2/)
+#
+# Copyright (C) 2012 Richard Purdie
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import hashlib
+import tempfile
+import subprocess
+import collections
+import os
+from bb.fetch2 import URI
+from bb.fetch2 import FetchMethod
+import bb
+
+def skipIfNoNetwork():
+ if os.environ.get("BB_SKIP_NETTESTS") == "yes":
+ return unittest.skip("Network tests being skipped")
+ return lambda f: f
+
+class URITest(unittest.TestCase):
+ test_uris = {
+ "http://www.google.com/index.html" : {
+ 'uri': 'http://www.google.com/index.html',
+ 'scheme': 'http',
+ 'hostname': 'www.google.com',
+ 'port': None,
+ 'hostport': 'www.google.com',
+ 'path': '/index.html',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': False
+ },
+ "http://www.google.com/index.html;param1=value1" : {
+ 'uri': 'http://www.google.com/index.html;param1=value1',
+ 'scheme': 'http',
+ 'hostname': 'www.google.com',
+ 'port': None,
+ 'hostport': 'www.google.com',
+ 'path': '/index.html',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {
+ 'param1': 'value1'
+ },
+ 'query': {},
+ 'relative': False
+ },
+ "http://www.example.org/index.html?param1=value1" : {
+ 'uri': 'http://www.example.org/index.html?param1=value1',
+ 'scheme': 'http',
+ 'hostname': 'www.example.org',
+ 'port': None,
+ 'hostport': 'www.example.org',
+ 'path': '/index.html',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {
+ 'param1': 'value1'
+ },
+ 'relative': False
+ },
+ "http://www.example.org/index.html?qparam1=qvalue1;param2=value2" : {
+ 'uri': 'http://www.example.org/index.html?qparam1=qvalue1;param2=value2',
+ 'scheme': 'http',
+ 'hostname': 'www.example.org',
+ 'port': None,
+ 'hostport': 'www.example.org',
+ 'path': '/index.html',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {
+ 'param2': 'value2'
+ },
+ 'query': {
+ 'qparam1': 'qvalue1'
+ },
+ 'relative': False
+ },
+ "http://www.example.com:8080/index.html" : {
+ 'uri': 'http://www.example.com:8080/index.html',
+ 'scheme': 'http',
+ 'hostname': 'www.example.com',
+ 'port': 8080,
+ 'hostport': 'www.example.com:8080',
+ 'path': '/index.html',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': False
+ },
+ "cvs://anoncvs@cvs.handhelds.org/cvs;module=familiar/dist/ipkg" : {
+ 'uri': 'cvs://anoncvs@cvs.handhelds.org/cvs;module=familiar/dist/ipkg',
+ 'scheme': 'cvs',
+ 'hostname': 'cvs.handhelds.org',
+ 'port': None,
+ 'hostport': 'cvs.handhelds.org',
+ 'path': '/cvs',
+ 'userinfo': 'anoncvs',
+ 'username': 'anoncvs',
+ 'password': '',
+ 'params': {
+ 'module': 'familiar/dist/ipkg'
+ },
+ 'query': {},
+ 'relative': False
+ },
+ "cvs://anoncvs:anonymous@cvs.handhelds.org/cvs;tag=V0-99-81;module=familiar/dist/ipkg": {
+ 'uri': 'cvs://anoncvs:anonymous@cvs.handhelds.org/cvs;tag=V0-99-81;module=familiar/dist/ipkg',
+ 'scheme': 'cvs',
+ 'hostname': 'cvs.handhelds.org',
+ 'port': None,
+ 'hostport': 'cvs.handhelds.org',
+ 'path': '/cvs',
+ 'userinfo': 'anoncvs:anonymous',
+ 'username': 'anoncvs',
+ 'password': 'anonymous',
+ 'params': collections.OrderedDict([
+ ('tag', 'V0-99-81'),
+ ('module', 'familiar/dist/ipkg')
+ ]),
+ 'query': {},
+ 'relative': False
+ },
+ "file://example.diff": { # NOTE: Not RFC compliant!
+ 'uri': 'file:example.diff',
+ 'scheme': 'file',
+ 'hostname': '',
+ 'port': None,
+ 'hostport': '',
+ 'path': 'example.diff',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': True
+ },
+ "file:example.diff": { # NOTE: RFC compliant version of the former
+ 'uri': 'file:example.diff',
+ 'scheme': 'file',
+ 'hostname': '',
+ 'port': None,
+ 'hostport': '',
+ 'path': 'example.diff',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': True
+ },
+ "file:///tmp/example.diff": {
+ 'uri': 'file:///tmp/example.diff',
+ 'scheme': 'file',
+ 'hostname': '',
+ 'port': None,
+ 'hostport': '',
+ 'path': '/tmp/example.diff',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': False
+ },
+ "git:///path/example.git": {
+ 'uri': 'git:///path/example.git',
+ 'scheme': 'git',
+ 'hostname': '',
+ 'port': None,
+ 'hostport': '',
+ 'path': '/path/example.git',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': False
+ },
+ "git:path/example.git": {
+ 'uri': 'git:path/example.git',
+ 'scheme': 'git',
+ 'hostname': '',
+ 'port': None,
+ 'hostport': '',
+ 'path': 'path/example.git',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': True
+ },
+ "git://example.net/path/example.git": {
+ 'uri': 'git://example.net/path/example.git',
+ 'scheme': 'git',
+ 'hostname': 'example.net',
+ 'port': None,
+ 'hostport': 'example.net',
+ 'path': '/path/example.git',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {},
+ 'query': {},
+ 'relative': False
+ },
+ "http://somesite.net;someparam=1": {
+ 'uri': 'http://somesite.net;someparam=1',
+ 'scheme': 'http',
+ 'hostname': 'somesite.net',
+ 'port': None,
+ 'hostport': 'somesite.net',
+ 'path': '',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {"someparam" : "1"},
+ 'query': {},
+ 'relative': False
+ },
+ "file://somelocation;someparam=1": {
+ 'uri': 'file:somelocation;someparam=1',
+ 'scheme': 'file',
+ 'hostname': '',
+ 'port': None,
+ 'hostport': '',
+ 'path': 'somelocation',
+ 'userinfo': '',
+ 'userinfo': '',
+ 'username': '',
+ 'password': '',
+ 'params': {"someparam" : "1"},
+ 'query': {},
+ 'relative': True
+ }
+
+ }
+
+ def test_uri(self):
+ for test_uri, ref in self.test_uris.items():
+ uri = URI(test_uri)
+
+ self.assertEqual(str(uri), ref['uri'])
+
+ # expected attributes
+ self.assertEqual(uri.scheme, ref['scheme'])
+
+ self.assertEqual(uri.userinfo, ref['userinfo'])
+ self.assertEqual(uri.username, ref['username'])
+ self.assertEqual(uri.password, ref['password'])
+
+ self.assertEqual(uri.hostname, ref['hostname'])
+ self.assertEqual(uri.port, ref['port'])
+ self.assertEqual(uri.hostport, ref['hostport'])
+
+ self.assertEqual(uri.path, ref['path'])
+ self.assertEqual(uri.params, ref['params'])
+
+ self.assertEqual(uri.relative, ref['relative'])
+
+ def test_dict(self):
+ for test in self.test_uris.values():
+ uri = URI()
+
+ self.assertEqual(uri.scheme, '')
+ self.assertEqual(uri.userinfo, '')
+ self.assertEqual(uri.username, '')
+ self.assertEqual(uri.password, '')
+ self.assertEqual(uri.hostname, '')
+ self.assertEqual(uri.port, None)
+ self.assertEqual(uri.path, '')
+ self.assertEqual(uri.params, {})
+
+
+ uri.scheme = test['scheme']
+ self.assertEqual(uri.scheme, test['scheme'])
+
+ uri.userinfo = test['userinfo']
+ self.assertEqual(uri.userinfo, test['userinfo'])
+ self.assertEqual(uri.username, test['username'])
+ self.assertEqual(uri.password, test['password'])
+
+ # make sure changing the values doesn't do anything unexpected
+ uri.username = 'changeme'
+ self.assertEqual(uri.username, 'changeme')
+ self.assertEqual(uri.password, test['password'])
+ uri.password = 'insecure'
+ self.assertEqual(uri.username, 'changeme')
+ self.assertEqual(uri.password, 'insecure')
+
+ # reset back after our trickery
+ uri.userinfo = test['userinfo']
+ self.assertEqual(uri.userinfo, test['userinfo'])
+ self.assertEqual(uri.username, test['username'])
+ self.assertEqual(uri.password, test['password'])
+
+ uri.hostname = test['hostname']
+ self.assertEqual(uri.hostname, test['hostname'])
+ self.assertEqual(uri.hostport, test['hostname'])
+
+ uri.port = test['port']
+ self.assertEqual(uri.port, test['port'])
+ self.assertEqual(uri.hostport, test['hostport'])
+
+ uri.path = test['path']
+ self.assertEqual(uri.path, test['path'])
+
+ uri.params = test['params']
+ self.assertEqual(uri.params, test['params'])
+
+ uri.query = test['query']
+ self.assertEqual(uri.query, test['query'])
+
+ self.assertEqual(str(uri), test['uri'])
+
+ uri.params = {}
+ self.assertEqual(uri.params, {})
+ self.assertEqual(str(uri), (str(uri).split(";"))[0])
+
+class FetcherTest(unittest.TestCase):
+
+ def setUp(self):
+ self.origdir = os.getcwd()
+ self.d = bb.data.init()
+ self.tempdir = tempfile.mkdtemp()
+ self.dldir = os.path.join(self.tempdir, "download")
+ os.mkdir(self.dldir)
+ self.d.setVar("DL_DIR", self.dldir)
+ self.unpackdir = os.path.join(self.tempdir, "unpacked")
+ os.mkdir(self.unpackdir)
+ persistdir = os.path.join(self.tempdir, "persistdata")
+ self.d.setVar("PERSISTENT_DIR", persistdir)
+
+ def tearDown(self):
+ os.chdir(self.origdir)
+ if os.environ.get("BB_TMPDIR_NOCLEAN") == "yes":
+ print("Not cleaning up %s. Please remove manually." % self.tempdir)
+ else:
+ bb.utils.prunedir(self.tempdir)
+
+class MirrorUriTest(FetcherTest):
+
+ replaceuris = {
+ ("git://git.invalid.infradead.org/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "http://somewhere.org/somedir/")
+ : "http://somewhere.org/somedir/git2_git.invalid.infradead.org.mtd-utils.git.tar.gz",
+ ("git://git.invalid.infradead.org/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/([^/]+/)*([^/]*)", "git://somewhere.org/somedir/\\2;protocol=http")
+ : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http",
+ ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/([^/]+/)*([^/]*)", "git://somewhere.org/somedir/\\2;protocol=http")
+ : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http",
+ ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/([^/]+/)*([^/]*)", "git://somewhere.org/\\2;protocol=http")
+ : "git://somewhere.org/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http",
+ ("git://someserver.org/bitbake;tag=1234567890123456789012345678901234567890", "git://someserver.org/bitbake", "git://git.openembedded.org/bitbake")
+ : "git://git.openembedded.org/bitbake;tag=1234567890123456789012345678901234567890",
+ ("file://sstate-xyz.tgz", "file://.*", "file:///somewhere/1234/sstate-cache")
+ : "file:///somewhere/1234/sstate-cache/sstate-xyz.tgz",
+ ("file://sstate-xyz.tgz", "file://.*", "file:///somewhere/1234/sstate-cache/")
+ : "file:///somewhere/1234/sstate-cache/sstate-xyz.tgz",
+ ("http://somewhere.org/somedir1/somedir2/somefile_1.2.3.tar.gz", "http://.*/.*", "http://somewhere2.org/somedir3")
+ : "http://somewhere2.org/somedir3/somefile_1.2.3.tar.gz",
+ ("http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere2.org/somedir3/somefile_1.2.3.tar.gz")
+ : "http://somewhere2.org/somedir3/somefile_1.2.3.tar.gz",
+ ("http://www.apache.org/dist/subversion/subversion-1.7.1.tar.bz2", "http://www.apache.org/dist", "http://archive.apache.org/dist")
+ : "http://archive.apache.org/dist/subversion/subversion-1.7.1.tar.bz2",
+ ("http://www.apache.org/dist/subversion/subversion-1.7.1.tar.bz2", "http://.*/.*", "file:///somepath/downloads/")
+ : "file:///somepath/downloads/subversion-1.7.1.tar.bz2",
+ ("git://git.invalid.infradead.org/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "git://somewhere.org/somedir/BASENAME;protocol=http")
+ : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http",
+ ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "git://somewhere.org/somedir/BASENAME;protocol=http")
+ : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http",
+ ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "git://somewhere.org/somedir/MIRRORNAME;protocol=http")
+ : "git://somewhere.org/somedir/git.invalid.infradead.org.foo.mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http",
+ ("http://somewhere.org/somedir1/somedir2/somefile_1.2.3.tar.gz", "http://.*/.*", "http://somewhere2.org")
+ : "http://somewhere2.org/somefile_1.2.3.tar.gz",
+ ("http://somewhere.org/somedir1/somedir2/somefile_1.2.3.tar.gz", "http://.*/.*", "http://somewhere2.org/")
+ : "http://somewhere2.org/somefile_1.2.3.tar.gz",
+ ("git://someserver.org/bitbake;tag=1234567890123456789012345678901234567890;branch=master", "git://someserver.org/bitbake;branch=master", "git://git.openembedded.org/bitbake;protocol=http")
+ : "git://git.openembedded.org/bitbake;tag=1234567890123456789012345678901234567890;branch=master;protocol=http",
+
+ #Renaming files doesn't work
+ #("http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere2.org/somedir3/somefile_2.3.4.tar.gz") : "http://somewhere2.org/somedir3/somefile_2.3.4.tar.gz"
+ #("file://sstate-xyz.tgz", "file://.*/.*", "file:///somewhere/1234/sstate-cache") : "file:///somewhere/1234/sstate-cache/sstate-xyz.tgz",
+ }
+
+ mirrorvar = "http://.*/.* file:///somepath/downloads/ \n" \
+ "git://someserver.org/bitbake git://git.openembedded.org/bitbake \n" \
+ "https://.*/.* file:///someotherpath/downloads/ \n" \
+ "http://.*/.* file:///someotherpath/downloads/ \n"
+
+ def test_urireplace(self):
+ for k, v in self.replaceuris.items():
+ ud = bb.fetch.FetchData(k[0], self.d)
+ ud.setup_localpath(self.d)
+ mirrors = bb.fetch2.mirror_from_string("%s %s" % (k[1], k[2]))
+ newuris, uds = bb.fetch2.build_mirroruris(ud, mirrors, self.d)
+ self.assertEqual([v], newuris)
+
+ def test_urilist1(self):
+ fetcher = bb.fetch.FetchData("http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d)
+ mirrors = bb.fetch2.mirror_from_string(self.mirrorvar)
+ uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d)
+ self.assertEqual(uris, ['file:///somepath/downloads/bitbake-1.0.tar.gz', 'file:///someotherpath/downloads/bitbake-1.0.tar.gz'])
+
+ def test_urilist2(self):
+ # Catch https:// -> files:// bug
+ fetcher = bb.fetch.FetchData("https://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d)
+ mirrors = bb.fetch2.mirror_from_string(self.mirrorvar)
+ uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d)
+ self.assertEqual(uris, ['file:///someotherpath/downloads/bitbake-1.0.tar.gz'])
+
+ def test_mirror_of_mirror(self):
+ # Test if mirror of a mirror works
+ mirrorvar = self.mirrorvar + " http://.*/.* http://otherdownloads.yoctoproject.org/downloads/ \n"
+ mirrorvar = mirrorvar + " http://otherdownloads.yoctoproject.org/.* http://downloads2.yoctoproject.org/downloads/ \n"
+ fetcher = bb.fetch.FetchData("http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d)
+ mirrors = bb.fetch2.mirror_from_string(mirrorvar)
+ uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d)
+ self.assertEqual(uris, ['file:///somepath/downloads/bitbake-1.0.tar.gz',
+ 'file:///someotherpath/downloads/bitbake-1.0.tar.gz',
+ 'http://otherdownloads.yoctoproject.org/downloads/bitbake-1.0.tar.gz',
+ 'http://downloads2.yoctoproject.org/downloads/bitbake-1.0.tar.gz'])
+
+ recmirrorvar = "https://.*/[^/]* http://AAAA/A/A/A/ \n" \
+ "https://.*/[^/]* https://BBBB/B/B/B/ \n"
+
+ def test_recursive(self):
+ fetcher = bb.fetch.FetchData("https://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d)
+ mirrors = bb.fetch2.mirror_from_string(self.recmirrorvar)
+ uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d)
+ self.assertEqual(uris, ['http://AAAA/A/A/A/bitbake/bitbake-1.0.tar.gz',
+ 'https://BBBB/B/B/B/bitbake/bitbake-1.0.tar.gz',
+ 'http://AAAA/A/A/A/B/B/bitbake/bitbake-1.0.tar.gz'])
+
+
+class GitDownloadDirectoryNamingTest(FetcherTest):
+ def setUp(self):
+ super(GitDownloadDirectoryNamingTest, self).setUp()
+ self.recipe_url = "git://git.openembedded.org/bitbake"
+ self.recipe_dir = "git.openembedded.org.bitbake"
+ self.mirror_url = "git://github.com/openembedded/bitbake.git"
+ self.mirror_dir = "github.com.openembedded.bitbake.git"
+
+ self.d.setVar('SRCREV', '82ea737a0b42a8b53e11c9cde141e9e9c0bd8c40')
+
+ def setup_mirror_rewrite(self):
+ self.d.setVar("PREMIRRORS", self.recipe_url + " " + self.mirror_url + " \n")
+
+ @skipIfNoNetwork()
+ def test_that_directory_is_named_after_recipe_url_when_no_mirroring_is_used(self):
+ self.setup_mirror_rewrite()
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir + "/git2")
+ self.assertIn(self.recipe_dir, dir)
+
+ @skipIfNoNetwork()
+ def test_that_directory_exists_for_mirrored_url_and_recipe_url_when_mirroring_is_used(self):
+ self.setup_mirror_rewrite()
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir + "/git2")
+ self.assertIn(self.mirror_dir, dir)
+ self.assertIn(self.recipe_dir, dir)
+
+ @skipIfNoNetwork()
+ def test_that_recipe_directory_and_mirrored_directory_exists_when_mirroring_is_used_and_the_mirrored_directory_already_exists(self):
+ self.setup_mirror_rewrite()
+ fetcher = bb.fetch.Fetch([self.mirror_url], self.d)
+ fetcher.download()
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir + "/git2")
+ self.assertIn(self.mirror_dir, dir)
+ self.assertIn(self.recipe_dir, dir)
+
+
+class TarballNamingTest(FetcherTest):
+ def setUp(self):
+ super(TarballNamingTest, self).setUp()
+ self.recipe_url = "git://git.openembedded.org/bitbake"
+ self.recipe_tarball = "git2_git.openembedded.org.bitbake.tar.gz"
+ self.mirror_url = "git://github.com/openembedded/bitbake.git"
+ self.mirror_tarball = "git2_github.com.openembedded.bitbake.git.tar.gz"
+
+ self.d.setVar('BB_GENERATE_MIRROR_TARBALLS', '1')
+ self.d.setVar('SRCREV', '82ea737a0b42a8b53e11c9cde141e9e9c0bd8c40')
+
+ def setup_mirror_rewrite(self):
+ self.d.setVar("PREMIRRORS", self.recipe_url + " " + self.mirror_url + " \n")
+
+ @skipIfNoNetwork()
+ def test_that_the_recipe_tarball_is_created_when_no_mirroring_is_used(self):
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir)
+ self.assertIn(self.recipe_tarball, dir)
+
+ @skipIfNoNetwork()
+ def test_that_the_mirror_tarball_is_created_when_mirroring_is_used(self):
+ self.setup_mirror_rewrite()
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir)
+ self.assertIn(self.mirror_tarball, dir)
+
+
+class GitShallowTarballNamingTest(FetcherTest):
+ def setUp(self):
+ super(GitShallowTarballNamingTest, self).setUp()
+ self.recipe_url = "git://git.openembedded.org/bitbake"
+ self.recipe_tarball = "gitshallow_git.openembedded.org.bitbake_82ea737-1_master.tar.gz"
+ self.mirror_url = "git://github.com/openembedded/bitbake.git"
+ self.mirror_tarball = "gitshallow_github.com.openembedded.bitbake.git_82ea737-1_master.tar.gz"
+
+ self.d.setVar('BB_GIT_SHALLOW', '1')
+ self.d.setVar('BB_GENERATE_SHALLOW_TARBALLS', '1')
+ self.d.setVar('SRCREV', '82ea737a0b42a8b53e11c9cde141e9e9c0bd8c40')
+
+ def setup_mirror_rewrite(self):
+ self.d.setVar("PREMIRRORS", self.recipe_url + " " + self.mirror_url + " \n")
+
+ @skipIfNoNetwork()
+ def test_that_the_tarball_is_named_after_recipe_url_when_no_mirroring_is_used(self):
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir)
+ self.assertIn(self.recipe_tarball, dir)
+
+ @skipIfNoNetwork()
+ def test_that_the_mirror_tarball_is_created_when_mirroring_is_used(self):
+ self.setup_mirror_rewrite()
+ fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+ fetcher.download()
+
+ dir = os.listdir(self.dldir)
+ self.assertIn(self.mirror_tarball, dir)
+
+
+class FetcherLocalTest(FetcherTest):
+ def setUp(self):
+ def touch(fn):
+ with open(fn, 'a'):
+ os.utime(fn, None)
+
+ super(FetcherLocalTest, self).setUp()
+ self.localsrcdir = os.path.join(self.tempdir, 'localsrc')
+ os.makedirs(self.localsrcdir)
+ touch(os.path.join(self.localsrcdir, 'a'))
+ touch(os.path.join(self.localsrcdir, 'b'))
+ os.makedirs(os.path.join(self.localsrcdir, 'dir'))
+ touch(os.path.join(self.localsrcdir, 'dir', 'c'))
+ touch(os.path.join(self.localsrcdir, 'dir', 'd'))
+ os.makedirs(os.path.join(self.localsrcdir, 'dir', 'subdir'))
+ touch(os.path.join(self.localsrcdir, 'dir', 'subdir', 'e'))
+ self.d.setVar("FILESPATH", self.localsrcdir)
+
+ def fetchUnpack(self, uris):
+ fetcher = bb.fetch.Fetch(uris, self.d)
+ fetcher.download()
+ fetcher.unpack(self.unpackdir)
+ flst = []
+ for root, dirs, files in os.walk(self.unpackdir):
+ for f in files:
+ flst.append(os.path.relpath(os.path.join(root, f), self.unpackdir))
+ flst.sort()
+ return flst
+
+ def test_local(self):
+ tree = self.fetchUnpack(['file://a', 'file://dir/c'])
+ self.assertEqual(tree, ['a', 'dir/c'])
+
+ def test_local_wildcard(self):
+ tree = self.fetchUnpack(['file://a', 'file://dir/*'])
+ self.assertEqual(tree, ['a', 'dir/c', 'dir/d', 'dir/subdir/e'])
+
+ def test_local_dir(self):
+ tree = self.fetchUnpack(['file://a', 'file://dir'])
+ self.assertEqual(tree, ['a', 'dir/c', 'dir/d', 'dir/subdir/e'])
+
+ def test_local_subdir(self):
+ tree = self.fetchUnpack(['file://dir/subdir'])
+ self.assertEqual(tree, ['dir/subdir/e'])
+
+ def test_local_subdir_file(self):
+ tree = self.fetchUnpack(['file://dir/subdir/e'])
+ self.assertEqual(tree, ['dir/subdir/e'])
+
+ def test_local_subdirparam(self):
+ tree = self.fetchUnpack(['file://a;subdir=bar', 'file://dir;subdir=foo/moo'])
+ self.assertEqual(tree, ['bar/a', 'foo/moo/dir/c', 'foo/moo/dir/d', 'foo/moo/dir/subdir/e'])
+
+ def test_local_deepsubdirparam(self):
+ tree = self.fetchUnpack(['file://dir/subdir/e;subdir=bar'])
+ self.assertEqual(tree, ['bar/dir/subdir/e'])
+
+ def test_local_absolutedir(self):
+ # Unpacking to an absolute path that is a subdirectory of the root
+ # should work
+ tree = self.fetchUnpack(['file://a;subdir=%s' % os.path.join(self.unpackdir, 'bar')])
+
+ # Unpacking to an absolute path outside of the root should fail
+ with self.assertRaises(bb.fetch2.UnpackError):
+ self.fetchUnpack(['file://a;subdir=/bin/sh'])
+
+class FetcherNoNetworkTest(FetcherTest):
+ def setUp(self):
+ super().setUp()
+ # all test cases are based on not having network
+ self.d.setVar("BB_NO_NETWORK", "1")
+
+ def test_missing(self):
+ string = "this is a test file\n".encode("utf-8")
+ self.d.setVarFlag("SRC_URI", "md5sum", hashlib.md5(string).hexdigest())
+ self.d.setVarFlag("SRC_URI", "sha256sum", hashlib.sha256(string).hexdigest())
+
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ with self.assertRaises(bb.fetch2.NetworkAccess):
+ fetcher.download()
+
+ def test_valid_missing_donestamp(self):
+ # create the file in the download directory with correct hash
+ string = "this is a test file\n".encode("utf-8")
+ with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb") as f:
+ f.write(string)
+
+ self.d.setVarFlag("SRC_URI", "md5sum", hashlib.md5(string).hexdigest())
+ self.d.setVarFlag("SRC_URI", "sha256sum", hashlib.sha256(string).hexdigest())
+
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ fetcher.download()
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+
+ def test_invalid_missing_donestamp(self):
+ # create an invalid file in the download directory with incorrect hash
+ string = "this is a test file\n".encode("utf-8")
+ with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb"):
+ pass
+
+ self.d.setVarFlag("SRC_URI", "md5sum", hashlib.md5(string).hexdigest())
+ self.d.setVarFlag("SRC_URI", "sha256sum", hashlib.sha256(string).hexdigest())
+
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ with self.assertRaises(bb.fetch2.NetworkAccess):
+ fetcher.download()
+ # the existing file should not exist or should have be moved to "bad-checksum"
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+
+ def test_nochecksums_missing(self):
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ # ssh fetch does not support checksums
+ fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ # attempts to download with missing donestamp
+ with self.assertRaises(bb.fetch2.NetworkAccess):
+ fetcher.download()
+
+ def test_nochecksums_missing_donestamp(self):
+ # create a file in the download directory
+ with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb"):
+ pass
+
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ # ssh fetch does not support checksums
+ fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ # attempts to download with missing donestamp
+ with self.assertRaises(bb.fetch2.NetworkAccess):
+ fetcher.download()
+
+ def test_nochecksums_has_donestamp(self):
+ # create a file in the download directory with the donestamp
+ with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb"):
+ pass
+ with open(os.path.join(self.dldir, "test-file.tar.gz.done"), "wb"):
+ pass
+
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ # ssh fetch does not support checksums
+ fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ # should not fetch
+ fetcher.download()
+ # both files should still exist
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+
+ def test_nochecksums_missing_has_donestamp(self):
+ # create a file in the download directory with the donestamp
+ with open(os.path.join(self.dldir, "test-file.tar.gz.done"), "wb"):
+ pass
+
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+ # ssh fetch does not support checksums
+ fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d)
+ with self.assertRaises(bb.fetch2.NetworkAccess):
+ fetcher.download()
+ # both files should still exist
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz")))
+ self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done")))
+
+class FetcherNetworkTest(FetcherTest):
+ @skipIfNoNetwork()
+ def test_fetch(self):
+ fetcher = bb.fetch.Fetch(["http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", "http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.1.tar.gz"], self.d)
+ fetcher.download()
+ self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749)
+ self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.1.tar.gz"), 57892)
+ self.d.setVar("BB_NO_NETWORK", "1")
+ fetcher = bb.fetch.Fetch(["http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", "http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.1.tar.gz"], self.d)
+ fetcher.download()
+ fetcher.unpack(self.unpackdir)
+ self.assertEqual(len(os.listdir(self.unpackdir + "/bitbake-1.0/")), 9)
+ self.assertEqual(len(os.listdir(self.unpackdir + "/bitbake-1.1/")), 9)
+
+ @skipIfNoNetwork()
+ def test_fetch_mirror(self):
+ self.d.setVar("MIRRORS", "http://.*/.* http://downloads.yoctoproject.org/releases/bitbake")
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d)
+ fetcher.download()
+ self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749)
+
+ @skipIfNoNetwork()
+ def test_fetch_mirror_of_mirror(self):
+ self.d.setVar("MIRRORS", "http://.*/.* http://invalid2.yoctoproject.org/ \n http://invalid2.yoctoproject.org/.* http://downloads.yoctoproject.org/releases/bitbake")
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d)
+ fetcher.download()
+ self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749)
+
+ @skipIfNoNetwork()
+ def test_fetch_file_mirror_of_mirror(self):
+ self.d.setVar("MIRRORS", "http://.*/.* file:///some1where/ \n file:///some1where/.* file://some2where/ \n file://some2where/.* http://downloads.yoctoproject.org/releases/bitbake")
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d)
+ os.mkdir(self.dldir + "/some2where")
+ fetcher.download()
+ self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749)
+
+ @skipIfNoNetwork()
+ def test_fetch_premirror(self):
+ self.d.setVar("PREMIRRORS", "http://.*/.* http://downloads.yoctoproject.org/releases/bitbake")
+ fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d)
+ fetcher.download()
+ self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749)
+
+ @skipIfNoNetwork()
+ def gitfetcher(self, url1, url2):
+ def checkrevision(self, fetcher):
+ fetcher.unpack(self.unpackdir)
+ revision = bb.process.run("git rev-parse HEAD", shell=True, cwd=self.unpackdir + "/git")[0].strip()
+ self.assertEqual(revision, "270a05b0b4ba0959fe0624d2a4885d7b70426da5")
+
+ self.d.setVar("BB_GENERATE_MIRROR_TARBALLS", "1")
+ self.d.setVar("SRCREV", "270a05b0b4ba0959fe0624d2a4885d7b70426da5")
+ fetcher = bb.fetch.Fetch([url1], self.d)
+ fetcher.download()
+ checkrevision(self, fetcher)
+ # Wipe out the dldir clone and the unpacked source, turn off the network and check mirror tarball works
+ bb.utils.prunedir(self.dldir + "/git2/")
+ bb.utils.prunedir(self.unpackdir)
+ self.d.setVar("BB_NO_NETWORK", "1")
+ fetcher = bb.fetch.Fetch([url2], self.d)
+ fetcher.download()
+ checkrevision(self, fetcher)
+
+ @skipIfNoNetwork()
+ def test_gitfetch(self):
+ url1 = url2 = "git://git.openembedded.org/bitbake"
+ self.gitfetcher(url1, url2)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_goodsrcrev(self):
+ # SRCREV is set but matches rev= parameter
+ url1 = url2 = "git://git.openembedded.org/bitbake;rev=270a05b0b4ba0959fe0624d2a4885d7b70426da5"
+ self.gitfetcher(url1, url2)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_badsrcrev(self):
+ # SRCREV is set but does not match rev= parameter
+ url1 = url2 = "git://git.openembedded.org/bitbake;rev=dead05b0b4ba0959fe0624d2a4885d7b70426da5"
+ self.assertRaises(bb.fetch.FetchError, self.gitfetcher, url1, url2)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_tagandrev(self):
+ # SRCREV is set but does not match rev= parameter
+ url1 = url2 = "git://git.openembedded.org/bitbake;rev=270a05b0b4ba0959fe0624d2a4885d7b70426da5;tag=270a05b0b4ba0959fe0624d2a4885d7b70426da5"
+ self.assertRaises(bb.fetch.FetchError, self.gitfetcher, url1, url2)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_localusehead(self):
+ # Create dummy local Git repo
+ src_dir = tempfile.mkdtemp(dir=self.tempdir,
+ prefix='gitfetch_localusehead_')
+ src_dir = os.path.abspath(src_dir)
+ bb.process.run("git init", cwd=src_dir)
+ bb.process.run("git commit --allow-empty -m'Dummy commit'",
+ cwd=src_dir)
+ # Use other branch than master
+ bb.process.run("git checkout -b my-devel", cwd=src_dir)
+ bb.process.run("git commit --allow-empty -m'Dummy commit 2'",
+ cwd=src_dir)
+ stdout = bb.process.run("git rev-parse HEAD", cwd=src_dir)
+ orig_rev = stdout[0].strip()
+
+ # Fetch and check revision
+ self.d.setVar("SRCREV", "AUTOINC")
+ url = "git://" + src_dir + ";protocol=file;usehead=1"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ fetcher.unpack(self.unpackdir)
+ stdout = bb.process.run("git rev-parse HEAD",
+ cwd=os.path.join(self.unpackdir, 'git'))
+ unpack_rev = stdout[0].strip()
+ self.assertEqual(orig_rev, unpack_rev)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_remoteusehead(self):
+ url = "git://git.openembedded.org/bitbake;usehead=1"
+ self.assertRaises(bb.fetch.ParameterError, self.gitfetcher, url, url)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_finds_local_tarball_for_mirrored_url_when_previous_downloaded_by_the_recipe_url(self):
+ recipeurl = "git://git.openembedded.org/bitbake"
+ mirrorurl = "git://someserver.org/bitbake"
+ self.d.setVar("PREMIRRORS", "git://someserver.org/bitbake git://git.openembedded.org/bitbake \n")
+ self.gitfetcher(recipeurl, mirrorurl)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_finds_local_tarball_when_previous_downloaded_from_a_premirror(self):
+ recipeurl = "git://someserver.org/bitbake"
+ self.d.setVar("PREMIRRORS", "git://someserver.org/bitbake git://git.openembedded.org/bitbake \n")
+ self.gitfetcher(recipeurl, recipeurl)
+
+ @skipIfNoNetwork()
+ def test_gitfetch_finds_local_repository_when_premirror_rewrites_the_recipe_url(self):
+ realurl = "git://git.openembedded.org/bitbake"
+ recipeurl = "git://someserver.org/bitbake"
+ self.sourcedir = self.unpackdir.replace("unpacked", "sourcemirror.git")
+ os.chdir(self.tempdir)
+ bb.process.run("git clone %s %s 2> /dev/null" % (realurl, self.sourcedir), shell=True)
+ self.d.setVar("PREMIRRORS", "%s git://%s;protocol=file \n" % (recipeurl, self.sourcedir))
+ self.gitfetcher(recipeurl, recipeurl)
+
+ @skipIfNoNetwork()
+ def test_git_submodule(self):
+ # URL with ssh submodules
+ url = "gitsm://git.yoctoproject.org/git-submodule-test;branch=ssh-gitsm-tests;rev=049da4a6cb198d7c0302e9e8b243a1443cb809a7"
+ # Original URL (comment this if you have ssh access to git.yoctoproject.org)
+ url = "gitsm://git.yoctoproject.org/git-submodule-test;branch=master;rev=a2885dd7d25380d23627e7544b7bbb55014b16ee"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ # Previous cwd has been deleted
+ os.chdir(os.path.dirname(self.unpackdir))
+ fetcher.unpack(self.unpackdir)
+
+ repo_path = os.path.join(self.tempdir, 'unpacked', 'git')
+ self.assertTrue(os.path.exists(repo_path), msg='Unpacked repository missing')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'bitbake')), msg='bitbake submodule missing')
+ self.assertFalse(os.path.exists(os.path.join(repo_path, 'na')), msg='uninitialized submodule present')
+
+ # Only when we're running the extended test with a submodule's submodule, can we check this.
+ if os.path.exists(os.path.join(repo_path, 'bitbake-gitsm-test1')):
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'bitbake-gitsm-test1', 'bitbake')), msg='submodule of submodule missing')
+
+ def test_git_submodule_dbus_broker(self):
+ # The following external repositories have show failures in fetch and unpack operations
+ # We want to avoid regressions!
+ url = "gitsm://github.com/bus1/dbus-broker;protocol=git;rev=fc874afa0992d0c75ec25acb43d344679f0ee7d2"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ # Previous cwd has been deleted
+ os.chdir(os.path.dirname(self.unpackdir))
+ fetcher.unpack(self.unpackdir)
+
+ repo_path = os.path.join(self.tempdir, 'unpacked', 'git')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/subprojects/c-dvar/config')), msg='Missing submodule config "subprojects/c-dvar"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/subprojects/c-list/config')), msg='Missing submodule config "subprojects/c-list"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/subprojects/c-rbtree/config')), msg='Missing submodule config "subprojects/c-rbtree"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/subprojects/c-sundry/config')), msg='Missing submodule config "subprojects/c-sundry"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/subprojects/c-utf8/config')), msg='Missing submodule config "subprojects/c-utf8"')
+
+ def test_git_submodule_CLI11(self):
+ url = "gitsm://github.com/CLIUtils/CLI11;protocol=git;rev=bd4dc911847d0cde7a6b41dfa626a85aab213baf"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ # Previous cwd has been deleted
+ os.chdir(os.path.dirname(self.unpackdir))
+ fetcher.unpack(self.unpackdir)
+
+ repo_path = os.path.join(self.tempdir, 'unpacked', 'git')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/extern/googletest/config')), msg='Missing submodule config "extern/googletest"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/extern/json/config')), msg='Missing submodule config "extern/json"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/extern/sanitizers/config')), msg='Missing submodule config "extern/sanitizers"')
+
+ def test_git_submodule_update_CLI11(self):
+ """ Prevent regression on update detection not finding missing submodule, or modules without needed commits """
+ url = "gitsm://github.com/CLIUtils/CLI11;protocol=git;rev=cf6a99fa69aaefe477cc52e3ef4a7d2d7fa40714"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+
+ # CLI11 that pulls in a newer nlohmann-json
+ url = "gitsm://github.com/CLIUtils/CLI11;protocol=git;rev=49ac989a9527ee9bb496de9ded7b4872c2e0e5ca"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ # Previous cwd has been deleted
+ os.chdir(os.path.dirname(self.unpackdir))
+ fetcher.unpack(self.unpackdir)
+
+ repo_path = os.path.join(self.tempdir, 'unpacked', 'git')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/extern/googletest/config')), msg='Missing submodule config "extern/googletest"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/extern/json/config')), msg='Missing submodule config "extern/json"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/extern/sanitizers/config')), msg='Missing submodule config "extern/sanitizers"')
+
+ def test_git_submodule_aktualizr(self):
+ url = "gitsm://github.com/advancedtelematic/aktualizr;branch=master;protocol=git;rev=d00d1a04cc2366d1a5f143b84b9f507f8bd32c44"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ # Previous cwd has been deleted
+ os.chdir(os.path.dirname(self.unpackdir))
+ fetcher.unpack(self.unpackdir)
+
+ repo_path = os.path.join(self.tempdir, 'unpacked', 'git')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/partial/extern/isotp-c/config')), msg='Missing submodule config "partial/extern/isotp-c/config"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/partial/extern/isotp-c/modules/deps/bitfield-c/config')), msg='Missing submodule config "partial/extern/isotp-c/modules/deps/bitfield-c/config"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'partial/extern/isotp-c/deps/bitfield-c/.git')), msg="Submodule of submodule isotp-c did not unpack properly")
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/tests/tuf-test-vectors/config')), msg='Missing submodule config "tests/tuf-test-vectors/config"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/third_party/googletest/config')), msg='Missing submodule config "third_party/googletest/config"')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, '.git/modules/third_party/HdrHistogram_c/config')), msg='Missing submodule config "third_party/HdrHistogram_c/config"')
+
+ def test_git_submodule_iotedge(self):
+ """ Prevent regression on deeply nested submodules not being checked out properly, even though they were fetched. """
+
+ # This repository also has submodules where the module (name), path and url do not align
+ url = "gitsm://github.com/azure/iotedge.git;protocol=git;rev=d76e0316c6f324345d77c48a83ce836d09392699"
+ fetcher = bb.fetch.Fetch([url], self.d)
+ fetcher.download()
+ # Previous cwd has been deleted
+ os.chdir(os.path.dirname(self.unpackdir))
+ fetcher.unpack(self.unpackdir)
+
+ repo_path = os.path.join(self.tempdir, 'unpacked', 'git')
+
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/c-shared/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/c-shared/testtools/ctest/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/c-shared/testtools/testrunner/readme.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/c-shared/testtools/umock-c/readme.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/c-shared/testtools/umock-c/deps/ctest/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/c-shared/testtools/umock-c/deps/testrunner/readme.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/deps/c-utility/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/deps/c-utility/testtools/ctest/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/deps/c-utility/testtools/testrunner/readme.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/deps/c-utility/testtools/umock-c/readme.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/deps/c-utility/testtools/umock-c/deps/ctest/README.md')), msg='Missing submodule checkout')
+ self.assertTrue(os.path.exists(os.path.join(repo_path, 'edgelet/hsm-sys/azure-iot-hsm-c/deps/utpm/deps/c-utility/testtools/umock-c/deps/testrunner/readme.md')), msg='Missing submodule checkout')
+
+class TrustedNetworksTest(FetcherTest):
+ def test_trusted_network(self):
+ # Ensure trusted_network returns False when the host IS in the list.
+ url = "git://Someserver.org/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org someserver.org server2.org server3.org")
+ self.assertTrue(bb.fetch.trusted_network(self.d, url))
+
+ def test_wild_trusted_network(self):
+ # Ensure trusted_network returns true when the *.host IS in the list.
+ url = "git://Someserver.org/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org *.someserver.org server2.org server3.org")
+ self.assertTrue(bb.fetch.trusted_network(self.d, url))
+
+ def test_prefix_wild_trusted_network(self):
+ # Ensure trusted_network returns true when the prefix matches *.host.
+ url = "git://git.Someserver.org/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org *.someserver.org server2.org server3.org")
+ self.assertTrue(bb.fetch.trusted_network(self.d, url))
+
+ def test_two_prefix_wild_trusted_network(self):
+ # Ensure trusted_network returns true when the prefix matches *.host.
+ url = "git://something.git.Someserver.org/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org *.someserver.org server2.org server3.org")
+ self.assertTrue(bb.fetch.trusted_network(self.d, url))
+
+ def test_port_trusted_network(self):
+ # Ensure trusted_network returns True, even if the url specifies a port.
+ url = "git://someserver.org:8080/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "someserver.org")
+ self.assertTrue(bb.fetch.trusted_network(self.d, url))
+
+ def test_untrusted_network(self):
+ # Ensure trusted_network returns False when the host is NOT in the list.
+ url = "git://someserver.org/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org server2.org server3.org")
+ self.assertFalse(bb.fetch.trusted_network(self.d, url))
+
+ def test_wild_untrusted_network(self):
+ # Ensure trusted_network returns False when the host is NOT in the list.
+ url = "git://*.someserver.org/foo;rev=1"
+ self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org server2.org server3.org")
+ self.assertFalse(bb.fetch.trusted_network(self.d, url))
+
+class URLHandle(unittest.TestCase):
+
+ datatable = {
+ "http://www.google.com/index.html" : ('http', 'www.google.com', '/index.html', '', '', {}),
+ "cvs://anoncvs@cvs.handhelds.org/cvs;module=familiar/dist/ipkg" : ('cvs', 'cvs.handhelds.org', '/cvs', 'anoncvs', '', {'module': 'familiar/dist/ipkg'}),
+ "cvs://anoncvs:anonymous@cvs.handhelds.org/cvs;tag=V0-99-81;module=familiar/dist/ipkg" : ('cvs', 'cvs.handhelds.org', '/cvs', 'anoncvs', 'anonymous', collections.OrderedDict([('tag', 'V0-99-81'), ('module', 'familiar/dist/ipkg')])),
+ "git://git.openembedded.org/bitbake;branch=@foo" : ('git', 'git.openembedded.org', '/bitbake', '', '', {'branch': '@foo'}),
+ "file://somelocation;someparam=1": ('file', '', 'somelocation', '', '', {'someparam': '1'}),
+ }
+ # we require a pathname to encodeurl but users can still pass such urls to
+ # decodeurl and we need to handle them
+ decodedata = datatable.copy()
+ decodedata.update({
+ "http://somesite.net;someparam=1": ('http', 'somesite.net', '/', '', '', {'someparam': '1'}),
+ })
+
+ def test_decodeurl(self):
+ for k, v in self.decodedata.items():
+ result = bb.fetch.decodeurl(k)
+ self.assertEqual(result, v)
+
+ def test_encodeurl(self):
+ for k, v in self.datatable.items():
+ result = bb.fetch.encodeurl(v)
+ self.assertEqual(result, k)
+
+class FetchLatestVersionTest(FetcherTest):
+
+ test_git_uris = {
+ # version pattern "X.Y.Z"
+ ("mx-1.0", "git://github.com/clutter-project/mx.git;branch=mx-1.4", "9b1db6b8060bd00b121a692f942404a24ae2960f", "")
+ : "1.99.4",
+ # version pattern "vX.Y"
+ ("mtd-utils", "git://git.infradead.org/mtd-utils.git", "ca39eb1d98e736109c64ff9c1aa2a6ecca222d8f", "")
+ : "1.5.0",
+ # version pattern "pkg_name-X.Y"
+ ("presentproto", "git://anongit.freedesktop.org/git/xorg/proto/presentproto", "24f3a56e541b0a9e6c6ee76081f441221a120ef9", "")
+ : "1.0",
+ # version pattern "pkg_name-vX.Y.Z"
+ ("dtc", "git://git.qemu.org/dtc.git", "65cc4d2748a2c2e6f27f1cf39e07a5dbabd80ebf", "")
+ : "1.4.0",
+ # combination version pattern
+ ("sysprof", "git://gitlab.gnome.org/GNOME/sysprof.git;protocol=https", "cd44ee6644c3641507fb53b8a2a69137f2971219", "")
+ : "1.2.0",
+ ("u-boot-mkimage", "git://git.denx.de/u-boot.git;branch=master;protocol=git", "62c175fbb8a0f9a926c88294ea9f7e88eb898f6c", "")
+ : "2014.01",
+ # version pattern "yyyymmdd"
+ ("mobile-broadband-provider-info", "git://gitlab.gnome.org/GNOME/mobile-broadband-provider-info.git;protocol=https", "4ed19e11c2975105b71b956440acdb25d46a347d", "")
+ : "20120614",
+ # packages with a valid UPSTREAM_CHECK_GITTAGREGEX
+ ("xf86-video-omap", "git://anongit.freedesktop.org/xorg/driver/xf86-video-omap", "ae0394e687f1a77e966cf72f895da91840dffb8f", "(?P<pver>(\d+\.(\d\.?)*))")
+ : "0.4.3",
+ ("build-appliance-image", "git://git.yoctoproject.org/poky", "b37dd451a52622d5b570183a81583cc34c2ff555", "(?P<pver>(([0-9][\.|_]?)+[0-9]))")
+ : "11.0.0",
+ ("chkconfig-alternatives-native", "git://github.com/kergoth/chkconfig;branch=sysroot", "cd437ecbd8986c894442f8fce1e0061e20f04dee", "chkconfig\-(?P<pver>((\d+[\.\-_]*)+))")
+ : "1.3.59",
+ ("remake", "git://github.com/rocky/remake.git", "f05508e521987c8494c92d9c2871aec46307d51d", "(?P<pver>(\d+\.(\d+\.)*\d*(\+dbg\d+(\.\d+)*)*))")
+ : "3.82+dbg0.9",
+ }
+
+ test_wget_uris = {
+ # packages with versions inside directory name
+ ("util-linux", "http://kernel.org/pub/linux/utils/util-linux/v2.23/util-linux-2.24.2.tar.bz2", "", "")
+ : "2.24.2",
+ ("enchant", "http://www.abisource.com/downloads/enchant/1.6.0/enchant-1.6.0.tar.gz", "", "")
+ : "1.6.0",
+ ("cmake", "http://www.cmake.org/files/v2.8/cmake-2.8.12.1.tar.gz", "", "")
+ : "2.8.12.1",
+ # packages with versions only in current directory
+ ("eglic", "http://downloads.yoctoproject.org/releases/eglibc/eglibc-2.18-svnr23787.tar.bz2", "", "")
+ : "2.19",
+ ("gnu-config", "http://downloads.yoctoproject.org/releases/gnu-config/gnu-config-20120814.tar.bz2", "", "")
+ : "20120814",
+ # packages with "99" in the name of possible version
+ ("pulseaudio", "http://freedesktop.org/software/pulseaudio/releases/pulseaudio-4.0.tar.xz", "", "")
+ : "5.0",
+ ("xserver-xorg", "http://xorg.freedesktop.org/releases/individual/xserver/xorg-server-1.15.1.tar.bz2", "", "")
+ : "1.15.1",
+ # packages with valid UPSTREAM_CHECK_URI and UPSTREAM_CHECK_REGEX
+ ("cups", "http://www.cups.org/software/1.7.2/cups-1.7.2-source.tar.bz2", "https://github.com/apple/cups/releases", "(?P<name>cups\-)(?P<pver>((\d+[\.\-_]*)+))\-source\.tar\.gz")
+ : "2.0.0",
+ ("db", "http://download.oracle.com/berkeley-db/db-5.3.21.tar.gz", "http://www.oracle.com/technetwork/products/berkeleydb/downloads/index-082944.html", "http://download.oracle.com/otn/berkeley-db/(?P<name>db-)(?P<pver>((\d+[\.\-_]*)+))\.tar\.gz")
+ : "6.1.19",
+ }
+
+ @skipIfNoNetwork()
+ def test_git_latest_versionstring(self):
+ for k, v in self.test_git_uris.items():
+ self.d.setVar("PN", k[0])
+ self.d.setVar("SRCREV", k[2])
+ self.d.setVar("UPSTREAM_CHECK_GITTAGREGEX", k[3])
+ ud = bb.fetch2.FetchData(k[1], self.d)
+ pupver= ud.method.latest_versionstring(ud, self.d)
+ verstring = pupver[0]
+ self.assertTrue(verstring, msg="Could not find upstream version for %s" % k[0])
+ r = bb.utils.vercmp_string(v, verstring)
+ self.assertTrue(r == -1 or r == 0, msg="Package %s, version: %s <= %s" % (k[0], v, verstring))
+
+ @skipIfNoNetwork()
+ def test_wget_latest_versionstring(self):
+ for k, v in self.test_wget_uris.items():
+ self.d.setVar("PN", k[0])
+ self.d.setVar("UPSTREAM_CHECK_URI", k[2])
+ self.d.setVar("UPSTREAM_CHECK_REGEX", k[3])
+ ud = bb.fetch2.FetchData(k[1], self.d)
+ pupver = ud.method.latest_versionstring(ud, self.d)
+ verstring = pupver[0]
+ self.assertTrue(verstring, msg="Could not find upstream version for %s" % k[0])
+ r = bb.utils.vercmp_string(v, verstring)
+ self.assertTrue(r == -1 or r == 0, msg="Package %s, version: %s <= %s" % (k[0], v, verstring))
+
+
+class FetchCheckStatusTest(FetcherTest):
+ test_wget_uris = ["http://www.cups.org/software/1.7.2/cups-1.7.2-source.tar.bz2",
+ "http://www.cups.org/",
+ "http://downloads.yoctoproject.org/releases/sato/sato-engine-0.1.tar.gz",
+ "http://downloads.yoctoproject.org/releases/sato/sato-engine-0.2.tar.gz",
+ "http://downloads.yoctoproject.org/releases/sato/sato-engine-0.3.tar.gz",
+ "https://yoctoproject.org/",
+ "https://yoctoproject.org/documentation",
+ "http://downloads.yoctoproject.org/releases/opkg/opkg-0.1.7.tar.gz",
+ "http://downloads.yoctoproject.org/releases/opkg/opkg-0.3.0.tar.gz",
+ "ftp://sourceware.org/pub/libffi/libffi-1.20.tar.gz",
+ "http://ftp.gnu.org/gnu/autoconf/autoconf-2.60.tar.gz",
+ "https://ftp.gnu.org/gnu/chess/gnuchess-5.08.tar.gz",
+ "https://ftp.gnu.org/gnu/gmp/gmp-4.0.tar.gz",
+ # GitHub releases are hosted on Amazon S3, which doesn't support HEAD
+ "https://github.com/kergoth/tslib/releases/download/1.1/tslib-1.1.tar.xz"
+ ]
+
+ @skipIfNoNetwork()
+ def test_wget_checkstatus(self):
+ fetch = bb.fetch2.Fetch(self.test_wget_uris, self.d)
+ for u in self.test_wget_uris:
+ with self.subTest(url=u):
+ ud = fetch.ud[u]
+ m = ud.method
+ ret = m.checkstatus(fetch, ud, self.d)
+ self.assertTrue(ret, msg="URI %s, can't check status" % (u))
+
+ @skipIfNoNetwork()
+ def test_wget_checkstatus_connection_cache(self):
+ from bb.fetch2 import FetchConnectionCache
+
+ connection_cache = FetchConnectionCache()
+ fetch = bb.fetch2.Fetch(self.test_wget_uris, self.d,
+ connection_cache = connection_cache)
+
+ for u in self.test_wget_uris:
+ with self.subTest(url=u):
+ ud = fetch.ud[u]
+ m = ud.method
+ ret = m.checkstatus(fetch, ud, self.d)
+ self.assertTrue(ret, msg="URI %s, can't check status" % (u))
+
+ connection_cache.close_connections()
+
+
+class GitMakeShallowTest(FetcherTest):
+ def setUp(self):
+ FetcherTest.setUp(self)
+ self.gitdir = os.path.join(self.tempdir, 'gitshallow')
+ bb.utils.mkdirhier(self.gitdir)
+ bb.process.run('git init', cwd=self.gitdir)
+
+ def assertRefs(self, expected_refs):
+ actual_refs = self.git(['for-each-ref', '--format=%(refname)']).splitlines()
+ full_expected = self.git(['rev-parse', '--symbolic-full-name'] + expected_refs).splitlines()
+ self.assertEqual(sorted(full_expected), sorted(actual_refs))
+
+ def assertRevCount(self, expected_count, args=None):
+ if args is None:
+ args = ['HEAD']
+ revs = self.git(['rev-list'] + args)
+ actual_count = len(revs.splitlines())
+ self.assertEqual(expected_count, actual_count, msg='Object count `%d` is not the expected `%d`' % (actual_count, expected_count))
+
+ def git(self, cmd):
+ if isinstance(cmd, str):
+ cmd = 'git ' + cmd
+ else:
+ cmd = ['git'] + cmd
+ return bb.process.run(cmd, cwd=self.gitdir)[0]
+
+ def make_shallow(self, args=None):
+ if args is None:
+ args = ['HEAD']
+ return bb.process.run([bb.fetch2.git.Git.make_shallow_path] + args, cwd=self.gitdir)
+
+ def add_empty_file(self, path, msg=None):
+ if msg is None:
+ msg = path
+ open(os.path.join(self.gitdir, path), 'w').close()
+ self.git(['add', path])
+ self.git(['commit', '-m', msg, path])
+
+ def test_make_shallow_single_branch_no_merge(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2)
+ self.make_shallow()
+ self.assertRevCount(1)
+
+ def test_make_shallow_single_branch_one_merge(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('checkout -b a_branch')
+ self.add_empty_file('c')
+ self.git('checkout master')
+ self.add_empty_file('d')
+ self.git('merge --no-ff --no-edit a_branch')
+ self.git('branch -d a_branch')
+ self.add_empty_file('e')
+ self.assertRevCount(6)
+ self.make_shallow(['HEAD~2'])
+ self.assertRevCount(5)
+
+ def test_make_shallow_at_merge(self):
+ self.add_empty_file('a')
+ self.git('checkout -b a_branch')
+ self.add_empty_file('b')
+ self.git('checkout master')
+ self.git('merge --no-ff --no-edit a_branch')
+ self.git('branch -d a_branch')
+ self.assertRevCount(3)
+ self.make_shallow()
+ self.assertRevCount(1)
+
+ def test_make_shallow_annotated_tag(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('tag -a -m a_tag a_tag')
+ self.assertRevCount(2)
+ self.make_shallow(['a_tag'])
+ self.assertRevCount(1)
+
+ def test_make_shallow_multi_ref(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('checkout -b a_branch')
+ self.add_empty_file('c')
+ self.git('checkout master')
+ self.add_empty_file('d')
+ self.git('checkout -b a_branch_2')
+ self.add_empty_file('a_tag')
+ self.git('tag a_tag')
+ self.git('checkout master')
+ self.git('branch -D a_branch_2')
+ self.add_empty_file('e')
+ self.assertRevCount(6, ['--all'])
+ self.make_shallow()
+ self.assertRevCount(5, ['--all'])
+
+ def test_make_shallow_multi_ref_trim(self):
+ self.add_empty_file('a')
+ self.git('checkout -b a_branch')
+ self.add_empty_file('c')
+ self.git('checkout master')
+ self.assertRevCount(1)
+ self.assertRevCount(2, ['--all'])
+ self.assertRefs(['master', 'a_branch'])
+ self.make_shallow(['-r', 'master', 'HEAD'])
+ self.assertRevCount(1, ['--all'])
+ self.assertRefs(['master'])
+
+ def test_make_shallow_noop(self):
+ self.add_empty_file('a')
+ self.assertRevCount(1)
+ self.make_shallow()
+ self.assertRevCount(1)
+
+ @skipIfNoNetwork()
+ def test_make_shallow_bitbake(self):
+ self.git('remote add origin https://github.com/openembedded/bitbake')
+ self.git('fetch --tags origin')
+ orig_revs = len(self.git('rev-list --all').splitlines())
+ self.make_shallow(['refs/tags/1.10.0'])
+ self.assertRevCount(orig_revs - 1746, ['--all'])
+
+class GitShallowTest(FetcherTest):
+ def setUp(self):
+ FetcherTest.setUp(self)
+ self.gitdir = os.path.join(self.tempdir, 'git')
+ self.srcdir = os.path.join(self.tempdir, 'gitsource')
+
+ bb.utils.mkdirhier(self.srcdir)
+ self.git('init', cwd=self.srcdir)
+ self.d.setVar('WORKDIR', self.tempdir)
+ self.d.setVar('S', self.gitdir)
+ self.d.delVar('PREMIRRORS')
+ self.d.delVar('MIRRORS')
+
+ uri = 'git://%s;protocol=file;subdir=${S}' % self.srcdir
+ self.d.setVar('SRC_URI', uri)
+ self.d.setVar('SRCREV', '${AUTOREV}')
+ self.d.setVar('AUTOREV', '${@bb.fetch2.get_autorev(d)}')
+
+ self.d.setVar('BB_GIT_SHALLOW', '1')
+ self.d.setVar('BB_GENERATE_MIRROR_TARBALLS', '0')
+ self.d.setVar('BB_GENERATE_SHALLOW_TARBALLS', '1')
+
+ def assertRefs(self, expected_refs, cwd=None):
+ if cwd is None:
+ cwd = self.gitdir
+ actual_refs = self.git(['for-each-ref', '--format=%(refname)'], cwd=cwd).splitlines()
+ full_expected = self.git(['rev-parse', '--symbolic-full-name'] + expected_refs, cwd=cwd).splitlines()
+ self.assertEqual(sorted(set(full_expected)), sorted(set(actual_refs)))
+
+ def assertRevCount(self, expected_count, args=None, cwd=None):
+ if args is None:
+ args = ['HEAD']
+ if cwd is None:
+ cwd = self.gitdir
+ revs = self.git(['rev-list'] + args, cwd=cwd)
+ actual_count = len(revs.splitlines())
+ self.assertEqual(expected_count, actual_count, msg='Object count `%d` is not the expected `%d`' % (actual_count, expected_count))
+
+ def git(self, cmd, cwd=None):
+ if isinstance(cmd, str):
+ cmd = 'git ' + cmd
+ else:
+ cmd = ['git'] + cmd
+ if cwd is None:
+ cwd = self.gitdir
+ return bb.process.run(cmd, cwd=cwd)[0]
+
+ def add_empty_file(self, path, cwd=None, msg=None):
+ if msg is None:
+ msg = path
+ if cwd is None:
+ cwd = self.srcdir
+ open(os.path.join(cwd, path), 'w').close()
+ self.git(['add', path], cwd)
+ self.git(['commit', '-m', msg, path], cwd)
+
+ def fetch(self, uri=None):
+ if uri is None:
+ uris = self.d.getVar('SRC_URI', True).split()
+ uri = uris[0]
+ d = self.d
+ else:
+ d = self.d.createCopy()
+ d.setVar('SRC_URI', uri)
+ uri = d.expand(uri)
+ uris = [uri]
+
+ fetcher = bb.fetch2.Fetch(uris, d)
+ fetcher.download()
+ ud = fetcher.ud[uri]
+ return fetcher, ud
+
+ def fetch_and_unpack(self, uri=None):
+ fetcher, ud = self.fetch(uri)
+ fetcher.unpack(self.d.getVar('WORKDIR'))
+ assert os.path.exists(self.d.getVar('S'))
+ return fetcher, ud
+
+ def fetch_shallow(self, uri=None, disabled=False, keepclone=False):
+ """Fetch a uri, generating a shallow tarball, then unpack using it"""
+ fetcher, ud = self.fetch_and_unpack(uri)
+ assert os.path.exists(ud.clonedir), 'Git clone in DLDIR (%s) does not exist for uri %s' % (ud.clonedir, uri)
+
+ # Confirm that the unpacked repo is unshallow
+ if not disabled:
+ assert os.path.exists(os.path.join(self.dldir, ud.mirrortarballs[0]))
+
+ # fetch and unpack, from the shallow tarball
+ bb.utils.remove(self.gitdir, recurse=True)
+ bb.utils.remove(ud.clonedir, recurse=True)
+ bb.utils.remove(ud.clonedir.replace('gitsource', 'gitsubmodule'), recurse=True)
+
+ # confirm that the unpacked repo is used when no git clone or git
+ # mirror tarball is available
+ fetcher, ud = self.fetch_and_unpack(uri)
+ if not disabled:
+ assert os.path.exists(os.path.join(self.gitdir, '.git', 'shallow')), 'Unpacked git repository at %s is not shallow' % self.gitdir
+ else:
+ assert not os.path.exists(os.path.join(self.gitdir, '.git', 'shallow')), 'Unpacked git repository at %s is shallow' % self.gitdir
+ return fetcher, ud
+
+ def test_shallow_disabled(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW', '0')
+ self.fetch_shallow(disabled=True)
+ self.assertRevCount(2)
+
+ def test_shallow_nobranch(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ srcrev = self.git('rev-parse HEAD', cwd=self.srcdir).strip()
+ self.d.setVar('SRCREV', srcrev)
+ uri = self.d.getVar('SRC_URI', True).split()[0]
+ uri = '%s;nobranch=1;bare=1' % uri
+
+ self.fetch_shallow(uri)
+ self.assertRevCount(1)
+
+ # shallow refs are used to ensure the srcrev sticks around when we
+ # have no other branches referencing it
+ self.assertRefs(['refs/shallow/default'])
+
+ def test_shallow_default_depth_1(self):
+ # Create initial git repo
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.fetch_shallow()
+ self.assertRevCount(1)
+
+ def test_shallow_depth_0_disables(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ self.fetch_shallow(disabled=True)
+ self.assertRevCount(2)
+
+ def test_shallow_depth_default_override(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '2')
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH_default', '1')
+ self.fetch_shallow()
+ self.assertRevCount(1)
+
+ def test_shallow_depth_default_override_disable(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.add_empty_file('c')
+ self.assertRevCount(3, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH_default', '2')
+ self.fetch_shallow()
+ self.assertRevCount(2)
+
+ def test_current_shallow_out_of_date_clone(self):
+ # Create initial git repo
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.add_empty_file('c')
+ self.assertRevCount(3, cwd=self.srcdir)
+
+ # Clone and generate mirror tarball
+ fetcher, ud = self.fetch()
+
+ # Ensure we have a current mirror tarball, but an out of date clone
+ self.git('update-ref refs/heads/master refs/heads/master~1', cwd=ud.clonedir)
+ self.assertRevCount(2, cwd=ud.clonedir)
+
+ # Fetch and unpack, from the current tarball, not the out of date clone
+ bb.utils.remove(self.gitdir, recurse=True)
+ fetcher, ud = self.fetch()
+ fetcher.unpack(self.d.getVar('WORKDIR'))
+ self.assertRevCount(1)
+
+ def test_shallow_single_branch_no_merge(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.fetch_shallow()
+ self.assertRevCount(1)
+ assert os.path.exists(os.path.join(self.gitdir, 'a'))
+ assert os.path.exists(os.path.join(self.gitdir, 'b'))
+
+ def test_shallow_no_dangling(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.fetch_shallow()
+ self.assertRevCount(1)
+ assert not self.git('fsck --dangling')
+
+ def test_shallow_srcrev_branch_truncation(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ b_commit = self.git('rev-parse HEAD', cwd=self.srcdir).rstrip()
+ self.add_empty_file('c')
+ self.assertRevCount(3, cwd=self.srcdir)
+
+ self.d.setVar('SRCREV', b_commit)
+ self.fetch_shallow()
+
+ # The 'c' commit was removed entirely, and 'a' was removed from history
+ self.assertRevCount(1, ['--all'])
+ self.assertEqual(self.git('rev-parse HEAD').strip(), b_commit)
+ assert os.path.exists(os.path.join(self.gitdir, 'a'))
+ assert os.path.exists(os.path.join(self.gitdir, 'b'))
+ assert not os.path.exists(os.path.join(self.gitdir, 'c'))
+
+ def test_shallow_ref_pruning(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('branch a_branch', cwd=self.srcdir)
+ self.assertRefs(['master', 'a_branch'], cwd=self.srcdir)
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.fetch_shallow()
+
+ self.assertRefs(['master', 'origin/master'])
+ self.assertRevCount(1)
+
+ def test_shallow_submodules(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ smdir = os.path.join(self.tempdir, 'gitsubmodule')
+ bb.utils.mkdirhier(smdir)
+ self.git('init', cwd=smdir)
+ # Make this look like it was cloned from a remote...
+ self.git('config --add remote.origin.url "%s"' % smdir, cwd=smdir)
+ self.git('config --add remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"', cwd=smdir)
+ self.add_empty_file('asub', cwd=smdir)
+ self.add_empty_file('bsub', cwd=smdir)
+
+ self.git('submodule init', cwd=self.srcdir)
+ self.git('submodule add file://%s' % smdir, cwd=self.srcdir)
+ self.git('submodule update', cwd=self.srcdir)
+ self.git('commit -m submodule -a', cwd=self.srcdir)
+
+ uri = 'gitsm://%s;protocol=file;subdir=${S}' % self.srcdir
+ fetcher, ud = self.fetch_shallow(uri)
+
+ # Verify the main repository is shallow
+ self.assertRevCount(1)
+
+ # Verify the gitsubmodule directory is present
+ assert os.listdir(os.path.join(self.gitdir, 'gitsubmodule'))
+
+ # Verify the submodule is also shallow
+ self.assertRevCount(1, cwd=os.path.join(self.gitdir, 'gitsubmodule'))
+
+
+ if any(os.path.exists(os.path.join(p, 'git-annex')) for p in os.environ.get('PATH').split(':')):
+ def test_shallow_annex(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('annex init', cwd=self.srcdir)
+ open(os.path.join(self.srcdir, 'c'), 'w').close()
+ self.git('annex add c', cwd=self.srcdir)
+ self.git('commit -m annex-c -a', cwd=self.srcdir)
+ bb.process.run('chmod u+w -R %s' % os.path.join(self.srcdir, '.git', 'annex'))
+
+ uri = 'gitannex://%s;protocol=file;subdir=${S}' % self.srcdir
+ fetcher, ud = self.fetch_shallow(uri)
+
+ self.assertRevCount(1)
+ assert './.git/annex/' in bb.process.run('tar -tzf %s' % os.path.join(self.dldir, ud.mirrortarballs[0]))[0]
+ assert os.path.exists(os.path.join(self.gitdir, 'c'))
+
+ def test_shallow_multi_one_uri(self):
+ # Create initial git repo
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('checkout -b a_branch', cwd=self.srcdir)
+ self.add_empty_file('c')
+ self.add_empty_file('d')
+ self.git('checkout master', cwd=self.srcdir)
+ self.git('tag v0.0 a_branch', cwd=self.srcdir)
+ self.add_empty_file('e')
+ self.git('merge --no-ff --no-edit a_branch', cwd=self.srcdir)
+ self.add_empty_file('f')
+ self.assertRevCount(7, cwd=self.srcdir)
+
+ uri = self.d.getVar('SRC_URI', True).split()[0]
+ uri = '%s;branch=master,a_branch;name=master,a_branch' % uri
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ self.d.setVar('BB_GIT_SHALLOW_REVS', 'v0.0')
+ self.d.setVar('SRCREV_master', '${AUTOREV}')
+ self.d.setVar('SRCREV_a_branch', '${AUTOREV}')
+
+ self.fetch_shallow(uri)
+
+ self.assertRevCount(5)
+ self.assertRefs(['master', 'origin/master', 'origin/a_branch'])
+
+ def test_shallow_multi_one_uri_depths(self):
+ # Create initial git repo
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('checkout -b a_branch', cwd=self.srcdir)
+ self.add_empty_file('c')
+ self.add_empty_file('d')
+ self.git('checkout master', cwd=self.srcdir)
+ self.add_empty_file('e')
+ self.git('merge --no-ff --no-edit a_branch', cwd=self.srcdir)
+ self.add_empty_file('f')
+ self.assertRevCount(7, cwd=self.srcdir)
+
+ uri = self.d.getVar('SRC_URI', True).split()[0]
+ uri = '%s;branch=master,a_branch;name=master,a_branch' % uri
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH_master', '3')
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH_a_branch', '1')
+ self.d.setVar('SRCREV_master', '${AUTOREV}')
+ self.d.setVar('SRCREV_a_branch', '${AUTOREV}')
+
+ self.fetch_shallow(uri)
+
+ self.assertRevCount(4, ['--all'])
+ self.assertRefs(['master', 'origin/master', 'origin/a_branch'])
+
+ def test_shallow_clone_preferred_over_shallow(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ # Fetch once to generate the shallow tarball
+ fetcher, ud = self.fetch()
+ assert os.path.exists(os.path.join(self.dldir, ud.mirrortarballs[0]))
+
+ # Fetch and unpack with both the clonedir and shallow tarball available
+ bb.utils.remove(self.gitdir, recurse=True)
+ fetcher, ud = self.fetch_and_unpack()
+
+ # The unpacked tree should *not* be shallow
+ self.assertRevCount(2)
+ assert not os.path.exists(os.path.join(self.gitdir, '.git', 'shallow'))
+
+ def test_shallow_mirrors(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ # Fetch once to generate the shallow tarball
+ fetcher, ud = self.fetch()
+ mirrortarball = ud.mirrortarballs[0]
+ assert os.path.exists(os.path.join(self.dldir, mirrortarball))
+
+ # Set up the mirror
+ mirrordir = os.path.join(self.tempdir, 'mirror')
+ bb.utils.mkdirhier(mirrordir)
+ self.d.setVar('PREMIRRORS', 'git://.*/.* file://%s/\n' % mirrordir)
+
+ os.rename(os.path.join(self.dldir, mirrortarball),
+ os.path.join(mirrordir, mirrortarball))
+
+ # Fetch from the mirror
+ bb.utils.remove(self.dldir, recurse=True)
+ bb.utils.remove(self.gitdir, recurse=True)
+ self.fetch_and_unpack()
+ self.assertRevCount(1)
+
+ def test_shallow_invalid_depth(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '-12')
+ with self.assertRaises(bb.fetch2.FetchError):
+ self.fetch()
+
+ def test_shallow_invalid_depth_default(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH_default', '-12')
+ with self.assertRaises(bb.fetch2.FetchError):
+ self.fetch()
+
+ def test_shallow_extra_refs(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('branch a_branch', cwd=self.srcdir)
+ self.assertRefs(['master', 'a_branch'], cwd=self.srcdir)
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/heads/a_branch')
+ self.fetch_shallow()
+
+ self.assertRefs(['master', 'origin/master', 'origin/a_branch'])
+ self.assertRevCount(1)
+
+ def test_shallow_extra_refs_wildcard(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('branch a_branch', cwd=self.srcdir)
+ self.git('tag v1.0', cwd=self.srcdir)
+ self.assertRefs(['master', 'a_branch', 'v1.0'], cwd=self.srcdir)
+ self.assertRevCount(2, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/tags/*')
+ self.fetch_shallow()
+
+ self.assertRefs(['master', 'origin/master', 'v1.0'])
+ self.assertRevCount(1)
+
+ def test_shallow_missing_extra_refs(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/heads/foo')
+ with self.assertRaises(bb.fetch2.FetchError):
+ self.fetch()
+
+ def test_shallow_missing_extra_refs_wildcard(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/tags/*')
+ self.fetch()
+
+ def test_shallow_remove_revs(self):
+ # Create initial git repo
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+ self.git('checkout -b a_branch', cwd=self.srcdir)
+ self.add_empty_file('c')
+ self.add_empty_file('d')
+ self.git('checkout master', cwd=self.srcdir)
+ self.git('tag v0.0 a_branch', cwd=self.srcdir)
+ self.add_empty_file('e')
+ self.git('merge --no-ff --no-edit a_branch', cwd=self.srcdir)
+ self.git('branch -d a_branch', cwd=self.srcdir)
+ self.add_empty_file('f')
+ self.assertRevCount(7, cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ self.d.setVar('BB_GIT_SHALLOW_REVS', 'v0.0')
+
+ self.fetch_shallow()
+
+ self.assertRevCount(5)
+
+ def test_shallow_invalid_revs(self):
+ self.add_empty_file('a')
+ self.add_empty_file('b')
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ self.d.setVar('BB_GIT_SHALLOW_REVS', 'v0.0')
+
+ with self.assertRaises(bb.fetch2.FetchError):
+ self.fetch()
+
+ @skipIfNoNetwork()
+ def test_bitbake(self):
+ self.git('remote add --mirror=fetch origin git://github.com/openembedded/bitbake', cwd=self.srcdir)
+ self.git('config core.bare true', cwd=self.srcdir)
+ self.git('fetch', cwd=self.srcdir)
+
+ self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0')
+ # Note that the 1.10.0 tag is annotated, so this also tests
+ # reference of an annotated vs unannotated tag
+ self.d.setVar('BB_GIT_SHALLOW_REVS', '1.10.0')
+
+ self.fetch_shallow()
+
+ # Confirm that the history of 1.10.0 was removed
+ orig_revs = len(self.git('rev-list master', cwd=self.srcdir).splitlines())
+ revs = len(self.git('rev-list master').splitlines())
+ self.assertNotEqual(orig_revs, revs)
+ self.assertRefs(['master', 'origin/master'])
+ self.assertRevCount(orig_revs - 1758)
+
+ def test_that_unpack_throws_an_error_when_the_git_clone_nor_shallow_tarball_exist(self):
+ self.add_empty_file('a')
+ fetcher, ud = self.fetch()
+ bb.utils.remove(self.gitdir, recurse=True)
+ bb.utils.remove(self.dldir, recurse=True)
+
+ with self.assertRaises(bb.fetch2.UnpackError) as context:
+ fetcher.unpack(self.d.getVar('WORKDIR'))
+
+ self.assertIn("No up to date source found", context.exception.msg)
+ self.assertIn("clone directory not available or not up to date", context.exception.msg)
+
+ @skipIfNoNetwork()
+ def test_that_unpack_does_work_when_using_git_shallow_tarball_but_tarball_is_not_available(self):
+ self.d.setVar('SRCREV', 'e5939ff608b95cdd4d0ab0e1935781ab9a276ac0')
+ self.d.setVar('BB_GIT_SHALLOW', '1')
+ self.d.setVar('BB_GENERATE_SHALLOW_TARBALLS', '1')
+ fetcher = bb.fetch.Fetch(["git://git.yoctoproject.org/fstests"], self.d)
+ fetcher.download()
+
+ bb.utils.remove(self.dldir + "/*.tar.gz")
+ fetcher.unpack(self.unpackdir)
+
+ dir = os.listdir(self.unpackdir + "/git/")
+ self.assertIn("fstests.doap", dir)
diff --git a/external/poky/bitbake/lib/bb/tests/parse.py b/external/poky/bitbake/lib/bb/tests/parse.py
new file mode 100644
index 00000000..1bc47405
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/parse.py
@@ -0,0 +1,189 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Test for lib/bb/parse/
+#
+# Copyright (C) 2015 Richard Purdie
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import tempfile
+import logging
+import bb
+import os
+
+logger = logging.getLogger('BitBake.TestParse')
+
+import bb.parse
+import bb.data
+import bb.siggen
+
+class ParseTest(unittest.TestCase):
+
+ testfile = """
+A = "1"
+B = "2"
+do_install() {
+ echo "hello"
+}
+
+C = "3"
+"""
+
+ def setUp(self):
+ self.origdir = os.getcwd()
+ self.d = bb.data.init()
+ bb.parse.siggen = bb.siggen.init(self.d)
+
+ def tearDown(self):
+ os.chdir(self.origdir)
+
+ def parsehelper(self, content, suffix = ".bb"):
+
+ f = tempfile.NamedTemporaryFile(suffix = suffix)
+ f.write(bytes(content, "utf-8"))
+ f.flush()
+ os.chdir(os.path.dirname(f.name))
+ return f
+
+ def test_parse_simple(self):
+ f = self.parsehelper(self.testfile)
+ d = bb.parse.handle(f.name, self.d)['']
+ self.assertEqual(d.getVar("A"), "1")
+ self.assertEqual(d.getVar("B"), "2")
+ self.assertEqual(d.getVar("C"), "3")
+
+ def test_parse_incomplete_function(self):
+ testfileB = self.testfile.replace("}", "")
+ f = self.parsehelper(testfileB)
+ with self.assertRaises(bb.parse.ParseError):
+ d = bb.parse.handle(f.name, self.d)['']
+
+ unsettest = """
+A = "1"
+B = "2"
+B[flag] = "3"
+
+unset A
+unset B[flag]
+"""
+
+ def test_parse_unset(self):
+ f = self.parsehelper(self.unsettest)
+ d = bb.parse.handle(f.name, self.d)['']
+ self.assertEqual(d.getVar("A"), None)
+ self.assertEqual(d.getVarFlag("A","flag"), None)
+ self.assertEqual(d.getVar("B"), "2")
+
+ exporttest = """
+A = "a"
+export B = "b"
+export C
+exportD = "d"
+"""
+
+ def test_parse_exports(self):
+ f = self.parsehelper(self.exporttest)
+ d = bb.parse.handle(f.name, self.d)['']
+ self.assertEqual(d.getVar("A"), "a")
+ self.assertIsNone(d.getVarFlag("A", "export"))
+ self.assertEqual(d.getVar("B"), "b")
+ self.assertEqual(d.getVarFlag("B", "export"), 1)
+ self.assertIsNone(d.getVar("C"))
+ self.assertEqual(d.getVarFlag("C", "export"), 1)
+ self.assertIsNone(d.getVar("D"))
+ self.assertIsNone(d.getVarFlag("D", "export"))
+ self.assertEqual(d.getVar("exportD"), "d")
+ self.assertIsNone(d.getVarFlag("exportD", "export"))
+
+
+ overridetest = """
+RRECOMMENDS_${PN} = "a"
+RRECOMMENDS_${PN}_libc = "b"
+OVERRIDES = "libc:${PN}"
+PN = "gtk+"
+"""
+
+ def test_parse_overrides(self):
+ f = self.parsehelper(self.overridetest)
+ d = bb.parse.handle(f.name, self.d)['']
+ self.assertEqual(d.getVar("RRECOMMENDS"), "b")
+ bb.data.expandKeys(d)
+ self.assertEqual(d.getVar("RRECOMMENDS"), "b")
+ d.setVar("RRECOMMENDS_gtk+", "c")
+ self.assertEqual(d.getVar("RRECOMMENDS"), "c")
+
+ overridetest2 = """
+EXTRA_OECONF = ""
+EXTRA_OECONF_class-target = "b"
+EXTRA_OECONF_append = " c"
+"""
+
+ def test_parse_overrides(self):
+ f = self.parsehelper(self.overridetest2)
+ d = bb.parse.handle(f.name, self.d)['']
+ d.appendVar("EXTRA_OECONF", " d")
+ d.setVar("OVERRIDES", "class-target")
+ self.assertEqual(d.getVar("EXTRA_OECONF"), "b c d")
+
+ overridetest3 = """
+DESCRIPTION = "A"
+DESCRIPTION_${PN}-dev = "${DESCRIPTION} B"
+PN = "bc"
+"""
+
+ def test_parse_combinations(self):
+ f = self.parsehelper(self.overridetest3)
+ d = bb.parse.handle(f.name, self.d)['']
+ bb.data.expandKeys(d)
+ self.assertEqual(d.getVar("DESCRIPTION_bc-dev"), "A B")
+ d.setVar("DESCRIPTION", "E")
+ d.setVar("DESCRIPTION_bc-dev", "C D")
+ d.setVar("OVERRIDES", "bc-dev")
+ self.assertEqual(d.getVar("DESCRIPTION"), "C D")
+
+
+ classextend = """
+VAR_var_override1 = "B"
+EXTRA = ":override1"
+OVERRIDES = "nothing${EXTRA}"
+
+BBCLASSEXTEND = "###CLASS###"
+"""
+ classextend_bbclass = """
+EXTRA = ""
+python () {
+ d.renameVar("VAR_var", "VAR_var2")
+}
+"""
+
+ #
+ # Test based upon a real world data corruption issue. One
+ # data store changing a variable poked through into a different data
+ # store. This test case replicates that issue where the value 'B' would
+ # become unset/disappear.
+ #
+ def test_parse_classextend_contamination(self):
+ cls = self.parsehelper(self.classextend_bbclass, suffix=".bbclass")
+ #clsname = os.path.basename(cls.name).replace(".bbclass", "")
+ self.classextend = self.classextend.replace("###CLASS###", cls.name)
+ f = self.parsehelper(self.classextend)
+ alldata = bb.parse.handle(f.name, self.d)
+ d1 = alldata['']
+ d2 = alldata[cls.name]
+ self.assertEqual(d1.getVar("VAR_var"), "B")
+ self.assertEqual(d2.getVar("VAR_var"), None)
+
diff --git a/external/poky/bitbake/lib/bb/tests/utils.py b/external/poky/bitbake/lib/bb/tests/utils.py
new file mode 100644
index 00000000..f1cd83a4
--- /dev/null
+++ b/external/poky/bitbake/lib/bb/tests/utils.py
@@ -0,0 +1,607 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# BitBake Tests for utils.py
+#
+# Copyright (C) 2012 Richard Purdie
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+import unittest
+import bb
+import os
+import tempfile
+import re
+
+class VerCmpString(unittest.TestCase):
+
+ def test_vercmpstring(self):
+ result = bb.utils.vercmp_string('1', '2')
+ self.assertTrue(result < 0)
+ result = bb.utils.vercmp_string('2', '1')
+ self.assertTrue(result > 0)
+ result = bb.utils.vercmp_string('1', '1.0')
+ self.assertTrue(result < 0)
+ result = bb.utils.vercmp_string('1', '1.1')
+ self.assertTrue(result < 0)
+ result = bb.utils.vercmp_string('1.1', '1_p2')
+ self.assertTrue(result < 0)
+ result = bb.utils.vercmp_string('1.0', '1.0+1.1-beta1')
+ self.assertTrue(result < 0)
+ result = bb.utils.vercmp_string('1.1', '1.0+1.1-beta1')
+ self.assertTrue(result > 0)
+ result = bb.utils.vercmp_string('1.', '1.1')
+ self.assertTrue(result < 0)
+ result = bb.utils.vercmp_string('1.1', '1.')
+ self.assertTrue(result > 0)
+
+ def test_explode_dep_versions(self):
+ correctresult = {"foo" : ["= 1.10"]}
+ result = bb.utils.explode_dep_versions2("foo (= 1.10)")
+ self.assertEqual(result, correctresult)
+ result = bb.utils.explode_dep_versions2("foo (=1.10)")
+ self.assertEqual(result, correctresult)
+ result = bb.utils.explode_dep_versions2("foo ( = 1.10)")
+ self.assertEqual(result, correctresult)
+ result = bb.utils.explode_dep_versions2("foo ( =1.10)")
+ self.assertEqual(result, correctresult)
+ result = bb.utils.explode_dep_versions2("foo ( = 1.10 )")
+ self.assertEqual(result, correctresult)
+ result = bb.utils.explode_dep_versions2("foo ( =1.10 )")
+ self.assertEqual(result, correctresult)
+
+ def test_vercmp_string_op(self):
+ compareops = [('1', '1', '=', True),
+ ('1', '1', '==', True),
+ ('1', '1', '!=', False),
+ ('1', '1', '>', False),
+ ('1', '1', '<', False),
+ ('1', '1', '>=', True),
+ ('1', '1', '<=', True),
+ ('1', '0', '=', False),
+ ('1', '0', '==', False),
+ ('1', '0', '!=', True),
+ ('1', '0', '>', True),
+ ('1', '0', '<', False),
+ ('1', '0', '>>', True),
+ ('1', '0', '<<', False),
+ ('1', '0', '>=', True),
+ ('1', '0', '<=', False),
+ ('0', '1', '=', False),
+ ('0', '1', '==', False),
+ ('0', '1', '!=', True),
+ ('0', '1', '>', False),
+ ('0', '1', '<', True),
+ ('0', '1', '>>', False),
+ ('0', '1', '<<', True),
+ ('0', '1', '>=', False),
+ ('0', '1', '<=', True)]
+
+ for arg1, arg2, op, correctresult in compareops:
+ result = bb.utils.vercmp_string_op(arg1, arg2, op)
+ self.assertEqual(result, correctresult, 'vercmp_string_op("%s", "%s", "%s") != %s' % (arg1, arg2, op, correctresult))
+
+ # Check that clearly invalid operator raises an exception
+ self.assertRaises(bb.utils.VersionStringException, bb.utils.vercmp_string_op, '0', '0', '$')
+
+
+class Path(unittest.TestCase):
+ def test_unsafe_delete_path(self):
+ checkitems = [('/', True),
+ ('//', True),
+ ('///', True),
+ (os.getcwd().count(os.sep) * ('..' + os.sep), True),
+ (os.environ.get('HOME', '/home/test'), True),
+ ('/home/someone', True),
+ ('/home/other/', True),
+ ('/home/other/subdir', False),
+ ('', False)]
+ for arg1, correctresult in checkitems:
+ result = bb.utils._check_unsafe_delete_path(arg1)
+ self.assertEqual(result, correctresult, '_check_unsafe_delete_path("%s") != %s' % (arg1, correctresult))
+
+
+class EditMetadataFile(unittest.TestCase):
+ _origfile = """
+# A comment
+HELLO = "oldvalue"
+
+THIS = "that"
+
+# Another comment
+NOCHANGE = "samevalue"
+OTHER = 'anothervalue'
+
+MULTILINE = "a1 \\
+ a2 \\
+ a3"
+
+MULTILINE2 := " \\
+ b1 \\
+ b2 \\
+ b3 \\
+ "
+
+
+MULTILINE3 = " \\
+ c1 \\
+ c2 \\
+ c3 \\
+"
+
+do_functionname() {
+ command1 ${VAL1} ${VAL2}
+ command2 ${VAL3} ${VAL4}
+}
+"""
+ def _testeditfile(self, varvalues, compareto, dummyvars=None):
+ if dummyvars is None:
+ dummyvars = []
+ with tempfile.NamedTemporaryFile('w', delete=False) as tf:
+ tf.write(self._origfile)
+ tf.close()
+ try:
+ varcalls = []
+ def handle_file(varname, origvalue, op, newlines):
+ self.assertIn(varname, varvalues, 'Callback called for variable %s not in the list!' % varname)
+ self.assertNotIn(varname, dummyvars, 'Callback called for variable %s in dummy list!' % varname)
+ varcalls.append(varname)
+ return varvalues[varname]
+
+ bb.utils.edit_metadata_file(tf.name, varvalues.keys(), handle_file)
+ with open(tf.name) as f:
+ modfile = f.readlines()
+ # Ensure the output matches the expected output
+ self.assertEqual(compareto.splitlines(True), modfile)
+ # Ensure the callback function was called for every variable we asked for
+ # (plus allow testing behaviour when a requested variable is not present)
+ self.assertEqual(sorted(varvalues.keys()), sorted(varcalls + dummyvars))
+ finally:
+ os.remove(tf.name)
+
+
+ def test_edit_metadata_file_nochange(self):
+ # Test file doesn't get modified with nothing to do
+ self._testeditfile({}, self._origfile)
+ # Test file doesn't get modified with only dummy variables
+ self._testeditfile({'DUMMY1': ('should_not_set', None, 0, True),
+ 'DUMMY2': ('should_not_set_again', None, 0, True)}, self._origfile, dummyvars=['DUMMY1', 'DUMMY2'])
+ # Test file doesn't get modified with some the same values
+ self._testeditfile({'THIS': ('that', None, 0, True),
+ 'OTHER': ('anothervalue', None, 0, True),
+ 'MULTILINE3': (' c1 c2 c3 ', None, 4, False)}, self._origfile)
+
+ def test_edit_metadata_file_1(self):
+
+ newfile1 = """
+# A comment
+HELLO = "newvalue"
+
+THIS = "that"
+
+# Another comment
+NOCHANGE = "samevalue"
+OTHER = 'anothervalue'
+
+MULTILINE = "a1 \\
+ a2 \\
+ a3"
+
+MULTILINE2 := " \\
+ b1 \\
+ b2 \\
+ b3 \\
+ "
+
+
+MULTILINE3 = " \\
+ c1 \\
+ c2 \\
+ c3 \\
+"
+
+do_functionname() {
+ command1 ${VAL1} ${VAL2}
+ command2 ${VAL3} ${VAL4}
+}
+"""
+ self._testeditfile({'HELLO': ('newvalue', None, 4, True)}, newfile1)
+
+
+ def test_edit_metadata_file_2(self):
+
+ newfile2 = """
+# A comment
+HELLO = "oldvalue"
+
+THIS = "that"
+
+# Another comment
+NOCHANGE = "samevalue"
+OTHER = 'anothervalue'
+
+MULTILINE = " \\
+ d1 \\
+ d2 \\
+ d3 \\
+ "
+
+MULTILINE2 := " \\
+ b1 \\
+ b2 \\
+ b3 \\
+ "
+
+
+MULTILINE3 = "nowsingle"
+
+do_functionname() {
+ command1 ${VAL1} ${VAL2}
+ command2 ${VAL3} ${VAL4}
+}
+"""
+ self._testeditfile({'MULTILINE': (['d1','d2','d3'], None, 4, False),
+ 'MULTILINE3': ('nowsingle', None, 4, True),
+ 'NOTPRESENT': (['a', 'b'], None, 4, False)}, newfile2, dummyvars=['NOTPRESENT'])
+
+
+ def test_edit_metadata_file_3(self):
+
+ newfile3 = """
+# A comment
+HELLO = "oldvalue"
+
+# Another comment
+NOCHANGE = "samevalue"
+OTHER = "yetanothervalue"
+
+MULTILINE = "e1 \\
+ e2 \\
+ e3 \\
+ "
+
+MULTILINE2 := "f1 \\
+\tf2 \\
+\t"
+
+
+MULTILINE3 = " \\
+ c1 \\
+ c2 \\
+ c3 \\
+"
+
+do_functionname() {
+ othercommand_one a b c
+ othercommand_two d e f
+}
+"""
+
+ self._testeditfile({'do_functionname()': (['othercommand_one a b c', 'othercommand_two d e f'], None, 4, False),
+ 'MULTILINE2': (['f1', 'f2'], None, '\t', True),
+ 'MULTILINE': (['e1', 'e2', 'e3'], None, -1, True),
+ 'THIS': (None, None, 0, False),
+ 'OTHER': ('yetanothervalue', None, 0, True)}, newfile3)
+
+
+ def test_edit_metadata_file_4(self):
+
+ newfile4 = """
+# A comment
+HELLO = "oldvalue"
+
+THIS = "that"
+
+# Another comment
+OTHER = 'anothervalue'
+
+MULTILINE = "a1 \\
+ a2 \\
+ a3"
+
+MULTILINE2 := " \\
+ b1 \\
+ b2 \\
+ b3 \\
+ "
+
+
+"""
+
+ self._testeditfile({'NOCHANGE': (None, None, 0, False),
+ 'MULTILINE3': (None, None, 0, False),
+ 'THIS': ('that', None, 0, False),
+ 'do_functionname()': (None, None, 0, False)}, newfile4)
+
+
+ def test_edit_metadata(self):
+ newfile5 = """
+# A comment
+HELLO = "hithere"
+
+# A new comment
+THIS += "that"
+
+# Another comment
+NOCHANGE = "samevalue"
+OTHER = 'anothervalue'
+
+MULTILINE = "a1 \\
+ a2 \\
+ a3"
+
+MULTILINE2 := " \\
+ b1 \\
+ b2 \\
+ b3 \\
+ "
+
+
+MULTILINE3 = " \\
+ c1 \\
+ c2 \\
+ c3 \\
+"
+
+NEWVAR = "value"
+
+do_functionname() {
+ command1 ${VAL1} ${VAL2}
+ command2 ${VAL3} ${VAL4}
+}
+"""
+
+
+ def handle_var(varname, origvalue, op, newlines):
+ if varname == 'THIS':
+ newlines.append('# A new comment\n')
+ elif varname == 'do_functionname()':
+ newlines.append('NEWVAR = "value"\n')
+ newlines.append('\n')
+ valueitem = varvalues.get(varname, None)
+ if valueitem:
+ return valueitem
+ else:
+ return (origvalue, op, 0, True)
+
+ varvalues = {'HELLO': ('hithere', None, 0, True), 'THIS': ('that', '+=', 0, True)}
+ varlist = ['HELLO', 'THIS', 'do_functionname()']
+ (updated, newlines) = bb.utils.edit_metadata(self._origfile.splitlines(True), varlist, handle_var)
+ self.assertTrue(updated, 'List should be updated but isn\'t')
+ self.assertEqual(newlines, newfile5.splitlines(True))
+
+ # Make sure the orig value matches what we expect it to be
+ def test_edit_metadata_origvalue(self):
+ origfile = """
+MULTILINE = " stuff \\
+ morestuff"
+"""
+ expected_value = "stuff morestuff"
+ global value_in_callback
+ value_in_callback = ""
+
+ def handle_var(varname, origvalue, op, newlines):
+ global value_in_callback
+ value_in_callback = origvalue
+ return (origvalue, op, -1, False)
+
+ bb.utils.edit_metadata(origfile.splitlines(True),
+ ['MULTILINE'],
+ handle_var)
+
+ testvalue = re.sub('\s+', ' ', value_in_callback.strip())
+ self.assertEqual(expected_value, testvalue)
+
+class EditBbLayersConf(unittest.TestCase):
+
+ def _test_bblayers_edit(self, before, after, add, remove, notadded, notremoved):
+ with tempfile.NamedTemporaryFile('w', delete=False) as tf:
+ tf.write(before)
+ tf.close()
+ try:
+ actual_notadded, actual_notremoved = bb.utils.edit_bblayers_conf(tf.name, add, remove)
+ with open(tf.name) as f:
+ actual_after = f.readlines()
+ self.assertEqual(after.splitlines(True), actual_after)
+ self.assertEqual(notadded, actual_notadded)
+ self.assertEqual(notremoved, actual_notremoved)
+ finally:
+ os.remove(tf.name)
+
+
+ def test_bblayers_remove(self):
+ before = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ /home/user/path/subpath/layer3 \
+ /home/user/path/layer4 \
+ "
+"""
+ after = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ /home/user/path/layer1 \
+ /home/user/path/subpath/layer3 \
+ /home/user/path/layer4 \
+ "
+"""
+ self._test_bblayers_edit(before, after,
+ None,
+ '/home/user/path/layer2',
+ [],
+ [])
+
+
+ def test_bblayers_add(self):
+ before = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ /home/user/path/subpath/layer3 \
+ /home/user/path/layer4 \
+ "
+"""
+ after = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ /home/user/path/subpath/layer3 \
+ /home/user/path/layer4 \
+ /other/path/to/layer5 \
+ "
+"""
+ self._test_bblayers_edit(before, after,
+ '/other/path/to/layer5/',
+ None,
+ [],
+ [])
+
+
+ def test_bblayers_add_remove(self):
+ before = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ /home/user/path/subpath/layer3 \
+ /home/user/path/layer4 \
+ "
+"""
+ after = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ /home/user/path/layer4 \
+ /other/path/to/layer5 \
+ "
+"""
+ self._test_bblayers_edit(before, after,
+ ['/other/path/to/layer5', '/home/user/path/layer2/'], '/home/user/path/subpath/layer3/',
+ ['/home/user/path/layer2'],
+ [])
+
+
+ def test_bblayers_add_remove_home(self):
+ before = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ ~/path/layer1 \
+ ~/path/layer2 \
+ ~/otherpath/layer3 \
+ ~/path/layer4 \
+ "
+"""
+ after = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS = " \
+ ~/path/layer2 \
+ ~/path/layer4 \
+ ~/path2/layer5 \
+ "
+"""
+ self._test_bblayers_edit(before, after,
+ [os.environ['HOME'] + '/path/layer4', '~/path2/layer5'],
+ [os.environ['HOME'] + '/otherpath/layer3', '~/path/layer1', '~/path/notinlist'],
+ [os.environ['HOME'] + '/path/layer4'],
+ ['~/path/notinlist'])
+
+
+ def test_bblayers_add_remove_plusequals(self):
+ before = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS += " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ "
+"""
+ after = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS += " \
+ /home/user/path/layer2 \
+ /home/user/path/layer3 \
+ "
+"""
+ self._test_bblayers_edit(before, after,
+ '/home/user/path/layer3',
+ '/home/user/path/layer1',
+ [],
+ [])
+
+
+ def test_bblayers_add_remove_plusequals2(self):
+ before = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS += " \
+ /home/user/path/layer1 \
+ /home/user/path/layer2 \
+ /home/user/path/layer3 \
+ "
+BBLAYERS += "/home/user/path/layer4"
+BBLAYERS += "/home/user/path/layer5"
+"""
+ after = r"""
+# A comment
+
+BBPATH = "${TOPDIR}"
+BBFILES ?= ""
+BBLAYERS += " \
+ /home/user/path/layer2 \
+ /home/user/path/layer3 \
+ "
+BBLAYERS += "/home/user/path/layer5"
+BBLAYERS += "/home/user/otherpath/layer6"
+"""
+ self._test_bblayers_edit(before, after,
+ ['/home/user/otherpath/layer6', '/home/user/path/layer3'], ['/home/user/path/layer1', '/home/user/path/layer4', '/home/user/path/layer7'],
+ ['/home/user/path/layer3'],
+ ['/home/user/path/layer7'])