可以使用通知程序帮助您监控 Vertica 数据库,使用第三方 Kafka 感知工具向 Kafka 主题生成消息。可以直接发布消息(例如,通过 SQL 脚本指示长时间运行的查询已完成)。通知程序还可以在数据收集器表中的组件更新时自动发送消息。
使用通知程序生成 Kafka 消息
1 - 创建 Kafka 通知程序
通过以下过程创建 Kafka 通知程序。通知程序至少定义:
-
一个独特的名称。
-
一种消息协议。向 Kafka 发送消息时,它是
kafka://
。 -
要与之通信的服务器。对于 Kafka,它是 Kafka 代理的地址和端口号。
-
最大消息缓冲区大小。如果要通过通知程序发送的消息队列超过此限制,则丢弃消息。
使用 CREATE NOTIFIER 创建通知程序。以下示例创建一个名为 load_progress_notifier
的通知程序,该通知程序通过运行于端口 9092 上的 kafka01.example.com 的 Kafka 代理发送消息:
=> CREATE NOTIFIER load_progress_notifier
ACTION 'kafka://kafka01.example.com:9092'
MAXMEMORYSIZE '10M';
虽然不是必需项,但最佳实践是创建使用加密连接的通知程序。以下示例创建一个使用加密连接的通知程序,并使用提供的 CA 捆绑包验证 Kafka 服务器的证书:
=> CREATE NOTIFIER encrypted_notifier
ACTION 'kafka://127.0.0.1:9092'
MAXMEMORYSIZE '10M'
TLSMODE 'verify-ca'
CA BUNDLE ca_bundle;
按照此步骤,为使用 SASL_SSL 的 Kafka 端点创建或更改通知程序。请注意,每当您更改给定通知程序的 TLSMODE、证书或 CA 捆绑包时,都必须重复此步骤。
-
在设置 TLSMODE、证书和 CA 捆绑包时,使用 CREATE 或 ALTER 以禁用通知程序。
=> ALTER NOTIFIER encrypted_notifier DISABLE TLSMODE 'verify-ca' CA BUNDLE ca_bundle2;
-
更改通知程序并为 SASL_SSL 设置适合的 rdkafka 适配器参数。
=> ALTER NOTIFIER encrypted_notifier PARAMETERS 'sasl.username=user;sasl.password=password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL';
-
启用通知程序。
=> ALTER NOTIFIER encrypted_notifier ENABLE;
2 - 通过 Kafka 通知程序发送单个消息
可以使用 NOTIFY 函数通过 Kafka 通知程序发送单个消息。此功能对于向第三方报告工具报告 SQL 脚本(如 ETL 任务)的进度非常有用。
向此函数传递三个字符串值:
-
要发送的消息。
-
要发送消息的通知程序的名称。
-
要接收消息的 Kafka 主题。
例如,假定要向前面创建的 load_progress_notifier 通知程序中定义的 Kafka 群集的 vertica_notifications 主题发送消息“每日加载完成 (Daily load finished)”。那么可以执行以下语句:
=> SELECT NOTIFY('Daily load finished.',
'load_progress_notifier',
'vertica_notifications');
NOTIFY
--------
OK
(1 row)
通知程序发送给 Kafka 的消息采用 JSON 格式。可以使用 Kafka 节点上的控制台使用者查看生成的消息。例如:
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--from-beginning \
--topic vertica_notifications \
--max-messages 1
{"_db":"vmart","_schema":"v_internal","_table":"dc_notifications",
"channel":"vertica_notifications","message":"Daily load finished.",
"node_name":"v_vmart_node0001","notifier":"load_progress_notifier",
"request_id":2,"session_id":"v_vmart_node0001-463079:0x4ba6f",
"statement_id":-1,"time":"2018-06-19 09:48:42.314181-04",
"transaction_id":45035996275565458,"user_id":45035996273704962,
"user_name":"dbadmin"}
Processed a total of 1 messages
3 - 使用 Kafka 通知程序监控 DC 表
Vertica 数据收集器 (DC) 表监控许多不同的数据库功能。当 DC 组件更新时,可以让通知程序自动向 Kafka 端点发送消息。可以查询 DATA_COLLECTOR 表以获取 DC 组件列表。
使用函数 SET_DATA_COLLECTOR_NOTIFY_POLICY 配置通知程序向 Kafka 发送 DC 组件更新。
要在登录尝试失败时收到通知,您可以创建一个通知程序,使其在 DC 组件 LoginFailures
更新时发送通知。TLSMODE
'verify-ca’ 将验证服务器的证书是否由受信任的 CA 签名。
=> CREATE NOTIFIER vertica_stats ACTION 'kafka://kafka01.example.com:9092' MAXMEMORYSIZE '10M' TLSMODE 'verify-ca';
CREATE NOTIFIER
=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', true);
SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
SET
(1 row)
与通过 NOTIFY 函数发送的消息一样,从 DC 组件发送到 Kafka 的数据采用 JSON 格式。前面的示例导致将如下消息发送到 vertica_notifications Kafka 主题:
{"_db":"vmart","_schema":"v_internal","_table":"dc_login_failures",
"authentication_method":"Reject","client_authentication_name":"",
"client_hostname":"::1","client_label":"","client_os_user_name":"dbadmin",
"client_pid":481535,"client_version":"","database_name":"alice",
"effective_protocol":"3.8","node_name":"v_vmart_node0001",
"reason":"INVALID USER","requested_protocol":"3.8","ssl_client_fingerprint":"",
"time":"2018-06-19 14:51:22.437035-04","user_name":"alice"}
查看 DC 组件的通知策略
使用 GET_DATA_COLLECTOR_NOTIFY_POLICY 函数列出为 DC 组件设置的策略。
=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------------------------------------------
Notifiable; Notifier: vertica_stats; Channel: vertica_notifications
(1 row)
禁用通知策略
可以调用 SET_DATA_COLLECTOR_NOTIFY_POLICY 函数并将其第四个实参设置为 FALSE,以此来禁用通知策略。以下示例禁用 LoginFailures 组件的通知策略:
=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', false);
SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
SET
(1 row)
=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
Not notifiable;
(1 row)