Coverage for aisdb/database/dbqry.py: 86%

100 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-30 04:22 +0000

1''' class to convert a dictionary of input parameters into SQL code, and 

2 generate queries 

3''' 

4 

5from collections import UserDict 

6from datetime import datetime, timedelta, date 

7from functools import reduce 

8import warnings 

9import sqlite3 

10 

11import numpy as np 

12import psycopg 

13 

14from aisdb.database import sqlfcn, sqlfcn_callbacks 

15from aisdb.database.create_tables import sql_createtable_dynamic 

16from aisdb.database.dbconn import PostgresDBConn, SQLiteDBConn 

17from aisdb.webdata.marinetraffic import VesselInfo 

18 

19 

class DBQuery(UserDict):
    ''' A database abstraction allowing the creation of SQL code via arguments
    passed to __init__(). Args are stored as a dictionary (UserDict).

    Args:
        dbconn (:class:`aisdb.database.dbconn.SQLiteDBConn` or
                :class:`aisdb.database.dbconn.PostgresDBConn`)
            database connection object
        callback (function)
            anonymous function yielding SQL code specifying "WHERE"
            clauses. common queries are included in
            :mod:`aisdb.database.sqlfcn_callbacks`, e.g.
            >>> from aisdb.database.sqlfcn_callbacks import in_timerange_validmmsi
            >>> callback = in_timerange_validmmsi

            this generates SQL code to apply filtering on columns (mmsi,
            time), and requires (start, end) as arguments in datetime
            format.
        limit (int)
            Optionally limit the database query to a finite number of rows
        dbpath (string), dbpaths (list)
            accepted for backwards compatibility only; both are ignored.
        **kwargs (dict)
            more arguments that will be supplied to the query function
            and callback function. ``start`` and ``end`` are required.

    Custom SQL queries are supported by modifying the fcn supplied to
    .gen_qry(), or by supplying a callback function.
    Alternatively, the database can also be queried directly, see
    dbconn.py for more info

    complete example:

    >>> import os
    >>> from datetime import datetime
    >>> from aisdb import SQLiteDBConn, DBQuery, decode_msgs
    >>> from aisdb.database.sqlfcn_callbacks import in_timerange_validmmsi

    >>> dbpath = './testdata/test.db'
    >>> start, end = datetime(2021, 7, 1), datetime(2021, 7, 7)
    >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv', 'aisdb/tests/testdata/test_data_20211101.nm4']
    >>> with SQLiteDBConn(dbpath) as dbconn:
    ...     decode_msgs(filepaths=filepaths, dbconn=dbconn, source='TESTING', verbose=False)
    ...     q = DBQuery(dbconn=dbconn, callback=in_timerange_validmmsi, start=start, end=end)
    ...     for rows in q.gen_qry():
    ...         assert dict(rows[0]) == {'mmsi': 204242000, 'time': 1625176725,
    ...                                  'longitude': -8.93166666667, 'latitude': 41.45,
    ...                                  'sog': 4.0, 'cog': 176.0}
    ...         break
    '''

    def __init__(self, *, dbconn, dbpath=None, dbpaths=None, **kwargs):
        ''' Store the query arguments and derive the month list.

            args:
                dbconn (SQLiteDBConn or PostgresDBConn)
                    connection the queries will run against
                dbpath, dbpaths
                    ignored; kept so existing callers do not break
                **kwargs
                    query parameters; must include ``start`` and ``end``

            raises:
                AssertionError
                    if dbconn is not a supported connection type, or
                    start/end are missing
                ValueError
                    if start does not occur before end
        '''
        # dbpaths previously defaulted to a shared mutable list; None is the
        # safe sentinel. Neither dbpath nor dbpaths is used by this class.
        assert isinstance(
            dbconn,
            (SQLiteDBConn, PostgresDBConn)), 'Invalid database connection'

        self.data = kwargs
        self.dbconn = dbconn
        self.create_qry_params()

    def create_qry_params(self):
        ''' Validate start/end and add the 'months' key derived from them
            via :func:`sqlfcn_callbacks.dt2monthstr`.
        '''
        assert 'start' in self.data.keys() and 'end' in self.data.keys()
        if self.data['start'] >= self.data['end']:
            raise ValueError('Start must occur before end')
        assert isinstance(self.data['start'], (datetime, date))
        self.data.update({'months': sqlfcn_callbacks.dt2monthstr(**self.data)})

    @staticmethod
    def _as_date(value):
        ''' Return the calendar date of a datetime, or a date unchanged. '''
        # datetime is a subclass of date, so it must be checked first
        return value.date() if isinstance(value, datetime) else value

    @staticmethod
    def _sqlite_table_exists(cur: sqlite3.Cursor, name: str) -> bool:
        ''' True if a table called ``name`` exists in the SQLite schema. '''
        # single-quoted string literal: a double-quoted "table" is parsed as
        # an identifier first and is rejected by SQLite builds without DQS
        cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            [name])
        return len(cur.fetchall()) > 0

    @staticmethod
    def _postgres_table_exists(cur: psycopg.Cursor, name: str) -> bool:
        ''' True if information_schema lists a table called ``name``. '''
        cur.execute(
            psycopg.sql.SQL('''
                SELECT table_name
                FROM information_schema.tables
                WHERE table_name = {TABLE}
            ''').format(TABLE=psycopg.sql.Literal(name)))
        return len(cur.fetchall()) > 0

    def _aggregate_static(self, month: str, verbose: bool):
        ''' (Re)build the static aggregate table for one month. '''
        if verbose:
            print(f'building static index for month {month}...', flush=True)
        self.dbconn.aggregate_static_msgs([month], verbose)

    def _build_tables_sqlite(self,
                             cur: sqlite3.Cursor,
                             month: str,
                             rng_string: str,
                             reaggregate_static: bool = False,
                             verbose: bool = False):
        ''' Ensure the SQLite tables needed to query ``month`` exist.

            Warns if the static table is missing, builds the static
            aggregate if missing (or if reaggregate_static is set), and
            creates an empty dynamic table if missing so that the
            subsequent query does not fail.
        '''
        if not self._sqlite_table_exists(cur, f'ais_{month}_static'):
            warnings.warn(f'No results found in ais_{month}_static')

        if (not self._sqlite_table_exists(cur, f'static_{month}_aggregate')
                or reaggregate_static):
            self._aggregate_static(month, verbose)

        if not self._sqlite_table_exists(cur, f'ais_{month}_dynamic'):
            if isinstance(self.dbconn, SQLiteDBConn):
                self.dbconn.execute(sql_createtable_dynamic.format(month))
            warnings.warn('No data for selected time range! '
                          f'{rng_string}')

    def _build_tables_postgres(self,
                               cur: psycopg.Cursor,
                               month: str,
                               rng_string: str,
                               reaggregate_static: bool = False,
                               verbose: bool = False):
        ''' Ensure the Postgres tables needed to query ``month`` exist.

            Warns if the static or dynamic tables are missing, and builds
            the static aggregate if missing (or if reaggregate_static is
            set).
        '''
        if not self._postgres_table_exists(cur, f'ais_{month}_static'):
            warnings.warn('No static data for selected time range! '
                          f'{rng_string}')

        if (not self._postgres_table_exists(cur, f'static_{month}_aggregate')
                or reaggregate_static):
            self._aggregate_static(month, verbose)

        if not self._postgres_table_exists(
                cur, f'ais_{month}_dynamic'):  # pragma: no cover
            warnings.warn('No data for selected time range! '
                          f'{rng_string}')

    def check_marinetraffic(self, trafficDBpath, boundary, retry_404=False):
        ''' scrape metadata for vessels in domain from marinetraffic

            args:
                trafficDBpath (string)
                    marinetraffic database path
                boundary (dict)
                    uses keys xmin, xmax, ymin, and ymax to denote the region
                    of vessels that should be checked.
                    if using :class:`aisdb.gis.Domain`, the `Domain.boundary`
                    attribute can be supplied here
                retry_404 (bool)
                    forwarded to VesselInfo.vessel_info_callback
        '''
        vinfo = VesselInfo(trafficDBpath)

        print('retrieving vessel info ', end='', flush=True)
        for month in self.data['months']:
            # unique MMSIs inside the bounding box for this month
            sql = (
                'SELECT DISTINCT(mmsi) '
                f'FROM ais_{month}_dynamic AS d WHERE '
                f'{sqlfcn_callbacks.in_validmmsi_bbox(alias="d", **boundary)}')
            mmsis = self.dbconn.execute(sql).fetchall()
            print('.', end='', flush=True)  # one progress dot per month

            if len(mmsis) > 0:
                vinfo.vessel_info_callback(mmsis=np.array(mmsis),
                                           retry_404=retry_404,
                                           infotxt=f'{month} ')

    @staticmethod
    def _group_rows_by_mmsi(cur, first_chunk: list, chunksize: int):
        ''' Yield lists of consecutive rows that share the same MMSI.

            Rows are read from ``cur`` in chunks of ``chunksize`` starting
            with the already-fetched ``first_chunk``. A run of equal MMSIs
            that spans a chunk boundary is merged into a single list.
            Nothing is yielded when ``first_chunk`` is empty.
        '''
        from itertools import groupby

        pending: list = []
        chunk = first_chunk
        while chunk:
            for mmsi, rows in groupby(chunk, key=lambda row: row['mmsi']):
                # a new MMSI closes the run that is still being accumulated
                if pending and pending[0]['mmsi'] != mmsi:
                    yield pending
                    pending = []
                pending.extend(rows)
            chunk = cur.fetchmany(chunksize)
        if pending:
            yield pending

    def gen_qry(self,
                fcn=sqlfcn.crawl_dynamic,
                reaggregate_static=False,
                verbose=False):
        ''' queries the database using the supplied SQL function.

            args:
                self (UserDict)
                    Dictionary containing keyword arguments
                fcn (function)
                    Callback function that will generate SQL code using
                    the args stored in self
                reaggregate_static (bool)
                    If True, the static metadata aggregate tables are
                    rebuilt via dbconn.aggregate_static_msgs() even if
                    they already exist
                verbose (bool)
                    Log info to stdout

            yields:
                list of rows for each run of consecutive rows sharing an
                MMSI. Row ordering is whatever the SQL generated by fcn
                produces; grouping assumes that SQL orders rows by MMSI.
                Nothing is yielded if the database is empty, the requested
                range lies outside the database range, or the query
                returns no rows.

            raises:
                ValueError
                    if a month in self['months'] falls outside the
                    start/end range
        '''
        assert 'dbpath' not in self.data.keys()

        db_rng = self.dbconn.db_daterange
        if not db_rng:
            if verbose:
                print('skipping query (empty database)...')
            return

        assert isinstance(db_rng['start'], date)
        assert isinstance(db_rng['end'], date)

        # start/end may be datetime or plain date; compare as dates
        qry_start = self._as_date(self['start'])
        qry_end = self._as_date(self['end'])

        if qry_start > db_rng['end']:
            if verbose:
                print('skipping query (out of timerange)...')
            return
        if qry_end < db_rng['start']:
            if verbose:
                print('skipping query (out of timerange)...')
            return

        rng_string = ' -> '.join(f'{d.year}-{d.month:02d}-{d.day:02d}'
                                 for d in (db_rng['start'], db_rng['end']))

        first_month = qry_start.replace(day=1)
        cur = self.dbconn.cursor()
        for month in self.data['months']:
            # month strings are formatted as YYYYMM
            month_date = date(int(month[:4]), int(month[4:]), 1)
            if not first_month <= month_date <= qry_end:
                raise ValueError(f'{month_date} not in data range '
                                 f'({first_month}->{qry_end})')

            if isinstance(self.dbconn, SQLiteDBConn):
                self._build_tables_sqlite(cur, month, rng_string,
                                          reaggregate_static, verbose)
            elif isinstance(self.dbconn, PostgresDBConn):
                self._build_tables_postgres(cur, month, rng_string,
                                            reaggregate_static, verbose)
            else:  # unreachable: __init__ validates the connection type
                raise TypeError(
                    f'Unsupported database connection: {type(self.dbconn)}')

        qry = fcn(**self.data)

        if 'limit' in self.data:
            # coerce to int so only a numeric literal is spliced into SQL
            qry += f'\nLIMIT {int(self.data["limit"])}'

        if verbose:
            print(qry)

        # fetch 100k rows at a time, yield sets of rows for each unique MMSI
        chunksize = 10**5
        t0 = datetime.now()
        cur.execute(qry)
        res: list = cur.fetchmany(chunksize)
        delta = datetime.now() - t0

        if verbose:
            print(
                f'query time: {delta.total_seconds():.2f}s\nfetching rows...')
        if not res:
            warnings.warn('No results for query!')
            return

        yield from self._group_rows_by_mmsi(cur, res, chunksize)