2 # -*- coding: utf-8 -*-
4 # This task takes messages off the ‘patchq_input’ queue in any order and
5 # tries to assemble properly ordered threads (each containing a complete,
6 # self-contained series of patches). If it succeeds, it puts the
7 # single thread objects into the ‘patchq_thread’ exchange from where
8 # they get distributed to queues ready for testing.
10 # Incomplete patch sets are dealt with in two ways:
12 # (1) If the emails are not too old, then they stay in ‘patchq_input’
13 # until a later run of this script.
15 # (2) If they are too old, they are dropped. We assume in this case
16 # that a patch series was sent and either some emails were dropped or
17 # not all the emails were sent.
19 # Emails which are not patches or parts of a patch set are filtered
20 # out and dropped by this script. Note it is possible for a single
21 # email to contain a self-contained patch, in which case it is dealt
22 # with as a special case of a thread as above.
37 connection = pika.BlockingConnection(pika.ConnectionParameters(
38 host = config.mq_server))
39 channel = connection.channel()
41 re_diff = re.compile('^diff ', flags = re.MULTILINE)
43 # This is where we try to construct threads of messages.
47 channel.basic_ack(delivery_tag = method.delivery_tag)
49 # Process a single part patch. We can add it immediately to the
50 # outgoing queue and acknowledge it.
51 def single_patch(m, method):
52 # The receiver expects a list of messages, so we have to encode a
53 # message list here, not the single message.
54 body = json.dumps([m.as_string()])
55 channel.basic_publish(exchange = 'patchq_thread',
60 # Process a single email which we know is part of a patch series. We
61 # add it to the threads hash, keyed by parent ID, to be assembled later.
63 def thread_patch(m, method, parent_id, part, n):
64 if parent_id in threads:
65 msg_dict = threads[parent_id]
68 msg_dict[part] = [m, method, part, n]
69 threads[parent_id] = msg_dict
71 # Process an email which looks like a patch.
73 # title = the '...' in '[PATCH...]'
74 # part, n = patch part number out of n
75 def process_patch(m, method, title, part, n):
76 # If it's a single patch (0/0 or 1/1) then handle that.
78 single_patch(m, method)
81 # Hopefully the patch series was posted using ‘git send-email
82 # --thread’, in which case all messages should contain an
83 # In-Reply-To header and we can thread them easily.
84 parent_id = m['In-Reply-To']
86 # Ignore the 'title' field.
87 thread_patch(m, method, parent_id, part, n)
90 # Otherwise we have to do some horrible heuristics. Make up a
91 # pseudo parent message ID composed of the From and To fields,
92 # title and number of patches in the series, and hope for the best.
99 parent_id = h.hexdigest()
100 thread_patch(m, method, parent_id, part, n)
102 # Process a single email. This deals with filtering non-patches.
103 def process(m, method):
105 print ("Processing %s" % subj)
107 # Filter out and drop non-patches.
108 m1 = re.search('\[PATCH(.*)\]', subj)
110 print "... dropped: doesn't have [PATCH] in the subject line"
111 # Drop the non-patch by acknowledging it.
115 # Is it part of a series?
116 m2 = re.search('(\d+)/(\d+)$', m1.group(1))
120 part = int(m2.group(1))
122 # Is it a cover letter? That is part 0, drop it.
124 print "... dropped: cover letter"
128 # The message must be singlepart and must contain "^diff" somewhere.
130 print "... dropped: multipart message"
134 if not re.search(re_diff, m.get_payload(decode = True)):
135 print "... dropped: does not contain a 'diff' line"
139 # It looks sufficiently like a patch to continue.
140 process_patch(m, method, m1.group(1), part, n)
142 # Read all messages from ‘patchq_input’ but DO NOT ACKNOWLEDGE THEM (yet).
144 method, _, body = channel.basic_get(queue = 'patchq_input', no_ack = False)
146 m = email.message_from_string(body) # the email
149 # Once we've processed all the incoming messages, look at the threads
150 # dictionary to see if we've got any completed threads and send those.
151 for parent_id in threads.keys():
152 msg_dict = threads[parent_id]
154 # If we didn't get part 1/n, it must be incomplete.
155 if 1 not in msg_dict:
159 # Check if we got all parts.
162 for i in range(1, n+1):
164 msgs.append(msg_dict[i][0].as_string())
170 # It looks like we got a full set, so we can send it.
171 body = json.dumps(msgs)
172 channel.basic_publish(exchange = 'patchq_thread',
176 # Acknowledge each message so that it is dropped from the input queue.
178 for i in range(1, n+1):
181 del threads[parent_id]
183 # Check for incomplete threads. If they consist only of old messages
184 # then we drop them, otherwise we leave them for the next run.
185 for parent_id in threads.keys():
186 msg_dict = threads[parent_id]
189 for msg in msg_dict.itervalues():
191 date_tuple = email.utils.parsedate_tz(msg[0]['Date'])
193 t = email.utils.mktime_tz(date_tuple)
194 now = calendar.timegm(time.gmtime())
195 #print ("t = %d, now = %d, now-t = %d" % (t, now, now-t))
196 if t and now - t < config.assembly_time:
199 # If they're too old, drop.
201 for msg in msg_dict.itervalues():
202 print "... dropped: incomplete thread too old"