创建 Kafka 通知程序

通过以下过程创建 Kafka 通知程序。通知程序至少定义:

  • 一个独特的名称。

  • 一种消息协议。向 Kafka 发送消息时,它是 kafka://

  • 要与之通信的服务器。对于 Kafka,它是 Kafka 代理的地址和端口号。

  • 最大消息缓冲区大小。如果要通过通知程序发送的消息队列超过此限制,则丢弃消息。

使用 CREATE NOTIFIER 创建通知程序。以下示例创建一个名为 load_progress_notifier 的通知程序,该通知程序通过运行于端口 9092 上的 kafka01.example.com 的 Kafka 代理发送消息:

=> CREATE NOTIFIER load_progress_notifier
    ACTION 'kafka://kafka01.example.com:9092'
    MAXMEMORYSIZE '10M';

虽然不是必需项,但最佳实践是创建使用加密连接的通知程序。以下示例创建一个使用加密连接的通知程序,并使用提供的 CA 捆绑包验证 Kafka 服务器的证书:

=> CREATE NOTIFIER encrypted_notifier
    ACTION 'kafka://127.0.0.1:9092'
    MAXMEMORYSIZE '10M'
    TLSMODE 'verify-ca'
    CA BUNDLE ca_bundle;

按照此步骤,为使用 SASL_SSL 的 Kafka 端点创建或更改通知程序。请注意,每当您更改给定通知程序的 TLSMODE、证书或 CA 捆绑包时,都必须重复此步骤。

  1. 在设置 TLSMODE、证书和 CA 捆绑包时,使用 CREATE 或 ALTER 以禁用通知程序。

    => ALTER NOTIFIER encrypted_notifier
        DISABLE
        TLSMODE 'verify-ca'
        CA BUNDLE ca_bundle2;
    
  2. 更改通知程序并为 SASL_SSL 设置适合的 rdkafka 适配器参数。

    => ALTER NOTIFIER encrypted_notifier PARAMETERS
      'sasl.username=user;sasl.password=password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL';
    
  3. 启用通知程序。

    => ALTER NOTIFIER encrypted_notifier ENABLE;