Coverage for aisdb/database/dbconn.py: 90%
228 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
''' SQLite and PostgreSQL database connection handlers

Also see: https://docs.python.org/3/library/sqlite3.html#connection-objects
'''
6from calendar import monthrange
7from collections import Counter
8from datetime import datetime
9from enum import Enum
10import ipaddress
11import os
12import re
13import warnings
15from aisdb import sqlite3, sqlpath
16from aisdb.database.create_tables import (
17 sql_aggregate,
18 sql_createtable_static,
19)
21import numpy as np
22import psycopg
# Load the coarse vessel-type reference SQL once at import time.
# Splitting on ';' yields individual statements that
# _DBConn._create_table_coarsetype executes one at a time.
with open(os.path.join(sqlpath, 'coarsetype.sql'), 'r') as f:
    coarsetype_sql = f.read().split(';')
class _DBConn():
    ''' AISDB Database connection handler.

        Base class shared by :class:`SQLiteDBConn` and
        :class:`PostgresDBConn`; relies on the subclass providing
        ``execute`` and ``commit``.
    '''

    def _create_table_coarsetype(self):
        ''' create a table to describe integer vessel type as a human-readable
            string.

            Executes each statement from coarsetype.sql in turn, then
            commits once at the end.
        '''
        for stmt in coarsetype_sql:
            # splitting the SQL file on ';' leaves empty / whitespace-only
            # fragments; skip all of them, not just a bare newline
            if not stmt.strip():
                continue
            self.execute(stmt)
        self.commit()
class SQLiteDBConn(_DBConn, sqlite3.Connection):
    ''' SQLite3 database connection object

        attributes:
            dbpath (str)
                database filepath
            db_daterange (dict)
                temporal range of monthly database tables. keys are DB file
                names
    '''

    def __init__(self, dbpath):
        super().__init__(
            dbpath,
            timeout=5,
            detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        )
        self.dbpath = dbpath
        self.row_factory = sqlite3.Row
        # create the coarsetype_ref lookup table the first time this DB
        # file is opened
        coarsetype_exists_qry = (
            'SELECT * FROM sqlite_master '
            r'WHERE type="table" AND name LIKE "coarsetype_ref" ')
        cur = self.cursor()
        cur.execute(coarsetype_exists_qry)
        if len(cur.fetchall()) == 0:
            self._create_table_coarsetype()
        self._set_db_daterange()

    def _set_db_daterange(self):
        # query the temporal range of monthly database tables
        # results will be stored as a dictionary attribute db_daterange
        sql_qry = ('SELECT * FROM sqlite_master '
                   r'WHERE type="table" AND name LIKE "ais_%_dynamic" ')
        cur = self.cursor()
        cur.execute(sql_qry)
        dynamic_tables = cur.fetchall()
        if dynamic_tables != []:
            # table names have the form ais_YYYYMM_dynamic; the middle
            # component is the month key
            db_months = sorted(
                [table['name'].split('_')[1] for table in dynamic_tables])
            self.db_daterange = {
                'start':
                datetime(int(db_months[0][:4]), int(db_months[0][4:]),
                         1).date(),
                'end':
                datetime((y := int(db_months[-1][:4])),
                         (m := int(db_months[-1][4:])),
                         monthrange(y, m)[1]).date(),
            }
        else:
            self.db_daterange = {}
        cur.close()

    def aggregate_static_msgs(self, months_str: list, verbose: bool = True):
        ''' collect an aggregate of static vessel reports for each unique MMSI
            identifier. The most frequently repeated values for each MMSI will
            be kept when multiple different reports appear for the same MMSI

            this function should be called every time data is added to the
            database

            args:
                months_str (list)
                    list of strings with format: YYYYmm
                verbose (bool)
                    logs messages to stdout
        '''
        assert hasattr(self, 'dbpath')
        assert not hasattr(self, 'dbpaths')

        cur = self.cursor()

        for month in months_str:
            # check for monthly tables in dbfiles containing static reports
            cur.execute(
                'SELECT name FROM sqlite_master '
                'WHERE type="table" AND name=?', [f'ais_{month}_static'])
            if cur.fetchall() == []:
                continue

            cur.execute(sql_createtable_static.format(month))

            if verbose:
                print('aggregating static reports into '
                      f'static_{month}_aggregate...')
            cur.execute('SELECT DISTINCT s.mmsi FROM '
                        f'ais_{month}_static AS s')
            mmsis = np.array(cur.fetchall(), dtype=int).flatten()

            cur.execute('DROP TABLE IF EXISTS '
                        f'static_{month}_aggregate')

            sql_select = '''
              SELECT
                s.mmsi, s.imo, TRIM(vessel_name) as vessel_name, s.ship_type,
                s.call_sign, s.dim_bow, s.dim_stern, s.dim_port, s.dim_star,
                s.draught
              FROM ais_{}_static AS s WHERE s.mmsi = ?
            '''.format(month)

            agg_rows = []
            for mmsi in mmsis:
                _ = cur.execute(sql_select, (str(mmsi), ))
                cur_mmsi = cur.fetchall()

                # transpose rows into per-column arrays so the most common
                # non-null value can be selected for each column
                cols = np.array(cur_mmsi, dtype=object).T
                assert len(cols) > 0

                filtercols = np.array(
                    [
                        np.array(list(filter(None, col)), dtype=object)
                        for col in cols
                    ],
                    dtype=object,
                )

                # columns that held only null values are padded so
                # Counter.most_common still yields a result
                paddedcols = np.array(
                    [col if len(col) > 0 else [None] for col in filtercols],
                    dtype=object,
                )

                aggregated = [
                    Counter(col).most_common(1)[0][0] for col in paddedcols
                ]

                agg_rows.append(aggregated)

            # fix: the original chained a no-op .replace() of the table name
            # with itself; execute the formatted statement directly, matching
            # the Postgres implementation
            cur.execute(sql_aggregate.format(month))

            if len(agg_rows) == 0:
                warnings.warn('no rows to aggregate! '
                              f'table: static_{month}_aggregate')
                continue

            skip_nommsi = np.array(agg_rows, dtype=object)
            assert len(skip_nommsi.shape) == 2
            # elementwise comparison against None filters rows lacking an MMSI
            skip_nommsi = skip_nommsi[skip_nommsi[:, 0] != None]
            assert len(skip_nommsi) > 1
            cur.executemany((
                f'INSERT INTO static_{month}_aggregate '
                f"VALUES ({','.join(['?' for _ in range(skip_nommsi.shape[1])])}) "
            ), skip_nommsi)

        self.commit()
# default to local SQLite database
# (backwards-compatible alias: code importing DBConn gets SQLiteDBConn)
DBConn = SQLiteDBConn
class PostgresDBConn(_DBConn, psycopg.Connection):
    ''' This feature requires optional dependency psycopg for interfacing
        Postgres databases.

        The following keyword arguments are accepted by Postgres:
        | https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS

        Alternatively, a connection string may be used.
        Information on connection strings and postgres URI format can be found
        here:
        | https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING

        Example:

        .. code-block:: python

            import os
            from aisdb.database.dbconn import PostgresDBConn

            # keyword arguments
            dbconn = PostgresDBConn(
                hostaddr='127.0.0.1',
                user='postgres',
                port=5432,
                password=os.environ.get('POSTGRES_PASSWORD'),
                dbname='postgres',
            )

            # Alternatively, connect using a connection string:
            dbconn = PostgresDBConn('Postgresql://localhost:5433')

    '''

    def _set_db_daterange(self):
        # query the temporal range of monthly database tables
        # (table names have the form ais_YYYYMM_dynamic)
        dynamic_tables_qry = psycopg.sql.SQL(
            "select table_name from information_schema.tables "
            r"where table_name LIKE 'ais\_______\_dynamic' ORDER BY table_name"
        )
        cur = self.cursor()
        cur.execute(dynamic_tables_qry)
        dynamic_tables = cur.fetchall()

        if dynamic_tables != []:
            db_months = sorted([
                table['table_name'].split('_')[1] for table in dynamic_tables
            ])
            self.db_daterange = {
                'start':
                datetime(int(db_months[0][:4]), int(db_months[0][4:]),
                         1).date(),
                'end':
                datetime((y := int(db_months[-1][:4])),
                         (m := int(db_months[-1][4:])),
                         monthrange(y, m)[1]).date(),
            }
        else:
            self.db_daterange = {}

    def __enter__(self):
        self.conn.__enter__()
        return self

    def __exit__(self, exc_class, exc, tb):
        # delegate transaction cleanup to the wrapped psycopg connection,
        # then re-raise any exception so callers still see it
        self.conn.__exit__(exc_class, exc, tb)
        if exc_class or exc or tb:
            print('rolling back...')
            raise exc

    def __init__(self, libpq_connstring=None, **kwargs):
        # store the connection string as an attribute
        # this info will be passed to rust when possible
        if libpq_connstring is not None:
            self.conn = psycopg.connect(libpq_connstring,
                                        row_factory=psycopg.rows.dict_row)
            self.connection_string = libpq_connstring
        else:
            self.conn = psycopg.connect(row_factory=psycopg.rows.dict_row,
                                        **kwargs)
            # reconstruct an equivalent postgres URI from the keyword args
            self.connection_string = 'postgresql://'

            if 'user' in kwargs.keys():
                self.connection_string += kwargs.pop('user')
            else:
                self.connection_string += 'postgres'

            if 'password' in kwargs.keys():
                self.connection_string += ':'
                self.connection_string += kwargs.pop('password')
            self.connection_string += '@'

            if 'hostaddr' in kwargs.keys():
                ip = ipaddress.ip_address(kwargs.pop('hostaddr'))
                if ip.version == 4:
                    self.connection_string += str(ip)
                elif ip.version == 6:
                    # IPv6 host addresses must be bracketed in a URI
                    self.connection_string += '['
                    self.connection_string += str(ip)
                    self.connection_string += ']'
                else:
                    raise ValueError(str(ip))
            else:
                self.connection_string += 'localhost'
            self.connection_string += ':'

            if 'port' in kwargs.keys():
                self.connection_string += str(kwargs.pop('port'))
            else:
                self.connection_string += '5432'

            if 'dbname' in kwargs.keys():
                self.connection_string += '/'
                self.connection_string += kwargs.pop('dbname')

            # any remaining keyword args become URI query parameters
            if len(kwargs) > 0:
                self.connection_string += '?'
                for key, val in kwargs.items():
                    self.connection_string += f'{key}={val}&'
                self.connection_string = self.connection_string[:-1]

        # expose the wrapped connection's interface on this object
        self.cursor = self.conn.cursor
        self.commit = self.conn.commit
        self.rollback = self.conn.rollback
        self.close = self.conn.close
        self.__repr__ = self.conn.__repr__
        self.pgconn = self.conn.pgconn
        self._adapters = self.conn.adapters

        cur = self.cursor()

        coarsetype_qry = ("select table_name from information_schema.tables "
                          "where table_name = 'coarsetype_ref'")

        cur.execute(coarsetype_qry)
        coarsetype_exists = cur.fetchone()

        if not coarsetype_exists:
            self._create_table_coarsetype()

        self._set_db_daterange()

    def execute(self, sql, args=None):
        ''' Execute a SQL statement, translating sqlite-style numbered
            parameters ($1, $2, ...) into psycopg's %s placeholders.
        '''
        # fix: default was a shared mutable list (args=[]); use None sentinel
        if args is None:
            args = []
        sql = re.sub(r'\$[0-9][0-9]*', r'%s', sql)
        with self.cursor() as cur:
            cur.execute(sql, args)

    def rebuild_indexes(self, month, verbose=True):
        ''' (re)create single-column and clustering indexes for one month of
            dynamic and static tables, committing after each index.
        '''
        if verbose:
            print(f'indexing {month}...')
        dbconn = self.conn
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_dynamic_mmsi '
            f'ON ais_{month}_dynamic (mmsi)')
        dbconn.commit()
        if verbose:
            print(f'done indexing mmsi: {month}')
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_dynamic_time '
            f'ON ais_{month}_dynamic (time)')
        dbconn.commit()
        if verbose:
            print(f'done indexing time: {month}')
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_dynamic_lon '
            f'ON ais_{month}_dynamic (longitude)')
        dbconn.commit()
        if verbose:
            print(f'done indexing longitude: {month}')
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_dynamic_lat '
            f'ON ais_{month}_dynamic (latitude)')
        dbconn.commit()
        if verbose:
            print(f'done indexing latitude: {month}')
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_dynamic_cluster '
            f'ON ais_{month}_dynamic (mmsi, time, longitude, latitude, source)'
        )
        dbconn.commit()
        if verbose:
            print(f'done indexing combined index: {month}')
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_static_mmsi '
            f'ON ais_{month}_static (mmsi)')
        dbconn.commit()
        if verbose:
            print(f'done indexing static mmsi: {month}')
        dbconn.execute(
            f'CREATE INDEX IF NOT EXISTS idx_ais_{month}_static_time '
            f'ON ais_{month}_static (time)')
        dbconn.commit()
        if verbose:
            print(f'done indexing static time: {month}')
        # physically reorder the table to match the combined index
        dbconn.execute(
            f'CLUSTER {"VERBOSE" if verbose else ""} ais_{month}_dynamic\n'
            f'USING idx_ais_{month}_dynamic_cluster')
        dbconn.commit()
        if verbose:
            print(f'done clustering: {month}')

    def deduplicate_dynamic_msgs(self, month: str, verbose=True):
        ''' delete duplicate position reports (same mmsi, time, and source)
            from one month of dynamic data, keeping the first by ctid.
        '''
        dbconn = self.conn
        dbconn.execute(f'''
            DELETE FROM ais_{month}_dynamic WHERE ctid IN
                (SELECT ctid FROM
                    (SELECT *, ctid, row_number() OVER
                        (PARTITION BY mmsi, time, source ORDER BY ctid)
                    FROM ais_{month}_dynamic ) AS duplicates_{month}
                WHERE row_number > 1)
            ''')
        dbconn.commit()
        if verbose:
            print(f'done deduplicating: {month}')

    def aggregate_static_msgs(self, months_str: list, verbose: bool = True):
        ''' collect an aggregate of static vessel reports for each unique MMSI
            identifier. The most frequently repeated values for each MMSI will
            be kept when multiple different reports appear for the same MMSI

            this function should be called every time data is added to the
            database

            args:
                months_str (list)
                    list of strings with format: YYYYmm
                verbose (bool)
                    logs messages to stdout
        '''
        cur = self.cursor()

        for month in months_str:
            # check for monthly tables in dbfiles containing static reports
            cur.execute('SELECT table_name FROM information_schema.tables '
                        f'WHERE table_name = \'ais_{month}_static\'')
            static_tables = cur.fetchall()
            if static_tables == []:
                continue

            if verbose:
                print('aggregating static reports into '
                      f'static_{month}_aggregate...')
            cur.execute(f'SELECT DISTINCT s.mmsi FROM ais_{month}_static AS s')
            mmsi_res = cur.fetchall()
            if mmsi_res == []:
                mmsis = np.array([], dtype=int)
            else:
                mmsis = np.array(sorted([r['mmsi'] for r in mmsi_res]),
                                 dtype=int).flatten()

            cur.execute(
                psycopg.sql.SQL(
                    f'DROP TABLE IF EXISTS static_{month}_aggregate'))

            sql_select = psycopg.sql.SQL(f'''
              SELECT
                s.mmsi, s.imo, TRIM(vessel_name) as vessel_name, s.ship_type,
                s.call_sign, s.dim_bow, s.dim_stern, s.dim_port, s.dim_star,
                s.draught
              FROM ais_{month}_static AS s WHERE s.mmsi = %s
            ''')

            agg_rows = []
            for mmsi in mmsis:
                _ = cur.execute(sql_select, (str(mmsi), ))
                cur_mmsi = [tuple(i.values()) for i in cur.fetchall()]
                # transpose rows into per-column arrays so the most common
                # non-null value can be selected for each column
                cols = np.array(cur_mmsi, dtype=object).T
                assert len(cols) > 0

                filtercols = np.array(
                    [
                        np.array(list(filter(None, col)), dtype=object)
                        for col in cols
                    ],
                    dtype=object,
                )

                # columns that held only null values are padded so
                # Counter.most_common still yields a result
                paddedcols = np.array(
                    [col if len(col) > 0 else [None] for col in filtercols],
                    dtype=object,
                )

                aggregated = [
                    Counter(col).most_common(1)[0][0] for col in paddedcols
                ]

                agg_rows.append(aggregated)

            cur.execute(sql_aggregate.format(month))

            if len(agg_rows) == 0:
                warnings.warn('no rows to aggregate! '
                              f'table: static_{month}_aggregate')
                # fix: was `return`, which aborted processing of any remaining
                # months and skipped the final commit; `continue` matches the
                # SQLite implementation of this method
                continue

            skip_nommsi = np.array(agg_rows, dtype=object)
            assert len(skip_nommsi.shape) == 2
            # elementwise comparison against None filters rows lacking an MMSI
            skip_nommsi = skip_nommsi[skip_nommsi[:, 0] != None]
            assert len(skip_nommsi) > 1
            insert_vals = ','.join(['%s' for _ in range(skip_nommsi.shape[1])])
            insert_stmt = psycopg.sql.SQL(
                f'INSERT INTO static_{month}_aggregate '
                f'VALUES ({insert_vals})')
            cur.executemany(insert_stmt, map(tuple, skip_nommsi))

        self.commit()
class ConnectionType(Enum):
    ''' database connection types enum. used for static type hints '''
    # SQLITE wraps the default local-file backend (SQLiteDBConn);
    # POSTGRES wraps the optional psycopg-backed server connection.
    SQLITE = SQLiteDBConn
    POSTGRES = PostgresDBConn