timescaledb-event-streamer is a command line program to create a stream of CDC (Chance Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.

  • By noctarius aka Christoph Engelbert
  • Last update: May 5, 2023
  • Comments: 0

timescaledb-event-streamer

timescaledb-event-streamer is a command line program to create a stream of CDC (Chance Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.

Change Data Capture is a technology where insert, update, delete and similar operations inside the database generate a corresponding set of events, which are commonly distributed through a messaging connector, such as Kafka, NATS, or similar.

Attention: This software is in prototyping / alpha stage and not meant for production usage. No guarantees on the functionality are given.

Attention 2: This is not an official Timescale project, but just developed by a person who works for Timescale. This may change at some point in the future but it is not a given.

Why not just Debezium?

While Debezium already supports PostgreSQL, the implementation doesn't really support the internals of TimescaleDB, most specifically the way data is chunked or partitioned. It is possible to use Debezium to capture change events of the actual chunks itself, but without the catalog handling. This means that every chunk would emit changes on its own, but with no reference to its parent hypertable. The timescaledb-event-streamer changes this, by handling the catalog updates and resolving the parent hypertable before emitting the events.

Anyhow, the final goal is to provide an implementation for Debezium when the prototype (which may stay as its own standalone project) is fully working and every complication has been found and fixed.

Getting Started

timescaledb-event-streamer requires the Go runtime (version 1.20+) to be installed. With this requirement satisfied, the installation can be kicked off using:

$ go install github.com/noctarius/timescaledb-event-streamer/cmd/[email protected]

Before using the program, a configuration file needs to be created. An example configuration can be found here.

For a full reference of the existing configuration options, see the Configuration section.

Supporting non-privileged users (without postgres user)

In addition to the program itself, a function has to be installed into the database which will be used to generate change events from. The function is used to create the initial logical replication publication, since the internal catalog tables from TimescaleDB are owned by the postgres user, as required for a trusted extension.

The function needs to be created by the postgres user when added to the database and runs in security definer mode, inheriting the permissions and ownership of the defining user before giving up the increased permissions voluntarily.

To install the function, please run the following code snippet as postgres user against your database. timescaledb-event-streamer will automatically use it when starting up. If the function is not available the startup will fail!

The function can be found in the github repository.

To install the function you can use as following:

$ wget https://raw.githubusercontent.com/noctarius/timescaledb-event-streamer/main/create_timescaledb_catalog_publication.sql
$ psql "<connstring>" < create_timescaledb_catalog_publication.sql

Using timescaledb-event-streamer

After creating a configuration file, timescaledb-event-streamer can be executed with the following command:

$ timescaledb-event-streamer -config=./config.toml

The tool will connect to your TimescaleDB database, and start replicating incoming events.

Configuration

timescaledb-event-streamer utilizes TOML as its configuration file format due to its simplicity.

The actual configuration values are designed as canonical name (dotted keys).

In addition to the configuration file, all values can be provided as envrionment variables. In this case, the property name uses underscore (_) characters instead of dots (.) characters and all characters are used as uppercase. That means, postgresql.connection becomes POSTGRESQL_CONNECTION. In case the standard property name already contains an underscore the one underscore character is duplicated (test.some_value becomes TEST_SOME__VALUE).

PostgreSQL Configuration

Property Description Data Type Default Value
postgresql.connection The connection string in one of the libpq-supported forms string host=localhost user=repl_user sslmode=disable
postgresql.password The password to connect to the user. string Environment variable: PGPASSWORD
postgresql.snapshot.batchsize The size of rows requested in a single batch iteration when snapshotting tables. int 1000
postgresql.snapshot.initial The value describes the startup behavior for snapshotting. Valid values are always, never, initial_only. NOT YET IMPLEMENTED string initial_only

Topic Configuration

Property Description Data Type Default Value
topic.namingstrategy.type The naming strategy of topic names. At the moment only the value debezium is supported. string debezium
topic.prefix The prefix for all topic named string timescaledb

TimescaleDB Configuration

Property Description Data Type Default Value
timescaledb.hypertables.includes The includes definition defines which hypertables to include in the event stream generation. The available patters are explained in Includes and Excludes Patterns. Excludes have precedence over includes. array of strings empty array
timescaledb.hypertables.excludes The excludes definition defines which hypertables to exclude in the event stream generation. The available patters are explained in Includes and Excludes Patterns. Excludes have precedence over includes. array of strings empty array
timescaledb.events.read The property defines if read events are generated boolean true
timescaledb.events.insert The property defines if insert events are generated boolean true
timescaledb.events.update The property defines if update events are generated boolean true
timescaledb.events.delete The property defines if delete events are generated boolean true
timescaledb.events.truncate The property defines if truncate events are generated boolean true
timescaledb.events.message The property defines if logical replication message events are generated boolean false
timescaledb.events.compression The property defines if compression events are generated boolean false
timescaledb.events.decompression The property defines if decompression events are generated boolean false

Sink Configuration

Property Description Data Type Default Value
sink.type The property defines which sink adapter is to be used. Valid values are stdout, nats, kafka, redis string stdout
sink.tombstone The property defines if delete events will be followed up with a tombstone event boolean false

NATS specific configuration, which is only used if sink.type is set to nats.

Property Description Data Type Default Value
sink.nats.address The NATS connection address, according to the NATS connection string definition string empty string
sink.nats.authorization The NATS authorization type. Valued values are userinfo, credentials, jwt string empty string
sink.nats.userinfo.username The username of userinfo authorization details string empty string
sink.nats.userinfo.password The password of userinfo authorization details string empty string
sink.nats.credentials.certificate The path of the certificate file of credentials authorization details string empty string
sink.nats.credentials.seeds The paths of seeding files of credentials authorization details array of strings empty array

Kafka specific configuration, which is only used if sink.type is set to kafka.

Property Description Data Type Default Value
sink.kafka.brokers The Kafka broker urls array of string empty array
sink.kafka.idempotent The property defines if message handling is idempotent boolean false
sink.kafka.sasl.enabled The property defines if SASL authorization is enabled boolean false
sink.kafka.sasl.user The user value to be used with SASL authorization string empty string
sink.kafka.sasl.password The password value to be used with SASL authorization string empty string
sink.kafka.sasl.mechanism The mechanism to be used with SASL authorization. Valid values are PLAIN string PLAIN
sink.kafka.tls.enabled The property defines if TLS is enabled boolean false
sink.kafka.tls.skipverify The property defines if verification of TLS certificates is skipped boolean false
sink.kafka.tls.clientauth The property defines the client auth value (as defined in Go) boolean false

Redis specific configuration, which is only used if sink.type is set to redis.

Property Description Data Type Default Value
sink.redis.network The network type of the redis connection. Valid values are tcp, unix string tcp
sink.redis.address The connection address as host:port string localhost:6379
sink.redis.password Optional password to connect to the redis server string empty string
sink.redis.database Database to select after connecting to the server int 0
sink.redis.poolsize Maximum number of socket connections int 10 per cpu
sink.redis.retries.maxattempts Maximum number of retries before giving up int 0
sink.redis.retries.backoff.min Minimum backoff between each retry in milliseconds. A value of -1 disables backoff. int 8
sink.redis.retries.backoff.max Maximum backoff between each retry in milliseconds. A value of -1 disables backoff. int 512
sink.redis.timeouts.dial Dial timeout for establishing new connections in seconds. int 5
sink.redis.timeouts.read Timeout for socket reads in seconds. A value of -1 disables the timeout. int 3
sink.redis.timeouts.write Timeout for socket writes in seconds. A value of -1 disables the timeout. int read timeout
sink.redis.timeouts.pool Amount of time in seconds client waits for connection if all connections are busy before returning an error int read timeout + 1s
sink.redis.timeouts.idle Amount of time in minutes after which client closes idle connections int 5
sink.redis.tls.enabled The property defines if TLS is enabled bool false
sink.redis.tls.skipverify The property defines if verification of TLS certificates is skipped bool false
sink.redis.tls.clientauth The property defines the client auth value (as defined in Go) int 0

Includes and Excludes Patterns

Includes and Excludes can be defined as fully canonical references to hypertables (equivalent to PostgreSQL regclass definition) or as patterns with wildcards.

For an example we assume the following hypertables to exist.

Schema Hypertable Canonical Name
public metrics public.metrics
public status_messages public.status_messages
invoicing invoices invoicing.invoices
alarming alarms alarming.alarms

To note, excludes have precedence over includes, meaning, that if both includes and excludes match a specific hypertable, the hypertable will be excluded from the event generation process.

Hypertables can be referred to by their canonical name (dotted notation of schema and hypertable table name). timescaledb.hypertables.includes = [ 'public.metrics', 'invoicing.invoices' ]

When referring to the hypertable by its canonical name, the matcher will only match the exact hypertable. That said, the above example will yield events for the hypertables public.metrics and invoicing.invoices but none of the other ones.

Wildcards

Furthermore, includes and excludes can utilize wildcard characters to match a subset of tables based on the provided pattern.

timescaledb-event-streamer understands 3 types of wildcards:

Wildcard Description
* The asterisk (*) character matches zero or more characters
+ The plus (+) character matches one or more characters
? The question mark (?) character matches exactly one character

Wildcards can be used in the schema or table names. It is also possible to have them in schema and table names at the same time.

Asterisk: Zero Or More Matching

Schema Hypertable Canonical Name
public metrics public.metrics
public status_messages public.status_messages

timescaledb.hypertables.includes = [ 'public.*' ] matches all hypertables in schema public.

Schema Hypertable Canonical Name
public status_1h public.status_1h
public status_12h public.status_12h
public status_messages public.status_messages

timescaledb.hypertables.includes = [ 'public.statis_1*' ] matches public.status_1h and public.status_12h, but not public.status_messages.

Schema Hypertable Canonical Name
customer1 metrics customer1.metrics
customer2 metrics customer2.metrics

Accordingly, it is possible to match a specific hypertable in all customer schemata using a pattern such as timescaledb.hypertables.includes = [ 'customer*.metrics' ].

Plus: One or More Matching

Schema Hypertable Canonical Name
public status_1_day public.status_1_day
public status_1_month public.status_1_month
public status_1_year public.status_12_month

timescaledb.hypertables.includes = [ 'public.statis_+_month' ] matches hypertables public.status_1_month and public.status_12_month, but not public.status_1_day.

Schema Hypertable Canonical Name
customer1 metrics customer1.metrics
customer2 metrics customer2.metrics

Accordingly, it is possible to match a specific hypertable in all customer schemata using a pattern such as timescaledb.hypertables.includes = [ 'customer+.metrics' ].

Question Mark: Exactly One Matching

Schema Hypertable Canonical Name
public status_1_day public.status_1_day
public status_7_day public.status_7_day
public status_14_day public.status_14_day

timescaledb.hypertables.includes = [ 'public.statis_?_day' ] matches hypertables public.status_1_day and public.status_7_day, but not public.status_14_day.

Download

timescaledb-event-streamer.zip