@@ -235,6 +235,7 @@ use tracing::instrument;
235
235
236
236
use crate :: data:: DataBlock ;
237
237
use crate :: encoder:: { values_column_encoding, EncodedBatch } ;
238
+ use crate :: encodings:: logical:: binary:: BinaryFieldScheduler ;
238
239
use crate :: encodings:: logical:: list:: { ListFieldScheduler , OffsetPageInfo } ;
239
240
use crate :: encodings:: logical:: primitive:: PrimitiveFieldScheduler ;
240
241
use crate :: encodings:: logical:: r#struct:: { SimpleStructDecoder , SimpleStructScheduler } ;
@@ -612,7 +613,7 @@ impl CoreFieldDecoderStrategy {
612
613
}
613
614
614
615
fn is_primitive ( data_type : & DataType ) -> bool {
615
- if data_type. is_primitive ( ) | data_type . is_binary_like ( ) {
616
+ if data_type. is_primitive ( ) {
616
617
true
617
618
} else {
618
619
match data_type {
@@ -666,6 +667,81 @@ impl CoreFieldDecoderStrategy {
666
667
pb:: array_encoding:: ArrayEncoding :: PackedStruct ( _)
667
668
)
668
669
}
670
+
671
+ fn create_list_scheduler < ' a > (
672
+ & self ,
673
+ list_field : & Field ,
674
+ column_infos : & mut ColumnInfoIter ,
675
+ buffers : FileBuffers ,
676
+ offsets_column : & ColumnInfo ,
677
+ chain : DecoderMiddlewareChainCursor < ' a > ,
678
+ ) -> Result < ChosenFieldScheduler < ' a > > {
679
+ Self :: ensure_values_encoded ( offsets_column, chain. current_path ( ) ) ?;
680
+ let offsets_column_buffers = ColumnBuffers {
681
+ file_buffers : buffers,
682
+ positions_and_sizes : & offsets_column. buffer_offsets_and_sizes ,
683
+ } ;
684
+ let item_field_name = list_field. children [ 0 ] . name . clone ( ) ;
685
+ let ( chain, items_scheduler) = chain. new_child (
686
+ /*child_idx=*/ 0 ,
687
+ & list_field. children [ 0 ] ,
688
+ column_infos,
689
+ buffers,
690
+ ) ?;
691
+ let items_scheduler = items_scheduler?;
692
+
693
+ let ( inner_infos, null_offset_adjustments) : ( Vec < _ > , Vec < _ > ) = offsets_column
694
+ . page_infos
695
+ . iter ( )
696
+ . filter ( |offsets_page| offsets_page. num_rows > 0 )
697
+ . map ( |offsets_page| {
698
+ if let Some ( pb:: array_encoding:: ArrayEncoding :: List ( list_encoding) ) =
699
+ & offsets_page. encoding . array_encoding
700
+ {
701
+ let inner = PageInfo {
702
+ buffer_offsets_and_sizes : offsets_page. buffer_offsets_and_sizes . clone ( ) ,
703
+ encoding : list_encoding. offsets . as_ref ( ) . unwrap ( ) . as_ref ( ) . clone ( ) ,
704
+ num_rows : offsets_page. num_rows ,
705
+ } ;
706
+ (
707
+ inner,
708
+ OffsetPageInfo {
709
+ offsets_in_page : offsets_page. num_rows ,
710
+ null_offset_adjustment : list_encoding. null_offset_adjustment ,
711
+ num_items_referenced_by_page : list_encoding. num_items ,
712
+ } ,
713
+ )
714
+ } else {
715
+ // TODO: Should probably return Err here
716
+ panic ! ( "Expected a list column" ) ;
717
+ }
718
+ } )
719
+ . unzip ( ) ;
720
+ let inner = Arc :: new ( PrimitiveFieldScheduler :: new (
721
+ offsets_column. index ,
722
+ DataType :: UInt64 ,
723
+ Arc :: from ( inner_infos. into_boxed_slice ( ) ) ,
724
+ offsets_column_buffers,
725
+ self . validate_data ,
726
+ ) ) as Arc < dyn FieldScheduler > ;
727
+ let offset_type = if matches ! ( list_field. data_type( ) , DataType :: List ( _) ) {
728
+ DataType :: Int32
729
+ } else {
730
+ DataType :: Int64
731
+ } ;
732
+ let items_type = list_field. children [ 0 ] . data_type ( ) . clone ( ) ;
733
+ Ok ( (
734
+ chain,
735
+ Ok ( Arc :: new ( ListFieldScheduler :: new (
736
+ inner,
737
+ items_scheduler,
738
+ item_field_name. clone ( ) ,
739
+ items_type,
740
+ offset_type,
741
+ null_offset_adjustments,
742
+ ) ) as Arc < dyn FieldScheduler > ) ,
743
+ ) )
744
+ }
669
745
}
670
746
671
747
impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
@@ -686,6 +762,60 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
686
762
buffers,
687
763
) ?;
688
764
return Ok ( ( chain, Ok ( scheduler) ) ) ;
765
+ } else if data_type. is_binary_like ( ) {
766
+ let column_info = column_infos. next ( ) . unwrap ( ) ;
767
+ if let Some ( page_info) = column_info. page_infos . first ( ) {
768
+ if matches ! (
769
+ page_info. encoding,
770
+ pb:: ArrayEncoding {
771
+ array_encoding: Some ( pb:: array_encoding:: ArrayEncoding :: List ( ..) )
772
+ }
773
+ ) {
774
+ let list_type = if matches ! ( data_type, DataType :: Utf8 | DataType :: Binary ) {
775
+ DataType :: List ( Arc :: new ( ArrowField :: new ( "item" , DataType :: UInt8 , false ) ) )
776
+ } else {
777
+ DataType :: LargeList ( Arc :: new ( ArrowField :: new (
778
+ "item" ,
779
+ DataType :: UInt8 ,
780
+ false ,
781
+ ) ) )
782
+ } ;
783
+ let list_field = Field :: try_from ( ArrowField :: new (
784
+ field. name . clone ( ) ,
785
+ list_type,
786
+ field. nullable ,
787
+ ) )
788
+ . unwrap ( ) ;
789
+ let ( chain, list_scheduler) = self . create_list_scheduler (
790
+ & list_field,
791
+ column_infos,
792
+ buffers,
793
+ column_info,
794
+ chain,
795
+ ) ?;
796
+ let binary_scheduler = Arc :: new ( BinaryFieldScheduler :: new (
797
+ list_scheduler?,
798
+ field. data_type ( ) . clone ( ) ,
799
+ ) ) ;
800
+ return Ok ( ( chain, Ok ( binary_scheduler) ) ) ;
801
+ } else {
802
+ let scheduler = self . create_primitive_scheduler (
803
+ & data_type,
804
+ chain. current_path ( ) ,
805
+ column_info,
806
+ buffers,
807
+ ) ?;
808
+ return Ok ( ( chain, Ok ( scheduler) ) ) ;
809
+ }
810
+ } else {
811
+ let scheduler = self . create_primitive_scheduler (
812
+ & data_type,
813
+ chain. current_path ( ) ,
814
+ column_info,
815
+ buffers,
816
+ ) ?;
817
+ return Ok ( ( chain, Ok ( scheduler) ) ) ;
818
+ }
689
819
}
690
820
match & data_type {
691
821
DataType :: FixedSizeList ( inner, _dimension) => {
@@ -725,73 +855,9 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
725
855
} )
726
856
}
727
857
}
728
- DataType :: List ( items_field ) | DataType :: LargeList ( items_field ) => {
858
+ DataType :: List ( _ ) | DataType :: LargeList ( _ ) => {
729
859
let offsets_column = column_infos. next ( ) . unwrap ( ) ;
730
- Self :: ensure_values_encoded ( offsets_column, chain. current_path ( ) ) ?;
731
- let offsets_column_buffers = ColumnBuffers {
732
- file_buffers : buffers,
733
- positions_and_sizes : & offsets_column. buffer_offsets_and_sizes ,
734
- } ;
735
- let item_field_name = items_field. name ( ) . clone ( ) ;
736
- let ( chain, items_scheduler) = chain. new_child (
737
- /*child_idx=*/ 0 ,
738
- & field. children [ 0 ] ,
739
- column_infos,
740
- buffers,
741
- ) ?;
742
- let items_scheduler = items_scheduler?;
743
-
744
- let ( inner_infos, null_offset_adjustments) : ( Vec < _ > , Vec < _ > ) = offsets_column
745
- . page_infos
746
- . iter ( )
747
- . filter ( |offsets_page| offsets_page. num_rows > 0 )
748
- . map ( |offsets_page| {
749
- if let Some ( pb:: array_encoding:: ArrayEncoding :: List ( list_encoding) ) =
750
- & offsets_page. encoding . array_encoding
751
- {
752
- let inner = PageInfo {
753
- buffer_offsets_and_sizes : offsets_page
754
- . buffer_offsets_and_sizes
755
- . clone ( ) ,
756
- encoding : list_encoding. offsets . as_ref ( ) . unwrap ( ) . as_ref ( ) . clone ( ) ,
757
- num_rows : offsets_page. num_rows ,
758
- } ;
759
- (
760
- inner,
761
- OffsetPageInfo {
762
- offsets_in_page : offsets_page. num_rows ,
763
- null_offset_adjustment : list_encoding. null_offset_adjustment ,
764
- num_items_referenced_by_page : list_encoding. num_items ,
765
- } ,
766
- )
767
- } else {
768
- // TODO: Should probably return Err here
769
- panic ! ( "Expected a list column" ) ;
770
- }
771
- } )
772
- . unzip ( ) ;
773
- let inner = Arc :: new ( PrimitiveFieldScheduler :: new (
774
- offsets_column. index ,
775
- DataType :: UInt64 ,
776
- Arc :: from ( inner_infos. into_boxed_slice ( ) ) ,
777
- offsets_column_buffers,
778
- self . validate_data ,
779
- ) ) as Arc < dyn FieldScheduler > ;
780
- let offset_type = if matches ! ( data_type, DataType :: List ( _) ) {
781
- DataType :: Int32
782
- } else {
783
- DataType :: Int64
784
- } ;
785
- let items_type = items_field. data_type ( ) . clone ( ) ;
786
- let list_scheduler = Ok ( Arc :: new ( ListFieldScheduler :: new (
787
- inner,
788
- items_scheduler,
789
- item_field_name. clone ( ) ,
790
- items_type,
791
- offset_type,
792
- null_offset_adjustments,
793
- ) ) as Arc < dyn FieldScheduler > ) ;
794
- Ok ( ( chain, list_scheduler) )
860
+ self . create_list_scheduler ( field, column_infos, buffers, offsets_column, chain)
795
861
}
796
862
DataType :: Struct ( fields) => {
797
863
let column_info = column_infos. next ( ) . unwrap ( ) ;
0 commit comments