139 lines
3.3 KiB
Python
139 lines
3.3 KiB
Python
import pytest
|
|
import requests
|
|
from LatLon23 import LatLon
|
|
from geopy import distance
|
|
from math import isclose
|
|
|
|
# Base URL of the flock server that the tests talk to over HTTP.
FLOCK_SERVER = "http://127.0.0.1:5000"

# Reference coordinate; presumably (lat, lon) like the entries of `points`.
# NOTE(review): not referenced by the code visible in this module.
UACM = (19.37425, -99.17168)

# Points of origin for the test bikes, as (lat, lon) tuples.
# NOTE(review): the second and third origins are identical -- confirm that
# two bikes are meant to start from the same spot.
points = [
    (19.37634, -99.12660),  # Playa Pie de la Cuesta and Mirador
    (19.37521, -99.11923),  # Antropólogos and Cardiólogos
    (19.37521, -99.11923),  # Antropólogos and Cardiólogos
]

# Bike objects shared between tests: filled by test_register and read by
# the tests that run after it.
bikes = []
|
|
|
|
from pymongo import MongoClient
|
|
|
|
# Runs at import time: empty the "bike" collection of the "test" database so
# every test session starts without bikes left over from earlier runs.
# MongoClient() is called without arguments, so pymongo's default host and
# port are used.
# NOTE(review): presumably this is the same database the flock server
# writes to -- confirm against the server configuration.
client = MongoClient()
db = client["test"]
col = db["bike"]

# Keep the delete result around and print it so the number of removed
# documents shows up in the test output.
d = col.delete_many({})
print(d)
|
|
|
|
|
|
def flip(coord):
    """
    Return the first two items of `coord` in swapped order, as a tuple.

    Useful for turning a (lat, lon) pair into (lon, lat) or back. Any
    items beyond the second are ignored.
    """
    second, first = coord[1], coord[0]
    return second, first
|
|
|
|
def as_tuple(coord):
    """
    Convert a LatLon point into a plain (lat, lon) tuple of floats.

    `coord` only needs `lat` and `lon` attributes that `float()` accepts.
    """
    latitude = float(coord.lat)
    longitude = float(coord.lon)
    return latitude, longitude
|
|
|
|
|
|
class Bike(dict):
    """
    Client-side record of one bike, stored as a plain dict.

    Keys written by the methods below: 'dest' and 'bike_id' (by
    server_register) plus every key of the server's update response (by
    server_update). Callers are expected to set 'lat', 'lon', 'speed' and
    'bearing' themselves before calling server_update.
    """

    # Seconds to wait for the flock server. requests applies no timeout by
    # default, so without this a dead server would hang the test run.
    REQUEST_TIMEOUT = 10

    def server_register(self, dest):
        """
        Register this bike with the flock server and remember its id.

        dest -- destination as a (lat, lon) pair; stored under 'dest'.

        Raises requests.HTTPError when the server answers with an error
        status, and KeyError when the response has no 'bike_id'.
        """
        self['dest'] = dest

        # Let requests build and escape the query string instead of
        # formatting it by hand.
        r = requests.get(
            "{server}/register/".format(server=FLOCK_SERVER),
            params={
                'dest_lon': dest[1],
                'dest_lat': dest[0],
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        # Fail with the real HTTP error rather than a JSON/KeyError later.
        r.raise_for_status()
        out = r.json()

        # Only the id is kept. The original author's notes list further
        # response fields (flock_avg_speed, flock_distance, flock_heading,
        # flock_size) which are deliberately not copied here.
        self.update({'bike_id': out['bike_id']})

    def server_update(self):
        """
        Send this bike's position, speed and bearing to the flock server
        and merge the JSON response into the bike.

        Reads 'bike_id', 'lat', 'lon', 'speed' and 'bearing' from self;
        keys returned by the server overwrite existing keys of the same
        name.

        Raises requests.HTTPError when the server answers with an error
        status.
        """
        req = requests.get(
            "{server}/update/{bike_id}/".format(server=FLOCK_SERVER,
                                                bike_id=self['bike_id']),
            params={
                'lat': self['lat'],
                'lon': self['lon'],
                'speed': self['speed'],
                'bearing': self['bearing'],
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        req.raise_for_status()
        resp = req.json()
        self.update(resp)
|
|
|
|
|
|
def test_register():
    """
    Register one bike per entry of `points` and check that the server
    assigned each of them a 'bike_id'.

    The registered bikes are appended to the module-level `bikes` list,
    in the same order as `points`, for the tests that run afterwards.
    """
    for origin in points:
        # Each bike is bound due east (heading 90 degrees, measured
        # clockwise from north), 10 km away from its origin.
        start = LatLon(*origin)
        dest = start.offset(90, 10)

        # Work on the new object directly instead of bikes[i], so the
        # right bike is registered even if `bikes` was not empty on entry.
        bike = Bike()
        bikes.append(bike)
        bike.server_register(dest=as_tuple(dest))

        assert 'bike_id' in bike
|
|
|
|
|
|
|
|
def test_update():
    """
    Send a first update for every bike from its point of origin.

    The bearing reported is the initial heading from the origin towards
    the bike's registered destination. After the server round trip the
    bike's 'bearing' must be close to 90; server_update merges the
    response into the bike, so a 'bearing' key in the response would be
    the value checked.
    """
    for idx, origin in enumerate(points):
        start = LatLon(*origin)
        bike = bikes[idx]
        bearing = start.heading_initial(LatLon(*bike['dest']))

        bike.update(speed=3, bearing=bearing, lat=origin[0], lon=origin[1])
        bike.server_update()

        assert isclose(bike['bearing'], 90)
|
|
|
|
|
|
|
|
def test_flock_forward():
    """
    Move bike 1 forward and report its new position to the server.

    Intended scenario: bike 1 is closely followed by bike 2; bike 1
    updates at t=0 and bike 2 twenty seconds later, after which
    distances, flock heading and destination heading should be checked.

    Only bike 1's update is implemented so far; the test passes as long
    as that update does not raise.
    """
    # New position of bike 1: 1 km from its origin at heading 45 degrees.
    t0 = LatLon(*points[0])
    t1 = t0.offset(45, 1)

    # Store plain floats, as test_update does, rather than LatLon
    # coordinate objects.
    lat, lon = as_tuple(t1)

    bikes[0].update(
        {'speed': 3,
         'bearing': t0.heading_initial(t1),
         'lat': lat,
         'lon': lon,
         })

    bikes[0].server_update()

    print(bikes)

    # TODO: send bike 2's update twenty seconds later and assert on the
    # flock distance and heading values returned by the server.
|
|
|
|
|
|
|