Source code for aisdb.database.decoder

''' Parsing NMEA messages to create an SQL database.
    See function decode_msgs() for usage
'''

from hashlib import md5
from functools import partial
from copy import deepcopy
from datetime import timedelta
import gzip
import os
import pickle
import tempfile
import zipfile

from dateutil.rrule import rrule, MONTHLY
import psycopg

from aisdb.aisdb import decoder
from aisdb.database.dbconn import SQLiteDBConn, PostgresDBConn
from aisdb.proc_util import getfiledate
from aisdb import sqlpath


class FileChecksums():

    def __init__(self, *, dbconn):
        assert isinstance(dbconn, (PostgresDBConn, SQLiteDBConn))
        self.dbconn = dbconn
        self.checksums_table()
        if not os.path.isdir('/tmp') and os.name == 'posix':  # pragma: no cover
            os.mkdir('/tmp')
        self.tmp_dir = tempfile.mkdtemp()
        if not os.path.isdir(self.tmp_dir):
            os.mkdir(self.tmp_dir)

    def checksums_table(self):
        ''' creates a checksums hashmap table on ``self.dbconn`` if it
            doesn't exist yet. the connection and the temporary directory
            ``self.tmp_dir`` are created in ``__init__``; the connection
            should be closed after use, e.g. ``self.dbconn.close()``
        '''
        cur = self.dbconn.cursor()
        if isinstance(self.dbconn, SQLiteDBConn):
            cur.execute('''
                CREATE TABLE IF NOT EXISTS hashmap(
                    hash INTEGER PRIMARY KEY,
                    bytes BLOB
                ) ''')
        elif isinstance(self.dbconn, PostgresDBConn):
            cur.execute('''
                CREATE TABLE IF NOT EXISTS hashmap(
                    hash TEXT PRIMARY KEY,
                    bytes BYTEA
                );''')
        cur.execute('CREATE UNIQUE INDEX '
                    'IF NOT EXISTS '
                    'idx_map on hashmap(hash)')
        self.dbconn.commit()

    def insert_checksum(self, checksum):
        if isinstance(self.dbconn, SQLiteDBConn):
            self.dbconn.execute('INSERT INTO hashmap VALUES (?,?)',
                                [checksum, pickle.dumps(None)])
        elif isinstance(self.dbconn, PostgresDBConn):
            self.dbconn.execute(
                'INSERT INTO hashmap VALUES ($1,$2) ON CONFLICT DO NOTHING',
                [checksum, pickle.dumps(None)])

    def checksum_exists(self, checksum):
        cur = self.dbconn.cursor()
        if isinstance(self.dbconn, SQLiteDBConn):
            cur.execute('SELECT * FROM hashmap WHERE hash = ?', [checksum])
        elif isinstance(self.dbconn, PostgresDBConn):
            cur.execute('SELECT * FROM hashmap WHERE hash = %s', [checksum])
        res = cur.fetchone()
        if res is None or res is False:
            return False
        return True

    def get_md5(self, path, f):
        ''' get md5 hash from the first kilobyte of data '''
        # skip the header row in CSV format (~1.6 kB)
        if path[-4:].lower() == '.csv':
            _ = f.read(1600)
        digest = md5(f.read(1000)).hexdigest()
        return digest
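

# Usage sketch (illustrative, not part of the original module): shows how
# FileChecksums can be exercised directly to index a raw data file so that a
# later ingest run will skip it. The ``filepath`` argument is hypothetical;
# any readable binary file works, and ``dbconn`` is an open SQLiteDBConn or
# PostgresDBConn.
def _example_checksum_index(dbconn, filepath):
    index = FileChecksums(dbconn=dbconn)
    with open(filepath, 'rb') as f:
        digest = index.get_md5(filepath, f)
    if index.checksum_exists(digest):
        # already ingested in a previous run
        return False
    index.insert_checksum(digest)
    index.dbconn.commit()
    return True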


def _fast_unzip(zipf, dirname):
    ''' parallel process worker for fast_unzip() '''
    if zipf.lower()[-4:] == '.zip':
        exists = set(sorted(os.listdir(dirname)))
        with zipfile.ZipFile(zipf, 'r') as zip_ref:
            contents = set(zip_ref.namelist())
            members = list(contents - exists)
            zip_ref.extractall(path=dirname, members=members)
    elif zipf.lower()[-3:] == '.gz':
        unzip_file = os.path.join(dirname,
                                  zipf.rsplit(os.path.sep, 1)[-1][:-3])
        with gzip.open(zipf, 'rb') as f1, open(unzip_file, 'wb') as f2:
            f2.write(f1.read())
    else:
        raise ValueError('unknown zip file type')


def fast_unzip(zipfilenames, dirname, processes=12):
    ''' unzip many files into the target directory.
        any existing unzipped files in the target directory will be skipped.
        note: files are currently processed sequentially; the parallel Pool
        implementation below is disabled, so ``processes`` is unused.
    '''
    print(f'unzipping files to {dirname} ... '
          '(set the TMPDIR environment variable to change this)')
    fcn = partial(_fast_unzip, dirname=dirname)
    '''
    with Pool(processes) as p:
        p.imap_unordered(fcn, zipfilenames)
        p.close()
        p.join()
    '''
    for file in zipfilenames:
        fcn(file)
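

# Usage sketch (illustrative, not part of the original module): decompress a
# batch of archives into a fresh temporary directory before decoding. The
# archive paths below are hypothetical placeholders.
def _example_fast_unzip():
    tmp_dir = tempfile.mkdtemp()
    archives = ['/data/ais_202107.zip', '/data/ais_202108.nm4.gz']
    fast_unzip(archives, tmp_dir)
    # return the extracted file paths, ready to be passed to decode_msgs()
    return sorted(os.path.join(tmp_dir, f) for f in os.listdir(tmp_dir))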


def decode_msgs(filepaths,
                dbconn,
                source,
                vacuum=False,
                skip_checksum=False,
                verbose=True):
    ''' Decode NMEA format AIS messages and store in an SQLite database.
        To speed up decoding, create the database on a different hard drive
        from where the raw data is stored.
        A checksum of the first kilobyte of every file will be stored to
        prevent loading the same file twice.

        If the filepath has a .gz or .zip extension, the file will be
        decompressed into a temporary directory before database insert.

        args:
            filepaths (list)
                absolute filepath locations for AIS message files to be
                ingested into the database
            dbconn (:class:`aisdb.database.dbconn.DBConn`)
                database connection object
            source (string)
                data source name or description. will be used as a primary
                key column, so duplicate messages from different sources
                will not be ignored as duplicates upon insert
            vacuum (boolean, str)
                if True, the database will be vacuumed after completion.
                if string, the database will be vacuumed into the filepath
                given. Consider vacuuming to a second hard disk to speed
                this up
            skip_checksum (boolean)
                if True, file checksums will not be generated or stored
            verbose (boolean)
                if True, progress information will be printed

        returns:
            None

        example:

        .. _example_decode:

        >>> import os
        >>> from aisdb import decode_msgs
        >>> from aisdb.database.dbconn import SQLiteDBConn
        >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv',
        ...              'aisdb/tests/testdata/test_data_20211101.nm4']
        >>> with SQLiteDBConn('test_decode_msgs.db') as dbconn:
        ...     decode_msgs(filepaths=filepaths, dbconn=dbconn,
        ...                 source='TESTING', verbose=False)
    '''
    '''
    >>> os.remove('test_decode_msgs.db')
    '''
    # psql_conn_string (string)
    #   Postgres connection string. If dbconn is an SQLite database
    #   connection, set this to ``None``.
    if not isinstance(dbconn,
                      (SQLiteDBConn, PostgresDBConn)):  # pragma: no cover
        raise ValueError('db argument must be a DBConn database connection. '
                         f'got {dbconn}')

    if len(filepaths) == 0:  # pragma: no cover
        raise ValueError('must supply at least one filepath.')

    dbindex = FileChecksums(dbconn=dbconn)

    # handle zipfiles
    zipped = {
        f
        for f in filepaths
        if f.lower()[-4:] == '.zip' or f.lower()[-3:] == '.gz'
    }
    not_zipped = sorted(list(set(filepaths) - set(zipped)))
    zipped_checksums = []
    not_zipped_checksums = []
    unzipped_checksums = []
    _skipped = []

    if verbose:
        print('generating file checksums...')

    for item in deepcopy(zipped):
        with open(os.path.abspath(item), 'rb') as f:
            signature = dbindex.get_md5(item, f)
        if skip_checksum:
            continue
        if dbindex.checksum_exists(signature):
            _skipped.append(item)
            zipped.remove(item)
            if verbose:
                print(f'found matching checksum, skipping {item}')
        else:
            zipped_checksums.append(signature)

    for item in deepcopy(not_zipped):
        with open(os.path.abspath(item), 'rb') as f:
            signature = dbindex.get_md5(item, f)
        if skip_checksum:
            continue
        if dbindex.checksum_exists(signature):
            _skipped.append(item)
            not_zipped.remove(item)
            if verbose:
                print(f'found matching checksum, skipping {item}')
        else:
            not_zipped_checksums.append(signature)

    if zipped:
        fast_unzip(zipped, dbindex.tmp_dir)
        unzipped = sorted([
            os.path.join(dbindex.tmp_dir, f)
            for f in os.listdir(dbindex.tmp_dir)
        ])
        assert unzipped
        if not skip_checksum:
            for item in unzipped:
                with open(os.path.abspath(item), 'rb') as f:
                    signature = dbindex.get_md5(item, f)
                unzipped_checksums.append(signature)
    else:
        unzipped = []

    raw_files = not_zipped + unzipped

    if not raw_files:
        print('All files returned an existing checksum.',
              'Cleaning temporary data...')
        for tmpfile in unzipped:
            os.remove(tmpfile)
        os.removedirs(dbindex.tmp_dir)
        return

    assert skip_checksum or len(not_zipped) == len(not_zipped_checksums)
    assert skip_checksum or len(zipped) == len(zipped_checksums)
    assert skip_checksum or len(unzipped) == len(unzipped_checksums)

    # TODO: get file dates and create new tables before insert
    if verbose:
        print('checking file dates...')
    filedates = [getfiledate(f) for f in raw_files]
    months = [
        month.strftime('%Y%m') for month in rrule(
            freq=MONTHLY,
            dtstart=min(filedates) - (timedelta(days=min(filedates).day - 1)),
            until=max(filedates),
        )
    ]

    if verbose:
        print('creating tables and dropping table indexes...')

    # drop constraints and indexes to speed up insert,
    # and rebuild them after inserting
    if isinstance(dbconn, PostgresDBConn):
        with open(
                os.path.join(sqlpath, 'psql_createtable_dynamic_noindex.sql'),
                'r') as f:
            create_dynamic_table_stmt = f.read()
        with open(os.path.join(sqlpath, 'createtable_static.sql'), 'r') as f:
            create_static_table_stmt = f.read()
        for month in months:
            dbconn.execute(create_dynamic_table_stmt.format(month))
            dbconn.execute(create_static_table_stmt.format(month))
            dbconn.execute(
                f'ALTER TABLE ais_{month}_dynamic '
                f'DROP CONSTRAINT IF EXISTS ais_{month}_dynamic_pkey')
            for idx_name in ('mmsi', 'time', 'lon', 'lat', 'cluster'):
                dbconn.execute(
                    f'DROP INDEX IF EXISTS idx_ais_{month}_dynamic_{idx_name}')
        dbconn.commit()
        completed_files = decoder(dbpath='',
                                  psql_conn_string=dbconn.connection_string,
                                  files=raw_files,
                                  source=source,
                                  verbose=verbose)
    elif isinstance(dbconn, SQLiteDBConn):
        with open(os.path.join(sqlpath, 'createtable_dynamic_clustered.sql'),
                  'r') as f:
            create_table_stmt = f.read()
        for month in months:
            dbconn.execute(create_table_stmt.format(month))
        completed_files = decoder(dbpath=dbconn.dbpath,
                                  psql_conn_string='',
                                  files=raw_files,
                                  source=source,
                                  verbose=verbose)
    else:
        assert False

    if verbose and not skip_checksum:
        print('saving checksums...')

    for filename, signature in zip(not_zipped + unzipped,
                                   not_zipped_checksums + unzipped_checksums):
        if filename in completed_files:
            dbindex.insert_checksum(signature)
        else:
            if verbose:
                print(f'error processing {filename}, skipping checksum...')

    dbindex.dbconn.commit()

    if verbose:
        print('cleaning temporary data...')

    for tmpfile in unzipped:
        os.remove(tmpfile)
    os.removedirs(dbindex.tmp_dir)

    if isinstance(dbconn, PostgresDBConn):
        if verbose:
            print('rebuilding indexes...')
        for month in months:
            dbconn.rebuild_indexes(month, verbose)
        dbconn.execute('ANALYZE')
        dbconn.commit()

    dbconn.aggregate_static_msgs(months, verbose)

    if vacuum is not False:
        print("finished parsing data\nvacuuming...")
        if isinstance(dbconn, SQLiteDBConn):
            if vacuum is True:
                dbconn.execute('VACUUM')
            elif isinstance(vacuum, str):
                assert not os.path.isfile(vacuum)
                dbconn.execute(f"VACUUM INTO '{vacuum}'")
            else:
                raise ValueError(
                    'vacuum arg must be boolean or filepath string')
            dbconn.commit()
        elif isinstance(dbconn, (PostgresDBConn, psycopg.Connection)):
            pass
        else:
            raise RuntimeError
            '''
            if vacuum is True:
                dbconn.commit()
                previous = dbconn.conn.autocommit
                dbconn.conn.autocommit = True
                dbconn.execute(
                    'VACUUM (verbose, index_cleanup, analyze)')
                dbconn.conn.autocommit = previous
            elif isinstance(vacuum, str):
                raise ValueError(
                    'vacuum parameter must be True or False for PostgresDBConn'
                )
            else:
                assert vacuum is False
            '''

    return
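

# Usage sketch (illustrative, not part of the original module): ingest
# compressed archives and vacuum the resulting SQLite database into a second
# file afterwards, as suggested in the decode_msgs() docstring. All paths
# below are hypothetical placeholders.
def _example_decode_and_vacuum():
    filepaths = ['/data/ais_202107.nm4.gz', '/data/ais_202108.zip']
    with SQLiteDBConn('/fast_disk/ais.db') as dbconn:
        decode_msgs(filepaths=filepaths,
                    dbconn=dbconn,
                    source='EXAMPLE',
                    vacuum='/other_disk/ais_vacuumed.db',
                    verbose=True)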