Coverage for aisdb/proc_util.py: 99%
94 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
1from datetime import datetime, timedelta
2from functools import partial, reduce
3from tempfile import SpooledTemporaryFile
4import csv
5import io
6import os
7import re
8import typing
10import numpy as np
13def _sanitize(s):
14 # note: the first comma uses ASCII code 44,
15 # second comma uses ASCII decimal 130 !!
16 # not the same char!
17 if s is None:
18 return ''
19 elif s == '-':
20 return ''
21 else:
22 return str(s).replace(',', '').replace(chr(130), '').replace(
23 '#', '').replace('"', '').replace("'", '').replace('\n', '')
26def _epoch_2_dt(ep_arr, t0=datetime(1970, 1, 1, 0, 0, 0), unit='seconds'):
27 ''' convert epoch minutes to datetime.datetime.
28 redefinition of function in aisdb.gis to avoid circular import
29 '''
31 delta = lambda ep, unit: t0 + timedelta(**{unit: ep})
33 if isinstance(ep_arr, (list, np.ndarray)):
34 return np.array(list(map(partial(delta, unit=unit), map(int, ep_arr))))
36 elif isinstance(ep_arr, (float, int, np.uint32)):
37 return delta(int(ep_arr), unit=unit)
39 else: # pragma: no cover
40 raise ValueError(
41 f'input must be integer or array of integers. got {ep_arr=}{type(ep_arr)}'
42 )
45def _splits_idx(vector: np.ndarray, d: timedelta) -> np.ndarray:
46 assert isinstance(d, timedelta)
47 vector = np.array(vector, dtype=int)
48 splits = np.nonzero(vector[1:] - vector[:-1] >= d.total_seconds())[0] + 1
49 #else:
50 # splits = np.nonzero(vector[1:] - vector[:-1] >= d)[0] + 1
51 idx = np.append(np.append([0], splits), [vector.size])
52 return idx
def _segment_rng(track, maxdelta, key='time') -> filter:
    ''' index time segments

        Yields one range object per contiguous time segment of the track,
        where segments are split wherever consecutive values of track[key]
        differ by maxdelta or more.

        args:
            track (dict): track vector dictionary containing a time array
            maxdelta (timedelta): gap threshold used to split segments
            key (string): name of the time column in the track dict
    '''
    # hoisted: _splits_idx was previously computed twice per call
    idx = _splits_idx(track[key], maxdelta)
    for start, stop in zip(idx[:-1], idx[1:]):
        yield range(start, stop)
def write_csv_rows(rows,
                   pathname='/data/smith6/ais/scripts/output.csv',
                   mode='a'):
    ''' Write an iterable of row sequences to a CSV file.

        args:
            rows (iter): iterable of row sequences; cells are stringified
            pathname (string): output file path
            mode (string): file open mode, append by default
    '''

    def _fmtcell(cell):
        # strip trailing whitespace and remove delimiter/comment chars so a
        # cell value cannot break the CSV structure
        return str(cell).rstrip().replace(',', '').replace('#', '')

    # bugfix note: the original inner lambda shadowed the outer row variable
    # `r`, which obscured intent; named helper avoids the shadowing
    with open(pathname, mode) as f:
        f.write('\n'.join(
            ','.join(map(_fmtcell, row)) for row in rows) + '\n')
def _datetime_column(tracks):
    ''' Yield tracks augmented with a 'datetime' column computed from each
        track's epoch 'time' values; 'datetime' is added to the track's
        dynamic column set.
    '''
    for track in tracks:
        assert isinstance(track, dict), f'got {track=}'
        epoch_times = track['time'].astype(int)
        converted = _epoch_2_dt(epoch_times)
        track['datetime'] = np.array(converted, dtype=object)
        track['dynamic'] = track['dynamic'].union({'datetime'})
        yield track
# preferred ordering of known columns in CSV output; tracks_csv() keeps only
# the names present in a track's static/dynamic sets and appends any extra
# track keys not listed here after these
_columns_order = [
    'mmsi', 'imo', 'vessel_name', 'name', 'datetime', 'time', 'lon', 'lat',
    'cog', 'sog', 'dim_bow', 'dim_stern', 'dim_star', 'dim_port',
    'coarse_type_txt', 'vesseltype_generic', 'vesseltype_detailed', 'callsign',
    'flag', 'gross_tonnage', 'summer_dwt', 'length_breadth', 'year_built',
    'home_port', 'error404'
]
def tracks_csv(tracks, skipcols: list = ['label', 'in_zone']):
    ''' Yields row tuples when given a track generator.
        See write_csv() for more info
    '''
    tracks_dt = _datetime_column(tracks)
    tr1 = next(tracks_dt)
    # order known columns per _columns_order, then append any remaining
    # static/dynamic keys; 'marinetraffic_info' is excluded here and
    # flattened separately below
    colnames = [
        c for c in _columns_order + list(
            set(tr1['static'].union(tr1['dynamic'])) -
            set(_columns_order).union(set(['marinetraffic_info'])))
        if c in list(tr1['static']) + list(tr1['dynamic'])
    ]
    colnames = [col for col in colnames if col not in skipcols]

    # NOTE(review): the header list is yielded before the marinetraffic
    # columns below are merged into it -- a consumer writing rows as they
    # arrive emits a header without those columns; confirm intended
    yield colnames

    if 'marinetraffic_info' in tr1.keys():
        # flatten marinetraffic metadata keys into top-level columns and
        # drop fields superseded by that metadata
        colnames += tuple(tr1['marinetraffic_info'].keys())
        colnames.remove('error404')
        colnames.remove('dim_bow')
        colnames.remove('dim_stern')
        colnames.remove('dim_star')
        colnames.remove('dim_port')
        if 'coarse_type_txt' in colnames:  # pragma: no cover
            colnames.remove('coarse_type_txt')
        if 'vessel_name' in colnames:  # pragma: no cover
            colnames.remove('vessel_name')
        # deduplicate while preserving order
        colnames = list(dict.fromkeys(colnames))

    # per-column fixed-point formatting precision
    decimals = {
        'lon': 5,
        'lat': 5,
        'depth_metres': 2,
        'distance_metres': 2,
        'submerged_hull_m^2': 0,
    }

    def _append(track, colnames=colnames, decimals=decimals):
        # merge marinetraffic metadata values into the track dict, skipping
        # keys already present as track identifiers
        if 'marinetraffic_info' in track.keys():
            for key, val in dict(track['marinetraffic_info']).items():
                if key in ('error404', 'mmsi', 'imo'):
                    continue
                track[key] = val
            del track['marinetraffic_info']

        # NOTE(review): `return row` sits inside this loop, so only the row
        # for i=0 is ever produced per track (coverage confirms the loop
        # never completes) -- verify that one-row-per-track is intended
        for i in range(0, track['time'].size):
            # dynamic columns are indexed per-position; static columns are
            # sanitized scalars, with 0 rendered as empty string
            row = [(track[c][i] if c in track['dynamic'] else
                    (_sanitize(track[c]) if track[c] != 0 else ''))
                   for c in colnames]
            for ci, r in zip(range(len(colnames)), row):
                if colnames[ci] in decimals.keys() and r != '':
                    row[ci] = f'{float(r):.{decimals[colnames[ci]]}f}'

            return row

    yield _append(tr1, colnames, decimals)
    for track in tracks_dt:
        yield _append(track, colnames, decimals)
def write_csv(
        tracks,
        fpath: typing.Union[io.BytesIO, str, SpooledTemporaryFile],
        skipcols: list = ['label', 'in_zone'],
):
    ''' write track vector dictionaries as CSV file

        args:
            tracks (iter)
                track generator such as returned by
                :func:`aisdb.track_gen.TrackGen`
            fpath (string)
                output CSV filepath (string) or io.BytesIO buffer
            skipcols (list)
                columns to be omitted from results

        raises:
            ValueError: when fpath is neither a path string nor a
                supported binary buffer
    '''
    if isinstance(fpath, str):
        f = open(fpath, mode='w')
    elif isinstance(fpath, (io.BytesIO, SpooledTemporaryFile)):
        # text-mode view over the caller-supplied binary buffer
        f = io.TextIOWrapper(fpath, encoding='utf8', newline='')
    else:
        raise ValueError(f'invalid type for fpath: {type(fpath)}')

    writer = csv.writer(f,
                        delimiter=',',
                        quotechar="'",
                        quoting=csv.QUOTE_NONE,
                        dialect='unix')
    # bugfix: forward skipcols to tracks_csv (previously the parameter was
    # accepted but silently ignored, always using the tracks_csv default)
    for row in tracks_csv(tracks, skipcols):
        writer.writerow(row)

    if isinstance(fpath, str):
        f.close()
    else:
        # prevent bytesIO buf from being cleaned up with TextIOWrapper
        f.detach()
    return
def glob_files(dirpath, ext='.txt', keyorder=lambda key: key):
    ''' walk a directory to glob txt files. can be used with ZoneGeomFromTxt()

        zones_dir: string
            directory to walk
        keyorder:
            anonymous function for custom sort ordering

        example keyorder:

        .. code-block::

            # numeric sort on zone names with strsplit on 'Z' char
            keyorder=lambda key: int(key.rsplit(os.path.sep, 1)[1].split('.')[0].split('Z')[1])

        returns:
            .txt shapefile paths
    '''
    matches = np.array([], dtype=object)
    for walkdir, _dirs, files in os.walk(dirpath):
        if not files:
            continue
        # keep filenames whose suffix matches ext, sorted by keyorder
        fnames = sorted([f for f in files if f[-len(ext):] == ext],
                        key=keyorder)
        matches = np.append(matches,
                            [os.path.join(walkdir, f) for f in fnames])
    return sorted(matches, key=keyorder)
def getfiledate(filename):
    ''' attempt to parse the first valid epoch timestamp from .nm4 data file.
        timestamp will be returned as :class:`datetime.date` if successful,
        otherwise will return False if no date could be found

        args:
            filename (string)
                raw AIS data file in .nm4 format
    '''
    filesize = os.path.getsize(filename)
    if filesize == 0:  # pragma: no cover
        # empty file: no timestamp possible
        return False
    with open(filename, 'r') as f:
        if filename.lower()[-3:] == "csv":
            # CSV input: take the 'Time' column of the first data row,
            # formatted as YYYYMMDD_HHMMSS
            reader = csv.reader(f)
            head = next(reader)
            row1 = next(reader)
            rowdict = {a: b for a, b in zip(head, row1)}
            fdate = datetime.strptime(rowdict['Time'], '%Y%m%d_%H%M%S').date()
            return fdate
        else:
            # NMEA input: scan for a 'c:' tag in the portion of the line
            # before the '\' separator, which carries an epoch timestamp
            line = f.readline()
            head = line.rsplit('\\', 1)[0]
            n = 0
            while 'c:' not in head:  # pragma: no cover
                n += 1
                line = f.readline()
                head = line.rsplit('\\', 1)[0]
                # give up after 10000 lines without a timestamp tag
                assert n <= 10000
            # first run of digits after 'c:' is taken as the epoch value
            split0 = re.split('c:', head)[1]
            try:
                epoch = int(re.split('[^0-9]', split0)[0])
            except ValueError:  # pragma: no cover
                # no digits followed the tag
                return False
            except Exception as err:  # pragma: no cover
                raise err
            # NOTE(review): fromtimestamp uses the local timezone -- confirm
            # whether UTC (utcfromtimestamp) was intended
            fdate = datetime.fromtimestamp(epoch).date()
            return fdate