#!/usr/bin/env python
import rospy
import evdev
import asyncore
import pyaudio
import wave
import rospkg
import time
import sys
from geometry_msgs.msg import Twist, Vector3
[docs]class SixaxisPublisher:
"""Sixaxis Publisher is a ROS joystick driver for PS3 controllers
This module looks for connected PlayStation 3 controllers and registers a callback to translate joystick inputs to
velocity commands.
"""
def __init__(self):
# set up node
rospy.init_node('sixaxis_publisher', anonymous=True)
# Publish velocities to motor
self.pub = rospy.Publisher('cmd_vel', Twist, queue_size=10)
# Default params
self.MAX_SPEED = float(rospy.get_param("~max_speed", 2.2))
self.MAX_REVERSE_SPEED = float(rospy.get_param("~max_reverse_speed", 0.5))
self.MAX_ROT_SPEED = float(rospy.get_param("~max_rot_speed", 1.75))
self.threshold = int(rospy.get_param("~joystick_threshold", 5))
is_test = int(rospy.get_param("~test", 0))
# Instance variables for helping the callback remember its state
self.used_key = False
# self.stopped = True
self.stop = False
self.rot_vel = 0
self.x_vel = 0
self.gamepad = None
# Setup audio
rospack = rospkg.RosPack()
self.wav = wave.open(rospack.get_path('sixaxis_publisher') + '/dixie-horn_daniel-simion.wav', 'r')
self.audio = pyaudio.PyAudio()
self.chunk = 1024
# Open an audio stream for the horn
try:
self.stream = self.audio.open(format=self.audio.get_format_from_width(self.wav.getsampwidth()),
channels=self.wav.getnchannels(),
rate=self.wav.getframerate(),
output=True)
except IOError:
self.stream = None
if is_test == 1:
self.sub = rospy.Subscriber('joytest', Vector3, self.callback)
self.is_test = True
else:
self.is_test = False
rospy.loginfo("Finding PS3 controller.")
self.gamepad = None
if not self.connect_joystick():
rospy.loginfo("Waiting for controller...")
# Check every second
rate = rospy.Rate(1.0)
while not rospy.is_shutdown():
if self.connect_joystick():
break
rate.sleep()
[docs] def connect_joystick(self):
if self.gamepad is not None:
return True
devices = [evdev.InputDevice(fn) for fn in evdev.list_devices()]
for device in devices:
if 'Sony PLAYSTATION(R)3 Controller' == device.name:
rospy.loginfo("Found the PS3 controller.")
self.gamepad = device
asyncore.file_dispatcher.__init__(self, self.gamepad)
return True
return False
[docs] def scale_stick(self, value):
"""Normalizes a joystick output byte (0-255) to the range -1 to 1
Args:
value (float or int): the value to scale from (0,255) to (-1,1)
Returns:
float: the normalized value
"""
return (float(value)*2/255) - 1.0
# Never need to write anything
[docs] def writable(self):
return False
# Close connection on error
[docs] def handle_expt(self):
self.close()
rospy.loginfo("Closing asyncore")
[docs] def handle_close(self):
self.close()
rospy.loginfo("Closing asyncore")
[docs] def callback(self, msg):
curr_time = time.time()
secs = int(curr_time)
usecs = int(((curr_time % 1) * 10e6))
event = evdev.InputEvent(sec=secs, usec=usecs, type=int(msg.x), code=int(msg.y), value=int(msg.z))
self.process_event(event)
if event.type == 0: # Syn packet, so done reading
self.send_cmd()
self.stop = False
[docs] def process_event(self, event):
# Check for joystick inputs and if we're in joystick mode
mag = abs(event.value - 128)
if event.type == 3 and not self.used_key:
if event.code == 4:
# right joystick y axis controls moving forward
if mag > self.threshold:
scaled = self.scale_stick(event.value)
if scaled < 0:
self.x_vel = -scaled * self.MAX_SPEED
else:
self.x_vel = -scaled * self.MAX_REVERSE_SPEED
self.used_key = False
else:
self.x_vel = 0
elif event.code == 3:
# right joystick x-axis controls turning
if mag > self.threshold:
self.rot_vel = -self.scale_stick(event.value) * self.MAX_ROT_SPEED
self.used_key = False
else:
self.rot_vel = 0
# Key presses
elif event.type == 1:
if event.value == 1:
# Key down press
if event.code in [293, 547]:
# turn right
self.rot_vel = -self.MAX_ROT_SPEED / 2
self.used_key = True
elif event.code in [292, 544]:
# move forward
self.x_vel = self.MAX_SPEED / 2
self.rot_vel = 0
self.used_key = True
elif event.code in [294, 545]:
# move back
self.x_vel = -self.MAX_REVERSE_SPEED
self.rot_vel = 0
self.used_key = True
elif event.code in [295, 546]:
# turn left
self.rot_vel = self.MAX_ROT_SPEED / 2
self.used_key = True
elif event.code in [302, 303, 304]:
# x key, stop
self.stop = True
self.used_key = True
elif event.code in [301, 308]:
# Only play horn if not already playing
if self.stream is not None and self.wav.tell() == 0:
wav_data = self.wav.readframes(self.chunk)
while wav_data:
self.stream.write(wav_data)
wav_data = self.wav.readframes(self.chunk)
# Reset wav file pointer
self.wav.rewind()
if event.value == 0:
# Key up press
self.stop = True
self.used_key = False
# Construct message if valid command was read
if self.x_vel != 0 or self.rot_vel != 0 or self.stop:
rospy.loginfo("Set x_vel=%f rot_vel=%f stop=%r" % (self.x_vel, self.rot_vel, self.stop))
[docs] def send_cmd(self):
rospy.loginfo("Sending x_vel=%d rot_vel=%d stop=%r" % (self.x_vel, self.rot_vel, self.stop))
if self.rot_vel == 0 and self.x_vel == 0:
# When both joysticks are centered, stop
self.stop = True
elif self.stop:
self.rot_vel = 0
self.x_vel = 0
# Send a new twist if we have a nonzero command or an explicit stop command
twist = Twist()
twist.linear = Vector3(self.x_vel, 0, 0)
twist.angular = Vector3(0, 0, self.rot_vel)
# rospy.loginfo(str(twist))
self.pub.publish(twist)
# Update stopped variable
# if self.stop:
# self.stopped = True
if self.x_vel != 0 or self.rot_vel != 0:
# self.stopped = False
rospy.loginfo_throttle(1, "Sending vx=%f vth=%f" % (self.x_vel, self.rot_vel))
# goal = MoveBaseGoal()
# goal.target_pose.header.frame_id = "base_link"
# goal.target_pose.header.stamp = rospy.get_time()
# goal.target_pose.pose.position = Point(new_x, new_y, 0)
# quat = quaternion_from_euler(0, 0, new_rot)
# goal.target_pose.pose.orientation = Quaternion(math.cos(new_rot), 0, 0, math.sin(new_rot))
# self.action_client.send_goal(goal)
[docs] def handle_read(self):
self.stop = False
for event in self.gamepad.read():
self.process_event(event)
self.send_cmd()
if rospy.is_shutdown():
raise asyncore.ExitNow('ROS is quitting')
if __name__ == "__main__":
sp = None
try:
sp = SixaxisPublisher()
if sp.is_test:
rospy.spin()
else:
asyncore.loop()
except (rospy.ROSInterruptException, asyncore.ExitNow):
sp.stream.stop_stream()
sp.stream.close()
pass