Skip to content

Commit b4d07df

Browse files
author
Guy Baron
authored
Improved tracing and added documentation (#142)
1 parent a916269 commit b4d07df

File tree

7 files changed

+86
-32
lines changed

7 files changed

+86
-32
lines changed

README.md

+2-5
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
2121
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
2222
6) Deadlettering
2323
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
24-
8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
25-
26-
Planned:
27-
28-
1) Deduplication of inbound messages
24+
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
25+
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
2926

3027
## Stable release
3128
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.

docs/TRACING.md

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Tracing
2+
3+
grabbit supports reporting standard [OpenTracing](https://opentracing.io/) tracing spans to a compatable OpenTracing backend (such as [Jaeger](https://www.jaegertracing.io/)).
4+
5+
NOTE: In your hosting process you will need to set up a global tracer to collect and forward the traces reported by grabbit. See Jaeger go client for an [example](https://github.com/jaegertracing/jaeger-client-go)
6+
7+
Once the global tracer is set up you will need to make sure that in your message handlers you carry over the passed in context to successive messages sent by the handler.
8+
9+
```go
10+
11+
func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
12+
reply := gbus.NewBusMessage(MyReply{})
13+
cmd := gbus.NewBusMessage(MyCommand{})
14+
ctx := invocation.Ctx()
15+
16+
if err := invocation.Send(ctx, "another-service", cmd); err != nil{
17+
return err
18+
}
19+
if err := invocation.Reply(ctx, reply); err != nil{
20+
return err
21+
}
22+
return nil
23+
}
24+
25+
```

gbus/bus.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,8 @@ func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.
623623
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
624624
b.SenderLock.Lock()
625625
defer b.SenderLock.Unlock()
626-
span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl")
626+
span, _ := opentracing.StartSpanFromContext(sctx, "SendMessage")
627+
627628
defer func() {
628629
if err := recover(); err != nil {
629630
errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack())

gbus/saga/glue.go

+26-9
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package saga
22

33
import (
4+
"context"
45
"database/sql"
56
"errors"
67
"fmt"
78
"reflect"
89
"strings"
910
"sync"
1011

12+
"github.com/opentracing/opentracing-go"
13+
slog "github.com/opentracing/opentracing-go/log"
1114
"github.com/sirupsen/logrus"
1215
"github.com/wework/grabbit/gbus"
1316
"github.com/wework/grabbit/gbus/metrics"
@@ -98,7 +101,8 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def {
98101
return defs
99102
}
100103

101-
func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) error {
104+
//SagaHandler is the generic handler invoking saga instances
105+
func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error {
102106

103107
imsm.lock.Lock()
104108
defer imsm.lock.Unlock()
@@ -117,11 +121,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
117121
*/
118122
startNew := def.shouldStartNewSaga(message)
119123
if startNew {
124+
120125
newInstance := def.newInstance()
121126
imsm.Log().
122127
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
123128
Info("created new saga")
124-
if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil {
129+
if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil {
125130
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
126131
return invkErr
127132
}
@@ -154,7 +159,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
154159
return e
155160
}
156161
def.configureSaga(instance)
157-
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
162+
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
158163
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
159164
return invkErr
160165
}
@@ -176,7 +181,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
176181

177182
for _, instance := range instances {
178183
def.configureSaga(instance)
179-
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
184+
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
180185
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
181186
return invkErr
182187
}
@@ -191,13 +196,16 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
191196
return nil
192197
}
193198

194-
func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
199+
func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
200+
201+
span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String())
202+
defer span.Finish()
195203
sginv := &sagaInvocation{
196204
decoratedBus: invocation.Bus(),
197205
decoratedInvocation: invocation,
198206
inboundMsg: message,
199207
sagaID: instance.ID,
200-
ctx: invocation.Ctx(),
208+
ctx: sctx,
201209
invokingService: imsm.svcName,
202210
}
203211
sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{
@@ -207,7 +215,11 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
207215
}))
208216

209217
exchange, routingKey := invocation.Routing()
210-
return instance.invoke(exchange, routingKey, sginv, message)
218+
err := instance.invoke(exchange, routingKey, sginv, message)
219+
if err != nil {
220+
span.LogFields(slog.Error(err))
221+
}
222+
return err
211223
}
212224

213225
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {
@@ -232,7 +244,7 @@ func (imsm *Glue) registerMessage(message gbus.Message) error {
232244
return nil
233245
}
234246
imsm.alreadyRegistred[message.SchemaName()] = true
235-
return imsm.bus.HandleMessage(message, imsm.handler)
247+
return imsm.bus.HandleMessage(message, imsm.SagaHandler)
236248
}
237249

238250
func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) error {
@@ -241,7 +253,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
241253
return nil
242254
}
243255
imsm.alreadyRegistred[event.SchemaName()] = true
244-
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
256+
return imsm.bus.HandleEvent(exchange, topic, event, imsm.SagaHandler)
245257
}
246258

247259
//TimeoutSaga fetches a saga instance and calls its timeout interface
@@ -257,7 +269,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {
257269
if err != nil {
258270
return err
259271
}
272+
273+
span, _ := opentracing.StartSpanFromContext(context.Background(), "SagaTimeout")
274+
span.SetTag("saga_type", saga.String())
275+
defer span.Finish()
260276
timeoutErr := saga.timeout(tx, imsm.bus)
277+
261278
if timeoutErr != nil {
262279
imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
263280
return timeoutErr

gbus/saga/instance.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
"reflect"
77
"time"
88

9-
"github.com/sirupsen/logrus"
10-
"github.com/wework/grabbit/gbus/metrics"
11-
9+
"github.com/opentracing/opentracing-go"
1210
"github.com/rs/xid"
11+
"github.com/sirupsen/logrus"
1312
"github.com/wework/grabbit/gbus"
13+
"github.com/wework/grabbit/gbus/metrics"
1414
)
1515

1616
//Instance represent a living instance of a saga of a particular definition
@@ -22,7 +22,7 @@ type Instance struct {
2222
Log logrus.FieldLogger
2323
}
2424

25-
func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error {
25+
func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error {
2626

2727
methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message)
2828

@@ -48,6 +48,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati
4848
"method_name": methodName, "saga_id": si.ID,
4949
}).Info("invoking method on saga")
5050

51+
span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), methodName)
52+
// replace the original context with the conext built arround the span so we ca
53+
// trace the saga handler that is invoked
54+
invocation.ctx = sctx
55+
56+
defer span.Finish()
57+
5158
err := metrics.RunHandlerWithMetric(func() error {
5259
returns := method.Call(params)
5360

gbus/saga/instance_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package saga
22

33
import (
4+
"context"
45
"errors"
56
"reflect"
67
"testing"
@@ -19,7 +20,9 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) {
1920
m2 := TestMsg2{}
2021

2122
exchange, routingKey := "", "kong"
22-
invocationStub := &sagaInvocation{}
23+
invocationStub := &sagaInvocation{
24+
ctx: context.Background(),
25+
}
2326

2427
failName := gbus.MessageHandler(s.Fail).Name()
2528
failFilter := gbus.NewMessageFilter(exchange, routingKey, m1)

gbus/worker.go

+16-12
Original file line numberDiff line numberDiff line change
@@ -241,16 +241,20 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
241241
}
242242

243243
func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
244-
var ctx context.Context
244+
245+
rootCtx := context.Background()
245246
var spanOptions []opentracing.StartSpanOption
246247

247248
spCtx, err := amqptracer.Extract(delivery.Headers)
249+
248250
if err != nil {
249251
worker.log().WithError(err).Debug("could not extract SpanContext from headers")
250252
} else {
251253
spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx))
252254
}
253-
worker.span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...)
255+
span, ctx := opentracing.StartSpanFromContext(rootCtx, "ProcessMessage", spanOptions...)
256+
worker.span = span
257+
//defer worker.span.Finish()
254258

255259
//catch all error handling so goroutine will not crash
256260
defer func() {
@@ -265,7 +269,6 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
265269
worker.span.LogFields(slog.String("panic", "failed to process message"))
266270
logEntry.Error("failed to process message")
267271
}
268-
worker.span.Finish()
269272
}()
270273

271274
worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG")
@@ -318,15 +321,17 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
318321

319322
action := func(attempt uint) (actionErr error) {
320323

324+
attemptSpan, sctx := opentracing.StartSpanFromContext(sctx, "InvokeHandler")
325+
defer attemptSpan.Finish()
326+
321327
tx, txCreateErr := worker.txProvider.New()
322328
if txCreateErr != nil {
323329
worker.log().WithError(txCreateErr).Error("failed creating new tx")
324-
worker.span.LogFields(slog.Error(txCreateErr))
330+
attemptSpan.LogFields(slog.Error(txCreateErr))
325331
return txCreateErr
326332
}
327333

328-
worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers")
329-
worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1)))
334+
attemptSpan.LogFields(slog.Uint64("attempt", uint64(attempt+1)))
330335
defer func() {
331336
if p := recover(); p != nil {
332337
pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack())
@@ -341,10 +346,9 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
341346
worker.span.Finish()
342347
}()
343348
var handlerErr error
344-
var hspan opentracing.Span
345-
var hsctx context.Context
349+
346350
for _, handler := range handlers {
347-
hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name())
351+
hspan, hsctx := opentracing.StartSpanFromContext(sctx, handler.Name())
348352

349353
ctx := &defaultInvocationContext{
350354
invocingSvc: delivery.ReplyTo,
@@ -364,22 +368,22 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
364368
return handler(ctx, message)
365369
}, handler.Name(), worker.log())
366370
if handlerErr != nil {
367-
hspan.LogFields(slog.Error(handlerErr))
368371
break
369372
}
370373
hspan.Finish()
371374
}
372375
if handlerErr != nil {
373-
hspan.LogFields(slog.Error(handlerErr))
376+
attemptSpan.LogFields(slog.Error(handlerErr))
374377
rbkErr := tx.Rollback()
375378
if rbkErr != nil {
379+
attemptSpan.LogFields(slog.Error(rbkErr))
376380
worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error")
377381
}
378-
hspan.Finish()
379382
return handlerErr
380383
}
381384
cmtErr := tx.Commit()
382385
if cmtErr != nil {
386+
attemptSpan.LogFields(slog.Error(cmtErr))
383387
worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers")
384388
return cmtErr
385389
}

0 commit comments

Comments
 (0)