#!/usr/bin/python
# -*- coding: utf-8 -*-

# This task takes patch series messages off one of the ‘patchq_test_*’
# queues and tests the patch series.
#
# If you run it without arguments, it will pick up patches from each
# test queue in turn and test them (but not in parallel).  You can
# also list one or more tests on the command line, in which case only
# that test or tests are considered.
#
# This script does not perform tests in parallel, but doesn't care if
# other instances of the script are running (even across machines).
# Because of the message broker, each patch series is only tested once.

import email
import json
import mailbox
import os
import pika
import shutil
import subprocess
import sys
import tempfile

import config

# Because tests are very long running, turn off heartbeats.
# https://stackoverflow.com/questions/14572020/handling-long-running-tasks-in-pika-rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host = config.mq_server,
    heartbeat_interval = 0))
channel = connection.channel()

def ack(method):
    channel.basic_ack(delivery_tag = method.delivery_tag)

# Which tests to run?
if len(sys.argv) <= 1:
    tests = config.tests
else:
    tests = []
    for arg in sys.argv[1:]:
        if arg not in config.tests:
            sys.exit("%s is not listed in config.tests" % arg)
        tests.append(arg)

pwd = os.getcwd()

for t in tests:
    qname = "patchq_test_%s" % t

    while True:
        # Fetch the next patch series; stop when the queue is empty.
        method, _, body = channel.basic_get(queue = qname, no_ack = False)
        if not method:
            break

        # Parse the ordered list of messages forming the patch series.
        msgs = json.loads(body)
        msgs = [email.message_from_string(m) for m in msgs]

        # This should never happen, but the rest of the code
        # below assumes number of msgs > 0, so ...
        if len(msgs) == 0:
            ack(method)
            continue

        print ("%s: Running test:" % t)

        # Save them to a temporary directory.
        # Create large files in /var/tmp.
        tempfile.tempdir = "/var/tmp"
        dir = tempfile.mkdtemp()
        os.chdir(dir)

        # Save the patches to files.
        i = 0
        args = []
        args.append("%s/%s.sh" % (pwd, t))
        last_msg = None
        for m in msgs:
            i = i+1
            filename = ("%05d" % i)
            args.append(filename)
            print ("%05d %s" % (i, m['Subject']))
            with open(filename, "w") as file:
                file.write(m.as_string())
            last_msg = m

        # Run the test, capturing stdout and stderr to the ‘output’ file.
        with open("output", "w") as out:
            r = subprocess.call(args, stdout = out, stderr = out)

        # Exit status 77 conventionally means the test was skipped.
        if r == 77:
            print ("%s: Test skipped" % t)
        else:
            # Do a "group reply" to the last email.
            tos = last_msg.get_all('to', [])
            ccs = last_msg.get_all('cc', [])
            from_ = last_msg['From']
            to = email.utils.getaddresses(tos + ccs + [from_])
            ref = last_msg['Message-Id']
            if r == 0:
                status = "success"
            else:
                status = "FAILED"
            subject = "%s %s (was: Re: %s)" % (t, status, last_msg['Subject'])
            with open("output", "r") as file:
                content = file.read()

            # Publish the report (recipients, subject, reference and
            # test output) to the reports exchange.
            body = json.dumps((to, subject, ref, content))
            channel.basic_publish(exchange = 'patchq_reports',
                                  routing_key = '',
                                  body = body)

        # Ack the input message since we have processed it.
        ack(method)

        shutil.rmtree(dir, ignore_errors = True)
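
# The script only relies on two names from the site-specific ‘config’
# module: config.mq_server (the RabbitMQ host) and config.tests (the
# list of test names).  A minimal sketch of such a module might look
# like this; the host name and test names are only placeholders:
#
#     # config.py
#     mq_server = "localhost"
#     tests = ["build", "check"]
#
# For each test ‘t’ there is assumed to be an executable ‘t.sh’ in the
# directory the script is started from.  It is run with the saved patch
# files ("00001", "00002", ...) as arguments; exit status 0 is reported
# as success, 77 marks the test as skipped, and any other status is
# reported as a failure.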