Introduction
AISDB was created to provide a complete set of tools to aid in the collection and processing of AIS data. AISDB can be used with either livestreaming AIS or historical raw AIS data files. The core data model behind AISDB is an SQLite database, and AISDB provides a Python interface to interact with the database; from database creation, querying, data processing, visualization, and exportation of data in CSV format. AISDB also provides tools for integrating AIS with environmental data in raster file formats. For example, AISDB provides convenience functions to download ocean bathymetric chart grids which may be used to query seafloor depth beneath each surface vessel position, although any raster data using longitude/latitude coordinate grid cells may be appended.
Index
- Python Environment
Install with Pip
Install with Docker
- Database Creation
From MERIDIAN’s data livestream
From historical AIS data files
- Querying the database
Connect to the database
Get vessel trajectories from the database
Query a bounding box encapsulating a collection of zone polygons
- Processing AIS messages
Voyage modelling
Data cleaning and MMSI deduplication
Interpolating, Geofencing, and filtering
Exporting as CSV
- Integration with external metadata
Retrieve detailed vessel metadata from marinetraffic.com
Bathymetric charts
Rasters
- Visualization
Display AIS data in the web interface
- Data collection and sharing
Setting up an AIS receiving antenna
Sharing data to external networks
0. Python Environment
Requires Python version 3.8 or newer.
Optionally requires SQLite (included in Python) or PostgresQL server (installed separately).
The AISDB Python package can be installed using pip.
It is recommended to install the package in a virtual Python environment such as venv
.
python -m venv env_ais
source ./env_ais/*/activate
pip install aisdb
For information on installing AISDB from source code, see Installing from Source
AISDB may also be used with docker-compose to run as services, for more info see AISDB Docker docs. The Python code in the rest of this document can then be run in the new Python environment. When using an interpreter such as Jupyter, ensure that jupyter is installed in the same environment as AISDB.
1. Database Creation
Creating a database from live streaming data
A typical workflow for using AISDB requires a database of recorded AIS messages.
The following code snippet demonstrates how to create a new database from MERIDIAN’s AIS data stream.
with the argument stdout=True
, the raw message input will be copied to stdout before it is decoded and added to the database.
Also see the receiver api docs: aisdb.receiver.start_receiver()
.
from aisdb.receiver import start_receiver
start_receiver(connect_addr='aisdb.meridian.cs.dal.ca:9920', sqlite_dbpath='AIS.sqlitedb', stdout=True)
Creating a database from historical data files
An SQLite database file will be created at the specified database path dbpath
.
The source
string will be stored in the database along with the decoded message data, making it easier to integrate data from multiple sources into the same database.
A checksum of the first 1000 bytes from the input file will be stored to prevent processing the same data file twice.
Checksum validation can be disabled by adding the argument skip_checksum=True
.
Decoding speed can be improved by placing the raw data files on a seperate hard drive from the database.
import aisdb
aisdb.decode_msgs(
filepaths=['aisdb/tests/test_data_20210701.csv', 'aisdb/tests/test_data_20211101.nm4'],
dbpath='AIS.sqlitedb',
dbconn=aisdb.DBConn(),
source='TESTING',
)
The decoder accepts raw AIS data in the .nm4
format, as long as a timestamp is included in the message header.
For example:
\s:41925,c:1635731889,t:1635731965*66\!AIVDM,1,1,,,19NSRM@01v;inKaVqpGVUmN:00Rh,0*7C
\s:41925,c:1635731889,t:1635731965*66\!AIVDM,1,1,,,15Benl0000<P7Te`HQFVrU<804;`,0*39
\s:41925,c:1635731889,t:1635731965*66\!AIVDM,1,1,,,17`BO@7P@9;sbjwUDa7uSH:@00RQ,0*35
CSV formatted data files can also be used to create a database. When using CSV, the following header is expected:
MMSI,Message_ID,Repeat_indicator,Time,Millisecond,Region,Country,Base_station,Online_data,Group_code,Sequence_ID,Channel,Data_length,Vessel_Name,Call_sign,IMO,Ship_Type,Dimension_to_Bow,Dimension_to_stern,Dimension_to_port,Dimension_to_starboard,Draught,Destination,AIS_version,Navigational_status,ROT,SOG,Accuracy,Longitude,Latitude,COG,Heading,Regional,Maneuver,RAIM_flag,Communication_flag,Communication_state,UTC_year,UTC_month,UTC_day,UTC_hour,UTC_minute,UTC_second,Fixing_device,Transmission_control,ETA_month,ETA_day,ETA_hour,ETA_minute,Sequence,Destination_ID,Retransmit_flag,Country_code,Functional_ID,Data,Destination_ID_1,Sequence_1,Destination_ID_2,Sequence_2,Destination_ID_3,Sequence_3,Destination_ID_4,Sequence_4,Altitude,Altitude_sensor,Data_terminal,Mode,Safety_text,Non-standard_bits,Name_extension,Name_extension_padding,Message_ID_1_1,Offset_1_1,Message_ID_1_2,Offset_1_2,Message_ID_2_1,Offset_2_1,Destination_ID_A,Offset_A,Increment_A,Destination_ID_B,offsetB,incrementB,data_msg_type,station_ID,Z_count,num_data_words,health,unit_flag,display,DSC,band,msg22,offset1,num_slots1,timeout1,Increment_1,Offset_2,Number_slots_2,Timeout_2,Increment_2,Offset_3,Number_slots_3,Timeout_3,Increment_3,Offset_4,Number_slots_4,Timeout_4,Increment_4,ATON_type,ATON_name,off_position,ATON_status,Virtual_ATON,Channel_A,Channel_B,Tx_Rx_mode,Power,Message_indicator,Channel_A_bandwidth,Channel_B_bandwidth,Transzone_size,Longitude_1,Latitude_1,Longitude_2,Latitude_2,Station_Type,Report_Interval,Quiet_Time,Part_Number,Vendor_ID,Mother_ship_MMSI,Destination_indicator,Binary_flag,GNSS_status,spare,spare2,spare3,spare4
The aisdb.database.decoder.decode_msgs()
function also accepts compressed .zip
and .gz
file formats as long as they can be decoded into either nm4 or CSV.
2. Querying the Database
Parameters for the database query can be defined using aisdb.database.dbqry.DBQuery
.
Iterate over rows returned from the database for each vessel with aisdb.database.dbqry.DBQuery.gen_qry()
.
Convert the results into generator yielding dictionaries with numpy arrays describing position vectors e.g. lon, lat, and time using aisdb.track_gen.TrackGen()
.
The following query will return vessel positions from the past 48 hours:
import aisdb
from datetime import datetime, timedelta
with aisdb.DBConn() as dbconn:
qry = aisdb.DBQuery(
dbconn=dbconn,
dbpath='AIS.sqlitedb',
callback=aisdb.database.sql_query_strings.in_timerange,
start=datetime.utcnow() - timedelta(hours=48),
end=datetime.utcnow(),
)
for vessel in aisdb.TrackGen(qry.gen_qry()):
print(vessel)
A specific region can be queried for AIS data using aisdb.gis.Domain
or one of its subclasses to define a collection of shapely
polygon features.
For this example, the domain contains a single bounding box polygon derived from a longitude/latitude coordinate pair and radial distance specified in meters.
If multiple features are included in the domain object, the domain boundaries will encompass the convex hull of all features contained within.
with DBConn() as dbconn:
domain = aisdb.DomainFromPoints(points=[(-63.6, 44.6),], radial_distances=[5000,])
qry = aisdb.DBQuery(
dbconn=dbconn,
dbpath='AIS.sqlitedb',
callback=aisdb.database.sqlfcn_callbacks.in_bbox_time_validmmsi,
start=datetime.utcnow() - timedelta(hours=48),
end=datetime.utcnow(),
xmin=domain.boundary['xmin'],
xmax=domain.boundary['xmax'],
ymin=domain.boundary['ymin'],
ymax=domain.boundary['ymax'],
)
for vessel in aisdb.TrackGen(qry.gen_qry()):
print(vessel)
Additional query callbacks for filtering by region, timeframe, identifier, etc. can be found in aisdb.database.sql_query_strings
and aisdb.database.sqlfcn_callbacks
3. Processing
Voyage Modelling
The generator described above can be input into a processing function, yielding modified results. For example, to model the activity of vessels on a per-voyage or per-transit basis, each voyage is defined as a continuous vector of vessel positions where the time between observed timestamps never exceeds a 24-hour period.
import aisdb
from datetime import datetime, timedelta
maxdelta = timedelta(hours=24)
with aisdb.DBConn() as dbconn:
qry = aisdb.DBQuery(
dbconn=dbconn,
dbpath='AIS.sqlitedb',
callback=aisdb.database.sql_query_strings.in_timerange,
start=datetime.utcnow() - timedelta(hours=48),
end=datetime.utcnow(),
)
tracks = aisdb.TrackGen(qry.gen_qry())
track_segments = aisdb.split_timedelta(tracks, maxdelta)
for segment in track_segments:
print(segment)
Data cleaning and MMSI deduplication
A common issue with AIS is that the data is noisy, and databases may contain multiple vessels broadcasting with same identifier at the same time.
The aisdb.denoising_encoder.encode_greatcircledistance()
function uses an encoder to check the approximate distance between each vessel’s position, and then segments resulting vectors where a surface vessel couldn’t reasonably travel there using the most direct path, e.g. above 50 knots.
A distance threshold and speed threshold are used as a hard limit on the maximum delta distance or delta time allowed between messages to be considered continuous.
A score is computed for each position delta, with sequential messages in close proximity at shorter intervals given a higher score, calculated by haversine distance divided by elapsed time.
Any deltas with a score not reaching the minimum threshold are considered as the start of a new segment.
Finally, the beginning of each new segment is compared to the end of each existing segment with a matching vessel identifier, and if the delta exceeds the minimum score, the segments are concatenated.
If multiple existing trajectories meet the minimum score threshold, the new segment will be concatenated the existing segment with the highest score.
Processing functions may be executed in sequence as a processing chain or pipeline, so after segmenting the individual voyages as shown above, results can be input into the encoder to effectively remove noise and correct for vessels with duplicate identifiers.
import aisdb
from datetime import datetime, timedelta
maxdelta = timedelta(hours=24)
distance_threshold = 200000 # meters
speed_threshold = 50 # knots
minscore = 1e-6
with aisdb.DBConn() as dbconn:
qry = aisdb.DBQuery(
dbconn=dbconn,
dbpath='AIS.sqlitedb',
callback=aisdb.database.sql_query_strings.in_timerange,
start=datetime.utcnow() - timedelta(hours=48),
end=datetime.utcnow(),
)
tracks = aisdb.TrackGen(qry.gen_qry())
track_segments = aisdb.split_timedelta(tracks, maxdelta)
tracks_encoded = aisdb.encode_greatcircledistance(track_segments, distance_threshold=distance_threshold, speed_threshold=speed_threshold, minscore=minscore)
In this second example, artificial noise is introduced into the tracks as a hyperbolic demonstration of the denoising capability. The resulting cleaned tracks are then displayed in the web interface.
import os
from datetime import datetime
import aisdb
from aisdb import DBQuery, DBConn
from aisdb.gis import DomainFromTxts
from dotenv import load_dotenv
load_dotenv()
dbpath = os.environ.get('EXAMPLE_NOISE_DB', 'AIS.sqlitedb')
trafficDBpath = os.environ.get('AISDBMARINETRAFFIC', 'marinetraffic.db')
domain = DomainFromTxts('EastCoast', folder=os.environ.get('AISDBZONES'))
start = datetime(2021, 7, 1)
end = datetime(2021, 7, 2)
default_boundary = {'xmin': -180, 'xmax': 180, 'ymin': -90, 'ymax': 90}
def random_noise(tracks, boundary=default_boundary):
for track in tracks:
i = 1
while i < len(track['time']):
track['lon'][i] *= track['mmsi']
track['lon'][i] %= (boundary['xmax'] - boundary['xmin'])
track['lon'][i] += boundary['xmin']
track['lat'][i] *= track['mmsi']
track['lat'][i] %= (boundary['ymax'] - boundary['ymin'])
track['lat'][i] += boundary['ymin']
i += 2
yield track
with DBConn() as dbconn:
vinfoDB = aisdb.webdata.marinetraffic.VesselInfo(trafficDBpath).trafficDB
qry = DBQuery(
dbconn=dbconn,
dbpath=dbpath,
start=start,
end=end,
callback=aisdb.database.sqlfcn_callbacks.in_bbox_time_validmmsi,
**domain.boundary,
)
rowgen = qry.gen_qry(fcn=aisdb.database.sqlfcn.crawl_dynamic_static)
tracks = aisdb.track_gen.TrackGen(rowgen, decimate=True)
tracks = aisdb.webdata.marinetraffic.vessel_info(tracks, vinfoDB)
tracks = random_noise(tracks, boundary=domain.boundary)
tracks = aisdb.encode_greatcircledistance(tracks,
distance_threshold=50000,
minscore=1e-5,
speed_threshold=50)
if __name__ == '__main__':
aisdb.web_interface.visualize(
tracks,
domain=domain,
visualearth=True,
open_browser=True,
)
Interpolating, geofencing and filtering
Building on the above processing pipeline, the resulting cleaned trajectories can then be geofenced and filtered for results contained by atleast one domain polygon, and interpolated for uniformity.
# ...
domain = aisdb.DomainFromPoints(points=[(-63.6, 44.6),], radial_distances=[5000,])
tracks_filtered = aisdb.track_gen.fence_tracks(tracks_encoded, domain)
tracks_interp = aisdb.interp_time(tracks_filtered, step=timedelta(minutes=15))
for segment in track_segments:
print(segment)
Additional processing functions can be found in the aisdb.track_gen
module.
Exporting as CSV
The resulting processed voyage data can be exported in CSV format instead of being printed:
# ...
aisdb.write_csv(tracks_interp, 'ais_24h_processed.csv')
4. Integration with external metadata
AISDB supports integration with external data sources such as bathymetric charts and other raster grids.
Bathymetric charts
To determine the approximate ocean depth at each vessel position, the aisdb.webdata.bathymetry
module can be used.
import aisdb
# set the data storage directory
data_dir = './testdata/'
# download bathymetry grid from the internet
bathy = aisdb.webdata.bathymetry.Gebco(data_dir=data_dir)
bathy.fetch_bathymetry_grid()
Once the data has been downloaded, the Gebco()
class may be used to append bathymetric data to tracks in the context of a TrackGen
processing pipeline in the same manner as the processing functions described above.
# ...
tracks = aisdb.TrackGen(qry.gen_qry())
tracks_bathymetry = bathy.merge_tracks(tracks)
Also see aisdb.webdata.shore_dist.ShoreDist
for determining approximate nearest distance to shore from vessel positions.
Rasters
Similarly, abritrary raster coordinate-gridded data may be appended to vessel tracks
# ...
tracks = aisdb.TrackGen(qry.gen_qry())
raster_path './GMT_intermediate_coast_distance_01d.tif'
raster = aisdb.webdata.load_raster.RasterFile(raster_path)
tracks = raster.merge_tracks(tracks, new_track_key="coast_distance")
Detailed metadata from marinetraffic.com
5. Visualization
AIS data from the database may be overlayed on a map such as the one shown above by using the aisdb.web_interface.visualize()
function.
This function accepts a generator of track dictionaries such as those output by aisdb.track_gen.TrackGen()
.
The color of each vessel track is determined by vessel type metadata.
import os
from datetime import datetime, timedelta
import aisdb
import aisdb.web_interface
from aisdb.tests.create_testing_data import (
sample_database_file,
random_polygons_domain,
)
domain = random_polygons_domain()
example_dir = 'testdata'
if not os.path.isdir(example_dir):
os.mkdir(example_dir)
dbpath = os.path.join(example_dir, 'example_visualize.db')
months = sample_database_file(dbpath)
start = datetime(int(months[0][0:4]), int(months[0][4:6]), 1)
end = datetime(int(months[1][0:4]), int(months[1][4:6]) + 1, 1)
def color_tracks(tracks):
''' set the color of each vessel track using a color name or RGB value '''
for track in tracks:
track['color'] = 'red' or 'rgb(255,0,0)'
yield track
with aisdb.SQLiteDBConn() as dbconn:
qry = aisdb.DBQuery(
dbconn=dbconn,
dbpath=dbpath,
start=start,
end=end,
callback=aisdb.sqlfcn_callbacks.valid_mmsi,
)
rowgen = qry.gen_qry()
tracks = aisdb.track_gen.TrackGen(rowgen, decimate=False)
tracks_segment = aisdb.track_gen.split_timedelta(tracks,
timedelta(weeks=4))
tracks_colored = color_tracks(tracks_segment)
if __name__ == '__main__':
aisdb.web_interface.visualize(
tracks_colored,
domain=domain,
visualearth=True,
open_browser=True,
)
6. Data collection and sharing
AIS station operators are may set up an AIS base station with a Raspberry Pi receiver client, and store messages in a database with the receiver server. For instructions on transmitting AIS from a Raspberry Pi to the receiver server, see Setting up an AIS receiver client. The receiver server can then forward incoming filtered messages to a downstream UDP multicast channel:
import aisdb
# listen for incoming raw AIS messages on port 9921 and share with MERIDIAN network
aisdb.start_receiver(
udp_listen_addr='0.0.0.0:9921',
multicast_rebroadcast_addr='aisdb.meridian.cs.dal.ca:9921',
)