Coverage for aisdb/track_gen.py: 94%
77 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
1''' generation, segmentation, and filtering of vessel trajectories '''
3from functools import reduce
4from datetime import timedelta
5import sqlite3
6import types
8import numpy as np
10from aisdb.aisdb import simplify_linestring_idx
11from aisdb.gis import delta_knots
12from aisdb.proc_util import _segment_rng
13from aisdb import Domain
15staticcols = set([
16 'mmsi', 'vessel_name', 'ship_type', 'ship_type_txt', 'dim_bow',
17 'dim_stern', 'dim_port', 'dim_star', 'imo', 'draught'
18])
21def _segment_longitude(track, tolerance=300):
22 ''' segment track vectors where difference in longitude exceeds 300 degrees
23 '''
25 if len(track['time']) == 1:
26 yield track
27 return
29 diff = np.nonzero(
30 np.abs(track['lon'][1:] - track['lon'][:-1]) > tolerance)[0] + 1
32 if diff.size == 0:
33 assert 'time' in track.keys()
34 yield track
35 return
37 segments_idx = reduce(np.append, ([0], diff, [track['time'].size]))
38 for i in range(segments_idx.size - 1):
39 tracksplit = dict(
40 **{k: track[k]
41 for k in track['static']},
42 **{
43 k: track[k][segments_idx[i]:segments_idx[i + 1]]
44 for k in track['dynamic']
45 },
46 static=track['static'],
47 dynamic=track['dynamic'],
48 )
49 assert 'time' in tracksplit.keys()
50 yield tracksplit
53def _yieldsegments(rows, staticcols, dynamiccols, decimate=0.0001):
54 if decimate is True:
55 decimate = 0.0001
56 lon = np.array([r['longitude'] for r in rows], dtype=float)
57 lat = np.array([r['latitude'] for r in rows], dtype=float)
58 time = np.array([r['time'] for r in rows], dtype=np.uint32)
59 if decimate is not False:
60 idx = simplify_linestring_idx(lon, lat, precision=decimate)
61 else:
62 idx = np.array(range(len(lon)))
63 trackdict = dict(
64 **{col: rows[0][col]
65 for col in staticcols},
66 lon=lon[idx].astype(np.float32),
67 lat=lat[idx].astype(np.float32),
68 time=time[idx],
69 sog=np.array([r['sog'] for r in rows], dtype=np.float32)[idx],
70 cog=np.array([r['cog'] for r in rows], dtype=np.uint32)[idx],
71 static=staticcols,
72 dynamic=dynamiccols,
73 )
74 assert 'time' in trackdict.keys()
76 for segment in _segment_longitude(trackdict):
77 for key in segment['dynamic']:
78 assert len(segment[key]) == len(segment['time'])
79 yield segment
82class EmptyRowsException(Exception):
83 pass
86def TrackGen(rowgen: iter, decimate: False) -> dict:
87 ''' generator converting sets of rows sorted by MMSI to a
88 dictionary containing track column vectors.
89 each row contains columns from database: mmsi time lon lat name ...
90 rows must be sorted by first by mmsi, then time
92 args:
93 rowgen (aisdb.database.dbqry.DBQuery.gen_qry())
94 DBQuery rows generator. Yields rows returned
95 by a database query
96 decimate (bool)
97 if True, linear curve decimation will be applied to reduce
98 the number of unnecessary datapoints
100 yields:
101 dictionary containing track column vectors.
102 static data (e.g. mmsi, name, geometry) will be stored as
103 scalar values
105 >>> import os
106 >>> import numpy as np
107 >>> from datetime import datetime
108 >>> from aisdb import SQLiteDBConn, DBQuery, TrackGen, decode_msgs
109 >>> from aisdb.database import sqlfcn_callbacks
110 >>> # create example database file
111 >>> dbpath = 'track_gen_test.db'
112 >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv',
113 ... 'aisdb/tests/testdata/test_data_20211101.nm4']
114 >>> with SQLiteDBConn(dbpath) as dbconn:
115 ... decode_msgs(filepaths, dbconn=dbconn, source='TESTING', verbose=False)
116 ... q = DBQuery(callback=sqlfcn_callbacks.in_timerange_validmmsi,
117 ... dbconn=dbconn,
118 ... start=datetime(2021, 7, 1),
119 ... end=datetime(2021, 7, 7))
120 ... rowgen = q.gen_qry()
121 ... for track in TrackGen(rowgen, decimate=True):
122 ... result = (track['mmsi'], track['lon'], track['lat'], track['time'])
123 ... assert result == (204242000, np.array([-8.931666], dtype=np.float32),
124 ... np.array([41.45], dtype=np.float32), np.array([1625176725], dtype=np.uint32))
125 ... break
127 '''
128 '''
129 >>> os.remove(dbpath)
130 '''
131 firstrow = True
132 assert isinstance(rowgen, types.GeneratorType)
133 for rows in rowgen:
134 if (rows is None or len(rows) == 0):
135 raise EmptyRowsException('rows cannot be empty')
136 assert isinstance(
137 rows[0], (sqlite3.Row, dict)), f'unknown row type: {type(rows[0])}'
138 if firstrow:
139 keys = set(rows[0].keys())
140 static = keys.intersection(set(staticcols))
141 dynamiccols = keys ^ static
142 dynamiccols = dynamiccols.difference(set(['longitude',
143 'latitude']))
144 dynamiccols = dynamiccols.union(set(['lon', 'lat']))
145 firstrow = False
146 for track in _yieldsegments(rows, static, dynamiccols, decimate):
147 yield track
150def split_timedelta(tracks, maxdelta=timedelta(weeks=2)):
151 ''' partitions tracks where delta time exceeds maxdelta
153 args:
154 tracks (aisdb.track_gen.TrackGen)
155 track vectors generator
156 maxdelta (datetime.timedelta)
157 threshold at which tracks should be
158 partitioned
159 '''
160 for track in tracks:
161 for rng in _segment_rng(track, maxdelta):
162 assert len(rng) > 0
163 yield dict(
164 **{k: track[k]
165 for k in track['static']},
166 **{
167 k: np.array(track[k], dtype=type(track[k][0]))[rng]
168 for k in track['dynamic']
169 },
170 static=track['static'],
171 dynamic=track['dynamic'],
172 )
175def fence_tracks(tracks, domain):
176 ''' compute points-in-polygons for vessel positions within domain polygons
178 yields track dictionaries
180 Also see zone_mask()
181 '''
182 assert isinstance(domain, Domain), 'Not a domain object'
184 for track in tracks:
185 assert isinstance(track, dict)
186 if 'in_zone' not in track.keys(): 186 ↛ 195line 186 didn't jump to line 195, because the condition on line 186 was never false
187 track['in_zone'] = np.array(
188 [
189 domain.point_in_polygon(x, y)
190 for x, y in zip(track['lon'], track['lat'])
191 ],
192 dtype=object,
193 )
194 track['dynamic'] = set(track['dynamic']).union(set(['in_zone']))
195 yield track
198def zone_mask(tracks, domain):
199 ''' compute points-in-polygons for track positions, and filter results to
200 positions within domain.
202 yields track dictionaries.
204 also see fence_tracks()
205 '''
206 for track in fence_tracks(tracks, domain):
207 mask = track['in_zone'] != 'Z0'
208 yield dict(
209 **{k: track[k]
210 for k in track['static']},
211 **{k: track[k][mask]
212 for k in track['dynamic']},
213 static=track['static'],
214 dynamic=track['dynamic'],
215 )
218def min_speed_filter(tracks, minspeed):
219 for track in tracks:
220 if len(track['time']) == 1:
221 yield track
222 continue
223 deltas = delta_knots(track)
224 deltas = np.append(deltas, [deltas[-1]])
225 mask = deltas >= minspeed
226 yield dict(
227 **{k: track[k]
228 for k in track['static']},
229 **{k: track[k][mask]
230 for k in track['dynamic']},
231 static=track['static'],
232 dynamic=track['dynamic'],
233 )