Skip to content

Commit 488c31a

Browse files
author
Guy Baron
authored
Support handling raw message (#138)
1 parent b4d07df commit 488c31a

File tree

8 files changed

+251
-127
lines changed

8 files changed

+251
-127
lines changed

gbus/abstractions.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type BusConfiguration struct {
3030
type Bus interface {
3131
HandlerRegister
3232
Deadlettering
33+
RawMessageHandling
3334
BusSwitch
3435
Messaging
3536
SagaRegister
@@ -129,10 +130,37 @@ type Saga interface {
129130

130131
//Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
131132
type Deadlettering interface {
132-
HandleDeadletter(handler DeadLetterMessageHandler)
133+
/*
134+
HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead.
135+
This function will be removed in future grabbit releases
136+
*/
137+
HandleDeadletter(handler RawMessageHandler)
133138
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
134139
}
135140

141+
/*
142+
RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees
143+
that the bus provides
144+
*/
145+
type RawMessageHandling interface {
146+
/*
147+
SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered
148+
        to the service queue.
149+
        The handler will get called with a scoped transaction that is a different transaction than the ones that
150+
        regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery
151+
        can not be serialized by the bus to one of the registered schemas
152+
153+
        In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler
154+
        and afterward will call any registered message handlers.
155+
        if the global raw handler returns an error the message gets rejected and any additional
156+
        handlers will not be called.
157+
        You should not use the global raw message handler to drive business logic as it breaks the local transactivity
158+
        guarantees grabbit provides and should only be used in specialized cases.
159+
        If you do decide to use this feature try not shooting yourself in the foot.
160+
*/
161+
SetGlobalRawMessageHandler(handler RawMessageHandler)
162+
}
163+
136164
//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
137165
type RequestSagaTimeout interface {
138166
TimeoutDuration() time.Duration

gbus/bus.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ type DefaultBus struct {
4343
amqpOutbox *AMQPOutbox
4444

4545
RPCHandlers map[string]MessageHandler
46-
deadletterHandler DeadLetterMessageHandler
46+
deadletterHandler RawMessageHandler
47+
globalRawHandler RawMessageHandler
4748
HandlersLock *sync.Mutex
4849
RPCLock *sync.Mutex
4950
SenderLock *sync.Mutex
@@ -73,8 +74,8 @@ var (
7374
//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
7475
//for a random retry time. Default is 10 but it is configurable.
7576
BaseRetryDuration = 10 * time.Millisecond
76-
//RpcHeaderName used to define the header in grabbit for RPC
77-
RpcHeaderName = "x-grabbit-msg-rpc-id"
77+
//RPCHeaderName used to define the header in grabbit for RPC
78+
RPCHeaderName = "x-grabbit-msg-rpc-id"
7879
)
7980

8081
func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
@@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
286287
rpcLock: b.RPCLock,
287288
rpcHandlers: b.RPCHandlers,
288289
deadletterHandler: b.deadletterHandler,
290+
globalRawHandler: b.globalRawHandler,
289291
handlersLock: &sync.Mutex{},
290292
registrations: b.Registrations,
291293
serializer: b.Serializer,
@@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
547549
return b.registerHandlerImpl(exchange, topic, event, handler)
548550
}
549551

550-
//HandleDeadletter implements GBus.HandleDeadletter
551-
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
552+
//HandleDeadletter implements Deadlettering.HandleDeadletter
553+
func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) {
552554
b.registerDeadLetterHandler(handler)
553555
}
554556

557+
//HandleDeadletter implements RawMessageHandling.SetGlobalRawMessageHandler
558+
func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) {
559+
metrics.AddHandlerMetrics(handler.Name())
560+
b.globalRawHandler = handler
561+
}
562+
555563
//ReturnDeadToQueue returns a message to its original destination
556564
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
557565
return b.returnDeadToQueue(ctx, nil, publishing)
@@ -692,7 +700,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
692700
return nil
693701
}
694702

695-
func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
703+
func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) {
696704
metrics.AddHandlerMetrics(handler.Name())
697705
b.deadletterHandler = handler
698706
}
@@ -706,7 +714,7 @@ type rpcPolicy struct {
706714
}
707715

708716
func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
709-
publishing.Headers[RpcHeaderName] = p.rpcID
717+
publishing.Headers[RPCHeaderName] = p.rpcID
710718
}
711719

712720
//Log returns the default logrus.FieldLogger for the bus via the Glogged helper

gbus/message_handler.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,26 @@ package gbus
22

33
import (
44
"database/sql"
5-
"github.com/streadway/amqp"
65
"reflect"
76
"runtime"
87
"strings"
8+
9+
"github.com/streadway/amqp"
910
)
1011

1112
//MessageHandler signature for all command handlers
1213
type MessageHandler func(invocation Invocation, message *BusMessage) error
1314

14-
//DeadLetterMessageHandler signature for dead letter handler
15-
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error
15+
//RawMessageHandler signature for handlers that handle raw amqp deliveries
16+
type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error
1617

1718
//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
1819
func (mg MessageHandler) Name() string {
1920
return nameFromFunc(mg)
2021
}
2122

2223
//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type
23-
func (dlmg DeadLetterMessageHandler) Name() string {
24+
func (dlmg RawMessageHandler) Name() string {
2425
return nameFromFunc(dlmg)
2526
}
2627

gbus/messages.go

+27-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package gbus
22

33
import (
4+
"errors"
5+
"fmt"
6+
47
"github.com/opentracing/opentracing-go/log"
58
"github.com/rs/xid"
69
"github.com/streadway/amqp"
@@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage {
2730
return bm
2831
}
2932

30-
//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
31-
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage {
33+
//NewFromDelivery creates a BusMessage from an amqp delivery
34+
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) {
3235
bm := &BusMessage{}
33-
bm.SetFromAMQPHeaders(headers)
34-
return bm
36+
bm.SetFromAMQPHeaders(delivery)
37+
38+
bm.ID = delivery.MessageId
39+
bm.CorrelationID = delivery.CorrelationId
40+
if delivery.Exchange != "" {
41+
bm.Semantics = EVT
42+
} else {
43+
bm.Semantics = CMD
44+
}
45+
if bm.PayloadFQN == "" || bm.Semantics == "" {
46+
errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics)
47+
return nil, errors.New(errMsg)
48+
}
49+
return bm, nil
50+
}
51+
52+
//GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery
53+
func GetMessageName(delivery amqp.Delivery) string {
54+
return castToString(delivery.Headers["x-msg-name"])
3555
}
3656

3757
//GetAMQPHeaders convert to AMQP headers Table everything but a payload
@@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
4666
}
4767

4868
//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
49-
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) {
50-
69+
func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
70+
headers := delivery.Headers
5171
bm.SagaID = castToString(headers["x-msg-saga-id"])
5272
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
5373
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
54-
bm.PayloadFQN = castToString(headers["x-msg-name"])
74+
bm.PayloadFQN = GetMessageName(delivery)
5575

5676
}
5777

0 commit comments

Comments
 (0)