-
Notifications
You must be signed in to change notification settings - Fork 69
/
Copy pathmessage.go
116 lines (90 loc) · 2.31 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"log"
"net/http"
"time"
"github.com/streadway/amqp"
)
// NotifyResponse is the integer code used to record the outcome of a
// callback notification.
type NotifyResponse int

// Outcome codes compared against Message.notifyResponse. They are untyped
// integer constants, so they can be used with NotifyResponse or plain ints.
const (
	// NotifyFailure marks a notification that did not succeed.
	NotifyFailure = 0
	// NotifySuccess marks a notification that succeeded.
	NotifySuccess = 1
)
// Message bundles one AMQP delivery with the QueueConfig used to process it
// and the result recorded by Notify.
//
// Field order is significant for any positional composite literal, so keep
// it stable.
type Message struct {
queueConfig QueueConfig // supplies notify URL, timeout, retry limit and error exchange name
amqpDelivery *amqp.Delivery // message read from rabbitmq
notifyResponse NotifyResponse // notify result from callback url
}
// CurrentMessageRetries reports how many times this delivery has been
// rejected, as recorded in its "x-death" header.
//
// It returns the "count" of the first x-death entry whose "reason" is
// "rejected" and whose count is an integer. It returns 0 when the header is
// absent or is not a list (this case is logged), or when no such entry is
// found. Entries that are not tables, or whose count has an unexpected type,
// are skipped instead of triggering a type-assertion panic.
func (m Message) CurrentMessageRetries() int {
	xDeath, ok := m.amqpDelivery.Headers["x-death"].([]interface{})
	if !ok {
		m.Printf("x-death array case fail")
		return 0
	}

	for _, entry := range xDeath {
		item, ok := entry.(amqp.Table)
		if !ok || item["reason"] != "rejected" {
			continue
		}

		// The count is normally an int64, but accept other signed integer
		// widths as well so an unusual encoding does not crash the consumer.
		switch count := item["count"].(type) {
		case int64:
			return int(count)
		case int32:
			return int(count)
		case int16:
			return int(count)
		case int:
			return count
		}
	}
	return 0
}
// Notify hands the delivery body to notifyUrl for the queue's notify URL and
// stores the outcome on the message.
//
// The client's Timeout is set to the queue's NotifyTimeoutWithDefault value,
// interpreted as seconds, before the call. A status code of 200 or 201 is
// recorded as NotifySuccess; any other value is recorded as NotifyFailure.
// The receiver is returned so calls can be chained.
//
// NOTE(review): this writes to the caller's *http.Client. If the same client
// is shared between goroutines, that write is unsynchronized — confirm.
func (m *Message) Notify(client *http.Client) *Message {
	cfg, delivery := m.queueConfig, m.amqpDelivery

	timeoutSeconds := time.Duration(cfg.NotifyTimeoutWithDefault())
	client.Timeout = timeoutSeconds * time.Second

	code := notifyUrl(client, cfg.NotifyUrl(), delivery.Body)
	m.Printf("notify url %s, result: %d", cfg.NotifyUrl(), code)

	switch code {
	case 200, 201:
		m.notifyResponse = NotifySuccess
	default:
		m.notifyResponse = NotifyFailure
	}
	return m
}
// IsMaxRetry reports whether the retry count from CurrentMessageRetries has
// reached or exceeded the queue's RetryTimesWithDefault limit.
func (m Message) IsMaxRetry() bool {
	return m.CurrentMessageRetries() >= m.queueConfig.RetryTimesWithDefault()
}
// IsNotifySuccess reports whether the recorded notify result equals
// NotifySuccess.
func (m Message) IsNotifySuccess() bool {
	switch m.notifyResponse {
	case NotifySuccess:
		return true
	default:
		return false
	}
}
// Ack logs the action and acknowledges the underlying delivery, passing
// false for the library's "multiple" argument. Any error is passed to
// LogOnError and also returned to the caller.
func (m Message) Ack() error {
	m.Printf("acker: ack message")

	ackErr := m.amqpDelivery.Ack(false)
	LogOnError(ackErr)

	return ackErr
}
// Reject logs the action and rejects the underlying delivery, passing false
// for the library's "requeue" argument. Any error is passed to LogOnError
// and also returned to the caller.
func (m Message) Reject() error {
	m.Printf("acker: reject message")

	rejectErr := m.amqpDelivery.Reject(false)
	LogOnError(rejectErr)

	return rejectErr
}
// Republish sends a copy of the message on out and then acknowledges the
// original delivery.
//
// The send happens first and blocks until the channel accepts the value; the
// ack is issued only after that. Any ack error is passed to LogOnError and
// also returned to the caller.
func (m Message) Republish(out chan<- Message) error {
	m.Printf("acker: ERROR republish message")

	// Hand the message over before acking the original delivery.
	out <- m

	ackErr := m.amqpDelivery.Ack(false)
	LogOnError(ackErr)

	return ackErr
}
// CloneAndPublish builds a publishable message from the delivery with
// cloneToPublishMsg and publishes it on channel to the queue's error
// exchange, using the delivery's original routing key.
//
// Both boolean arguments to Publish (mandatory and immediate in the amqp
// library) are false. Any publish error is passed to LogOnError and also
// returned to the caller.
func (m Message) CloneAndPublish(channel *amqp.Channel) error {
	delivery, cfg := m.amqpDelivery, m.queueConfig

	publishing := cloneToPublishMsg(delivery)
	pubErr := channel.Publish(
		cfg.ErrorExchangeName(),
		delivery.RoutingKey,
		false,
		false,
		*publishing,
	)
	LogOnError(pubErr)

	return pubErr
}
// Printf writes a log line prefixed with the delivery's message ID and
// routing key, in the form "[id] [routing-key] ...".
//
// When the first argument is a string it is used as the format for the
// remaining arguments, as with log.Printf. With no arguments only the prefix
// is logged. When the first argument is not a string, every argument is
// printed with %v. The last two cases previously panicked.
func (m Message) Printf(v ...interface{}) {
	const prefix = "[%s] [%s] "

	msg := m.amqpDelivery
	args := make([]interface{}, 0, len(v)+2)
	args = append(args, msg.MessageId, msg.RoutingKey)

	format := prefix
	if len(v) > 0 {
		if f, ok := v[0].(string); ok {
			format += f
			args = append(args, v[1:]...)
		} else {
			// No format string was supplied: fall back to one %v per value.
			for i := range v {
				if i > 0 {
					format += " "
				}
				format += "%v"
			}
			args = append(args, v...)
		}
	}

	log.Printf(format, args...)
}