Coverage for aisdb/proc_util.py: 99%

94 statements  

coverage.py v7.3.1, created at 2023-09-30 04:22 +0000

from datetime import datetime, timedelta
from functools import partial, reduce
from tempfile import SpooledTemporaryFile
import csv
import io
import os
import re
import typing

import numpy as np

def _sanitize(s):
    # note: the first comma uses ASCII code 44,
    # the second comma uses ASCII decimal 130 --
    # not the same char!
    if s is None:
        return ''
    elif s == '-':
        return ''
    else:
        return str(s).replace(',', '').replace(chr(130), '').replace(
            '#', '').replace('"', '').replace("'", '').replace('\n', '')

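# Illustrative doctest-style sketch (not part of the module): _sanitize strips
# delimiter and quote characters so values can be embedded in unquoted CSV:
#
#     >>> _sanitize("M/V 'EXAMPLE', #1")
#     'M/V EXAMPLE 1'
#     >>> _sanitize(None) == _sanitize('-') == ''
#     True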

def _epoch_2_dt(ep_arr, t0=datetime(1970, 1, 1, 0, 0, 0), unit='seconds'):
    ''' convert an epoch timestamp (or array of timestamps) to
        datetime.datetime, where `unit` gives the epoch resolution.
        redefinition of the function in aisdb.gis to avoid a circular import
    '''
    delta = lambda ep, unit: t0 + timedelta(**{unit: ep})

    if isinstance(ep_arr, (list, np.ndarray)):
        return np.array(list(map(partial(delta, unit=unit), map(int, ep_arr))))
    elif isinstance(ep_arr, (float, int, np.uint32)):
        return delta(int(ep_arr), unit=unit)
    else:  # pragma: no cover
        raise ValueError(
            f'input must be an integer or an array of integers. got {ep_arr=} {type(ep_arr)}'
        )

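# Hedged usage sketch: with the default unit of seconds, epoch values map onto
# offsets from 1970-01-01, e.g.
#
#     >>> _epoch_2_dt(86400)
#     datetime.datetime(1970, 1, 2, 0, 0)
#     >>> _epoch_2_dt([0, 60], unit='minutes')[1]
#     datetime.datetime(1970, 1, 1, 1, 0)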

def _splits_idx(vector: np.ndarray, d: timedelta) -> np.ndarray:
    assert isinstance(d, timedelta)
    vector = np.array(vector, dtype=int)
    splits = np.nonzero(vector[1:] - vector[:-1] >= d.total_seconds())[0] + 1
    idx = np.append(np.append([0], splits), [vector.size])
    return idx

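# Hedged sketch: given epoch seconds [0, 10, 4000, 4020] and a one-hour
# threshold, the 3990-second gap splits the vector into two segments, so the
# returned boundary indices are [0, 2, 4]:
#
#     >>> _splits_idx(np.array([0, 10, 4000, 4020]), timedelta(hours=1))
#     array([0, 2, 4])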

def _segment_rng(track, maxdelta, key='time') -> filter:
    ''' yield index ranges for the time segments of a track '''
    for rng in map(
            range,
            _splits_idx(track[key], maxdelta)[:-1],
            _splits_idx(track[key], maxdelta)[1:],
    ):
        yield rng

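# Hedged sketch: consecutive messages further apart than maxdelta start a new
# segment; each yielded range indexes one segment of the track arrays:
#
#     >>> track = dict(time=np.array([0, 10, 4000, 4020]))
#     >>> list(_segment_rng(track, timedelta(hours=1)))
#     [range(0, 2), range(2, 4)]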

def write_csv_rows(rows,
                   pathname='/data/smith6/ais/scripts/output.csv',
                   mode='a'):
    with open(pathname, mode) as f:
        f.write('\n'.join(
            map(
                lambda r: ','.join(
                    map(lambda cell: cell.replace(',', '').replace('#', ''),
                        map(str.rstrip, map(str, r)))), rows)) + '\n')

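# Hedged usage sketch ('rows.csv' is a hypothetical path): each row is joined
# with commas after stripping delimiter characters from every cell:
#
#     >>> write_csv_rows([[1, 'a'], [2, 'b']], pathname='rows.csv', mode='w')
#     # rows.csv now contains "1,a\n2,b\n"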

def _datetime_column(tracks):
    for track in tracks:
        assert isinstance(track, dict), f'got {track=}'
        track['datetime'] = np.array(
            _epoch_2_dt(track['time'].astype(int)),
            dtype=object,
        )
        track['dynamic'] = track['dynamic'].union(set(['datetime']))
        yield track

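# Hedged sketch: each track dict gains a 'datetime' array parallel to 'time',
# registered in the track's set of dynamic columns:
#
#     >>> track = dict(time=np.array([0, 60]), dynamic={'time'})
#     >>> next(_datetime_column([track]))['datetime'][0]
#     datetime.datetime(1970, 1, 1, 0, 0)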

_columns_order = [
    'mmsi', 'imo', 'vessel_name', 'name', 'datetime', 'time', 'lon', 'lat',
    'cog', 'sog', 'dim_bow', 'dim_stern', 'dim_star', 'dim_port',
    'coarse_type_txt', 'vesseltype_generic', 'vesseltype_detailed', 'callsign',
    'flag', 'gross_tonnage', 'summer_dwt', 'length_breadth', 'year_built',
    'home_port', 'error404'
]


def tracks_csv(tracks, skipcols: list = ['label', 'in_zone']):
    ''' Yields row tuples when given a track generator: first the column
        header row, then one row per track position.
        See write_csv() for more info
    '''
    tracks_dt = _datetime_column(tracks)
    tr1 = next(tracks_dt)
    colnames = [
        c for c in _columns_order + list(
            set(tr1['static'].union(tr1['dynamic'])) -
            set(_columns_order).union(set(['marinetraffic_info'])))
        if c in list(tr1['static']) + list(tr1['dynamic'])
    ]
    colnames = [col for col in colnames if col not in skipcols]

    yield colnames

    if 'marinetraffic_info' in tr1.keys():
        colnames += tuple(tr1['marinetraffic_info'].keys())
        colnames.remove('error404')
        colnames.remove('dim_bow')
        colnames.remove('dim_stern')
        colnames.remove('dim_star')
        colnames.remove('dim_port')
        if 'coarse_type_txt' in colnames:  # pragma: no cover
            colnames.remove('coarse_type_txt')
        if 'vessel_name' in colnames:  # pragma: no cover
            colnames.remove('vessel_name')
        colnames = list(dict.fromkeys(colnames))

    decimals = {
        'lon': 5,
        'lat': 5,
        'depth_metres': 2,
        'distance_metres': 2,
        'submerged_hull_m^2': 0,
    }

    def _append(track, colnames=colnames, decimals=decimals):
        # flatten any marinetraffic_info metadata into the track dict
        if 'marinetraffic_info' in track.keys():
            for key, val in dict(track['marinetraffic_info']).items():
                if key in ('error404', 'mmsi', 'imo'):
                    continue
                track[key] = val
            del track['marinetraffic_info']

        # yield one row per position report; static columns are sanitized,
        # and columns listed in `decimals` are rounded to fixed precision
        for i in range(0, track['time'].size):
            row = [(track[c][i] if c in track['dynamic'] else
                    (_sanitize(track[c]) if track[c] != 0 else ''))
                   for c in colnames]
            for ci, r in zip(range(len(colnames)), row):
                if colnames[ci] in decimals.keys() and r != '':
                    row[ci] = f'{float(r):.{decimals[colnames[ci]]}f}'
            yield row

    yield from _append(tr1, colnames, decimals)
    for track in tracks_dt:
        yield from _append(track, colnames, decimals)

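# Hedged sketch with a minimal synthetic track dict (the 'static'/'dynamic'
# sets and array keys mirror what aisdb.track_gen.TrackGen produces):
#
#     >>> track = dict(mmsi=316000000,
#     ...              time=np.array([0, 60]),
#     ...              lon=np.array([-63.51234, -63.41234]),
#     ...              lat=np.array([44.6, 44.7]),
#     ...              static={'mmsi'},
#     ...              dynamic={'time', 'lon', 'lat'})
#     >>> rows = list(tracks_csv([track]))
#     >>> rows[0]  # header row, in _columns_order ordering
#     ['mmsi', 'datetime', 'time', 'lon', 'lat']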

def write_csv(
        tracks,
        fpath: typing.Union[io.BytesIO, str, SpooledTemporaryFile],
        skipcols: list = ['label', 'in_zone'],
):
    ''' write track vector dictionaries as a CSV file

        args:
            tracks (iter)
                track generator such as returned by
                :func:`aisdb.track_gen.TrackGen`
            fpath (string)
                output CSV filepath (string) or io.BytesIO buffer
            skipcols (list)
                columns to be omitted from results
    '''
    if isinstance(fpath, str):
        f = open(fpath, mode='w')
    elif isinstance(fpath, (io.BytesIO, SpooledTemporaryFile)):
        f = io.TextIOWrapper(fpath, encoding='utf8', newline='')
    else:
        raise ValueError(f'invalid type for fpath: {type(fpath)}')

    writer = csv.writer(f,
                        delimiter=',',
                        quotechar="'",
                        quoting=csv.QUOTE_NONE,
                        dialect='unix')
    for row in tracks_csv(tracks, skipcols):
        writer.writerow(row)

    if isinstance(fpath, str):
        f.close()
    else:
        # prevent the BytesIO buffer from being closed along with the
        # TextIOWrapper
        f.detach()

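# Hedged usage sketch: writing to an in-memory buffer instead of a file path
# (reusing the synthetic track from the tracks_csv example above):
#
#     >>> buf = io.BytesIO()
#     >>> write_csv([track], buf)
#     >>> buf.getvalue().splitlines()[0]
#     b'mmsi,datetime,time,lon,lat'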

def glob_files(dirpath, ext='.txt', keyorder=lambda key: key):
    ''' walk a directory to glob txt files. can be used with ZoneGeomFromTxt()

        args:
            dirpath: string
                directory to walk
            ext: string
                file extension to match (default '.txt')
            keyorder:
                anonymous function for custom sort ordering

        example keyorder:

        .. code-block::

            # numeric sort on zone names with strsplit on 'Z' char
            keyorder=lambda key: int(key.rsplit(os.path.sep, 1)[1].split('.')[0].split('Z')[1])

        returns:
            .txt shapefile paths
    '''
    paths = list(os.walk(dirpath))

    extfiles = [[
        p[0],
        sorted([f for f in p[2] if f[-len(ext):] == ext], key=keyorder)
    ] for p in paths if len(p[2]) > 0]

    extpaths = reduce(np.append, [
        list(map(os.path.join, (path[0] for _ in path[1]), path[1]))
        for path in extfiles
    ], np.array([], dtype=object))

    return sorted(extpaths, key=keyorder)

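# Hedged usage sketch (the './zones' directory is hypothetical):
#
#     # recursively collect every .txt file under ./zones, sorted by path
#     zone_paths = glob_files('./zones', ext='.txt')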

def getfiledate(filename):
    ''' attempt to parse the first valid epoch timestamp from a raw AIS data
        file. the timestamp will be returned as a :class:`datetime.date` if
        successful, otherwise False will be returned if no date could be found

        args:
            filename (string)
                raw AIS data file in .nm4 or .csv format
    '''
    filesize = os.path.getsize(filename)
    if filesize == 0:  # pragma: no cover
        return False
    with open(filename, 'r') as f:
        if filename.lower()[-3:] == "csv":
            # CSV input: parse the date from the 'Time' column of the first row
            reader = csv.reader(f)
            head = next(reader)
            row1 = next(reader)
            rowdict = {a: b for a, b in zip(head, row1)}
            fdate = datetime.strptime(rowdict['Time'], '%Y%m%d_%H%M%S').date()
            return fdate
        else:
            # NM4 input: scan for the first tag block containing an epoch
            # timestamp in a 'c:' field
            line = f.readline()
            head = line.rsplit('\\', 1)[0]
            n = 0
            while 'c:' not in head:  # pragma: no cover
                n += 1
                line = f.readline()
                head = line.rsplit('\\', 1)[0]
                assert n <= 10000
            split0 = re.split('c:', head)[1]
            try:
                epoch = int(re.split('[^0-9]', split0)[0])
            except ValueError:  # pragma: no cover
                return False
            fdate = datetime.fromtimestamp(epoch).date()
            return fdate
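
# Hedged usage sketch ('example_20230101.nm4' is a hypothetical file whose
# first NMEA tag block carries an epoch 'c:' field; the resulting date follows
# the local timezone, as datetime.fromtimestamp does):
#
#     >>> getfiledate('example_20230101.nm4')
#     datetime.date(2023, 1, 1)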