# Source code for aisdb.proc_util

from datetime import datetime, timedelta
from functools import partial, reduce
from tempfile import SpooledTemporaryFile
import csv
import io
import os
import re
import typing

import numpy as np


def _sanitize(s):
    # note: the first replaced comma is ASCII code 44; the second is
    # chr(130), a byte that renders as a comma-like character in some
    # extended-ASCII encodings. they are not the same character!
    if s is None:
        return ''
    elif s == '-':
        return ''
    else:
        return str(s).replace(',', '').replace(chr(130), '').replace(
            '#', '').replace('"', '').replace("'", '').replace('\n', '')

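# Example (illustrative sketch, not part of the library API): _sanitize
# strips characters that would otherwise break the unquoted CSV output
# produced below. The sample strings are hypothetical.
#
#   >>> _sanitize("M/V 'EXAMPLE', #1")
#   'M/V EXAMPLE 1'
#   >>> _sanitize(None), _sanitize('-')
#   ('', '')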

def _epoch_2_dt(ep_arr, t0=datetime(1970, 1, 1, 0, 0, 0), unit='seconds'):
    ''' convert epoch timestamps to datetime.datetime.
        redefinition of the function in aisdb.gis to avoid a circular import
    '''

    delta = lambda ep, unit: t0 + timedelta(**{unit: ep})

    if isinstance(ep_arr, (list, np.ndarray)):
        return np.array(list(map(partial(delta, unit=unit), map(int, ep_arr))))

    elif isinstance(ep_arr, (float, int, np.uint32)):
        return delta(int(ep_arr), unit=unit)

    else:  # pragma: no cover
        raise ValueError(
            f'input must be an integer or array of integers, '
            f'got {ep_arr=} with type {type(ep_arr)}')

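# Example (illustrative sketch): scalars return a single datetime, while
# lists and arrays return an object array; ``unit`` selects the timedelta
# keyword used for the offset from 1970-01-01.
#
#   >>> _epoch_2_dt(3600)                     # seconds by default
#   datetime.datetime(1970, 1, 1, 1, 0)
#   >>> _epoch_2_dt([0, 60], unit='minutes')
#   array([datetime.datetime(1970, 1, 1, 0, 0),
#          datetime.datetime(1970, 1, 1, 1, 0)], dtype=object)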

def _splits_idx(vector: np.ndarray, d: timedelta) -> np.ndarray:
    ''' find indexes splitting an epoch-seconds vector on gaps of at least d '''
    assert isinstance(d, timedelta)
    vector = np.array(vector, dtype=int)
    # positions where the gap between consecutive timestamps meets or
    # exceeds the threshold, bracketed by the start and end indexes
    splits = np.nonzero(vector[1:] - vector[:-1] >= d.total_seconds())[0] + 1
    idx = np.append(np.append([0], splits), [vector.size])
    return idx


def _segment_rng(track, maxdelta, key='time') -> typing.Generator:
    ''' yield an index range for each time segment split on gaps >= maxdelta '''
    idx = _splits_idx(track[key], maxdelta)
    for rng in map(range, idx[:-1], idx[1:]):
        yield rng

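# Example (illustrative sketch using a minimal stand-in for aisdb's track
# dict): a gap of two hours between epoch-second timestamps splits the
# track into two index ranges when maxdelta is one hour.
#
#   >>> track = {'time': np.array([0, 10, 20, 7200, 7210])}
#   >>> [list(rng) for rng in _segment_rng(track, timedelta(hours=1))]
#   [[0, 1, 2], [3, 4]]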

def write_csv_rows(rows,
                   pathname='/data/smith6/ais/scripts/output.csv',
                   mode='a'):
    with open(pathname, mode) as f:
        f.write('\n'.join(
            map(
                lambda row: ','.join(
                    map(lambda s: s.replace(',', '').replace('#', ''),
                        map(str.rstrip, map(str, row)))), rows)) + '\n')
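
# Example (illustrative sketch; the output path is hypothetical): each row
# iterable becomes one comma-separated line appended to the file, with
# commas and '#' stripped from the field values.
#
#   >>> write_csv_rows([(316000000, -63.5, 44.6)], pathname='/tmp/rows.csv')
#   # appends the line: 316000000,-63.5,44.6
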
def _datetime_column(tracks):
    for track in tracks:
        assert isinstance(track, dict), f'got {track=}'
        track['datetime'] = np.array(
            _epoch_2_dt(track['time'].astype(int)),
            dtype=object,
        )
        track['dynamic'] = track['dynamic'].union(set(['datetime']))
        yield track


_columns_order = [
    'mmsi', 'imo', 'vessel_name', 'name', 'datetime', 'time', 'lon', 'lat',
    'cog', 'sog', 'dim_bow', 'dim_stern', 'dim_star', 'dim_port',
    'coarse_type_txt', 'vesseltype_generic', 'vesseltype_detailed',
    'callsign', 'flag', 'gross_tonnage', 'summer_dwt', 'length_breadth',
    'year_built', 'home_port', 'error404'
]

def tracks_csv(tracks, skipcols: list = ['label', 'in_zone']):
    ''' Yields row tuples when given a track generator.
        See write_csv() for more info
    '''
    tracks_dt = _datetime_column(tracks)
    tr1 = next(tracks_dt)
    colnames = [
        c for c in _columns_order + list(
            set(tr1['static'].union(tr1['dynamic'])) -
            set(_columns_order).union(set(['marinetraffic_info'])))
        if c in list(tr1['static']) + list(tr1['dynamic'])
    ]
    colnames = [col for col in colnames if col not in skipcols]

    yield colnames

    if 'marinetraffic_info' in tr1.keys():
        colnames += tuple(tr1['marinetraffic_info'].keys())
        colnames.remove('error404')
        colnames.remove('dim_bow')
        colnames.remove('dim_stern')
        colnames.remove('dim_star')
        colnames.remove('dim_port')
        if 'coarse_type_txt' in colnames:  # pragma: no cover
            colnames.remove('coarse_type_txt')
        if 'vessel_name' in colnames:  # pragma: no cover
            colnames.remove('vessel_name')
        colnames = list(dict.fromkeys(colnames))

    decimals = {
        'lon': 5,
        'lat': 5,
        'depth_metres': 2,
        'distance_metres': 2,
        'submerged_hull_m^2': 0,
    }

    def _append(track, colnames=colnames, decimals=decimals):
        if 'marinetraffic_info' in track.keys():
            # flatten nested metadata into top-level track keys
            for key, val in dict(track['marinetraffic_info']).items():
                if key in ('error404', 'mmsi', 'imo'):
                    continue
                track[key] = val
            del track['marinetraffic_info']
        for i in range(0, track['time'].size):
            row = [(track[c][i] if c in track['dynamic'] else
                    (_sanitize(track[c]) if track[c] != 0 else ''))
                   for c in colnames]
            for ci, r in zip(range(len(colnames)), row):
                if colnames[ci] in decimals.keys() and r != '':
                    row[ci] = f'{float(r):.{decimals[colnames[ci]]}f}'
            # yield one CSV row per track position
            yield row

    yield from _append(tr1, colnames, decimals)
    for track in tracks_dt:
        yield from _append(track, colnames, decimals)
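
# Example (illustrative sketch; real tracks come from
# :func:`aisdb.track_gen.TrackGen`, the hand-built dict below only mimics
# that format):
#
#   >>> track = dict(
#   ...     mmsi=316000000,
#   ...     time=np.array([0, 60]),
#   ...     lon=np.array([-63.51, -63.52]),
#   ...     lat=np.array([44.61, 44.62]),
#   ...     static=set(['mmsi']),
#   ...     dynamic=set(['time', 'lon', 'lat']),
#   ... )
#   >>> rows = list(tracks_csv(iter([track])))
#   >>> rows[0]
#   ['mmsi', 'datetime', 'time', 'lon', 'lat']
#   >>> len(rows)  # header plus one row per track position
#   3
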
def write_csv(
        tracks,
        fpath: typing.Union[io.BytesIO, str, SpooledTemporaryFile],
        skipcols: list = ['label', 'in_zone'],
):
    ''' write track vector dictionaries as CSV file

        args:
            tracks (iter)
                track generator such as returned by
                :func:`aisdb.track_gen.TrackGen`
            fpath (string)
                output CSV filepath (string) or io.BytesIO buffer
            skipcols (list)
                columns to be omitted from results
    '''
    if isinstance(fpath, str):
        f = open(fpath, mode='w')
    elif isinstance(fpath, (io.BytesIO, SpooledTemporaryFile)):
        f = io.TextIOWrapper(fpath, encoding='utf8', newline='')
    else:
        raise ValueError(f'invalid type for fpath: {type(fpath)}')

    writer = csv.writer(f,
                        delimiter=',',
                        quotechar="'",
                        quoting=csv.QUOTE_NONE,
                        dialect='unix')
    for row in tracks_csv(tracks, skipcols):
        writer.writerow(row)

    if isinstance(fpath, str):
        f.close()
    else:
        # prevent the BytesIO buffer from being closed with the TextIOWrapper
        f.detach()
    return
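
# Example (illustrative sketch, reusing the hand-built track dict from the
# example above): writing to an in-memory buffer instead of a file path.
#
#   >>> buf = io.BytesIO()
#   >>> write_csv(iter([track]), buf)
#   >>> buf.getvalue().splitlines()[0]
#   b'mmsi,datetime,time,lon,lat'
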
def glob_files(dirpath, ext='.txt', keyorder=lambda key: key):
    ''' walk a directory to glob txt files. can be used with ZoneGeomFromTxt()

        args:
            dirpath: string
                directory to walk
            keyorder:
                anonymous function for custom sort ordering

        example keyorder:

        .. code-block:: python

            # numeric sort on zone names with strsplit on 'Z' char
            keyorder=lambda key: int(
                key.rsplit(os.path.sep, 1)[1].split('.')[0].split('Z')[1])

        returns:
            .txt shapefile paths
    '''
    paths = list(os.walk(dirpath))

    extfiles = [[
        p[0],
        sorted([f for f in p[2] if f[-len(ext):] == ext], key=keyorder)
    ] for p in paths if len(p[2]) > 0]

    extpaths = reduce(np.append, [
        list(map(os.path.join, (path[0] for p in path[1]), path[1]))
        for path in extfiles
    ], np.array([], dtype=object))

    return sorted(extpaths, key=keyorder)
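
# Example (illustrative sketch; the directory layout and zone filenames
# such as Z1.txt are hypothetical). A numeric keyorder keeps Z10 after Z2
# instead of sorting lexicographically:
#
#   >>> glob_files('/data/zones', ext='.txt',
#   ...            keyorder=lambda k: int(k.rsplit('Z', 1)[1].split('.')[0]))
#   ['/data/zones/Z1.txt', '/data/zones/Z2.txt', '/data/zones/Z10.txt']
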
def getfiledate(filename):
    ''' attempt to parse the first valid epoch timestamp from .nm4 data file.
        timestamp will be returned as :class:`datetime.date` if successful,
        otherwise will return False if no date could be found

        args:
            filename (string)
                raw AIS data file in .nm4 format
    '''
    filesize = os.path.getsize(filename)
    if filesize == 0:  # pragma: no cover
        return False
    with open(filename, 'r') as f:
        if filename.lower()[-3:] == 'csv':
            reader = csv.reader(f)
            head = next(reader)
            row1 = next(reader)
            rowdict = {a: b for a, b in zip(head, row1)}
            fdate = datetime.strptime(rowdict['Time'], '%Y%m%d_%H%M%S').date()
            return fdate
        else:
            line = f.readline()
            head = line.rsplit('\\', 1)[0]
            n = 0
            while 'c:' not in head:  # pragma: no cover
                n += 1
                line = f.readline()
                head = line.rsplit('\\', 1)[0]
                # give up instead of scanning an arbitrarily large file
                assert n <= 10000
            split0 = re.split('c:', head)[1]
            try:
                epoch = int(re.split('[^0-9]', split0)[0])
            except ValueError:  # pragma: no cover
                return False
            except Exception as err:  # pragma: no cover
                raise err
            fdate = datetime.fromtimestamp(epoch).date()
            return fdate
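
# Example (illustrative sketch; the path and tag-block contents are
# hypothetical): for a .nm4 file whose first line starts with a tag block
# like ``\s:terrestrial,c:1635731125*7F\!AIVDM,...``, the epoch following
# 'c:' determines the date (via datetime.fromtimestamp, so the result is
# local-time dependent).
#
#   >>> getfiledate('/data/ais/example_20211101.nm4')
#   datetime.date(2021, 11, 1)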