问题:使用ogg for bigdata实现数据传输到kafka,在做update操做时出现问题
问题:使用oggforbigdata实现数据传输到kafka,在做update操做时,before数据没有显示,after中只有被修改的那一列数据,其它列也不显示,需要怎...
问题:使用ogg for bigdata实现数据传输到kafka,在做update操做时,before数据没有显示,after中只有被修改的那一列数据,其它列也不显示,需要怎么配置才能使显示的消息更完整。此处使用的是json格式。
官方显示的json格式时update操作的显示:
6.3.2.2 Sample Update Message
{
"table":"GG.TCUSTORD",
"op_type":"U",
"op_ts":"2013-06-02 22:14:41.000000",
"current_ts":"2015-09-18T13:39:35.748000",
"pos":"00000000000000002891",
"tokens":{
"R":"AADPkvAAEAAEqLzAAA"
},
"before":{
"CUST_CODE":"BILL",
"ORDER_DATE":"1995-12-31:15:00:00",
"PRODUCT_CODE":"CAR",
"ORDER_ID":"765",
"PRODUCT_PRICE":15000.00,
"PRODUCT_AMOUNT":3,
"TRANSACTION_ID":"100"
},
"after":{
"CUST_CODE":"BILL",
"ORDER_DATE":"1995-12-31:15:00:00",
"PRODUCT_CODE":"CAR",
"ORDER_ID":"765",
"PRODUCT_PRICE":14000.00,
"PRODUCT_AMOUNT":3,
"TRANSACTION_ID":"100"
}
}
自己测试时update的显示:
{
"table":"FAFASCHEMA.P",
"op_type":"U",
"op_ts":"2017-02-07 09:09:19.996232",
"current_ts":"2017-02-07T17:09:26.783000",
"pos":"00000000260000002118",
"primary_keys":[
"ID"
],
"tokens":{
"TK-HOST":"ray",
"TK-OSUSER":"oracle",
"TK-SCN":"1346678"
},
"before":{
},
"after":{
"ID":"10",
"NAME":"qqqqqqqq"
}
}
我的kafka.props配置:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName = ggtopic
gg.handler.kafkahandler.BlockingSend = false
gg.handler.kafkahandler.includeTokens = true
gg.handler.kafkahandler.Mode = tx
gg.handler.kafkah andler.topicPartitioning = none
#json
gg.handler.kafkahandler.format = json
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.prettyPrint = true
gg.handler.kafkahandler.format.jsonDelimiter = CDATA[]
gg.handler.kafkahandler.format.generateSchema = true
gg.handler.kafkahandler.format.schemaDirectory = dirdef
#gg.handler.kafkahandler.format.treatAllColumnsAsString = true
gg.handler.kafkahandler.format.includePrimaryKeys = true
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/root/kafka/kafka_2.10-0.9.0.1/libs/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar 展开
官方显示的json格式时update操作的显示:
6.3.2.2 Sample Update Message
{
"table":"GG.TCUSTORD",
"op_type":"U",
"op_ts":"2013-06-02 22:14:41.000000",
"current_ts":"2015-09-18T13:39:35.748000",
"pos":"00000000000000002891",
"tokens":{
"R":"AADPkvAAEAAEqLzAAA"
},
"before":{
"CUST_CODE":"BILL",
"ORDER_DATE":"1995-12-31:15:00:00",
"PRODUCT_CODE":"CAR",
"ORDER_ID":"765",
"PRODUCT_PRICE":15000.00,
"PRODUCT_AMOUNT":3,
"TRANSACTION_ID":"100"
},
"after":{
"CUST_CODE":"BILL",
"ORDER_DATE":"1995-12-31:15:00:00",
"PRODUCT_CODE":"CAR",
"ORDER_ID":"765",
"PRODUCT_PRICE":14000.00,
"PRODUCT_AMOUNT":3,
"TRANSACTION_ID":"100"
}
}
自己测试时update的显示:
{
"table":"FAFASCHEMA.P",
"op_type":"U",
"op_ts":"2017-02-07 09:09:19.996232",
"current_ts":"2017-02-07T17:09:26.783000",
"pos":"00000000260000002118",
"primary_keys":[
"ID"
],
"tokens":{
"TK-HOST":"ray",
"TK-OSUSER":"oracle",
"TK-SCN":"1346678"
},
"before":{
},
"after":{
"ID":"10",
"NAME":"qqqqqqqq"
}
}
我的kafka.props配置:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName = ggtopic
gg.handler.kafkahandler.BlockingSend = false
gg.handler.kafkahandler.includeTokens = true
gg.handler.kafkahandler.Mode = tx
gg.handler.kafkah andler.topicPartitioning = none
#json
gg.handler.kafkahandler.format = json
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.prettyPrint = true
gg.handler.kafkahandler.format.jsonDelimiter = CDATA[]
gg.handler.kafkahandler.format.generateSchema = true
gg.handler.kafkahandler.format.schemaDirectory = dirdef
#gg.handler.kafkahandler.format.treatAllColumnsAsString = true
gg.handler.kafkahandler.format.includePrimaryKeys = true
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/root/kafka/kafka_2.10-0.9.0.1/libs/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar 展开
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询