25 Jul 2020
7923字
27分
次
ros
CC BY 4.0
(除特别声明或转载文章外)
Rospy参数 & log
# 从rospy设置参数
roscd rospy_tutorials/006_parameters
# 获取一个全局参数
rospy.get_param('/global_param_name')
# 从我们的父命名空间获取一个参数
rospy.get_param('param_name')
# 从我们的私有名称空间获取参数
rospy.get_param('~private_param_name')
# 参数不存在,指定默认值:
rospy.get_param('foo', 'default_value')
# 设置参数
rospy.set_param(param_name, param_value)
rospy.set_param('some_numbers', [1., 2., 3., 4.])
rospy.set_param('truth', True)
rospy.set_param('~private_bar', 1+2)
# 判断删除
if rospy.has_param('to_delete'):
rospy.delete_param('to_delete')
# 重映射来打印参数的值
value = rospy.get_param('~foo')
rospy.loginfo('Parameter %s has value %s', rospy.resolve_name('~foo'), value)
# 搜索参数
full_param_name = rospy.search_param('foo')
param_value = rospy.get_param(full_param_name)
# 当您运行多个节点时,很难看到节点的命令行输出。相反,将调试消息发布到rosout然后使用rqt_console/rxconsole查看它要容易得多
rospy.logdebug(msg, *args)
rospy.logwarn(msg, *args)
rospy.loginfo(msg, *args)
rospy.logerr(msg, *args)
rospy.logfatal(msg, *args)
rospy.init_node('my_node', log_level=rospy.DEBUG)
rospy.logerr("%s returned the invalid value %s", other_name, other_value)
# example
def talker():
topic = 'floats'
pub = rospy.Publisher(topic, numpy_msg(Floats))
rospy.init_node(topic, anonymous=True)
rospy.loginfo("I will publish to the topic %s", topic)
while not rospy.is_shutdown():
a = numpy.array([1.0, 2.1, 3.2, 4.3, 5.4, 6.5], dtype=numpy.float32)
rospy.loginfo(a)
pub.publish(a)
rospy.sleep(0.1)
if __name__ == '__main__':
talker()
# rostopic echo rosout
# rqt_console
Makefile
# 编写ROS Python Makefile
# 这些构建文件CMakeLists.txt,Makefile提供的功能:自动生成消息和服务代码,运行测试
cd ~/catkin_ws/src
catkin_create_pkg my_pkg message_generation rospy
cmake_minimum_required(VERSION 2.8.3)
project(my_pkg)
find_package(catkin REQUIRED COMPONENTS message_generation rospy)
add_message_files(
FILES # e.g. Floats.msg HeaderString.msg
)
add_service_files(
DIRECTORY srv
FILES AddTwoInts.srv BadTwoInts.srv
)
## Generate services in the 'srv' folder
# add_service_files(
# FILES # e.g. Floats.srv HeaderString.srv
#)
## Generate added messages and services with any dependencies
generate_messages()
catkin_package(
CATKIN_DEPENDS message_runtime
)
安装脚本和导出模块
cd ~/catkin_ws/src/my_pkg # new catkin package, in the workspace
mkdir bin
mkdir src
mkdir src/tutorial_package
touch src/tutorial_package/__init__.py
# Within my_pkg, create file src/tutorial_package/hello.py
def say(name):
print('Hello ' + name)
# create file bin/hello
#! /usr/bin/env python
import tutorial_package.hello
if __name__ == '__main__':
tutorial_package.hello.say('my friend!')
# chmod u+x bin/hello
# create setup.py 定义一个安装过程,将文件移至PYTHONPATH中。
## ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD
from distutils.core import setup
from catkin_pkg.python_setup import generate_distutils_setup
# fetch values from package.xml
# 仅需提供的信息就是package.xml中没有的信息,要安装的脚本的名称,python软件包的名称,在何处可以找到这些软件包以及python软件包的依赖关系
setup_args = generate_distutils_setup(
packages=['tutorial_package'],
package_dir={'': 'src'},
)
setup(**setup_args)
## Uncomment if the package has a setup.py
catkin_python_setup()
catkin_install_python(PROGRAMS bin/hello
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION})
cd ~/catkin_ws
catkin_make
. devel/setup.bash
rosrun my_pkg hello
设置pythonpath
cd catkin_ws/src
catkin_create_pkg listener_extend rospy beginner_tutorials
cd listener_extend
mkdir nodes
# create a file listener_extend.py
#!/usr/bin/env python
import beginner_tutorials.msg
num = beginner_tutorials.msg.Num()
print(num)
source ~/catkin_ws/devel/setup.bash
chmod u+x nodes/listener_extend.py
python nodes/listener_extend.py
Numpy
catkin_create_pkg numpy_tutorial rospy rospy_tutorials
<build_depend>python-numpy</build_depend>
<run_depend>python-numpy</run_depend>
# numpy_listener.py
#!/usr/bin/env python
PKG = 'numpy_tutorial'
import roslib; roslib.load_manifest(PKG)
import rospy
from rospy_tutorials.msg import Floats
from rospy.numpy_msg import numpy_msg
def callback(data):
print rospy.get_name(), "I heard %s"%str(data.data)
def listener():
rospy.init_node('listener')
# rospy.Subscriber("floats", Floats, callback)
rospy.Subscriber("floats", numpy_msg(Floats), callback) # Numpy-ize the Listener
rospy.spin()
if __name__ == '__main__':
listener()
# chmod +x numpy_listener.py
# terminal 1
roscore
# termianl 2
rosrun numpy_tutorial numpy_listener.py
# termianl 3
rostopic pub -r 1 floats rospy_tutorials/Floats "[1.1, 2.2, 3.3, 4.4, 5.5]"
# numpy_talker.py
#!/usr/bin/env python
PKG = 'numpy_tutorial'
import roslib; roslib.load_manifest(PKG)
import rospy
from rospy.numpy_msg import numpy_msg
from rospy_tutorials.msg import Floats
import numpy
def talker():
pub = rospy.Publisher('floats', numpy_msg(Floats),queue_size=10)
rospy.init_node('talker', anonymous=True)
r = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
a = numpy.array([1.0, 2.1, 3.2, 4.3, 5.4, 6.5], dtype=numpy.float32)
pub.publish(a)
r.sleep()
if __name__ == '__main__':
talker()
more eg
# talker_color.py
#!/usr/bin/env python
import roslib; roslib.load_manifest('beginner_tutorials')
import rospy
from std_msgs.msg import ColorRGBA
def talker():
#pub = rospy.Publisher('chatter', String)
pub = rospy.Publisher('chatter_color', ColorRGBA)
rospy.init_node('talker_color')
while not rospy.is_shutdown():
pub.publish(a=1.0)
rospy.sleep(1.0)
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException: pass
# listener_color.py
#!/usr/bin/env python
import roslib; roslib.load_manifest('beginner_tutorials')
import rospy
from std_msgs.msg import ColorRGBA
def callback(data):
rospy.loginfo(rospy.get_name()+ "I heard r=%s g=%s b=%s a=%s", data.r, data.g, data.b, data.a)
def listener():
rospy.init_node('listener_color', anonymous=True)
rospy.Subscriber("chatter_color", ColorRGBA, callback)
rospy.spin()
if __name__ == '__main__':
listener()
# Python CompressedImage订阅服务器发布者
# 本示例订阅了一个包含sensor_msgs::CompressedImage的ros主题,
# 将CompressedImage转换为numpy.ndarray,
# 然后opencv检测并标记该图像中的特征,
# 最后,它显示并发布新图像-再次作为CompressedImage主题。
#!/usr/bin/env python
"""OpenCV feature detectors with ros CompressedImage Topics in python.
This example subscribes to a ros topic containing sensor_msgs
CompressedImage. It converts the CompressedImage into a numpy.ndarray,
then detects and marks features in that image. It finally displays
and publishes the new image - again as CompressedImage topic.
"""
__author__ = 'Simon Haller <simon.haller at uibk.ac.at>'
__version__= '0.1'
__license__ = 'BSD'
# Python libs
import sys, time
# numpy and scipy
import numpy as np
from scipy.ndimage import filters
# OpenCV
import cv2
# Ros libraries
import roslib
import rospy
# Ros Messages
from sensor_msgs.msg import CompressedImage
# We do not use cv_bridge it does not support CompressedImage in python
# from cv_bridge import CvBridge, CvBridgeError
VERBOSE=False # True将在命令行上打印一些其他信息(特征检测方法,点数,检测时间)
class image_feature:
def __init__(self):
'''Initialize ros publisher, ros subscriber'''
# topic where we publish
self.image_pub = rospy.Publisher("/output/image_raw/compressed",
CompressedImage)
# self.bridge = CvBridge()
# subscribed Topic
self.subscriber = rospy.Subscriber("/camera/image/compressed",
CompressedImage, self.callback, queue_size = 1)
if VERBOSE :
print "subscribed to /camera/image/compressed"
def callback(self, ros_data):
'''Callback function of subscribed topic.
Here images get converted and features detected'''
if VERBOSE :
print 'received image of type: "%s"' % ros_data.format
#### direct conversion to CV2 #### 将压缩的图像直接转换为cv2
np_arr = np.fromstring(ros_data.data, np.uint8)
image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
#image_np = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) # OpenCV >= 3.0:
#### Feature detectors using CV2 #### 选择并创建特征检测器
# "","Grid","Pyramid" +
# "FAST","GFTT","HARRIS","MSER","ORB","SIFT","STAR","SURF"
method = "GridFAST"
feat_det = cv2.FeatureDetector_create(method)
time1 = time.time()
# convert np image to grayscale
featPoints = feat_det.detect(
cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY))
time2 = time.time()
if VERBOSE :
print '%s detector found: %s points in: %s sec.'%(method,
len(featPoints),time2-time1)
for featpoint in featPoints:
x,y = featpoint.pt
cv2.circle(image_np,(int(x),int(y)), 3, (0,0,255), -1)
cv2.imshow('cv_img', image_np)
cv2.waitKey(2)
#### Create CompressedIamge #### 创建要发布的压缩图像
msg = CompressedImage()
msg.header.stamp = rospy.Time.now()
msg.format = "jpeg"
msg.data = np.array(cv2.imencode('.jpg', image_np)[1]).tostring()
# Publish new image
self.image_pub.publish(msg)
#self.subscriber.unregister()
def main(args):
'''Initializes and cleanup ros node'''
ic = image_feature()
rospy.init_node('image_feature', anonymous=True)
try:
rospy.spin()
except KeyboardInterrupt:
print "Shutting down ROS Image feature detector module"
cv2.destroyAllWindows()
if __name__ == '__main__':
main(sys.argv)