Coverage for aisdb/database/dbqry.py: 86%
100 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
''' class to convert a dictionary of input parameters into SQL code, and
generate queries
'''
from collections import UserDict
from datetime import date, datetime, timedelta
from functools import reduce
import sqlite3
import warnings

import numpy as np
import psycopg

from aisdb.database import sqlfcn, sqlfcn_callbacks
from aisdb.database.create_tables import sql_createtable_dynamic
from aisdb.database.dbconn import PostgresDBConn, SQLiteDBConn
from aisdb.webdata.marinetraffic import VesselInfo
class DBQuery(UserDict):
    ''' A database abstraction allowing the creation of SQL code via arguments
        passed to __init__(). Args are stored as a dictionary (UserDict).

        Args:
            dbconn (:class:`aisdb.database.dbconn.ConnectionType`)
                database connection object
            callback (function)
                anonymous function yielding SQL code specifying "WHERE"
                clauses. common queries are included in
                :mod:`aisdb.database.sqlfcn_callbacks`, e.g.

                >>> from aisdb.database.sqlfcn_callbacks import in_timerange_validmmsi
                >>> callback = in_timerange_validmmsi

                this generates SQL code to apply filtering on columns (mmsi,
                time), and requires (start, end) as arguments in datetime
                format.
            limit (int)
                Optionally limit the database query to a finite number of rows
            **kwargs (dict)
                more arguments that will be supplied to the query function
                and callback function

        Custom SQL queries are supported by modifying the fcn supplied to
        .gen_qry(), or by supplying a callback function.
        Alternatively, the database can also be queried directly, see
        dbconn.py for more info

        complete example:

        >>> import os
        >>> from datetime import datetime
        >>> from aisdb import SQLiteDBConn, DBQuery, decode_msgs
        >>> from aisdb.database.sqlfcn_callbacks import in_timerange_validmmsi

        >>> dbpath = './testdata/test.db'
        >>> start, end = datetime(2021, 7, 1), datetime(2021, 7, 7)
        >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv', 'aisdb/tests/testdata/test_data_20211101.nm4']

        >>> with SQLiteDBConn(dbpath) as dbconn:
        ...     decode_msgs(filepaths=filepaths, dbconn=dbconn, source='TESTING', verbose=False)
        ...     q = DBQuery(dbconn=dbconn, callback=in_timerange_validmmsi, start=start, end=end)
        ...     for rows in q.gen_qry():
        ...         assert dict(rows[0]) == {'mmsi': 204242000, 'time': 1625176725,
        ...                                  'longitude': -8.93166666667, 'latitude': 41.45,
        ...                                  'sog': 4.0, 'cog': 176.0}
        ...         break
    '''

    # dbpath (string)
    #       database filepath to connect to
    # dbpaths (list)
    #       optionally pass a list of filepaths instead of a single dbpath
    # NOTE(review): dbpath/dbpaths are currently accepted but unused -- the
    # attach logic that consumed them was removed; kept for API compatibility.

    def __init__(self, *, dbconn, dbpath=None, dbpaths=None, **kwargs):
        ''' Store query args and validate the connection and time range.

        Raises:
            AssertionError: if ``dbconn`` is not a SQLite/Postgres connection,
                or if ``start``/``end`` kwargs are missing
            ValueError: if start does not occur before end
        '''
        # use a None sentinel rather than a shared mutable default list
        if dbpaths is None:
            dbpaths = []
        assert isinstance(
            dbconn,
            (SQLiteDBConn, PostgresDBConn)), 'Invalid database connection'

        self.data = kwargs
        self.dbconn = dbconn
        self.create_qry_params()

    def create_qry_params(self):
        ''' Validate the (start, end) range and derive the per-month table
            name suffixes used by the SQL generation functions.

        Raises:
            ValueError: if start does not occur before end
        '''
        assert 'start' in self.data.keys() and 'end' in self.data.keys()
        if self.data['start'] >= self.data['end']:
            raise ValueError('Start must occur before end')
        assert isinstance(self.data['start'], (datetime, date))
        # e.g. start=2021-07, end=2021-09 yields ['202107', '202108', '202109']
        self.data.update({'months': sqlfcn_callbacks.dt2monthstr(**self.data)})

    def _build_tables_sqlite(self,
                             cur: sqlite3.Cursor,
                             month: str,
                             rng_string: str,
                             reaggregate_static: bool = False,
                             verbose: bool = False):
        ''' Ensure the static, aggregate, and dynamic tables for ``month``
            exist in the SQLite database, warning when data is missing and
            (re)building the static aggregate index when needed.
        '''
        # check if static tables exist
        cur.execute(
            'SELECT * FROM sqlite_master '
            'WHERE type="table" AND name=?', [f'ais_{month}_static'])
        if len(cur.fetchall()) == 0:
            warnings.warn(f'No results found in ais_{month}_static')

        # check if aggregate tables exist
        cur.execute(('SELECT * FROM sqlite_master '
                     'WHERE type="table" and name=?'),
                    [f'static_{month}_aggregate'])
        res = cur.fetchall()

        if len(res) == 0 or reaggregate_static:
            if verbose:
                print(f'building static index for month {month}...',
                      flush=True)
            self.dbconn.aggregate_static_msgs([month], verbose)

        # check if dynamic tables exist
        cur.execute(
            'SELECT * FROM sqlite_master WHERE '
            'type="table" and name=?', [f'ais_{month}_dynamic'])
        if len(cur.fetchall()) == 0:
            # create an empty dynamic table so downstream SQL still parses
            if isinstance(self.dbconn, SQLiteDBConn):
                self.dbconn.execute(sql_createtable_dynamic.format(month))
            warnings.warn('No data for selected time range! '
                          f'{rng_string}')

    def _build_tables_postgres(self,
                               cur: psycopg.Cursor,
                               month: str,
                               rng_string: str,
                               reaggregate_static: bool = False,
                               verbose: bool = False):
        ''' Postgres counterpart of :meth:`_build_tables_sqlite`: check the
            information schema for the per-month tables, warn when data is
            missing, and (re)build the static aggregate index when needed.
        '''
        # check if static tables exist
        static_qry = psycopg.sql.SQL('''
            SELECT table_name
            FROM information_schema.tables
            WHERE information_schema.tables.table_name = {TABLE}
        ''').format(TABLE=psycopg.sql.Literal(f'ais_{month}_static'))
        cur.execute(static_qry)
        count_static = cur.fetchall()

        if len(count_static) == 0:
            warnings.warn('No static data for selected time range! '
                          f'{rng_string}')

        # check if aggregate tables exist
        cur.execute(
            psycopg.sql.SQL('''
            SELECT table_name
            FROM information_schema.tables
            WHERE table_name = {TABLE}
        ''').format(TABLE=psycopg.sql.Literal(f'static_{month}_aggregate')))
        res = cur.fetchall()

        if len(res) == 0 or reaggregate_static:
            if verbose:
                print(f'building static index for month {month}...',
                      flush=True)
            self.dbconn.aggregate_static_msgs([month], verbose)

        # check if dynamic tables exist
        cur.execute(
            psycopg.sql.SQL('''
            SELECT table_name
            FROM information_schema.tables
            WHERE table_name = {TABLE}
        ''').format(TABLE=psycopg.sql.Literal(f'ais_{month}_dynamic')))

        if len(cur.fetchall()) == 0:  # pragma: no cover
            warnings.warn('No data for selected time range! '
                          f'{rng_string}')

    def check_marinetraffic(self, trafficDBpath, boundary, retry_404=False):
        ''' scrape metadata for vessels in domain from marinetraffic

            args:
                trafficDBpath (string)
                    marinetraffic database path
                boundary (dict)
                    uses keys xmin, xmax, ymin, and ymax to denote the region
                    of vessels that should be checked.
                    if using :class:`aisdb.gis.Domain`, the `Domain.boundary`
                    attribute can be supplied here
                retry_404 (bool)
                    forwarded to the vessel info callback; retry vessels that
                    previously returned no result
        '''
        vinfo = VesselInfo(trafficDBpath)

        print('retrieving vessel info ', end='', flush=True)
        for month in self.data['months']:
            # check unique mmsis
            sql = (
                'SELECT DISTINCT(mmsi) '
                f'FROM ais_{month}_dynamic AS d WHERE '
                f'{sqlfcn_callbacks.in_validmmsi_bbox(alias="d", **boundary)}')
            mmsis = self.dbconn.execute(sql).fetchall()
            print('.', end='', flush=True)  # one progress dot per month

            # retrieve vessel metadata
            if len(mmsis) > 0:
                vinfo.vessel_info_callback(mmsis=np.array(mmsis),
                                           retry_404=retry_404,
                                           infotxt=f'{month} ')

    def gen_qry(self,
                fcn=sqlfcn.crawl_dynamic,
                reaggregate_static=False,
                verbose=False):
        ''' queries the database using the supplied SQL function.

            args:
                self (UserDict)
                    Dictionary containing keyword arguments
                fcn (function)
                    Callback function that will generate SQL code using
                    the args stored in self
                reaggregate_static (bool)
                    If True, the metadata aggregate tables will be regenerated
                    from the static messages tables
                verbose (bool)
                    Log info to stdout

            yields:
                numpy array of rows for each unique MMSI
                arrays are sorted by MMSI
                rows are sorted by time
        '''
        # initialize dbconn, run query
        assert 'dbpath' not in self.data.keys()
        db_rng = self.dbconn.db_daterange

        # skip early when the database is empty or the requested range does
        # not intersect the range of data present in the database
        if not self.dbconn.db_daterange:
            if verbose:
                print('skipping query (empty database)...')
            return
        elif self['start'].date() > db_rng['end']:
            if verbose:
                print('skipping query (out of timerange)...')
            return
        elif self['end'].date() < db_rng['start']:
            if verbose:
                print('skipping query (out of timerange)...')
            return

        assert isinstance(db_rng['start'], date)
        assert isinstance(db_rng['end'], date)

        cur = self.dbconn.cursor()
        for month in self.data['months']:
            month_date = datetime(int(month[:4]), int(month[4:]), 1)
            # first of the month preceding 'start', so the month containing
            # 'start' is always accepted
            qry_start = self["start"] - timedelta(days=self["start"].day)
            if not (qry_start <= month_date <= self['end']):
                raise ValueError(f'{month_date} not in data range '
                                 f'({qry_start}->{self["end"]})')

            rng_string = f'{db_rng["start"].year}-{db_rng["start"].month:02d}-{db_rng["start"].day:02d}'
            rng_string += ' -> '
            rng_string += f'{db_rng["end"].year}-{db_rng["end"].month:02d}-{db_rng["end"].day:02d}'

            if isinstance(self.dbconn, SQLiteDBConn):
                self._build_tables_sqlite(cur, month, rng_string,
                                          reaggregate_static, verbose)
            elif isinstance(self.dbconn, PostgresDBConn):
                self._build_tables_postgres(cur, month, rng_string,
                                            reaggregate_static, verbose)
            else:
                assert False  # unreachable: connection type checked in __init__

        qry = fcn(**self.data)

        if 'limit' in self.data.keys():
            qry += f'\nLIMIT {self.data["limit"]}'

        if verbose:
            print(qry)

        # fetch 100k rows at a time, yield sets of rows for each unique MMSI
        mmsi_rows: list = []
        dt = datetime.now()
        _ = cur.execute(qry)
        res: list = cur.fetchmany(10**5)
        delta = datetime.now() - dt

        if verbose:
            print(
                f'query time: {delta.total_seconds():.2f}s\nfetching rows...')
        if res == []:
            warnings.warn('No results for query!')

        while len(res) > 0:
            mmsi_rows += res
            # indexes where the mmsi value changes between consecutive rows
            mmsi_rowvals = np.array([r['mmsi'] for r in mmsi_rows])
            ummsi_idx = np.where(mmsi_rowvals[:-1] != mmsi_rowvals[1:])[0] + 1
            ummsi_idx = reduce(np.append, ([0], ummsi_idx, [len(mmsi_rows)]))
            # yield all complete groups; the last group may be continued by
            # the next fetch, so hold it back
            for i in range(len(ummsi_idx) - 2):
                yield mmsi_rows[ummsi_idx[i]:ummsi_idx[i + 1]]
            if len(ummsi_idx) > 2:
                # ummsi_idx[-2] is the start of the last (incomplete) group
                mmsi_rows = mmsi_rows[ummsi_idx[-2]:]

            res = cur.fetchmany(10**5)
        yield mmsi_rows