Coverage for aisdb/database/decoder.py: 94%
133 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
1''' Parsing NMEA messages to create an SQL database.
2 See function decode_msgs() for usage
3'''
5from hashlib import md5
6from functools import partial
7from copy import deepcopy
8from datetime import timedelta
9import gzip
10import os
11import pickle
12import tempfile
13import zipfile
15from dateutil.rrule import rrule, MONTHLY
16import psycopg
18from aisdb.aisdb import decoder
19from aisdb.database.dbconn import SQLiteDBConn, PostgresDBConn
20from aisdb.proc_util import getfiledate
21from aisdb import sqlpath
class FileChecksums():
    ''' Persistent record of which raw data files have been ingested.

        An md5 digest of the first kilobyte of each processed file is stored
        in a ``hashmap`` table, so that re-running an ingest can skip files
        whose contents were already seen.

        Also creates a temporary working directory, saved to ``self.tmp_dir``.

        args:
            dbconn (:class:`aisdb.database.dbconn.SQLiteDBConn` or
                    :class:`aisdb.database.dbconn.PostgresDBConn`)
                open database connection; kept as ``self.dbconn`` and should
                be closed after use, e.g. ``self.dbconn.close()``
    '''

    def __init__(self, *, dbconn):
        assert isinstance(dbconn, (PostgresDBConn, SQLiteDBConn))
        self.dbconn = dbconn
        self.checksums_table()
        # some minimal posix environments lack /tmp; create it before
        # tempfile.mkdtemp() tries to use it
        if not os.path.isdir(
                '/tmp') and os.name == 'posix':  # pragma: no cover
            os.mkdir('/tmp')
        self.tmp_dir = tempfile.mkdtemp()
        if not os.path.isdir(self.tmp_dir):  # pragma: no cover
            os.mkdir(self.tmp_dir)

    def checksums_table(self):
        ''' create the ``hashmap`` checksum table if it doesn't exist yet.

            uses ``self.dbconn``; commits the schema change before returning.
        '''
        cur = self.dbconn.cursor()
        if isinstance(self.dbconn, SQLiteDBConn):
            cur.execute('''
                CREATE TABLE IF NOT EXISTS
                hashmap(
                    hash INTEGER PRIMARY KEY,
                    bytes BLOB
                )
                ''')
        elif isinstance(self.dbconn, PostgresDBConn):
            cur.execute('''
                CREATE TABLE IF NOT EXISTS
                hashmap(
                    hash TEXT PRIMARY KEY,
                    bytes BYTEA
                );''')
        cur.execute('CREATE UNIQUE INDEX '
                    'IF NOT EXISTS '
                    'idx_map on hashmap(hash)')
        self.dbconn.commit()

    def insert_checksum(self, checksum):
        ''' record *checksum* in the hashmap table. Duplicate checksums are
            ignored. The caller is responsible for committing.
        '''
        if isinstance(self.dbconn, SQLiteDBConn):
            # OR IGNORE: match the conflict behaviour of the postgres branch
            self.dbconn.execute('INSERT OR IGNORE INTO hashmap VALUES (?,?)',
                                [checksum, pickle.dumps(None)])
        elif isinstance(self.dbconn, PostgresDBConn):
            # bugfix: psycopg client-side placeholders are %s, not the
            # server-side $1/$2 form previously used here
            self.dbconn.execute(
                'INSERT INTO hashmap VALUES (%s,%s) ON CONFLICT DO NOTHING',
                [checksum, pickle.dumps(None)])

    def checksum_exists(self, checksum):
        ''' return True if *checksum* was stored previously '''
        cur = self.dbconn.cursor()
        if isinstance(self.dbconn, SQLiteDBConn):
            cur.execute('SELECT * FROM hashmap WHERE hash = ?', [checksum])
        elif isinstance(self.dbconn, PostgresDBConn):
            cur.execute('SELECT * FROM hashmap WHERE hash = %s', [checksum])
        res = cur.fetchone()
        return res is not None and res is not False

    def get_md5(self, path, f):
        ''' get md5 hash from the first kilobyte of data in open file *f* '''
        if path.lower().endswith('.csv'):
            # skip the header row in CSV format (~1.6kb)
            _ = f.read(1600)
        return md5(f.read(1000)).hexdigest()
102def _fast_unzip(zipf, dirname):
103 ''' parallel process worker for fast_unzip() '''
104 if zipf.lower()[-4:] == '.zip':
105 exists = set(sorted(os.listdir(dirname)))
106 with zipfile.ZipFile(zipf, 'r') as zip_ref:
107 contents = set(zip_ref.namelist())
108 members = list(contents - exists)
109 zip_ref.extractall(path=dirname, members=members)
110 elif zipf.lower()[-3:] == '.gz':
111 unzip_file = os.path.join(dirname,
112 zipf.rsplit(os.path.sep, 1)[-1][:-3])
113 with gzip.open(zipf, 'rb') as f1, open(unzip_file, 'wb') as f2:
114 f2.write(f1.read())
115 else:
116 raise ValueError('unknown zip file type')
def fast_unzip(zipfilenames, dirname, processes=12):
    ''' unzip many archive files into *dirname*

        any existing unzipped files in the target directory will be skipped

        args:
            zipfilenames (iterable of str)
                paths of ``.zip`` / ``.gz`` archives to extract
            dirname (str)
                extraction target directory
            processes (int)
                unused; retained for backward compatibility with the
                previously parallelized implementation
    '''
    print(f'unzipping files to {dirname} ... '
          '(set the TMPDIR environment variable to change this)')
    # extraction is done serially; the multiprocessing pool formerly used
    # here was disabled
    for zipf in zipfilenames:
        _fast_unzip(zipf, dirname)
def _filter_known_files(dbindex, files, skip_checksum, verbose):
    ''' split *files* into new files and files with a known checksum.

        each file's md5 signature (first kilobyte) is computed via
        ``dbindex.get_md5``. when *skip_checksum* is set, every file is kept
        and no signatures are recorded.

        returns:
            tuple ``(new_files, checksums, skipped)`` where ``new_files``
            are paths to process, ``checksums`` their md5 signatures
            (empty when *skip_checksum*), and ``skipped`` the paths whose
            checksum already exists in the database
    '''
    new_files = []
    checksums = []
    skipped = []
    for item in files:
        with open(os.path.abspath(item), 'rb') as f:
            signature = dbindex.get_md5(item, f)
        if skip_checksum:
            new_files.append(item)
            continue
        if dbindex.checksum_exists(signature):
            skipped.append(item)
            if verbose:
                print(f'found matching checksum, skipping {item}')
        else:
            new_files.append(item)
            checksums.append(signature)
    return new_files, checksums, skipped


def decode_msgs(filepaths,
                dbconn,
                source,
                vacuum=False,
                skip_checksum=False,
                verbose=True):
    ''' Decode NMEA format AIS messages and store in an SQL database.

        To speed up decoding, create the database on a different hard drive
        from where the raw data is stored.
        A checksum of the first kilobyte of every file will be stored to
        prevent loading the same file twice.

        If the filepath has a .gz or .zip extension, the file will be
        decompressed into a temporary directory before database insert.

        args:
            filepaths (list)
                absolute filepath locations for AIS message files to be
                ingested into the database
            dbconn (:class:`aisdb.database.dbconn.DBConn`)
                database connection object
            source (string)
                data source name or description. will be used as a primary key
                column, so duplicate messages from different sources will not
                be ignored as duplicates upon insert
            vacuum (boolean, str)
                if True, the database will be vacuumed after completion.
                if string, the database will be vacuumed into the filepath
                given. Consider vacuuming to second hard disk to speed this up
            skip_checksum (boolean)
                if True, do not check or store file checksums before insert
            verbose (boolean)
                print progress information

        returns:
            None

        example:

        .. _example_decode:

        >>> from aisdb import decode_msgs
        >>> from aisdb.database.dbconn import SQLiteDBConn
        >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv',
        ...              'aisdb/tests/testdata/test_data_20211101.nm4']
        >>> with SQLiteDBConn('test_decode_msgs.db') as dbconn:
        ...     decode_msgs(filepaths=filepaths, dbconn=dbconn,
        ...                 source='TESTING', verbose=False)
    '''
    if not isinstance(dbconn,
                      (SQLiteDBConn, PostgresDBConn)):  # pragma: no cover
        raise ValueError('db argument must be a DBConn database connection. '
                         f'got {dbconn}')

    if len(filepaths) == 0:  # pragma: no cover
        raise ValueError('must supply atleast one filepath.')

    dbindex = FileChecksums(dbconn=dbconn)

    # separate compressed archives from plain files
    zipped = {
        f
        for f in filepaths
        if f.lower()[-4:] == '.zip' or f.lower()[-3:] == '.gz'
    }
    not_zipped = sorted(set(filepaths) - zipped)

    if verbose:
        print('generating file checksums...')

    zipped, zipped_checksums, skipped_zip = _filter_known_files(
        dbindex, sorted(zipped), skip_checksum, verbose)
    not_zipped, not_zipped_checksums, skipped_raw = _filter_known_files(
        dbindex, not_zipped, skip_checksum, verbose)
    _skipped = skipped_zip + skipped_raw  # kept for debugging

    # decompress archives into the temporary directory, then checksum the
    # extracted files so they can be recorded after a successful insert
    unzipped_checksums = []
    if zipped:
        fast_unzip(zipped, dbindex.tmp_dir)
        unzipped = sorted(
            os.path.join(dbindex.tmp_dir, f)
            for f in os.listdir(dbindex.tmp_dir))
        assert unzipped
        if not skip_checksum:
            for item in unzipped:
                with open(os.path.abspath(item), 'rb') as f:
                    unzipped_checksums.append(dbindex.get_md5(item, f))
    else:
        unzipped = []

    raw_files = not_zipped + unzipped

    if not raw_files:
        print('All files returned an existing checksum.',
              'Cleaning temporary data...')
        for tmpfile in unzipped:
            os.remove(tmpfile)
        os.removedirs(dbindex.tmp_dir)
        return

    assert skip_checksum or len(not_zipped) == len(not_zipped_checksums)
    assert skip_checksum or len(zipped) == len(zipped_checksums)
    assert skip_checksum or len(unzipped) == len(unzipped_checksums)

    if verbose:
        print('checking file dates...')
    filedates = [getfiledate(f) for f in raw_files]
    # enumerate every calendar month spanned by the input data; data tables
    # are partitioned per-month
    months = [
        month.strftime('%Y%m') for month in rrule(
            freq=MONTHLY,
            dtstart=min(filedates) - timedelta(days=min(filedates).day - 1),
            until=max(filedates),
        )
    ]

    if verbose:
        print('creating tables and dropping table indexes...')

    # drop constraints and indexes to speed up insert,
    # and rebuild them after inserting
    if isinstance(dbconn, PostgresDBConn):
        with open(
                os.path.join(sqlpath, 'psql_createtable_dynamic_noindex.sql'),
                'r') as f:
            create_dynamic_table_stmt = f.read()
        with open(os.path.join(sqlpath, 'createtable_static.sql'), 'r') as f:
            create_static_table_stmt = f.read()
        for month in months:
            dbconn.execute(create_dynamic_table_stmt.format(month))
            dbconn.execute(create_static_table_stmt.format(month))
            dbconn.execute(
                f'ALTER TABLE ais_{month}_dynamic '
                f'DROP CONSTRAINT IF EXISTS ais_{month}_dynamic_pkey')
            for idx_name in ('mmsi', 'time', 'lon', 'lat', 'cluster'):
                dbconn.execute(
                    f'DROP INDEX IF EXISTS idx_ais_{month}_dynamic_{idx_name}')
        dbconn.commit()
        completed_files = decoder(dbpath='',
                                  psql_conn_string=dbconn.connection_string,
                                  files=raw_files,
                                  source=source,
                                  verbose=verbose)
    elif isinstance(dbconn, SQLiteDBConn):
        with open(os.path.join(sqlpath, 'createtable_dynamic_clustered.sql'),
                  'r') as f:
            create_table_stmt = f.read()
        for month in months:
            dbconn.execute(create_table_stmt.format(month))
        completed_files = decoder(dbpath=dbconn.dbpath,
                                  psql_conn_string='',
                                  files=raw_files,
                                  source=source,
                                  verbose=verbose)
    else:  # pragma: no cover
        assert False

    if verbose and not skip_checksum:
        print('saving checksums...')

    # only record a checksum for files the decoder reports as completed,
    # so that failed files will be retried on the next run
    for filename, signature in zip(not_zipped + unzipped,
                                   not_zipped_checksums + unzipped_checksums):
        if filename in completed_files:
            dbindex.insert_checksum(signature)
        else:
            if verbose:
                print('error processing (unknown), skipping checksum...')

    dbindex.dbconn.commit()

    if verbose:
        print('cleaning temporary data...')

    for tmpfile in unzipped:
        os.remove(tmpfile)
    os.removedirs(dbindex.tmp_dir)

    if isinstance(dbconn, PostgresDBConn):
        if verbose:
            print('rebuilding indexes...')
        for month in months:
            dbconn.rebuild_indexes(month, verbose)
        dbconn.execute('ANALYZE')
        dbconn.commit()
        dbconn.aggregate_static_msgs(months, verbose)

    if vacuum is not False:
        print("finished parsing data\nvacuuming...")
        if isinstance(dbconn, SQLiteDBConn):
            if vacuum is True:
                dbconn.execute('VACUUM')
            elif isinstance(vacuum, str):
                assert not os.path.isfile(vacuum)
                dbconn.execute(f"VACUUM INTO '{vacuum}'")
            else:
                raise ValueError(
                    'vacuum arg must be boolean or filepath string')
            dbconn.commit()
        elif isinstance(dbconn, (PostgresDBConn, psycopg.Connection)):
            # vacuuming is intentionally a no-op for Postgres connections
            pass
        else:
            raise RuntimeError

    return