Coverage for aisdb/track_gen.py: 94%

77 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-30 04:22 +0000

1''' generation, segmentation, and filtering of vessel trajectories ''' 

2 

3from functools import reduce 

4from datetime import timedelta 

5import sqlite3 

6import types 

7 

8import numpy as np 

9 

10from aisdb.aisdb import simplify_linestring_idx 

11from aisdb.gis import delta_knots 

12from aisdb.proc_util import _segment_rng 

13from aisdb import Domain 

14 

15staticcols = set([ 

16 'mmsi', 'vessel_name', 'ship_type', 'ship_type_txt', 'dim_bow', 

17 'dim_stern', 'dim_port', 'dim_star', 'imo', 'draught' 

18]) 

19 

20 

21def _segment_longitude(track, tolerance=300): 

22 ''' segment track vectors where difference in longitude exceeds 300 degrees 

23 ''' 

24 

25 if len(track['time']) == 1: 

26 yield track 

27 return 

28 

29 diff = np.nonzero( 

30 np.abs(track['lon'][1:] - track['lon'][:-1]) > tolerance)[0] + 1 

31 

32 if diff.size == 0: 

33 assert 'time' in track.keys() 

34 yield track 

35 return 

36 

37 segments_idx = reduce(np.append, ([0], diff, [track['time'].size])) 

38 for i in range(segments_idx.size - 1): 

39 tracksplit = dict( 

40 **{k: track[k] 

41 for k in track['static']}, 

42 **{ 

43 k: track[k][segments_idx[i]:segments_idx[i + 1]] 

44 for k in track['dynamic'] 

45 }, 

46 static=track['static'], 

47 dynamic=track['dynamic'], 

48 ) 

49 assert 'time' in tracksplit.keys() 

50 yield tracksplit 

51 

52 

53def _yieldsegments(rows, staticcols, dynamiccols, decimate=0.0001): 

54 if decimate is True: 

55 decimate = 0.0001 

56 lon = np.array([r['longitude'] for r in rows], dtype=float) 

57 lat = np.array([r['latitude'] for r in rows], dtype=float) 

58 time = np.array([r['time'] for r in rows], dtype=np.uint32) 

59 if decimate is not False: 

60 idx = simplify_linestring_idx(lon, lat, precision=decimate) 

61 else: 

62 idx = np.array(range(len(lon))) 

63 trackdict = dict( 

64 **{col: rows[0][col] 

65 for col in staticcols}, 

66 lon=lon[idx].astype(np.float32), 

67 lat=lat[idx].astype(np.float32), 

68 time=time[idx], 

69 sog=np.array([r['sog'] for r in rows], dtype=np.float32)[idx], 

70 cog=np.array([r['cog'] for r in rows], dtype=np.uint32)[idx], 

71 static=staticcols, 

72 dynamic=dynamiccols, 

73 ) 

74 assert 'time' in trackdict.keys() 

75 

76 for segment in _segment_longitude(trackdict): 

77 for key in segment['dynamic']: 

78 assert len(segment[key]) == len(segment['time']) 

79 yield segment 

80 

81 

82class EmptyRowsException(Exception): 

83 pass 

84 

85 

86def TrackGen(rowgen: iter, decimate: False) -> dict: 

87 ''' generator converting sets of rows sorted by MMSI to a 

88 dictionary containing track column vectors. 

89 each row contains columns from database: mmsi time lon lat name ... 

90 rows must be sorted by first by mmsi, then time 

91 

92 args: 

93 rowgen (aisdb.database.dbqry.DBQuery.gen_qry()) 

94 DBQuery rows generator. Yields rows returned 

95 by a database query 

96 decimate (bool) 

97 if True, linear curve decimation will be applied to reduce 

98 the number of unnecessary datapoints 

99 

100 yields: 

101 dictionary containing track column vectors. 

102 static data (e.g. mmsi, name, geometry) will be stored as 

103 scalar values 

104 

105 >>> import os 

106 >>> import numpy as np 

107 >>> from datetime import datetime 

108 >>> from aisdb import SQLiteDBConn, DBQuery, TrackGen, decode_msgs 

109 >>> from aisdb.database import sqlfcn_callbacks 

110 >>> # create example database file 

111 >>> dbpath = 'track_gen_test.db' 

112 >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv', 

113 ... 'aisdb/tests/testdata/test_data_20211101.nm4'] 

114 >>> with SQLiteDBConn(dbpath) as dbconn: 

115 ... decode_msgs(filepaths, dbconn=dbconn, source='TESTING', verbose=False) 

116 ... q = DBQuery(callback=sqlfcn_callbacks.in_timerange_validmmsi, 

117 ... dbconn=dbconn, 

118 ... start=datetime(2021, 7, 1), 

119 ... end=datetime(2021, 7, 7)) 

120 ... rowgen = q.gen_qry() 

121 ... for track in TrackGen(rowgen, decimate=True): 

122 ... result = (track['mmsi'], track['lon'], track['lat'], track['time']) 

123 ... assert result == (204242000, np.array([-8.931666], dtype=np.float32), 

124 ... np.array([41.45], dtype=np.float32), np.array([1625176725], dtype=np.uint32)) 

125 ... break 

126 

127 ''' 

128 ''' 

129 >>> os.remove(dbpath) 

130 ''' 

131 firstrow = True 

132 assert isinstance(rowgen, types.GeneratorType) 

133 for rows in rowgen: 

134 if (rows is None or len(rows) == 0): 

135 raise EmptyRowsException('rows cannot be empty') 

136 assert isinstance( 

137 rows[0], (sqlite3.Row, dict)), f'unknown row type: {type(rows[0])}' 

138 if firstrow: 

139 keys = set(rows[0].keys()) 

140 static = keys.intersection(set(staticcols)) 

141 dynamiccols = keys ^ static 

142 dynamiccols = dynamiccols.difference(set(['longitude', 

143 'latitude'])) 

144 dynamiccols = dynamiccols.union(set(['lon', 'lat'])) 

145 firstrow = False 

146 for track in _yieldsegments(rows, static, dynamiccols, decimate): 

147 yield track 

148 

149 

150def split_timedelta(tracks, maxdelta=timedelta(weeks=2)): 

151 ''' partitions tracks where delta time exceeds maxdelta 

152 

153 args: 

154 tracks (aisdb.track_gen.TrackGen) 

155 track vectors generator 

156 maxdelta (datetime.timedelta) 

157 threshold at which tracks should be 

158 partitioned 

159 ''' 

160 for track in tracks: 

161 for rng in _segment_rng(track, maxdelta): 

162 assert len(rng) > 0 

163 yield dict( 

164 **{k: track[k] 

165 for k in track['static']}, 

166 **{ 

167 k: np.array(track[k], dtype=type(track[k][0]))[rng] 

168 for k in track['dynamic'] 

169 }, 

170 static=track['static'], 

171 dynamic=track['dynamic'], 

172 ) 

173 

174 

175def fence_tracks(tracks, domain): 

176 ''' compute points-in-polygons for vessel positions within domain polygons 

177 

178 yields track dictionaries 

179 

180 Also see zone_mask() 

181 ''' 

182 assert isinstance(domain, Domain), 'Not a domain object' 

183 

184 for track in tracks: 

185 assert isinstance(track, dict) 

186 if 'in_zone' not in track.keys(): 186 ↛ 195line 186 didn't jump to line 195, because the condition on line 186 was never false

187 track['in_zone'] = np.array( 

188 [ 

189 domain.point_in_polygon(x, y) 

190 for x, y in zip(track['lon'], track['lat']) 

191 ], 

192 dtype=object, 

193 ) 

194 track['dynamic'] = set(track['dynamic']).union(set(['in_zone'])) 

195 yield track 

196 

197 

198def zone_mask(tracks, domain): 

199 ''' compute points-in-polygons for track positions, and filter results to 

200 positions within domain. 

201 

202 yields track dictionaries. 

203 

204 also see fence_tracks() 

205 ''' 

206 for track in fence_tracks(tracks, domain): 

207 mask = track['in_zone'] != 'Z0' 

208 yield dict( 

209 **{k: track[k] 

210 for k in track['static']}, 

211 **{k: track[k][mask] 

212 for k in track['dynamic']}, 

213 static=track['static'], 

214 dynamic=track['dynamic'], 

215 ) 

216 

217 

218def min_speed_filter(tracks, minspeed): 

219 for track in tracks: 

220 if len(track['time']) == 1: 

221 yield track 

222 continue 

223 deltas = delta_knots(track) 

224 deltas = np.append(deltas, [deltas[-1]]) 

225 mask = deltas >= minspeed 

226 yield dict( 

227 **{k: track[k] 

228 for k in track['static']}, 

229 **{k: track[k][mask] 

230 for k in track['dynamic']}, 

231 static=track['static'], 

232 dynamic=track['dynamic'], 

233 )