1 #! /usr/bin/env python3
2 """ Testcases for zziplib build system """
4 __copyright__ = "(C) Guido Draheim, all rights reserved"""
5 __version__ = "0.13.70"
17 from fnmatch import fnmatchcase as fnmatch
22 if sys.version[0] == '3':
26 logg = logging.getLogger("TESTING")
27 _python = "/usr/bin/python"
29 SAVETO = "localhost:5000/zziplib"
30 IMAGES = "localhost:5000/zziplib/image"
31 CENTOS = "centos:7.7.1908"
32 UBUNTU = "ubuntu:14.04"
33 OPENSUSE = "opensuse/leap:15.0"
35 DOCKER_SOCKET = "/var/run/docker.sock"
38 if text is None: return None
39 if isinstance(text, bytes):
40 encoded = sys.getdefaultencoding()
41 if encoded in ["ascii"]:
44 return text.decode(encoded)
46 return text.decode("latin-1")
def sh____(cmd, shell=True):
    """Log a command at INFO level, then run it and insist on success.

    `cmd` may be a shell string or a sequence of arguments; a sequence is
    logged with every item wrapped in single quotes. The command is run
    through `subprocess.check_call`, so a non-zero exit status raises
    `subprocess.CalledProcessError`; otherwise its return value is passed on.
    """
    if not isinstance(cmd, basestring):
        shown = " ".join(["'%s'" % part for part in cmd])
    else:
        shown = cmd
    logg.info(": %s", shown)
    return subprocess.check_call(cmd, shell=shell)
def sx____(cmd, shell=True):
    """Log a command at INFO level, then run it and return its exit status.

    `cmd` may be a shell string or a sequence of arguments; a sequence is
    logged with every item wrapped in single quotes. The command is run
    through `subprocess.call`, so a failing command does not raise - the
    exit status is simply returned.
    """
    if not isinstance(cmd, basestring):
        shown = " ".join(["'%s'" % part for part in cmd])
    else:
        shown = cmd
    logg.info(": %s", shown)
    return subprocess.call(cmd, shell=shell)
def output(cmd, shell=True):
    """Log a command, run it and return its decoded standard output.

    `cmd` may be a shell string or a sequence of arguments. Only stdout is
    captured; the exit status is not inspected.
    """
    if not isinstance(cmd, basestring):
        shown = " ".join(["'%s'" % part for part in cmd])
    else:
        shown = cmd
    logg.info(": %s", shown)
    proc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE)
    captured = proc.communicate()[0]
    return decodes(captured)
def output2(cmd, shell=True):
    """Log a command, run it and return `(stdout_text, exit_status)`.

    `cmd` may be a shell string or a sequence of arguments. Only stdout is
    captured; it is passed through decodes() before being returned.
    """
    if not isinstance(cmd, basestring):
        shown = " ".join(["'%s'" % part for part in cmd])
    else:
        shown = cmd
    logg.info(": %s", shown)
    proc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE)
    captured = proc.communicate()[0]
    return decodes(captured), proc.returncode
def output3(cmd, shell=True):
    """Log a command, run it and return `(stdout_text, stderr_text, exit_status)`.

    `cmd` may be a shell string or a sequence of arguments. Both stdout and
    stderr are captured and passed through decodes() before being returned.
    """
    if not isinstance(cmd, basestring):
        shown = " ".join(["'%s'" % part for part in cmd])
    else:
        shown = cmd
    logg.info(": %s", shown)
    proc = subprocess.Popen(cmd, shell=shell,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured_out, captured_err = proc.communicate()
    return decodes(captured_out), decodes(captured_err), proc.returncode
def background(cmd, shell=True):
    """Start a command without waiting for it, discarding its output.

    stdout and stderr of the child are redirected to `os.devnull`.

    Returns a `BackgroundProcess(pid, run, log)` namedtuple where `run` is
    the `subprocess.Popen` object and `log` is the devnull handle, which is
    handed back still open.
    """
    BackgroundProcess = collections.namedtuple("BackgroundProcess", ["pid", "run", "log" ])
    log = open(os.devnull, "wb")
    try:
        run = subprocess.Popen(cmd, shell=shell, stdout=log, stderr=log)
    except Exception:
        # nobody would ever receive the handle if the spawn fails
        log.close()
        raise
    pid = run.pid
    logg.info("PID %s = %s", pid, cmd)
    return BackgroundProcess(pid, run, log)
95 if isinstance(lines, basestring):
96 lines = lines.split("\n")
97 if len(lines) and lines[-1] == "":
102 for line in _lines(text):
103 lines.append(line.rstrip())
105 def grep(pattern, lines):
106 for line in _lines(lines):
107 if re.search(pattern, line.rstrip()):
def greps(lines, pattern):
    """Return everything grep() yields for `pattern` in `lines` as a list.

    Note the argument order is (lines, pattern), the reverse of grep().
    """
    return [hit for hit in grep(pattern, lines)]
def download(base_url, filename, into):
    """Fetch `base_url/filename` into directory `into` unless already present.

    The directory is created when missing. When the file does not exist
    there yet, `wget` is run from inside that directory via sh____().
    """
    if not os.path.isdir(into):
        os.makedirs(into)
    target = os.path.join(into, filename)
    if os.path.exists(target):
        return
    fetch = "cd {into} && wget {base_url}/{filename}"
    sh____(fetch.format(into=into, base_url=base_url, filename=filename))
def text_file(filename, content):
    """Write `content` to `filename`, creating the parent directory if needed.

    When `content` starts with a newline it is treated as an indented
    block: the leading newline is dropped, the run of spaces following it
    is taken as the indent, and that indent is removed from every line that
    starts with it. Each line is then written followed by a newline.
    Any other content is written unchanged.
    """
    filedir = os.path.dirname(filename)
    # dirname is "" for a bare filename and os.makedirs("") would raise
    if filedir and not os.path.isdir(filedir):
        os.makedirs(filedir)
    with open(filename, "w") as f:
        if content.startswith("\n"):
            x = re.match("(?s)\n( *)", content)
            indent = x.group(1)
            for line in content[1:].split("\n"):
                if line.startswith(indent):
                    line = line[len(indent):]
                f.write(line+"\n")
        else:
            f.write(content)
def shell_file(filename, content):
    """Write `content` with text_file() and set the file mode to 0o770."""
    script_mode = 0o770
    text_file(filename, content)
    os.chmod(filename, script_mode)
def copy_file(filename, target):
    """Copy the contents of `filename` to `target`.

    The parent directory of `target` is created when it does not exist.
    Only the file data is copied (`shutil.copyfile`), not its metadata.
    """
    targetdir = os.path.dirname(target)
    # dirname is "" for a bare target name and os.makedirs("") would raise
    if targetdir and not os.path.isdir(targetdir):
        os.makedirs(targetdir)
    shutil.copyfile(filename, target)
def copy_tool(filename, target):
    """Copy `filename` to `target` with copy_file() and set its mode to 0o750."""
    tool_mode = 0o750
    copy_file(filename, target)
    os.chmod(target, tool_mode)
def get_caller_name():
    """Return the name of the function that called our own caller.

    Walks two frames up from this function: the first hop is the function
    invoking get_caller_name(), the second hop is the one whose name is
    returned.
    """
    here = inspect.currentframe()
    invoker = here.f_back
    wanted = invoker.f_back
    return wanted.f_code.co_name
def get_caller_caller_name():
    """Return the name of the function three frames above this one.

    Counting from the function that invokes get_caller_caller_name(), the
    result is the name of its caller's caller.
    """
    here = inspect.currentframe()
    invoker = here.f_back
    middle = invoker.f_back
    wanted = middle.f_back
    return wanted.f_code.co_name
150 def os_path(root, path):
155 while path.startswith(os.path.sep):
157 return os.path.join(root, path)
159 return os.path.splitext(os.path.basename(path))[0]
161 class ZZiplibBuildTest(unittest.TestCase):
162 def caller_testname(self):
163 name = get_caller_caller_name()
165 if x1 < 0: return name
166 x2 = name.find("_", x1+1)
167 if x2 < 0: return name
169 def testname(self, suffix = None):
170 name = self.caller_testname()
172 return name + "_" + suffix
174 def testdir(self, testname = None):
175 testname = testname or self.caller_testname()
176 newdir = "tmp/tmp."+testname
177 if os.path.isdir(newdir):
178 shutil.rmtree(newdir)
181 def rm_testdir(self, testname = None):
182 testname = testname or self.caller_testname()
183 newdir = "tmp/tmp."+testname
184 if os.path.isdir(newdir):
185 shutil.rmtree(newdir)
187 def makedirs(self, path):
188 if not os.path.isdir(path):
193 def ip_container(self, name):
194 values = output("docker inspect "+name)
195 values = json.loads(values)
196 if not values or "NetworkSettings" not in values[0]:
197 logg.critical(" docker inspect %s => %s ", name, values)
198 return values[0]["NetworkSettings"]["IPAddress"]
199 def local_system(self):
200 distro, version = "", ""
201 if os.path.exists("/etc/os-release"):
202 # rhel:7.4 # VERSION="7.4 (Maipo)" ID="rhel" VERSION_ID="7.4"
203 # centos:7.3 # VERSION="7 (Core)" ID="centos" VERSION_ID="7"
204 # centos:7.4 # VERSION="7 (Core)" ID="centos" VERSION_ID="7"
205 # centos:7.7.1908 # VERSION="7 (Core)" ID="centos" VERSION_ID="7"
206 # opensuse:42.3 # VERSION="42.3" ID=opensuse VERSION_ID="42.3"
207 # opensuse/leap:15.0 # VERSION="15.0" ID="opensuse-leap" VERSION_ID="15.0"
208 # ubuntu:16.04 # VERSION="16.04.3 LTS (Xenial Xerus)" ID=ubuntu VERSION_ID="16.04"
209 # ubuntu:18.04 # VERSION="18.04.1 LTS (Bionic Beaver)" ID=ubuntu VERSION_ID="18.04"
210 for line in open("/etc/os-release"):
212 m = re.match('^([_\\w]+)=([^"].*).*', line.strip())
214 key, value = m.group(1), m.group(2)
215 m = re.match('^([_\\w]+)="([^"]*)".*', line.strip())
217 key, value = m.group(1), m.group(2)
218 # logg.debug("%s => '%s' '%s'", line.strip(), key, value)
220 distro = value.replace("-","/")
221 if key in ["VERSION_ID"]:
223 if os.path.exists("/etc/redhat-release"):
224 for line in open("/etc/redhat-release"):
225 m = re.search("release (\\d+[.]\\d+).*", line)
229 if os.path.exists("/etc/centos-release"):
230 # CentOS Linux release 7.5.1804 (Core)
231 for line in open("/etc/centos-release"):
232 m = re.search("release (\\d+[.]\\d+).*", line)
236 logg.info(":: local_system %s:%s", distro, version)
237 if distro and version:
238 return "%s:%s" % (distro, version)
240 def with_local_ubuntu_mirror(self, ver = None):
241 """ detects a local ubuntu mirror or starts a local
242 docker container with a ubunut repo mirror. It
243 will return the extra_hosts setting to start
244 other docker containers"""
245 rmi = "localhost:5000/mirror-packages"
247 ver = ver or UBUNTU.split(":")[1]
248 return self.with_local(rmi, rep, ver, "archive.ubuntu.com", "security.ubuntu.com")
249 def with_local_centos_mirror(self, ver = None):
250 """ detects a local centos mirror or starts a local
251 docker container with a centos repo mirror. It
252 will return the setting for extrahosts"""
253 rmi = "localhost:5000/mirror-packages"
255 ver = ver or CENTOS.split(":")[1]
256 return self.with_local(rmi, rep, ver, "mirrorlist.centos.org")
257 def with_local_opensuse_mirror(self, ver = None):
258 """ detects a local opensuse mirror or starts a local
259 docker container with a centos repo mirror. It
260 will return the extra_hosts setting to start
261 other docker containers"""
262 rmi = "localhost:5000/mirror-packages"
263 rep = "opensuse-repo"
264 ver = ver or OPENSUSE.split(":")[1]
265 return self.with_local(rmi, rep, ver, "download.opensuse.org")
266 def with_local(self, rmi, rep, ver, *hosts):
267 image = "{rmi}/{rep}:{ver}".format(**locals())
268 container = "{rep}-{ver}".format(**locals())
269 out, err, ok = output3("docker inspect {image}".format(**locals()))
270 image_found = json.loads(out)
273 out, err, ok = output3("docker inspect {container}".format(**locals()))
274 container_found = json.loads(out)
276 container_status = container_found[0]["State"]["Status"]
277 logg.info("::: %s -> %s", container, container_status)
278 latest_image_id = image_found[0]["Id"]
279 container_image_id = container_found[0]["Image"]
280 if latest_image_id != container_image_id or container_status not in ["running"]:
281 cmd = "docker rm --force {container}"
282 sx____(cmd.format(**locals()))
284 if not container_found:
285 cmd = "docker run --rm=true --detach --name {container} {image}"
286 sh____(cmd.format(**locals()))
287 ip_a = self.ip_container(container)
288 logg.info("::: %s => %s", container, ip_a)
289 return dict(zip(hosts, [ ip_a ] * len(hosts)))
290 def with_local_mirror(self, image):
291 """ attach local centos-repo / opensuse-repo to docker-start enviroment.
292 Effectivly when it is required to 'docker start centos:x.y' then do
293 'docker start centos-repo:x.y' before and extend the original to
294 'docker start --add-host mirror...:centos-repo centos:x.y'. """
296 if image.startswith("centos:"):
297 version = image[len("centos:"):]
298 hosts = self.with_local_centos_mirror(version)
299 if image.startswith("opensuse/leap:"):
300 version = image[len("opensuse/leap:"):]
301 hosts = self.with_local_opensuse_mirror(version)
302 if image.startswith("opensuse:"):
303 version = image[len("opensuse:"):]
304 hosts = self.with_local_opensuse_mirror(version)
305 if image.startswith("ubuntu:"):
306 version = image[len("ubuntu:"):]
307 hosts = self.with_local_ubuntu_mirror(version)
309 def add_hosts(self, hosts):
310 return " ".join(["--add-host %s:%s" % (host, ip_a) for host, ip_a in hosts.items() ])
311 # for host, ip_a in mapping.items():
312 # yield "--add-host {host}:{ip_a}"
313 def local_image(self, image):
314 """ attach local centos-repo / opensuse-repo to docker-start enviroment.
315 Effectivly when it is required to 'docker start centos:x.y' then do
316 'docker start centos-repo:x.y' before and extend the original to
317 'docker start --add-host mirror...:centos-repo centos:x.y'. """
318 if os.environ.get("NONLOCAL",""):
320 hosts = self.with_local_mirror(image)
322 add_hosts = self.add_hosts(hosts)
323 logg.debug("%s %s", add_hosts, image)
324 return "{add_hosts} {image}".format(**locals())
326 def local_addhosts(self, dockerfile):
328 for line in open(dockerfile):
329 m = re.match('[Ff][Rr][Oo][Mm] *"([^"]*)"', line)
333 m = re.match("[Ff][Rr][Oo][Mm] *(\w[^ ]*)", line)
335 image = m.group(1).strip()
337 logg.debug("--\n-- '%s' FROM '%s'", dockerfile, image)
339 hosts = self.with_local_mirror(image)
340 return self.add_hosts(hosts)
342 def drop_container(self, name):
343 cmd = "docker rm --force {name}"
344 sx____(cmd.format(**locals()))
345 def drop_centos(self):
346 self.drop_container("centos")
347 def drop_ubuntu(self):
348 self.drop_container("ubuntu")
349 def drop_opensuse(self):
350 self.drop_container("opensuse")
351 def make_opensuse(self):
352 self.make_container("opensuse", OPENSUSE)
353 def make_ubuntu(self):
354 self.make_container("ubuntu", UBUNTU)
355 def make_centos(self):
356 self.make_container("centos", CENTOS)
357 def make_container(self, name, image):
358 self.drop_container(name)
359 local_image = self.local_image(image)
360 cmd = "docker run --detach --name {name} {local_image} sleep 1000"
361 sh____(cmd.format(**locals()))
362 print(" # " + local_image)
363 print(" docker exec -it "+name+" bash")
365 # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
368 logg.info("\n CENTOS = '%s'", CENTOS)
369 self.with_local_centos_mirror()
370 def test_201_opensuse15_build_dockerfile(self):
371 if not os.path.exists(DOCKER_SOCKET): self.skipTest("docker-based test")
372 testname=self.testname()
373 testdir = self.testdir()
374 name="opensuse15-zziplib"
375 dockerfile="testbuilds/opensuse15-build.dockerfile"
376 addhosts = self.local_addhosts(dockerfile)
377 savename = docname(dockerfile)
380 cmd = "docker build . -f {dockerfile} {addhosts} --tag {images}:{testname}"
381 sh____(cmd.format(**locals()))
382 cmd = "docker rm --force {testname}"
383 sx____(cmd.format(**locals()))
385 cmd = "docker rmi {saveto}/{savename}:latest"
386 sx____(cmd.format(**locals()))
387 cmd = "docker tag {images}:{testname} {saveto}/{savename}:latest"
388 sh____(cmd.format(**locals()))
389 cmd = "docker rmi {images}:{testname}"
390 sx____(cmd.format(**locals()))
394 if __name__ == "__main__":
395 from optparse import OptionParser
396 _o = OptionParser("%prog [options] test*",
397 epilog=__doc__.strip().split("\n")[0])
398 _o.add_option("-v","--verbose", action="count", default=0,
399 help="increase logging level [%default]")
400 _o.add_option("-p","--python", metavar="EXE", default=_python,
401 help="use another python execution engine [%default]")
402 _o.add_option("-l","--logfile", metavar="FILE", default="",
403 help="additionally save the output log to a file [%default]")
404 _o.add_option("--xmlresults", metavar="FILE", default=None,
405 help="capture results as a junit xml file [%default]")
406 opt, args = _o.parse_args()
407 logging.basicConfig(level = logging.WARNING - opt.verbose * 5)
413 if os.path.exists(opt.logfile):
414 os.remove(opt.logfile)
415 logfile = logging.FileHandler(opt.logfile)
416 logfile.setFormatter(logging.Formatter("%(levelname)s:%(relativeCreated)d:%(message)s"))
417 logging.getLogger().addHandler(logfile)
418 logg.info("log diverted to %s", opt.logfile)
421 if os.path.exists(opt.xmlresults):
422 os.remove(opt.xmlresults)
423 xmlresults = open(opt.xmlresults, "w")
424 logg.info("xml results into %s", opt.xmlresults)
427 suite = unittest.TestSuite()
428 if not args: args = [ "test_*" ]
430 for classname in sorted(globals()):
431 if not classname.endswith("Test"):
433 testclass = globals()[classname]
434 for method in sorted(dir(testclass)):
435 if "*" not in arg: arg += "*"
436 if arg.startswith("_"): arg = arg[1:]
437 if fnmatch(method, arg):
438 suite.addTest(testclass(method))
443 Runner = xmlrunner.XMLTestRunner
444 Runner(xmlresults).run(suite)
446 Runner = unittest.TextTestRunner
447 Runner(verbosity=opt.verbose).run(suite)
449 Runner = unittest.TextTestRunner
452 Runner = xmlrunner.XMLTestRunner
453 Runner(logfile.stream, verbosity=opt.verbose).run(suite)