Skip to content

Commit 716afe8

Browse files
committed
Describe alternative transactional outbox implementation using lock
1 parent f98a8c9 commit 716afe8

7 files changed

+115
-18
lines changed

README.md

+44-8
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
- [Loading any revision of the aggregate](#4-5)
2121
- [Synchronously updating projections](#4-6)
2222
- [Asynchronously sending integration events to a message broker](#4-7)
23-
- [Reliable transactional outbox with PostgreSQL](#4-7-1)
24-
- [Database polling](#4-7-2)
25-
- [Database polling alternative](#4-7-3)
23+
- [Transactional outbox using transaction ID](#4-7-1)
24+
- [Transactional outbox using table-level lock](#4-7-2)
25+
- [Database polling](#4-7-3)
26+
- [Listen/Notify as an alternative to database polling](#4-7-4)
2627
- [Adding new asynchronous event handlers](#4-8)
2728
- [Drawbacks](#4-9)
2829
- [Project structure](#5)
@@ -390,7 +391,7 @@ and processes them:
390391
WHERE SUBSCRIPTION_NAME = :subscriptionName
391392
```
392393
393-
#### <a id="4-7-1"></a>Reliable transactional outbox with PostgreSQL
394+
#### <a id="4-7-1"></a>Transactional outbox using transaction ID
394395
395396
Using only the event ID to track events processed by the subscription is unreliable
396397
and can result in lost events.
@@ -423,9 +424,43 @@ All transaction IDs less than `xmin` are either committed and visible, or rolled
423424
Even if transaction #2 started after transaction #1 and committed first,
424425
the events it created won't be read by the event subscription processor until transaction #1 is committed.
425426

426-
![PostgreSQL reliable transactional outbox](img/postgresql-reliable-outbox.svg)
427+
![PostgreSQL reliable transactional outbox using transaction ID](img/postgresql-reliable-outbox.svg)
427428

428-
#### <a id="4-7-2"></a>Database polling
429+
> **NOTE**
430+
> The transaction ID solution is used by default as it is non-blocking.
431+
432+
#### <a id="4-7-2"></a>Transactional outbox using table-level lock
433+
434+
With the transaction ID solution, event subscription processor doesn't wait for in-progress transactions to complete.
435+
Events created by already committed transactions will not be available for processing
436+
while transactions started earlier are still in-progress.
437+
These events will be processed immediately after these earlier transactions have completed.
438+
439+
An alternative solution is to use PostgreSQL explicit locking to make event subscription processor wait for in-progress transactions.
440+
441+
Before reading new events, the event subscription processor
442+
* gets the most recently issued ID sequence number,
443+
* very briefly locks the table for writes to wait for all pending writes to complete.
444+
445+
The most recently issued `ES_EVENT_ID_SEQ` sequence number is obtained using the `pg_sequence_last_value` function:
446+
`SELECT pg_sequence_last_value('ES_EVENT_ID_SEQ')`.
447+
448+
Events are created with the command `INSERT INTO ES_EVENT...` that acquires the `ROW EXCLUSIVE` lock mode on `ES_EVENT` table.
449+
`ROW EXCLUSIVE (RowExclusiveLock)` lock mode is acquired by any command that modifies data in a table.
450+
451+
The command `LOCK ES_EVENT IN SHARE ROW EXCLUSIVE MODE` acquires the `SHARE ROW EXCLUSIVE` lock mode on `ES_EVENT` table.
452+
`SHARE ROW EXCLUSIVE (ShareRowExclusiveLock)` mode protects a table against concurrent data changes,
453+
and is self-exclusive so that only one session can hold it at a time.
454+
455+
`SHARE ROW EXCLUSIVE` lock must be acquired in a separate transaction (`Propagation.REQUIRES_NEW`).
456+
The transaction must contain only this command and commit quickly to release the lock, so writes can resume.
457+
458+
When the lock is acquired and released, it means
459+
that there are no more uncommitted writes with an ID less than or equal to the ID returned by `pg_sequence_last_value`.
460+
461+
![PostgreSQL reliable transactional outbox using table-level lock](img/postgresql-reliable-outbox-with-lock.svg)
462+
463+
#### <a id="4-7-3"></a>Database polling
429464
430465
To get new events from the `ES_EVENT` table, the application has to poll the database.
431466
The shorter the polling period, the shorter the delay between persisting a new event and processing it by the subscription.
@@ -444,7 +479,7 @@ event-sourcing:
444479
polling-interval: PT1S
445480
```
446481
447-
#### <a id="4-7-3"></a>Database polling alternative
482+
#### <a id="4-7-4"></a>Listen/Notify as an alternative to database polling
448483
449484
To reduce the lag associated with database polling, the polling period can be set to a very low value,
450485
such as 1 second.
@@ -481,7 +516,8 @@ event-sourcing:
481516
subscriptions: postgres-channel # Enable Listen/Notify event subscription processing
482517
```
483518

484-
This mechanism is used by default as more efficient.
519+
> **NOTE**
520+
> The Listen/Notify mechanism is used by default as it is more efficient.
485521

486522
### <a id="4-8"></a>Adding new asynchronous event handlers
487523

img/plantuml/postgresql-naive-outbox.puml

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ skinparam sequence {
88
' LifeLineBackgroundColor LightGrey
99
}
1010

11-
participant "Append Event\nTransaction #1" as tx1
12-
participant "Append Event\nTransaction #2" as tx2
11+
participant "Append event\nTransaction #1" as tx1
12+
participant "Append event\nTransaction #2" as tx2
1313
database "PostgreSQL" as db
14-
participant "Read Events\nAfter Checkpoint\nTransaction #3" as tx3
15-
participant "Read Events\nAfter Checkpoint\nTransaction #4" as tx4
14+
participant "Read events\nafter checkpoint\nTransaction #3" as tx3
15+
participant "Read events\nafter checkpoint\nTransaction #4" as tx4
1616

1717
tx1 --> db: ""BEGIN TRANSACTION""
1818
activate tx1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
@startuml
2+
3+
scale max 1024 width
4+
scale max 1100 height
5+
6+
skinparam sequence {
7+
LifeLineBorderColor Grey
8+
' LifeLineBackgroundColor LightGrey
9+
}
10+
11+
participant "Append event\nTransaction #1" as tx1
12+
participant "Append event\nTransaction #2" as tx2
13+
database "PostgreSQL" as db
14+
participant "Lock table\nfor writes\nTransaction #4" as tx4
15+
participant "Read events\nafter checkpoint\nTransaction #3" as tx3
16+
17+
tx1 --> db: ""BEGIN TRANSACTION""
18+
activate tx1
19+
tx1 --> db: ""SELECT nextval('ES_EVENT_ID_SEQ')""
20+
db --> tx1: //nextval=101//
21+
tx2 --> db: ""BEGIN TRANSACTION""
22+
activate tx2
23+
note over tx2: Transaction #2 starts\nafter transaction #1
24+
tx2 --> db: ""SELECT nextval('ES_EVENT_ID_SEQ')""
25+
db --> tx2: //nextval=102//
26+
|||
27+
tx1 --> db: ""INSERT INTO ES_EVENT(ID)""\n""VALUES(101)""
28+
tx2 --> db: ""INSERT INTO ES_EVENT(ID)""\n""VALUES(102)""
29+
note over tx2: Transaction #2 commits\nbefore transaction #1
30+
tx2 --> db: ""COMMIT""
31+
deactivate tx2
32+
tx3 --> db: ""BEGIN TRANSACTION""
33+
activate tx3
34+
tx3 --> db: ""SELECT LAST_EVENT_ID""\n""AS START_ID""\n""FROM ES_EVENT_SUBSCRIPTION""
35+
db --> tx3: //START_ID=100//
36+
tx3 --> db: ""SELECT""\n""pg_sequence_last_value('ES_EVENT_ID_SEQ')""\n""AS END_ID""
37+
db --> tx3: //END_ID=102//
38+
note over tx4: Very briefly\nlock the table for writes
39+
tx4 --> db: ""LOCK ES_EVENT IN SHARE ROW EXCLUSIVE MODE""
40+
activate tx4
41+
|||
42+
|||
43+
note over tx4: Waiting for pending writes\nto complete (transaction #1)...
44+
|||
45+
|||
46+
tx1 --> db: ""COMMIT""
47+
deactivate tx1
48+
db --> tx4
49+
note over tx4: Table-level lock acquired...
50+
tx4 --> db: ""COMMIT""
51+
note over tx4: ...and immediately released
52+
deactivate tx4
53+
tx3 --> db: ""SELECT ID FROM ES_EVENT""\n""WHERE ID > 100 AND ID <= 102""
54+
db --> tx3: //ID=101, ID=102//
55+
note over tx3: Process events: ID=101, ID=102
56+
tx3 --> db: ""UPDATE ES_EVENT_SUBSCRIPTION""\n""SET LAST_EVENT_ID = 102""
57+
tx3 --> db: ""COMMIT""
58+
deactivate tx3
59+
60+
@enduml

img/plantuml/postgresql-reliable-outbox.puml

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ skinparam sequence {
88
' LifeLineBackgroundColor LightGrey
99
}
1010

11-
participant "Append Event\nTransaction #1" as tx1
12-
participant "Append Event\nTransaction #2" as tx2
11+
participant "Append event\nTransaction #1" as tx1
12+
participant "Append event\nTransaction #2" as tx2
1313
database "PostgreSQL" as db
14-
participant "Read Events\nAfter Checkpoint\nTransaction #3" as tx3
15-
participant "Read Events\nAfter Checkpoint\nTransaction #4" as tx4
14+
participant "Read events\nafter checkpoint\nTransaction #3" as tx3
15+
participant "Read events\nafter checkpoint\nTransaction #4" as tx4
1616

1717
tx1 --> db: ""BEGIN TRANSACTION""
1818
activate tx1

img/postgresql-naive-outbox.svg

+1-1
Loading

img/postgresql-reliable-outbox-with-lock.svg

+1
Loading

img/postgresql-reliable-outbox.svg

+1-1
Loading

0 commit comments

Comments
 (0)