Postgres
Deliver observability data to the PostgreSQL database
Warnings
Default values defined on columns of the destination table are not supported. If an ingested event is missing a field that exists as a table column, a null value is inserted for that record even if the column has a default value defined. This is a limitation of PostgreSQL’s jsonb_populate_recordset function.
As a workaround, you can add a NOT NULL constraint to the column, so that inserting an event that is missing the field raises a NOT NULL constraint violation, and define a constraint trigger to catch the exception and set the desired default value.
Configuration
Example configurations
{
  "sinks": {
    "my_sink_id": {
      "type": "postgres",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}

[sinks.my_sink_id]
type = "postgres"
inputs = [ "my-source-or-transform-id" ]

sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id

{
  "sinks": {
    "my_sink_id": {
      "type": "postgres",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "pool_size": 5
    }
  }
}

[sinks.my_sink_id]
type = "postgres"
inputs = [ "my-source-or-transform-id" ]
pool_size = 5

sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    pool_size: 5
acknowledgements
optional object
Controls how acknowledgements are handled for this sink.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional bool
Whether or not end-to-end acknowledgements are enabled.
When enabled for a sink, any source that supports end-to-end acknowledgements that is connected to that sink waits for events to be acknowledged by all connected sinks before acknowledging them at the source.
Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements
configuration.
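As a minimal sketch, end-to-end acknowledgements could be enabled for this sink as shown here. The sink ID, input ID, endpoint, and table name are placeholders chosen for illustration, not values from a real deployment.

```yaml
sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    # Placeholder connection details; adjust to your environment.
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
    # Sink-level setting; takes precedence over any global acknowledgements setting.
    acknowledgements:
      enabled: true
```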
batch
optional object
Event batching behavior.
Because PostgreSQL’s jsonb_populate_recordset function is used to insert events, a single event can cause the whole batch to fail. For example, if a single event within the batch triggers a unique constraint violation in the destination table, the whole event batch fails.
As a workaround, triggers on constraint violations can be defined at the database level to change the behavior of the insert operation on specific tables.
Alternatively, setting the batch max_events option to 1 causes each event to be inserted independently,
so events that trigger a constraint violation will not affect the rest of the events.
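The second workaround could look roughly like the sketch below. The sink ID, input ID, endpoint, and table name are placeholders. Inserting one event per batch will most likely cost throughput, so treat this as a trade-off rather than a recommended default.

```yaml
sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    # Placeholder connection details; adjust to your environment.
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
    batch:
      # One event per batch, so a constraint violation only affects that event.
      max_events: 1
```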
batch.max_bytes
optional uint
The maximum size of a batch that is processed by a sink.
This is based on the uncompressed size of the batched events, before they are serialized/compressed.
1e+07 (bytes)
batch.max_events
optional uint
batch.timeout_secs
optional float
1 (seconds)
buffer
optional object
Configures the buffering behavior for this sink.
More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.
buffer.max_events
optional uint
type = "memory"
500
buffer.max_size
required uint
The maximum size of the buffer on disk.
Must be at least ~256 megabytes (268435488 bytes).
type = "disk"
buffer.type
optional string literal enum
Option | Description |
---|---|
disk | Events are buffered on disk. This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes. Data is synchronized to disk every 500ms. |
memory | Events are buffered in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes. |
memory
buffer.when_full
optional string literal enum
Option | Description |
---|---|
block | Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge. |
drop_newest | Drops the event instead of waiting for free space in buffer. The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events. |
block
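For illustration, a disk buffer using the options above might be configured as follows. The sink ID, input ID, endpoint, and table name are placeholders, and the max_size value is simply the documented minimum, not a sizing recommendation.

```yaml
sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    # Placeholder connection details; adjust to your environment.
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
    buffer:
      # Durable buffering: slower than memory, but survives crashes and forced restarts.
      type: disk
      # Required for disk buffers; 268435488 bytes is the documented minimum.
      max_size: 268435488
      # Apply backpressure instead of dropping events when the buffer is full.
      when_full: block
```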
endpoint
required string literal
healthcheck
optional object
healthcheck.enabled
optional bool
true
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
pool_size
optional uint
5
request
optional object
Middleware settings for outbound requests.
Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior.
Note that the retry backoff policy follows the Fibonacci sequence.
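As a rough sketch, the timeout and retry settings documented in the entries that follow could be set explicitly like this. The values shown are the documented defaults, so this fragment should behave the same as omitting the request block; the sink ID, input ID, endpoint, and table name are placeholders.

```yaml
sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    # Placeholder connection details; adjust to your environment.
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
    request:
      # Abort a request after 60 seconds.
      timeout_secs: 60
      # Wait 1 second before the first retry; later backoffs follow the Fibonacci sequence.
      retry_initial_backoff_secs: 1
      # Upper bound on the backoff duration, in seconds.
      retry_max_duration_secs: 30
      # Randomize delays between 0 and the current backoff.
      retry_jitter_mode: Full
```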
request.adaptive_concurrency
optional object
Configuration of adaptive concurrency parameters.
These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or unstable performance and sink behavior. Proceed with caution.
request.adaptive_concurrency.decrease_ratio
optional float
The fraction of the current limit to use as the new concurrency limit when decreasing the limit.
Valid values are greater than 0 and less than 1. Smaller values cause the algorithm to scale back rapidly when latency increases.
Note: The new limit is rounded down after applying this ratio.
0.9
request.adaptive_concurrency.ewma_alpha
optional float
The weighting of new measurements compared to older measurements.
Valid values are greater than 0 and less than 1.
ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has unusually high response variability.
0.4
request.adaptive_concurrency.initial_concurrency
optional uint
The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency).
Datadog recommends setting this value to your service’s average limit if you’re seeing that it takes a long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the adaptive_concurrency_limit metric.
1
request.adaptive_concurrency.max_concurrency_limit
optional uint
The maximum concurrency limit.
The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard.
200
request.adaptive_concurrency.rtt_deviation_scale
optional float
Scale of RTT deviations which are not considered anomalous.
Valid values are greater than or equal to 0, and we expect reasonable values to range from 1.0 to 3.0.
When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable those values are. We use that deviation when comparing the past RTT average to the current measurements, so we can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT.
2.5
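To show where these parameters live in a configuration, here is a sketch that spells out the adaptive concurrency options with their documented defaults. As noted above, they typically do not need changing; the sink ID, input ID, endpoint, and table name are placeholders.

```yaml
sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    # Placeholder connection details; adjust to your environment.
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
    request:
      adaptive_concurrency:
        # New limit = current limit * 0.9 (rounded down) when decreasing.
        decrease_ratio: 0.9
        # Weight of new RTT measurements in the moving average.
        ewma_alpha: 0.4
        # Start with a single in-flight request.
        initial_concurrency: 1
        # Safeguard: the limit never grows beyond this value.
        max_concurrency_limit: 200
        # Tolerated RTT deviation before an increase counts as anomalous.
        rtt_deviation_scale: 2.5
```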
request.concurrency
optional string literal enum uint
Configuration for outbound request concurrency.
This can be set either to one of the below enum values or to a positive integer, which denotes a fixed concurrency limit.
Option | Description |
---|---|
adaptive | Concurrency is managed by Vector’s Adaptive Request Concurrency feature. |
none | A fixed concurrency of 1. Only one request can be outstanding at any given time. |
adaptive
request.rate_limit_duration_secs
optional uint
The time window used for the rate_limit_num option.
1 (seconds)
request.rate_limit_num
optional uint
The maximum number of requests allowed within the rate_limit_duration_secs time window.
9.223372036854776e+18 (requests)
request.retry_attempts
optional uint
9.223372036854776e+18 (retries)
request.retry_initial_backoff_secs
optional uint
The amount of time to wait before attempting the first retry for a failed request.
After the first retry has failed, the Fibonacci sequence is used to select future backoffs.
1 (seconds)
request.retry_jitter_mode
optional string literal enum
Option | Description |
---|---|
Full | Full jitter. The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff strategy. Incorporating full jitter into your backoff strategy can greatly reduce the likelihood of creating accidental denial of service (DoS) conditions against your own systems when many clients are recovering from a failure state. |
None | No jitter. |
Full
request.retry_max_duration_secs
optional uint
30 (seconds)
request.timeout_secs
optional uint
The time a request can take before being aborted.
Datadog highly recommends that you do not lower this value below the service’s internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream.
60 (seconds)
table
required string literal
Telemetry
Metrics
buffer_byte_size
gauge
buffer_discarded_events_total
counter
buffer_events
gauge
buffer_received_event_bytes_total
counter
buffer_received_events_total
counter
buffer_sent_event_bytes_total
counter
buffer_sent_events_total
counter
component_discarded_events_total
counter
The intentional tag is true if the events were discarded intentionally, like a filter transform, or false if due to an error.
component_errors_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector due to small internal batches.
component_received_events_total
counter
component_sent_bytes_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
utilization
gauge
How it works
Buffers and batches
This component buffers and batches data as shown in the diagram above. You’ll notice that Vector treats these concepts differently: instead of treating them as global concepts, Vector treats them as sink-specific concepts. This isolates sinks, ensuring service disruptions are contained and delivery guarantees are honored.
Batches are flushed when either of two conditions is met:
- The batch age meets or exceeds the configured timeout_secs.
- The batch size meets or exceeds the configured max_bytes or max_events.
Buffers are controlled via the buffer.*
options.
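The flush conditions above map onto the batch.* options roughly as in this sketch. The max_bytes and timeout_secs values are the documented defaults; the max_events value, like the sink ID, input ID, endpoint, and table name, is an arbitrary placeholder for illustration.

```yaml
sinks:
  my_sink_id:
    type: postgres
    inputs:
      - my-source-or-transform-id
    # Placeholder connection details; adjust to your environment.
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
    batch:
      # Flush once the uncompressed batch reaches 10,000,000 bytes...
      max_bytes: 10000000
      # ...or once it holds this many events (hypothetical value)...
      max_events: 1000
      # ...or once the batch is 1 second old, whichever happens first.
      timeout_secs: 1
```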
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.yaml --require-healthy
Disable health checks
If you’d like to disable health checks for this sink, you can set the healthcheck option to false.
Inserting events into PostgreSQL
In order to insert data into a PostgreSQL table, you must first create a table that matches the JSON serialization of your event data. Note that this sink accepts log, metric, and trace events, and the inserting behavior is the same for all of them.
For example, if your event is a log whose JSON serialization would have the following structure:
{
  "host": "localhost",
  "message": "239.215.85.26 - AmbientTech [04/Mar/2025:15:09:25 +0100] \"DELETE /observability/metrics/production HTTP/1.0\" 300 37142",
  "service": "vector",
  "source_type": "demo_logs",
  "timestamp": "2025-03-04T14:09:25.883572054Z"
}
And you want to store all those fields, the table should be created as follows:
CREATE TABLE logs (
  host TEXT,
  message TEXT,
  service TEXT,
  source_type TEXT,
  timestamp TIMESTAMPTZ
);
Note that not all fields must be declared in the table, only the ones you want to store. If a field is not present in the table but it is present in the event, it will be ignored.
When inserting the event into the table, PostgreSQL will do a best-effort job of converting the JSON serialized event to the correct PostgreSQL data types. The semantics of the insertion follow the jsonb_populate_record function of PostgreSQL; see the PostgreSQL documentation about that function for more details about the inserting behavior.
The correspondence between Vector types and PostgreSQL types can be found in the sqlx crate’s documentation.
Practical example
Spin up a PostgreSQL instance with Docker:
docker run -d --name postgres -e POSTGRES_PASSWORD=password123 -e POSTGRES_DB=test -p 5432:5432 postgres
Create the following PostgreSQL table inside the test database:
CREATE TABLE logs (
  message TEXT,
  payload JSONB,
  timestamp TIMESTAMPTZ
);
And the following Vector configuration:
sources:
  demo_logs:
    type: demo_logs
    format: apache_common
transforms:
  payload:
    type: remap
    inputs:
      - demo_logs
    source: |
      .payload = .
sinks:
  postgres:
    type: postgres
    inputs:
      - payload
    endpoint: postgres://postgres:password123@localhost/test
    table: logs
Then, you can see those log events ingested in the logs table.
Composite Types
When using PostgreSQL composite types, the sink will attempt to insert the event data into the composite type, following its structure.
Using the previous example, if you want to store the payload column as a composite type instead of JSONB, you should create the following composite type:
CREATE TYPE payload_type AS (
  host TEXT,
  message TEXT,
  service TEXT,
  source_type TEXT,
  timestamp TIMESTAMPTZ
);
And the table should be created as follows:
CREATE TABLE logs (
  message TEXT,
  payload payload_type,
  timestamp TIMESTAMPTZ
);
Then, you can see those log events ingested in the logs table, and the payload column can be treated as a regular PostgreSQL composite type.
Ingesting metrics
When ingesting metrics, the sink behaves exactly the same as when ingesting logs. You must declare the table with the same fields as the JSON serialization of the metric event.
For example, in order to ingest Vector’s internal events, and only take into account counter, gauge, and aggregated_histogram metric data, you should create the following table:
CREATE TABLE metrics (
  name TEXT,
  namespace TEXT,
  tags JSONB,
  timestamp TIMESTAMPTZ,
  kind TEXT,
  counter JSONB,
  gauge JSONB,
  aggregated_histogram JSONB
);
And with this Vector configuration:
sources:
  internal_metrics:
    type: internal_metrics
sinks:
  postgres:
    type: postgres
    inputs:
      - internal_metrics
    endpoint: postgres://postgres:password123@localhost/test
    table: metrics
You can see those metric events ingested into the metrics table.
Rate limits & adaptive concurrency
Adaptive Request Concurrency (ARC)
Adaptive Request Concurrency is a feature of Vector that does away with static concurrency limits and automatically optimizes HTTP concurrency based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Check out the announcement blog post.
We highly recommend enabling this feature as it improves performance and reliability of Vector and the systems it communicates with. As such, we have made it the default, and no further configuration is required.
Static concurrency
If Adaptive Request Concurrency is not for you, you can manually set static concurrency limits by specifying an integer for request.concurrency:
sinks:
  my-sink:
    request:
      concurrency: 10
Rate limits
In addition to limiting request concurrency, you can also limit the overall request throughput via the request.rate_limit_duration_secs and request.rate_limit_num options.
sinks:
  my-sink:
    request:
      rate_limit_duration_secs: 1
      rate_limit_num: 10
These will apply to both adaptive and fixed request.concurrency values.