Postgres logical decoding to Elasticsearch/OpenSearch

  • By PG2ES
  • Last update: Nov 14, 2022
  • Comments: 14

Search Replica

Search Replica uses the PostgreSQL logical replication protocol to subscribe to and receive any updates in near real time.

Table rows, regardless of their types, are encoded into JSON with additional ES metadata, which makes a Message (ES Document). These messages are buffered and flushed into ES regularly, either on a timeout or when the buffer is large enough.

(Someone on Stack Overflow suggested that the recommended bulk size is just 5 MB, with a limit of ~100 MB.) Increasing the size of a bulk query does not seem to improve throughput, and 4 MB works better than 64 MB, at least for local workloads.
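
A minimal sketch of this buffer-and-flush policy (names and details are illustrative, not the actual implementation; the constants mirror the defaults from the Env Config section below):

package main

import (
	"bytes"
	"fmt"
	"time"
)

const (
	bulkSizeLimit = 4 << 20          // 4 MB, mirrors the SEARCH_BULK_SIZE default
	pushInterval  = 30 * time.Second // mirrors the SEARCH_PUSH_INTERVAL default
)

// pump buffers encoded documents and flushes them to ES either when the
// buffer reaches the size limit or when the push interval elapses.
func pump(docs <-chan []byte, push func([]byte)) {
	var buf bytes.Buffer
	ticker := time.NewTicker(pushInterval)
	defer ticker.Stop()
	for {
		select {
		case doc, ok := <-docs:
			if !ok {
				push(buf.Bytes()) // flush the remainder on shutdown
				return
			}
			buf.Write(doc)
			if buf.Len() >= bulkSizeLimit { // buffer is large enough
				push(buf.Bytes())
				buf.Reset()
			}
		case <-ticker.C: // timeout with a partially filled buffer
			if buf.Len() > 0 {
				push(buf.Bytes())
				buf.Reset()
			}
		}
	}
}

func main() {
	docs := make(chan []byte, 1)
	docs <- []byte(`{"docType":"users","id":1}` + "\n")
	close(docs)
	pump(docs, func(b []byte) { fmt.Print(string(b)) })
}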

See the How it works section below.

Usage examples:

go run main.go --help

go run main.go -slot qwerty -publication pub1 -reindex
go run main.go -slot qwerty -publication pub1 -recreate

How to configure DB?

A slightly hacky yet easy-to-use approach was chosen: PostgreSQL COMMENTs. If at least one field of a table is marked as indexable, the table will be indexed.

The comment configuration syntax follows Go struct tags, with the exception that one tag can be set multiple times. A configuration string is a concatenation of optionally space-separated key:"value,value2" pairs.

E.g.: tag1:"val1" tag2:"val2,opt1,opt2"tag2:",opt1,opt2"

-- These are subject to improvement/change
COMMENT ON TABLE users IS 'index:",all"'; -- index all of its fields by default
COMMENT ON COLUMN users.password_hash IS 'index:"-"'; -- skip this field
COMMENT ON COLUMN users.first_name IS 'index:"name"'; -- index and rename the field

Also, all tables to be indexed must be included in a publication:

-- for all tables in database
CREATE PUBLICATION "ExamplePublication" FOR ALL TABLES
WITH (publish = 'insert, update, delete, truncate');

-- or you can specify tables
CREATE PUBLICATION "ExamplePublication" FOR TABLE public.users, schemaname.tablename, etc.etc
WITH (publish = 'insert, update, delete, truncate');

It is important to specify a proper REPLICA IDENTITY for each table (more examples in ./demo/schema.sql), unless the Elasticsearch and Postgres PKs are equal and the optional routing field is either part of the PK or not defined.

ALTER TABLE "table_name" REPLICA IDENTITY FULL;

All tables included in the publication are indexed by default; however, you can override this behavior.

A special docType field (configurable in the future) is attached to each document.

Roadmap

  • Arrays and custom types (e.g. enum support)
    • Basic arrays (like text[])
    • Enums
    • Enum arrays
    • Composite types
    • Composite arrays (too complex; PRs are welcome)
  • Composite PKs
  • Better logging
  • Reindex when the slot is lost
  • HealthCheck & Readiness
  • Metrics (Prometheus)
  • REST API with basic actions
  • Tests
  • Improve auto-discovery for inline/join relations

How it works

1. Discovery & initial config

There is a SQL query that fetches all tables, columns, and their comments for all tables within the publication. With this information it is possible to build the config (tables, columns, schemas) by parsing the available table comments.
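
For illustration, such a discovery query could look roughly like this (a sketch against the standard catalogs; the actual query differs):

-- Hypothetical sketch: tables, columns, and comments for one publication
SELECT p.schemaname,
       p.tablename,
       a.attname                          AS column_name,
       obj_description(c.oid, 'pg_class') AS table_comment,
       col_description(c.oid, a.attnum)   AS column_comment
FROM pg_publication_tables p
JOIN pg_class c     ON c.relname = p.tablename
JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = p.schemaname
JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0 AND NOT a.attisdropped
WHERE p.pubname = 'ExamplePublication'
ORDER BY p.tablename, a.attnum;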

1.1 The slot is (re)created
2. Cold start (optional)

Having the names of the tables and columns that need to be indexed, we can run a SELECT * FROM ... query and use its result to determine each column's type. There is a switch/case mapping for all default PG types, so we can decode the values.

Also, during this step, column names get their JSON representation "pre-marshaled". With that, we can iterate over the result, decoding each row and generating JSON for ES, marshaling only the values (most of them are json.Marshalers). Those chunks of JSON are sent to a channel.
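
A minimal sketch of this pre-marshaling idea (illustrative names, not the actual code):

package main

import (
	"encoding/json"
	"fmt"
)

type column struct {
	jsonKey []byte // pre-marshaled `"name":`, computed once per column
}

func preMarshal(names []string) []column {
	cols := make([]column, len(names))
	for i, n := range names {
		key, _ := json.Marshal(n) // quotes and escapes the column name
		cols[i] = column{jsonKey: append(key, ':')}
	}
	return cols
}

// encodeRow marshals only the values; the keys are already encoded.
func encodeRow(cols []column, values []any, out chan<- []byte) {
	buf := []byte{'{'}
	for i, v := range values {
		if i > 0 {
			buf = append(buf, ',')
		}
		buf = append(buf, cols[i].jsonKey...)
		val, _ := json.Marshal(v) // values are typically json.Marshalers
		buf = append(buf, val...)
	}
	out <- append(buf, '}')
}

func main() {
	cols := preMarshal([]string{"id", "name"})
	out := make(chan []byte, 1)
	encodeRow(cols, []any{1, "Rick Sanchez"}, out)
	fmt.Println(string(<-out)) // {"id":1,"name":"Rick Sanchez"}
}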

3. Subscription

After the cold start, we know the current PostgreSQL log position, so we can subscribe only to new events, even if the slot is lagging behind.

Starting a logical replication subscription without specifying a position uses the last committed position as the starting point. So, in case of crashes, network outages, or an overloaded search cluster, we are safe: the script always starts from the last checkpoint.
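
For illustration, starting such a subscription with the jackc/pglogrepl library (an assumption; the project's actual code may differ) could look roughly like:

package main

import (
	"context"
	"log"

	"github.com/jackc/pgconn"
	"github.com/jackc/pglogrepl"
)

func main() {
	ctx := context.Background()
	// A replication connection is required (replication=database).
	conn, err := pgconn.Connect(ctx, "postgres://localhost:5432/mydb?replication=database")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// LSN 0 means "no position specified": the server resumes from the
	// slot's last committed (confirmed) position, i.e. the last checkpoint.
	err = pglogrepl.StartReplication(ctx, conn, "qwerty", pglogrepl.LSN(0),
		pglogrepl.StartReplicationOptions{
			PluginArgs: []string{"proto_version '1'", "publication_names 'pub1'"},
		})
	if err != nil {
		log.Fatal(err)
	}
}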

Inlining (1:N sub documents)

This is done using a scripted update of the parent document. It does not require any effort from Postgres, while slightly increasing the load on Elasticsearch.

WARNING: N:M relations are not supported, and 1:1 relations do not make sense in this context.

In order to put one table as a list of objects into another document, you need a parent key, which is usually a foreign key. So, imagine two tables with a 1:N relationship, like:

| Parent: Users (index:",all" inline:"comments_inl,comments") | Child: Comments (index:"-") |
|---|---|
| id (pk) | id (pk) inline:"comments_inl" |
| name | user_id inline:"comments_inl,parent" |
| email | text inline:"comments_inl,content" |
| phone index:"-" | date |

here:

  • index:",all" on the Users table means: index all fields, with documentType={tablename}.
  • inline:"comments_inl,comments" on the Users table inlines comments_inl objects into the comments field.
  • index:"-" on Users.phone excludes this field from indexing, regardless of index:",all" on the table.
  • index:"-" on the Comments table excludes this table from indexing entirely.
  • inline:"comments_inl" on the Comments column id includes this field in the comments_inl inline, preserving the field name. And since this column is a PK, it is also used as the PK of the inline.
  • inline:"comments_inl,parent" on user_id specifies the PK of the parent document, so we can append the comment to User.comments.
  • inline:"comments_inl,content" on the text column includes this field in the comments_inl inline under the new name content.

This configuration would produce documents like:

"_source": {
    "docType": "users",
    "id": 1631208,
    "name": "Rick Sanchez",
    "email": "[email protected]",
    "comments": [{
        "id": 25285149,
        "content": "Lorem Ipsum ...",
        "user_id": 1631208
    },{
        "id": 25285753,
        "content": "... dolore eu fugiat nulla pariatur...",
        "user_id": 1631208
    }]
}
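
Under the hood, such an inline is maintained via scripted updates of the parent document; one bulk operation might look roughly like this (a sketch; the actual scripts are internal to Search Replica):

{ "update": { "_index": "users", "_id": "1631208" } }
{ "script": { "lang": "painless", "source": "if (ctx._source.comments == null) { ctx._source.comments = []; } ctx._source.comments.removeIf(c -> c.id == params.c.id); ctx._source.comments.add(params.c)", "params": { "c": { "id": 25285149, "content": "Lorem Ipsum ...", "user_id": 1631208 } } } }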

Env Config

| Variable | Default | Description |
|----------------------|-----------|-------------|
| PG_SLOT | pg2es | replication slot name |
| PG_PUBLICATION | search | publication name |
| PGHOST | localhost | |
| PGPORT | 5432 | |
| PGDATABASE | - | |
| PGUSER | - | |
| PGPASSWORD | - | |
| SEARCH_HOST | - | URL or host of Elasticsearch/OpenSearch |
| SEARCH_USERNAME | - | optional |
| SEARCH_PASSWORD | - | optional |
| SEARCH_BULK_SIZE | 4 (MB) | bulk request size limit |
| SEARCH_PUSH_INTERVAL | 30s | idle push interval, used when there are not enough rows for a full bulk request |
| LOG_FORMAT | json | json or cli |
| LOG_LEVEL | warn | from debug to fatal |
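
For example, a local run could be configured like this (illustrative values):

PG_SLOT=qwerty PG_PUBLICATION=pub1 \
PGHOST=localhost PGPORT=5432 PGDATABASE=mydb PGUSER=postgres PGPASSWORD=secret \
SEARCH_HOST=http://localhost:9200 \
LOG_FORMAT=cli LOG_LEVEL=info \
go run main.go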

Possible configuration tags (a usage example follows these lists):

Table

  • index:
    • docType (- means do not index)
    • Opts:
      • all (deprecated) index all fields in this table
  • inline:
    • inline name that will be injected as a field
    • field name
    • (optional) script name for adding
    • (optional) script name for removal
  • join:
    • field name
    • (optional) type name (by default docType is used)

Field

  • index:
    • rename (- means do not index)
    • Opts:
      • pk: this value, prefixed by the table name, is used as the document _id.
      • id: the value is used as the document _id field.
      • routing: the value is used for routing.
  • inline:
    • inline name where this field should be used
    • rename (within the inline) NOT IMPLEMENTED
    • Opts:
      • pk: inline PK
      • parent: parent _id
      • (optional) routing: routing value of the PARENT document (in order to find and update it)
  • join:
    • Opts:
      • parent: field is used as the parent value
      • name: field is used as the join type (for polymorphism)
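
For example, marking a column as the document _id source and another one as the routing value could look like this (a sketch; column names are illustrative):

COMMENT ON COLUMN users.id IS 'index:",pk"';
COMMENT ON COLUMN users.tenant_id IS 'index:",routing"';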

Notes

  • The script is single-threaded (this is not a bottleneck); a separate goroutine is used to make ES requests and for periodic logging.
  • Links between Database <-> Schema <-> Table <-> Column should be considered read-only and safe for multithreaded use... (not yet)
  • It's fast. All efforts should go towards readability, reliability, and functionality.

Known Limitations:

  • The PK/ID/routing field cannot be updated (yet; undefined behavior).
  • An inline cannot change its parent.
  • No composite PKs (yet).
  • No 1:1 inlines (yet).
  • Deleting a document deletes all its inlines (like DELETE CASCADE), and they cannot be restored.



Comments (14)

  • 1

    Updating an already deleted document after restart leads to an error.

    If, after a restart, the uncommitted part of the WAL contains a deletion, SR will try to delete non-existing documents from ES, which results in an error. This error needs to be properly ignored and must not lead to further restarts.

  • 2

    COPY freezes after a large dataset

    Steps to reproduce:

    1. Reindex a large dataset, slowly (preferably a few tables).
    2. SR never switches to streaming mode.

    Probably some kind of timeout on the SR or PG side.

  • 3

    Group index columns by relation & attribute

    The previous query showed duplicated rows if a field is contained in more than one index, resulting in:

    Expected:

    | col | pk    | repl |
    |-----|-------|------|
    | foo | true  | true |
    | bar | false | true |

    Received:

    | col | pk    | repl  |
    |-----|-------|-------|
    | foo | true  | false |
    | foo | false | true  |
    | foo | false | false |
    | bar | false | true  |

    Fixed by grouping in a sub-query.
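
    Presumably the fix looks something like this (a sketch; "cols" stands in for the original, ungrouped sub-query):

    -- Collapse the per-index duplicates by grouping per column,
    -- OR-ing the boolean flags together.
    SELECT col, bool_or(pk) AS pk, bool_or(repl) AS repl
    FROM cols
    GROUP BY col;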

  • 4

    Proper usage of the slot snapshot.

    For consistency purposes it is not enough to just select all data after slot creation. Either those actions should happen in one transaction, or the exported snapshot should be explicitly specified in another transaction...

    The preferred way would be to use the COPY command instead of SELECT, since it is available in the simple protocol and thus can be used within a transaction. Also, this would improve the speed.
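
    For illustration, the snapshot exported at slot creation could be used roughly like this (a sketch; the snapshot name is illustrative):

    -- On the replication connection:
    CREATE_REPLICATION_SLOT pg2es LOGICAL pgoutput EXPORT_SNAPSHOT;
    -- returns a snapshot name, e.g. '00000003-00000002-1'

    -- On a regular connection, in a separate transaction:
    BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
    SET TRANSACTION SNAPSHOT '00000003-00000002-1';
    COPY public.users TO STDOUT; -- consistent with the slot's starting point
    COMMIT;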

  • 5

    Improve recreation logic when the document _id changes.

    If a document's key changed and the old keys exist in the WAL, the document will be recreated. An inlined document can be moved across parent documents (scripted delete + scripted add).

    Both checks use byte comparison of the defined fields, without actual decoding.

  • 6

    Bulk debounce

    If, after a while, we receive a few records at once, the bulk indexer immediately sends the first one, while the next messages wait until the next bulk request.

    An appropriate debounce period would help in these cases: either debounce after each message until the bulk is full, or debounce after idle.
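
    A debounced flush could be sketched like this (illustrative, not the actual implementation):

    package main

    import (
        "fmt"
        "time"
    )

    const debouncePeriod = 200 * time.Millisecond // illustrative value

    // pump sends a bulk either when it is full or after a quiet gap,
    // instead of sending the first message of a burst immediately.
    func pump(msgs <-chan string, bulkSize int, flush func([]string)) {
        var buf []string
        timer := time.NewTimer(debouncePeriod)
        timer.Stop()
        for {
            select {
            case m, ok := <-msgs:
                if !ok {
                    if len(buf) > 0 {
                        flush(buf) // flush the remainder on shutdown
                    }
                    return
                }
                buf = append(buf, m)
                if len(buf) >= bulkSize { // bulk is full: send now
                    flush(buf)
                    buf = nil
                    timer.Stop()
                    continue
                }
                timer.Reset(debouncePeriod) // debounce: wait for a quiet gap
            case <-timer.C: // idle: send the partial bulk
                if len(buf) > 0 {
                    flush(buf)
                    buf = nil
                }
            }
        }
    }

    func main() {
        msgs := make(chan string, 3)
        for _, m := range []string{"a", "b", "c"} {
            msgs <- m
        }
        close(msgs)
        pump(msgs, 2, func(b []string) { fmt.Println(b) })
    }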

  • 7

    Add basic prometheus metrics

    close #4 Additionally:

    • Split PG and ES via streampipe
    • Improvements to update/delete handling regarding:

      PK/ID/Routing field can not be updated. Inline can not change parent

    • Removed obsolete extensive periodic logging
    • Removed hard-coded timeout. #14 ...
  • 8

    Broken discovery

    Steps to reproduce:

    1. Create a few indices on the table (including the PK as a field).
    2. Execute the discovery query.
    3. Observe duplicates, which break the config.

    Expected:

    | col | pk    | repl |
    |-----|-------|------|
    | foo | true  | true |
    | bar | false | true |

    Received:

    | col | pk    | repl  |
    |-----|-------|-------|
    | foo | true  | false |
    | foo | false | true  |
    | foo | false | false |
    | foo | false | false |
    | bar | false | true  |

  • 9

    Feat/ci tests

    Enable the PR workflow with the consistency test from ./demo/. The 60s sleep time is due to the slow GH runtime (running both Postgres and Elastic there); it will be changed later (requires additional APIs from SearchReplica).

  • 10

    Split config discovery and configuration logic

    Make configuration pluggable and implement:

    • Basic mandatory schema discovery

    Optional:

    • Autodiscovery based on schema, PKs, FKs
    • Conftags based config
    • External config

    Each config applies its changes on top of the previous one, in the same order as passed to the client.

    E.g.:

    postgres.New(
        discovery.WithSchema(),             // parse existing PKs, FKs, indices, constraints
        discovery.WithConftags("prefix_"),  // apply conftag changes, e.g. skip some fields
        discovery.WithFileconfig(yamlfile), // apply additional external config, e.g. change the index name
    )
    
  • 11

    Changelog, release tags, and tagged images.

    In order to improve stability, any major change should be semantically versioned. All versions of the code should have separate Docker images.

    Build an image on tag push. Rebuild and overwrite the image when a tag is moved to a different commit.

  • 12

    Tests (Thread)

    SearchReplica requires extensive testing, for each possible edge case and any possible load.

    This thread will be used to track and discuss ongoing test progress.

  • 13

    Useless parent_id within inlined documents.

    Having this value (which is the parent _id) in each inlined object makes no sense. It should be possible to skip/remove this field from the inlined JSON.

    Simplified example:

    {
        "id": 100,
        "inlined": [
            {"id": 1, "parent_id": 100, "value": "foo"},
            {"id": 2, "parent_id": 100, "value": "bar"},
            {"id": 3, "parent_id": 100, "value": "baz"}
        ],
        "field": "value"
    }
    

    Technically there is no need for this.

  • 14

    Composite PKs

    Restructure the column magic to allow composite PKs. This would probably require additional config tags for key order specification, or even a template.

    May include composite routing keys, etc.