Skip to main content

Синхронизация 2 баз Postgres

Несколько подходов к синхронизации баз данных PostgreSQL:

1. Физическая репликация (pg_basebackup)

# На мастер-сервере

pg_basebackup -h source_host -U replication_user \

    -D /path/to/backup/directory -P -v -R

2. Логическая репликация:

-- На исходной базе

CREATE PUBLICATION my_publication

FOR TABLE table1, table2, table3;

-- На целевой базе

CREATE SUBSCRIPTION my_subscription

CONNECTION 'host=source_host port=5432 dbname=source_db'

PUBLICATION my_publication;

3. Скрипт на Python с psycopg2:


import psycopg2

import logging


class DatabaseSynchronizer:

    def __init__(self, source_conn_params, target_conn_params):

        self.source_conn_params = source_conn_params

        self.target_conn_params = target_conn_params

        logging.basicConfig(level=logging.INFO)

        self.logger = logging.getLogger(__name__)

    def get_source_data(self, query):

        try:

            with psycopg2.connect(**self.source_conn_params) as conn:

                with conn.cursor() as cur:

                    cur.execute(query)

                    return cur.fetchall()

        except Exception as e:

            self.logger.error(f"Source data retrieval error: {e}")

            return None

    def sync_table(self, table_name, sync_query=None):

        try:

            # Получаем данные из источника

            source_data = self.get_source_data(sync_query or f"SELECT * FROM {table_name}")

         

            if not source_data:

                self.logger.warning(f"No data to sync for {table_name}")

                return False


            # Подключение к целевой базе

            with psycopg2.connect(**self.target_conn_params) as conn:

                with conn.cursor() as cur:

                    # Очистка существующих данных

                    cur.execute(f"DELETE FROM {table_name}")                    

                    # Вставка новых данных

                    insert_query = f"INSERT INTO {table_name} VALUES %s"

                    from psycopg2.extras import execute_values

                    execute_values(cur, insert_query, source_data)

            

                conn.commit()

                self.logger.info(f"Successfully synced {table_name}")

            return True

        

        except Exception as e:

            self.logger.error(f"Sync error for {table_name}: {e}")

            return False

    def sync_multiple_tables(self, tables):

        sync_results = {}

        for table in tables:

            sync_results[table] = self.sync_table(table)

        return sync_results


# Пример использования

source_params = {

    'dbname': 'source_db',

    'user': 'source_user',

    'password': 'source_pass',

    'host': 'source_host'

}

target_params = {

    'dbname': 'target_db',

    'user': 'target_user',

    'password': 'target_pass',

    'host': 'target_host'

}

synchronizer = DatabaseSynchronizer(source_params, target_params)

# Синхронизация всех таблиц

tables_to_sync = ['users', 'products', 'orders']

sync_results = synchronizer.sync_multiple_tables(tables_to_sync)

4. Ansible-плейбук для синхронизации:

---

- hosts: postgres_servers

  vars:

    source_db:

      host: source_host

      db: source_database

      user: source_user

      password: source_password

    target_db:

      host: target_host

      db: target_database

      user: target_user

      password: target_password

    tables_to_sync:

      - users

      - products

      - orders

  tasks:

    - name: Sync PostgreSQL Tables

      block:

        - name: Dump Source Tables

          postgresql_db:

            login_host: "{{ source_db.host }}"

            login_user: "{{ source_db.user }}"

            login_password: "{{ source_db.password }}"

            name: "{{ source_db.db }}"

            target: "/tmp/source_dump.sql"

            state: dump

            tables: "{{ tables_to_sync }}"


        - name: Restore to Target Database

          postgresql_db:

            login_host: "{{ target_db.host }}"

            login_user: "{{ target_db.user }}"

            login_password: "{{ target_db.password }}"

            name: "{{ target_db.db }}"

            target: "/tmp/source_dump.sql"

            state: restore


      rescue:

        - name: Sync Failure Notification

          mail:

            subject: "Database Sync Failed"

            body: "Synchronization between {{ source_db.host }} and {{ target_db.host }} failed"

5. Триггеры для синхронизации:

-- Создание функции триггера

CREATE OR REPLACE FUNCTION sync_table()

RETURNS TRIGGER AS $$

BEGIN

    -- Логика синхронизации

    IF TG_OP = 'INSERT' THEN

        INSERT INTO target_table SELECT NEW.*;

    ELSIF TG_OP = 'UPDATE' THEN

        UPDATE target_table SET ... WHERE ...;

    ELSIF TG_OP = 'DELETE' THEN

        DELETE FROM target_table WHERE ...;

    END IF;

    RETURN NEW;

END;

$$ LANGUAGE plpgsql;

-- Создание триггера

CREATE TRIGGER sync_trigger

AFTER INSERT OR UPDATE OR DELETE ON source_table

FOR EACH ROW EXECUTE FUNCTION sync_table();

6. Утилита pg_dump для полной синхронизации:

#!/bin/bash

# Полный дамп базы данных

pg_dump -h source_host -U source_user -d source_db > /tmp/full_backup.sql


# Восстановление на целевой базе

psql -h target_host -U target_user -d target_db -f /tmp/full_backup.sql

Рекомендации:

1. Выбирайте метод в зависимости от объема данных

2. Учитывайте согласованность данных

3. Настройте обработку ошибок

4. Используйте транзакции

5. Контролируйте нагрузку на базы данных

Ограничения:

- Физическая репликация требует одинаковых версий PostgreSQL

- Логическая репликация не поддерживает все типы данных

- Большие объемы могут вызвать производительность


Best practices:

- Используйте расписание для синхронизации

- Логируйте процесс

- Настройте мониторинг

- Предусмотрите откат изменений

- Шифруйте подключения


Инструменты для мониторинга:

- pg_stat_replication

- pg_stat_database

- pg_stat_activity