微批处理工具选项

使用 vkconfig 脚本的微批处理工具,可以配置调度程序的微批处理。

语法

vkconfig microbatch {--create | --read | --update | --delete} [--microbatch name] [other_options...]
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
输出调度程序中定义的所有微批处理的当前设置。此输出采用 JSON 格式。不能与 ‑‑create‑‑delete‑‑update 配合使用。

可以使用 ‑‑consumer-group-id‑‑enabled‑‑load-spec‑‑microbatch‑‑rejection-schema‑‑rejection-table‑‑target-schema‑‑target-table‑‑target-columns 选项将输出限制到特定的微批处理。‑‑enabled 选项仅接受 true 或 false 值。

您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词

--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--delete

删除 Set Snippet Variable Value in Topic。不能与 --create--read--update 配合使用。

\--microbatch name
微批处理的唯一名称,不区分大小写。--create--update--delete 需要使用此选项。
\--add-source-cluster cluster_name
要分配给您使用 --microbatch 选项指定的微批处理的群集的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update 一起使用,以将源添加到微批处理。您只能将来自同一群集的源添加到同一个微批处理中。需要 \--add-source
\--add-source source_name
要分配给此微批处理的源的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update 一起使用,以将源添加到微批处理。需要 \--add-source-cluster
\--cluster cluster_name
--offset 选项将应用于的群集的名称。仅当微批处理定义了多个群集或提供了 --source 参数时才需要。需要 --offset 选项。
\--consumer-group-id id_name

Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

默认值: vertica_database-name

--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

--enabled TRUE|FALSE
为 TRUE 时,允许执行微批处理。
\--load-spec loadspec_name
处理此微批处理时要使用的加载规范。
\--max-parallelism max_num_loads
为微批处理创建的同步 COPY 语句的最大数量。调度程序会将具有多个分区的单个微批处理动态拆分为具有较少分区的 max_num_loads COPY 语句。

使用此选项可以:

\--new-microbatch updated_name
更新后的微批处理名称。需要 --update 选项。
\--offset partition_offset[,...]
微批处理开始加载的源中消息的偏移量。如果使用此参数,则必须为源中的每个分区或在 --partition 选项中列出的每个分区提供偏移量值。

您可以使用此选项跳过源中的某些消息或重新加载之前读取的消息。

有关详细信息,请参阅下面的特殊起始偏移量值

\--partition partition[,...]
--offset 选项中指定的偏移量将应用于的一个或多个分区。如果您提供此选项,则 --offset 选项中指定的偏移量将应用于您指定的分区。需要 --offset 选项。
\--rejection-schema schema_name
现有的 Vertica 架构,其中包含用于存储被拒绝消息的表。
\--rejection-table table_name
用于存储被拒绝消息的现有 Vertica 表。
\--remove-source-cluster cluster_name
要从此微批处理中移除的群集的名称。 您可以在每个命令中使用此参数一次。需要 --remove-source
\--remove-source source_name
要从此微批处理中移除的源的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update 一起使用,以从微批处理中移除多个源。需要 --remove-source-cluster
\--source source_name
--offset 选项中的偏移量将应用于的源的名称。当微批处理定义多个源或指定 --cluster 参数时需要。需要 --offset 选项。
\--target-columns column_expression
目标表的列表达式,其中 column_expression 可以是列的逗号分隔列表或完整的表达式。

有关列表达式的描述,请参阅 COPY 语句参数

\--target-schema schema_name
与此微批处理关联的现有 Vertica 目标架构。
\--target-table table_name
与目标对应的 Vertica 表的名称。此表必须属于目标架构。
\--validation-type{ERROR|WARN|SKIP}
控制对已创建或已更新的微批处理执行的验证:
  • ERROR - 如果 vkconfig 无法验证该微批处理,则取消配置或创建。这是默认设置。

  • WARN - 验证失败时继续执行任务,但会显示警告。

  • SKIP - 不执行验证。

已重命名自 --skip-validation

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

stream 参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:

  • -2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。

  • -3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

示例

此示例显示如何创建微批处理 mbatch1。此微批处理会标识微批处理的架构、目标表、加载规范和源:

$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch mbatch1 \
                                                    --target-schema public \
                                                    --target-table BatchTarget \
                                                    --load-spec Filterspec \
                                                    --add-source SourceFeed \
                                                    --add-source-cluster StreamCluster1 \
                                                    --conf myscheduler.conf

此示例演示了如何在 weblog.conf 配置文件中定义的调度程序中列出微批处理的当前设置。

$ vkconfig microbatch --read --conf weblog.conf
{"microbatch":"weblog", "target_columns":null, "rejection_schema":null,
"rejection_table":null, "enabled":true, "consumer_group_id":null,
"load_spec":"weblog_load", "filters":null, "parser":"KafkaJSONParser",
"parser_parameters":null, "load_method":"TRICKLE", "message_max_bytes":null,
"uds_kv_parameters":null, "target_schema":"public", "target_table":"web_hits",
"source":"web_hits", "partitions":1, "src_enabled":true, "cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}