Skip to content

Commit 31d9fc5

Browse files
author
Guy Baron
authored
v1.x rollup to master (#121)
* add handler metrics to bus and saga (#101) * add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric * dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105) * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * return an error from the saga store when deleting a saga if saga can not (#110) be found In order to deal with concurrent deletes of the sage saga instance we would wan't to indicate that deleting the saga failed if the saga is not stored so callers can take proper action * Persisted timeouts (#107) * decouple transaction manager from glue * moved timeout manager to gbus/tx package * initial commit in order to support persisted timeouts * first working version of a mysql persisted timeout manager * fixing ci lint errors * refactored ensure schema of timeout manager * cleanup timeout manager when bs shuts down * fixing formatting issues * changed logging level from Info to Debug when inserting a new timeout * resusing timeouts tablename (PR review) * renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) * refactored glue to implement the Logged inetrface and use the GLogged helper struct * locking timeout record before executing timeout In order to prevent having a timeout beeing executed twice due to two concurrent grabbit instances running the same service a lock (FOR UPDATE) has been placed on the timeout record in the scope of the executing transaction * Commiting the select transaction when querying for pending timeouts * feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances * Enable returning a message back from the dead to the queue (#112) * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * return to q * return to q * return to q * return to q * return dead to q * allow no retries * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * added metric report on saga timeout (#114) 1) added reporting saga timeouts to the glue component 2) fixed mysql timeoutmanager error when trying to clear a timeout * Added documentation for grabbit metrics (#117) * added initial documentation for grabbit metrics * including metrics section in readme.md * fixing goreportcard issues (#118) * removed logging a warning when worker message channel returns an error (#116) * corrected saga metrics name and added to metrics documentation (#119) * corrected saga metrics name and added documentatio * corrected saga metric name * corrected typos * removed non transactional bus mode (#120)
1 parent 44f71ca commit 31d9fc5

31 files changed

+1120
-401
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ debug
99

1010
# Test binary, built with `go test -c`
1111
*.test
12-
12+
.vscode
1313
# Output of the go coverage tool, specifically when used with LiteIDE
1414
*.out
1515
.DS_Store

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
2121
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
2222
6) Deadlettering
2323
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
24+
8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
2425

2526
Planned:
2627

docs/METRICS.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Metrics
2+
3+
grabbit exposes and reports the following metrics to Prometheus
4+
5+
| Namespace | Subsystem | Name | Description |
6+
| ------------- | ------------- | ----------------------------------| --------------------------------------------------------------------------- |
7+
| grabbit | handler | [name of message handler]_result | records and counts each succesfull or failed execution of a message handler |
8+
| grabbit | handler | [name of message handler]_latency | records the execution time of each handler |
9+
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
10+
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |

gbus/abstractions.go

+31-6
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@ package gbus
33
import (
44
"context"
55
"database/sql"
6-
"github.com/sirupsen/logrus"
76
"time"
87

8+
"github.com/sirupsen/logrus"
9+
910
"github.com/streadway/amqp"
1011
)
1112

13+
//Semantics reopresents the semantics of a grabbit message
1214
type Semantics string
1315

1416
const (
17+
//CMD represenst a messge with command semantics in grabbit
1518
CMD Semantics = "cmd"
19+
//EVT represenst a messge with event semantics in grabbit
1620
EVT Semantics = "evt"
1721
)
1822

@@ -25,7 +29,7 @@ type BusConfiguration struct {
2529
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
2630
type Bus interface {
2731
HandlerRegister
28-
RegisterDeadletterHandler
32+
Deadlettering
2933
BusSwitch
3034
Messaging
3135
SagaRegister
@@ -106,9 +110,6 @@ type HandlerRegister interface {
106110
HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
107111
}
108112

109-
//MessageHandler signature for all command handlers
110-
type MessageHandler func(invocation Invocation, message *BusMessage) error
111-
112113
//Saga is the base interface for all Sagas.
113114
type Saga interface {
114115
//StartedBy returns the messages that when received should create a new saga instance
@@ -127,8 +128,9 @@ type Saga interface {
127128
}
128129

129130
//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
130-
type RegisterDeadletterHandler interface {
131+
type Deadlettering interface {
131132
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
133+
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
132134
}
133135

134136
//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
@@ -145,6 +147,14 @@ type SagaRegister interface {
145147
RegisterSaga(saga Saga, conf ...SagaConfFn) error
146148
}
147149

150+
//SagaGlue glues together all the parts needed in order to orchistrate saga instances
151+
type SagaGlue interface {
152+
SagaRegister
153+
Logged
154+
Start() error
155+
Stop() error
156+
}
157+
148158
//Builder is the main interface that should be used to create an instance of a Bus
149159
type Builder interface {
150160
PurgeOnStartUp() Builder
@@ -217,6 +227,21 @@ type TxOutbox interface {
217227
Stop() error
218228
}
219229

230+
//TimeoutManager abstracts the implementation of determining when a saga should be timed out
231+
type TimeoutManager interface {
232+
//RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance
233+
RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error
234+
//ClearTimeout clears a timeout for a specific saga
235+
ClearTimeout(tx *sql.Tx, sagaID string) error
236+
//SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires
237+
SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error)
238+
//Start starts the timeout manager
239+
Start() error
240+
//Stop shuts the timeout manager down
241+
Stop() error
242+
}
243+
244+
//Logged represents a grabbit component that can be logged
220245
type Logged interface {
221246
SetLogger(entry logrus.FieldLogger)
222247
Log() logrus.FieldLogger

gbus/builder/builder.go

+29-32
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package builder
22

33
import (
44
"fmt"
5-
"github.com/sirupsen/logrus"
65
"sync"
76
"time"
87

8+
"github.com/sirupsen/logrus"
9+
910
"github.com/wework/grabbit/gbus"
1011
"github.com/wework/grabbit/gbus/saga"
11-
"github.com/wework/grabbit/gbus/saga/stores"
1212
"github.com/wework/grabbit/gbus/serialization"
1313
"github.com/wework/grabbit/gbus/tx/mysql"
1414
)
@@ -36,17 +36,14 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
3636
gb := &gbus.DefaultBus{
3737
AmqpConnStr: builder.connStr,
3838
PrefetchCount: builder.PrefetchCount,
39-
Outgoing: &gbus.AMQPOutbox{
40-
SvcName: svcName,
41-
},
39+
4240
SvcName: svcName,
4341
PurgeOnStartup: builder.purgeOnStartup,
4442
DelayedSubscriptions: [][]string{},
4543
HandlersLock: &sync.Mutex{},
4644
RPCLock: &sync.Mutex{},
4745
SenderLock: &sync.Mutex{},
4846
ConsumerLock: &sync.Mutex{},
49-
IsTxnl: builder.txnl,
5047
Registrations: make([]*gbus.Registration, 0),
5148
RPCHandlers: make(map[string]gbus.MessageHandler),
5249
Serializer: builder.serializer,
@@ -68,46 +65,46 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
6865
gb.WorkerNum = builder.workerNum
6966
}
7067
var (
71-
sagaStore saga.Store
68+
sagaStore saga.Store
69+
timeoutManager gbus.TimeoutManager
7270
)
73-
if builder.txnl {
74-
gb.IsTxnl = true
75-
switch builder.txnlProvider {
7671

77-
case "mysql":
78-
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
72+
switch builder.txnlProvider {
73+
74+
case "mysql":
75+
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
76+
if err != nil {
77+
panic(err)
78+
}
79+
gb.TxProvider = mysqltx
80+
//TODO move purge logic into the NewSagaStore factory method
81+
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
82+
if builder.purgeOnStartup {
83+
err := sagaStore.Purge()
7984
if err != nil {
8085
panic(err)
8186
}
82-
gb.TxProvider = mysqltx
83-
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
84-
if builder.purgeOnStartup {
85-
err := sagaStore.Purge()
86-
if err != nil {
87-
panic(err)
88-
}
89-
}
90-
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
91-
92-
default:
93-
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
94-
panic(err)
9587
}
96-
} else {
97-
sagaStore = stores.NewInMemoryStore()
98-
}
88+
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
89+
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)
9990

91+
default:
92+
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
93+
panic(err)
94+
}
10095
if builder.usingPingTimeout {
10196
gb.DbPingTimeout = builder.dbPingTimeout
10297
}
10398

99+
//TODO move this into the NewSagaStore factory methods
104100
if builder.purgeOnStartup {
105101
err := sagaStore.Purge()
106102
if err != nil {
107103
panic(err)
108104
}
109105
}
110-
gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider)
106+
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
107+
gb.Glue = glue
111108
return gb
112109
}
113110

@@ -179,9 +176,9 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati
179176
}
180177

181178
func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) gbus.Builder {
182-
if config.MaxRetryCount > 0 {
183-
gbus.MaxRetryCount = config.MaxRetryCount
184-
}
179+
180+
gbus.MaxRetryCount = config.MaxRetryCount
181+
185182
if config.BaseRetryDuration > 0 {
186183
gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration)
187184
}

0 commit comments

Comments
 (0)