Search Replica
Uses the PostgreSQL logical replication protocol to subscribe to and receive updates in near real time.
Table rows, regardless of their types, are encoded into JSON with additional ES metadata, which makes a Message (an ES document). Those messages are buffered and flushed into ES regularly, on a timeout or when the buffer is large enough.
(Someone on Stack Overflow said the recommended bulk size is just 5 MB, with a limit of ~100 MB.) Increasing the size of a bulk query does not seem to improve throughput, and 4 MB works better than 64 MB, at least for local workloads.
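A minimal sketch of this buffer-and-flush loop (illustrative names and numbers, not the actual implementation):

package main

import (
    "bytes"
    "time"
)

const (
    flushBytes    = 4 << 20          // ~4 MB bulk requests
    flushInterval = 30 * time.Second // idle flush timeout
)

// bulkLoop buffers encoded documents and flushes them to ES whenever the
// buffer is large enough or the flush interval expires.
func bulkLoop(docs <-chan []byte, push func([]byte) error) error {
    var buf bytes.Buffer
    ticker := time.NewTicker(flushInterval)
    defer ticker.Stop()

    flush := func() error {
        if buf.Len() == 0 {
            return nil
        }
        err := push(buf.Bytes())
        buf.Reset()
        return err
    }

    for {
        select {
        case doc, ok := <-docs:
            if !ok {
                return flush() // input closed: flush the remainder
            }
            buf.Write(doc)
            if buf.Len() >= flushBytes {
                if err := flush(); err != nil {
                    return err
                }
            }
        case <-ticker.C:
            if err := flush(); err != nil {
                return err
            }
        }
    }
}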
Check the How it works section below.
Usage example:
go run main.go --help
go run main.go -slot qwerty -publication pub1 -reindex
go run main.go -slot qwerty -publication pub1 -recreate
How to configure the DB?
A bit hacky, yet easy-to-use approach was selected: PostgreSQL COMMENTs. So, in order to make the users table indexable, mark it (or its columns) with comments as shown below. If at least one field is specified as indexable, the table will be indexed.
The comment configuration syntax is Go struct tags, with the exception that one tag can be set multiple times. A configuration string is a concatenation of optionally space-separated key:"value,value2" pairs.
E.g.: tag1:"val1" tag2:"val2,opt1,opt2" tag2:",opt1,opt2"
-- These are subject to improvement/change
COMMENT ON TABLE users IS 'index:",all"'; -- index all of its fields by default
COMMENT ON COLUMN users.password_hash IS 'index:"-"'; -- skip this field
COMMENT ON COLUMN users.first_name IS 'index:"name"'; -- index and rename the field
Also, all the tables to be indexed should be included in a publication:
-- for all tables in database
CREATE PUBLICATION "ExamplePublication" FOR ALL TABLES
WITH (publish = 'insert, update, delete, truncate');
-- or you can specify tables
CREATE PUBLICATION "ExamplePublication" FOR TABLE public.users, schemaname.tablename, etc.etc
WITH (publish = 'insert, update, delete, truncate');
It is important to specify a proper REPLICA IDENTITY
for each table (more examples in ./demo/schema.sql), unless the Elasticsearch and Postgres PKs are equal and the optional routing field is part of the PK or not defined.
ALTER TABLE "table_name" REPLICA IDENTITY FULL;
All tables included in the publication will be indexed by default; however, you can override this behavior.
A special docType field (configurable in the future) is attached to each document.
Roadmap
- Arrays and custom types (e.g. enums):
  - Basic arrays (like Text[])
  - Enums
  - Enum arrays
  - Composite types
  - Composite arrays (too complex; PRs are welcome)
- Composite PK
- Better logging
- Reindex when slot is lost
- HealthCheck & Readiness
- Metrics (Prometheus)
- REST API with basic actions
- Tests
- Improve auto discovery for inline/join relations
How it works
1. Discovery & initial config
There is an SQL query to fetch all tables, columns, and their comments for all tables within the publication. With this information it is possible to build the config (tables, columns, schemas) by parsing the available table comments.
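A hedged sketch of such a query, joining the standard pg_publication_tables view with information_schema and the comment lookup functions (the actual query and code differ):

package main

import (
    "context"
    "fmt"

    "github.com/jackc/pgx/v5"
)

// Illustrative discovery query: published columns plus table/column comments.
const discoverSQL = `
SELECT c.table_schema, c.table_name, c.column_name,
       obj_description(format('%I.%I', c.table_schema, c.table_name)::regclass, 'pg_class') AS table_comment,
       col_description(format('%I.%I', c.table_schema, c.table_name)::regclass, c.ordinal_position) AS column_comment
FROM information_schema.columns c
JOIN pg_publication_tables p
  ON p.schemaname = c.table_schema AND p.tablename = c.table_name
WHERE p.pubname = $1
ORDER BY c.table_schema, c.table_name, c.ordinal_position`

func discover(ctx context.Context, conn *pgx.Conn, publication string) error {
    rows, err := conn.Query(ctx, discoverSQL, publication)
    if err != nil {
        return err
    }
    defer rows.Close()
    for rows.Next() {
        var schema, table, column string
        var tableComment, columnComment *string // comments may be NULL
        if err := rows.Scan(&schema, &table, &column, &tableComment, &columnComment); err != nil {
            return err
        }
        // The struct-tag-like comments are parsed here to build the config.
        fmt.Println(schema, table, column)
    }
    return rows.Err()
}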
1.1 The slot is (re)created.
2. Cold start (optional)
Having the tables and columns (names only) that need to be indexed, we can issue a SELECT * FROM ... query and use its result to determine the column types. There is a switch/case mapping for all default PG types, so we can decode the values.
Also, during this step, column names get their JSON representation "pre-marshaled". Having that, we can iterate over the result, decoding each row and generating JSON for ES, marshaling only the values (most of them are json.Marshaler implementations). Those chunks of JSON are sent to a channel.
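An illustrative sketch of this step (hypothetical structures; the real switch/case covers all default PG types):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

// column keeps its JSON key pre-marshaled, so only values are marshaled
// while iterating over the SELECT result.
type column struct {
    jsonKey []byte // e.g. `"first_name":`, rendered once during discovery
    typeOID uint32 // PostgreSQL type OID
}

// encodeValue decodes one raw text-format value based on its PG type.
func encodeValue(col column, raw []byte) ([]byte, error) {
    switch col.typeOID {
    case 20, 23: // int8, int4: already valid JSON numbers
        return raw, nil
    case 16: // bool: 't' / 'f'
        if len(raw) == 1 && raw[0] == 't' {
            return []byte("true"), nil
        }
        return []byte("false"), nil
    default: // fall back to a JSON string (text, varchar, ...)
        return json.Marshal(string(raw))
    }
}

// encodeRow assembles one ES document body; the resulting chunk of JSON
// is then sent to a channel for bulk indexing.
func encodeRow(cols []column, row [][]byte) ([]byte, error) {
    var buf bytes.Buffer
    buf.WriteByte('{')
    for i, col := range cols {
        if i > 0 {
            buf.WriteByte(',')
        }
        buf.Write(col.jsonKey)
        val, err := encodeValue(col, row[i])
        if err != nil {
            return nil, fmt.Errorf("column %d: %w", i, err)
        }
        buf.Write(val)
    }
    buf.WriteByte('}')
    return buf.Bytes(), nil
}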
3. Subscription
After the cold start we know the current PostgreSQL WAL position, so we can subscribe only to new events, even if the slot is lagging behind.
Starting the logical replication subscription without specifying a position uses the last committed position as the starting point. So in case of crashes, network outages, or an overloaded search cluster, we are safe: the script always starts from the last checkpoint.
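A hedged sketch of that resume behavior, assuming the github.com/jackc/pglogrepl library (the actual code may differ):

package main

import (
    "context"

    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgx/v5/pgconn"
)

func subscribe(ctx context.Context, conn *pgconn.PgConn, slot, publication string) error {
    // Requesting LSN 0 lets the server resume from the slot's
    // confirmed_flush_lsn, i.e. the last committed checkpoint.
    return pglogrepl.StartReplication(ctx, conn, slot, pglogrepl.LSN(0),
        pglogrepl.StartReplicationOptions{PluginArgs: []string{
            "proto_version '1'",
            "publication_names '" + publication + "'",
        }})
}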
Inlining (1:N sub documents)
This is done using a scripted update of the parent document. It does not require any effort from Postgres, while slightly increasing the load on Elasticsearch.
WARNING: N:M relations are not supported, and 1:1 relations do not make any sense in this context.
In order to put one table as a list of objects into another doc, you need to have a parent key, which is usually a foreign key. So, imagine we have two tables with a 1:N relationship, like:
-- illustrative schema (see ./demo/schema.sql for complete examples)
CREATE TABLE users (
    id    INTEGER PRIMARY KEY,
    name  TEXT,
    email TEXT,
    phone TEXT
);
COMMENT ON TABLE users IS 'index:",all" inline:"comments_inl,comments"';
COMMENT ON COLUMN users.phone IS 'index:"-"';

CREATE TABLE comments (
    id      INTEGER PRIMARY KEY,
    user_id INTEGER REFERENCES users (id),
    text    TEXT
);
COMMENT ON TABLE comments IS 'index:"-"';
COMMENT ON COLUMN comments.id IS 'inline:"comments_inl"';
COMMENT ON COLUMN comments.user_id IS 'inline:"comments_inl,parent"';
COMMENT ON COLUMN comments.text IS 'inline:"comments_inl,content"';
here:
- The index:",all" comment on the users table means "index all fields", with documentType={tablename}.
- inline:"comments_inl,comments" on the users table would inline comments_inl objects into the comments field.
- index:"-" on users.phone would not index this field, regardless of index:",all" on the table.
- index:"-" on the comments table would exclude this table from indexing entirely.
- inline:"comments_inl" on the comments.id column would include this field in the comments_inl inline, preserving the field name. And since this column is the PK, it would also be used as the PK for the inline.
- inline:"comments_inl,parent" on user_id would specify the PK of the parent document, so we can append the comment into User.comments.
- inline:"comments_inl,content" on the text column would include this field in the comments_inl inline under the new name content.

And this would produce documents like:
"_source": {
"docType": "users",
"id": 1631208,
"name": "Rick Sanchez",
"email": "[email protected]",
"comments": [{
"id": 25285149,
"content": "Lorem Ipsum ...",
"user_id": 1631208
},{
"id": 25285753,
"content": "... dolore eu fugiat nulla pariatur...",
"user_id": 1631208
}]
}
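Under the hood, each inlined row turns into a scripted update of the parent document. A hypothetical sketch of one such bulk action (the actual stored scripts may differ):

package main

import "fmt"

// inlineUpdateAction renders two NDJSON lines for the _bulk endpoint:
// a scripted update that removes a stale copy of the comment (if any)
// and appends the new one to the parent's "comments" field.
func inlineUpdateAction(index string, parentID int, commentJSON string) string {
    meta := fmt.Sprintf(`{"update":{"_index":%q,"_id":"%d"}}`, index, parentID)
    body := fmt.Sprintf(`{"script":{"lang":"painless","source":`+
        `"ctx._source.comments.removeIf(c -> c.id == params.doc.id); ctx._source.comments.add(params.doc)",`+
        `"params":{"doc":%s}}}`, commentJSON)
    return meta + "\n" + body + "\n"
}

The removal script would work the same way, keeping only the removeIf part.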
Env Config
Variable | Default | Description |
---|---|---|
PG_SLOT | pg2es | replication slot name |
PG_PUBLICATION | search | publication name |
PGHOST | localhost | |
PGPORT | 5432 | |
PGDATABASE | - | |
PGUSER | - | |
PGPASSWORD | - | |
SEARCH_HOST | - | URL or host of ElasticSearch/OpenSearch |
SEARCH_USERNAME | - | optional |
SEARCH_PASSWORD | - | optional |
SEARCH_BULK_SIZE | 4 | (MB) Bulk request size limit. |
SEARCH_PUSH_INTERVAL | 30s | idle push interval, used when there are not enough rows for a full bulk request |
LOG_FORMAT | json | json or cli |
LOG_LEVEL | warn | from debug to fatal |
Possible configuration tags:

Table
- index:
  - docType (- means "do not index")
  - Opts:
    - all (deprecated): index all fields in this table
- inline:
  - inline name, that will be injected as a field
  - field name
  - (optional) script name for adding
  - (optional) script name for removal
- join:
  - field name
  - (optional) type name (by default docType is used)

Field
- index:
  - new field name (rename)
- inline:
  - inline name, where this field should be used
  - rename (within inline) NOT IMPLEMENTED
  - Opts:
    - pk: inline PK
    - parent: parent _id
    - routing (optional): routing value of the PARENT document (in order to find and update it)
- join:
  - Opts:
    - parent: field used as parent value
    - name: field is used as join type (for polymorphism)
Notes
- The script is single-threaded (not a bottleneck)... A separate goroutine is used to make ES requests and for periodic logs.
- Links between Database <-> Schema <-> Table <-> Column should be considered read-only and safe for multithreaded use... (not yet)
- It's fast. All the effort should go towards readability, reliability, and functionality.
Known Limitations:
- The PK/ID/routing field cannot be updated (yet; undefined behavior).
- An inline cannot change its parent.
- No composite PKs (yet).
- No 1:1 inlines (yet).
- Deleting a document deletes all its inlines (aka DELETE CASCADE), and they cannot be restored.
Update of already deleted document after restart leads to error.
If, after a restart, the unconfirmed part of the WAL contains a deletion, SR will try to delete non-existing documents from ES, which results in an error. This error needs to be properly ignored and must not lead to further restarts.
COPY freezes after large dataset
Steps to reproduce:
Probably some kind of timeout on the SR or PG side.
Group indice table by relation & attribute
The previous query showed duplicated rows if a field is contained in more than one index, resulting in:

Expected:

col | pk | repl
---|---|---
foo | true | true
bar | false | true

Received:

col | pk | repl
---|---|---
foo | true | false
foo | false | true
foo | false | false
bar | false | true

Fixed by grouping in a sub-query.
Proper usage of slot snapshot.
For consistency purposes it is not enough to just select all data after slot creation. Either those actions should happen in one transaction, or the exported snapshot should be explicitly specified in another transaction...
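A hedged sketch of the exported-snapshot approach, assuming github.com/jackc/pglogrepl and pgx (details may differ in the actual code):

package main

import (
    "context"

    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgconn"
)

func consistentColdStart(ctx context.Context, replConn *pgconn.PgConn, dataConn *pgx.Conn) error {
    // Create the slot and export the snapshot it was created with.
    res, err := pglogrepl.CreateReplicationSlot(ctx, replConn, "pg2es", "pgoutput",
        pglogrepl.CreateReplicationSlotOptions{SnapshotAction: "EXPORT_SNAPSHOT"})
    if err != nil {
        return err
    }
    // Read initial data in a transaction pinned to that snapshot, so it is
    // consistent with the slot's starting position.
    tx, err := dataConn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)
    if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT '"+res.SnapshotName+"'"); err != nil {
        return err
    }
    // ... COPY / SELECT all published tables here ...
    return tx.Commit(ctx)
}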
The preferred way would be to use the COPY command instead of SELECT, since it is available in the simple protocol and can thus be used within a transaction. This would also improve the speed.

Improve recreation logic, when document _id changed
If a document's keys have changed and the old keys exist in the WAL, the document will be recreated. An inlined document can be moved across parent documents (a scripted del plus a scripted add). Both checks use byte comparison of the defined fields, without actual decoding.
Bulk debounce
If, after an idle period, we receive a few records at once, the bulk request is sent immediately with just the first one, while the next messages have to wait for the next bulk request.
An appropriate debounce period would help in these cases: either debounce after each message until the bulk is full, or debounce only after idle. A sketch of the first option follows.
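A sketch of the per-message debounce option (illustrative names and durations):

package main

import (
    "bytes"
    "time"
)

const (
    debounce  = 100 * time.Millisecond // wait briefly for more messages
    bulkLimit = 4 << 20                // flush earlier if the bulk is full
)

func debounceLoop(docs <-chan []byte, push func([]byte) error) error {
    var buf bytes.Buffer
    timer := time.NewTimer(debounce)
    timer.Stop() // nothing buffered yet

    flush := func() error {
        if buf.Len() == 0 {
            return nil
        }
        err := push(buf.Bytes())
        buf.Reset()
        return err
    }

    for {
        select {
        case doc, ok := <-docs:
            if !ok {
                return flush()
            }
            buf.Write(doc)
            if buf.Len() >= bulkLimit {
                timer.Stop()
                if err := flush(); err != nil {
                    return err
                }
                continue
            }
            timer.Reset(debounce) // postpone the flush for each new message
        case <-timer.C:
            if err := flush(); err != nil { // no new messages: flush now
                return err
            }
        }
    }
}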
Add basic prometheus metrics
Closes #4. Additionally:
Broken discovery
Steps to reproduce:
Expected:

col | pk | repl
---|---|---
foo | true | true
bar | false | true

Received:

col | pk | repl
---|---|---
foo | true | false
foo | false | true
foo | false | false
foo | false | false
bar | false | true
Feat/ci tests
Enable a PR workflow with the consistency test from ./demo/. The 60s sleep time is due to the slow GH runtime (running both Postgres and Elastic there); it will be changed later (requires additional APIs from SearchReplica).

Split config discovery and configuration logic
Make configuration pluggable and implement:
Optional:
Each config applies its changes on top of the previous one, in the same order as passed to the client. E.g., a hypothetical sketch:
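(ConfigSource and BuildConfig are made-up names illustrating the stated layering semantics.)

package main

// Config is a placeholder for the discovered tables/columns configuration.
type Config struct {
    // tables, columns, scripts, ...
}

// ConfigSource is anything that can mutate the config: DB discovery,
// comment tags, flags, files, ...
type ConfigSource interface {
    Apply(cfg *Config) error
}

// BuildConfig applies each source on top of the previous one, in the
// same order as passed by the caller.
func BuildConfig(sources ...ConfigSource) (*Config, error) {
    cfg := &Config{}
    for _, s := range sources {
        if err := s.Apply(cfg); err != nil {
            return nil, err
        }
    }
    return cfg, nil
}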
Changelog, release tags and tagged images

In order to improve stability, any major change should be semantically versioned. All versions of the code should have separate Docker images.
Build the image on tag push. Rebuild and overwrite the image when a tag is moved to a different commit.
Tests (Thread)
SearchReplica requires extensive testing, for each possible edge case and any possible load.
This thread is used to track and discuss ongoing test progress.
Useless parent_id within inline document.
Having this value (which is the parent _id) makes no sense. It should be possible to skip/remove this field from the inlined JSON. Simplified e.g.:

Technically there is no need for this field.
Composite PKs
Restructure the column magic to allow composite PKs. This would probably require additional config tags for key order specification, or even a template.
May include composite routing keys, etc...