]> granicus.if.org Git - apache/blob - support/ctauditscts
Xforms.
[apache] / support / ctauditscts
1 #!/usr/bin/env python
2 #
3 # Licensed to the Apache Software Foundation (ASF) under one or more
4 # contributor license agreements.  See the NOTICE file distributed with
5 # this work for additional information regarding copyright ownership.
6 # The ASF licenses this file to You under the Apache License, Version 2.0
7 # (the "License"); you may not use this file except in compliance with
8 # the License.  You may obtain a copy of the License at
9 #
10 #     http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import binascii
19 import os
20 import sqlite3
21 import ssl
22 import struct
23 import sys
24 import tempfile
25
26 from contextlib import closing
27
28 SERVER_START = 1
29 KEY_START = 2
30 CERT_START = 3
31 SCT_START = 4
32
33
34 def usage():
35     print >> sys.stderr, ('Usage: %s /path/to/audit/files ' +
36                           '[/path/to/log-config-db]') % sys.argv[0]
37     sys.exit(1)
38
39
40 def audit(fn, tmp, already_checked, cur):
41     print 'Auditing %s...' % fn
42
43     # First, parse the audit file into a series of related
44     #
45     #   1. PEM file with certificate chain
46     #   2. Individual SCT files
47     #
48     # Next,  for each SCT, invoke verify_single_proof to verify.
49     log_bytes = open(fn, 'rb').read()
50     offset = 0
51     while offset < len(log_bytes):
52         print 'Got package from server...'
53         val = struct.unpack_from('>H', log_bytes, offset)
54         assert val[0] == SERVER_START
55         offset += 2
56
57         assert struct.unpack_from('>H', log_bytes, offset)[0] == KEY_START
58         offset += 2
59
60         key_size = struct.unpack_from('>H', log_bytes, offset)[0]
61         assert key_size > 0
62         offset += 2
63
64         key = log_bytes[offset:offset + key_size]
65         offset += key_size
66
67         # at least one certificate
68         assert struct.unpack_from('>H', log_bytes, offset)[0] == CERT_START
69
70         # for each certificate:
71         leaf = None
72         while struct.unpack_from('>H', log_bytes, offset)[0] == CERT_START:
73             offset += 2
74             val = struct.unpack_from('BBB', log_bytes, offset)
75             offset += 3
76             der_size = (val[0] << 16) | (val[1] << 8) | (val[2] << 0)
77             print '  Certificate size:', hex(der_size)
78             if not leaf:
79                 leaf = (offset, der_size)
80             offset += der_size
81
82         pem = ssl.DER_cert_to_PEM_cert(log_bytes[leaf[0]:leaf[0] + leaf[1]])
83
84         tmp_leaf_pem = tempfile.mkstemp(text=True)
85         with closing(os.fdopen(tmp_leaf_pem[0], 'w')) as f:
86             f.write(pem)
87
88         # at least one SCT
89         assert struct.unpack_from('>H', log_bytes, offset)[0] == SCT_START
90
91         # for each SCT:
92         while offset < len(log_bytes) and \
93                 struct.unpack_from('>H', log_bytes, offset)[0] == SCT_START:
94             offset += 2
95             len_offset = offset
96             sct_size = struct.unpack_from('>H', log_bytes, len_offset)[0]
97             offset += 2
98             print '  SCT size:', hex(sct_size)
99             log_id = log_bytes[offset + 1:offset + 1 + 32]
100             log_id_hex = binascii.hexlify(log_id).upper()
101             print '    Log id: %s' % log_id_hex
102             timestamp_ms = struct.unpack_from('>Q', log_bytes, offset + 33)[0]
103             print '    Timestamp: %s' % timestamp_ms
104
105             #  If we ever need the full SCT: sct = (offset, sct_size)
106             offset += sct_size
107
108             if key in already_checked:
109                 print '  (SCTs already checked)'
110                 continue
111
112             already_checked[key] = True
113
114             log_url_arg = ''
115             if cur:
116                 stmt = 'SELECT * FROM loginfo WHERE log_id = ?'
117                 cur.execute(stmt, [log_id_hex])
118                 recs = list(cur.fetchall())
119                 if len(recs) > 0 and recs[0][6] is not None:
120                     log_url = recs[0][6]
121
122                     # verify_single_proof doesn't accept <scheme>://
123                     if '://' in log_url:
124                         log_url = log_url.split('://')[1]
125                     log_url_arg = '--log_url %s' % log_url
126
127                     print '    Log URL: ' + log_url
128
129             cmd = 'verify_single_proof.py --cert %s --timestamp %s %s' % \
130                   (tmp_leaf_pem[1], timestamp_ms, log_url_arg)
131             print '>%s<' % cmd
132             os.system(cmd)
133
134         os.unlink(tmp_leaf_pem[1])
135
136
137 def main():
138     if len(sys.argv) != 2 and len(sys.argv) != 3:
139         usage()
140
141     top = sys.argv[1]
142     tmp = '/tmp'
143
144     if len(sys.argv) == 3:
145         cxn = sqlite3.connect(sys.argv[2])
146         cur = cxn.cursor()
147     else:
148         cur = None
149
150     # could serialize this between runs to further limit duplicate checking
151     already_checked = dict()
152
153     for dirpath, dnames, fnames in os.walk(top):
154         fnames = [fn for fn in fnames if fn[-4:] == '.out']
155         for fn in fnames:
156             audit(os.path.join(dirpath, fn), tmp, already_checked, cur)
157
158
159 if __name__ == "__main__":
160     main()