ROSPY Acemyzoe

Rospy参数 & log

# 从rospy设置参数
roscd rospy_tutorials/006_parameters
# 获取一个全局参数
rospy.get_param('/global_param_name')
# 从我们的父命名空间获取一个参数
rospy.get_param('param_name')
# 从我们的私有名称空间获取参数
rospy.get_param('~private_param_name')
# 参数不存在,指定默认值:
rospy.get_param('foo', 'default_value')
# 设置参数
rospy.set_param(param_name, param_value)
rospy.set_param('some_numbers', [1., 2., 3., 4.])
rospy.set_param('truth', True)
rospy.set_param('~private_bar', 1+2)
# 判断删除
if rospy.has_param('to_delete'):
    rospy.delete_param('to_delete')
# 重映射来打印参数的值 
value = rospy.get_param('~foo')
rospy.loginfo('Parameter %s has value %s', rospy.resolve_name('~foo'), value)
# 搜索参数
full_param_name = rospy.search_param('foo')
param_value = rospy.get_param(full_param_name)
# 当您运行多个节点时,很难看到节点的命令行输出。相反,将调试消息发布到rosout然后使用rqt_console/rxconsole查看它要容易得多
rospy.logdebug(msg, *args)
rospy.logwarn(msg, *args)
rospy.loginfo(msg, *args)
rospy.logerr(msg, *args)
rospy.logfatal(msg, *args)
rospy.init_node('my_node', log_level=rospy.DEBUG)
rospy.logerr("%s returned the invalid value %s", other_name, other_value)
# example
def talker():
    topic = 'floats'
    pub = rospy.Publisher(topic, numpy_msg(Floats))
    rospy.init_node(topic, anonymous=True)
    rospy.loginfo("I will publish to the topic %s", topic)
    while not rospy.is_shutdown():
        a = numpy.array([1.0, 2.1, 3.2, 4.3, 5.4, 6.5], dtype=numpy.float32)
        rospy.loginfo(a)
        pub.publish(a)
        rospy.sleep(0.1)

if __name__ == '__main__':
    talker()
    
# rostopic echo rosout
# rqt_console

Makefile

# 编写ROS Python Makefile
# 这些构建文件CMakeLists.txt,Makefile提供的功能:自动生成消息和服务代码,运行测试
cd ~/catkin_ws/src
catkin_create_pkg my_pkg message_generation rospy
cmake_minimum_required(VERSION 2.8.3)
project(my_pkg)
find_package(catkin REQUIRED COMPONENTS message_generation rospy)
add_message_files(
  FILES  # e.g. Floats.msg HeaderString.msg
)
add_service_files(
  DIRECTORY srv 
  FILES AddTwoInts.srv BadTwoInts.srv
)
## Generate services in the 'srv' folder
# add_service_files(
#  FILES  # e.g. Floats.srv HeaderString.srv
#)
## Generate added messages and services with any dependencies
generate_messages()
catkin_package(
  CATKIN_DEPENDS message_runtime
)

安装脚本和导出模块

cd ~/catkin_ws/src/my_pkg    # new catkin package, in the workspace
mkdir bin
mkdir src
mkdir src/tutorial_package
touch src/tutorial_package/__init__.py
# Within my_pkg, create file src/tutorial_package/hello.py
def say(name):
    print('Hello ' + name)
# create file bin/hello
#! /usr/bin/env python
import tutorial_package.hello
if __name__ == '__main__':
    tutorial_package.hello.say('my friend!')
# chmod u+x bin/hello
# create setup.py 定义一个安装过程,将文件移至PYTHONPATH中。
## ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD
from distutils.core import setup
from catkin_pkg.python_setup import generate_distutils_setup
# fetch values from package.xml
# 仅需提供的信息就是package.xml中没有的信息,要安装的脚本的名称,python软件包的名称,在何处可以找到这些软件包以及python软件包的依赖关系
setup_args = generate_distutils_setup(
    packages=['tutorial_package'],
    package_dir={'': 'src'},
)
setup(**setup_args)
## Uncomment if the package has a setup.py
catkin_python_setup()
catkin_install_python(PROGRAMS bin/hello
  DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION})
cd ~/catkin_ws
catkin_make
. devel/setup.bash
rosrun my_pkg hello 

设置pythonpath

cd catkin_ws/src
catkin_create_pkg listener_extend rospy beginner_tutorials
cd listener_extend
mkdir nodes
# create a file listener_extend.py
#!/usr/bin/env python
import beginner_tutorials.msg
num = beginner_tutorials.msg.Num()
print(num)
source ~/catkin_ws/devel/setup.bash
chmod u+x nodes/listener_extend.py
python nodes/listener_extend.py

Numpy

catkin_create_pkg numpy_tutorial rospy rospy_tutorials
<build_depend>python-numpy</build_depend>
<run_depend>python-numpy</run_depend>
# numpy_listener.py
#!/usr/bin/env python
PKG = 'numpy_tutorial'
import roslib; roslib.load_manifest(PKG)
import rospy
from rospy_tutorials.msg import Floats
from rospy.numpy_msg import numpy_msg
def callback(data):
    print rospy.get_name(), "I heard %s"%str(data.data)
def listener():
    rospy.init_node('listener')
    # rospy.Subscriber("floats", Floats, callback)
    rospy.Subscriber("floats", numpy_msg(Floats), callback) # Numpy-ize the Listener
    rospy.spin()
if __name__ == '__main__':
    listener()   
 # chmod +x numpy_listener.py
# terminal 1
roscore
# termianl 2
rosrun numpy_tutorial numpy_listener.py
# termianl 3
rostopic pub -r 1 floats rospy_tutorials/Floats "[1.1, 2.2, 3.3, 4.4, 5.5]"
# numpy_talker.py
#!/usr/bin/env python
PKG = 'numpy_tutorial'
import roslib; roslib.load_manifest(PKG)
import rospy
from rospy.numpy_msg import numpy_msg
from rospy_tutorials.msg import Floats
import numpy
def talker():
    pub = rospy.Publisher('floats', numpy_msg(Floats),queue_size=10)
    rospy.init_node('talker', anonymous=True)
    r = rospy.Rate(10) # 10hz
    while not rospy.is_shutdown():
        a = numpy.array([1.0, 2.1, 3.2, 4.3, 5.4, 6.5], dtype=numpy.float32)
        pub.publish(a)
        r.sleep()
if __name__ == '__main__':
    talker()

more eg

# talker_color.py
#!/usr/bin/env python
import roslib; roslib.load_manifest('beginner_tutorials')
import rospy
from std_msgs.msg import ColorRGBA
def talker():
    #pub = rospy.Publisher('chatter', String)
    pub = rospy.Publisher('chatter_color', ColorRGBA)
    rospy.init_node('talker_color')
    while not rospy.is_shutdown():
        pub.publish(a=1.0)
        rospy.sleep(1.0)
if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException: pass
# listener_color.py
#!/usr/bin/env python
import roslib; roslib.load_manifest('beginner_tutorials')
import rospy
from std_msgs.msg import ColorRGBA
def callback(data):
    rospy.loginfo(rospy.get_name()+ "I heard r=%s g=%s b=%s a=%s", data.r, data.g, data.b, data.a)

def listener():
    rospy.init_node('listener_color', anonymous=True)
    rospy.Subscriber("chatter_color", ColorRGBA, callback)
    rospy.spin()

if __name__ == '__main__':
    listener()
# Python CompressedImage订阅服务器发布者
# 本示例订阅了一个包含sensor_msgs::CompressedImage的ros主题,
# 将CompressedImage转换为numpy.ndarray,
# 然后opencv检测并标记该图像中的特征,
# 最后,它显示并发布新图像-再次作为CompressedImage主题。

#!/usr/bin/env python
"""OpenCV feature detectors with ros CompressedImage Topics in python.

This example subscribes to a ros topic containing sensor_msgs 
CompressedImage. It converts the CompressedImage into a numpy.ndarray, 
then detects and marks features in that image. It finally displays 
and publishes the new image - again as CompressedImage topic.
"""
__author__ =  'Simon Haller <simon.haller at uibk.ac.at>'
__version__=  '0.1'
__license__ = 'BSD'
# Python libs
import sys, time

# numpy and scipy
import numpy as np
from scipy.ndimage import filters

# OpenCV
import cv2

# Ros libraries
import roslib
import rospy

# Ros Messages
from sensor_msgs.msg import CompressedImage
# We do not use cv_bridge it does not support CompressedImage in python
# from cv_bridge import CvBridge, CvBridgeError

VERBOSE=False # True将在命令行上打印一些其他信息(特征检测方法,点数,检测时间)

class image_feature:
    def __init__(self):
        '''Initialize ros publisher, ros subscriber'''
        # topic where we publish
        self.image_pub = rospy.Publisher("/output/image_raw/compressed",
            CompressedImage)
        # self.bridge = CvBridge()
        # subscribed Topic
        self.subscriber = rospy.Subscriber("/camera/image/compressed",
            CompressedImage, self.callback,  queue_size = 1)
        if VERBOSE :
            print "subscribed to /camera/image/compressed"
            
    def callback(self, ros_data):
        '''Callback function of subscribed topic. 
        Here images get converted and features detected'''
        if VERBOSE :
            print 'received image of type: "%s"' % ros_data.format
            
        #### direct conversion to CV2 #### 将压缩的图像直接转换为cv2
        np_arr = np.fromstring(ros_data.data, np.uint8)
        image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
        #image_np = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) # OpenCV >= 3.0:
        
        #### Feature detectors using CV2 #### 选择并创建特征检测器
        # "","Grid","Pyramid" + 
        # "FAST","GFTT","HARRIS","MSER","ORB","SIFT","STAR","SURF"
        method = "GridFAST"
        feat_det = cv2.FeatureDetector_create(method)
        time1 = time.time()

        # convert np image to grayscale 
        featPoints = feat_det.detect(
            cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY))
        time2 = time.time()
        if VERBOSE :
            print '%s detector found: %s points in: %s sec.'%(method,
                len(featPoints),time2-time1)

        for featpoint in featPoints:
            x,y = featpoint.pt
            cv2.circle(image_np,(int(x),int(y)), 3, (0,0,255), -1)
        
        cv2.imshow('cv_img', image_np)
        cv2.waitKey(2)

        #### Create CompressedIamge #### 创建要发布的压缩图像
        msg = CompressedImage()
        msg.header.stamp = rospy.Time.now()
        msg.format = "jpeg"
        msg.data = np.array(cv2.imencode('.jpg', image_np)[1]).tostring()
        # Publish new image
        self.image_pub.publish(msg)
        
        #self.subscriber.unregister()

def main(args):
    '''Initializes and cleanup ros node'''
    ic = image_feature()
    rospy.init_node('image_feature', anonymous=True)
    try:
        rospy.spin()
    except KeyboardInterrupt:
        print "Shutting down ROS Image feature detector module"
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main(sys.argv)