#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# This task takes messages off ‘patchq_input’ queue in any order and
# tries to assemble properly ordered threads containing a complete,
# self-contained series of patches.  If it succeeds, it puts the
# single thread objects into the ‘patchq_thread’ exchange from where
# they get distributed to queues ready for testing.
#
# Incomplete patch sets are dealt with in two ways:
#
# (1) If the emails are not too old, then they stay in ‘patchq_input’
# until a later run of this script.
#
# (2) If they are too old, they are dropped.  We assume in this case
# that a patch series was sent and either some emails were dropped or
# not all the emails were sent.
#
# Emails which are not patches or parts of a patch set are filtered
# out and dropped by this script.  Note it is possible for a single
# email to contain a self-contained patch, in which case it is dealt
# with as a special case of a thread as above.

import calendar
import email
import email.utils
import hashlib
import json
import mailbox
import pika
import re
import sys
import time

import config

# Connect to the message broker named in the local config module.
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host = config.mq_server))
channel = connection.channel()

# Matches a line beginning "diff " anywhere in a message body; used to
# check that a candidate email really carries a patch.  Raw string so
# the pattern is not subject to Python string escapes.
re_diff = re.compile(r'^diff ', flags = re.MULTILINE)

# This is where we try to construct threads of messages.  Keys are
# real or synthesized parent message IDs; values map part number ->
# [message, delivery method, part, n].
threads = dict()

def ack(method):
    """Acknowledge a delivery so the broker drops it from ‘patchq_input’."""
    channel.basic_ack(delivery_tag = method.delivery_tag)

# Process a single part patch.  We can add it immediately to the
# outgoing queue and acknowledge it.
def single_patch(m, method):
    """Publish a self-contained one-part patch as a thread of one."""
    # The receiver expects a list of messages, so we have to encode a
    # message list here, not the single message.
    body = json.dumps([m.as_string()])
    channel.basic_publish(exchange = 'patchq_thread',
                          routing_key = '',
                          body = body)
    ack(method)

# Process a single email which we know is part of a patch series.  We
# add it to the threads hash based on the parent ID, and we'll deal
# with it at the end.
def thread_patch(m, method, parent_id, part, n):
    """File one patch of a multi-part series under its thread.

    m = the email
    method = broker delivery method (kept so we can ack it later)
    parent_id = real or synthesized parent message ID for the thread
    part, n = patch part number out of n
    """
    # Get (or create) the part-number -> entry map for this thread.
    msg_dict = threads.setdefault(parent_id, dict())
    msg_dict[part] = [m, method, part, n]

# Process an email which looks like a patch.
# m = the email
# title = the '...' in '[PATCH...]'
# part, n = patch part number out of n
def process_patch(m, method, title, part, n):
    # If it's a single patch (0/0 or 1/1) then handle that.
    if n == 0 or n == 1:
        single_patch(m, method)
        return

    # Hopefully the patch series was posted using ‘git send-email
    # --thread’, in which case all messages should contain an
    # In-Reply-To header and we can thread them easily.
    parent_id = m['In-Reply-To']
    if parent_id:
        # Ignore the 'title' field.
        thread_patch(m, method, parent_id, part, n)
        return

    # Otherwise we have to do some horrible heuristics.  Make up a
    # pseudo parent message ID composed of the From and To fields,
    # title and number of patches in the series, and hope for the
    # best.  Guard against absent headers (None), which would
    # otherwise crash hashlib.update.
    h = hashlib.md5()
    h.update(m['From'] or '')
    h.update(m['To'] or '')
    h.update(title)
    h.update(str(n))
    parent_id = h.hexdigest()
    thread_patch(m, method, parent_id, part, n)

# Process a single email.  This deals with filtering non-patches.
def process(m, method):
    subj = m['Subject']
    print("Processing %s" % subj)

    # Filter out and drop non-patches.
    m1 = re.search(r'\[PATCH(.*)\]', subj)
    if not m1:
        print("... dropped: doesn't have [PATCH] in the subject line")
        # Drop the non-patch by acknowledging it.
        ack(method)
        return

    # Is it part of a series?
    m2 = re.search(r'(\d+)/(\d+)$', m1.group(1))
    part = 0
    n = 0
    if m2:
        part = int(m2.group(1))
        n = int(m2.group(2))
        # Is it a cover letter?  That is part 0, drop it.  Only
        # checked when an explicit x/y was present: a plain "[PATCH]"
        # (part = n = 0) is a single self-contained patch, handled in
        # process_patch, not a cover letter.
        if part == 0:
            print("... dropped: cover letter")
            ack(method)
            return

    # The message must be singlepart and must contain "^diff" somewhere.
    if m.is_multipart():
        print("... dropped: multipart message")
        ack(method)
        return
    if not re_diff.search(m.get_payload(decode = True)):
        print("... dropped: does not contain a 'diff' line")
        ack(method)
        return

    # It looks sufficiently like a patch to continue.
    process_patch(m, method, m1.group(1), part, n)

# Read all messages from ‘patchq_input’ but DO NOT ACKNOWLEDGE THEM (yet).
while True:
    method, _, body = channel.basic_get(queue = 'patchq_input',
                                        no_ack = False)
    if not method:
        break
    m = email.message_from_string(body) # the email
    process(m, method)

# Once we've processed all the incoming messages, look at the threads
# dictionary to see if we've got any completed threads and send those.
# Iterate over a snapshot of the keys because we delete entries as we
# go.
for parent_id in list(threads.keys()):
    msg_dict = threads[parent_id]

    # If we didn't get part 1/n, it must be incomplete.
    if 1 not in msg_dict:
        continue
    n = msg_dict[1][3]

    # Check if we got all parts.
    incomplete = False
    msgs = []
    for i in range(1, n+1):
        if i in msg_dict:
            msgs.append(msg_dict[i][0].as_string())
        else:
            incomplete = True
    if incomplete:
        continue

    # It looks like we got a full set, so we can send it.
    body = json.dumps(msgs)
    channel.basic_publish(exchange = 'patchq_thread',
                          routing_key = '',
                          body = body)

    # Acknowledge each message so they are dropped from the
    # incoming queue.
    for i in range(1, n+1):
        ack(msg_dict[i][1])

    del threads[parent_id]

# Check for incomplete threads.  If they consist only of old messages
# then we drop them, otherwise we leave them for the next run.
# ‘now’ is loop-invariant, so compute it once up front.
now = calendar.timegm(time.gmtime())
for parent_id in threads:
    msg_dict = threads[parent_id]
    old = True
    for msg in msg_dict.values():
        t = None
        date_tuple = email.utils.parsedate_tz(msg[0]['Date'])
        if date_tuple:
            t = email.utils.mktime_tz(date_tuple)
        #print("t = %d, now = %d, now-t = %d" % (t, now, now-t))
        # A single sufficiently recent message keeps the whole thread
        # alive for the next run.
        if t and now - t < config.assembly_time:
            old = False

    # If they're too old, drop.
    if old:
        for msg in msg_dict.values():
            print("... dropped: incomplete thread too old")
            ack(msg[1])