Coverage for aisdb/network_graph.py: 94%

128 statements  

coverage.py v7.3.1, created at 2023-09-30 04:22 +0000

'''
    collect vessel transits between domain zones (graph nodes), and aggregate
    trajectory statistics within the overall domain
'''

import os
import pickle
import re
import tempfile
import types
import warnings
from datetime import timedelta
from functools import partial
from typing import Optional

import numpy as np

import aisdb
from aisdb.database import sqlfcn
from aisdb.gis import (
    delta_knots,
    delta_meters,
    epoch_2_dt,
)
from aisdb.track_gen import (
    TrackGen,
    fence_tracks,
    split_timedelta,
)
from aisdb.denoising_encoder import encode_greatcircledistance
from aisdb.database.dbconn import PostgresDBConn, SQLiteDBConn, ConnectionType
from aisdb.interp import interp_time
from aisdb.proc_util import _sanitize, _segment_rng
from aisdb.webdata.bathymetry import Gebco
from aisdb.webdata.marinetraffic import vessel_info, VesselInfo
from aisdb.webdata.shore_dist import ShoreDist, PortDist
from aisdb.wsa import wetted_surface_area


def _fstr(s):
    return f'{float(s):.2f}'


def _depth_nonnegative(track, zoneset):
    ''' returns bathymetric depths for the given subset of track positions,
        with negative values (topographic heights above sea level) clamped
        to zero
    '''
    return np.array(
        [d if d >= 0 else 0 for d in track['depth_metres'][zoneset]])


def _time_in_shoredist_rng(track, subset, dist0=0.01, dist1=5):
    ''' returns minutes spent between dist0 and dist1 kilometres from shore
    '''
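    # mask timestamps to the positions lying within [dist0, dist1] km of
    # shore, split the masked times into contiguous segments wherever the
    # gap between messages exceeds one minute, then sum the segment lengths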
    return sum(map(
        len,
        _segment_rng(
            {
                'time':
                track['time'][subset]
                [[dist0 <= d <= dist1 for d in track['km_from_shore'][subset]]]
            },
            maxdelta=timedelta(minutes=1),
            key='time'),
    ))


def _staticinfo(track, domain):
    ''' collect categorical vessel data as a dictionary '''
    static = {'mmsi': track['mmsi']}
    for key in track['marinetraffic_info'].keys():
        if key in static.keys():
            continue
        static.update({key: _sanitize(track['marinetraffic_info'][key])})
    for key in ['label', 'hull_submerged_m^2']:
        if key in static.keys() or key not in track.keys():
            continue
        static.update({key: _sanitize(f'{track[key]:.0f}')})
    return static


def _transitinfo(track, zoneset, interp_resolution=timedelta(hours=1)):
    ''' aggregate statistics on vessel network graph connectivity '''

    dynamic = {}

    # geofencing
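    # zone names are reduced to their numeric component for the src/rcv
    # columns, e.g. 'Zone1' -> 1; transit_nodes keeps the full zone names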
    dynamic.update(
        dict(
            src_zone=int(re.sub('[^0-9]', '', track['in_zone'][zoneset][0])),
            rcv_zone=int(re.sub('[^0-9]', '', track['in_zone'][zoneset][-1])),
            transit_nodes=
            f"{track['in_zone'][zoneset][0]}_{track['in_zone'][zoneset][-1]}"))

    # timestamp info
    dynamic.update(
        dict(first_seen_in_zone=epoch_2_dt(
            track['time'][zoneset][0]).strftime('%Y-%m-%d %H:%M UTC'),
             last_seen_in_zone=epoch_2_dt(
                 track['time'][zoneset][-1]).strftime('%Y-%m-%d %H:%M UTC'),
             year=epoch_2_dt(track['time'][zoneset][0]).year,
             month=epoch_2_dt(track['time'][zoneset][0]).month,
             day=epoch_2_dt(track['time'][zoneset][0]).day))

    # distance travelled
    dynamic.update(
        dict(total_distance_meters=np.sum(delta_meters(
            track, zoneset[[0, -1]])).astype(int),
             cumulative_distance_meters=np.sum(delta_meters(
                 track, zoneset)).astype(int)))

    # shore dist
    if 'km_from_shore' in track.keys():
        dynamic.update(
            dict(
                min_shore_dist=f"{np.min(track['km_from_shore'][zoneset]):.2f}",
                avg_shore_dist=
                f"{np.average(track['km_from_shore'][zoneset]):.2f}",
                max_shore_dist=f"{np.max(track['km_from_shore'][zoneset]):.2f}",
            ))

        # elapsed time in distance from shore
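        # band edges are given in kilometres: 0.01 km = 10 m, 0.03 km = 30 m,
        # and 0.1 km = 100 m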
        dynamic.update(
            dict(
                minutes_within_10m_5km_shoredist=_time_in_shoredist_rng(
                    track, zoneset, 0.01, 5),
                minutes_within_30m_20km_shoredist=_time_in_shoredist_rng(
                    track, zoneset, 0.03, 20),
                minutes_within_100m_50km_shoredist=_time_in_shoredist_rng(
                    track, zoneset, 0.1, 50),
            ))

    # port dist
    if 'km_from_port' in track.keys():
        dynamic.update(
            dict(
                min_port_dist=_fstr(np.min(track['km_from_port'][zoneset])),
                avg_port_dist=_fstr(
                    np.average(track['km_from_port'][zoneset])),
                max_port_dist=_fstr(np.max(track['km_from_port'][zoneset])),
            ))

    # depth charts
    if 'depth_metres' in track.keys():
        dynamic.update(
            dict(
                min_depth=_fstr(np.min(_depth_nonnegative(track, zoneset))),
                avg_depth=_fstr(
                    np.average(_depth_nonnegative(track, zoneset))),
                max_depth=_fstr(np.max(_depth_nonnegative(track, zoneset))),
            ))

    # computed velocity (knots)
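    # speeds are computed between consecutive positions, so a zoneset with a
    # single position has no defined velocity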
    dynamic.update(
        dict(
            velocity_knots_min=f"{np.min(delta_knots(track, zoneset)):.2f}"
            if len(zoneset) > 1 else 'NULL',
            velocity_knots_avg=f"{np.average(delta_knots(track, zoneset)):.2f}"
            if len(zoneset) > 1 else 'NULL',
            velocity_knots_max=f"{np.max(delta_knots(track, zoneset)):.2f}"
            if len(zoneset) > 1 else 'NULL',
        ))

    # elapsed time spent in zones
    dynamic.update(
        dict(minutes_spent_in_zone=_fstr(
            (epoch_2_dt(track['time'][zoneset][-1]) -
             epoch_2_dt(track['time'][zoneset][0])).total_seconds() /
            60) if len(zoneset) > 1 else 'NULL', ))

    return dynamic


def _serialize_network_edge(tracks, domain, tmp_dir):
    ''' at each track position where the zone changes, a transit
        index is recorded, and trajectory statistics are aggregated for this
        index range using _staticinfo() and _transitinfo()

        results are serialized as binary files labelled by mmsi into the
        'tmp_dir' directory. see _aggregate_output() for deserialization
        and concatenation of results

        args:
            tracks (iterable)
                generator of vessel trajectory dictionaries, as output by
                aisdb.track_gen.TrackGen() or its wrapper functions

        returns: None
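
        records can be read back by unpickling repeatedly until EOFError is
        raised, as done by _aggregate_output(). for example (a sketch, where
        ``filepath`` names one of the serialized files)::

            rows = []
            with open(filepath, 'rb') as f:
                while True:
                    try:
                        rows.append(pickle.load(f))
                    except EOFError:
                        break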
    '''
    for track in tracks:
        assert isinstance(track, dict)
        assert len(track['time']) > 0
        filepath = os.path.join(tmp_dir, str(track['mmsi']).zfill(9))
        assert 'in_zone' in track.keys(), \
            'need to append zone info from fence_tracks'

        with open(filepath, 'ab') as f:
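            # indices where the zone containing the current position differs
            # from the zone at the previous position, i.e. the first index
            # inside each newly entered zone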
            transits = np.where(
                track['in_zone'][:-1] != track['in_zone'][1:])[0] + 1

            for i in range(len(transits) - 1):
                rng = np.array(range(transits[i], transits[i + 1] + 1))
                track_stats = _staticinfo(track, domain)
                track_stats.update(_transitinfo(track, rng))
                pickle.dump(track_stats, f)

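            # the remainder of the track after the last zone change is
            # recorded as an open edge with no receiver zone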
            i0 = transits[-1] if len(transits) >= 1 else 0
            rng = np.array(range(i0, len(track['in_zone'])))
            track_stats = _staticinfo(track, domain)
            track_stats.update(_transitinfo(track, rng))
            track_stats['rcv_zone'] = 'NULL'
            track_stats['transit_nodes'] = track_stats['src_zone']
            pickle.dump(track_stats, f)
        yield


def _aggregate_output(outputfile, tmp_dir, filters=[lambda row: False]):
    ''' concatenate serialized output from _serialize_network_edge()

        args:
            outputfile (string)
                filepath location to output CSV data
            tmp_dir (string)
                files will temporarily be placed here while processing
            filters (list)
                list of callback functions. each callable should accept a
                dictionary describing a network edge as input. if any
                callback returns True, the edge will be filtered from the
                output rows. see _staticinfo() and _transitinfo() for more
                info on network edge dict keys

                for example, to filter all rows where the max speed exceeds
                50 knots, and filter non-transiting vessels from zone Z0:

                >>> filters = [
                ...     lambda r: float(r['velocity_knots_max']) > 50,
                ...     lambda r: r['src_zone'] == '0' and r['rcv_zone'] == 'NULL'
                ... ]
    '''
    assert os.path.isdir(
        os.path.dirname(outputfile)), f'no directory for {outputfile}!'

    picklefiles = [
        os.path.join(tmp_dir, fname) for fname in sorted(os.listdir(tmp_dir))
        if '_' not in fname
    ]
    assert len(picklefiles) > 0, 'failed to geofence any data...'

    with open(outputfile, 'w') as output:
        with open(picklefiles[0], 'rb') as f0:
            getrow = pickle.load(f0)
            output.write(','.join(map(str, getrow.keys())) + '\n')

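        # stream rows back from each pickle file, drop rows matching any of
        # the filter callbacks, append the remainder to the CSV, then delete
        # the temporary file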
        for picklefile in picklefiles:
            results = []
            with open(picklefile, 'rb') as f:
                while True:
                    try:
                        getrow = pickle.load(f)
                    except EOFError:
                        break
                    if not any(fcn(getrow) for fcn in filters):  # pragma: no cover
                        results.append(','.join(map(str, getrow.values())))

            if len(results) == 0:
                warnings.warn(f'no results for {outputfile}')
            else:
                output.write('\n'.join(results) + '\n')

            os.remove(picklefile)


def graph(
        qry,
        *,
        outputfile,
        domain,
        dbconn: ConnectionType,
        data_dir: str,
        trafficDBpath: Optional[str],  # None if using PostgresDBConn
        maxdelta: timedelta = timedelta(weeks=1),
        speed_threshold: float = 50,
        distance_threshold: float = 200000,
        interp_delta: timedelta = timedelta(minutes=10),
        minscore: float = 0,
        qryfcn=sqlfcn.crawl_dynamic_static,
        bathy_dir: Optional[str] = None,
        shoredist_raster: Optional[str] = None,
        portdist_raster: Optional[str] = None,
        decimate: float = 0.0001,
        verbose: bool = False):
    ''' Compute a network graph of vessel movements within domain zones.
        Zone polygons are used as network nodes, with graph edges
        represented by movements between zones.

        args:
            qry (:py:class:`aisdb.database.dbqry.DBQuery`)
                database query generator
            outputfile (string)
                filepath for the resulting CSV output
            domain (:py:class:`aisdb.gis.Domain`)
                collection of zones defined as polygons, these will
                be used as nodes in the network graph
            dbconn (ConnectionType)
                either a :class:`aisdb.database.dbconn.SQLiteDBConn` or
                :class:`aisdb.database.dbconn.PostgresDBConn` database
                connection object
            data_dir (string)
                location of raster data
            trafficDBpath (string)
                path to marinetraffic database file. may be None when a
                PostgresDBConn is used
            maxdelta (datetime.timedelta)
                maximum time between vessel messages before considering
                it to be part of a new trajectory. See
                :func:`aisdb.track_gen.split_timedelta` for more info
            speed_threshold (int, float)
                maximum speed in knots for encoder segmentation. See
                :func:`aisdb.denoising_encoder.encode_greatcircledistance`
                for more info
            distance_threshold (int, float)
                maximum distance in meters for encoder segmentation. See
                :func:`aisdb.denoising_encoder.encode_greatcircledistance`
                for more info
            interp_delta (datetime.timedelta)
                track positions will be interpolated to the given sample rate
            minscore (float)
                minimum score for segments to be considered sequential. See
                :func:`aisdb.denoising_encoder.encode_greatcircledistance`
                for more info
            qryfcn (function)
                SQL query callback, defaults to
                :func:`aisdb.database.sqlfcn.crawl_dynamic_static`
            bathy_dir (string)
                location of GEBCO bathymetry raster data; if given, depth
                statistics are appended to each track
            shoredist_raster (string)
                path to shore distance raster file
            portdist_raster (string)
                path to port distance raster file
            decimate (float)
                passed to :py:func:`aisdb.track_gen.TrackGen` to control
                track decimation
            verbose (bool)
                verbosity flag passed to the database query generator

        Network graph activity is computed following these steps:

        - Create a database query with
          :class:`aisdb.database.dbqry.DBQuery` and supply it as the
          ``qry`` argument; rows are generated internally with
          :meth:`aisdb.database.dbqry.DBQuery.gen_qry`. Define a domain
          (:class:`aisdb.gis.Domain`) in which to compute movements
        - Vectorize tracks using :py:func:`aisdb.track_gen.TrackGen`
        - Append vessel metadata to track vessels with
          :func:`aisdb.webdata.marinetraffic.vessel_info`
        - Segment track vectors where time between messages exceeds
          maxdelta using :func:`aisdb.track_gen.split_timedelta`
        - Segment track vectors as encoded by
          :py:func:`aisdb.denoising_encoder.encode_greatcircledistance`
        - Perform geofencing on track segments using
          :py:func:`aisdb.track_gen.fence_tracks` to determine zone
          containment
        - Check where zone boundaries are transited and serialize results
          to ``outputfile``. Additional metrics for zone activity are also
          aggregated at this step.

        Caution: processing the vessel movement graph edges may consume a
        large amount of memory.

        Example usage:

        >>> import os
        >>> import shapely
        >>> from datetime import datetime
        >>> from aisdb import SQLiteDBConn, DBQuery, Domain, graph, decode_msgs
        >>> from aisdb.database.sqlfcn_callbacks import in_bbox_time

        >>> # create example database file
        >>> dbpath = './example.sqlitedb'
        >>> filepaths = ['./aisdb/tests/testdata/test_data_20210701.csv',
        ...              './aisdb/tests/testdata/test_data_20211101.nm4']
        >>> with SQLiteDBConn(dbpath) as dbconn:
        ...     decode_msgs(filepaths=filepaths, dbconn=dbconn,
        ...                 source='TESTING', verbose=False)

        Next, configure the query area using a Domain to compute the region
        boundary

        >>> zones = [{
        ...     'name': 'Zone1',
        ...     'geometry': shapely.geometry.Polygon(zip(
        ...         [-170.24, -170.24, -38.5, -38.5, -170.24],
        ...         [29.0, 75.2, 75.2, 29.0, 29.0],
        ...     ))
        ... }]
        >>> domain = Domain(name='new_domain', zones=zones)
        >>> trafficDBpath = './testdata/marinetraffic_test.db'
        >>> data_dir = os.environ.get('AISDBDATADIR', '/tmp/ais/')

        Then, query the db for points in the domain

        >>> with SQLiteDBConn(dbpath) as dbconn:
        ...     qry = DBQuery(
        ...         dbconn=dbconn,
        ...         callback=in_bbox_time,
        ...         start=datetime(2021, 7, 1),
        ...         end=datetime(2021, 7, 3),
        ...         **domain.boundary,
        ...     )
        ...     graph(qry,
        ...           outputfile=os.path.join('testdata', 'test_graph.csv'),
        ...           dbconn=dbconn,
        ...           domain=domain,
        ...           data_dir=data_dir,
        ...           trafficDBpath=trafficDBpath)

        Afterwards, delete the example database file and CSV output

        >>> os.remove(dbpath)
        >>> os.remove(os.path.join('testdata', 'test_graph.csv'))
    '''
    assert not isinstance(qry, types.GeneratorType), \
        'Got a generator for "qry" arg instead of DBQuery'

    assert isinstance(qry, aisdb.database.dbqry.DBQuery), \
        f'Not a DBQuery object! Got {qry}'

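    # vessel metadata is read from the marinetraffic SQLite database when an
    # SQLite connection is given; with Postgres, the primary connection is
    # reused for metadata lookups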
    if not isinstance(dbconn, (
            ConnectionType.SQLITE.value,
            ConnectionType.POSTGRES.value,
    )):
        raise ValueError("Invalid dbconn connection type")
    if isinstance(dbconn, ConnectionType.SQLITE.value):
        assert trafficDBpath is not None
        assert isinstance(trafficDBpath, str)
        vinfoDB = VesselInfo(trafficDBpath).trafficDB
    else:
        vinfoDB = dbconn

    rowgen = qry.gen_qry(fcn=qryfcn, verbose=verbose)
    tracks = TrackGen(rowgen, decimate)

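    # optionally annotate tracks with port distance, shore distance, and
    # bathymetric depth; each raster merge consumes the track generator and
    # materializes the results as a list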
    if portdist_raster is not None:
        pdist = PortDist(portdist_raster)
        with pdist:
            tracks = list(pdist.get_distance(tracks))

    if shoredist_raster is not None:
        sdist_dir, sdist_name = shoredist_raster.rsplit(os.path.sep, 1)
        sdist = ShoreDist(sdist_dir, sdist_name)
        with sdist:
            tracks = list(sdist.get_distance(tracks))

    if bathy_dir is not None:
        bathy = Gebco(data_dir=bathy_dir)
        with bathy:
            tracks = list(bathy.merge_tracks(tracks))

    # create a temporary working directory for serialized transit data
    if not os.path.isdir('/tmp'):  # pragma: no cover
        os.mkdir('/tmp')
    with tempfile.TemporaryDirectory() as tmp_dir:
        if os.environ.get('DEBUG'):
            print(f'network graph {tmp_dir = }')
            print(f'\n{domain.name=} {domain.boundary=}')

        # configure processing pipeline
        serialize_CSV = partial(_serialize_network_edge,
                                domain=domain,
                                tmp_dir=tmp_dir)
        geofence = partial(fence_tracks, domain=domain)
        interp = partial(interp_time, step=interp_delta)
        encode_tracks = partial(encode_greatcircledistance,
                                distance_threshold=distance_threshold,
                                minscore=minscore,
                                speed_threshold=speed_threshold)
        timesplit = partial(split_timedelta, maxdelta=maxdelta)
        vinfo = partial(vessel_info, dbconn=vinfoDB)

        # pipeline execution order
        tracks = vinfo(tracks)
        tracks = wetted_surface_area(tracks)
        tracks = timesplit(tracks)
        tracks = encode_tracks(tracks)
        tracks = interp(tracks)
        tracks = geofence(tracks)
        result = serialize_CSV(tracks)

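        # the pipeline stages above are lazy generators; iterating over the
        # serializer drives the whole pipeline, which yields None once per
        # track after its transit records are written to tmp_dir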
        for res in result:
            assert res is None

        if not os.listdir(tmp_dir):
            warnings.warn(f'no data for {outputfile}, skipping...\n')
        else:
            _aggregate_output(outputfile=outputfile, tmp_dir=tmp_dir)