
[Bug]: NullPointerException when a String field contains NULL values #1104

Closed
svea-vip opened this issue Sep 4, 2024 · 7 comments
Labels
bug Something isn't working

Comments

@svea-vip
Contributor

svea-vip commented Sep 4, 2024

What happened?

The source is MySQL and the sink is HDFS. When the source table has rows where a field is NULL, a NullPointerException is thrown.

Version

4.1.6 (Default)

OS Type

Linux (Default)

Java JDK Version

Oracle JDK 1.8.0

Relevant log output

WriterRunner         - Writer Runner Received Exceptions:
	java.lang.NullPointerException: null
		at org.apache.parquet.io.api.Binary$FromStringBinary.encodeUTF8(Binary.java:241)
		at org.apache.parquet.io.api.Binary$FromStringBinary.<init>(Binary.java:232)
		at org.apache.parquet.io.api.Binary.fromString(Binary.java:602)
		at org.apache.parquet.example.data.Group.append(Group.java:131)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsHelper.transportParRecord(HdfsHelper.java:267)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsHelper.parquetFileStartWrite(HdfsHelper.java:606)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsWriter$Task.startWrite(HdfsWriter.java:411)
		at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:77)
		at java.lang.Thread.run(Thread.java:745)
	Exception in thread "taskGroup-0" com.wgzhao.addax.common.exception.AddaxException: java.lang.NullPointerException
		at org.apache.parquet.io.api.Binary$FromStringBinary.encodeUTF8(Binary.java:241)
		at org.apache.parquet.io.api.Binary$FromStringBinary.<init>(Binary.java:232)
		at org.apache.parquet.io.api.Binary.fromString(Binary.java:602)
		at org.apache.parquet.example.data.Group.append(Group.java:131)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsHelper.transportParRecord(HdfsHelper.java:267)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsHelper.parquetFileStartWrite(HdfsHelper.java:606)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsWriter$Task.startWrite(HdfsWriter.java:411)
		at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:77)
		at java.lang.Thread.run(Thread.java:745)
	
		at com.wgzhao.addax.common.exception.AddaxException.asAddaxException(AddaxException.java:66)
		at com.wgzhao.addax.core.taskgroup.TaskGroupContainer.start(TaskGroupContainer.java:188)
		at com.wgzhao.addax.core.taskgroup.runner.TaskGroupContainerRunner.run(TaskGroupContainerRunner.java:44)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
		at java.lang.Thread.run(Thread.java:745)
	Caused by: java.lang.NullPointerException
		at org.apache.parquet.io.api.Binary$FromStringBinary.encodeUTF8(Binary.java:241)
		at org.apache.parquet.io.api.Binary$FromStringBinary.<init>(Binary.java:232)
		at org.apache.parquet.io.api.Binary.fromString(Binary.java:602)
		at org.apache.parquet.example.data.Group.append(Group.java:131)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsHelper.transportParRecord(HdfsHelper.java:267)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsHelper.parquetFileStartWrite(HdfsHelper.java:606)
		at com.wgzhao.addax.plugin.writer.hdfswriter.HdfsWriter$Task.startWrite(HdfsWriter.java:411)
		at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:77)
		... 1 more
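
The top frames show org.apache.parquet.io.api.Binary.fromString being handed a null String. As a minimal illustration (a hypothetical standalone snippet, not part of the original report, assuming only parquet-column on the classpath), the same NPE can be reproduced directly:

import org.apache.parquet.io.api.Binary;

public class NullStringNpe {
    public static void main(String[] args) {
        String raw = null;       // stands in for a NULL column value coming from the reader
        Binary.fromString(raw);  // throws NullPointerException in FromStringBinary.encodeUTF8
    }
}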
svea-vip added the bug label on Sep 4, 2024
@wgzhao
Owner

wgzhao commented Sep 4, 2024

Please paste the complete output log; a description of your runtime environment would be even better.

@svea-vip
Contributor Author

svea-vip commented Sep 4, 2024

I think I have found the likely cause: the conditional logic at lines 244–251 of HdfsHelper.java is flawed.
Original code:
column = record.getColumn(i);
String colName = columns.get(i).getString(Key.NAME);
String typename = columns.get(i).getString(Key.TYPE).toUpperCase();
if (null == column || column.getRawData() == null) {
    group.append(colName, "");
}
SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);
assert column != null;
After the null check falls through to group.append(colName, "");, a continue; should be added to end that loop iteration.
With that change, the assert column != null; below would always be true and can be removed.
When one of the fields in a source row is NULL, the assertion is insufficient anyway: column != null can hold while column.getRawData() == null, and the STRING case below then calls column.asString(), which throws the exception described in this issue.
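
For reference, a sketch of the loop body with that change applied (variable names follow the quoted snippet above; this illustrates the proposed fix, not necessarily the exact change that was later committed):

column = record.getColumn(i);
String colName = columns.get(i).getString(Key.NAME);
String typename = columns.get(i).getString(Key.TYPE).toUpperCase();
if (null == column || column.getRawData() == null) {
    // NULL value: append a placeholder and skip the rest of this iteration,
    // so the STRING case never calls column.asString() on a null raw value
    group.append(colName, "");
    continue;
}
// the previous "assert column != null;" becomes redundant and can be dropped
SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);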

@wgzhao
Owner

wgzhao commented Sep 5, 2024

Local testing shows no problem:

mysql> select col1, col2 from test.tbl0 where col2 > 4 limit 10;
+--------------------------------------------------------+------+
| col1                                                   | col2 |
+--------------------------------------------------------+------+
| NULL                                                   |    8 |
| a5jkjvVU7tRdHtyuTLVqc5UZ7eVj8nF4vIG2fg72baWY8Jfjj2zW   |    5 |
| RXXfnK69VjKi2arp3iUTexXIPDfjMXZWCXX3Zn9Am0AxyL5rEDGC4M |    5 |
| NULL                                                   |    9 |
| NULL                                                   |    7 |
| NULL                                                   |    9 |
| Vk5g0QobjimCkPMzjS4Nxq42Nd7jCnPPGQqHCH9BqgwIydc3JL     |    6 |
| NULL                                                   |    7 |
| NULL                                                   |    7 |
| NULL                                                   |    8 |
+--------------------------------------------------------+------+
10 rows in set (0.00 sec)
 bin/addax.sh mysql2hdfs.json

  ___      _     _
 / _ \    | |   | |
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  <
\_| |_/\__,_|\__,_|\__,_/_/\_\

:: Addax version ::    (v4.1.5-SNAPSHOT)

2024-09-05 09:32:02.521 [        main] INFO  VMInfo               - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2024-09-05 09:32:02.532 [        main] INFO  Engine               -
{
        "setting":{
                "speed":{
                        "byte":-1,
                        "channel":1
                }
        },
        "content":{
                "reader":{
                        "name":"mysqlreader",
                        "parameter":{
                                "username":"root",
                                "password":"*****",
                                "column":[
                                        "*"
                                ],
                                "connection":[
                                        {
                                                "table":[
                                                        "tbl0"
                                                ],
                                                "jdbcUrl":[
                                                        "jdbc:mysql://127.0.0.1:3306/test"
                                                ],
                                                "driver":"com.mysql.jdbc.Driver"
                                        }
                                ],
                                "where":""
                        }
                },
                "writer":{
                        "name":"hdfswriter",
                        "parameter":{
                                "defaultFS":"hdfs://cluster",
                                "fileType":"orc",
                                "path":"/tmp/out",
                                "fileName":"test",
                                "column":[
                                        {
                                                "name":"col1",
                                                "type":"string"
                                        },
                                        {
                                                "name":"col2",
                                                "type":"int"
                                        },
                                        {
                                                "name":"col3",
                                                "type":"string"
                                        },
                                        {
                                                "name":"col4",
                                                "type":"int"
                                        },
                                        {
                                                "name":"col5",
                                                "type":"binary"
                                        }
                                ],
                                "writeMode":"overwrite",
                                "fieldDelimiter":"\u0001",
                                "compress":"SNAPPY",
                                "hadoopConfig":{
                                        "dfs.nameservices":"cluster",
                                        "dfs.ha.namenodes.cluster":"nn1,nn2",
                                        "dfs.namenode.rpc-address.cluster.nn1":"nn01:8020",
                                        "dfs.namenode.rpc-address.cluster.nn2":"nn02:8020",
                                        "dfs.client.failover.proxy.provider.cluster":"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                                }
                        }
                }
        }
}

2024-09-05 09:32:02.544 [        main] INFO  JobContainer         - The jobContainer begins to process the job.
2024-09-05 09:32:02.551 [       job-0] WARN  OriginalConfPretreatmentUtil - use specified driver class: com.mysql.jdbc.Driver
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2024-09-05 09:32:02.851 [       job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl [jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true&serverTimezone=GMT%2B8].
2024-09-05 09:32:02.852 [       job-0] WARN  OriginalConfPretreatmentUtil - There are some risks in the column configuration. Because you did not configure the columns to read the database table, changes in the number and types of fields in your table may affect the correctness of the task or even cause errors.
2024-09-05 09:32:03.030 [       job-0] WARN  NativeCodeLoader     - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
log4j:WARN No appenders could be found for logger (org.apache.htrace.core.Tracer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2024-09-05 09:32:03.390 [       job-0] INFO  JobContainer         - The Reader.Job [mysqlreader] perform prepare work .
2024-09-05 09:32:03.391 [       job-0] INFO  JobContainer         - The Writer.Job [hdfswriter] perform prepare work .
2024-09-05 09:32:03.544 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2024-09-05 09:32:03.545 [       job-0] INFO  JobContainer         - The Reader.Job [mysqlreader] is divided into [1] task(s).
2024-09-05 09:32:03.546 [       job-0] INFO  HdfsWriter$Job       - Begin splitting ...
2024-09-05 09:32:03.561 [       job-0] INFO  HdfsWriter$Job       - The split wrote files :[/tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762/test_20240905_093203_560_g28w1426.orc]
2024-09-05 09:32:03.561 [       job-0] INFO  HdfsWriter$Job       - Finish splitting.
2024-09-05 09:32:03.561 [       job-0] INFO  JobContainer         - The Writer.Job [hdfswriter] is divided into [1] task(s).
2024-09-05 09:32:03.576 [       job-0] INFO  JobContainer         - The Scheduler launches [1] taskGroup(s).
2024-09-05 09:32:03.580 [ taskGroup-0] INFO  TaskGroupContainer   - The taskGroupId=[0] started [1] channels for [1] tasks.
2024-09-05 09:32:03.582 [ taskGroup-0] INFO  Channel              - The Channel set byte_speed_limit to -1, No bps activated.
2024-09-05 09:32:03.582 [ taskGroup-0] INFO  Channel              - The Channel set record_speed_limit to -1, No tps activated.
2024-09-05 09:32:03.588 [  reader-0-0] INFO  CommonRdbmsReader$Task - Begin reading records by executing SQL query: [select * from tbl0 ].
2024-09-05 09:32:03.594 [  writer-0-0] INFO  HdfsWriter$Task      - Begin to write file : [/tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762/test_20240905_093203_560_g28w1426.orc]
2024-09-05 09:32:03.613 [  writer-0-0] INFO  HadoopShimsCurrent   - Can't get KeyProvider for ORC encryption from hadoop.security.key.provider.path.
2024-09-05 09:32:03.639 [  reader-0-0] INFO  CommonRdbmsReader$Task - Finished reading records by executing SQL query: [select * from tbl0 ].
2024-09-05 09:32:03.822 [  writer-0-0] INFO  PhysicalFsWriter     - ORC writer created for path: /tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762/test_20240905_093203_560_g28w1426.orc with stripeSize: 67108864 blockSize: 268435456 compression: Compress: SNAPPY buffer: 262144
2024-09-05 09:32:03.846 [  writer-0-0] INFO  WriterImpl           - ORC writer created for path: /tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762/test_20240905_093203_560_g28w1426.orc with stripeSize: 67108864 options: Compress: SNAPPY buffer: 262144
2024-09-05 09:32:04.167 [  writer-0-0] INFO  HdfsWriter$Task      - Finish write
2024-09-05 09:32:06.587 [       job-0] INFO  AbstractScheduler    - The scheduler has completed all tasks.
2024-09-05 09:32:06.588 [       job-0] INFO  JobContainer         - The Writer.Job [hdfswriter] perform post work.
2024-09-05 09:32:06.614 [       job-0] INFO  HdfsHelper           - Begin to move file from [hdfs://cluster/tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762/test_20240905_093203_560_g28w1426.orc] to [out].
2024-09-05 09:32:06.660 [       job-0] INFO  HdfsHelper           - Finish move file(s).
2024-09-05 09:32:06.661 [       job-0] INFO  HdfsHelper           - Begin to delete temporary dir [/tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762] .
2024-09-05 09:32:06.701 [       job-0] INFO  HdfsHelper           - Finish deleting temporary dir [/tmp/out/.77bef630_980f_4e71_a7e8_e412f01fd762] .
2024-09-05 09:32:06.703 [       job-0] INFO  JobContainer         - The Reader.Job [mysqlreader] perform post work.
2024-09-05 09:32:06.711 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 95 records, 4808 bytes | Speed 1.56KB/s, 31 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2024-09-05 09:32:06.714 [       job-0] INFO  JobContainer         -
Job start  at             : 2024-09-05 09:32:02
Job end    at             : 2024-09-05 09:32:06
Job took secs             :                  4s
Average   bps             :            1.56KB/s
Average   rps             :             31rec/s
Number of rec             :                  95
Failed record             :                   0

@svea-vip
Contributor Author

svea-vip commented Sep 5, 2024

That is strange. My tests with version 4.1.6 do show the problem, and after applying my change and recompiling, the job runs through normally.

@wgzhao
Owner

wgzhao commented Sep 5, 2024

If convenient, please provide the complete job run output; attaching sample MySQL data would be even better.

@svea-vip
Contributor Author

svea-vip commented Sep 5, 2024

By the way, my output format is parquet, not orc. It looks like the orcFileStartWrite method you tested never reaches the code involved in this issue.

@svea-vip
Contributor Author

svea-vip commented Sep 5, 2024

The transportParRecord method has only one call site in HdfsHelper:

public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                      TaskPluginCollector taskPluginCollector) {

        List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
        String compress = config.getString(Key.COMPRESS, "UNCOMPRESSED").toUpperCase().trim();
        if ("NONE".equals(compress)) {
            compress = "UNCOMPRESSED";
        }
        CompressionCodecName codecName = CompressionCodecName.fromConf(compress);
        // construct parquet schema
        MessageType s = generateParquetSchema(columns);
        Path path = new Path(fileName);
        LOG.info("Begin to write parquet file [{}]", fileName);

        GenericData decimalSupport = new GenericData();
        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
        hadoopConf.setBoolean(AvroReadSupport.READ_INT96_AS_FIXED, true);
        hadoopConf.setBoolean(AvroWriteSupport.WRITE_FIXED_AS_INT96, true);
        GroupWriteSupport.setSchema(s, hadoopConf);
        Map<String, String> extraMeta = new HashMap<>();
        // hive need timezone info to handle timestamp
        extraMeta.put("writer.time.zone", ZoneId.systemDefault().toString());
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(path, hadoopConf))
                .withCompressionCodec(codecName)
                .withConf(hadoopConf)
                .enableDictionaryEncoding()
                .withPageSize(1024)
                .withDictionaryPageSize(512)
                .withValidation(false)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withExtraMetaData(extraMeta)
                .build()) {
            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(s);
            Group group;
            Record record;
            while ((record = lineReceiver.getFromReader()) != null) {
                group = transportParRecord(record, columns, taskPluginCollector, simpleGroupFactory);
                writer.write(group);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

The orc format goes through the orcFileStartWrite method and never calls group = transportParRecord(record, columns, taskPluginCollector, simpleGroupFactory);, so reproducing this issue requires the writer's fileType to be parquet rather than orc.

wgzhao closed this as completed in 4923ad9 on Sep 5, 2024