删除文章

确定要删除这篇文章吗?

取消
确定

golang与qpid broker通信

     阅读(74)  2020-04-29 01:33:14

这里我用的是github.com/Azure/go-amqp这个库,它支持AMQP 1.0协议,是纯go语言实现的。qpid用的是1.39.0版本,直接启动的话它不支持AMQP 1.0协议,需要先加载amqp.so库(加载方法)。

下面演示使用sender和receiver的简单例子。

receiver

接收队列examples的消息直接丢弃:go run main.go -queue examples
接收队列examples的10个消息:go run main.go -queue examples -count 10

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/Azure/go-amqp"
)
 // ADDR is the AMQP URL of the broker; main passes it to amqp.Dial.
 const ADDR = "amqp://118.24.114.114"
// main parses the -queue and -count flags and receives messages from the
// named queue. A -count of 0 (the default) means "receive forever".
func main() {
    queueName := flag.String("queue", "", "queue name")
    receiveCount := flag.Int("count", 0, "limit receive count")
    flag.Parse()

    if *queueName == "" {
        log.Fatalln("Error queue is empty")
    }

    // All connection work lives in receive so that its deferred cleanup has
    // already run by the time log.Fatalln terminates the process.
    if err := receive(*queueName, *receiveCount); err != nil {
        log.Fatalln(err)
    }
}

// receive connects to ADDR, attaches a receiver link to queue and accepts
// every message that arrives. It returns nil after limit messages have been
// accepted (never, when limit <= 0) and a non-nil error as soon as dialing,
// link setup, receiving or accepting fails.
func receive(queue string, limit int) error {
    client, err := amqp.Dial(ADDR)
    if err != nil {
        return fmt.Errorf("Dialing AMQP server: %w", err)
    }
    defer client.Close()

    // Open a session
    session, err := client.NewSession()
    if err != nil {
        return fmt.Errorf("Creating AMQP session: %w", err)
    }

    ctx := context.Background()
    // Create a receiver
    receiver, err := session.NewReceiver(
        amqp.LinkSourceAddress(queue),
        amqp.LinkCredit(10),
    )
    if err != nil {
        return fmt.Errorf("Creating receiver link: %w", err)
    }
    defer func() {
        // Bound the close so a dead connection cannot hang shutdown.
        closeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
        defer cancel()
        receiver.Close(closeCtx)
    }()

    received := 0
    for limit <= 0 || received < limit {
        // Receive next message
        msg, err := receiver.Receive(ctx)
        if err != nil {
            return fmt.Errorf("Reading message from AMQP: %w", err)
        }

        // Accept message; a failed disposition means the broker may redeliver.
        if err := msg.Accept(); err != nil {
            return fmt.Errorf("Accepting message: %w", err)
        }

        // Count only messages that were actually received and accepted.
        received++
        fmt.Printf("\rreceive:%d", received)
    }

    // Leading newline: the progress line above is written with '\r' only.
    fmt.Println("\nreceive finished count:", received)
    return nil
}

sender

向队列examples发送消息:go run main.go -queue examples -msg hello
向队列examples发送消息重复10次:go run main.go -queue examples -msg hello -repeat 10
向队列examples发送消息指定subject:go run main.go -queue examples -msg hello -subject news

package main

import (
    "context"
    "flag"
    "fmt"
    "log"

    "github.com/Azure/go-amqp"
)

// ADDR is the AMQP URL of the broker; main passes it to amqp.Dial.
const ADDR = "amqp://118.24.114.114"
// main parses the -queue, -subject, -msg and -repeat flags and sends the
// message to the named queue -repeat times.
func main() {
    queueName := flag.String("queue", "", "queue name")
    subject := flag.String("subject", "", "msg subject")
    message := flag.String("msg", "", "send message")
    repeat := flag.Int("repeat", 1, "send repeat count")
    flag.Parse()

    if *queueName == "" {
        log.Fatalln("Error queue is empty")
    }
    if *message == "" {
        log.Fatalln("Error send message is empty")
    }

    // All connection work lives in send so that its deferred cleanup has
    // already run by the time log.Fatalln terminates the process.
    if err := send(*queueName, *subject, *message, *repeat); err != nil {
        log.Fatalln(err)
    }
}

// send connects to ADDR, attaches a sender link to queue and sends body
// repeat times. When subject is non-empty it is set as the message subject.
// It returns the first error from dialing, link setup, sending or closing
// the link.
func send(queue, subject, body string, repeat int) error {
    client, err := amqp.Dial(ADDR)
    if err != nil {
        return fmt.Errorf("Dialing AMQP server: %w", err)
    }
    defer client.Close()

    // Open a session
    session, err := client.NewSession()
    if err != nil {
        return fmt.Errorf("Creating AMQP session: %w", err)
    }

    ctx := context.Background()
    // Create a sender
    sender, err := session.NewSender(
        amqp.LinkTargetAddress(queue),
    )
    if err != nil {
        return fmt.Errorf("Creating sender link: %w", err)
    }

    // Send message
    for sent := 1; sent <= repeat; sent++ {
        msg := amqp.NewMessage([]byte(body))
        if subject != "" {
            msg.Properties = &amqp.MessageProperties{Subject: subject}
        }
        if err := sender.Send(ctx, msg); err != nil {
            // The deferred client.Close tears the link down with the connection.
            return fmt.Errorf("Sending message: %w", err)
        }
        fmt.Printf("\r%d", sent)
    }

    if err := sender.Close(ctx); err != nil {
        return fmt.Errorf("Closing sender link: %w", err)
    }
    fmt.Println("\nsend finished")
    return nil
}

请求应答

发送一个请求,然后通过一个动态地址接收应答

// handler.go
package handler

import (
    "context"
    "fmt"
    "github.com/Azure/go-amqp"
    "log"
    "sync"
    "time"
)

// TimeoutWait wraps a sync.WaitGroup with a Wait that gives up after a fixed
// timeout instead of blocking indefinitely.
//
// A TimeoutWait must not be copied after first use because it contains a
// sync.WaitGroup; always use the pointer returned by NewTimeoutWait.
type TimeoutWait struct {
    wg      sync.WaitGroup
    timeout time.Duration
}

// NewTimeoutWait returns a TimeoutWait whose Wait blocks for at most timeout.
func NewTimeoutWait(timeout time.Duration) *TimeoutWait {
    return &TimeoutWait{timeout: timeout}
}

// Add adds delta to the underlying WaitGroup counter.
func (tw *TimeoutWait) Add(delta int) {
    tw.wg.Add(delta)
}

// Done decrements the underlying WaitGroup counter by one.
func (tw *TimeoutWait) Done() {
    tw.wg.Done()
}

// Wait blocks until the WaitGroup counter reaches zero or the configured
// timeout elapses, whichever happens first.
//
// Wait may be called any number of times: each call signals completion on
// its own channel, so no channel is ever closed twice. If the timeout wins,
// the helper goroutine stays blocked in wg.Wait until the counter eventually
// reaches zero.
func (tw *TimeoutWait) Wait() {
    done := make(chan struct{})
    go func() {
        tw.wg.Wait()
        close(done)
    }()

    // An explicit timer is stopped on return, unlike time.After.
    timer := time.NewTimer(tw.timeout)
    defer timer.Stop()

    select {
    case <-done:
    case <-timer.C:
    }
}

// handler bundles one AMQP connection and session together with the sender
// links opened on that session.
type handler struct {
    // addr is the broker URL that was passed to New.
    addr string
    // ctx is the base context used for send, receive and close calls.
    ctx context.Context

    client  *amqp.Client
    session *amqp.Session

    // mux is locked by getSender while it reads or updates senders.
    mux sync.Mutex
    // senders caches one sender link per target address.
    senders map[string]*amqp.Sender

    // exit counts the reply goroutines started by Send; Close waits on it.
    exit *TimeoutWait
}

// OnMessage is the callback type accepted by Send. Send calls it from a
// separate goroutine with the two values returned by receiver.Receive: the
// reply message and the accompanying error.
type OnMessage func(message *amqp.Message, err error)

// New dials the AMQP broker at addr, opens a session on the connection and
// returns a handler ready for Send.
//
// It returns nil when dialing or session creation fails; the cause is
// written to the standard logger. A library constructor must not terminate
// the process, and callers are expected to check the result for nil.
func New(addr string) *handler {
    client, err := amqp.Dial(addr)
    if err != nil {
        log.Println("Dialing AMQP server:", err)
        return nil
    }

    session, err := client.NewSession()
    if err != nil {
        log.Println("Creating AMQP session:", err)
        // Do not leak the connection that was just opened.
        client.Close()
        return nil
    }

    return &handler{
        addr:    addr,
        client:  client,
        session: session,
        ctx:     context.Background(),
        senders: make(map[string]*amqp.Sender),
        exit:    NewTimeoutWait(5 * time.Second),
    }
}

// Close waits (bounded by the exit timeout) for outstanding reply goroutines
// and then releases every resource held by the handler: the cached sender
// links, the session and finally the underlying connection.
//
// Close is a no-op on a nil handler. Errors from the individual close calls
// are logged rather than returned, so teardown always runs to completion.
func (h *handler) Close() {
    if h == nil {
        return
    }

    fmt.Println("close start")
    h.exit.Wait()

    // senders is guarded by mux in getSender, so take the same lock here.
    h.mux.Lock()
    for address, sender := range h.senders {
        if err := sender.Close(h.ctx); err != nil {
            log.Println("closing sender", address, err)
        }
    }
    h.senders = nil
    h.mux.Unlock()

    if err := h.session.Close(h.ctx); err != nil {
        log.Println("closing session", err)
    }
    // The connection was opened by New and must be closed here as well.
    if err := h.client.Close(); err != nil {
        log.Println("closing client", err)
    }
    fmt.Println("close end")
}

// getSender returns the sender link for address, creating it on the session
// and caching it on first use. The cache lookup and insertion happen while
// holding h.mux. A failure to create the link is returned and nothing is
// cached.
func (h *handler) getSender(address string) (*amqp.Sender, error) {
    h.mux.Lock()
    defer h.mux.Unlock()

    cached, found := h.senders[address]
    if found {
        return cached, nil
    }

    target := amqp.LinkTargetAddress(address)
    created, err := h.session.NewSender(target)
    if err == nil {
        h.senders[address] = created
        return created, nil
    }
    return nil, err
}

// Send sends msg to queue and delivers the reply to onMessage.
//
// A receiver link with a dynamic address is created for the reply, and that
// address is stored in the message's ReplyTo property; any other properties
// already set on msg are kept. A goroutine then waits up to 3 seconds for one
// reply and passes the result of receiver.Receive (message and error) to
// onMessage.
//
// The returned error comes from creating the receiver, obtaining the sender,
// or sending the message. If only the send fails, the reply goroutine is
// already running and still invokes onMessage.
func (h *handler) Send(queue string, msg *amqp.Message, onMessage OnMessage) error {
    receiver, err := h.session.NewReceiver(amqp.LinkSourceAddress(""), amqp.LinkAddressDynamic())
    if err != nil {
        return err
    }

    sender, err := h.getSender(queue)
    if err != nil {
        // No reply can arrive, so release the link that was just created.
        receiver.Close(h.ctx)
        return err
    }

    // Set only ReplyTo instead of replacing the whole properties struct.
    if msg.Properties == nil {
        msg.Properties = &amqp.MessageProperties{}
    }
    msg.Properties.ReplyTo = receiver.Address()

    // Register with the wait group before the goroutine starts; otherwise
    // Close could call Wait before Add has run and skip waiting for the reply.
    h.exit.Add(1)
    go func() {
        defer h.exit.Done()

        fmt.Println("receiver start")
        ctx, cancel := context.WithTimeout(h.ctx, 3*time.Second)
        defer func() {
            fmt.Println("receiver end")
            receiver.Close(h.ctx)
            cancel()
        }()
        rspMsg, err := receiver.Receive(ctx)
        onMessage(rspMsg, err)
    }()

    return sender.Send(h.ctx, msg)
}

// main.go
package main

import (
    "fmt"
    "github.com/Azure/go-amqp"
    "gotest/handler"
    "log"
)

// main sends "hello" to the acc_queue_echo queue and prints the reply that
// arrives on the dynamic reply address.
func main() {
    // Named h so the variable does not shadow the imported handler package.
    h := handler.New("amqp://118.24.114.114")
    if h == nil {
        panic("handler create failed")
    }
    // Close waits for the pending reply before tearing the session down.
    defer h.Close()

    msg := amqp.NewMessage([]byte("hello"))
    err := h.Send("acc_queue_echo", msg, func(rspMsg *amqp.Message, err error) {
        if err != nil {
            // Log and return: log.Fatal would call os.Exit from the reply
            // goroutine and cut short the deferred Close above.
            log.Println(err)
            return
        }
        fmt.Println("received message:", string(rspMsg.GetData()))
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("success")
}



文章评论

Keep it simple,stupid
文章数
329
今日访问
2919
今日IP数
223
最近评论

liangzi: 不错 谢谢分享
tujiaw: registerThreadInactive:如果当前没有激活的线程,就去激活线程,让等待的线程去执行任务。
hgzzx: 佩服佩服。 请教:registerThreadInactive的作用是什么?
xuehaoyun: 很不错,来围观一下
tujiaw: 抱歉csdn code服务关闭了,这个代码我也找不到了
于淞: 你好,这个文章的源码能分享一下吗,songsong9181@163.com,谢谢了 上面的写错了
回到顶部