flockompass/main.py
2023-11-20 16:09:19 -06:00

451 lines
16 KiB
Python

from plyer import gps, uniqueid
from kivy.app import App
from kivy.clock import Clock
from kivy.core.window import Window
from kivy.properties import DictProperty, NumericProperty, StringProperty
from kivy.logger import Logger
from kivy.clock import mainthread
from kivy.utils import platform
from kivy_garden.mapview import MapView
from kivy.uix.screenmanager import ScreenManager, Screen, RiseInTransition, SlideTransition
from kivy.uix.behaviors import ButtonBehavior
from kivy.uix.image import Image
from kivy.vector import Vector
from kivy.animation import Animation
from math import atan2, sin, cos, degrees, floor
import requests
from datetime import timedelta, datetime
from dateutil.parser import isoparse
from geopy.distance import geodesic
from os import path
import pickle
import hashlib
class IconButton(ButtonBehavior, Image):
pass
class MapScreen(Screen):
pass
class CompassScreen(Screen):
def __init__(self, **kwargs):
super(CompassScreen, self).__init__(**kwargs)
Window.bind(on_keyboard=self.android_back_click)
def android_back_click(self, window, key, *largs):
if key == 27 and self.parent is not None:
self.parent.current='map'
return True
class SettingsScreen(Screen):
def __init__(self, **kwargs):
super(SettingsScreen, self).__init__(**kwargs)
Window.bind(on_keyboard=self.android_back_click)
def android_back_click(self, window, key, *largs):
if key == 27 and self.parent is not None:
self.parent.transition = SlideTransition(direction='left')
self.parent.current='map'
return True
class FlockompassApp(App):
gps_data = DictProperty()
session_data = DictProperty()
needle_angle = NumericProperty(0)
fbearing = NumericProperty(0)
dbearing = NumericProperty(0)
dest_distance = StringProperty("")
dashboard_dest = StringProperty("")
dashboard_flock = StringProperty("")
theme = DictProperty(
{'needle': "assets/needle_dark.png",
'to_map': "assets/to_map_dark.png",
'bgcolor': "#336645"})
def read_settings(self):
app_folder = path.dirname(path.abspath(__file__))
pickle_path = path.join(app_folder, 'settings.pickle')
if path.isfile(pickle_path):
with open(pickle_path, 'rb') as f:
settings = pickle.load(f)
Logger.info('Loaded settings')
else:
settings = {'settings_smoothing': 0.88,
'settings_compass_update': 0.88,
'settings_theme': "dark",
'settings_flock_server': 'http://flocker.mooo.com'
}
with open(pickle_path, 'wb') as f:
pickle.dump(settings, f)
Logger.info('Wrote default settings')
self.session_data.update(settings)
if self.session_data['settings_theme'] == "dark":
self.set_dark_theme(True)
else:
self.set_dark_theme(False)
def save_settings(self):
app_folder = path.dirname(path.abspath(__file__))
pickle_path = path.join(app_folder, 'settings.pickle')
settings = {k: self.session_data[k]
for k in self.session_data
if k.startswith('settings_')}
with open(pickle_path, 'wb') as f:
pickle.dump(settings, f)
def set_smoothing(self):
self.session_data['settings_smoothing'] = self.ss.ids.slider_smoothing.value
self.save_settings()
def set_flock_server(self):
self.session_data['settings_flock_server'] = self.ss.ids.flock_server.value
self.save_settings()
def set_update_freq(self):
Logger.info('setting update freq to %s' % self.ss.ids.slider_update_freq.value)
self.session_data['settings_update_freq'] = self.ss.ids.slider_update_freq.value
self.save_settings()
Clock.unschedule(self.flock)
Clock.schedule_interval(self.flock,
self.session_data.get('settings_update_freq', 10.0))
def set_compass_update(self):
self.session_data['settings_compass_update'] = self.ss.ids.slider_compass_update.value
self.compass_disable()
self.compass_enable()
self.save_settings()
def set_destination(self):
self.compass_enable()
self.session_data['dest_lat'] = self.ms.ids.centermark.lat
self.session_data['dest_lon'] = self.ms.ids.centermark.lon
Clock.schedule_once(self.center_map_on_session, 0)
self.flock_server_register()
Clock.schedule_interval(self.flock,
self.session_data.get('settings_update_freq', 10.0))
def set_dark_theme(self, active):
if active:
self.theme = {'needle': "assets/needle_dark.png",
'to_map': "assets/to_map_dark.png",
'bgcolor': "#336645",
'fgcolor': "#f3e8d2"
}
self.session_data['settings_theme'] = 'dark'
else:
self.theme = {'needle': "assets/needle_bright.png",
'to_map': "assets/to_map_bright.png",
'bgcolor': "#f3e8d2",
'fgcolor': "#346645"
}
self.session_data['settings_theme'] = 'bright'
self.save_settings()
def request_android_permissions(self):
from android.permissions import request_permissions, Permission
def callback(permissions, results):
if all([res for res in results]):
Logger.info("All permissions granted.")
else:
Logger.info("Some permissions refused. %s" % results)
request_permissions([Permission.ACCESS_COARSE_LOCATION,
Permission.ACCESS_FINE_LOCATION],
callback)
def gps_start(self, minTime, minDistance):
gps.start(minTime, minDistance)
def gps_stop(self):
gps.stop()
@mainthread
def on_location(self, **kwargs):
self.gps_data = kwargs
if ('dest_lat' not in self.session_data and 'dest_lon' not in self.session_data):
self.session_data['dest_lat'] = self.gps_data['lat']
self.session_data['dest_lon'] = self.gps_data['lon']
Clock.schedule_once(self.center_map_on_session, 0)
# self.ms.ids.mapview.center_on(self.session_data['dest_lat'],
# self.session_data['dest_lon'])
else:
dest_distance = geodesic(
(self.gps_data['lat'], self.gps_data['lon']),
(self.session_data['dest_lat'], self.session_data['dest_lon'])).kilometers
if dest_distance < 1:
dest_distance *= 1000
dest_distance = "%i metros" % dest_distance
else:
dest_distance = "%0.1f km" % dest_distance
km_per_mile = 1.609344
self.dest_distance = "Destino: %s\nVel: %0.1f km/h" % (dest_distance,
self.gps_data['speed'] * km_per_mile)
if 'flock_size' in self.session_data:
flock_distance = geodesic(
(self.gps_data['lat'], self.gps_data['lon']),
(self.session_data['flock_lat'], self.session_data['flock_lon'])).kilometers
if flock_distance < 1:
flock_distance *= 1000
flock_distance = "%i metros" % flock_distance
else:
flock_distance = "%0.1f km" % flock_distance
self.dashboard_flock = "%s @ %s\nvel: %0.1f km/h" % (self.session_data['flock_size'],
flock_distance,
self.session_data['flock_avg_speed'] * km_per_mile)
else:
self.dashboard_flock = "no flocks"
self.session_data.update({'speed': self.gps_data['speed'] * km_per_mile,
'bearing': self.gps_data['bearing'],
'lat': self.gps_data['lat'],
'lon': self.gps_data['lon']})
def center_map_on_gps(self):
self.ms.ids.mapview.center_on(self.gps_data['lat'],
self.gps_data['lon'])
def center_map_on_session(self, *args):
self.ms.ids.mapview.center_on(self.session_data['dest_lat'],
self.session_data['dest_lon'])
def get_field(self, dt):
"""
get magnetic field for compass
"""
needle_angle = 0
if self.cs.facade.field != (None, None, None):
x, y, z = self.cs.facade.field
smoothingFactor = self.session_data.get('settings_smoothing', 0.88)
angle = atan2(y, x)
self.last_angle = smoothingFactor * self.last_angle + (1 - smoothingFactor) * angle
needle_angle = degrees(self.last_angle) - 90
# fix animation transition around the unit circle
if (self.needle_angle % 360) - needle_angle > 180:
needle_angle += 360
elif (self.needle_angle % 360) - needle_angle < -180:
needle_angle -= 360
# add the number of revolutions to the result
needle_angle += 360 * floor(self.needle_angle / 360.)
# animate the needle
if self.cs._anim:
self.cs._anim.stop(self)
self.cs._anim = Animation(needle_angle=needle_angle,
d=0.1,
t='out_quad')
lat1 = self.gps_data.get('lat', 0)
lon1 = self.gps_data.get('lon', 0)
lat2 = self.session_data.get('dest_lat', 0)
lon2 = self.session_data.get('dest_lon', 0)
dbearing = atan2(sin(lon2 - lon1) * cos(lat2),
cos(lat1) * sin(lat2)
- sin(lat1) * cos(lat2) * cos(lon2-lon1))
dbearing = self.needle_angle - degrees(dbearing)
self.cs._anim &= Animation(dbearing=dbearing,
d=0.1,
t='out_quad')
if 'flock_lat' in self.session_data:
self.cs.ids.to_flock.source = 'assets/needle_to_flock.png'
lat2 = self.session_data['flock_lat']
lon2 = self.session_data['flock_lon']
fbearing = atan2(sin(lon2 - lon1) * cos(lat2),
cos(lat1) * sin(lat2)
- sin(lat1) * cos(lat2) * cos(lon2-lon1))
fbearing = self.needle_angle - degrees(fbearing)
self.cs._anim &= Animation(fbearing=fbearing,
d=0.1,
t='out_quad')
else:
self.cs.ids.to_flock.source = 'assets/pivot.png'
self.cs._anim.start(self)
def compass_enable(self):
if hasattr(self, 'cs'):
self.cs.facade.enable()
Clock.schedule_interval(self.get_field,
1.0 - self.session_data.get('settings_compass_update', 0.88))
def compass_disable(self):
if hasattr(self, 'cs'):
self.cs.facade.disable()
Clock.unschedule(self.get_field)
def on_pause(self):
self.compass_disable()
self.gps_stop()
return True
def on_resume(self):
self.compass_enable()
self.gps_start(1000, 0)
def flock_server_register(self):
# flush session data
self.session_data.pop('bike_id', None)
self.session_data.pop('client_id', None)
self.session_data.pop('speed', None)
self.session_data.pop('bearing', None)
self.session_data.pop('last_update', None)
key = "%s" % uniqueid.get_uid()
client_id = str(hashlib.sha256(key.encode()).hexdigest())[0:10]
try:
register_url = "%s/register/" % self.session_data['settings_flock_server']
Logger.info('trying %s' % register_url)
r = requests.get(register_url,
params={'dest_lon': self.session_data['dest_lon'],
'dest_lat': self.session_data['dest_lat'],
'client_id': client_id},
timeout=0.5)
self.session_data['bike_id'] = r.json()['bike_id']
self.session_data['client_id'] = client_id
Logger.info('registered! got bike_id: %s' % self.session_data['bike_id'])
self.flock_server_update()
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
Logger.info('http error', status_code)
except requests.exceptions.Timeout:
Logger.info('comm timeout while registering with flock server')
except requests.exceptions.ConnectionError:
Logger.info('unable to connect while registering with flock server')
# finally:
def flock_server_update(self):
try:
update_url = "{server}/update/{bike_id}/?lat={lat}&lon={lon}&speed={speed}&bearing={bearing}"
req = requests.get("%s/update/%s/" % (self.session_data['settings_flock_server'],
self.session_data['bike_id']),
params={
'client_id': self.session_data['client_id'],
'speed': self.session_data.get('speed', 0),
'bearing': self.session_data.get('bearing', 0),
'lat': self.session_data['lat'],
'lon': self.session_data['lon']},
timeout=0.5)
resp = req.json()
if resp:
self.session_data['last_update'] = isoparse(resp.pop('last_update'))
if 'flock_size' not in resp:
self.session_data.pop('flock_avg_speed', None)
self.session_data.pop('flock_lat', None)
self.session_data.pop('flock_lon', None)
self.session_data.pop('flock_size', None)
except requests.exceptions.Timeout:
Logger.info('comm timeout while updating flock server')
def get_session_timeout(self):
min_step = 100 # update at least every 100 meters
speed = self.session_data.get('speed', None)
if speed:
seconds = min_step / speed
else:
seconds = 100
return timedelta(seconds=seconds)
def flock(self, dt):
if ('bike_id' in self.session_data
and 'last_update' in self.session_data
and (self.session_data.get('last_update', 0) + self.get_session_timeout()) > datetime.now()):
self.flock_server_update()
else:
self.flock_server_register()
def build(self):
# start GPS
try:
gps.configure(on_location=self.on_location)
except NotImplementedError:
import traceback
traceback.print_exc()
Logger.info('GPS is not implemented for your platform')
if platform == "android":
self.request_android_permissions()
self.gps_start(1000, 0)
# setup app screens
screen_manager = ScreenManager(transition=RiseInTransition())
# map screen
self.ms = MapScreen(name='map')
screen_manager.add_widget(self.ms)
# settings screen
self.ss = SettingsScreen(name='settings')
screen_manager.add_widget(self.ss)
# compass screen
# smoothing vars for compass
self.last_angle = 0
self.cs = CompassScreen(name='compass')
self.cs._anim = None
self.cs._anim1 = None
screen_manager.add_widget(self.cs)
return screen_manager
if __name__ == '__main__':
app = FlockompassApp()
app.read_settings()
app.run()