@@ -236,6 +236,7 @@ use tracing::instrument;
236
236
237
237
use crate :: data:: DataBlock ;
238
238
use crate :: encoder:: { values_column_encoding, EncodedBatch } ;
239
+ use crate :: encodings:: logical:: binary:: BinaryFieldScheduler ;
239
240
use crate :: encodings:: logical:: list:: { ListFieldScheduler , OffsetPageInfo } ;
240
241
use crate :: encodings:: logical:: primitive:: PrimitiveFieldScheduler ;
241
242
use crate :: encodings:: logical:: r#struct:: { SimpleStructDecoder , SimpleStructScheduler } ;
@@ -622,7 +623,7 @@ impl CoreFieldDecoderStrategy {
622
623
}
623
624
624
625
fn is_primitive ( data_type : & DataType ) -> bool {
625
- if data_type. is_primitive ( ) | data_type . is_binary_like ( ) {
626
+ if data_type. is_primitive ( ) {
626
627
true
627
628
} else {
628
629
match data_type {
@@ -676,6 +677,83 @@ impl CoreFieldDecoderStrategy {
676
677
pb:: array_encoding:: ArrayEncoding :: PackedStruct ( _)
677
678
)
678
679
}
680
+
681
+ fn create_list_scheduler < ' a > (
682
+ & self ,
683
+ list_field : & Field ,
684
+ column_infos : & mut ColumnInfoIter ,
685
+ buffers : FileBuffers ,
686
+ offsets_column : & ColumnInfo ,
687
+ chain : DecoderMiddlewareChainCursor < ' a > ,
688
+ ) -> Result < ChosenFieldScheduler < ' a > > {
689
+ Self :: ensure_values_encoded ( offsets_column, chain. current_path ( ) ) ?;
690
+ let offsets_column_buffers = ColumnBuffers {
691
+ file_buffers : buffers,
692
+ positions_and_sizes : & offsets_column. buffer_offsets_and_sizes ,
693
+ } ;
694
+ let ( chain, items_scheduler) = chain. new_child (
695
+ /*child_idx=*/ 0 ,
696
+ & list_field. children [ 0 ] ,
697
+ column_infos,
698
+ buffers,
699
+ ) ?;
700
+ let items_scheduler = items_scheduler?;
701
+
702
+ let ( inner_infos, null_offset_adjustments) : ( Vec < _ > , Vec < _ > ) = offsets_column
703
+ . page_infos
704
+ . iter ( )
705
+ . filter ( |offsets_page| offsets_page. num_rows > 0 )
706
+ . map ( |offsets_page| {
707
+ if let Some ( pb:: array_encoding:: ArrayEncoding :: List ( list_encoding) ) =
708
+ & offsets_page. encoding . array_encoding
709
+ {
710
+ let inner = PageInfo {
711
+ buffer_offsets_and_sizes : offsets_page. buffer_offsets_and_sizes . clone ( ) ,
712
+ encoding : list_encoding. offsets . as_ref ( ) . unwrap ( ) . as_ref ( ) . clone ( ) ,
713
+ num_rows : offsets_page. num_rows ,
714
+ } ;
715
+ (
716
+ inner,
717
+ OffsetPageInfo {
718
+ offsets_in_page : offsets_page. num_rows ,
719
+ null_offset_adjustment : list_encoding. null_offset_adjustment ,
720
+ num_items_referenced_by_page : list_encoding. num_items ,
721
+ } ,
722
+ )
723
+ } else {
724
+ // TODO: Should probably return Err here
725
+ panic ! ( "Expected a list column" ) ;
726
+ }
727
+ } )
728
+ . unzip ( ) ;
729
+ let inner = Arc :: new ( PrimitiveFieldScheduler :: new (
730
+ offsets_column. index ,
731
+ DataType :: UInt64 ,
732
+ Arc :: from ( inner_infos. into_boxed_slice ( ) ) ,
733
+ offsets_column_buffers,
734
+ self . validate_data ,
735
+ ) ) as Arc < dyn FieldScheduler > ;
736
+ let items_field = match list_field. data_type ( ) {
737
+ DataType :: List ( inner) => inner,
738
+ DataType :: LargeList ( inner) => inner,
739
+ _ => unreachable ! ( ) ,
740
+ } ;
741
+ let offset_type = if matches ! ( list_field. data_type( ) , DataType :: List ( _) ) {
742
+ DataType :: Int32
743
+ } else {
744
+ DataType :: Int64
745
+ } ;
746
+ Ok ( (
747
+ chain,
748
+ Ok ( Arc :: new ( ListFieldScheduler :: new (
749
+ inner,
750
+ items_scheduler,
751
+ items_field,
752
+ offset_type,
753
+ null_offset_adjustments,
754
+ ) ) as Arc < dyn FieldScheduler > ) ,
755
+ ) )
756
+ }
679
757
}
680
758
681
759
impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
@@ -696,6 +774,60 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
696
774
buffers,
697
775
) ?;
698
776
return Ok ( ( chain, Ok ( scheduler) ) ) ;
777
+ } else if data_type. is_binary_like ( ) {
778
+ let column_info = column_infos. next ( ) . unwrap ( ) . clone ( ) ;
779
+ if let Some ( page_info) = column_info. page_infos . first ( ) {
780
+ if matches ! (
781
+ page_info. encoding,
782
+ pb:: ArrayEncoding {
783
+ array_encoding: Some ( pb:: array_encoding:: ArrayEncoding :: List ( ..) )
784
+ }
785
+ ) {
786
+ let list_type = if matches ! ( data_type, DataType :: Utf8 | DataType :: Binary ) {
787
+ DataType :: List ( Arc :: new ( ArrowField :: new ( "item" , DataType :: UInt8 , false ) ) )
788
+ } else {
789
+ DataType :: LargeList ( Arc :: new ( ArrowField :: new (
790
+ "item" ,
791
+ DataType :: UInt8 ,
792
+ false ,
793
+ ) ) )
794
+ } ;
795
+ let list_field = Field :: try_from ( ArrowField :: new (
796
+ field. name . clone ( ) ,
797
+ list_type,
798
+ field. nullable ,
799
+ ) )
800
+ . unwrap ( ) ;
801
+ let ( chain, list_scheduler) = self . create_list_scheduler (
802
+ & list_field,
803
+ column_infos,
804
+ buffers,
805
+ & column_info,
806
+ chain,
807
+ ) ?;
808
+ let binary_scheduler = Arc :: new ( BinaryFieldScheduler :: new (
809
+ list_scheduler?,
810
+ field. data_type ( ) . clone ( ) ,
811
+ ) ) ;
812
+ return Ok ( ( chain, Ok ( binary_scheduler) ) ) ;
813
+ } else {
814
+ let scheduler = self . create_primitive_scheduler (
815
+ & data_type,
816
+ chain. current_path ( ) ,
817
+ & column_info,
818
+ buffers,
819
+ ) ?;
820
+ return Ok ( ( chain, Ok ( scheduler) ) ) ;
821
+ }
822
+ } else {
823
+ let scheduler = self . create_primitive_scheduler (
824
+ & data_type,
825
+ chain. current_path ( ) ,
826
+ & column_info,
827
+ buffers,
828
+ ) ?;
829
+ return Ok ( ( chain, Ok ( scheduler) ) ) ;
830
+ }
699
831
}
700
832
match & data_type {
701
833
DataType :: FixedSizeList ( inner, _dimension) => {
@@ -715,7 +847,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
715
847
}
716
848
}
717
849
DataType :: Dictionary ( _key_type, value_type) => {
718
- if Self :: is_primitive ( value_type) {
850
+ if Self :: is_primitive ( value_type) || value_type . is_binary_like ( ) {
719
851
let primitive_col = column_infos. expect_next ( ) ?;
720
852
let scheduler = self . create_primitive_scheduler (
721
853
& data_type,
@@ -735,71 +867,10 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
735
867
} )
736
868
}
737
869
}
738
- DataType :: List ( items_field ) | DataType :: LargeList ( items_field ) => {
870
+ DataType :: List ( _ ) | DataType :: LargeList ( _ ) => {
739
871
let offsets_column = column_infos. expect_next ( ) ?. clone ( ) ;
740
872
column_infos. next_top_level ( ) ;
741
- Self :: ensure_values_encoded ( offsets_column. as_ref ( ) , chain. current_path ( ) ) ?;
742
- let offsets_column_buffers = ColumnBuffers {
743
- file_buffers : buffers,
744
- positions_and_sizes : & offsets_column. buffer_offsets_and_sizes ,
745
- } ;
746
- let ( chain, items_scheduler) = chain. new_child (
747
- /*child_idx=*/ 0 ,
748
- & field. children [ 0 ] ,
749
- column_infos,
750
- buffers,
751
- ) ?;
752
- let items_scheduler = items_scheduler?;
753
-
754
- let ( inner_infos, null_offset_adjustments) : ( Vec < _ > , Vec < _ > ) = offsets_column
755
- . page_infos
756
- . iter ( )
757
- . filter ( |offsets_page| offsets_page. num_rows > 0 )
758
- . map ( |offsets_page| {
759
- if let Some ( pb:: array_encoding:: ArrayEncoding :: List ( list_encoding) ) =
760
- & offsets_page. encoding . array_encoding
761
- {
762
- let inner = PageInfo {
763
- buffer_offsets_and_sizes : offsets_page
764
- . buffer_offsets_and_sizes
765
- . clone ( ) ,
766
- encoding : list_encoding. offsets . as_ref ( ) . unwrap ( ) . as_ref ( ) . clone ( ) ,
767
- num_rows : offsets_page. num_rows ,
768
- } ;
769
- (
770
- inner,
771
- OffsetPageInfo {
772
- offsets_in_page : offsets_page. num_rows ,
773
- null_offset_adjustment : list_encoding. null_offset_adjustment ,
774
- num_items_referenced_by_page : list_encoding. num_items ,
775
- } ,
776
- )
777
- } else {
778
- // TODO: Should probably return Err here
779
- panic ! ( "Expected a list column" ) ;
780
- }
781
- } )
782
- . unzip ( ) ;
783
- let inner = Arc :: new ( PrimitiveFieldScheduler :: new (
784
- offsets_column. index ,
785
- DataType :: UInt64 ,
786
- Arc :: from ( inner_infos. into_boxed_slice ( ) ) ,
787
- offsets_column_buffers,
788
- self . validate_data ,
789
- ) ) as Arc < dyn FieldScheduler > ;
790
- let offset_type = if matches ! ( data_type, DataType :: List ( _) ) {
791
- DataType :: Int32
792
- } else {
793
- DataType :: Int64
794
- } ;
795
- let list_scheduler = Ok ( Arc :: new ( ListFieldScheduler :: new (
796
- inner,
797
- items_scheduler,
798
- items_field. clone ( ) ,
799
- offset_type,
800
- null_offset_adjustments,
801
- ) ) as Arc < dyn FieldScheduler > ) ;
802
- Ok ( ( chain, list_scheduler) )
873
+ self . create_list_scheduler ( field, column_infos, buffers, & offsets_column, chain)
803
874
}
804
875
DataType :: Struct ( fields) => {
805
876
let column_info = column_infos. expect_next ( ) ?;
0 commit comments