乘风原创程序

  • Golang 操作 RocketMQ 的示例代码
  • 2022/4/6 11:19:28
  • 下载

    go get github.com/apache/rocketmq-client-go/v2

    代码

    // main is the entry point of the sample. The three steps below are meant to
    // be run one at a time; only step 3 (subscribing) is currently enabled, the
    // calls for steps 1 and 2 are commented out.
    func main() {
    	// 1. Create the topic.
    	//createtopic("test-04", 10909)
    	// 2. Have a producer send a message to the topic.
    	//sendsyncmessage("hello world0002")
    	// 3. Have a consumer subscribe to the topic and consume from it.
    	subscribemessage()
    }
    func createtopic(topicname string, port int) {
    	// 创建主题
    	testadmin, err := admin.newadmin(admin.withresolver(primitive.newpassthroughresolver([]string{"ip:server_port"})))
    	if err != nil {
    		fmt.println(err)
    	}
    	err = testadmin.createtopic(
    		context.background(),
    		admin.withtopiccreate(topicname),
    		admin.withbrokeraddrcreate(fmt.sprintf("ip:%d", port)),
    	)
    	fmt.println(err)
    func sendsyncmessage(message string) {
    	endpoint := []string{"ip:server_port"}
    	p, err := rocketmq.newproducer(
    		producer.withnameserver(endpoint),
    		//producer.withnsresolver(primitive.newpassthroughresolver(endpoint)),
    		producer.withretry(2),
    	err = p.start()
    	result, err := p.sendsync(context.background(), &primitive.message{
    		topic: "test",
    		body:  []byte(message),
    	})
    	fmt.println(result.status, result)
    func subscribemessage() {
    	// 订阅主题、并消费
    	c, err := rocketmq.newpushconsumer(
    		consumer.withnameserver(endpoint),
    		consumer.withconsumermodel(consumer.clustering),
    		consumer.withgroupname("gid_test01"),
    		//fmt.println(err)
    	err = c.subscribe("test", consumer.messageselector{}, func(ctx context.context,
    		msgs ...*primitive.messageext) (consumer.consumeresult, error) {
    		for i := range msgs {
    			fmt.printf("subscribe callback: %v \n", msgs[i])
    		}
    		return consumer.consumesuccess, nil
    		//fmt.println(err.error())
    	// note: start after subscribe
    	err = c.start()
    		os.exit(-1)
    	c.shutdown()

    参考文档

    到此这篇关于 Golang 操作 RocketMQ 的示例代码的文章就介绍到这了，更多相关 Golang 操作 RocketMQ 的内容，请搜索本教程网以前的文章或继续浏览下面的相关文章，希望大家以后多多支持本教程网！