! DART software - Copyright UCAR. This open source software is provided
! by UCAR, "as is", without charge, subject to all terms of use at
! http://www.image.ucar.edu/DAReS/DART/DART_download
module direct_netcdf_mod
!> \defgroup direct_netcdf_mod direct_netcdf_mod
!> @{
!>
!> Netcdf IO for a domain.
!> The idea is to be generic rather than having a converter for each model.
!> Also aim to go to the filesystem only once, e.g.
!> \verbatim
!> wrf => dart => wrf
!> \endverbatim
!> rather than
!> \verbatim
!> wrf => wrf_to_dart => dart => dart_to_wrf => wrf
!> \endverbatim
!>
!> Every task needs the dimensions of each state variable to calculate
!> what it is going to receive in the IO transpose (this is calculated in add_domain)
!> \par Aim of the limited transpose:
!>
!> To limit how much of the state vector is read at once.
!> You can limit the transpose by memory using <code>buffer_state_io</code>
!> (a namelist setting; see the sketch below).
!>
!> What you (potentially) gain from this:
!>
!> * You don't have to hold the whole state vector on a single task.
!> * You don't have to use a parallel IO library.
!>
!> If buffer_state_io is false you have the regular transpose + IO, except:
!>  1. You are reading directly from a netcdf file, not a dart state vector file.
!>  2. You only transpose the copies that are being written/read.
!>
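!> A sketch of how buffer_state_io is typically enabled; the namelist group
!> name (&state_vector_io_nml) is the usual home for this flag, but check
!> your local input.nml:
!> \verbatim
!> &state_vector_io_nml
!>    buffer_state_io = .true.
!> /
!> \endverbatim
!>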
!> This code has multiple places where a round-robin layout of state onto tasks
!> is assumed (illustrated below). These are in the section labelled: \n
!> \verbatim
!> !--------------------------------------------------------
!> ! Routines that are making the assumption that the ensemble
!> ! distribution is round-robin (distribution type 1)
!> !--------------------------------------------------------
!> \endverbatim
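!>
!> As an illustration of that round-robin layout (distribution type 1),
!> state element i lives on task mod(i-1, task_count()); e.g. with 4 tasks:
!> \verbatim
!>   state element :  1  2  3  4  5  6  7  8 ...
!>   task          :  0  1  2  3  0  1  2  3 ...
!> \endverbatim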
!>
!> read_transpose() is the read routine.
!> transpose_write() is the write routine.
!>
!> Note dart_index is an inout variable to read_transpose() and transpose_write().
!> This assumes the calling code uses dart_index in the following way (sketched below):
!> * dart_index going in to the subroutines is where the domain starts in the state vector.
!> * dart_index coming out of the subroutines is one past where the domain ends
!>   (i.e. where the next domain starts).
!> * The domain is contiguous in the state_vector.
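!>
!> A sketch (not code in this module) of a caller advancing dart_index across
!> domains; get_num_domains() here is assumed to come from state_structure_mod:
!> \verbatim
!> dart_index = 1
!> do dom = 1, get_num_domains()
!>    call read_transpose(state_ens_handle, name_handle, dom, dart_index, read_single_vars)
!> enddo
!> \endverbatim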
use types_mod, only : r4, r8, i4, i8, MISSING_R8, MISSING_R4, MISSING_I, &
digits12, metadatalength
use options_mod, only : get_missing_ok_status
use ensemble_manager_mod, only : ensemble_type, map_pe_to_task, map_task_to_pe, &
all_copies_to_all_vars, all_vars_to_all_copies, &
get_copy_owner_index, get_copy, get_ensemble_time, &
allocate_vars
use time_manager_mod, only : time_type, get_time, get_calendar_type, &
THIRTY_DAY_MONTHS, JULIAN, GREGORIAN, NOLEAP, &
operator(<), operator(>), operator(+), &
operator(-), operator(/), operator(*), &
operator(==), operator(/=)
use utilities_mod, only : error_handler, file_to_text, &
find_textfile_dims, file_exist, &
E_MSG, E_ALLMSG, E_ERR, E_DBG, E_WARN
use netcdf_utilities_mod, only : nc_check
use mpi_utilities_mod, only : task_count, send_to, receive_from, my_task_id, &
broadcast_flag
use state_structure_mod, only : get_num_variables, get_sum_variables, &
get_sum_variables_below, get_dim_length, &
get_variable_name, get_io_clamping_maxval, &
get_io_clamping_minval, do_io_clamping, &
get_io_num_dims, get_dim_lengths, &
get_variable_size, get_io_num_unique_dims, &
get_io_unique_dim_name, get_dim_name, &
get_io_unique_dim_length, get_scale_factor, &
set_var_id, get_domain_size, do_io_update, &
get_units, get_long_name, get_short_name, &
get_has_missing_value, get_FillValue, &
get_missing_value, get_add_offset, get_xtype, &
get_has_FillValue, &
get_index_start, get_index_end , get_num_dims, &
create_diagnostic_structure, &
end_diagnostic_structure, &
has_unlimited_dim
use io_filenames_mod, only : get_restart_filename, inherit_copy_units, &
stage_metadata_type, get_file_description, &
get_copy_name, copy_is_clamped, query_read_copy, &
query_write_copy, force_copy_back, file_info_type, &
netcdf_file_type, READ_COPY, WRITE_COPY, &
noutput_state_variables
use assim_model_mod, only : get_model_size, read_model_time, write_model_time
!>@todo FIXME : should move to assim_model_mod.f90
use model_mod, only : nc_write_model_atts
use typesizes
use netcdf
implicit none
private
public :: read_transpose, &
transpose_write, &
initialize_single_file_io, &
finalize_single_file_io, &
read_single_file, &
write_single_file, &
write_augmented_state, &
read_variables, &
nc_get_num_times
character(len=*), parameter :: source = 'direct_netcdf_mod.f90'
! only a single MPI task reads and writes the state variable
! when using single_file_{input,output} = .true.
integer, parameter :: SINGLE_IO_TASK_ID = 0
! module global variables
integer :: ret
character(len=512) :: msgstring, msgstring2
!>@todo FIXME:
!> this should be in a namelist somewhere and passed in from
!> a higher level routine.
!>
!> we had this in the ROMS branch but no one can remember why.
!> we think it's because we copied a template file into place
!> for filter to overwrite and it might not have had the right
!> analysis time in the file. it's true that filter doesn't
!> change the time in the file, but when we're doing direct
!> updates of a netcdf file it's also true that we rarely
!> update the file we read from; in case of an error you've
!> destroyed your input needed to rerun the job.
!> (but we don't remember for sure if this was the reason.)
logical :: overwrite_time_in_output_file = .false.
contains
!=================================================
! PUBLIC ROUTINES
!=================================================
!=================================================
! multiple file IO
!=================================================
!-------------------------------------------------
!> Read and transpose variables in cyclic-cyclic distribution (round robin)
subroutine read_transpose(state_ens_handle, name_handle, domain, dart_index, read_single_vars)
type(ensemble_type), intent(inout) :: state_ens_handle !< Ensemble handle
type(stage_metadata_type), intent(in) :: name_handle !< Name handle
integer, intent(in) :: domain !< Which domain to read
integer(i8), intent(inout) :: dart_index !< This is for multiple domains
logical, intent(in) :: read_single_vars !< Read one variable at a time
if (task_count() == 1) then
call read_transpose_single_task(state_ens_handle, name_handle, domain, dart_index)
else
call read_transpose_multi_task( state_ens_handle, name_handle, domain, dart_index, read_single_vars)
endif
end subroutine read_transpose
!-------------------------------------------------
!> This code transposes and writes out the state vector copies
!> This can either be done on a single task or with multiple tasks
subroutine transpose_write(state_ens_handle, name_handle, domain, &
dart_index, write_single_vars, write_single_precision)
type(ensemble_type), intent(inout) :: state_ens_handle
type(stage_metadata_type), intent(in) :: name_handle
integer, intent(in) :: domain
integer(i8), intent(inout) :: dart_index
logical, intent(in) :: write_single_vars !< write one variable at a time
logical, intent(in) :: write_single_precision
if (task_count() == 1) then
call transpose_write_single_task(state_ens_handle, name_handle, domain, &
dart_index, write_single_precision)
else
call transpose_write_multi_task(state_ens_handle, name_handle, domain, &
dart_index, write_single_vars, write_single_precision)
endif
end subroutine transpose_write
!=================================================
! single file IO
!=================================================
!-------------------------------------------------
!> Creates a template file for single file io
!> Calls the model for any model specific attributes to be written
subroutine initialize_single_file_io(ens_handle, file_handle)
type(ensemble_type), intent(inout) :: ens_handle
type(file_info_type), intent(inout) :: file_handle
! Typical sequence:
! NF90_OPEN ! create netCDF dataset: enter define mode
! NF90_def_dim ! define dimensions: from name and length
! NF90_def_var ! define variables: from name, type, and dims
! NF90_put_att ! assign attribute values
! NF90_ENDDEF ! end definitions: leave define mode
! NF90_put_var ! provide values for variable
! NF90_CLOSE ! close: save updated netCDF dataset
!
! Time is a funny beast ...
! Many packages decode the time:units attribute to convert the offset to a calendar
! date/time format. Using an offset simplifies many operations, but is not the
! way we like to see stuff plotted. The "approved" calendars are:
! gregorian or standard
! Mixed Gregorian/Julian calendar as defined by Udunits. This is the default.
! noleap Modern calendar without leap years, i.e., all years are 365 days long.
! 360_day All years are 360 days divided into 30 day months.
! julian Julian calendar.
! no_calendar No calendar.
!
! location is another one ...
!>@todo FIXME : need to have this work for multiple domains
! Variables for outputting the input.nml contents
integer :: metadata_length, nlines, linelen, createmode
character(len=129), allocatable :: textblock(:)
character(len=metadatalength), allocatable :: state_meta(:)
! Netcdf variables
type(netcdf_file_type) :: ncFileID
integer :: MemberDimID
integer :: TimeDimID, TimeVarID
integer :: MetaDataDimID, MetadataVarID
integer :: nlinesDimID, linelenDimID, nmlVarID
! For single file output, dart will always have control
! of the file formatting
logical :: local_model_mod_will_write_state_variables = .false.
! Local variables including counters and storing names
character(len=256) :: fname
integer :: icopy, ivar, ret, ens_size, num_output_ens, domain
if (my_task_id() == 0) then
if(.not. byteSizesOK()) then
call error_handler(E_ERR,'initialize_single_file_io', &
'Compiler does not support required kinds of variables.',source)
end if
num_output_ens = file_handle%stage_metadata%noutput_ens
if (num_output_ens <= 0) num_output_ens = 1
allocate(state_meta(num_output_ens))
! Set up the metadata for the output file
do ivar = 1, num_output_ens
write(state_meta(ivar), '(a15, 1x, i6)') 'ensemble member', ivar
write(file_handle%stage_metadata%copy_name(ivar),'(a,i4.4)') 'ens_mem_',ivar
end do
metadata_length = LEN(state_meta(1))
! NetCDF large file support
createmode = NF90_64BIT_OFFSET
! Create the file
fname = get_restart_filename(file_handle%stage_metadata, 1, 1)
ncFileID%fname = fname
call nc_check(nf90_create(path=fname, cmode=createmode, ncid=ncFileID%ncid), &
'initialize_single_file_io', 'create '//trim(fname))
ncFileID%ncid = ncFileID%ncid
! Define the dimensions
call nc_check(nf90_def_dim(ncid=ncFileID%ncid, &
name="metadatalength", len=metadata_length, dimid=MetaDataDimID), &
'initialize_single_file_io', 'def_dim metadatalength '//trim(fname))
call nc_check(nf90_def_dim(ncid=ncFileID%ncid, &
name="member", len=num_output_ens, dimid=MemberDimID), &
'initialize_single_file_io', 'def_dim member '//trim(fname))
call nc_check(nf90_def_dim(ncid=ncFileID%ncid, name="time", &
len=nf90_unlimited, dimid=TimeDimID), &
'initialize_single_file_io', 'def_dim time '//trim(fname))
!----------------------------------------------------------------------------
! Find dimensions of namelist file ... will save it as a variable.
!----------------------------------------------------------------------------
! All DART programs require input.nml, so it is unlikely this can fail, but
! check anyway and report it if the file cannot be read.
call find_textfile_dims("input.nml", nlines, linelen)
if (nlines <= 0 .or. linelen <= 0) then
call error_handler(E_MSG,'initialize_single_file_io', &
'cannot open/read input.nml to save in diagnostic file', source)
endif
allocate(textblock(nlines))
textblock = ''
call nc_check(nf90_def_dim(ncid=ncFileID%ncid, &
name="NMLlinelen", len=LEN(textblock(1)), dimid=linelenDimID), &
'initialize_single_file_io', 'def_dim NMLlinelen '//trim(fname))
call nc_check(nf90_def_dim(ncid=ncFileID%ncid, &
name="NMLnlines", len=nlines, dimid=nlinesDimID), &
'initialize_single_file_io', 'def_dim NMLnlines '//trim(fname))
!----------------------------------------------------------------------------
! Write Global Attributes
!----------------------------------------------------------------------------
call nc_check(nf90_put_att(ncFileID%ncid, NF90_GLOBAL, "title", fname), &
'initialize_single_file_io', 'put_att title '//trim(fname))
call nc_check(nf90_put_att(ncFileID%ncid, NF90_GLOBAL, "assim_model_source", source ), &
'initialize_single_file_io', 'put_att assim_model_source '//trim(fname))
! Metadata for each Copy
call nc_check(nf90_def_var(ncid=ncFileID%ncid, name="MemberMetadata", xtype=nf90_char, &
dimids=(/ MetaDataDimID, MemberDimID /), varid=metadataVarID), &
'initialize_single_file_io', 'def_var MemberMetadata')
call nc_check(nf90_put_att(ncFileID%ncid, metadataVarID, "long_name", &
"Metadata for each copy/member"), 'initialize_single_file_io', 'put_att long_name')
! Input namelist
call nc_check(nf90_def_var(ncid=ncFileID%ncid,name="inputnml", xtype=nf90_char, &
dimids = (/ linelenDimID, nlinesDimID /), varid=nmlVarID), &
'initialize_single_file_io', 'def_var inputnml')
call nc_check(nf90_put_att(ncFileID%ncid, nmlVarID, "long_name", &
"input.nml contents"), 'initialize_single_file_io', 'put_att input.nml')
! Time -- the unlimited dimension
call nc_check(nf90_def_var(ncFileID%ncid, name="time", xtype=nf90_double, dimids=TimeDimID, &
varid =TimeVarID), 'initialize_single_file_io', 'def_var time' )
ret = nc_write_calendar_atts(ncFileID, TimeVarID) ! comes from time_manager_mod
if ( ret /= 0 ) then
write(msgstring, *)'nc_write_calendar_atts bombed with error ', ret
call error_handler(E_MSG,'initialize_single_file_io',msgstring,source)
endif
! Create the time "mirror" with a static length. There is another routine
! to increase it if need be. For now, just pick something.
ncFileID%Ntimes = 0
ncFileID%NtimesMAX = 1000
allocate(ncFileID%rtimes(ncFileID%NtimesMAX), ncFileID%times(ncFileID%NtimesMAX) )
!----------------------------------------------------------------------------
! Leave define mode so we can fill
!----------------------------------------------------------------------------
call nc_check(nf90_enddef(ncFileID%ncid), 'initialize_single_file_io', 'enddef '//trim(fname))
call nc_check(nf90_sync(ncFileID%ncid), 'initialize_single_file_io', 'sync '//trim(fname))
!----------------------------------------------------------------------------
! Define the model-specific components
!----------------------------------------------------------------------------
! for single file io, we assume a single domain; none of the low-order models
! have multiple domains.
domain = 1
call nc_write_model_atts( ncFileID%ncid, domain)
if ( .not. local_model_mod_will_write_state_variables ) then
call write_model_attributes(ncFileID, MemberDimID, TimeDimID, file_handle)
endif
!----------------------------------------------------------------------------
! Create variables and attributes.
! The locations are part of the model (some models have multiple grids).
! They are written by model_mod:nc_write_model_atts
!----------------------------------------------------------------------------
ens_size = ens_handle%num_copies - ens_handle%num_extras
do icopy = ens_size+1, ens_handle%num_copies
if ( file_handle%stage_metadata%io_flag(icopy) == WRITE_COPY ) then
call write_extra_attributes( ncFileID, TimeDimID, file_handle, icopy )
endif
enddo
!----------------------------------------------------------------------------
! Fill the coordinate variables.
! Write the input namelist as a netCDF variable.
! The time variable is filled as time progresses.
!----------------------------------------------------------------------------
call nc_check(nf90_put_var(ncFileID%ncid, metadataVarID, state_meta ), &
'initialize_single_file_io', 'put_var MetaDataVarID')
call file_to_text("input.nml", textblock)
call nc_check(nf90_put_var(ncFileID%ncid, nmlVarID, textblock ), &
'initialize_single_file_io', 'put_var nmlVarID')
deallocate(textblock)
!----------------------------------------------------------------------------
! sync to disk, but leave open
!----------------------------------------------------------------------------
call nc_check(nf90_sync(ncFileID%ncid), 'initialize_single_file_io', 'sync '//trim(fname))
!----------------------------------------------------------------------------
! sync again, but still leave open
!----------------------------------------------------------------------------
call nc_check(nf90_sync(ncFileID%ncid), 'initialize_single_file_io', 'sync '//trim(fname))
file_handle%stage_metadata%ncFileID = ncFileID
endif
! Broadcast the value of model_mod_will_write_state_variables to every task
! This keeps track of whether the model_mod or dart code will write state_variables.
call broadcast_flag(local_model_mod_will_write_state_variables, 0)
ncFileID%model_mod_will_write_state_variables = local_model_mod_will_write_state_variables
file_handle%singlefile_initialized = .true.
end subroutine initialize_single_file_io
!-------------------------------------------------------------------------------
!> Close the single-file diagnostic output, freeing the time storage
!> and ending the diagnostic structure.
subroutine finalize_single_file_io(file_handle)
type(file_info_type), intent(in) :: file_handle
type(netcdf_file_type) :: ncFileID
integer :: ierr
ncFileID = file_handle%stage_metadata%ncFileID
if (my_task_id()==0) then
ierr = nf90_close(ncFileID%ncid)
call nc_check(ierr, 'finalize_single_file_io: nf90_close', ncFileID%fname)
if(associated(ncFileID%rtimes)) deallocate(ncFileID%rtimes, ncFileID%times )
endif
ncFileID%fname = "notinuse"
ncFileID%ncid = -1
ncFileID%Ntimes = -1
ncFileID%NtimesMax = -1
call end_diagnostic_structure()
end subroutine finalize_single_file_io
!-------------------------------------------------------
!> read a single netcdf file containing all of the members
!> and possibly inflation information
subroutine read_single_file(state_ens_handle, file_handle, use_time_from_file, mtime, pert_from_single_copy)
type(ensemble_type), intent(inout) :: state_ens_handle !! ensemble handle to store data
type(file_info_type), intent(in) :: file_handle !! file handle for file names
logical, intent(in) :: use_time_from_file !! read time from file
type(time_type), intent(inout) :: mtime !! external time
logical, optional, intent(in) :: pert_from_single_copy !! reading single file and perturbing
! NetCDF IO variables
integer :: my_ncid, varid, TimeDimID, MemDimID, ret
character (len=NF90_MAX_NAME ) :: fname, varname, copyname
integer, dimension(NF90_MAX_VAR_DIMS) :: dim_lengths
integer, dimension(NF90_MAX_VAR_DIMS) :: dim_start_point
integer :: icopy, ivar, domain
integer :: ens_size, extra_size, time_size, ndims
integer :: my_pe, recv_pe, recv_start, recv_end, start_rank
integer :: send_start, send_end
logical :: do_perturb, is_sender, is_receiver, is_extra_copy
integer(i8) :: var_size, elm_count, start_pos, end_pos, start_point
real(r8), allocatable :: var_block(:)
do_perturb = .false.
if ( present(pert_from_single_copy) ) then
do_perturb = pert_from_single_copy
endif
! if perturbing we only need to read the first ensemble member
if ( do_perturb ) ens_size = 1
! grab ensemble size and number of extra copies
ens_size = state_ens_handle%num_copies - state_ens_handle%num_extras
extra_size = state_ens_handle%num_extras
!>@todo : only a single domain for single file read supported. need
!> to consider case for multiple domains.
domain = 1
fname = get_restart_filename(file_handle%stage_metadata, 1, domain)
!>@todo Check time consistency across files? This is assuming they are consistent.
! read time from input file if time not set in namelist
if( use_time_from_file ) mtime = read_model_time(fname)
state_ens_handle%time = mtime
ret = nf90_open(fname, NF90_NOWRITE, my_ncid)
call nc_check(ret, 'read_single_file: nf90_open', fname)
ret = nf90_inq_dimid(my_ncid, "member", MemDimID)
if (ret /= NF90_NOERR) then
call error_handler(E_ERR,'direct_netcdf_mod:', &
'If using single_file_in/single_file_out = .true. ', &
source, text2='you must have a member dimension in your input/output file.')
endif
ret = nf90_inq_dimid(my_ncid, "time", TimeDimID)
call nc_check(ret, 'read_single_file', 'inq_dimid time : '//trim(fname))
ret = nf90_inquire_dimension(my_ncid, TimeDimID, len=time_size)
call nc_check(ret, 'read_single_file', 'inquire_dimension time '//trim(fname))
! mpi task variables
my_pe = my_task_id()
is_sender = (my_pe == SINGLE_IO_TASK_ID)
! recv_* and send_* are the PEs that variables are sent to and received from
call get_pe_loops(ens_size, recv_start, recv_end, send_start, send_end)
call check_singlefile_member_info(my_ncid, fname, ens_size, do_perturb)
COPY_LOOP: do icopy = 1, ens_size+extra_size
! only SINGLE_IO_TASK_ID reads the data and distributes it, including the
! extra copies in the ensemble handle.  the extra copies could include:
!   {variable}_{mean,sd}
!   {variable}_priorinf_{mean,sd}
!   {variable}_postinf_{mean,sd}
if ( file_handle%stage_metadata%io_flag(icopy) /= READ_COPY ) cycle
is_extra_copy = (icopy > ens_size)
! check that the copy information is valid
! starting position in the copies array
start_pos = 1
VAR_LOOP: do ivar = 1, get_num_variables(domain)
var_size = get_variable_size(domain, ivar)
varname = get_variable_name(domain, ivar)
! work out netcdf dimension ids and lengths
call get_dimension_info(icopy, domain, ivar, time_size, is_extra_copy, &
ndims, dim_start_point, dim_lengths)
! read variable from SINGLE_IO_TASK_ID
if ( is_sender ) then
allocate(var_block(var_size))
! append the copyname to the variable (ex. ps_mean)
if ( is_extra_copy ) then
copyname = get_copy_name(file_handle,icopy)
write(varname,'(a,"_",a)') trim(varname), trim(copyname)
endif
! get the variable id from the copy we are reading
ret = nf90_inq_varid(my_ncid, varname, varid)
call nc_check(ret, 'read_single_file', 'inq_varid '//trim(varname)//' : '//trim(fname))
! if it is an ensemble member we are just reading one member from one variable
ret = nf90_get_var(my_ncid, varid, var_block, &
count=dim_lengths(1:ndims), &
start=dim_start_point(1:ndims))
call nc_check(ret, 'read_single_file', 'get_var '//trim(varname)//' : '//trim(fname))
endif
!>@todo FIXME : just reading and distributing one variable at a time.
!> Should probably do this in a block?
!start_rank = get_start_rank(start_var, domain)
! this is where we left off writing variables in the case of multiple tasks
start_rank = get_start_rank(ivar, domain)
RECEIVING_PE_LOOP: do recv_pe = 0, task_count()-1
! work out how many elements of this variable the receiving pe owns
elm_count = num_elements_on_pe(recv_pe, start_rank, var_size)
end_pos = start_pos + elm_count - 1 ! ending position in the copies array
! work out the start in var_block corresponding to the receiving pe
start_point = find_start_point(recv_pe, start_rank)
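! illustration only (a sketch, assuming start_rank = 0 so this variable starts
! on pe 0): with 4 tasks and var_size = 10, pe 0 owns var_block elements 1,5,9
! (elm_count = 3, start_point = 1), pe 1 owns 2,6,10, pe 2 owns 3,7 and
! pe 3 owns 4,8, which is exactly the strided slice copied below.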
is_receiver = (my_pe == recv_pe)
if ( is_receiver ) then
if ( is_sender ) then ! just copy directly into copies
! non-contiguous array. directly copying into the copies array
! since no send required.
state_ens_handle%copies(icopy, start_pos:end_pos ) = &
var_block(start_point:elm_count*task_count():task_count())
else ! post receive
call wait_to_receive(state_ens_handle, SINGLE_IO_TASK_ID, icopy, &
start_pos, end_pos)
endif ! is_sender
! update starting point in the state vector
start_pos = start_pos + elm_count
elseif ( is_sender ) then ! send variables to receivers
call send_to_waiting_task(state_ens_handle, recv_pe, start_point, &
elm_count, var_size, var_block)
endif ! is_receiver
enddo RECEIVING_PE_LOOP
if( is_sender ) then
deallocate(var_block)
endif
enddo VAR_LOOP
enddo COPY_LOOP
ret = nf90_close(my_ncid)
call nc_check(ret, 'read_single_file: nf90_close', fname)
end subroutine read_single_file
!-----------------------------------------------------------
!> write all variables to a single file, including all members
!> and optionally inflation, mean, sd
subroutine write_single_file(ens_handle, file_handle)
type(ensemble_type), intent(inout) :: ens_handle
type(file_info_type), intent(inout) :: file_handle
! Local variables
integer :: member_index, copy_index, my_ncid
integer :: num_output_ens, ens_size
integer(i8) :: model_size
character(len=128) :: copyname
type(time_type) :: curr_ens_time
type(netcdf_file_type) :: ncFileID
real(r8), allocatable :: temp_ens(:)
integer :: timeindex
integer :: is1, id1
! Assumes that mean and spread have already been computed
! make sure vars is up-to-date
call allocate_vars(ens_handle)
call all_copies_to_all_vars(ens_handle)
model_size = get_model_size()
! SINGLE_IO_TASK_ID needs some space
if (my_task_id() == SINGLE_IO_TASK_ID) then
allocate(temp_ens(model_size))
else
allocate(temp_ens(1))
endif
! SINGLE_IO_TASK_ID writes out all files
if (my_task_id() == SINGLE_IO_TASK_ID) then
curr_ens_time = ens_handle%current_time
ncFileID = file_handle%stage_metadata%ncFileID
timeindex = nc_get_tindex(ncFileID, curr_ens_time)
if ( timeindex < 0 ) then
call get_time(curr_ens_time,is1,id1)
write(msgstring,*)'model time (d,s)',id1,is1,' not in ',ncFileID%fname
write(msgstring2,'(''model time (d,s) ('',i8,i5,'') is index '',i6, '' in ncFileID '',i10)') &
id1,is1,timeindex,ncFileID%ncid
call error_handler(E_ERR,'write_model_variables', msgstring, source, text2=msgstring2)
endif
call get_time(curr_ens_time,is1,id1)
write(msgstring,'(''model time (d,s) ('',i8,i5,'') is index '',i6, '' in ncFileID '',i10)') &
id1,is1,timeindex,ncFileID%ncid
call error_handler(E_DBG,'write_model_variables', msgstring, source)
my_ncid = ncFileID%ncid
endif
num_output_ens = noutput_state_variables(file_handle)
! Output ensemble members
do member_index = 1, num_output_ens
call get_copy(map_task_to_pe(ens_handle, 0), ens_handle, member_index, temp_ens)
if(my_task_id() == SINGLE_IO_TASK_ID) &
call write_model_variables(ncFileID, temp_ens, member_index, curr_ens_time, timeindex)
enddo
! Output Extras
ens_size = ens_handle%num_copies - ens_handle%num_extras
do copy_index = ens_size+1, ens_handle%num_copies
if ( file_handle%stage_metadata%io_flag(copy_index) == WRITE_COPY ) then
copyname = get_copy_name(file_handle, copy_index)
call get_copy(map_task_to_pe(ens_handle, 0), ens_handle, copy_index, temp_ens)
if(my_task_id() == SINGLE_IO_TASK_ID) &
call write_extra_variables(ncFileID, temp_ens, copyname, curr_ens_time, timeindex)
endif
enddo
if (my_task_id() == SINGLE_IO_TASK_ID) then ! SINGLE_IO_TASK_ID writes out all files
!>@todo FIXME : This sync is not necessary but ensures that all of the variable
!> information gets written out if filter happens to crash in the middle of a
!> run. This may slow down the code for longer runs.
call nc_check(nf90_sync(my_ncid), 'write_single_file', 'nf90_sync')
file_handle%stage_metadata%ncFileID = ncFileID
endif
deallocate(temp_ens)
end subroutine write_single_file
!-----------------------------------------------------------
!> insert the mean and sd into the input file if the user requests it.
!> this assumes that the file has already been closed and the
!> variables read in.
subroutine write_augmented_state(ens_handle, file_handle)
type(ensemble_type), intent(inout) :: ens_handle
type(file_info_type), intent(inout) :: file_handle
! Local variables
integer :: copy_index, ens_size, domain
integer :: TimeDimID, my_ncid, ret
integer(i8) :: model_size
type(time_type) :: curr_ens_time
type(netcdf_file_type) :: ncFileID
real(r8), allocatable :: temp_ens(:)
character(len=256) :: fname, copyname
! assumes that mean and spread have already been computed
! make sure vars is up-to-date
call allocate_vars(ens_handle)
call all_copies_to_all_vars(ens_handle)
model_size = get_model_size()
! SINGLE_IO_TASK_ID needs some space
if (my_task_id() == SINGLE_IO_TASK_ID) then
allocate(temp_ens(model_size))
else
allocate(temp_ens(1))
endif
curr_ens_time = ens_handle%current_time
!>@todo : only a single domain for single file read supported.
!> need to consider case for multiple domains.
domain = 1
if (my_task_id() == SINGLE_IO_TASK_ID) then
fname = get_restart_filename(file_handle%stage_metadata, 1, domain)
ret = nf90_open(fname, NF90_WRITE, my_ncid)
call nc_check(ret, 'write_augmented_state: nf90_open', fname)
ret = nf90_inq_dimid(my_ncid, "time", TimeDimID)
call nc_check(ret, 'write_augmented_state', 'inq_dimid time : '//trim(fname))
ncFileID%ncid = my_ncid
ncFileID%fname = fname
endif
! Output Mean and SD
ens_size = ens_handle%num_copies - ens_handle%num_extras
! all tasks must execute this loop in order to get_copy from all tasks
do copy_index = ens_size+1, ens_handle%num_copies
if ( file_handle%stage_metadata%io_flag(copy_index) == WRITE_COPY ) then
call get_copy(map_task_to_pe(ens_handle, 0), ens_handle, copy_index, temp_ens)
if(my_task_id() == SINGLE_IO_TASK_ID) then
copyname = get_copy_name(file_handle, copy_index)
call write_extra_attributes( ncFileID, TimeDimID, file_handle, copy_index)
call write_extra_variables( ncFileID, temp_ens, copyname, curr_ens_time, 1)
endif
endif
enddo
if (my_task_id() == SINGLE_IO_TASK_ID) then
ret = nf90_close(my_ncid)
call nc_check(ret, 'write_augmented_state: nf90_close', fname)
endif
deallocate(temp_ens)
end subroutine write_augmented_state
!-------------------------------------------------------------------------------
!> Read in variables from start_var to end_var
!> Read the latest time slice in the file
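!>
!> For example, read_transpose_single_task() below reads every variable of a
!> domain in one call:
!> \verbatim
!> call read_variables(ncfile, vector, 1, get_num_variables(domain), domain)
!> \endverbatim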
subroutine read_variables(ncfile_in, var_block, start_var, end_var, domain)
integer, intent(in) :: ncfile_in
real(r8), intent(inout) :: var_block(:)
integer, intent(in) :: start_var
integer, intent(in) :: end_var
integer, intent(in) :: domain
integer :: i
integer(i8) :: istart, iend
integer(i8) :: var_size
integer :: num_dims
integer, allocatable :: counts(:)
integer, allocatable :: slice_start(:) ! slice of variable
integer :: ret, var_id, unlim_dimID
logical :: missing_possible
missing_possible = get_missing_ok_status()
istart = 1
do i = start_var, end_var
var_size = get_variable_size(domain, i)
iend = istart + var_size - 1
! number of dimensions and lengths of the variable
num_dims = get_io_num_dims(domain, i)
allocate(counts(num_dims))
allocate(slice_start(num_dims))
counts(:) = 1
slice_start(:) = 1 ! default to read all dimensions start at 1
if (has_unlimited_dim(domain)) then
counts(num_dims) = 1 ! one slice of the unlimited dimension
counts(1:num_dims-1) = get_dim_lengths(domain, i) ! the state
! read latest time slice - hack to get started with tiegcm
! not sure if it will always be the last time slice
ret = nf90_inquire(ncfile_in, unlimitedDimID=unlim_dimID)
call nc_check(ret, 'read_variables: nf90_inquire', 'unlimitedDimID')
if (unlim_dimID /= -1) then ! unlimited dimension exists
ret = nf90_inquire_dimension(ncfile_in, unlim_dimID, len=slice_start(num_dims))
call nc_check(ret, 'read_variables: nf90_inquire_dimension', 'unlimitedDim length')
if (slice_start(num_dims) == 0) slice_start(num_dims) = 1 ! newly created file
else ! file does not have an unlimited dimension because it was created by DART
slice_start(num_dims) = 1
endif
else
counts(1:get_num_dims(domain,i)) = get_dim_lengths(domain, i) ! the state
endif
ret = nf90_inq_varid(ncfile_in, get_variable_name(domain, i), var_id)
call nc_check(ret, 'read_variables: nf90_inq_varid',trim(get_variable_name(domain,i)) )
ret = nf90_get_var(ncfile_in, var_id, var_block(istart:iend), count=counts, start=slice_start)
call nc_check(ret, 'read_variables: nf90_get_var',trim(get_variable_name(domain,i)) )
if (missing_possible) call set_dart_missing_value(var_block(istart:iend), domain, i)
istart = istart + var_size
deallocate(counts, slice_start)
enddo
end subroutine read_variables
!=================================================
! HELPER ROUTINES
!=================================================
!-------------------------------------------------
!> Single processor version of read_transpose. Reads ens_size whole vectors from
!> netcdf files and fills up a row of %copies for each file.
subroutine read_transpose_single_task(state_ens_handle, name_handle, domain, dart_index)
type(ensemble_type), intent(inout) :: state_ens_handle
type(stage_metadata_type), intent(in) :: name_handle
integer, intent(in) :: domain
integer(i8), intent(inout) :: dart_index !< This is for multiple domains
real(r8), allocatable :: vector(:)
integer :: ncfile !< netcdf input file identifier
character(len=256) :: netcdf_filename
integer(i8) :: block_size, istart, iend
integer :: copy , start_var
istart = dart_index ! position in state_ens_handle%vars
block_size = 0
! need to read into a temporary array, then fill up copies
allocate(vector(get_domain_size(domain)))
COPIES: do copy = 1, state_ens_handle%my_num_copies
start_var = 1 ! read first variable first
! open netcdf file
if (query_read_copy(name_handle, copy)) then
netcdf_filename = get_restart_filename(name_handle, copy, domain)
ret = nf90_open(netcdf_filename, NF90_NOWRITE, ncfile)
call nc_check(ret, 'read_transpose_single_task: opening', netcdf_filename)
endif
block_size = get_domain_size(domain)
iend = istart + block_size -1
if (query_read_copy(name_handle, copy)) then
call read_variables(ncfile, vector, 1, get_num_variables(domain), domain)
! close netcdf file
ret = nf90_close(ncfile)
call nc_check(ret, 'read_transpose_single_task: closing', netcdf_filename)
state_ens_handle%copies(copy, istart:iend) = vector
endif
enddo COPIES
! update starting point
istart = istart + block_size
dart_index = istart
deallocate(vector)
end subroutine read_transpose_single_task
!-------------------------------------------------
!> Single processor version of transpose write. Takes the copies array one row
!> at a time and writes each copy to a netcdf file.
subroutine transpose_write_single_task(state_ens_handle, name_handle, domain, &
dart_index, write_single_precision)
type(ensemble_type), intent(inout) :: state_ens_handle
type(stage_metadata_type), intent(in) :: name_handle
integer, intent(in) :: domain
integer(i8), intent(inout) :: dart_index
logical, intent(in) :: write_single_precision
! netcdf variables
integer :: ncfile_out
character(len=256) :: netcdf_filename_out
real(r8), allocatable :: vector(:)
integer(i8) :: block_size , istart, iend
integer :: copy , start_var, end_var
integer :: time_owner, time_owner_index
logical :: clamp_vars, force_copy
type(time_type) :: dart_time
! need a temporary array to hold one copy at a time
allocate(vector(get_domain_size(domain)))
istart = dart_index ! position in state_ens_handle%vars
block_size = 0
! loop over the copies, filling the temporary array and writing it out
COPIES: do copy = 1, state_ens_handle%my_num_copies