Coverage for aisdb/interp.py: 100%
15 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:14 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:14 +0000
1''' linear interpolation of track segments on temporal axis '''
3from datetime import timedelta
5import numpy as np
6import warnings
9def np_interp_linear(track, key, intervals):
10 assert len(track['time']) == len(track[key])
11 return np.interp(x=intervals.astype(int),
12 xp=track['time'].astype(int),
13 fp=track[key].astype(float))
16def interp_time(tracks, step=timedelta(minutes=10)):
17 ''' linear interpolation on vessel trajectory
19 args:
20 tracks (dict)
21 messages sorted by mmsi then time.
22 uses mmsi as key with columns: time lon lat cog sog name .. etc
23 step (datetime.timedelta)
24 interpolation interval
26 returns:
27 dictionary of interpolated tracks
28 '''
29 for track in tracks:
30 if track['time'].size <= 1:
31 # yield track
32 warnings.warn('cannot interpolate track of length 1, skipping...')
33 continue
35 intervals = np.arange(
36 start=track['time'][0],
37 stop=track['time'][-1] + int(step.total_seconds()),
38 step=int(step.total_seconds()),
39 ).astype(int)
41 assert len(intervals) >= 1
43 itr = dict(
44 **{k: track[k]
45 for k in track['static']},
46 time=intervals,
47 static=track['static'],
48 dynamic=track['dynamic'],
49 **{
50 k: np_interp_linear(track, k, intervals)
51 for k in track['dynamic'] if k != 'time'
52 },
53 )
54 yield itr
56 return