Source code for arduino_motor

#!/usr/bin/env python

# modified from http://docs.ros.org/fuerte/api/differential_drive/html/twist__to__motors_8py_source.html
import smbus
import rospy
import struct
from geometry_msgs.msg import Twist, TransformStamped, Quaternion
from std_msgs.msg import UInt8MultiArray, MultiArrayDimension


[docs]class ArduinoMotor: """Sends commands to the Arduino to relay to the motor controller """ def __init__(self): # Initialize the serial port rospy.init_node('arduino_motor', anonymous=True) rospy.loginfo("%s started" % rospy.get_name()) # Get params self.width = float(rospy.get_param("~width", 31.5 * 0.0254)) self.rate = rospy.Rate(int(rospy.get_param("~poll_rate", 10))) self.retry_limit = int(rospy.get_param("~retry_limit", 10)) self.reset_pin = int(rospy.get_param("reset_pin", 4)) self.err_count = 0 try: global GPIO import RPi.GPIO as GPIO # Setup pin 4 as an output pin for resetting the Arduino GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) GPIO.setup(4, GPIO.OUT) self.on_arduino = True except (ImportError, RuntimeError): rospy.logwarn("Not running on Raspberry Pi, so cannot reset Arduino") self.on_arduino = False pass # Reset the Arduino # self.reset_arduino() self.is_virtual = int(rospy.get_param("~is_virtual", 0)) # Setup the i2c bus if self.is_virtual: self.bus = smbus.SMBus(0) self.pub = rospy.Publisher('motorcmd', UInt8MultiArray, queue_size=10) else: self.bus = smbus.SMBus(1) # The address and command byte to use for i2c communication self.address = 0x04 self.move_cmd = ord('m') self.left = 0 self.right = 0 self.stopped = False rospy.Subscriber("cmd_vel", Twist, self.callback)
[docs] def reset_arduino(self): if self.on_arduino: rospy.logerr("Resetting arduino") GPIO.output(self.reset_pin, GPIO.LOW) rospy.sleep(0.1) GPIO.output(self.reset_pin, GPIO.HIGH) rospy.sleep(3.5)
[docs] def callback(self, msg): """Updates the linear and angular velocity instance variables The message definition can be found here: http://docs.ros.org/api/geometry_msgs/html/msg/Twist.html Args: msg (geometry_msgs.msg.Twist): The desired speed """ dx = msg.linear.x dr = msg.angular.z self.right = 1.0 * dx + dr * self.width / 2 self.left = 1.0 * dx - dr * self.width / 2 if dx != 0 or dr != 0: rospy.loginfo_throttle(1, "linear.x = " + str(dx) + " angular.z: " + str(dr))
# send data to arduino
[docs] def send_speed_to_motor(self, m1, m2): """ Args: m1 (float): The left motor's speed in m/s m2 (float): The right motor's speed in m/s """ val = list(bytearray(struct.pack("=ff", m1, m2))) if not self.stopped or m1 != 0 or m2 != 0: if self.is_virtual: self.pub.publish(UInt8MultiArray(data=val)) # rospy.loginfo(str(val)) else: self.bus.write_i2c_block_data(self.address, self.move_cmd, val) if m1 == 0 and m2 == 0: self.stopped = True else: self.stopped = False
[docs] def begin(self): """Start the node and spin forever """ # main loop while not rospy.is_shutdown(): # and self.ticks_since_target < self.timeout_ticks: self.spin_once() self.rate.sleep()
[docs] def spin_once(self): """Sends the speeds to the motor once and retries if it fails """ count = 0 last_err = '' while not rospy.is_shutdown() and count < self.retry_limit: try: if rospy.is_shutdown(): break self.send_speed_to_motor(float(self.left), float(self.right)) rospy.loginfo_throttle(1, "Sent speed successfully") break except IOError as e: count += 1 last_err = str(e) pass if count > 0: self.err_count += count rospy.logwarn("Failed to send speed %d times or %d total: %s" % (count, self.err_count, last_err)) if count == self.retry_limit: self.reset_arduino()
if __name__ == "__main__": try: controller = ArduinoMotor() controller.begin() except rospy.ROSInterruptException: pass