Streaming Data changes from Postgres to Elasticsearch

Postgres logical replication enables streaming of the changes recorded in the write-ahead log (WAL). It relies on Logical Decoding, which transforms the WAL into a format that can be consumed by external applications, and is further extended by a collection of output plugins.

In this article, we will set up logical replication to stream changes from Postgres to Elasticsearch. We will use the wal2json plugin, which outputs a JSON document for each change in the Postgres WAL.

Topology

The diagram below illustrates the different components of our cluster:

We will use a separate container for each service. We will also mount host directories as volumes in the Postgres container to make the decoded WAL output available from the host machine; this is simply for convenience when processing the stream.

Cluster topology

Build Docker image for Postgres with wal2json

The wal2json plugin is not shipped with Postgres, so we need to install it ourselves. The following Dockerfile, Dockerfile-postgres, starts from the Postgres base image and installs wal2json:

FROM postgres:16

RUN apt update && apt install -y postgresql-16-wal2json postgresql-contrib
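
If you want to sanity-check the image on its own, you can build it directly (Docker Compose will build it automatically in the next step); the tag name below is just an example:

$ docker build -f Dockerfile-postgres -t pg-wal2json .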

Set up services with Docker Compose

The following docker-compose.yaml file sets up the topology: it builds the Postgres image on the fly, creates its container, and configures the Elasticsearch service:

version: '3.7'
services:
  db:
    container_name: db
    build:
      context: .
      dockerfile: Dockerfile-postgres
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=inventory
    ports:
      - '6432:5432'
    volumes: 
      - ./data:/var/lib/postgresql/data/
      - ./stream:/stream
  es:
    container_name: es
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.0
    ports:
     - "9200:9200"
    environment:
     - http.host=0.0.0.0
     - transport.host=127.0.0.1
     - xpack.security.enabled=false
     - "ES_JAVA_OPTS=-Xms512m -Xmx512m"

Now let’s start the topology with docker-compose up -d:

$ docker-compose up -d   

WARNING: The Docker Engine you're using is running in swarm mode.

Compose does not use swarm mode to deploy services to multiple nodes in a swarm. All containers will be scheduled on the current node.

To deploy your application across the swarm, use `docker stack deploy`.

Creating network "pg-wal2json_default" with the default driver
Building postgres
Step 1/2 : FROM postgres:16
 ---> 2490d47edbe0
Step 2/2 : RUN apt update && apt install -y postgresql-16-wal2json postgresql-contrib
 ---> Using cache
 ---> 33d5e697f329

Successfully built 33d5e697f329
Successfully tagged pg-wal2json_postgres:latest

Creating db ... done

Set up Logical Decoding

To set up logical decoding so that WAL changes are streamed as JSON, we need to update the Postgres configuration file postgresql.conf with at least the following changes:

wal_level = logical
max_replication_slots = 1
shared_preload_libraries = 'wal2json'

We can either locate the file and edit it manually:

$ docker-compose exec db psql -U postgres -c 'SHOW config_file'

               config_file                
------------------------------------------
 /var/lib/postgresql/data/postgresql.conf
(1 row)

Alternatively, we can update these settings using SQL queries as follows:

$ docker-compose exec db psql -U postgres                      

psql (17.0 (Debian 17.0-1.pgdg120+1), server 16.4 (Debian 16.4-1.pgdg120+2))
Type "help" for help.

postgres=# show wal_level;
 wal_level 
-----------
 replica
(1 row)

postgres=# show max_replication_slots;
 max_replication_slots 
-----------------------
 10
(1 row)

postgres=# show shared_preload_libraries;
 shared_preload_libraries 
--------------------------

(1 row)

postgres=# ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM
postgres=# ALTER SYSTEM SET  shared_preload_libraries = 'wal2json';
ALTER SYSTEM
postgres=# \q
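
A third option, if you prefer to keep the settings alongside the rest of the stack, is to pass them as server flags in docker-compose.yaml; a minimal sketch against the db service defined above (the settings take effect when the container is recreated):

  db:
    # ... same build, environment, ports and volumes as before ...
    command:
      - postgres
      - -c
      - wal_level=logical
      - -c
      - max_replication_slots=1
      - -c
      - shared_preload_libraries=wal2json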

After updating the configuration file, we need to restart Postgres for the changes to take effect:

$ docker-compose restart db

Restarting db ... done
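
After the restart, we can quickly confirm that the new settings are in effect:

$ docker-compose exec db psql -U postgres -c 'SHOW wal_level'

 wal_level 
-----------
 logical
(1 row)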

Set up the Replication Slot

After configuring Logical Decoding, the next step is to set up the replication slot. First, connect to the SQL interpreter in Postgres:

$ docker-compose exec db psql -U postgres -d inventory

Run the following SQL query to create the replication slot using the wal2json plugin:

SELECT * FROM pg_create_logical_replication_slot('my_slot', 'wal2json');

This query will output something like this:

    slot_name    |    lsn    
-----------------+-----------
 my_slot         | 0/1953610
(1 row)

Now we run the following query to get more information about the replication slot we just created:

SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;

You can see that the slot we just created is not yet active (the active column is f for false):

    slot_name    |  plugin  | slot_type | database  | active | restart_lsn | confirmed_flush_lsn 
-----------------+----------+-----------+-----------+--------+-------------+---------------------
 my_slot         | wal2json | logical   | inventory | f      | 0/19535D8   | 0/1953610
(1 row)
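
As an aside, while the slot is inactive we can peek at pending changes without consuming them, which is handy for debugging:

SELECT * FROM pg_logical_slot_peek_changes('my_slot', NULL, NULL);

Once data is written, this returns the decoded changes but does not advance the slot.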

In a separate shell, activate the logical replication slot with pg_recvlogical:

$ docker-compose exec db pg_recvlogical -d inventory -U postgres --slot my_slot --start -o pretty-print=1 -f /stream/my-slot.jsonl

Going back to the SQL interpreter and running the check query we ran earlier, we can now see that the slot is active and ready to stream changes.

inventory=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;

    slot_name    |  plugin  | slot_type | database  | active | restart_lsn | confirmed_flush_lsn 
-----------------+----------+-----------+-----------+--------+-------------+---------------------
 my_slot         | wal2json | logical   | inventory | t      | 0/19536C0   | 0/19536F8

Populate Postgres with Data

To populate Postgres with data, we connect to the Postgres container and open a client shell to execute SQL queries:

$ docker-compose exec db psql -U postgres -d inventory
inventory=# 

The following are a few example queries that can be used to populate Postgres with data (based on inventory.sql):

CREATE SCHEMA inventory;
SET search_path TO inventory;
-- Create some customers ...
CREATE TABLE customers (
  id SERIAL NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
ALTER SEQUENCE customers_id_seq RESTART WITH 1001;
ALTER TABLE customers REPLICA IDENTITY FULL;

INSERT INTO customers
VALUES (default,'Sally','Thomas','sally.thomas@acme.com'),
       (default,'George','Bailey','gbailey@foobar.com'),
       (default,'Edward','Walker','ed@walker.com'),
       (default,'Anne','Kretchmar','annek@noanswer.org');

Let’s perform more changes to our data so that all change types (INSERT, UPDATE, DELETE) are represented in the WAL.

For instance, insert a new record:

$ docker-compose exec db psql -U postgres -d inventory \
  -c "insert into inventory.customers values(default, 'John', 'Doe', 'john.doe@example.com')"

Then, update the record we just created:

$ docker-compose exec db psql -U postgres -d inventory \
  -c "update inventory.customers set first_name='Jane', last_name='Roe' where last_name='Doe'"

Finally, delete the record:

$ docker-compose exec db psql -U postgres -d inventory \
  -c "delete from inventory.customers where email='john.doe@example.com';"

Set up Elasticsearch

We need to create the Elasticsearch index to which the WAL transactions will be forwarded. We can use the Create Index API for this as follows:

$ curl -X PUT http://localhost:9200/customers

{"acknowledged":true,"shards_acknowledged":true,"index":"customers"}

CDC stream processing

Finally, the last piece of the puzzle is the stream processing that captures WAL changes as they are streamed from the replication slot, transforms them, and inserts them into Elasticsearch.

The following script, process.py, defines a small helper class for talking to Elasticsearch, a function that flattens each wal2json change record, and a main loop that reads changes from stdin and indexes them:

import argparse
import json
import requests
import sys

# Helper class to interact with Elasticsearch
class Elasticsearch:
  def __init__(self, base_url):
    self.base_url = base_url

  def upsert(self, index_name, document):
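    # Note: POST to /<index>/_doc lets Elasticsearch generate a new _id on
    # every call, so each change is indexed as a separate document.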
    headers = {'Content-Type': 'application/json'}
    url = f"{self.base_url}/{index_name}/_doc"
    try:
      response = requests.post(url, data=json.dumps(document), headers=headers)
      response.raise_for_status()
      return response.json()
    except requests.exceptions.RequestException as e:
      print(f"Error writing document to Elasticsearch: {e}")
      return None

# Process a single transaction
def process_transaction(in_obj):
  out_obj = {}
  for key in ['kind', 'schema', 'table']:
    out_obj[key] = in_obj[key]
  if in_obj['kind'] != 'delete':
    for key, value in zip(in_obj['columnnames'], in_obj['columnvalues']):
      out_obj[key] = value
  if 'oldkeys' in in_obj:
    out_obj['old'] = {}
    for key, value in zip(in_obj['oldkeys']['keynames'], in_obj['oldkeys']['keyvalues']):
      out_obj['old'][key] = value
  print(out_obj)
  return out_obj

# Process change transactions read from stdin, one JSON document per line
def process_input(es_url):
  es = Elasticsearch(es_url)
  for line in sys.stdin:
    txs = json.loads(line)
    if txs['change'] == []:
      continue
    for tx in txs['change']:
      result = process_transaction(tx)
      es.upsert(result['table'], result)

# Parse CLI arguments
def parse_arguments():
    parser = argparse.ArgumentParser(description="Stream wal2json changes from stdin into Elasticsearch")
    parser.add_argument('-u', '--url', default='http://localhost:9200', help='Base URL for Elasticsearch')
    return parser.parse_args()

def main():
    args = parse_arguments()
    print(f'Uploading transactions to {args.url}')
    process_input(args.url)

if __name__ == "__main__":
    main()
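
The only third-party dependency is requests, which can be installed with pip:

$ pip install requests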

In a separate shell, start the stream processing as follows:

$ cat ./stream/my-slot.jsonl | jq -c | python process.py
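
Note that cat only processes the lines already present in the file and then exits; to keep consuming new changes as pg_recvlogical appends them, something like tail -f can be used instead (a sketch, assuming jq is available):

$ tail -n +1 -f ./stream/my-slot.jsonl | jq --unbuffered -c . | python process.py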

As changes are written to ./stream/my-slot.jsonl by the pg_recvlogical process we started earlier, process.py transforms them into something that looks like the following:

{'kind': 'insert', 'schema': 'inventory', 'table': 'customers', 'id': 1001, 'first_name': 'Sally', 'last_name': 'Thomas', 'email': 'sally.thomas@acme.com'}
{'kind': 'insert', 'schema': 'inventory', 'table': 'customers', 'id': 1002, 'first_name': 'George', 'last_name': 'Bailey', 'email': 'gbailey@foobar.com'}
{'kind': 'insert', 'schema': 'inventory', 'table': 'customers', 'id': 1003, 'first_name': 'Edward', 'last_name': 'Walker', 'email': 'ed@walker.com'}
{'kind': 'insert', 'schema': 'inventory', 'table': 'customers', 'id': 1004, 'first_name': 'Anne', 'last_name': 'Kretchmar', 'email': 'annek@noanswer.org'}
{'kind': 'insert', 'schema': 'inventory', 'table': 'customers', 'id': 1005, 'first_name': 'John', 'last_name': 'Doe', 'email': 'john.doe@example.com'}
{'kind': 'update', 'schema': 'inventory', 'table': 'customers', 'id': 1005, 'first_name': 'Jane', 'last_name': 'Roe', 'email': 'john.doe@example.com', 'old': {'id': 1005, 'first_name': 'John', 'last_name': 'Doe', 'email': 'john.doe@example.com'}}
{'kind': 'delete', 'schema': 'inventory', 'table': 'customers', 'old': {'id': 1005, 'first_name': 'Jane', 'last_name': 'Roe', 'email': 'john.doe@example.com'}}

Verifying the data in Elasticsearch

We can verify that the WAL changes have landed in Elasticsearch by querying the customers index as follows:

$ curl 'http://localhost:9200/customers/_search?pretty'

We should see entries like this:

      {
        "_index": "customers",
        "_type": "_doc",
        "_id": "Hl72PpIBc0uZl7MqeRNu",
        "_score": 1,
        "_source": {
          "kind": "delete",
          "schema": "inventory",
          "table": "customers",
          "old": {
            "id": 1005,
            "first_name": "Jane",
            "last_name": "Roe",
            "email": "john.doe@example.com"
          }
        }
      }
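
Note that the delete change also appears as its own document: as mentioned above, the script indexes every change under an auto-generated _id rather than updating or removing the original document. A hypothetical variation of the upsert method could use the row id as the Elasticsearch document _id so that updates overwrite the existing document, for example:

    # Hypothetical sketch (not in the script above): index insert/update
    # changes under the row id so later changes overwrite the same document.
    url = f"{self.base_url}/{index_name}/_doc/{document['id']}"
    response = requests.put(url, data=json.dumps(document), headers=headers)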

Shut down the cluster

Finally, bring the whole stack down:

$ docker-compose down

Stopping db ... done
Stopping es ... done
Removing db ... done
Removing es ... done
Removing network pg-wal2json_default

That’s all folks

In this article, we saw how to configure Postgres Logical Decoding to stream WAL transactions out of Postgres in JSON format, and we wrote a Python script to consume these changes and insert them into Elasticsearch.

I hope you enjoyed this article; feel free to leave a comment or reach out on Twitter @bachiirc.