22416e6625339dd6bf83ea8234dabc051d34ec03
[patchq.git] / threader.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 # This task takes messages off ‘patchq_input’ queue in any order and
5 # tries to assemble properly ordered threads containing a complete,
6 # self-contained series of patches).  If it succeeds, it puts the
7 # single thread objects into the ‘patchq_thread’ exchange from where
8 # they get distributed to queues ready for testing.
9 #
10 # Incomplete patch sets are dealt with in two ways:
11 #
12 # (1) If the emails are not too old, then they stay in ‘patchq_input’
13 # until a later run of this script.
14 #
15 # (2) If they are too old, they are dropped.  We assume in this case
16 # that a patch series was sent and either some emails were dropped or
17 # not all the emails were sent.
18 #
19 # Emails which are not patches or parts of a patch set are filtered
20 # out and dropped by this script.  Note it is possible for a single
21 # email to contain a self-contained patch, in which case it is dealt
22 # with as a special case of a thread as above.
23
24 import calendar
25 import email
26 import email.utils
27 import hashlib
28 import json
29 import mailbox
30 import pika
31 import re
32 import sys
33 import time
34
35 import config
36
37 connection = pika.BlockingConnection(pika.ConnectionParameters(
38     host = config.mq_server))
39 channel = connection.channel()
40
41 re_diff = re.compile('^diff ', flags = re.MULTILINE)
42
43 # This is where we try to construct threads of messages.
44 threads = dict()
45
46 def ack(method):
47     channel.basic_ack(delivery_tag = method.delivery_tag)
48
49 # Process a single part patch.  We can add it immediately to the
50 # outgoing queue and acknowledge it.
51 def single_patch(m, method):
52     # The receiver expects a list of messages, so we have to encode a
53     # message list here, not the single message.
54     body = json.dumps([m.as_string()])
55     channel.basic_publish(exchange = 'patchq_thread',
56                           routing_key = '',
57                           body = body)
58     ack(method)
59
60 # Process a single email which we know is part of a patch series.  We
61 # add it to the threads hash based on the parent ID, and we'll deal
62 # with it at the end.
63 def thread_patch(m, method, parent_id, part, n):
64     if parent_id in threads:
65         msg_dict = threads[parent_id]
66     else:
67         msg_dict = dict()
68     msg_dict[part] = [m, method, part, n]
69     threads[parent_id] = msg_dict
70
71 # Process an email which looks like a patch.
72 # m = the email
73 # title = the '...' in '[PATCH...]'
74 # part, n = patch part number out of n
75 def process_patch(m, method, title, part, n):
76     # If it's a single patch (0/0 or 1/1) then handle that.
77     if n == 0 or n == 1:
78         single_patch(m, method)
79         return
80
81     # Hopefully the patch series was posted using ‘git send-email
82     # --thread’, in which case all messages should contain an
83     # In-Reply-To header and we can thread them easily.
84     parent_id = m['In-Reply-To']
85     if parent_id:
86         # Ignore the 'title' field.
87         thread_patch(m, method, parent_id, part, n)
88         return
89
90     # Otherwise we have to do some horrible heuristics.  Make up a
91     # pseudo parent message ID composed of the From and To fields,
92     # title and number of patches in the series, and hope for the
93     # best.
94     h = hashlib.md5()
95     h.update(m['From'])
96     h.update(m['To'])
97     h.update(title)
98     h.update(string(n))
99     parent_id = h.hexdigest()
100     thread_patch(m, method, parent_id, part, n)
101
102 # Process a single email.  This deals with filtering non-patches.
103 def process(m, method):
104     subj = m['Subject']
105     print ("Processing %s" % subj)
106
107     # Filter out and drop non-patches.
108     m1 = re.search('\[PATCH(.*)\]', subj)
109     if not m1:
110         print "... dropped: doesn't have [PATCH] in the subject line"
111         # Drop the non-patch by acknowledging it.
112         ack(method)
113         return
114
115     # Is it part of a series?
116     m2 = re.search('(\d+)/(\d+)$', m1.group(1))
117     part = 0
118     n = 0
119     if m2:
120         part = int(m2.group(1))
121         n = int(m2.group(2))
122         # Is it a cover letter?  That is part 0, drop it.
123         if part == 0:
124             print "... dropped: cover letter"
125             ack(method)
126             return
127
128     # The message must be singlepart and must contain "^diff" somewhere.
129     if m.is_multipart():
130         print "... dropped: multipart message"
131         ack(method)
132         return
133
134     if not re.search(re_diff, m.get_payload(decode = True)):
135         print "... dropped: does not contain a 'diff' line"
136         ack(method)
137         return
138
139     # It looks sufficiently like a patch to continue.
140     process_patch(m, method, m1.group(1), part, n)
141
142 # Read all messages from ‘patchq_input’ but DO NOT ACKNOWLEDGE THEM (yet).
143 while True:
144     method, _, body = channel.basic_get(queue = 'patchq_input', no_ack = False)
145     if not method: break
146     m = email.message_from_string(body) # the email
147     process(m, method)
148
149 # Once we've processed all the incoming messages, look at the threads
150 # dictionary to see if we've got any completed threads and send those.
151 for parent_id in threads.keys():
152     msg_dict = threads[parent_id]
153
154     # If we didn't get part 1/n, it must be incomplete.
155     if 1 not in msg_dict:
156         continue
157     n = msg_dict[1][3]
158
159     # Check if we got all parts.
160     incomplete = False
161     msgs = []
162     for i in range(1, n+1):
163         if i in msg_dict:
164             msgs.append(msg_dict[i][0].as_string())
165         else:
166             incomplete = True
167     if incomplete:
168         continue
169
170     # It looks like we got a full set, so we can send it.
171     body = json.dumps(msgs)
172     channel.basic_publish(exchange = 'patchq_thread',
173                           routing_key = '',
174                           body = body)
175
176     # Acknowledge each message so they are dropped from the
177     # incoming queue.
178     for i in range(1, n+1):
179         ack(msg_dict[i][1])
180
181     del threads[parent_id]
182
183 # Check for incomplete threads.  If they consist only of old messages
184 # then we drop them, otherwise we leave them for the next run.
185 for parent_id in threads.keys():
186     msg_dict = threads[parent_id]
187
188     old = True
189     for msg in msg_dict.itervalues():
190         t = None
191         date_tuple = email.utils.parsedate_tz(msg[0]['Date'])
192         if date_tuple:
193             t = email.utils.mktime_tz(date_tuple)
194         now = calendar.timegm(time.gmtime())
195         #print ("t = %d, now = %d, now-t = %d" % (t, now, now-t))
196         if t and now - t < config.assembly_time:
197             old = False
198
199     # If they're too old, drop.
200     if old:
201         for msg in msg_dict.itervalues():
202             print "... dropped: incomplete thread too old"
203             ack(msg[1])