领取MOLI红包

ORACLE中文网

DataWorks支持哪些Kafka数据同步能力
发布日期:2025-01-03 17:28    点击次数:99
Kafka数据源为您提供读取和写入Kafka的双向通道,本文为您介绍DataWorks的Kafka数据同步的能力支持情况。使用限制Kafka数据源目前仅支持使用独享数据集成资源组。单表离线读同时配置parameter.groupId和parameter.kafkaConfig.group.id时,parameter.groupId优先级高于kafkaConfig配置信息中的group.id。单表实时写写入数据不支持去重,即如果任务重置位点或者Failover后再启动,会导致有重复数据写入。整库实时写实时数据同步任务仅支持使用独享数据集成资源组。对于源端同步表有主键的场景,同步时会使用主键值作为kafka记录的key,确保同主键的变更有序写入kafka的同一分区。对于源端同步表无主键的场景,如果选择了支持无主键表同步选项,则同步时kafka记录的key为空。如果要确保表的变更有序写入kafka,则选择写入的kafka topic必须是单分区。如果选择了自定义同步主键,则同步时使用其他非主键的一个或几个字段的联合,代替主键作为kafka记录的key。如果在kafka集群发生响应异常的情况下,仍要确保有主键表同主键的变更有序写入kafka的同一分区,则需要在配置kafka数据源时,在扩展参数表单中加入如下配置。实时同步写入kafka的消息总体格式、同步任务心跳消息格式及源端更改数据对应的消息格式,详情请参见:附录:消息格式。支持的字段类型Kafka的数据存储为非结构化的存储,通常Kafka记录的数据模块有key、value、offset、timestamp、headers、partition。DataWorks在对Kafka数据进行读写时,会按照以下的策略进行数据处理。离线读数据DataWorks读取Kafka数据时,支持对Kafka数据进行JSON格式的解析,各数据模块的处理方式如下。Kafka记录数据模块处理后的数据类型key取决于数据同步任务配置的keyType配置项,keyType参数介绍请参见下文的全量参数说明章节。value取决于数据同步任务配置的valueType配置项,valueType参数介绍请参见下文的全量参数说明章节。offsetLongtimestampLongheadersStringpartitionLong离线写数据同步任务类型写入Kafka value的格式源端字段类型写入时的处理方式离线同步DataStudio的离线同步节点json字符串UTF8编码字符串布尔转换为UTF8编码字符串"true"或者"false"时间/日期yyyy-MM-dd HH:mm:ss格式UTF8编码字符串数值UTF8编码数值字符串字节流字节流会被视为UTF8编码的字符串,被转换成字符串text字符串UTF8编码字符串布尔转换为UTF8编码字符串"true"或者"false"时间/日期yyyy-MM-dd HH:mm:ss格式UTF8编码字符串数值UTF8编码数值字符串字节流字节流会被视为UTF8编码的字符串,被转换成字符串实时同步:ETL实时同步至KafkaDataStudio的实时同步节点json字符串UTF8编码字符串布尔json布尔类型时间/日期对于精确到毫秒以下精度的时间:转换成表示毫秒时间戳的13位JSON整数。对于精确到微秒或者纳秒精度的时间:转换成带有表示毫秒时间戳的13位整数,和表示纳秒时间戳的6位小数的JSON浮点数。数值json数值类型字节流字节流会进行Base64编码后转换成UTF8编码的字符串text字符串UTF8编码字符串布尔转换为UTF8编码字符串"true"或者"false"时间/日期yyyy-MM-dd HH:mm:ss格式UTF8编码字符串数值UTF8编码数值字符串字节流字节流会进行Base64编码后转换成UTF8编码字符串实时同步:整库实时同步至Kafka纯实时同步增量数据内置JSON格式字符串UTF8编码字符串布尔json布尔类型时间/日期13位毫秒时间戳数值json数值字节流字节流会进行Base64编码后转换成UTF8编码字符串同步解决方案:一键实时同步至Kafka离线全量+实时增量内置JSON格式字符串UTF8编码字符串布尔json布尔类型时间/日期13位毫秒时间戳数值json数值字节流字节流会进行Base64编码后转换成UTF8编码字符串数据同步任务开发Kafka数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。创建数据源在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源。单表离线同步任务配置指导操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。单表、整库实时同步任务配置指导操作流程请参见DataStudio侧实时同步任务配置。单表、整库全增量实时同步任务配置指导操作流程请参见数据集成侧同步任务配置。启用认证配置说明SSLkeystore证书文件、keystore证书密码和SSL密钥密码只有在Kafka集群开启双向SSL认证时才需要进行配置,用于Kafka集群服务端认证客户端身份,Kafka集群server.properties中ssl.client.auth=required时开启双向SSL认证,详情请参见使用SSL加密Kafka链接。GSSAPIKerberos配置文件Keytab文件独享资源组DNS、HOST配置HOST设置PLAIN配置Kafka数据源时,当Sasl机制选择PLAIN时,JAAS文件必须以KafkaClient开头,之后使用一个大括号包含所有配置项。大括号内第一行定义使用的登录组件类,对于各类Sasl认证机制,登录组件类是固定的,后续的每个配置项以key=value格式书写。除最后一个配置项,其他配置项结尾不能有分号。最后一个配置项结尾必须有分号,在大括号"}"之后也必须加上一个分号。不符合以上格式要求将导致JAAS配置文件解析出错,典型的JAAS配置文件格式如下(根据实际情况替换以下内容中的xxx):KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx"; };配置项说明登录模块必须配置org.apache.kafka.common.security.plain.PlainLoginModulusername用户名,请根据实际情况配置该项。password密码,请根据实际情况配置该项。常见问题读取kafka配置了endDateTime来指定所要同步的数据的截止范围,但是在目的数据源中发现了超过这个时间的数据Kafka中数据量少,但是任务出现长时间不读取数据也不结束,一直运行中的现象是为什么?附录:脚本Demo与参数说明附录:离线任务脚本配置方式如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。Reader脚本DemoReader脚本参数参数描述是否必选datasource数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。是serverKafka的broker server地址,格式为ip:port。您可以只配置一个server,但请务必保证Kafka集群中所有broker的IP地址都可以连通DataWorks。是topicKafka的Topic,是Kafka处理资源的消息源(feeds of messages)的聚合。是column需要读取的Kafka数据,支持常量列、数据列和属性列:常量列:使用单引号包裹的列为常量列,例如["'abc'", "'123'"]。数据列 如果您的数据是一个JSON,支持获取JSON的属性,例如["event_id"]。如果您的数据是一个JSON,支持获取JSON的嵌套子属性,例如["tag.desc"]。属性列 __key__表示消息的key。__value__表示消息的完整内容 。__partition__表示当前消息所在分区。__headers__表示当前消息headers信息。__offset__表示当前消息的偏移量。__timestamp__表示当前消息的时间戳。是keyTypeKafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。是valueTypeKafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。是beginDateTime数据消费的开始时间位点,为时间范围(左闭右开)的左边界。yyyymmddhhmmss格式的时间字符串,可以配合调度参数使用。详情请参见调度参数支持的格式。 需要和beginOffset二选一。 endDateTime数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yyyymmddhhmmss格式的时间字符串,可以配合调度参数使用。详情请参见调度参数支持的格式。 需要和endOffset二选一。 beginOffset数据消费的开始时间位点,您可以配置以下形式:数字形式(例如15553274),表示开始消费的点位。seekToBeginning:表示从开始点位消费数据。seekToLast:表示从kafkaConfig配置中指定的group.id对应群组ID保存的位点开始读取数据,注意群组位点在客户端会定时自动提交到Kafka服务端,所以任务失败后,如果重跑任务时可能会有数据重复或者丢失,skipExceedRecord参数配置为true时,任务可能丢弃最后读取的一些记录,而这些丢弃数据的群组位点已经提交到服务端,在下一个周期任务运行时将无法读到这些丢弃的数据。seekToEnd:表示从最后点位消费数据,会读取到空数据。需要和beginDateTime二选一。endOffset数据消费的结束位点,用于控制结束数据消费任务退出的时间。需要和endDateTime二选一。skipExceedRecordKafka使用public ConsumerRecords<K, V> poll(final Duration timeout)消费数据,一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用于控制是否写出多余的数据至目的端。由于消费数据使用了自动点位提交,建议您: Kafka 0.10.2之前版本:建议配置skipExceedRecord为false。Kafka 0.10.2及以上版本:建议配置skipExceedRecord为true。否,默认值为false。partitionKafka的一个Topic有多个分区(partition),正常情况下数据同步任务是读取Topic(多个分区)一个点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。否,无默认值。kafkaConfig创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.servers、auto.commit.interval.ms、session.timeout.ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。否encoding当keyType或valueType配置为STRING时,将使用该配置项指定的编码解析字符串。否,默认为UTF-8。waitTIme消费者对象从Kafka拉取一次数据的最大等待时间,单位为秒。否,默认为60。stopWhenPollEmpty该配置项可选值为true/false。当配置为true时,如果消费者从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性问题),则立即停止任务,否则持续重试直到再次读到数据。否,默认为true。stopWhenReachEndOffset该配置项只在stopWhenPollEmpty为true时生效,可选值为true/false。当配置为true时,如果消费者从Kafka拉取数据返回为空时,会检查当前是否读取到了Kafka Topic分区中的最新位点数据,如果已经读到了Kafka Topic所有分区中的最新位点数据,则立即停止任务,否则继续尝试从Kafka Topic中拉取数据。当配置为false时,如果消费者从Kafka拉取数据返回为空时,不会进行检查,立即停止任务。否,默认为false。kafkaConfig参数说明如下。参数描述fetch.min.bytes指定消费者从broker获取消息的最小字节数,即有足够的数据时,才将其返回给消费者。fetch.max.wait.ms等待broker返回数据的最大时间,默认500毫秒。fetch.min.bytes和fetch.max.wait.ms先满足哪个条件,便按照该方式返回数据。max.partition.fetch.bytes指定broker从每个partition中返回给消费者的最大字节数,默认为1 MB。session.timeout.ms指定消费者不再接收服务之前,可以与服务器断开连接的时间,默认是30秒。auto.offset.reset消费者在读取没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。默认为none(意味着不会自动重置位点),您可以更改为earliest(消费者从起始位置读取partition的记录)。max.poll.records单次调用poll方法能够返回的消息数量。key.deserializer消息Key的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。value.deserializer数据Value的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。ssl.truststore.locationSSL根证书的路径。ssl.truststore.password根证书Store的密码。如果是Aliyun Kafka,则配置为KafkaOnsClient。security.protocol接入协议,目前支持使用SASL_SSL协议接入。sasl.mechanismSASL鉴权方式,如果是Aliyun Kafka,使用PLAIN。java.security.auth.login.configSASL鉴权文件路径。Writer脚本DemoWriter脚本参数参数描述是否必选datasource数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。是serverKafka的server地址,格式为ip:port。是topicKafka的topic,是Kafka处理资源的消息源(feeds of messages)的不同分类。 每条发布至Kafka集群的消息都有一个类别,该类别被称为topic,一个topic是对一组消息的归纳。是valueIndexKafka Writer中作为Value的那一列。如果不填写,默认将所有列拼起来作为Value,分隔符为fieldDelimiter。否writeMode当未配置valueIndex时,该配置项决定将源端读取记录的所有列拼接作为写入kafka记录Value的格式,可选值为text和JSON,默认值为text。配置为text,将所有列按照fieldDelimiter指定分隔符拼接。配置为JSON,将所有列按照column参数指定字段名称拼接为JSON字符串。例如源端记录有三列,值为a、b和c,writeMode配置为text、fieldDelimiter配置为#时,写入kafka的记录Value为字符串a#b#c;writeMode配置为JSON、column配置为[{"name":"col1"},{"name":"col2"},{"name":"col3"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b","col3":"c"}。如果配置了valueIndex,该配置项无效。否column目标表需要写入数据的字段,字段间用英文逗号分隔。例如:"column": ["id", "name", "age"]。当未配置valueIndex,并且writeMode选择JSON时,该配置项定义源端读取记录的列值在JSON结构中的字段名称。例如,"column": [{"name":id"}, {"name":"name"}, {"name":"age"}]。当源端读取记录列的个数多于column配置的字段名个数时,写入时进行截断。例如:源端记录有三列,值为a、b和c,column配置为[{"name":"col1"},{"name":"col2"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b"}。当源端读取记录列的个数少于column配置的字段名个数时,多余column配置字段名填充null或者nullValueFormat指定的字符串。例如:源端记录有两列,值为a和b,column配置为[{"name":"col1"},{"name":"col2"},{"name":"col3"}]时,写入kafka的记录Value为字符串{"col1":"a","col2":"b","col3":null}。如果配置了valueIndex,或者writeMode配置为text,该配置项无效。如果配置了valueIndex,或者writeMode配置为text,该配置项无效。当未配置valueIndex,并且writeMode配置为JSON时必选partition指定写入Kafka topic指定分区的编号,是一个大于等于0的整数。否keyIndexKafka Writer中作为Key的那一列。keyIndex参数取值范围是大于等于0的整数,否则任务会出错。否keyIndexes源端读取记录中作为写入kafka记录Key的列的序号数组。列序号从0开始,例如[0,1,2],会将配置的所有列序号的值用逗号连接作为写入kafka记录的Key。如果不填写,写入kafka记录Key为null,数据轮流写入topic的各个分区中,与keyIndex参数只能二选一。否fieldDelimiter当writeMode配置为text,并且未配置valueIndex时,将源端读取记录的所有列按照该配置项指定列分隔符拼接作为写入kafka记录的Value,支持配置单个或者多个字符作为分隔符,支持以\u0001格式配置unicode字符,支持\t、\n等转义字符。默认值为\t。如果writeMode未配置为text或者配置了valueIndex,该配置项无效。否keyTypeKafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。是valueTypeKafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。是nullKeyFormatkeyIndex或者keyIndexes指定的源端列值为null时,替换为该配置项指定的字符串,如果不配置不做替换。否nullValueFormat当源端列值为null时,组装写入kafka记录Value时替换为该配置项指定的字符串,如果不配置不做替换。否acks初始化Kafka Producer时的acks配置,决定写入成功的确认方式。默认acks参数为all。acks取值如下:0:不进行写入成功确认。1:确认主副本写入成功。all:确认所有副本写入成功。否附录:写入Kafka消息格式定义

  • 上一篇:没有了
  • 下一篇:OLED
  • 相关资讯