这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

使用通知程序生成 Kafka 消息

可以使用通知程序帮助您监控 Vertica 数据库,使用第三方 Kafka 感知工具向 Kafka 主题生成消息。可以直接发布消息(例如,通过 SQL 脚本指示长时间运行的查询已完成)。通知程序还可以在数据收集器表中的组件更新时自动发送消息。

1 - 创建 Kafka 通知程序

通过以下过程创建 Kafka 通知程序。通知程序至少定义:

  • 一个独特的名称。

  • 一种消息协议。向 Kafka 发送消息时,它是 kafka://

  • 要与之通信的服务器。对于 Kafka,它是 Kafka 代理的地址和端口号。

  • 最大消息缓冲区大小。如果要通过通知程序发送的消息队列超过此限制,则丢弃消息。

使用 CREATE NOTIFIER 创建通知程序。以下示例创建一个名为 load_progress_notifier 的通知程序,该通知程序通过运行于端口 9092 上的 kafka01.example.com 的 Kafka 代理发送消息:

=> CREATE NOTIFIER load_progress_notifier
    ACTION 'kafka://kafka01.example.com:9092'
    MAXMEMORYSIZE '10M';

虽然不是必需项,但最佳实践是创建使用加密连接的通知程序。以下示例创建一个使用加密连接的通知程序,并使用提供的 CA 捆绑包验证 Kafka 服务器的证书:

=> CREATE NOTIFIER encrypted_notifier
    ACTION 'kafka://127.0.0.1:9092'
    MAXMEMORYSIZE '10M'
    TLSMODE 'verify-ca'
    CA BUNDLE ca_bundle;

按照此步骤,为使用 SASL_SSL 的 Kafka 端点创建或更改通知程序。请注意,每当您更改给定通知程序的 TLSMODE、证书或 CA 捆绑包时,都必须重复此步骤。

  1. 在设置 TLSMODE、证书和 CA 捆绑包时,使用 CREATE 或 ALTER 以禁用通知程序。

    => ALTER NOTIFIER encrypted_notifier
        DISABLE
        TLSMODE 'verify-ca'
        CA BUNDLE ca_bundle2;
    
  2. 更改通知程序并为 SASL_SSL 设置适合的 rdkafka 适配器参数。

    => ALTER NOTIFIER encrypted_notifier PARAMETERS
      'sasl.username=user;sasl.password=password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL';
    
  3. 启用通知程序。

    => ALTER NOTIFIER encrypted_notifier ENABLE;
    

2 - 通过 Kafka 通知程序发送单个消息

可以使用 NOTIFY 函数通过 Kafka 通知程序发送单个消息。此功能对于向第三方报告工具报告 SQL 脚本(如 ETL 任务)的进度非常有用。

向此函数传递三个字符串值:

  • 要发送的消息。

  • 要发送消息的通知程序的名称。

  • 要接收消息的 Kafka 主题。

例如,假定要向前面创建的 load_progress_notifier 通知程序中定义的 Kafka 群集的 vertica_notifications 主题发送消息“每日加载完成 (Daily load finished)”。那么可以执行以下语句:

=> SELECT NOTIFY('Daily load finished.',
                 'load_progress_notifier',
                 'vertica_notifications');
 NOTIFY
--------
 OK
(1 row)

通知程序发送给 Kafka 的消息采用 JSON 格式。可以使用 Kafka 节点上的控制台使用者查看生成的消息。例如:

$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                                           --from-beginning \
                                           --topic vertica_notifications \
                                           --max-messages 1

{"_db":"vmart","_schema":"v_internal","_table":"dc_notifications",
"channel":"vertica_notifications","message":"Daily load finished.",
"node_name":"v_vmart_node0001","notifier":"load_progress_notifier",
"request_id":2,"session_id":"v_vmart_node0001-463079:0x4ba6f",
"statement_id":-1,"time":"2018-06-19 09:48:42.314181-04",
"transaction_id":45035996275565458,"user_id":45035996273704962,
"user_name":"dbadmin"}

Processed a total of 1 messages

3 - 使用 Kafka 通知程序监控 DC 表

Vertica 数据收集器 (DC) 表监控许多不同的数据库功能。当 DC 组件更新时,可以让通知程序自动向 Kafka 端点发送消息。可以查询 DATA_COLLECTOR 表以获取 DC 组件列表。

使用函数 SET_DATA_COLLECTOR_NOTIFY_POLICY 配置通知程序向 Kafka 发送 DC 组件更新。

要在登录尝试失败时收到通知,您可以创建一个通知程序,使其在 DC 组件 LoginFailures 更新时发送通知。TLSMODE 'verify-ca’ 将验证服务器的证书是否由受信任的 CA 签名。

=> CREATE NOTIFIER vertica_stats ACTION 'kafka://kafka01.example.com:9092' MAXMEMORYSIZE '10M' TLSMODE 'verify-ca';
CREATE NOTIFIER
=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', true);
SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 SET
(1 row)

与通过 NOTIFY 函数发送的消息一样,从 DC 组件发送到 Kafka 的数据采用 JSON 格式。前面的示例导致将如下消息发送到 vertica_notifications Kafka 主题:

{"_db":"vmart","_schema":"v_internal","_table":"dc_login_failures",
"authentication_method":"Reject","client_authentication_name":"",
"client_hostname":"::1","client_label":"","client_os_user_name":"dbadmin",
"client_pid":481535,"client_version":"","database_name":"alice",
"effective_protocol":"3.8","node_name":"v_vmart_node0001",
"reason":"INVALID USER","requested_protocol":"3.8","ssl_client_fingerprint":"",
"time":"2018-06-19 14:51:22.437035-04","user_name":"alice"}

查看 DC 组件的通知策略

使用 GET_DATA_COLLECTOR_NOTIFY_POLICY 函数列出为 DC 组件设置的策略。

=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
                   GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------------------------------------------
 Notifiable;  Notifier: vertica_stats; Channel: vertica_notifications
(1 row)

禁用通知策略

可以调用 SET_DATA_COLLECTOR_NOTIFY_POLICY 函数并将其第四个实参设置为 FALSE,以此来禁用通知策略。以下示例禁用 LoginFailures 组件的通知策略:

=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', false);
 SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 SET
(1 row)

=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
 GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 Not notifiable;
(1 row)