#!/usr/bin/env python
import socket
import math
from collections import defaultdict
import asyncore
import rospy
import tf
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point, Pose, Quaternion, Twist, Vector3
[docs]class Circle(object):
"""An object that holds a point and a radius
"""
def __init__(self, point, radius):
self.center = point
self.radius = radius
[docs]class LocalinoPublisher:
"""Listens for Localino packets, triangulates the tags, and publishers their location to ROS
"""
def __init__(self):
# initialize the node
self.pub = rospy.Publisher('vo', Odometry, queue_size=100)
rospy.init_node('localino', anonymous=True)
self.vo_broadcaster = tf.TransformBroadcaster()
# TODO: parse settings from yaml file instead
# The tag ID of the localino tag mounted to the wheelchair
self.base_id = str(rospy.get_param("~base_tag_id", "1002"))
# The number of nanoseconds between samples before the data is ignored
# 50 us is approx 2x the time between responses from all 3 anchors
self.timeout = float(rospy.get_param("~timeout", 0.15))
# The rate at which each localino sends packets
self.rate = rospy.Rate(int(rospy.get_param("~poll_rate", 4000)))
# The UDP port to listen on for localino packets
self.port = int(rospy.get_param("~port", 10000))
# The IP to bind on, either all interfaces (0.0.0.0) or the localino subnet (192.168.4.255)
self.ip = rospy.get_param("~ip", '')
# Keep a list of the anchor names
self.anchor_names = str(rospy.get_param("~anchor_names", "9002,9003,9005")).split(',')
# Keep a list of the tag names
self.tag_ids = str(rospy.get_param("~tag_names", "1002,1001")).split(',')
self.anchor_coords = defaultdict(Point)
# Make a dictionary from the anchor's name to its position
if all(rospy.has_param("~anchor_%s" % anchor_name) for anchor_name in self.anchor_names):
for anchor_name in self.anchor_names:
coords = rospy.get_param("~anchor_%s" % anchor_name).split(',')
self.anchor_coords[anchor_name] = Point(float(coords[0]), float(coords[1]), 0)
else:
self.anchor_coords = {'9002': Point(0, 0, 0),
'9003': Point(3.78, 0.28, 0),
'9005': Point(1.12, 2.03, 0)
}
# Create 2D dictionaries to store distances reported from each anchor\tag pair
self.dists = {tagID: {anchorID: None for anchorID in self.anchor_names} for tagID in self.tag_ids}
# Create dictionaries to hold the last timestamp and
self.last_timestamp = {tagID: {anchorID: 0 for anchorID in self.anchor_names} for tagID in self.tag_ids}
# Bind to IP 0.0.0.0 UDP port 10000 using asyncore to capture the tag's traffic
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
self.bind(('', self.port))
# Never need to write, only read
[docs] def writeable(self):
"""Never need to write
Returns:
bool: False
"""
return False
# start the triangulation
[docs] def handle_read(self):
"""On a read, save the distance info and try to triangulate the tag
Overrides asyncore.dispatcher's handler
"""
data, addr = self.recvfrom(100)
rospy.loginfo("Received message:" + data.decode("ascii"))
data_arr = data.decode("utf-8").split(",")
if len(data_arr) == 5 and data_arr[0] in self.anchor_names and data_arr[1] in self.tag_ids:
# Give array of results human readable names
anchor_id = str(data_arr[0])
tag_id = str(data_arr[1])
# rospy.loginfo("Found tag "+tag_id+" type")
dist = data_arr[2]
# seq_num = data_arr[3]
# tag_power = data_arr[4]
# TODO: verify inputs to prevent crashing
# TODO: scale distances to make sure we get a correct coordinate
# TODO: test if expiring data via timeout or seq_num is better
self.dists[tag_id][anchor_id] = Circle(self.anchor_coords[anchor_id], float(dist))
self.last_timestamp[tag_id][anchor_id] = rospy.get_time()
# Each anchor responded with a distance to this tag
if None not in self.dists[tag_id].values():
# Check that the data isn't stale
delta_time = self.last_timestamp[tag_id][anchor_id] - min(self.last_timestamp[tag_id].values())
#rospy.loginfo("Have enough to do some calcs delta_time=%f s" % delta_time)
if delta_time < self.timeout:
# Use trilateration to locate the tag
# Algorithm from https://github.com/noomrevlis/trilateration
#rospy.loginfo("Didn't timeout")
inner_points = []
for p in LocalinoPublisher.get_all_intersecting_points(self.dists[tag_id].values()):
if LocalinoPublisher.is_contained_in_circles(p, self.dists[tag_id].values()):
inner_points.append(p)
center = LocalinoPublisher.get_polygon_center(inner_points)
self.dists = {tagID: {anchorID: None for anchorID in self.anchor_names} for tagID in self.tag_ids}
# An Odometry message generated at time=stamp and published on topic /vo
odom = Odometry()
odom.header.stamp = rospy.Time.now()
if tag_id == self.base_id:
#rospy.loginfo("Sending tf and publishing")
odom.header.frame_id = "vo"
odom.header.stamp = rospy.Time.now()
# Give the XY coordinates and set the covariance high on the rest so they aren't used
odom.pose.pose = Pose(center, Quaternion(1, 0, 0, 0))
# TODO: measure covariance with experiment + statistics
# This should be less accurate than the Arduino encoder odometry
odom.pose.covariance = [1e-9, 0, 0, 0, 0, 0,
0, 1e-3, 1e-9, 0, 0, 0,
0, 0, 1e6, 0, 0, 0,
0, 0, 0, 1e6, 0, 0,
0, 0, 0, 0, 1e6, 0,
0, 0, 0, 0, 0, 1e-9]
odom.child_frame_id = "map"
odom.twist.twist = Twist(Vector3(0, 0, 0), Vector3(0, 0, 0))
odom.twist.covariance = [1e-9, 0, 0, 0, 0, 0,
0, 1e-3, 1e-9, 0, 0, 0,
0, 0, 1e6, 0, 0, 0,
0, 0, 0, 1e6, 0, 0,
0, 0, 0, 0, 1e6, 0,
0, 0, 0, 0, 0, 1e-9]
self.pub.publish(odom)
self.vo_broadcaster.sendTransform((center.x, center.y, 0.), (1, 0, 0, 0),
odom.header.stamp, "map", "vo")
else:
# Publish a the tag's location to let Alexa know where to send the wheelchair
self.vo_broadcaster.sendTransform((center.x, center.y, 0.), (1, 0, 0, 0),
odom.header.stamp, "map", 'tag_' + str(tag_id))
#rospy.loginfo("Sending tf only, %s != %s" % (tag_id, self.base_id))
# Immediately after receiving all of a frame, the localinos will take 0.2-0.3ms before sending
# a new packet, so wait until then
self.rate.sleep()
else:
rospy.logwarn("Localino packet timed out at " + str(delta_time) + " ns")
#else:
# for anchor, dist in self.dists[tag_id].items():
# if dist is None:
# rospy.loginfo("Waiting for packet from anchor %s" % anchor)
[docs] @staticmethod
def get_two_circles_intersecting_points(c1, c2):
"""Finds the intersecting points between two circles
Args:
c1 (Circle): The first circle
c2 (Circle): The second circle
Returns:
list: two points where the circles intersect or None
"""
p1 = c1.center
p2 = c2.center
r1 = c1.radius
r2 = c2.radius
d = LocalinoPublisher.get_two_points_distance(p1, p2)
# if to far away, or self contained - can't be done
if d >= (r1 + r2) or d <= math.fabs(r1 - r2):
return None
a = (pow(r1, 2) - pow(r2, 2) + pow(d, 2)) / (2 * d)
h = math.sqrt(pow(r1, 2) - pow(a, 2))
x0 = p1.x + a * (p2.x - p1.x) / d
y0 = p1.y + a * (p2.y - p1.y) / d
rx = -(p2.y - p1.y) * (h / d)
ry = -(p2.x - p1.x) * (h / d)
return [Point(x0 + rx, y0 - ry, 0), Point(x0 - rx, y0 + ry, 0)]
[docs] @staticmethod
def get_all_intersecting_points(circles):
"""Finds the intersecting points among a list of circles
Args:
circles (list): A list of Circle objects
Returns:
List: A list of points where at least two circles intersect
"""
points = []
num = len(circles)
for i in range(num):
j = i + 1
for k in range(j, num):
res = LocalinoPublisher.get_two_circles_intersecting_points(circles[i], circles[k])
if res:
points.extend(res)
return points
[docs] @staticmethod
def get_two_points_distance(p1, p2):
"""Returns the distance between two points
Args:
p1 (geometry_msgs.msg.Point): The first point
p2 (geometry_msgs.msg.Point): The second point
Returns:
float: The distance between the two points
"""
return math.sqrt(pow((p1.x - p2.x), 2) + pow((p1.y - p2.y), 2))
[docs] @staticmethod
def is_contained_in_circles(point, circles):
"""Checks if a point is inside every circle
Args:
point (geometry_msgs.msg.Point): a point
circles (List): A list of Circles
Returns:
bool: True if the point is in the circles, false otherwise
"""
for circle in circles:
if LocalinoPublisher.get_two_points_distance(point, circle.center) > circle.radius:
return False
return True
[docs] @staticmethod
def get_polygon_center(points):
"""Returns a point in the middle of the other points
Args:
points (list): A list of Points
Returns:
geometry_msgs.msg.Point: The point at the center
"""
center = Point(0, 0, 0)
if len(points) != 0:
for point in points:
center.x += point.x
center.y += point.y
center.x = center.x / len(points)
center.y = center.y / len(points)
return center
if __name__ == "__main__":
lp = LocalinoPublisher()
try:
while not rospy.is_shutdown():
asyncore.loop(timeout=1, count=100)
except rospy.ROSInterruptException:
pass
asyncore.close_all()