# Source code for aisdb.webdata.bathymetry

''' load bathymetry data from GEBCO raster files '''

import hashlib
import os
import zipfile
from functools import reduce

import numpy as np
import requests
import warnings
from tqdm import tqdm

from aisdb.gis import shiftcoord
from aisdb.webdata.load_raster import RasterFile

url = 'https://www.bodc.ac.uk/data/open_download/gebco/gebco_2022/geotiff/'


def _filebounds(fpath):
    return {
        f[0]: float(f[1:])
        for f in fpath.split('gebco_2022_', 1)[1].rsplit('.tif', 1)[0].split(
            '_')
    }


class Gebco():
    ''' load bathymetry values for track positions from GEBCO 2022 raster
        files, downloading the rasters to ``data_dir`` if they are missing
    '''

    def __init__(self, data_dir):
        '''
            args:
                data_dir (string)
                    folder where rasters should be stored
        '''
        self.data_dir = data_dir
        assert os.path.isdir(data_dir)
        self.__enter__()

    def __enter__(self):
        # download bathymetry rasters first if missing, so that the
        # raster index below sees them on a fresh data directory
        # (previously the index was built before fetching, leaving it
        # empty on first use)
        self.fetch_bathymetry_grid()

        # map each GEBCO geotiff filename to its boundary coordinates
        self.rasterfiles = {
            f: _filebounds(f)
            for f in sorted(f for f in os.listdir(self.data_dir)
                            if f[-4:] == '.tif' and 'gebco_2022' in f)
        }
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close_all()

    def fetch_bathymetry_grid(self):  # pragma: no cover
        """ download geotiff zip archive and extract it """
        zipf = os.path.join(self.data_dir, "gebco_2022_geotiff.zip")
        try:
            # download the file if necessary
            if not os.path.isfile(zipf):
                print('downloading gebco bathymetry...')
                with requests.get(url, stream=True) as payload:
                    assert payload.status_code == 200, 'error fetching file'
                    with open(zipf, 'wb') as f:
                        # total is the known archive size in bytes
                        with tqdm(total=4011413504,
                                  desc=zipf,
                                  unit='B',
                                  unit_scale=True) as t:
                            for chunk in payload.iter_content(
                                    chunk_size=8192):
                                _ = t.update(f.write(chunk))

                # verify the freshly-downloaded archive before extracting
                with open(zipf, 'rb') as z:
                    sha256sum = hashlib.sha256(z.read()).hexdigest()
                print('verifying checksum...')
                assert sha256sum == '5ade15083909fcd6003409df678bdc6537b8691df996f8d806b48de962470cc3',\
                    'checksum failed!'

            # extract only members not already present in data_dir
            with zipfile.ZipFile(zipf, 'r') as zip_ref:
                members = list(
                    set(zip_ref.namelist()) -
                    set(os.listdir(self.data_dir)))
                print('extracting bathymetry data...')
                zip_ref.extractall(path=self.data_dir, members=members)
        except (Exception, KeyboardInterrupt):
            # discard a partial/corrupt download; guard the remove so a
            # missing file doesn't mask the original error, and re-raise
            # with the original traceback intact
            if os.path.isfile(zipf):
                os.remove(zipf)
            raise
        return

    def _load_raster(self, key):
        # lazily open the raster image for the given filename key
        self.rasterfiles[key]['raster'] = RasterFile(
            imgpath=os.path.join(self.data_dir, key))

    def _check_in_bounds(self, track):
        ''' yield the filename of the raster covering each track position

            coordinates outside the valid lon/lat range are wrapped back
            into range before the lookup
        '''
        for lon, lat in zip(track['lon'], track['lat']):
            if not (-180 <= lon <= 180) or not (
                    -90 <= lat <= 90):  # pragma: no cover
                warnings.warn('coordinates out of range!')
                lon = shiftcoord([lon])[0]
                lat = shiftcoord([lat], rng=90)[0]
            if os.environ.get('DEBUG'):
                tracer = False
            for key, bounds in self.rasterfiles.items():
                if (bounds['w'] <= lon <= bounds['e']
                        and bounds['s'] <= lat <= bounds['n']):
                    tracer = True
                    # open the raster on first use only
                    if 'raster' not in bounds.keys():
                        self._load_raster(key)
                    yield key
                    break
            if os.environ.get('DEBUG') and not tracer:
                print(f'{lon = } {lat = }')
                assert tracer
        return

    def _close_all(self):
        # close any raster image handles that were opened lazily
        for filepath, bounds in self.rasterfiles.items():
            if 'raster' in bounds.keys():
                bounds['raster'].img.close()

    def merge_tracks(self, tracks):
        ''' append `depth_metres` column to track dictionaries

            args:
                tracks (iterable)
                    track dictionaries with 'lon', 'lat', 'time' and
                    'dynamic' keys

            yields:
                track dictionaries with an added 'depth_metres' array
                (positive values = depth below sea level)

            raises:
                ValueError if any track position is not covered by a raster
        '''
        for track in tracks:
            # filepath of the raster covering each track position
            raster_keys = np.array(list(self._check_in_bounds(track)),
                                   dtype=object)
            # ensure that each vector time slice has a value
            if len(raster_keys) != len(track['time']):
                raise ValueError('no rasters found for track')
            if len(raster_keys) == 0:
                # empty track: nothing to sample
                track['depth_metres'] = np.array([])
            else:
                # segment boundaries: positions where the raster key changes.
                # fix: compare consecutive keys ([1:], not [:1]) and offset
                # the change indices by one so each segment starts at the
                # first position using the new raster
                bathy_segments = np.append(
                    np.append(
                        [0],
                        np.where(raster_keys[:-1] != raster_keys[1:])[0] + 1),
                    [len(raster_keys)],
                )
                # sample each contiguous segment from its own raster; the
                # raster is looked up by the segment's start index (fix:
                # previously indexed by segment number, reading the wrong
                # raster for multi-raster tracks)
                track['depth_metres'] = reduce(np.append, [
                    self.rasterfiles[raster_keys[bathy_segments[i]]]
                    ['raster']._track_coordinate_values(
                        track,
                        rng=range(bathy_segments[i], bathy_segments[i + 1]))
                    for i in range(len(bathy_segments) - 1)
                ]) * -1  # flip sign: raster stores elevation, negative at sea
            track['dynamic'] = set(track['dynamic']).union(
                set(['depth_metres']))
            yield track