EzROS编程示例

1. 话题通信(发布/订阅)

发布者示例:

import ezros

ezros.init()
node = ezros.Node("publisher")
pub = node.create_publisher(ezros.StringMessage, "chatter")
rate = ezros.create_rate(1.0)

count = 0
while ezros.ok():
    msg = ezros.StringMessage()
    msg.data = f"Hello, EzROS! {count}"
    pub.publish(msg)
    node.logger.info(f"发布: {msg.data}")
    count += 1
    rate.sleep()

订阅者示例:

import ezros

def callback(msg):
    print(f"收到消息: {msg.data}")

ezros.init()
node = ezros.Node("subscriber")
sub = node.create_subscriber(ezros.StringMessage, "chatter", callback)
ezros.spin()

2. 服务通信(请求/响应)

服务端示例:

import ezros

def add_callback(a, b):
    return a + b

ezros.init()
node = ezros.Node("add_server")
service = node.create_service("math_service")
service.register_callback("add", add_callback)
ezros.spin()

客户端示例:

import ezros

ezros.init()
node = ezros.Node("add_client")
client = node.create_client("math_service")

# 方法1: 使用 call 方法
result = client.call("add", 5, 3, timeout_sec=5.0)

# 方法2: 使用魔法方法(推荐)
result = client.add(5, 3, timeout_sec=5.0)

if result:
    print(f"计算结果: {result}")
else:
    print("服务调用失败")

# 或者使用全局便捷函数
result = ezros.call_service("math_service", "add", 5, 3, timeout_sec=5.0)

3. 复杂数据传输

import ezros
import numpy as np
import time

# 发布复杂数据
complex_data = {
    "timestamp": time.time(),
    "sensor_array": np.random.rand(100, 100),
    "metadata": {"sensor_id": "cam_01", "version": "1.0"}
}

msg = ezros.BytesMessage(complex_data)
pub.publish(msg)

# 订阅复杂数据
def complex_callback(msg):
    data = msg.get_data()
    print(f"时间戳: {data['timestamp']}")
    print(f"数组形状: {data['sensor_array'].shape}")

4. 消息类型

EzROS 提供了多种内置消息类型:

StringMessage: 字符串消息

BytesMessage: 字节流消息(支持复杂数据和压缩)

IntMessage: 整数消息

FloatMessage: 浮点数消息

BoolMessage: 布尔消息

BytesMessage 支持自动序列化复杂数据和多种压缩算法:

# 基本用法
data = {"key": "value", "numbers": [1, 2, 3]}
msg = ezros.BytesMessage(data)

# 指定压缩算法
msg = ezros.BytesMessage(data, compression='lz4')
# 获取数据
recovered_data = msg.get_data()