[ ] 2023-11-02 08:35:50.457 -ERROR [inlong-plugin-0] o.a.i.m.p.f.FlinkService :146 - submit job from info FlinkInfo(endpoint=null, jobName=InLong-Sort-test_group_10, inlongStreamInfoList=[InlongStreamInfo(id=13, inlongGroupId=test_group_10, inlongStreamId=test_stream_10, name=null, description=, mqResource=test_stream_10, dataType=null, dataEncoding=UTF-8, dataSeparator=124, dataEscapeChar=null, syncSend=0, dailyRecords=10, dailyStorage=10, peakRecords=1000, maxLength=10240, storagePeriod=1, extParams={"ignoreParseError":true,"useExtendedFields":false}, status=130, previousStatus=100, creator=admin, modifier=admin, createTime=Thu Nov 02 00:28:51 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023, fieldList=[StreamField(id=58, inlongGroupId=test_group_10, inlongStreamId=test_stream_10, fieldName=id, fieldType=int, fieldComment=null, isPredefinedField=null, fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null), StreamField(id=59, inlongGroupId=test_group_10, inlongStreamId=test_stream_10, fieldName=name, fieldType=string, fieldComment=null, isPredefinedField=null, fieldValue=null, preExpression=null, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, extParams=null)], extList=null, sourceList=[MySQLBinlogSource(super=StreamSource(id=12, inlongGroupId=test_group_10, inlongStreamId=test_stream_10, sourceType=MYSQL_BINLOG, sourceName=test_source_10, agentIp=null, uuid=null, inlongClusterName=null, inlongClusterNodeTag=null, dataNodeName=null, serializationType=debezium_json, snapshot=null, version=1, status=101, previousStatus=110, creator=admin, modifier=admin, createTime=Thu Nov 02 00:29:16 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023, properties={}, templateId=null, subSourceList=null, ignoreParseError=false), user=root, password=****** hostname=*, port=3306, serverId=0, includeSchema=null, databaseWhiteList=test, 
tableWhiteList=test.source_table, serverTimezone=null, intervalMs=500, snapshotMode=initial, offsetFilename=null, historyFilename=null, monitoredDdl=null, timestampFormatStandard=SQL, allMigration=false, primaryKey=null, specificOffsetFile=null, specificOffsetPos=null)], sinkList=[HudiSink(super=StreamSink(super=StreamNode(preNodes=null, postNodes=null, fieldList=null), id=12, inlongGroupId=test_group_10, inlongStreamId=test_stream_10, sinkType=HUDI, sinkName=test_sink_10, description=null, inlongClusterName=null, dataNodeName=65be8d12-4815-40ed-b52e-57d5a8ecdc5c, sortTaskName=null, sortConsumerGroup=null, enableCreateResource=1, operateLog=success to create Hudi resource, status=130, previousStatus=130, creator=admin, modifier=admin, createTime=Thu Nov 02 00:35:06 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023, sinkFieldList=[SinkField(id=37, sinkType=null, inlongGroupId=null, inlongStreamId=null, fieldName=id, fieldType=int, fieldComment=id, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, sourceFieldName=id, sourceFieldType=int, extParams=null), SinkField(id=38, sinkType=null, inlongGroupId=null, inlongStreamId=null, fieldName=name, fieldType=string, fieldComment=name, isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null, originFieldName=null, sourceFieldName=name, sourceFieldType=string, extParams=null)], properties={}, dataEncoding=UTF-8, dataFormat=NONE, authentication=null, version=1), catalogType=HIVE, catalogUri=thrift://*:9083, warehouse=hdfs://*/warehouse, dbName=test_db, tableName=sink_table, dataPath=null, fileFormat=Parquet, partitionType=null, primaryKey=, extList=[], partitionKey=null)], version=3, wrapType=INLONG_MSG_V0, useExtendedFields=false, ignoreParseError=true)], localJarPath=/opt/inlong-sort/sort-dist-1.10.0-SNAPSHOT.jar, connectorJarPaths=[/opt/inlong-sort/connectors/sort-connector-mysql-cdc-1.10.0-SNAPSHOT.jar, 
/opt/inlong-sort/connectors/sort-connector-hudi-1.10.0-SNAPSHOT.jar], localConfPath=/opt/inlong-manager/lib/InLong-Sort-test_group_10, sourceType=null, sinkType=null, jobId=null, savepointPath=null, isException=false, exceptionMsg=null) failed: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.sink_table'. Table options are: 'connector'='hudi-inlong' 'hive_sync.db'='test_db' 'hive_sync.enabled'='true' 'hive_sync.metastore.uris'='thrift://*:9083' 'hive_sync.mode'='hms' 'hive_sync.table'='sink_table' 'hoodie.database.name'='test_db' 'hoodie.datasource.write.recordkey.field'='' 'hoodie.table.name'='sink_table' 'inlong.metric.labels'='groupId=test_group_10&streamId=test_stream_10&nodeId=test_sink_10' 'metrics.audit.key'='16' 'metrics.audit.proxy.hosts'='audit:10081' 'path'='hdfs://*/warehouse/test_db.db/sink_table' at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-clients_2.11-1.13.5.jar:1.13.5] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients_2.11-1.13.5.jar:1.13.5] at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) ~[flink-clients_2.11-1.13.5.jar:1.13.5] at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82) ~[flink-clients_2.11-1.13.5.jar:1.13.5] at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117) ~[flink-clients_2.11-1.13.5.jar:1.13.5] at org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:192) ~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:144) ~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at 
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58) ~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342] Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink_table'. Table options are: 'connector'='hudi-inlong' 'hive_sync.db'='test_db' 'hive_sync.enabled'='true' 'hive_sync.metastore.uris'='thrift://*:9083' 'hive_sync.mode'='hms' 'hive_sync.table'='sink_table' 'hoodie.database.name'='test_db' 'hoodie.datasource.write.recordkey.field'='' 'hoodie.table.name'='sink_table' 'inlong.metric.labels'='groupId=test_group_10&streamId=test_stream_10&nodeId=test_sink_10' 'metrics.audit.key'='16' 'metrics.audit.proxy.hosts'='audit:10081' 'path'='hdfs://*/warehouse/test_db.db/sink_table' at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171) ~[flink-table-common-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) ~[flink-table-api-java-1.13.5.jar:1.13.5] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740) ~[flink-table-api-java-1.13.5.jar:1.13.5] at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99) ~[flink-table-api-java-1.13.5.jar:1.13.5] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:84) ~[?:?] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63) ~[?:?] at org.apache.inlong.sort.Entrance.main(Entrance.java:76) ~[?:?] 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients_2.11-1.13.5.jar:1.13.5] ... 12 more Caused by: org.apache.hudi.exception.HoodieValidationException: Field '' specified in option 'hoodie.datasource.write.recordkey.field' does not exist in the table schema. at org.apache.hudi.table.HoodieTableFactory.lambda$sanityCheck$2(HoodieTableFactory.java:139) ~[?:?] at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_342] at org.apache.hudi.table.HoodieTableFactory.sanityCheck(HoodieTableFactory.java:137) ~[?:?] at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:91) ~[?:?] at org.apache.inlong.sort.hudi.table.HudiTableInlongFactory.createDynamicTableSink(HudiTableInlongFactory.java:51) ~[?:?] at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168) ~[flink-table-common-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162) ~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) ~[flink-table-api-java-1.13.5.jar:1.13.5] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740) ~[flink-table-api-java-1.13.5.jar:1.13.5] at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99) ~[flink-table-api-java-1.13.5.jar:1.13.5] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:84) ~[?:?] at org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63) ~[?:?] at org.apache.inlong.sort.Entrance.main(Entrance.java:76) ~[?:?] 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients_2.11-1.13.5.jar:1.13.5] ... 12 more
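The innermost cause in the log above is the HoodieValidationException: the option 'hoodie.datasource.write.recordkey.field' is set to an empty string, which matches the empty `primaryKey=` on the HudiSink. Below is a minimal sketch of the kind of check that could reject such a sink before the job is submitted to Flink. The class and method names (`RecordKeyCheck`, `validateRecordKey`) are hypothetical and are not part of InLong or Hudi; the sketch also assumes that a comma-separated list of key fields is allowed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not an InLong or Hudi API.
public class RecordKeyCheck {

    /**
     * Returns an error message when the record key is blank or names a field
     * that is missing from the schema; returns an empty Optional when it is valid.
     */
    static Optional<String> validateRecordKey(String recordKey, List<String> schemaFields) {
        if (recordKey == null || recordKey.trim().isEmpty()) {
            return Optional.of("record key field is empty; set a primary key on the sink");
        }
        // Assumption: several key fields may be given as a comma-separated list.
        for (String key : recordKey.split(",")) {
            String name = key.trim();
            if (!schemaFields.contains(name)) {
                return Optional.of("record key field '" + name + "' does not exist in the table schema");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Field names taken from the sink in the log: id (int), name (string).
        List<String> fields = Arrays.asList("id", "name");
        System.out.println(validateRecordKey("", fields));   // the failing case from this issue
        System.out.println(validateRecordKey("id", fields)); // a valid primary key
    }
}
```

Running such a check in the Manager when the sink is saved would report the problem as a configuration error rather than as a Flink submission failure.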
EMsnap
What happened
What you expected to happen
Use Hudi sink
How to reproduce
Use Hudi sink
Environment
No response
InLong version
master
InLong Component
InLong Manager
Are you willing to submit PR?
Code of Conduct