Coverage for aisdb/database/decoder.py: 94%

133 statements  


''' Parsing NMEA messages to create an SQL database.
    See function decode_msgs() for usage
'''

from hashlib import md5
from functools import partial
from copy import deepcopy
from datetime import timedelta
import gzip
import os
import pickle
import tempfile
import zipfile

from dateutil.rrule import rrule, MONTHLY
import psycopg

from aisdb.aisdb import decoder
from aisdb.database.dbconn import SQLiteDBConn, PostgresDBConn
from aisdb.proc_util import getfiledate
from aisdb import sqlpath
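
# Orientation note: ``decoder`` is the compiled extension function that does
# the actual NMEA parsing and database inserts, and ``sqlpath`` points to the
# bundled SQL scripts used further below to create the per-month tables.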


class FileChecksums():

    def __init__(self, *, dbconn):
        assert isinstance(dbconn, (PostgresDBConn, SQLiteDBConn))
        self.dbconn = dbconn
        self.checksums_table()
        if not os.path.isdir(
                '/tmp') and os.name == 'posix':  # pragma: no cover
            os.mkdir('/tmp')
        self.tmp_dir = tempfile.mkdtemp()
        if not os.path.isdir(self.tmp_dir):  # coverage: condition never true
            os.mkdir(self.tmp_dir)

    def checksums_table(self):
        ''' creates a checksums hashmap table in the connected database
            if it doesn't exist yet.

            a temporary directory is also created by ``__init__`` and its
            path saved to ``self.tmp_dir``

            the database connection is stored as ``self.dbconn``, which
            should be closed after use

            e.g.
                self.dbconn.close()
        '''

        cur = self.dbconn.cursor()
        if isinstance(self.dbconn, SQLiteDBConn):
            cur.execute('''
                CREATE TABLE IF NOT EXISTS
                hashmap(
                    hash INTEGER PRIMARY KEY,
                    bytes BLOB
                )
                ''')
        elif isinstance(self.dbconn, PostgresDBConn):
            cur.execute('''
                CREATE TABLE IF NOT EXISTS
                hashmap(
                    hash TEXT PRIMARY KEY,
                    bytes BYTEA
                );''')

        cur.execute('CREATE UNIQUE INDEX '
                    'IF NOT EXISTS '
                    'idx_map on hashmap(hash)')
        self.dbconn.commit()

    def insert_checksum(self, checksum):
        if isinstance(self.dbconn, SQLiteDBConn):
            self.dbconn.execute('INSERT INTO hashmap VALUES (?,?)',
                                [checksum, pickle.dumps(None)])
        elif isinstance(self.dbconn, PostgresDBConn):
            self.dbconn.execute(
                'INSERT INTO hashmap VALUES ($1,$2) ON CONFLICT DO NOTHING',
                [checksum, pickle.dumps(None)])

    def checksum_exists(self, checksum):
        cur = self.dbconn.cursor()
        if isinstance(self.dbconn, SQLiteDBConn):
            cur.execute('SELECT * FROM hashmap WHERE hash = ?', [checksum])
        elif isinstance(self.dbconn, PostgresDBConn):
            cur.execute('SELECT * FROM hashmap WHERE hash = %s', [checksum])
        res = cur.fetchone()

        if res is None or res is False:
            return False

        return True

    def get_md5(self, path, f):
        ''' get md5 hash from the first kilobyte of data '''
        # skip the header row in CSV format (~1.6 KB)
        if path[-4:].lower() == '.csv':
            _ = f.read(1600)
        digest = md5(f.read(1000)).hexdigest()
        return digest
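
# A minimal usage sketch of the checksums index (illustrative only; the
# database and file paths below are hypothetical). Note that get_md5()
# returns a hex string: SQLite's flexible typing stores it in the
# INTEGER-declared ``hash`` column, while the Postgres table declares TEXT.
#
#   from aisdb.database.dbconn import SQLiteDBConn
#
#   with SQLiteDBConn('ais.db') as dbconn:
#       index = FileChecksums(dbconn=dbconn)
#       with open('/data/ais_20210701.nm4', 'rb') as f:
#           digest = index.get_md5('/data/ais_20210701.nm4', f)
#       if not index.checksum_exists(digest):
#           index.insert_checksum(digest)
#           dbconn.commit()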


def _fast_unzip(zipf, dirname):
    ''' parallel process worker for fast_unzip() '''
    if zipf.lower()[-4:] == '.zip':
        exists = set(sorted(os.listdir(dirname)))
        with zipfile.ZipFile(zipf, 'r') as zip_ref:
            contents = set(zip_ref.namelist())
            members = list(contents - exists)
            zip_ref.extractall(path=dirname, members=members)
    elif zipf.lower()[-3:] == '.gz':
        unzip_file = os.path.join(dirname,
                                  zipf.rsplit(os.path.sep, 1)[-1][:-3])
        with gzip.open(zipf, 'rb') as f1, open(unzip_file, 'wb') as f2:
            f2.write(f1.read())
    else:
        raise ValueError('unknown zip file type')


def fast_unzip(zipfilenames, dirname, processes=12):
    ''' unzip many files in parallel
        any existing unzipped files in the target directory will be skipped
    '''

    print(f'unzipping files to {dirname} ... '
          '(set the TMPDIR environment variable to change this)')

    fcn = partial(_fast_unzip, dirname=dirname)
    '''
    with Pool(processes) as p:
        p.imap_unordered(fcn, zipfilenames)
        p.close()
        p.join()
    '''
    for file in zipfilenames:
        fcn(file)
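
# Example call (hypothetical archive paths): extract a mix of .zip and .gz
# files into a temporary directory; members of .zip archives that were
# already extracted there are skipped. With the Pool block commented out
# above, extraction currently runs sequentially despite ``processes``.
#
#   import tempfile
#   tmp_dir = tempfile.mkdtemp()
#   fast_unzip(['/data/ais_202107.zip', '/data/ais_202111.nm4.gz'], tmp_dir)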


def decode_msgs(filepaths,
                dbconn,
                source,
                vacuum=False,
                skip_checksum=False,
                verbose=True):
    ''' Decode NMEA format AIS messages and store them in an SQLite or
        PostgreSQL database.
        To speed up decoding, create the database on a different hard drive
        from where the raw data is stored.
        A checksum of the first kilobyte of every file will be stored to
        prevent loading the same file twice.

        If the filepath has a .gz or .zip extension, the file will be
        decompressed into a temporary directory before database insert.

        args:
            filepaths (list)
                absolute filepath locations for AIS message files to be
                ingested into the database
            dbconn (:class:`aisdb.database.dbconn.DBConn`)
                database connection object
            source (string)
                data source name or description. this is included in the
                primary key, so identical messages from different sources
                will not be dropped as duplicates upon insert
            vacuum (boolean, str)
                if True, the database will be vacuumed after completion.
                if a string, the database will be vacuumed into the
                filepath given. consider vacuuming to a second hard disk
                to speed this up
            skip_checksum (boolean)
                if True, do not compute or store file checksums
            verbose (boolean)
                if True, print progress information while decoding

        returns:
            None

        example:


        .. _example_decode:

        >>> import os
        >>> from aisdb import decode_msgs
        >>> from aisdb.database.dbconn import SQLiteDBConn
        >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv',
        ...              'aisdb/tests/testdata/test_data_20211101.nm4']
        >>> with SQLiteDBConn('test_decode_msgs.db') as dbconn:
        ...     decode_msgs(filepaths=filepaths, dbconn=dbconn,
        ...                 source='TESTING', verbose=False)
    '''

    '''
    >>> os.remove('test_decode_msgs.db')
    '''
    # psql_conn_string (string)
    #     Postgres connection string. If dbconn is an SQLite database
    #     connection, set this to ``None``.
    if not isinstance(dbconn,
                      (SQLiteDBConn, PostgresDBConn)):  # pragma: no cover
        raise ValueError('db argument must be a DBConn database connection. '
                         f'got {dbconn}')

    if len(filepaths) == 0:  # pragma: no cover
        raise ValueError('must supply at least one filepath.')

    dbindex = FileChecksums(dbconn=dbconn)

    # handle zipfiles
    zipped = {
        f
        for f in filepaths
        if f.lower()[-4:] == '.zip' or f.lower()[-3:] == '.gz'
    }
    not_zipped = sorted(list(set(filepaths) - set(zipped)))
    zipped_checksums = []
    not_zipped_checksums = []
    unzipped_checksums = []
    _skipped = []

    if verbose:  # coverage: condition never false
        print('generating file checksums...')

    for item in deepcopy(zipped):
        with open(os.path.abspath(item), 'rb') as f:
            signature = dbindex.get_md5(item, f)
        if skip_checksum:
            continue
        if dbindex.checksum_exists(signature):
            _skipped.append(item)
            zipped.remove(item)
            if verbose:  # coverage: condition never false
                print(f'found matching checksum, skipping {item}')
        else:
            zipped_checksums.append(signature)

    for item in deepcopy(not_zipped):
        with open(os.path.abspath(item), 'rb') as f:
            signature = dbindex.get_md5(item, f)
        if skip_checksum:
            continue
        if dbindex.checksum_exists(signature):
            _skipped.append(item)
            not_zipped.remove(item)
            if verbose:  # coverage: condition never false
                print(f'found matching checksum, skipping {item}')
        else:
            not_zipped_checksums.append(signature)
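
    # files whose checksum already exists are collected in ``_skipped`` and
    # removed from ``zipped``/``not_zipped`` above, so they are not decoded
    # a second time; the *_checksums lists stay aligned with the files that
    # remain to be ingested.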

    if zipped:
        fast_unzip(zipped, dbindex.tmp_dir)
        unzipped = sorted([
            os.path.join(dbindex.tmp_dir, f)
            for f in os.listdir(dbindex.tmp_dir)
        ])
        assert unzipped
        if not skip_checksum:
            for item in unzipped:
                with open(os.path.abspath(item), 'rb') as f:
                    signature = dbindex.get_md5(item, f)
                unzipped_checksums.append(signature)
    else:
        unzipped = []

    raw_files = not_zipped + unzipped

    if not raw_files:
        print('All files returned an existing checksum.',
              'Cleaning temporary data...')
        for tmpfile in unzipped:  # coverage: loop body never entered
            os.remove(tmpfile)
        os.removedirs(dbindex.tmp_dir)
        return

    assert skip_checksum or len(not_zipped) == len(not_zipped_checksums)
    assert skip_checksum or len(zipped) == len(zipped_checksums)
    assert skip_checksum or len(unzipped) == len(unzipped_checksums)

    # TODO: get file dates and create new tables before insert
    if verbose:  # coverage: condition never false
        print('checking file dates...')
    filedates = [getfiledate(f) for f in raw_files]
    months = [
        month.strftime('%Y%m') for month in rrule(
            freq=MONTHLY,
            dtstart=min(filedates) - (timedelta(days=min(filedates).day - 1)),
            until=max(filedates),
        )
    ]
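    # Illustrative example (hypothetical dates): if the input file dates span
    # 2021-07-15 through 2021-11-02, ``dtstart`` is snapped back to
    # 2021-07-01 (the first day of the earliest month), so rrule yields every
    # month in the range and months == ['202107', '202108', '202109',
    # '202110', '202111'], i.e. one table suffix per month.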

    if verbose:
        print('creating tables and dropping table indexes...')

    # drop constraints and indexes to speed up insert,
    # and rebuild them after inserting
    if isinstance(dbconn, PostgresDBConn):
        with open(
                os.path.join(sqlpath, 'psql_createtable_dynamic_noindex.sql'),
                'r') as f:
            create_dynamic_table_stmt = f.read()
        with open(os.path.join(sqlpath, 'createtable_static.sql'), 'r') as f:
            create_static_table_stmt = f.read()
        for month in months:
            dbconn.execute(create_dynamic_table_stmt.format(month))
            dbconn.execute(create_static_table_stmt.format(month))
            dbconn.execute(
                f'ALTER TABLE ais_{month}_dynamic '
                f'DROP CONSTRAINT IF EXISTS ais_{month}_dynamic_pkey')
            for idx_name in ('mmsi', 'time', 'lon', 'lat', 'cluster'):
                dbconn.execute(
                    f'DROP INDEX IF EXISTS idx_ais_{month}_dynamic_{idx_name}')
        dbconn.commit()
        completed_files = decoder(dbpath='',
                                  psql_conn_string=dbconn.connection_string,
                                  files=raw_files,
                                  source=source,
                                  verbose=verbose)
    elif isinstance(dbconn, SQLiteDBConn):
        with open(os.path.join(sqlpath, 'createtable_dynamic_clustered.sql'),
                  'r') as f:
            create_table_stmt = f.read()
        for month in months:
            dbconn.execute(create_table_stmt.format(month))
        completed_files = decoder(dbpath=dbconn.dbpath,
                                  psql_conn_string='',
                                  files=raw_files,
                                  source=source,
                                  verbose=verbose)
    else:
        assert False
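
    # ``completed_files`` (returned by the decoder above) lists the inputs
    # that were actually ingested; it is checked below so that checksums are
    # only recorded for files that decoded successfully.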

    if verbose and not skip_checksum:
        print('saving checksums...')

    for filename, signature in zip(not_zipped + unzipped,
                                   not_zipped_checksums + unzipped_checksums):
        if filename in completed_files:  # coverage: condition never true
            dbindex.insert_checksum(signature)
        else:
            if verbose:  # coverage: condition never false
                print(f'error processing {filename}, skipping checksum...')

    dbindex.dbconn.commit()

    if verbose:  # coverage: condition never false
        print('cleaning temporary data...')

    for tmpfile in unzipped:
        os.remove(tmpfile)
    os.removedirs(dbindex.tmp_dir)

    if isinstance(dbconn, PostgresDBConn):
        if verbose:
            print('rebuilding indexes...')
        for month in months:
            dbconn.rebuild_indexes(month, verbose)
        dbconn.execute('ANALYZE')
        dbconn.commit()

        dbconn.aggregate_static_msgs(months, verbose)

    if vacuum is not False:
        print("finished parsing data\nvacuuming...")
        if isinstance(dbconn, SQLiteDBConn):
            if vacuum is True:
                dbconn.execute('VACUUM')
            elif isinstance(vacuum, str):
                assert not os.path.isfile(vacuum)
                dbconn.execute(f"VACUUM INTO '{vacuum}'")
            else:
                raise ValueError(
                    'vacuum arg must be boolean or filepath string')
            dbconn.commit()
        elif isinstance(dbconn, (PostgresDBConn, psycopg.Connection)):
            pass
        else:
            raise RuntimeError

    '''
    if vacuum is True:
        dbconn.commit()
        previous = dbconn.conn.autocommit
        dbconn.conn.autocommit = True
        dbconn.execute(
            'VACUUM (verbose, index_cleanup, analyze)')
        dbconn.conn.autocommit = previous
    elif isinstance(vacuum, str):
        raise ValueError(
            'vacuum parameter must be True or False for PostgresDBConn'
        )
    else:
        assert vacuum is False
    '''

    return
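

# A hedged end-to-end sketch (file paths and database names below are
# hypothetical): decode a compressed NMEA file into an SQLite database and
# vacuum the result into a second file, as described in the decode_msgs()
# docstring.
#
#   from aisdb.database.dbconn import SQLiteDBConn
#   from aisdb.database.decoder import decode_msgs
#
#   with SQLiteDBConn('ais_2021.db') as dbconn:
#       decode_msgs(filepaths=['/data/ais_20210701.nm4.gz'],
#                   dbconn=dbconn,
#                   source='TERRESTRIAL',
#                   vacuum='ais_2021_vacuumed.db',
#                   verbose=True)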