Coverage for aisdb/database/dbconn.py: 90%

228 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-30 04:22 +0000

1''' SQLite Database connection 

2 

3 Also see: https://docs.python.org/3/library/sqlite3.html#connection-objects 

4''' 

5 

6from calendar import monthrange 

7from collections import Counter 

8from datetime import datetime 

9from enum import Enum 

10import ipaddress 

11import os 

12import re 

13import warnings 

14 

15from aisdb import sqlite3, sqlpath 

16from aisdb.database.create_tables import ( 

17 sql_aggregate, 

18 sql_createtable_static, 

19) 

20 

21import numpy as np 

22import psycopg 

23 

# Load the coarse vessel-type reference script once, at import time. It is
# split on ';' so that every statement can be executed individually later on.
# The encoding is pinned so decoding does not depend on the platform locale.
with open(os.path.join(sqlpath, 'coarsetype.sql'), 'r',
          encoding='utf-8') as f:
    coarsetype_sql = f.read().split(';')

26 

27 

28class _DBConn(): 

29 ''' AISDB Database connection handler ''' 

30 

31 def _create_table_coarsetype(self): 

32 ''' create a table to describe integer vessel type as a human-readable 

33 string. 

34 ''' 

35 #cur = self.cursor() 

36 for stmt in coarsetype_sql: 

37 if stmt == '\n': 

38 continue 

39 #cur.execute(stmt) 

40 self.execute(stmt) 

41 self.commit() 

42 #cur.close() 

43 

44 

class SQLiteDBConn(_DBConn, sqlite3.Connection):
    ''' SQLite3 database connection object

        attributes:
            dbpath (str)
                database filepath
            db_daterange (dict)
                temporal range of the monthly database tables, stored as
                dates under the keys 'start' and 'end'. Empty when the
                database contains no monthly dynamic tables
    '''

    def __init__(self, dbpath):
        super().__init__(
            dbpath,
            timeout=5,
            detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        )
        self.dbpath = dbpath
        self.row_factory = sqlite3.Row

        # string literals are single-quoted: double quotes denote identifiers
        # in SQL and are only tolerated as strings by legacy SQLite builds
        coarsetype_exists_qry = (
            'SELECT * FROM sqlite_master '
            "WHERE type='table' AND name LIKE 'coarsetype_ref' ")
        cur = self.cursor()
        try:
            cur.execute(coarsetype_exists_qry)
            coarsetype_exists = len(cur.fetchall()) > 0
        finally:
            cur.close()

        if not coarsetype_exists:
            self._create_table_coarsetype()
        self._set_db_daterange()

    def _set_db_daterange(self):
        ''' query the temporal range of the monthly database tables.
            The result is stored as the dictionary attribute db_daterange
        '''
        sql_qry = ('SELECT * FROM sqlite_master '
                   "WHERE type='table' AND name LIKE 'ais_%_dynamic' ")
        cur = self.cursor()
        try:
            cur.execute(sql_qry)
            dynamic_tables = cur.fetchall()
        finally:
            cur.close()

        if not dynamic_tables:
            self.db_daterange = {}
            return

        # the month is the second '_'-separated field of the table name;
        # the first four characters are the year, the remainder the month
        db_months = sorted(
            table['name'].split('_')[1] for table in dynamic_tables)
        first, last = db_months[0], db_months[-1]
        end_year, end_month = int(last[:4]), int(last[4:])
        self.db_daterange = {
            'start':
            datetime(int(first[:4]), int(first[4:]), 1).date(),
            'end':
            datetime(end_year, end_month,
                     monthrange(end_year, end_month)[1]).date(),
        }

    @staticmethod
    def _most_common_values(rows):
        ''' reduce all static reports of one vessel to a single row, keeping
            the most frequently reported truthy value of every column.
            Columns without any truthy value (None, 0, '') yield None
        '''
        aggregated = []
        for col in zip(*rows):
            reported = [val for val in col if val]
            aggregated.append(
                Counter(reported).most_common(1)[0][0] if reported else None)
        return aggregated

    def aggregate_static_msgs(self, months_str: list, verbose: bool = True):
        ''' collect an aggregate of static vessel reports for each unique MMSI
            identifier. The most frequently repeated values for each MMSI will
            be kept when multiple different reports appear for the same MMSI

            this function should be called every time data is added to the
            database

            args:
                months_str (list)
                    list of strings with format: YYYYmm
                verbose (bool)
                    logs messages to stdout
        '''

        assert hasattr(self, 'dbpath')
        assert not hasattr(self, 'dbpaths')

        cur = self.cursor()
        try:
            for month in months_str:
                # skip months that have no table of static reports
                cur.execute(
                    'SELECT name FROM sqlite_master '
                    "WHERE type='table' AND name=?", [f'ais_{month}_static'])
                if not cur.fetchall():
                    continue

                cur.execute(sql_createtable_static.format(month))

                if verbose:
                    print('aggregating static reports into '
                          f'static_{month}_aggregate...')
                cur.execute('SELECT DISTINCT s.mmsi FROM '
                            f'ais_{month}_static AS s')
                mmsis = np.array(cur.fetchall(), dtype=int).flatten()

                cur.execute('DROP TABLE IF EXISTS '
                            f'static_{month}_aggregate')

                sql_select = '''
                  SELECT
                    s.mmsi, s.imo, TRIM(vessel_name) as vessel_name,
                    s.ship_type, s.call_sign, s.dim_bow, s.dim_stern,
                    s.dim_port, s.dim_star, s.draught
                  FROM ais_{}_static AS s WHERE s.mmsi = ?
                '''.format(month)

                agg_rows = []
                for mmsi in mmsis:
                    cur.execute(sql_select, (str(mmsi), ))
                    reports = cur.fetchall()
                    # every MMSI was just read from this very table
                    assert len(reports) > 0
                    agg_rows.append(self._most_common_values(reports))

                # presumably (re)creates static_{month}_aggregate; the
                # statement is defined in aisdb.database.create_tables
                cur.execute(sql_aggregate.format(month))

                # rows whose MMSI column aggregated to None are not inserted
                rows = [row for row in agg_rows if row[0] is not None]
                if not rows:
                    warnings.warn('no rows to aggregate! '
                                  f'table: static_{month}_aggregate')
                    continue

                # a single aggregated vessel is a valid result, so there is
                # no lower bound on the row count beyond "not empty"
                placeholders = ','.join('?' * len(rows[0]))
                cur.executemany(
                    f'INSERT INTO static_{month}_aggregate '
                    f'VALUES ({placeholders}) ', rows)
        finally:
            cur.close()

        self.commit()

192 

193 

# DBConn is the default connection class: a local SQLite database
DBConn = SQLiteDBConn

196 

197 

class PostgresDBConn(_DBConn, psycopg.Connection):
    ''' This feature requires optional dependency psycopg for interfacing
        Postgres databases.

        The following keyword arguments are accepted by Postgres:
        | https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS

        Alternatively, a connection string may be used.
        Information on connection strings and postgres URI format can be found
        here:
        | https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING

        Example:

        .. code-block:: python

            import os
            from aisdb.database.dbconn import PostgresDBConn

            # keyword arguments
            dbconn = PostgresDBConn(
                hostaddr='127.0.0.1',
                user='postgres',
                port=5432,
                password=os.environ.get('POSTGRES_PASSWORD'),
                dbname='postgres',
            )

            # Alternatively, connect using a connection string:
            dbconn = PostgresDBConn('postgresql://localhost:5433')

    '''

    @staticmethod
    def _build_connection_uri(kwargs):
        ''' assemble a postgresql:// URI from connection keyword arguments.

            user defaults to 'postgres', the host to 'localhost' and the port
            to 5432. Keywords other than user, password, hostaddr, port and
            dbname are appended as URI query parameters. Every component is
            percent-encoded so that reserved characters (e.g. '@' or '/' in a
            password) cannot corrupt the URI. Values that are None are left
            out. The given dictionary is not modified
        '''
        from urllib.parse import quote, urlencode

        params = {key: val for key, val in kwargs.items() if val is not None}

        uri = 'postgresql://'
        uri += quote(str(params.pop('user', 'postgres')), safe='')
        if 'password' in params:
            uri += ':' + quote(str(params.pop('password')), safe='')
        uri += '@'

        if 'hostaddr' in params:
            ip = ipaddress.ip_address(params.pop('hostaddr'))
            # IPv6 literals have to be bracketed inside a URI
            uri += f'[{ip}]' if ip.version == 6 else str(ip)
        else:
            uri += 'localhost'
        uri += ':' + str(params.pop('port', 5432))

        if 'dbname' in params:
            uri += '/' + quote(str(params.pop('dbname')), safe='')

        if params:
            # quote_via=quote encodes spaces as %20 instead of '+'
            uri += '?' + urlencode(params, quote_via=quote)
        return uri

    def __init__(self, libpq_connstring=None, **kwargs):
        ''' open the wrapped psycopg connection, make sure the coarsetype
            reference table exists and read the database date range.

            args:
                libpq_connstring (str)
                    optional connection string. When given, the keyword
                    arguments are not used for connecting
                kwargs
                    connection keywords forwarded to psycopg.connect()
        '''
        # store the connection string as an attribute
        # this info will be passed to rust when possible
        if libpq_connstring is not None:
            self.conn = psycopg.connect(libpq_connstring,
                                        row_factory=psycopg.rows.dict_row)
            self.connection_string = libpq_connstring
        else:
            self.conn = psycopg.connect(row_factory=psycopg.rows.dict_row,
                                        **kwargs)
            self.connection_string = self._build_connection_uri(kwargs)

        # delegate the connection interface to the wrapped connection
        self.cursor = self.conn.cursor
        self.commit = self.conn.commit
        self.rollback = self.conn.rollback
        self.close = self.conn.close
        # NOTE: repr() looks special methods up on the type, so this instance
        # attribute only affects explicit dbconn.__repr__() calls
        self.__repr__ = self.conn.__repr__
        self.pgconn = self.conn.pgconn
        self._adapters = self.conn.adapters

        coarsetype_qry = ("select table_name from information_schema.tables "
                          "where table_name = 'coarsetype_ref'")
        with self.cursor() as cur:
            cur.execute(coarsetype_qry)
            coarsetype_exists = cur.fetchone()

        if not coarsetype_exists:
            self._create_table_coarsetype()

        self._set_db_daterange()

    def _set_db_daterange(self):
        ''' query the temporal range of the monthly database tables.
            The result is stored as the dictionary attribute db_daterange
        '''
        dynamic_tables_qry = psycopg.sql.SQL(
            "select table_name from information_schema.tables "
            r"where table_name LIKE 'ais\_______\_dynamic' ORDER BY table_name"
        )
        with self.cursor() as cur:
            cur.execute(dynamic_tables_qry)
            dynamic_tables = cur.fetchall()

        if not dynamic_tables:
            self.db_daterange = {}
            return

        # the month is the second '_'-separated field of the table name;
        # the first four characters are the year, the remainder the month
        db_months = sorted(
            table['table_name'].split('_')[1] for table in dynamic_tables)
        first, last = db_months[0], db_months[-1]
        end_year, end_month = int(last[:4]), int(last[4:])
        self.db_daterange = {
            'start':
            datetime(int(first[:4]), int(first[4:]), 1).date(),
            'end':
            datetime(end_year, end_month,
                     monthrange(end_year, end_month)[1]).date(),
        }

    def __enter__(self):
        self.conn.__enter__()
        return self

    def __exit__(self, exc_class, exc, tb):
        self.conn.__exit__(exc_class, exc, tb)
        if exc_class or exc or tb:
            print('rolling back...')
        # a falsy return value makes the interpreter re-raise the active
        # exception itself, with its original traceback
        return False

    def execute(self, sql, args=()):
        ''' execute a single statement on a short-lived cursor. No result is
            returned.

            args:
                sql (str)
                    statement to execute. Numbered placeholders such as $1
                    are rewritten to %s. NOTE: all of them become positional,
                    so the values are bound in order of appearance
                args (sequence)
                    values bound to the placeholders
        '''
        sql = re.sub(r'\$[0-9][0-9]*', r'%s', sql)
        with self.cursor() as cur:
            cur.execute(sql, args)

    def rebuild_indexes(self, month, verbose=True):
        ''' create the indexes of the monthly dynamic and static tables if
            they do not exist yet, then CLUSTER the dynamic table on its
            combined index. Every step is committed separately.

            args:
                month (str)
                    month identifier used in the table names
                verbose (bool)
                    logs messages to stdout
        '''
        if verbose:
            print(f'indexing {month}...')
        dbconn = self.conn

        dynamic = f'ais_{month}_dynamic'
        static = f'ais_{month}_static'
        # (label used in the log message, statement)
        index_steps = (
            ('mmsi',
             f'CREATE INDEX IF NOT EXISTS idx_{dynamic}_mmsi '
             f'ON {dynamic} (mmsi)'),
            ('time',
             f'CREATE INDEX IF NOT EXISTS idx_{dynamic}_time '
             f'ON {dynamic} (time)'),
            ('longitude',
             f'CREATE INDEX IF NOT EXISTS idx_{dynamic}_lon '
             f'ON {dynamic} (longitude)'),
            ('latitude',
             f'CREATE INDEX IF NOT EXISTS idx_{dynamic}_lat '
             f'ON {dynamic} (latitude)'),
            ('combined index',
             f'CREATE INDEX IF NOT EXISTS idx_{dynamic}_cluster '
             f'ON {dynamic} (mmsi, time, longitude, latitude, source)'),
            ('static mmsi',
             f'CREATE INDEX IF NOT EXISTS idx_{static}_mmsi '
             f'ON {static} (mmsi)'),
            ('static time',
             f'CREATE INDEX IF NOT EXISTS idx_{static}_time '
             f'ON {static} (time)'),
        )
        for label, stmt in index_steps:
            dbconn.execute(stmt)
            dbconn.commit()
            if verbose:
                print(f'done indexing {label}: {month}')

        dbconn.execute(
            f'CLUSTER {"VERBOSE" if verbose else ""} ais_{month}_dynamic\n'
            f'USING idx_ais_{month}_dynamic_cluster')
        dbconn.commit()
        if verbose:
            print(f'done clustering: {month}')

    def deduplicate_dynamic_msgs(self, month: str, verbose=True):
        ''' delete all but the first row (ordered by ctid) of every group of
            rows in the monthly dynamic table sharing the same mmsi, time and
            source, then commit.

            args:
                month (str)
                    month identifier used in the table name
                verbose (bool)
                    logs messages to stdout
        '''
        dbconn = self.conn
        dbconn.execute(f'''
            DELETE FROM ais_{month}_dynamic WHERE ctid IN
                (SELECT ctid FROM
                    (SELECT *, ctid, row_number() OVER
                        (PARTITION BY mmsi, time, source ORDER BY ctid)
                    FROM ais_{month}_dynamic ) AS duplicates_{month}
                WHERE row_number > 1)
            ''')
        dbconn.commit()
        if verbose:
            print(f'done deduplicating: {month}')

    @staticmethod
    def _most_common_values(rows):
        ''' reduce all static reports of one vessel to a single row, keeping
            the most frequently reported truthy value of every column.
            Columns without any truthy value (None, 0, '') yield None
        '''
        aggregated = []
        for col in zip(*rows):
            reported = [val for val in col if val]
            aggregated.append(
                Counter(reported).most_common(1)[0][0] if reported else None)
        return aggregated

    def aggregate_static_msgs(self, months_str: list, verbose: bool = True):
        ''' collect an aggregate of static vessel reports for each unique MMSI
            identifier. The most frequently repeated values for each MMSI will
            be kept when multiple different reports appear for the same MMSI

            this function should be called every time data is added to the
            database

            args:
                months_str (list)
                    list of strings with format: YYYYmm
                verbose (bool)
                    logs messages to stdout
        '''

        with self.cursor() as cur:
            for month in months_str:
                # skip months that have no table of static reports; the
                # table name is bound as a value, not spliced into the SQL
                cur.execute(
                    'SELECT table_name FROM information_schema.tables '
                    'WHERE table_name = %s', (f'ais_{month}_static', ))
                if not cur.fetchall():
                    continue

                if verbose:
                    print('aggregating static reports into '
                          f'static_{month}_aggregate...')
                cur.execute(
                    f'SELECT DISTINCT s.mmsi FROM ais_{month}_static AS s')
                mmsis = np.array(sorted(r['mmsi'] for r in cur.fetchall()),
                                 dtype=int).flatten()

                cur.execute(
                    psycopg.sql.SQL(
                        f'DROP TABLE IF EXISTS static_{month}_aggregate'))

                sql_select = psycopg.sql.SQL(f'''
                  SELECT
                    s.mmsi, s.imo, TRIM(vessel_name) as vessel_name,
                    s.ship_type, s.call_sign, s.dim_bow, s.dim_stern,
                    s.dim_port, s.dim_star, s.draught
                  FROM ais_{month}_static AS s WHERE s.mmsi = %s
                ''')

                agg_rows = []
                for mmsi in mmsis:
                    cur.execute(sql_select, (str(mmsi), ))
                    # rows are dictionaries (dict_row factory)
                    reports = [tuple(row.values()) for row in cur.fetchall()]
                    # every MMSI was just read from this very table
                    assert len(reports) > 0
                    agg_rows.append(self._most_common_values(reports))

                # presumably (re)creates static_{month}_aggregate; the
                # statement is defined in aisdb.database.create_tables
                cur.execute(sql_aggregate.format(month))

                # rows whose MMSI column aggregated to None are not inserted
                rows = [tuple(row) for row in agg_rows if row[0] is not None]
                if not rows:
                    warnings.warn('no rows to aggregate! '
                                  f'table: static_{month}_aggregate')
                    # carry on with the remaining months; returning here
                    # would also skip the final commit
                    continue

                # a single aggregated vessel is a valid result, so there is
                # no lower bound on the row count beyond "not empty"
                insert_vals = ','.join(['%s'] * len(rows[0]))
                insert_stmt = psycopg.sql.SQL(
                    f'INSERT INTO static_{month}_aggregate '
                    f'VALUES ({insert_vals})')
                cur.executemany(insert_stmt, rows)

        self.commit()

505 

506 

class ConnectionType(Enum):
    ''' Enumeration of the available database connection classes.
        Intended for use in static type hints
    '''
    SQLITE = SQLiteDBConn
    POSTGRES = PostgresDBConn