Вступление

Получение управляемых событиями уведомлений об изменениях (EDCN), когда данные изменяются непосредственно из БД, без необходимости опроса для получения обновлений — очень эффективная функциональность. Подобная доступна в некоторых реляционных БД, но не во всех, так как она нестандартная и не является частью спецификации SQL.

В примерах ниже она выражается через реализацию интерфейса, который регистрируетсянепосредственно драйвером JDBC. Это открывает множество потенциальных вариантов применения, которые можно выразить без опроса, кода инфраструктуры для обработки изменений данных и уведомления заинтересованных сторон. Можно взаимодействовать с драйвером напрямую и слушать изменения, а когда они происходят, выполнять работу, управляемую событиями. Примеры:

  • Кэширование (подробнее об этом, когда мы рассмотрим PostgreSQL, см. Good Candidates for CQN).
  • Honeypots для таблиц БД.
  • Проблемы отладки.
  • Протоколирование изменений.
  • Аналитика и отчетность.

Но здесь есть свои последствия. Эта нестандартная функция связывает приложение напрямую с БД.

Майкл Дюргнер о примере реализации в PostgreSQL:

Недостаток заключается в том, что вы перемещаете логику приложения в СУБД. Убедитесь, что у вас есть люди с пониманием вашей СУБД, потому что обычное ПО не сможет справиться с проблемами типа непрерывной доставки. СУБД должна быть глубоко интегрирована с этим процессом.

И я с ним согласен — необходимо сохранять бизнес-логику вне БД.

Проекты, использующие инструменты объектно-реляционного отображения (ORM), такие как Java Persistence API (JPA) для генерации схемы БД непосредственно из одной или нескольких объектных моделей, теряют мобильность и простоту, когда разработчикам требуется добавлять логику на уровне БД, принадлежащую самому приложению. Им приходится использовать ту же БД для тестирования, что и при разработке, а это, в свою очередь, может привести к проблемам.

Может ли приложение функционировать без создаваемого вами ПО, которое требует работы EDCN через драйвер JBDC? Если “да” ,— прекрасно, если “нет”, то это аргумент против EDCN и необходимо подумать, прежде чем применять его.

Эта функция сама по себе не является заменой хорошо спроектированного промежуточного ПО, ориентированного на сообщения (MOM), которое предоставляет готовые решения для гарантированной доставки, сохранения сообщений, доставки хотя-бы-один-раз/строго-однократной через очереди и темы, стратегий управления потоком и решения проблем отказоустойчивости и масштабируемости. Наличие каких-то из этих пунктов показывает, что EDCN-подход нуждается в пересмотре.

Рассмотрим функциональность в том виде, в котором она существует в PostgreSQL, Oracle и H2, и прокомментируем MySQL и его форк MariaDB.

Работаем с Java 13.0.2 и Groovy 3.0.4, также есть ссылки на скрипты на GitHub, которые содержат дополнительные примечания, касающиеся настройки необходимых зависимостей и других требований для примеров.

PostgreSQL

API Postgres включает в себя интерфейс PGNotificationListener, который нужно реализовать и затем зарегистрировать при соединении с БД. Есть две доступные реализации: [по умолчанию] драйвер Postgres и драйвер Impossibl JDBC. Драйвер Postgres будет опрашивать БД на предмет изменений  —  не будем его использовать. Полагаемся на реализацию Impossibl, которая обеспечивает настоящие уведомления, управляемые событиями.

Эрик Брандсберг (Heimdall Data):

“Интерфейс PG notify  —  это один из скрытых алмазов в PG. Мы применяем его для доставки сообщений о недействительности кэша между прокси-серверами вместо отдельного интерфейса pub/sub, как в Redis.”

Heimdall Data предоставляет сложное решение для кэширования приложений, использующих Amazon Relational Database Service (Amazon RDS) и других БД, а это показывает важность такой функции.

Ниже скрипт триггера и функции должен быть выполнен внутри Postgres в качестве предварительного условия для запуска скрипта Groovy. Функция notify_change будет отправлять события всем зарегистрированным слушателям, которые работают с каналом examplechannel (предупреждение: имена каналов чувствительны к регистру).

CREATE OR REPLACE FUNCTION notify_change() RETURNS TRIGGER AS $$
  BEGIN
--   Внимание: регистр здесь очень важен! Если мы используем "exampleChannel", PG преобразует его в
-- --examplechannel и никаких событий получено не будет!!
--
-- Обновление: [нужно проверить] Регистр можно обработать в --PostgreSQL с помощью двойных кавычек.
--
-- Теоретически, если бы у вас была следующая строка в качестве слушателя, она работала бы в camelCase.
--
-- statement.execute('LISTEN "exampleChannel"');
--
-- То же самое относится к любому идентификатору в Postgres.
--
    PERFORM pg_notify('examplechannel', NEW.phrase);
    RETURN NEW;
  END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER table_change
  AFTER INSERT OR UPDATE OR DELETE ON example
  FOR EACH ROW EXECUTE PROCEDURE notify_change();

Рабочий пример реализации com.impossibl.postgres.api.jdbc. PGNotificationListener с использованием PostgreSQL включен далее. Интерфейс PGNotificationListener требует, чтобы разработчик реализовал один метод:

void notification(int processId, String channelName, String payload)

Это можно увидеть на строке 18:

@Grapes(
    @Grab(group='com.impossibl.pgjdbc-ng', module='pgjdbc-ng', version='0.8.4')
)
import com.impossibl.postgres.api.jdbc.PGConnection
import com.impossibl.postgres.api.jdbc.PGNotificationListener
import com.impossibl.postgres.jdbc.PGDataSource
 
PGDataSource dataSource = new PGDataSource();
dataSource.setHost("0.0.0.0")
dataSource.setPort(5432)
dataSource.setDatabaseName("testdb")
dataSource.setUser("postgres")
dataSource.setPassword("password")

final def pgNotificationListener = new PGNotificationListener () { 

  @Override
  public void notification(int processId, String channelName, String payload) {
    println("processId $processId, channelName: $channelName, payload: $payload")
  }
}

final def connection = (PGConnection) dataSource.getConnection()

connection.addNotificationListener(pgNotificationListener)

final def statement = connection.createStatement()

statement.execute("LISTEN examplechannel")
statement.close()

def time = 60 * 60 * 1000

println "Will sleep for $time milliseconds..."

try {
  Thread.sleep (time)
} catch (Throwable thrown) {
  thrown.printStackTrace (System.err)
} finally {
  connection.close ()
}

print "...done!"

Пример выполнения вместе с объяснением и выводом в GroovyConsole:

Объяснение примера PostgreSQL, запущенного в консоли Groovy.

Oracle

Следующий пример будет посвящен Oracle. Ниже шаги, необходимые для настройки уведомлений об изменениях, управляемых событиями, с помощью драйвера JDBC, а также условия для примера.

Важно, чтобы Docker работал на другой машине, которая в данном случае использует Ubuntu. Детальное предупреждение о локальном запуске Oracle в Docker в скрипте DatabaseChangeListenerInOracleDatabaseExample.groovy.

# В этом примере Docker работает на другой машине, поэтому предположим, что я подключился к нему по ssh и
# запускаю следующее:
docker run -d -p 1521:1521 oracleinanutshell/oracle-xe-11g
docker exec -it [container id] /bin/sh
su
#
# Username: system, password: oracle
#
/u01/app/oracle/product/11.2.0/xe/bin/sqlplus

В SQL*Plus можно запустить скрипт конфигурации. После создания примера таблицы (см. строку 8) в следующем разделе можно запустить скрипт Groovy, и любые операции вставки, обновления или удаления в целевой таблице приведут к тому, что событие будет отправлено в скрипт Groovy, а затем на вывод.

--
-- Это необходимо, иначе уведомления не будут отправляться драйверу JDBC.
--
grant change notification to system;

commit;

CREATE TABLE example(
  example_id NUMBER(10) PRIMARY KEY,
  phrase VARCHAR2(120) NOT NULL
);

commit;

insert into example values (1, 'one');
insert into example values (2, 'two');
insert into example values (3, 'three');
insert into example values (4, 'four');
insert into example values (5, 'five');
commit;

--
-- Затем, когда работает DatabaseChangeListenerInOracleDatabaseExample.groovy
-- выполните следующее и обновление появится в консоли Groovy:
--
update example set phrase = 'one / 1' where example_id = 1;

Пример полного скрипта DatabaseChangeListenerInOracleDatabaseExample.groovy. Разработчик должен реализовать один метод:

void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent)

Можно увидеть эту реализацию в строке 55:

@GrabConfig(systemClassLoader=true)

//
// https://docs.oracle.com/cd/E11882_01/appdev.112/e13995/index.html?oracle/jdbc/dcn/DatabaseChangeRegistration.html
//
// https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc6
@Grapes(
    @Grab(group='com.oracle.database.jdbc', module='ojdbc6', version='11.2.0.4')
)
import oracle.jdbc.dcn.DatabaseChangeListener
import oracle.jdbc.dcn.DatabaseChangeEvent
import oracle.jdbc.driver.OracleConnection
import oracle.jdbc.dcn.DatabaseChangeRegistration
import oracle.jdbc.OracleStatement

import java.sql.DriverManager

import java.util.Properties

// Обратите внимание, что драйвер thin поддерживает этот пример.
//
//
// СМ. ПРЕДУПРЕЖДЕНИЕ НИЖЕ О ЗАПУСКЕ ЭТОГО СКРИПТА НА LOCALHOST С БД ORACLE В DOCKER, А ТАКЖЕ НА LOCALHOST.
final def connection = DriverManager.getConnection("jdbc:oracle:thin:@192.168.1.232:1521:xe", "system", "oracle")

def databaseProperties = new Properties ()

/*
* [6] Если тип уведомления-OCN, любая транзакция DML, изменяющая один или несколько зарегистрированных объектов, генерирует
* одно уведомление для каждого объекта, когда он фиксируется.
*
* Если тип уведомления-QRCN, любая транзакция DML, которая изменяет результат одной или нескольких зарегистрированных запросов генерируют уведомление при фиксации. Уведомление включает в себя идентификаторы запросов, чьи результаты изменились.
* 
*
* Для любого типа, уведомление включает в себя:
*
* Название каждой измененной таблицы
*
* Тип операции (INSERT, UPDATE, или DELETE)
*
* ROWID каждой измененной строки, если регистрация была создана с параметром ROWID и количеством измененных строк было не слишком большим.
* Дополнительные сведения см. В разделе параметр ROWID."
*/
databaseProperties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true")
databaseProperties.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true")

final def databaseChangeRegistration = connection.registerDatabaseChangeNotification(databaseProperties)

public class ExampleDatabaseChangeListener implements DatabaseChangeListener {

    @Override
    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        println ("***** databaseChangeEvent: $databaseChangeEvent")
        println ("***** databaseChangeEvent.source: ${databaseChangeEvent.source}")
        println ("***** databaseChangeEvent.queryChangeDescription: ${databaseChangeEvent.queryChangeDescription}")
        println ("***** databaseChangeEvent.tableChangeDescription: ${databaseChangeEvent.tableChangeDescription.each {println '\n  - nextTableChangeDescription: $it' } }")
    }
}

databaseChangeRegistration.addListener(new ExampleDatabaseChangeListener ())

final def statement = connection.createStatement()

statement.setDatabaseChangeRegistration(databaseChangeRegistration)

try {

  resultSet = statement.executeQuery("select * from example")

  while (resultSet.next())
    {} // println "resultSet.phrase: ${resultSet.getString('phrase')}"

} catch (Throwable thrown) {
  thrown.printStackTrace (System.err)
}

println "databaseChangeRegistration.userName: ${databaseChangeRegistration.userName}"

databaseChangeRegistration.tables.each {
    println "tables: $it"
}

final def time = 60 * 60 * 1000

println "Will sleep for $time milliseconds..."

try {
  Thread.sleep (time)
} catch (Throwable thrown) {
  thrown.printStackTrace (System.err)
} finally {
  statement.close ()
  connection.close ()
}

println "...done!"

/ * Предупреждение: я копирую нижеприведенное сообщение, потому что при запуске Oracle в Docker это важно, а затем на localhost. Оно применяется только в том случае, если вы запускаете DOCKER на той же машине, что и этот СКРИПТ! В данный момент я запускаю Docker с Oracle на совершенно другой машине.
*
* Обратите внимание, что я не смог запустить их на той же машине, используя:
*
* docker run -d -p 1521:1521 -p [47632:47632] oracleinanutshell/oracle-xe-11g
*
* ИСТОЧНИК:
*
* https://stackoverflow.com/questions/26003506/databasechangeregistration-in-remote-server
*
* "Вы можете проверить активные прослушиватели в Oracle, выполнив запрос:
*
* Select * FROM USER_CHANGE_NOTIFICATION_REGS
* Если запрос не возвращает никаких строк, вероятно, сервер БД не может получить доступ к порту прослушивателя драйвера jdbc.
*
* По умолчанию драйвер Oracle JDBC прослушивает уведомления на порту 47632. Вам нужно будет убедиться, что к этому порту можно подключиться с сервера БД. Возможно, потребуется добавить правило в брандмауэр, чтобы принимать входящие запросы на этот порт.
*
* Этот порт можно изменить с помощью NTF_LOCAL_TCP_PORT:
*
* prop.setProperty(OracleConnection.NTF_LOCAL_TCP_PORT, "15000");"
*
*/

О том, что делает каждый шаг: 

Пример скрипта, относящегося к Oracle, включая примечания, объясняющие выходные данные.

Показано, что когда мы выполняем пять операций вставки подряд и фиксируем изменения, генерируется только одно событие, которое включает в себя эти операции. События генерируютсятолько тогда, когда коммит выполняется успешно.

SQL*Plus: генерируется событие, которое включает в себя эти операции вставки.

Н2

Это открытая, легкая и мощная реляционная БД, написанная на Java. Она поддерживает много функций и поставляется в виде одного jar-файла размером 2,2 Мб. H2 часто используется при тестировании Java-приложений и хорошо работает в качестве встроенной БД, а также может использоваться с инструментами объектно-реляционного отображения, такими как Java Persistence API (JPA). H2 встроен в сервер приложений JBoss Wildfly (JBoss) и давно используется в JBoss в качестве встроенной БД.

H2 доставляет уведомления о событиях через интерфейс org.h2.api.DatabaseEventListener. DatabaseEventListener предлагает ограниченную функциональность по сравнению со прослушивателями Postgres и Oracle, рассмотренными ранее. Методы, необходимые для реализации интерфейса:

void closingDatabase ()
void exceptionThrown(SQLException sqlException, String sql)
void init (String url)
void opened ()
void setProgress (String state, String name, int x, int max)

Пример реализации org.h2.api.DatabaseEventListener, применяющий H2, можно найти на GitHub,а также он описан ниже.

В примере H2 работает во встроенном режиме  —  то есть полностью в памяти на той же виртуальной машине со сценарием Groovy.

@GrabConfig(systemClassLoader=true)

@Grapes(
  @Grab(group="com.h2database", module="h2", version="1.4.200")
)
import org.h2.api.DatabaseEventListener
import java.sql.SQLException
import java.sql.DriverManager

public class ExampleDatabaseEventListener implements DatabaseEventListener {

  public void closingDatabase () {
    println "closingDatabase: method invoked."
  }

  public void exceptionThrown (SQLException sqle, String sql) {
    println "exceptionThrown: method invoked; sqle: $sqle, sql: $sql"
  }

  public void init (String url) {
    println "init: method invoked; url: $url"
  }

  public void opened () {
    println "opened: method invoked."
  }

  public void setProgress (int state, String name, int x, int max) {
    println "setProgress: method invoked; state: $state, name: $name, x: $x, max: $max"
  }
}

//
// Прослушиватель событий был добавлен в качестве параметра в URL-адрес подключения ниже.
//
def cxn = DriverManager.getConnection("jdbc:h2:mem:EventListenerInH2DatabaseExampleDB;DB_CLOSE_DELAY=-1;DATABASE_EVENT_LISTENER='ExampleDatabaseEventListener';")
def stm = cxn.createStatement()
def resultSet = stm.executeQuery("SELECT 1+1")

if (resultSet.next()) {
  println("next: ${resultSet.getInt(1)}")
}

cxn.close ()

println "...Done!"

Пример выполнения вместе с объяснением и выводом из GroovyConsole:

Пример H2 DatabaseEventListener, работающего в Groovy.

DatabaseEventListener H2 не предлагает аналогичную функциональность, как в PostgreSQL. Вот запрос на новую функцию в репозиторий БД H2 на GitHub.

MySQL/MariaDB

Управляемые событиями уведомления об изменениях через драйвер JDBC не поддерживаются ни MySQL, ни MariaDB, поэтому придется подумать об альтернативах.

Не будем рассматривать триггеры и пользовательские функции (UDF), поскольку они относятся к MySQL и MariaDB для вызова конечной точки веб-службы, которая и является одной из альтернатив. Их можно использовать, однако потенциально это будет иметь последствия для безопасности и производительности.

Вывод

Хотя EDCN предлагает мощные функции в поддерживаемых реляционных БД, у этого метода есть цена, описанная в рабочих примерах, о которой надо подумать.

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи Thomas P. Fuller: Hidden Gems: Event-Driven Change Notifications in Relational Databases

Предыдущая статьяОт HTTP до HTTP 3  -  интернета будущего
Следующая статьяSpineNet: нетрадиционная архитектура backbone-сети от Google Brain