Coverage for aisdb/network_graph.py: 94%
128 statements
coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
1'''
2 collect vessel transits between domain zones (graph nodes), and aggregate
3 trajectory statistics within the overall domain
4'''
6import os
7import pickle
8import re
9import tempfile
10import types
11from datetime import timedelta
12from functools import partial, reduce
14import numpy as np
15import warnings
17import aisdb
18from aisdb.database import sqlfcn
19from aisdb.gis import (
20 delta_knots,
21 delta_meters,
22 epoch_2_dt,
23)
24from aisdb.track_gen import (
25 TrackGen,
26 fence_tracks,
27 split_timedelta,
28)
29from aisdb.denoising_encoder import encode_greatcircledistance
30from aisdb.database.dbconn import PostgresDBConn, SQLiteDBConn, ConnectionType
31from aisdb.interp import interp_time
32from aisdb.proc_util import _sanitize
33from aisdb.proc_util import _segment_rng
34from aisdb.webdata.bathymetry import Gebco
35from aisdb.webdata.marinetraffic import vessel_info, VesselInfo
36from aisdb.webdata.shore_dist import ShoreDist, PortDist
37from aisdb.wsa import wetted_surface_area
40def _fstr(s):
41 return f'{float(s):.2f}'
44def _depth_nonnegative(track, zoneset):
45 ''' returns bathymetric depths for the given zone subset, with negative
46 values (topographic heights) clamped to 0
47 '''
48 return np.array(
49 [d if d >= 0 else 0 for d in track['depth_metres'][zoneset]])
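# Illustrative sketch (hypothetical values; assumes 'depth_metres' is a numpy
# array indexed by the zoneset indices):
#   _depth_nonnegative({'depth_metres': np.array([12.0, -3.0, 48.5])},
#                      np.array([0, 1, 2]))
#   -> array([12. ,  0. , 48.5])   # the topographic height -3.0 is clamped to 0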
52def _time_in_shoredist_rng(track, subset, dist0=0.01, dist1=5):
53 ''' returns the approximate number of minutes spent between dist0 and dist1 kilometres from shore '''
54 return sum(t for t in map(
55 len,
56 _segment_rng(
57 {
58 'time':
59 track['time'][subset]
60 [[dist0 <= d <= dist1 for d in track['km_from_shore'][subset]]]
61 },
62 maxdelta=timedelta(minutes=1),
63 key='time'),
64 ))
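# Rough worked example (hypothetical values): for a track reporting once per
# minute with track['km_from_shore'][subset] == [0.5, 2.0, 7.0, 3.0], the
# positions inside the 0.01-5 km band (indices 0, 1 and 3) are grouped into
# segments by _segment_rng and counted, so
# _time_in_shoredist_rng(track, subset, 0.01, 5) returns roughly 3 (minutes).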
67def _staticinfo(track, domain):
68 ''' collect categorical vessel data as a dictionary '''
69 static = {'mmsi': track['mmsi']}
70 for key in track['marinetraffic_info'].keys():
71 if key in static.keys() or key not in track['marinetraffic_info'].keys(
72 ):
73 continue
74 static.update({key: _sanitize(track['marinetraffic_info'][key])})
75 for key in ['label', 'hull_submerged_m^2']:
76 if key in static.keys() or key not in track.keys():
77 continue
78 static.update({key: _sanitize(f'{track[key]:.0f}')})
79 return static
82def _transitinfo(track, zoneset, interp_resolution=timedelta(hours=1)):
83 ''' aggregate statistics on vessel network graph connectivity '''
85 dynamic = {}
87 # geofencing
88 dynamic.update(
89 dict(
90 src_zone=int(re.sub('[^0-9]', '', track['in_zone'][zoneset][0])),
91 rcv_zone=int(re.sub('[^0-9]', '', track['in_zone'][zoneset][-1])),
92 transit_nodes=
93 f"{track['in_zone'][zoneset][0]}_{track['in_zone'][zoneset][-1]}"))
95 # timestamp info
96 dynamic.update(
97 dict(first_seen_in_zone=epoch_2_dt(
98 track['time'][zoneset][0]).strftime('%Y-%m-%d %H:%M UTC'),
99 last_seen_in_zone=epoch_2_dt(
100 track['time'][zoneset][-1]).strftime('%Y-%m-%d %H:%M UTC'),
101 year=epoch_2_dt(track['time'][zoneset][0]).year,
102 month=epoch_2_dt(track['time'][zoneset][0]).month,
103 day=epoch_2_dt(track['time'][zoneset][0]).day))
105 # distance travelled
106 dynamic.update(
107 dict(total_distance_meters=np.sum(delta_meters(
108 track, zoneset[[0, -1]])).astype(int),
109 cumulative_distance_meters=np.sum(delta_meters(
110 track, zoneset)).astype(int)))
111 # shore dist
112 if 'km_from_shore' in track.keys():  [branch 112 ↛ 134 never taken: the condition on line 112 was never false]
113 dynamic.update(
114 dict(
115 min_shore_dist=f"{np.min(track['km_from_shore'][zoneset]):.2f}",
116 avg_shore_dist=
117 f"{np.average(track['km_from_shore'][zoneset]):.2f}"
118 if 'km_from_shore' in track.keys() else None,
119 max_shore_dist=f"{np.max(track['km_from_shore'][zoneset]):.2f}"
120 if 'km_from_shore' in track.keys() else None,
121 ))
122 # elapsed time in distance from shore
123 dynamic.update(
124 dict(
125 minutes_within_10m_5km_shoredist=_time_in_shoredist_rng(
126 track, zoneset, 0.01, 5),
127 minutes_within_30m_20km_shoredist=_time_in_shoredist_rng(
128 track, zoneset, 0.03, 20),
129 minutes_within_100m_50km_shoredist=_time_in_shoredist_rng(
130 track, zoneset, 0.1, 50),
131 ))
133 # port dist
134 if 'km_from_port' in track.keys():  [branch 134 ↛ 145 never taken: the condition on line 134 was never false]
135 dynamic.update(
136 dict(
137 min_port_dist=_fstr(np.min(track['km_from_port'][zoneset])),
138 avg_port_dist=_fstr(np.average(track['km_from_port'][zoneset]))
139 if 'km_from_port' in track.keys() else None,
140 max_port_dist=_fstr(np.max(track['km_from_port'][zoneset]))
141 if 'km_from_port' in track.keys() else None,
142 ))
144 # depth charts
145 if 'depth_metres' in track.keys():  [branch 145 ↛ 157 never taken: the condition on line 145 was never false]
146 dynamic.update(
147 dict(
148 min_depth=_fstr(np.min(_depth_nonnegative(track, zoneset)))
149 if 'depth_metres' in track.keys() else None,
150 avg_depth=_fstr(np.average(_depth_nonnegative(track, zoneset)))
151 if 'depth_metres' in track.keys() else None,
152 max_depth=_fstr(np.max(_depth_nonnegative(track, zoneset)))
153 if 'depth_metres' in track.keys() else None,
154 ))
156 # computed velocity (knots)
157 dynamic.update(
158 dict(
159 velocity_knots_min=f"{np.min(delta_knots(track, zoneset)):.2f}"
160 if len(zoneset) > 1 else 'NULL',
161 velocity_knots_avg=f"{np.average(delta_knots(track, zoneset)):.2f}"
162 if len(zoneset) > 1 else 'NULL',
163 velocity_knots_max=f"{np.max(delta_knots(track, zoneset)):.2f}"
164 if len(zoneset) > 1 else 'NULL',
165 ))
167 # elapsed time spent in zones
168 dynamic.update(
169 dict(minutes_spent_in_zone=_fstr(
170 (epoch_2_dt(track['time'][zoneset][-1]) -
171 epoch_2_dt(track['time'][zoneset][0])).total_seconds() /
172 60) if len(zoneset) > 1 else 'NULL', ))
174 return dynamic
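# Shape of the returned dict, with hypothetical values for a transit from zone
# Z1 to zone Z2 (keys follow the dict.update() calls above):
#   {'src_zone': 1, 'rcv_zone': 2, 'transit_nodes': 'Z1_Z2',
#    'first_seen_in_zone': '2021-07-01 00:00 UTC',
#    'last_seen_in_zone': '2021-07-01 00:42 UTC', 'year': 2021, 'month': 7,
#    'day': 1, 'total_distance_meters': 15000, ...,
#    'velocity_knots_max': '12.00', 'minutes_spent_in_zone': '42.00'}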
177def _serialize_network_edge(tracks, domain, tmp_dir):
178 ''' at each track position where the zone changes, a transit
179 index is recorded, and trajectory statistics are aggregated for this
180 index range using _staticinfo() and _transitinfo()
182 results will be serialized as binary files labelled by mmsi into the
183 'tmp_dir' directory. see graph() and _aggregate_output() for
184 deserialization and concatenation of results
186 args:
187 tracks: iterable
188 generator of vessel trajectory dictionaries, as output by
189 aisdb.track_gen.TrackGen() or its wrapper functions
191 yields: None (results are serialized to disk)
192 '''
193 for track in tracks:
194 assert isinstance(track, dict)
195 assert len(track['time']) > 0
196 filepath = os.path.join(tmp_dir, str(track['mmsi']).zfill(9))
197 assert 'in_zone' in track.keys(
198 ), 'need to append zone info from fence_tracks'
200 with open(filepath, 'ab') as f:
201 transits = np.where(
202 track['in_zone'][:-1] != track['in_zone'][1:])[0] + 1
204 for i in range(len(transits) - 1):  [branch 204 ↛ 205 never taken: the loop on line 204 never started]
205 rng = np.array(range(transits[i], transits[i + 1] + 1))
206 track_stats = _staticinfo(track, domain)
207 track_stats.update(_transitinfo(track, rng))
208 pickle.dump(track_stats, f)
210 i0 = transits[-1] if len(transits) >= 1 else 0
211 rng = np.array(range(i0, len(track['in_zone'])))
212 track_stats = _staticinfo(track, domain)
213 track_stats.update(_transitinfo(track, rng))
214 track_stats['rcv_zone'] = 'NULL'
215 track_stats['transit_nodes'] = track_stats['src_zone']
216 pickle.dump(track_stats, f)
217 yield
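# Minimal deserialization sketch (assumes a pickle file written by the loop
# above; this mirrors the read loop in _aggregate_output() below):
#   with open(filepath, 'rb') as f:
#       while True:
#           try:
#               row = pickle.load(f)  # one dict per transit
#           except EOFError:
#               break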
220def _aggregate_output(outputfile, tmp_dir, filters=[lambda row: False]):
221 ''' concatenate serialized output from _serialize_network_edge()
223 args:
224 outputfile (string)
225 filepath location to output CSV data
226 tmp_dir (string)
227 files will temporarily be placed here while processing
228 filters (list)
229 list of callback functions. each callable function should
230 accept a dictionary describing a network edge as input. if any
231 return True, the edge will be filtered from the output rows.
232 see _staticinfo() and _transitinfo() for more info on
233 network edge dict keys
235 for example, to filter all rows where the max speed exceeds 50
236 knots, and filter non-transiting vessels from zone Z0:
238 >>> filters = [
239 ... lambda r: float(r['velocity_knots_max']) > 50,
240 ... lambda r: r['src_zone'] == '0' and r['rcv_zone'] == 'NULL'
241 ... ]
242 '''
243 assert os.path.isdir(
244 os.path.dirname(outputfile)), f'no directory for {outputfile}!'
246 picklefiles = [
247 os.path.join(tmp_dir, fname) for fname in sorted(os.listdir(tmp_dir))
248 if '_' not in fname
249 ]
250 assert len(picklefiles) > 0, 'failed to geofence any data...'
252 with open(outputfile, 'w') as output:
253 with open(picklefiles[0], 'rb') as f0:
254 getrow = pickle.load(f0)
255 output.write(','.join(map(str, getrow.keys())) + '\n')
257 for picklefile in picklefiles:
258 results = []
259 with open(picklefile, 'rb') as f:
260 while True:
261 try:
262 getrow = pickle.load(f)
263 except EOFError:
264 break
265 except Exception as e:
266 raise e
267 if not reduce(np.logical_or,
268 [f(getrow)
269 for f in filters]): # pragma: no cover
270 results.append(','.join(map(str, getrow.values())))
272 if len(results) == 0:
273 warnings.warn(f'no results for {outputfile}')
274 else:
275 output.write('\n'.join(results) + '\n')
277 os.remove(picklefile)
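# Call sketch (illustrative paths; the filter reuses the docstring example):
#   _aggregate_output(outputfile='output/graph.csv', tmp_dir=tmp_dir,
#                     filters=[lambda r: float(r['velocity_knots_max']) > 50])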
280def graph(
281 qry,
282 *,
283 outputfile,
284 domain,
285 dbconn: ConnectionType,
286 data_dir: str,
287 trafficDBpath: str or None, # none if using PostgresDBConn
288 maxdelta: timedelta = timedelta(weeks=1),
289 speed_threshold: float = 50,
290 distance_threshold: float = 200000,
291 interp_delta: float = timedelta(minutes=10),
292 minscore: float = 0,
293 qryfcn=sqlfcn.crawl_dynamic_static,
294 bathy_dir: str = None,
295 shoredist_raster: str = None,
296 portdist_raster: str = None,
297 decimate: float = 0.0001,
298 verbose: bool = False):
299 ''' Compute network graph of vessel movements within domain zones.
300 Zone polygons will be used as network nodes, with graph edges
301 represented by movements between zones.
303 args:
304 qry (:py:class:`aisdb.database.dbqry.DBQuery`)
305 database query generator
306 domain (:py:class:`aisdb.gis.Domain`)
307 collection of zones defined as polygons, these will
308 be used as nodes in the network graph
309 dbconn (ConnectionType)
310 Either a :class:`aisdb.database.dbconn.SQLiteDBConn` or
311 :class:`aisdb.database.dbconn.PostgresDBConn` database
312 connection object
313 data_dir (string)
314 location of raster data
315 trafficDBpath (string)
316 path to marinetraffic database file
317 outputfile (string)
318 filepath for resulting CSV output
319 maxdelta (datetime.timedelta)
320 maximum time between vessel messages before considering
321 it to be part of a new trajectory. See
322 :func:`aisdb.track_gen.split_timedelta` for more info
323 speed_threshold (int, float)
324 maximum speed in knots for encoder segmentation. See
325 :func:`aisdb.denoising_encoder.encode_greatcircledistance` for
326 more info
327 distance_threshold (int, float)
328 maximum distance in meters for encoder segmentation. See
329 :func:`aisdb.denoising_encoder.encode_greatcircledistance` for
330 more info
331 interp_delta (timedelta)
332 track positions will be interpolated to the given sample rate
333 minscore (float)
334 minimum score for segments to be considered sequential. See
335 :func:`aisdb.denoising_encoder.encode_greatcircledistance` for
336 more info
338 Network graph activity is computed following these steps:
340 - Create a database query with
341 :class:`aisdb.database.dbqry.DBQuery` and supply it as the
342 ``qry`` arg. Define a domain
343 (:class:`aisdb.gis.Domain`) in which to compute movements
344 - Vectorize tracks using :py:func:`aisdb.track_gen.TrackGen`
345 - Append vessel metadata to each track with
346 :func:`aisdb.webdata.marinetraffic.vessel_info`
347 - Segment track vectors where time between messages exceeds
348 maxdelta using :func:`aisdb.track_gen.split_timedelta`
349 - Segment track vectors as encoded by
350 :py:func:`aisdb.denoising_encoder.encode_greatcircledistance`
351 - Perform geofencing on track segments using
352 :py:func:`aisdb.track_gen.fence_tracks` to determine zone
353 containment
354 - Check where zone boundaries are transited and serialize results
355 to ``outputfile``. Additional per-zone activity metrics are also
356 aggregated at this step.
358 Example usage:
360 >>> import os
361 >>> import shapely
362 >>> from datetime import datetime
363 >>> from aisdb import SQLiteDBConn, DBQuery, Domain, graph, decode_msgs
364 >>> from aisdb.database.sqlfcn_callbacks import in_bbox_time
366 >>> # create example database file
367 >>> dbpath = './example.sqlitedb'
368 >>> filepaths = ['./aisdb/tests/testdata/test_data_20210701.csv',
369 ... './aisdb/tests/testdata/test_data_20211101.nm4']
370 >>> with SQLiteDBConn(dbpath) as dbconn:
371 ... decode_msgs(filepaths=filepaths, dbconn=dbconn,
372 ... source='TESTING', verbose=False)
374 Next, configure the query area using a Domain to compute the region boundary
376 >>> zones = [{
377 ... 'name': 'Zone1',
378 ... 'geometry': shapely.geometry.Polygon(zip(
379 ... [-170.24, -170.24, -38.5, -38.5, -170.24],
380 ... [29.0, 75.2, 75.2, 29.0, 29.0],
381 ... ))
382 ... }]
383 >>> domain = Domain(name='new_domain', zones=zones)
384 >>> trafficDBpath = './testdata/marinetraffic_test.db'
385 >>> data_dir = os.environ.get('AISDBDATADIR', '/tmp/ais/')
387 Then, query the database for points in the domain
389 >>> with SQLiteDBConn(dbpath) as dbconn:
390 ... qry = DBQuery(
391 ... dbconn=dbconn,
392 ... callback=in_bbox_time,
393 ... start=datetime(2021, 7, 1),
394 ... end=datetime(2021, 7, 3),
395 ... **domain.boundary,
396 ... )
397 ... graph(qry,
398 ... outputfile=os.path.join('testdata', 'test_graph.csv'),
399 ... dbconn=dbconn,
400 ... domain=domain,
401 ... data_dir=data_dir,
402 ... trafficDBpath=trafficDBpath)
404 Afterwards, delete the example database file
406 >>> os.remove(dbpath)
407 >>> os.remove(os.path.join('testdata', 'test_graph.csv'))
409 Caution: processing the vessel movement graph edges may consume a
410 large amount of memory.
411 '''
412 assert not isinstance(qry, types.GeneratorType),\
413 'Got a generator for "qry" arg instead of DBQuery'
415 assert isinstance(qry, aisdb.database.dbqry.DBQuery),\
416 f'Not a DBQuery object! Got {qry}'
418 if not isinstance(dbconn, (
419 ConnectionType.SQLITE.value,
420 ConnectionType.POSTGRES.value,
421 )):
422 raise ValueError("Invalid dbconn connection type")
423 if isinstance(dbconn, ConnectionType.SQLITE.value):
424 assert trafficDBpath is not None
425 assert isinstance(trafficDBpath, str)
426 vinfoDB = VesselInfo(trafficDBpath).trafficDB
427 else:
428 vinfoDB = dbconn
430 rowgen = qry.gen_qry(fcn=qryfcn, verbose=verbose)
431 tracks = TrackGen(rowgen, decimate)
433 if portdist_raster is not None:  [branch 433 ↛ 438 never taken: the condition on line 433 was never false]
434 pdist = PortDist(portdist_raster)
435 with pdist:
436 tracks = list(pdist.get_distance(tracks))
438 if shoredist_raster is not None:  [branch 438 ↛ 444 never taken: the condition on line 438 was never false]
439 sdist_dir, sdist_name = shoredist_raster.rsplit(os.path.sep, 1)
440 sdist = ShoreDist(sdist_dir, sdist_name)
441 with sdist:
442 tracks = list(sdist.get_distance(tracks))
444 if bathy_dir is not None:
445 bathy = Gebco(data_dir=bathy_dir)
446 with bathy:
447 tracks = list(bathy.merge_tracks(tracks))
449 # initialize raster data sources
450 if not os.path.isdir('/tmp'): # pragma: no cover
451 os.mkdir('/tmp')
452 with tempfile.TemporaryDirectory() as tmp_dir:
453 if os.environ.get('DEBUG'):
454 print(f'network graph {tmp_dir = }')
455 print(f'\n{domain.name=} {domain.boundary=}')
457 # configure processing pipeline
458 serialize_CSV = partial(_serialize_network_edge,
459 domain=domain,
460 tmp_dir=tmp_dir)
461 geofence = partial(fence_tracks, domain=domain)
462 interp = partial(interp_time, step=interp_delta)
463 encode_tracks = partial(encode_greatcircledistance,
464 distance_threshold=distance_threshold,
465 minscore=minscore,
466 speed_threshold=speed_threshold)
467 timesplit = partial(split_timedelta, maxdelta=maxdelta)
468 vinfo = partial(vessel_info, dbconn=vinfoDB)
470 # pipeline execution order
471 tracks = vinfo(tracks)
472 tracks = wetted_surface_area(tracks)
473 tracks = timesplit(tracks)
474 tracks = encode_tracks(tracks)
475 tracks = interp(tracks)
476 tracks = geofence(tracks)
477 result = serialize_CSV(tracks)
479 for res in result:
480 assert res is None
482 if os.listdir(tmp_dir) == []:
483 warnings.warn(f'no data for {outputfile}, skipping...\n')
484 else:
485 _aggregate_output(outputfile=outputfile, tmp_dir=tmp_dir)
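# Sketch of consuming the CSV written by graph() (column names follow the keys
# produced by _staticinfo() and _transitinfo(); the path matches the docstring
# example above):
#   import csv
#   with open(os.path.join('testdata', 'test_graph.csv')) as f:
#       for row in csv.DictReader(f):
#           print(row['mmsi'], row['src_zone'], row['rcv_zone'],
#                 row['minutes_spent_in_zone'])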