Coverage for aisdb/interp.py: 100%

15 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-30 04:14 +0000

1''' linear interpolation of track segments on temporal axis ''' 

2 

3from datetime import timedelta 

4 

5import numpy as np 

6import warnings 

7 

8 

def np_interp_linear(track, key, intervals):
    ''' linearly interpolate one column of a track onto new timestamps

        args:
            track (dict)
                must contain a 'time' entry and a ``key`` entry of equal
                length. arrays or plain sequences are both accepted
            key (str)
                name of the entry in ``track`` to interpolate
            intervals (array-like)
                timestamps at which the interpolated values are evaluated

        returns:
            array of floats as produced by np.interp, one value per
            entry in ``intervals``

        raises:
            AssertionError: if track['time'] and track[key] differ in length
    '''
    # np.asarray is a no-op for ndarrays and lets list inputs through
    times = np.asarray(track['time'])
    values = np.asarray(track[key])

    # kept as an assertion so the exception type seen by callers is unchanged
    assert len(times) == len(values)

    # timestamps are cast to int and values to float before interpolating;
    # np.interp expects the xp coordinates to be increasing
    return np.interp(
        x=np.asarray(intervals).astype(int),
        xp=times.astype(int),
        fp=values.astype(float),
    )

14 

15 

def interp_time(tracks, step=timedelta(minutes=10)):
    ''' linear interpolation on vessel trajectory

        args:
            tracks (iterable of dict)
                one dictionary per track. each must provide a 'time'
                array, plus 'static' and 'dynamic' entries listing the
                names of the track's other keys.
                presumably messages are sorted by mmsi then time, with
                columns: time lon lat cog sog name .. etc (not checked here)
            step (datetime.timedelta)
                interpolation interval. truncated to whole seconds and
                must be at least one second

        yields:
            one dictionary per track holding more than one timestamp:
            static entries are copied over unchanged, 'time' is replaced
            by the regular grid of timestamps, and every dynamic entry
            other than 'time' is interpolated onto that grid.
            tracks with zero or one timestamp are skipped with a warning

        raises:
            ValueError: if ``step`` is shorter than one second. as this is
                a generator, the error surfaces on the first iteration
    '''
    # computed once; a sub-second step would truncate to 0 and be passed
    # to np.arange as a zero step
    step_seconds = int(step.total_seconds())
    if step_seconds < 1:
        raise ValueError(
            f'step must be at least one second, got {step!r}')

    for track in tracks:
        if track['time'].size <= 1:
            warnings.warn('cannot interpolate track of length 1, skipping...')
            continue

        # stop is exclusive, so extending it by one step keeps the last
        # observed time covered; the final grid point may overshoot the
        # last timestamp by less than one step
        intervals = np.arange(
            start=track['time'][0],
            stop=track['time'][-1] + step_seconds,
            step=step_seconds,
        ).astype(int)

        # an empty grid means the last timestamp precedes the first
        assert len(intervals) >= 1

        static_items = {k: track[k] for k in track['static']}
        dynamic_items = {
            k: np_interp_linear(track, k, intervals)
            for k in track['dynamic'] if k != 'time'
        }

        yield dict(
            **static_items,
            time=intervals,
            static=track['static'],
            dynamic=track['dynamic'],
            **dynamic_items,
        )