Coverage for aisdb/web_interface.py: 29%
104 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 04:22 +0000
1import asyncio
2import http.server
3import logging
4import multiprocessing
5import os
6import socketserver
7import webbrowser
8from datetime import datetime
9from functools import partial
10from tempfile import SpooledTemporaryFile
12import orjson
13import websockets.server
15logging.getLogger("websockets").setLevel(logging.WARNING)
16logging.getLogger("shapely").setLevel(logging.WARNING)
18wwwpath = os.path.abspath(
19 os.path.join(os.path.dirname(os.path.dirname(__file__)), 'aisdb_web',
20 'dist_map'))
22wwwpath_visualearth = os.path.abspath(
23 os.path.join(os.path.dirname(os.path.dirname(__file__)), 'aisdb_web',
24 'dist_map_bingmaps'))
27def _start_webclient(visualearth=False):
28 if not visualearth:
29 path = wwwpath
30 else:
31 path = wwwpath_visualearth
33 class AISDB_HTML(http.server.SimpleHTTPRequestHandler):
35 extensions_map = {
36 '': 'application/octet-stream',
37 '.css': 'text/css',
38 '.html': 'text/html',
39 '.jpg': 'image/jpg',
40 '.js': 'application/x-javascript',
41 '.json': 'application/json',
42 '.png': 'image/png',
43 '.wasm': 'application/wasm',
44 }
46 def __init__(self, *args, **kwargs):
47 super().__init__(*args, directory=path, **kwargs)
49 socketserver.TCPServer.allow_reuse_address = True
50 with socketserver.TCPServer(("localhost", 3000), AISDB_HTML) as httpd:
51 try:
52 print('Serving HTTP assets on localhost:3000')
53 httpd.serve_forever()
54 except KeyboardInterrupt:
55 httpd.server_close()
56 httpd.shutdown()
57 except Exception as e:
58 httpd.server_close()
59 httpd.shutdown()
60 raise e
63def serialize_zone_json(name, zone) -> bytes:
64 zone_dict = {
65 'msgtype': 'zone',
66 'meta': {
67 'name': name
68 },
69 'x': tuple(zone['geometry'].boundary.xy[0]),
70 'y': tuple(zone['geometry'].boundary.xy[1]),
71 't': [],
72 }
73 return orjson.dumps(zone_dict)
76def serialize_track_json(track) -> (bytes, bytes):
77 ''' serializes a single track dictionary to JSON format encoded as UTF8 '''
78 vector = {
79 'msgtype': 'track_vector',
80 # currently, database_server sends all metadata as strings
81 # reproduce this behaviour by coercion to string type, even for int
82 'meta': {
83 'mmsi': str(track['mmsi'])
84 },
85 't': track['time'],
86 'x': track['lon'],
87 'y': track['lat'],
88 }
90 meta = {k: track[k] for k in track['static'] if k != 'marinetraffic_info'}
92 if 'color' in track.keys():
93 meta['color'] = track['color']
95 meta['msgtype'] = 'vesselinfo'
97 if 'marinetraffic_info' in track.keys():
98 meta.update({
99 k: track['marinetraffic_info'][k]
100 for k in track['marinetraffic_info'].keys()
101 })
103 vector_json = orjson.dumps(vector, option=orjson.OPT_SERIALIZE_NUMPY)
104 meta_json = orjson.dumps(meta)
105 return (vector_json, meta_json)
108async def _send_tracks(websocket, tmp_vectors, tmp_meta, domain=None):
109 ''' send tracks serialized as JSON to the connected websocket client '''
110 done = {}
111 async for message_json in websocket:
112 message = orjson.loads(message_json)
113 print(
114 f'{websocket.remote_address[0]}:{websocket.remote_address[1]} - received: {message}'
115 )
117 if message == {"msgtype": "validrange"}:
118 now = datetime.now().timestamp()
119 validrange = {"msgtype": "validrange", "start": now, "end": now}
120 await websocket.send(orjson.dumps(validrange))
121 done['validrange'] = True
123 elif message == {"msgtype": "zones"}:
124 await websocket.send(b'{"msgtype": "doneZones"}')
125 done['zones'] = True
126 elif message == {"msgtype": "meta"}:
127 done['meta'] = True
128 else:
129 raise RuntimeError(f'unknown request {message_json}')
131 if 'validrange' in done.keys() and 'zones' in done.keys():
132 assert len(done.keys()) == 2
134 if domain is not None:
135 for name, zone in domain.zones.items():
136 zone_json = serialize_zone_json(name, zone)
137 await websocket.send(zone_json)
139 tmp_vectors.seek(0)
140 for vector_json in tmp_vectors:
141 await websocket.send(vector_json)
143 elif 'meta' in done.keys():
144 assert len(done.keys()) == 1
145 tmp_meta.seek(0)
146 for meta_json in tmp_meta:
147 await websocket.send(meta_json)
150async def _start_webserver(tracks,
151 domain=None,
152 visualearth=False,
153 open_browser=True):
154 ''' Display tracks in the web interface. Serves data to the web client '''
155 print('Querying database...', end='\t')
156 with SpooledTemporaryFile(max_size=1024 * 1e6, newline=b'\n') as vectors, \
157 SpooledTemporaryFile(max_size=256 * 1e6, newline=b'\n') as meta:
158 for vector, info in map(serialize_track_json, tracks):
159 vectors.write(vector)
160 vectors.write(b'\n')
161 meta.write(info)
162 meta.write(b'\n')
164 print('done query')
166 if open_browser:
167 print('Opening a new browser window to display track data. '
168 'Press Ctrl-C to stop the server and close the webpage')
169 tag = 1 if not visualearth else 2
170 url = f'http://localhost:3000/index.html?python={tag}&z=2'
171 if not webbrowser.open_new_tab(url):
172 print(f'Failed to open webbrowser, instead use URL: {url}')
174 fcn = partial(_send_tracks,
175 tmp_vectors=vectors,
176 tmp_meta=meta,
177 domain=domain)
178 async with websockets.server.serve(fcn, 'localhost', 9924) as server:
179 stop = asyncio.Future()
180 await stop
181 await server
184def visualize(tracks, domain=None, visualearth=False, open_browser=True):
185 ''' Display tracks using the web interface.
187 Starts the web client HTTP server in a separate process, and
188 serves track data via websocket on port 9924.
190 If a domain object is given, zone polygons will be drawn on the map
191 from domain zone geometries.
193 If visualearth is True, microsoft visual earth map tiles will be used
194 for the map background.
196 If open_browser is True, python will attempt to open the web application
197 in a new tab using the default browser.
199 To customize the color of each vessel track, set the 'color' value to
200 a color string or RGB value string:
202 >>> def color_tracks(tracks):
203 ... for track in tracks:
204 ... track['color'] = 'red' or 'rgb(255,0,0)'
205 ... yield track
206 ...
207 >>> tracks = [
208 ... {'mmsi': 204242000, 'lon': [-8.931666], 'lat':[41.45], 'time': [1625176725]},
209 ... {'mmsi': 204814000, 'lon': [-25.668333], 'lat': [37.736668], 'time': [1625147353]},
210 ... ]
211 >>> tracks_colored = color_tracks(tracks)
212 '''
213 proc = multiprocessing.Process(target=_start_webclient, args=[visualearth])
214 proc.start()
215 try:
216 asyncio.run(_start_webserver(tracks, domain, visualearth,
217 open_browser))
218 proc.join()
219 except KeyboardInterrupt:
220 print('Received KeyboardInterrupt, stopping server...')
221 proc.terminate()
222 except Exception as err:
223 proc.terminate()
224 raise err