Vertica 包含可用于配置调度程序的 vkconfig 脚本。此脚本包含多个可用于设置调度程序中的多组选项的工具,以及启动和关闭调度程序的说明。在调用 vkconfig 脚本时,您可以提供想要使用的工具作为第一个实参。
此部分中的主题介绍 vkconfig 脚本中提供的每个工具及其选项。您可以将常用 vkconfig 脚本选项主题中的选项与任何实用程序配合使用。各实用程序的表中列出了该实用程序专用的选项。
Vertica 包含可用于配置调度程序的 vkconfig 脚本。此脚本包含多个可用于设置调度程序中的多组选项的工具,以及启动和关闭调度程序的说明。在调用 vkconfig 脚本时,您可以提供想要使用的工具作为第一个实参。
此部分中的主题介绍 vkconfig 脚本中提供的每个工具及其选项。您可以将常用 vkconfig 脚本选项主题中的选项与任何实用程序配合使用。各实用程序的表中列出了该实用程序专用的选项。
可以在 vkconfig 脚本中提供的不同工具中找到这些选项。
--conf
filename--config-schema
schema_name默认值:
stream_config
--dbhost
host name默认值:
localhost
--dbport
port_number默认值:
5433
--enable-ssl
--help
--jdbc-opt
option=
value
[&option2=value2...]
--jdbc-url
配合使用。--jdbc-url
url--password
password--ssl-ca-alias
alias_name--ssl-key-alias
alias_name--ssl-key-password
password--username
username默认值:
当前用户
--version
您可以使用配置文件来存储在调用 vkconfig 实用程序时使用的常用参数。配置文件是文本文件,每行包含一个选项设置,格式如下:
option=value
此外,还可以在选项文件中添加注释,方法是在注释前加上井号 (#) 前缀。
#config.properties:
username=myuser
password=mypassword
dbhost=localhost
dbport=5433
可以使用 --conf 选项告知 vkconfig 使用配置文件:
$ /opt/vertica/packages/kafka/bin/vkconfig source --update --conf config.properties
您可以从命令行覆盖所有已存储的参数:
$ /opt/vertica/packages/kafka/bin/vkconfig source --update --conf config.properties --dbhost otherVerticaHost
以下示例显示如何使用共享的实用程序选项。
显示调度程序实用程序的帮助:
$ vkconfig scheduler --help
This command configures a Scheduler, which can run and load data from configured
sources and clusters into Vertica tables. It provides options for changing the
'frame duration' (time given per set of batches to resolve), as well as the
dedicated Vertica resource pool the Scheduler will use while running.
Available Options:
PARAMETER #ARGS DESCRIPTION
conf 1 Allow the use of a properties file to associate
parameter keys and values. This file enables
command string reuse and cleaner command strings.
help 0 Outputs a help context for the given subutility.
version 0 Outputs the current Version of the scheduer.
skip-validation 0 [Depricated] Use --validation-type.
validation-type 1 Determine what happens when there are
configuration errors. Accepts: ERROR - errors
out, WARN - prints out a message and continues,
SKIP - skip running validations
dbhost 1 The Vertica database hostname that contains
metadata and configuration information. The
default value is 'localhost'.
dbport 1 The port at the hostname to connect to the
Vertica database. The default value is '5433'.
username 1 The user to connect to Vertica. The default
value is the current system user.
password 1 The password for the user connecting to Vertica.
The default value is empty.
jdbc-url 1 A JDBC URL that can override Vertica connection
parameters and provide additional JDBC options.
jdbc-opt 1 Options to add to the JDBC URL used to connect
to Vertica ('&'-separated key=value list).
Used with generated URL (i.e. not with
'--jdbc-url' set).
enable-ssl 1 Enable SSL between JDBC and Vertica and/or
Vertica and Kafka.
ssl-ca-alias 1 The alias of the root CA within the provided
truststore used when connecting between
Vertica and Kafka.
ssl-key-alias 1 The alias of the key and certificate pair
within the provided keystore used when
connecting between Vertica and Kafka.
ssl-key-password 1 The password for the key used when connecting
between Vertica and Kafka. Should be hidden
with file access (see --conf).
config-schema 1 The schema containing the configuration details
to be used, created or edited. This parameter
defines the scheduler. The default value is
'stream_config'.
create 0 Create a new instance of the supplied type.
read 0 Read an instance of the supplied type.
update 0 Update an instance of the supplied type.
delete 0 Delete an instance of the supplied type.
drop 0 Drops the specified configuration schema.
CAUTION: this command will completely delete
and remove all configuration and monitoring
data for the specified scheduler.
dump 0 Dump the config schema query string used to
answer this command in the output.
operator 1 Specifies a user designated as an operator for
the created configuration. Used with --create.
add-operator 1 Add a user designated as an operator for the
specified configuration. Used with --update.
remove-operator 1 Removes a user designated as an operator for
the specified configuration. Used with
--update.
upgrade 0 Upgrade the current scheduler configuration
schema to the current version of this
scheduler. WARNING: if upgrading between
EXCAVATOR and FRONTLOADER be aware that the
Scheduler is not backwards compatible. The
upgrade procedure will translate your kafka
model into the new stream model.
upgrade-to-schema 1 Used with upgrade: will upgrade the
configuration to a new given schema instead of
upgrading within the same schema.
fix-config 0 Attempts to fix the configuration (ex: dropped
tables) before doing any other updates. Used
with --update.
frame-duration 1 The duration of the Scheduler's frame, in
which every configured Microbatch runs. Default
is 300 seconds: '00:05:00'
resource-pool 1 The Vertica resource pool to run the Scheduler
on. Default is 'general'.
config-refresh 1 The interval of time between Scheduler
configuration refreshes. Default is 5 minutes:
'00:05'
new-source-policy 1 The policy for new Sources to be scheduled
during a frame. Options are: START, END, and
FAIR. Default is 'FAIR'.
pushback-policy 1
pushback-max-count 1
auto-sync 1 Automatically update configuration based on
metadata from the Kafka cluster
consumer-group-id 1 The Kafka consumer group id to report offsets
to.
eof-timeout-ms 1 [DEPRECATED] This option has no effect.
vkconfig 脚本的调度程序工具可用于配置将数据从 Kafka 不断加载到 Vertica 的调度程序。使用调度程序工具创建、更新或删除由 config-schema
定义的调度程序。如果未指定调度程序,则命令适用于默认 stream_config 调度程序。
vkconfig scheduler {--create | --read | --update | --drop} other_options...
--create
创建新的负载规范,不能与 ‑‑delete
、‑‑read
或 ‑‑update
一起使用。
--read
--create
、--delete
或 --update
配合使用。--update
更新现有的 Set Snippet Variable Value in Topic。不能与 --create
、--delete
或 --read
配合使用。
--drop
--add-operator
user_name--update
共享的实用程序选项。--auto-sync``{TRUE|FALSE}
--config-refresh
中指定的间隔自动同步调度程序源信息。
有关调度程序在每个间隔同步哪些内容的详细信息,请参阅使用调度程序自动使用来自 Kafka 的数据中的“验证调度程序”和“同步调度程序”部分。
默认值: TRUE
--config-refresh
HH:MM:SS --update
选项所做的更改)之前运行的时间间隔。
默认值: 00:05:00
--consumer-group-id
id_nameKafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况。
默认值:
vertica_database-name
--dump
当您将此选项与 --read
选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read
一起使用,此选项无效。
--eof-timeout-ms
number of milliseconds 有关详细信息,请参阅手动使用来自 Kafka 的数据。
默认值: 1 秒
--fix-config
--update
共享的配置选项时有效。--frame-duration
HH:MM:SS TimePerMicrobatch=(FrameDuration*Parallelism)/Microbatches
这只是粗略估计,因为有多种因素会影响每个微批处理能够运行的时间量。
如果为每个微批处理分配的时间低于 2 秒,vkconfig 实用程序会向您发出警告。您通常应该为每个微批处理分配两秒以上的时间,以便让调度程序能够加载数据流中的所有数据。
默认值: 00:05:00
--message_max_bytes
max_message_size指定 Kafka 协议批处理消息的最大大小(以字节为单位)。
默认值: 25165824
--new-source-policy``{FAIR|START|END}
FAIR:采用以前批处理中的平均时间长度,并适当地调度自己。
START:所有新源均在时间范围开始时启动。批处理获得的运行时间最短。
END:所有新源均在时间范围结束时启动。批处理获得的运行时间最长。
默认值: FAIR
--operator
username此选项会为指定用户授予对调度程序实例的所有权限以及对 libkafka 库及其所有 UDx 的 EXECUTE 权限。
授予操作员权限会让用户有权从任何群集(可从 Vertica 节点进行访问)中的任何源读取数据。
dbadmin 必须为用户授予单独的权限,使其对目标表具有写入权限。
需要使用 --create
共享的实用程序选项。创建调度程序后,使用 --add-operator
选项授予操作权限。
要撤销权限,请使用 --remove-operator
选项。
--remove-operator
user_name--update
共享的实用程序选项。--resource-pool
pool_name默认值: GENERAL 池
--upgrade
upgrade-to-schema
参数。有关详细信息,请参阅在 Vertica 升级后更新调度程序。--upgrade-to-schema
schema name--upgrade
调度程序实用程序选项。--validation-type``{ERROR|WARN|SKIP}
--skip-validation
,用于指定在调度程序上执行的验证级别。无效的 SQL 语法和其他错误可能会导致微批处理无效。Vertica 支持以下验证类型:
错误如果验证失败,则取消配置或创建。
WARN:如果验证失败,则继续执行任务,但会显示警告。
SKIP:不执行验证。
有关验证的详细信息,请参考使用调度程序自动使用来自 Kafka 的数据。
默认值: ERROR
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
以下示例显示如何使用调度程序实用程序选项。
授予用户 Jim 对 StreamConfig 调度程序的权限。指定使用 --config-schema
选项对 stream_config 调度程序进行编辑:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --update --config-schema stream_config --add-operator Jim
编辑默认 stream_config 调度程序,以便每个微批处理在结束前等待数据一秒钟:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --update --eof-timeout-ms 1000
将名为 iot_scheduler_8.1 的调度程序升级到与当前 Vertica 版本兼容的名为 iot_scheduler_9.0 的新调度程序:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --upgrade --config-schema iot_scheduler_8.1 \
--upgrade-to-schema iot_scheduler_9.0
删除架构 scheduler219a:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --drop --config-schema scheduler219a --username dbadmin
对于 weblogs.conf 中定义的调度程序,请读取可以使用调度程序工具设置的选项的当前设置。
$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
"config_refresh":"00:05:00", "new_source_policy":"FAIR",
"pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
"consumer_group_id":null}
vkconfig 脚本的群集工具可用于定义调度程序连接到的流式传输主机。
vkconfig cluster {--create | --read | --update | --delete} [--cluster cluster_name] [other_options...]
--create
创建新的负载规范,不能与 ‑‑delete
、‑‑read
或 ‑‑update
一起使用。
--read
--create
、--delete
或 --update
配合使用。
您可以通过在 --cluster
选项中提供一个或多个群集名称将输出限制为特定群集。此外,还可以使用 --hosts
选项将输出限制为包含一个或多个特定主机的群集。使用逗号分隔多个值。
您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词。
--update
更新现有的 Set Snippet Variable Value in Topic。不能与 --create
、--delete
或 --read
配合使用。
--delete
删除 Set Snippet Variable Value in Topic。不能与 --create
、--read
或 --update
配合使用。
--dump
当您将此选项与 --read
选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read
一起使用,此选项无效。
--cluster
cluster_name--create
、--update
和 --delete
需要使用此选项。--hosts
b1:port[,b2:port...]\--kafka_conf 'kafka_configuration_setting'
采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项。
--new-cluster
cluster_name--update
共享的实用程序选项。--validation-typERROR|WARN|SKIP}
ERROR - 如果 vkconfig 无法验证该群集是否存在,请取消配置或创建。这是默认设置。
WARN - 验证失败时继续执行任务,但会显示警告。
SKIP - 不执行验证。
已重命名自 --skip-validation
。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
以下示例显示如何创建群集 StreamCluster1 并分配两个主机:
$ /opt/vertica/packages/kafka/bin/vkconfig cluster --create --cluster StreamCluster1 \
--hosts 10.10.10.10:9092,10.10.10.11:9092
--conf myscheduler.config
以下示例显示如何列出与 weblogs.conf 文件中定义的调度程序关联的所有群集:
$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}
使用 vkconfig 脚本的源工具创建、更新或删除源。
vkconfig source {--create | --read | --update | --delete} --source source_name [other_options...]
--create
创建新的负载规范,不能与 ‑‑delete
、‑‑read
或 ‑‑update
一起使用。
--read
--create
、--delete
或 --update
配合使用。
默认情况下,此选项输出调度程序中定义的所有源。可以使用 --cluster
、--enabled
、--partitions
和 --source
选项限制输出。输出将仅包含与这些选项中的值匹配的源。--enabled
选项只能有 true 或 false 值。--source
选项区分大小写。
您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词。
--update
更新现有的 Set Snippet Variable Value in Topic。不能与 --create
、--delete
或 --read
配合使用。
--delete
删除 Set Snippet Variable Value in Topic。不能与 --create
、--read
或 --update
配合使用。
--source
source_name--create
、--update
和 --delete
需要使用此选项。--cluster
cluster_name--dump
当您将此选项与 --read
选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read
一起使用,此选项无效。
--enabled``TRUE|FALSE
--new-cluster
cluster_name引用旧群集源的所有源现在都面向此群集。
需要: --update
和 --source
选项
--new-source
source_name需要: --update
共用的实用程序选项
--partitions
count默认值:
群集中定义的分区数。
需要: --create
和 --source
选项
您必须将此值与 Kafka 主题中的分区数保持一致。
已重命名自 --num-partitions
。
--validation-typERROR|WARN|SKIP}
ERROR - 如果 vkconfig 无法验证该源,则取消配置或创建。这是默认设置。
WARN - 验证失败时继续执行任务,但会显示警告。
SKIP - 不执行验证。
已重命名自 --skip-validation
。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
以下示例显示了如何创建或更新 SourceFeed。
创建源 SourceFeed 并将其分配给 myscheduler.conf 配置文件定义的调度程序中的群集 StreamCluster1:
$ /opt/vertica/packages/kafka/bin/vkconfig source --create --source SourceFeed \
--cluster StreamCluster1 --partitions 3
--conf myscheduler.conf
更新现有源 SourceFeed,以使用 myscheduler.conf 配置文件定义的调度程序中的现有群集 StreamCluster2:
$ /opt/vertica/packages/kafka/bin/vkconfig source --update --source SourceFeed \
--new-cluster StreamCluster2
--conf myscheduler.conf
以下示例读取 weblogs.conf 文件定义的调度程序中定义的源。
$ vkconfig source --read --conf weblog.conf
{"source":"web_hits", "partitions":1, "src_enabled":true,
"cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}
使用目标工具可以配置 Vertica 表从流式数据传输应用程序接收数据。
vkconfig target {--create | --read | --update | --delete} [--target-table table --table_schema schema] [other_options...]
--create
创建新的负载规范,不能与 ‑‑delete
、‑‑read
或 ‑‑update
一起使用。
--read
--create
、--delete
或 --update
配合使用。
默认情况下,此选项输出配置架构中定义的所有目标。可以使用 --target-schema
和 --target-table
选项将输出限制到特定目标。vkconfig 脚本仅输出与这些选项中设置的值匹配的目标。
您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词。
--update
更新现有的 Set Snippet Variable Value in Topic。不能与 --create
、--delete
或 --read
配合使用。
--delete
删除 Set Snippet Variable Value in Topic。不能与 --create
、--read
或 --update
配合使用。
--target-table
table--create
、--update
和 --delete
需要使用此选项。--target-schema
架构--create
、--update
和 --delete
需要使用此选项。--dump
当您将此选项与 --read
选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read
一起使用,此选项无效。
--new-target-schema
schema_name需要: --update
选项。
--new-target-table
schema_name需要: --update
选项。
--validation-typERROR|WARN|SKIP}
ERROR - 如果 vkconfig 无法验证该表是否存在,则取消配置或创建。这是默认设置。
WARN - 如果验证失败,则创建或更新目标,但会显示警告。
SKIP - 不执行验证。
已重命名自 --skip-validation
。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
以下示例显示了如何从 public.streamtarget 表为 myscheduler.conf 配置文件中定义的调度程序创建目标:
$ /opt/vertica/packages/kafka/bin/vkconfig target --create --target-table streamtarget --conf myscheduler.conf
以下示例列出在 weblogs.conf 配置文件中定义的调度程序中的所有目标。
$ vkconfig target --read --conf weblog.conf
{"target_schema":"public", "target_table":"web_hits"}
vkconfig 脚本的加载规范工具可用于为加载流式传输数据的 COPY 语句提供参数。
$ vkconfig load-spec {--create | --read | --update | --delete} [--load-spec spec‑name] [other‑options...]
--create
创建新的负载规范,不能与 ‑‑delete
、‑‑read
或 ‑‑update
一起使用。
--read
‑‑create
、‑‑delete
或 ‑‑update
配合使用。
默认情况下,此选项会输出调度程序中定义的所有加载规范。您可以通过向这些选项提供一个值或逗号分隔的值列表来限制输出:
--load-spec
--filters
--uds-kv-parameters
--parser
--message-max-bytes
--parser-parameters
vkconfig 脚本仅输出与您提供的值匹配的加载规范配置。
您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词。
--update
更新现有的 Set Snippet Variable Value in Topic。不能与 --create
、--delete
或 --read
配合使用。
--delete
删除 Set Snippet Variable Value in Topic。不能与 --create
、--read
或 --update
配合使用。
\--load-spec spec‑name
--create
、--update
和 --delete
需要使用此选项。--dump
当您将此选项与 --read
选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read
一起使用,此选项无效。
\--filters "filter‑name"
\--message-max-bytes max‑size
指定 Kafka 协议批处理消息的最大大小(以字节为单位)。
默认值: 25165824
\--new-load-spec new‑name
--update
参数。\--parser-parameters "key=value[,...]"
--parser
参数中指定的解析器的参数列表。在使用 Vertica 本机解析器时,调度程序会将这些参数传递给 COPY 语句,然后转而将其传递给解析器。\--parser parser‑name
--parser-parameters
选项的值将传递给 COPY 语句。
默认值: KafkaParser
\--uds-kv-parameters key=value[,...]
--validation-type {ERROR|WARN|SKIP}
ERROR
:如果 vkconfig 无法验证加载规范,则取消配置或创建。这是默认设置。
WARN
:如果验证失败,则继续执行任务,但会显示警告。
SKIP
:不执行验证。
已重命名自 --skip-validation
。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
以下示例显示如何使用加载规范实用程序选项。
创建加载规范 Streamspec1
:
$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec Streamspec1 --conf myscheduler.conf
将加载规范 Streamspec1
重命名为 Streamspec2
:
$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update --load-spec Streamspec1 \
--new-load-spec Streamspec2 \
--conf myscheduler.conf
更新加载规范 Filterspec
以使用 KafkaInsertLengths
筛选器和自定义解密筛选器:
$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update --load-spec Filterspec \
--filters "KafkaInsertLengths() DecryptFilter(parameter=Key)" \
--conf myscheduler.conf
读取加载规范 streamspec1
的当前设置:
$ vkconfig load-spec --read --load-spec streamspec1 --conf weblog.conf
{"load_spec":"streamspec1", "filters":null, "parser":"KafkaParser",
"parser_parameters":null, "load_method":"TRICKLE", "message_max_bytes":null,
"uds_kv_parameters":null}
使用 vkconfig 脚本的微批处理工具,可以配置调度程序的微批处理。
vkconfig microbatch {--create | --read | --update | --delete} [--microbatch name] [other_options...]
--create
创建新的负载规范,不能与 ‑‑delete
、‑‑read
或 ‑‑update
一起使用。
--read
‑‑create
、‑‑delete
或 ‑‑update
配合使用。
可以使用 ‑‑consumer-group-id
、‑‑enabled
、‑‑load-spec
、‑‑microbatch
、‑‑rejection-schema
、‑‑rejection-table
、‑‑target-schema
、‑‑target-table
和 ‑‑target-columns
选项将输出限制到特定的微批处理。‑‑enabled
选项仅接受 true 或 false 值。
您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词。
--update
更新现有的 Set Snippet Variable Value in Topic。不能与 --create
、--delete
或 --read
配合使用。
--delete
删除 Set Snippet Variable Value in Topic。不能与 --create
、--read
或 --update
配合使用。
\--microbatch name
--create
、--update
和 --delete
需要使用此选项。\--add-source-cluster cluster_name
--microbatch
选项指定的微批处理的群集的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update
一起使用,以将源添加到微批处理。您只能将来自同一群集的源添加到同一个微批处理中。需要 \--add-source
。\--add-source source_name
--update
一起使用,以将源添加到微批处理。需要 \--add-source-cluster
。\--cluster cluster_name
--offset
选项将应用于的群集的名称。仅当微批处理定义了多个群集或提供了 --source
参数时才需要。需要 --offset
选项。\--consumer-group-id id_name
Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况。
默认值:
vertica_database-name
--dump
当您将此选项与 --read
选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read
一起使用,此选项无效。
--enabled TRUE|FALSE
\--load-spec loadspec_name
\--max-parallelism max_num_loads
使用此选项可以:
控制事务大小。
根据调度程序的调度程序资源池设置优化加载,例如 PLANNEDCONCURRENCY。
\--new-microbatch updated_name
--update
选项。\--offset partition_offset[,...]
--partition
选项中列出的每个分区提供偏移量值。
您可以使用此选项跳过源中的某些消息或重新加载之前读取的消息。
有关详细信息,请参阅下面的特殊起始偏移量值。
\--partition partition[,...]
--offset
选项中指定的偏移量将应用于的一个或多个分区。如果您提供此选项,则 --offset
选项中指定的偏移量将应用于您指定的分区。需要 --offset
选项。\--rejection-schema schema_name
\--rejection-table table_name
\--remove-source-cluster cluster_name
--remove-source
。\--remove-source source_name
--update
一起使用,以从微批处理中移除多个源。需要 --remove-source-cluster
。\--source source_name
--offset
选项中的偏移量将应用于的源的名称。当微批处理定义多个源或指定 --cluster
参数时需要。需要 --offset
选项。\--target-columns column_expression
有关列表达式的描述,请参阅 COPY 语句参数。
\--target-schema schema_name
\--target-table table_name
\--validation-type{ERROR|WARN|SKIP}
ERROR - 如果 vkconfig 无法验证该微批处理,则取消配置或创建。这是默认设置。
WARN - 验证失败时继续执行任务,但会显示警告。
SKIP - 不执行验证。
已重命名自 --skip-validation
。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
stream
参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:
-2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。
-3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况。
此示例显示如何创建微批处理 mbatch1。此微批处理会标识微批处理的架构、目标表、加载规范和源:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch mbatch1 \
--target-schema public \
--target-table BatchTarget \
--load-spec Filterspec \
--add-source SourceFeed \
--add-source-cluster StreamCluster1 \
--conf myscheduler.conf
此示例演示了如何在 weblog.conf 配置文件中定义的调度程序中列出微批处理的当前设置。
$ vkconfig microbatch --read --conf weblog.conf
{"microbatch":"weblog", "target_columns":null, "rejection_schema":null,
"rejection_table":null, "enabled":true, "consumer_group_id":null,
"load_spec":"weblog_load", "filters":null, "parser":"KafkaJSONParser",
"parser_parameters":null, "load_method":"TRICKLE", "message_max_bytes":null,
"uds_kv_parameters":null, "target_schema":"public", "target_table":"web_hits",
"source":"web_hits", "partitions":1, "src_enabled":true, "cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}
使用 vkconfig 脚本启动工具可为调度程序实例分配名称。
vkconfig launch [options...]
--enable-ssl``{true|false}
--ssl-ca-alias
别名--ssl-key-alias
别名--ssl-key-password
password--instance-name
name\--kafka_conf 'kafka_configuration_setting'
采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
此示例显示如何启动 myscheduler.conf 配置文件中定义的调度程序并为其指定实例名称 PrimaryScheduler:
$ nohup /opt/vertica/packages/kafka/bin/vkconfig launch --instance-name PrimaryScheduler \
--conf myscheduler.conf >/dev/null 2>&1 &
此示例显示如何在启用 SSL 的情况下启动名为 SecureScheduler 的实例:
$ nohup /opt/vertica/packages/kafka/bin/vkconfig launch --instance-name SecureScheduler --enable-SSL true \
--ssl-ca-alias authenticcert --ssl-key-alias ourkey \
--ssl-key-password secret \
--conf myscheduler.conf \
>/dev/null 2>&1 &
使用 vkconfig 脚本的关闭工具终止主机上运行的一个或全部 Vertica 调度程序。始终在重新启动调度程序之前运行此命令,以确保调度程序已正确关闭。
vkconfig shutdown [options...]
请参阅常用 vkconfig 脚本选项了解所有 vkconfig 工具中提供的选项。
要终止在主机上运行的所有调度程序,请使用不带选项的 shutdown
命令:
$ /opt/vertica/packages/kafka/bin/vkconfig shutdown
使用 --conf
或 --config-schema
选项指定要关闭的调度程序。以下命令终止使用相同 --conf myscheduler.conf
选项启动的调度程序:
$ /opt/vertica/packages/kafka/bin/vkconfig shutdown --conf myscheduler.conf
统计信息工具可用于访问调度程序已运行的微批处理的历史记录。该工具以 JSON 格式将微批处理的日志输出到标准输出。可以使用其选项来筛选微批处理列表,以获得您感兴趣的微批处理。
vkconfig statistics [options]
\--cluster "cluster"[,"cluster2"...]
--dump
\--from-timestamp "timestamp"
不能与 --last
结合使用。
\--last number
--from-timestamp
或 --to-timestamp
结合使用。\--microbatch "name"[,"name2"...]
\--partition partition#[,partition#2...]
\--source "source"[,"source2"...]
--target-schema "schema"[,"schema2"...]
--target-table "table"[,"table2"...]
\--to-timestamp "timestamp"
不能与 --last
结合使用。
请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。
可以在提供给 --cluster
、--microbatch
、--source
、--target-schema
和 --target-table
实参的值中使用 LIKE 通配符。此功能可用于匹配微批处理数据中的部分字符串。有关使用通配符的详细信息,请参阅 LIKE 谓词。
--cluster
、--microbatch
、--source
、--target-schema
和 --target-table
实参的字符串比较不区分大小写。
提供给 --from-timestamp
和 --to-timestamp
实参的日期和时间值使用 java.sql.timestamp 格式以便解析该值。此格式的解析可以接受您可能认为无效并希望它拒绝的值。例如,如果提供的时间戳为 01-01-2018 24:99:99,Java 时间戳解析器会静默地将日期转换为 2018-01-02 01:40:39,而不是返回错误。
以下示例获取 weblog.conf 文件中定义的调度程序运行的最后一个微批处理:
$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":80000, "end_offset":79999, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:09.793000", "batch_start":"2018-11-06 09:42:00.176747",
"batch_end":"2018-11-06 09:42:00.437787", "source_duration":"00:00:00.214314",
"consecutive_error_count":null, "transaction_id":45035996274513069,
"frame_start":"2018-11-06 09:41:59.949", "frame_end":null}
如果调度程序正在从多个分区读取,则 --last 1
选项会列出来自每个分区的最后一个微批处理:
$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --conf iot.conf
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":0,
"start_offset":-2, "end_offset":-2, "end_reason":"DEADLINE",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:09.950127",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":1,
"start_offset":1604, "end_offset":1653, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4387, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.220329",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":2,
"start_offset":1603, "end_offset":1652, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4383, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.318997",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":3,
"start_offset":1604, "end_offset":1653, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4375, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.219543",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
可以使用 --partition
实参只获取所需的分区:
$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --partition 2 --conf iot.conf
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":2,
"start_offset":1603, "end_offset":1652, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4383, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.318997",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
如果调度程序从多个源读取,则 --last 1
选项会输出来自每个源的最后一个微批处理:
$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --conf weblog.conf
{"microbatch":"weberrors", "target_schema":"public", "target_table":"web_errors",
"source_name":"web_errors", "source_cluster":"kafka_weblog",
"source_partition":0, "start_offset":10000, "end_offset":9999,
"end_reason":"END_OF_STREAM", "end_reason_message":null,
"partition_bytes":0, "partition_messages":0, "timeslice":"00:00:04.909000",
"batch_start":"2018-11-06 10:58:02.632624",
"batch_end":"2018-11-06 10:58:03.058663", "source_duration":"00:00:00.220618",
"consecutive_error_count":null, "transaction_id":45035996274523991,
"frame_start":"2018-11-06 10:58:02.394", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":80000, "end_offset":79999, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:09.128000", "batch_start":"2018-11-06 10:58:03.322852",
"batch_end":"2018-11-06 10:58:03.63047", "source_duration":"00:00:00.226493",
"consecutive_error_count":null, "transaction_id":45035996274524004,
"frame_start":"2018-11-06 10:58:02.394", "frame_end":null}
可以使用通配符来启用部分匹配。以下示例演示了如何获取名称以“log”结尾的所有微批处理的最后一个微批处理:
~$ /opt/vertica/packages/kafka/bin/vkconfig statistics --microbatch "%log" \
--last 1 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":80000, "end_offset":79999, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:04.874000", "batch_start":"2018-11-06 11:37:16.17198",
"batch_end":"2018-11-06 11:37:16.460844", "source_duration":"00:00:00.213129",
"consecutive_error_count":null, "transaction_id":45035996274529932,
"frame_start":"2018-11-06 11:37:15.877", "frame_end":null}
要获取特定时间段的微批处理,请使用 --from-timestamp
和 --to-timestamp
实参。以下示例获取 iot.conf 中定义的调度程序在 2018-11-06 12:52:30 到 12:53:00 之间从分区 #2 进行读取的微批处理。
$ /opt/vertica/packages/kafka/bin/vkconfig statistics --partition 1 \
--from-timestamp "2018-11-06 12:52:30" \
--to-timestamp "2018-11-06 12:53:00" --conf iot.conf
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":1,
"start_offset":1604, "end_offset":1653, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4387, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.220329",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":1,
"start_offset":1554, "end_offset":1603, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4371, "partition_messages":50,
"timeslice":"00:00:09.788000", "batch_start":"2018-11-06 12:52:38.930428",
"batch_end":"2018-11-06 12:52:48.932604", "source_duration":"00:00:00.231709",
"consecutive_error_count":null, "transaction_id":45035996274536981,
"frame_start":"2018-11-06 12:52:38.685", "frame_end":null}
以下示例演示了如何使用 --dump
实参获取 vkconfig 为检索上一个示例的输出而执行的 SQL 语句:
$ /opt/vertica/packages/kafka/bin/vkconfig statistics --dump --partition 1 \
--from-timestamp "2018-11-06 12:52:30" \
--to-timestamp "2018-11-06 12:53:00" --conf iot.conf
SELECT microbatch, target_schema, target_table, source_name, source_cluster,
source_partition, start_offset, end_offset, end_reason, end_reason_message,
partition_bytes, partition_messages, timeslice, batch_start, batch_end,
last_batch_duration AS source_duration, consecutive_error_count, transaction_id,
frame_start, frame_end FROM "iot_sched".stream_microbatch_history WHERE
(source_partition = '1') AND (frame_start >= '2018-11-06 12:52:30.0') AND
(frame_start < '2018-11-06 12:53:00.0') ORDER BY frame_start DESC, microbatch,
source_cluster, source_name, source_partition;
同步实用程序通过查询源定义的 Kafka 群集代理来立即更新所有源定义。默认情况下,它会更新目标架构中定义的所有源。要仅更新特定源,请使用 --source
和 --cluster
选项指定要更新的源。
vkconfig sync [options...]
--source
source_name--cluster
cluster_name\--kafka_conf 'kafka_configuration_setting'
采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项。
请参阅常用 vkconfig 脚本选项了解所有 vkconfig 工具中提供的选项。