! Fragment of a derived-type component list; the enclosing type statement is
! not part of this listing.
! model_proc_ids: one integer per model. Elsewhere in this listing it is set
! to proc_id for local models and 0 otherwise, then summed over all processes.
24 integer(I4B),
dimension(:),
pointer :: model_proc_ids
! all_models / all_exchanges: arrays of vdcptrtype wrappers, null by default.
27 type(
vdcptrtype),
dimension(:),
pointer :: all_models => null()
28 type(
vdcptrtype),
dimension(:),
pointer :: all_exchanges => null()
! rte_models / rte_exchanges: the set currently being routed. They are
! pointed at caller-supplied arrays and reset to null() afterwards.
29 type(
vdcptrtype),
dimension(:),
pointer :: rte_models => null()
30 type(
vdcptrtype),
dimension(:),
pointer :: rte_exchanges => null()
! enable_monitor: guards every write to the monitor unit this%imon.
35 logical(LGP) :: enable_monitor
! tmr_mpi_wait: 2-D integer table indexed as (stage, unit + 1) and handed to
! g_prof%stop around the mpi_waitall calls.
36 integer(I4B),
dimension(:, :),
allocatable :: tmr_mpi_wait
! Fragment of the router initialization (presumably mr_initialize; the
! procedure header and several statements are missing from this listing).
75 integer(I4B) :: nr_models, nr_exchanges
77 character(len=LINELENGTH) :: monitor_file
! One timer slot per (stage, virtual solution + 1); all slots start at -1.
! NOTE(review): -1 presumably means "timer not created yet" - confirm
! against the profiler.
80 allocate (this%tmr_mpi_wait(
nr_sim_stages, this%nr_virt_solutions + 1))
81 this%tmr_mpi_wait = -1
84 this%halo_activated = .false.
87 this%enable_monitor = .false.
! Initialize the helper objects owned by the router.
90 call this%message_builder%init()
91 call this%msg_cache%init()
97 call this%senders%init()
98 call this%receivers%init()
103 allocate (this%model_proc_ids(nr_models))
104 allocate (this%all_models(nr_models))
105 allocate (this%all_exchanges(nr_exchanges))
! Per model: store the container pointer and record proc_id when the model
! is local, 0 otherwise (the enclosing loop and else are not visible here).
109 this%all_models(i)%ptr => vdc
110 if (vdc%is_local)
then
111 this%model_proc_ids(i) =
proc_id
113 this%model_proc_ids(i) = 0
! In-place sum over all processes: afterwards each entry holds the sum of
! the contributions, i.e. the owner id if exactly one process reported the
! model as local (assumption - TODO confirm).
117 call mpi_allreduce(mpi_in_place, this%model_proc_ids, nr_models, &
118 mpi_integer, mpi_sum, this%mpi_world%comm, ierr)
124 call vdc%set_orig_rank(this%model_proc_ids(i))
! Exchanges: the origin rank is taken from model 1; when model 1 is local
! it is overwritten with the rank of model 2.
127 do i = 1, nr_exchanges
129 this%all_exchanges(i)%ptr => vdc
130 select type (vex => vdc)
132 call vex%set_orig_rank(vex%v_model1%orig_rank)
133 if (vex%v_model1%is_local)
then
134 call vex%set_orig_rank(vex%v_model2%orig_rank)
! Optional per-process log file named mpi.p<proc_id>.log.
! NOTE(review): enable_monitor is set to .false. above and no visible line
! changes it before this test - confirm whether this is a manual debug switch.
140 if (this%enable_monitor)
then
142 write (monitor_file,
'(a,i0,a)')
"mpi.p",
proc_id,
".log"
143 open (unit=this%imon, file=monitor_file)
144 call this%message_builder%set_monitor(this%imon)
147 write (this%imon,
'(a,/)')
">> initialize MPI Router:"
148 write (this%imon,
'(2x,a,i0)')
"process id: ",
proc_id
149 write (this%imon,
'(2x,a,i0)')
"nr. of processes: ",
nr_procs
150 write (this%imon,
'(2x,a,i0)')
"nr. of models: ", nr_models
151 write (this%imon,
'(2x,a,i0)')
"nr. of exchanges: ", nr_exchanges
152 write (this%imon,
'(2x,2a)')
"model id, processor id:"
154 write (this%imon,
'(4x,2i8)') i, this%model_proc_ids(i)
156 write (this%imon,
'(a,/)')
"<< initialize done"
! Fragment that selects the models and exchanges to route (presumably the
! body of activate; header not visible). No data is copied: the router
! pointers are associated with the supplied arrays and the same arrays are
! attached to the message builder.
165 type(
vdcptrtype),
dimension(:),
pointer :: models
166 type(
vdcptrtype),
dimension(:),
pointer :: exchanges
168 this%rte_models => models
169 this%rte_exchanges => exchanges
170 call this%message_builder%attach_data(models, exchanges)
! Fragment that undoes the activation (presumably the body of deactivate):
! the routed-set pointers are nulled and the message builder releases its
! attached data.
179 this%rte_models => null()
180 this%rte_exchanges => null()
181 call this%message_builder%release_data()
! Fragment that routes the global model and exchange lists for one stage
! (presumably the body of mr_route_all; header not visible).
191 integer(I4B) :: stage
193 if (this%enable_monitor)
then
194 write (this%imon,
'(/,2a)')
">> routing all: ",
stg_to_str(stage)
! Activate everything, route with unit id 0, then deactivate again.
198 call this%activate(this%all_models, this%all_exchanges)
199 call this%route_active(0, stage)
200 call this%deactivate()
202 if (this%enable_monitor)
then
203 write (this%imon,
'(a,/)')
"<< end routing all"
! Fragment that routes the models and exchanges of one virtual solution for
! one stage (presumably the body of mr_route_sln; header not visible).
215 integer(I4B) :: stage
217 if (this%enable_monitor)
then
218 write (this%imon,
'(/,a,i0,2a)')
">> routing solution: ", &
219 virtual_sol%solution_id,
", ",
stg_to_str(stage)
! Same activate / route / deactivate sequence as the route-all fragment, but
! the solution id is passed as the unit id.
223 call this%activate(virtual_sol%models, virtual_sol%exchanges)
224 call this%route_active(virtual_sol%solution_id, stage)
225 call this%deactivate()
227 if (this%enable_monitor)
then
228 write (this%imon,
'(a)')
"<< end routing solution"
! Fragment that exchanges the message bodies for the active set (presumably
! the body of route_active; header and some statements are missing).
! Data is received from the ranks in this%senders and sent to the ranks in
! this%receivers; the stage number is used as the MPI message tag.
241 integer(I4B) :: stage
245 integer :: ierr, msg_size
246 logical(LGP) :: from_cache
! Request handles and statuses for the non-blocking calls.
248 integer,
dimension(:),
allocatable :: rcv_req
249 integer,
dimension(:),
allocatable :: snd_req
250 integer,
dimension(:, :),
allocatable :: rcv_stat
! MPI datatype handles describing the body to receive / send, one per peer.
253 integer,
dimension(:),
allocatable :: body_rcv_t
254 integer,
dimension(:),
allocatable :: body_snd_t
! Refresh the peer lists before sizing anything from them.
257 call this%update_senders()
258 call this%update_receivers()
261 allocate (body_rcv_t(this%senders%size))
262 allocate (body_snd_t(this%receivers%size))
265 allocate (rcv_req(this%senders%size))
266 allocate (snd_req(this%receivers%size))
267 allocate (rcv_stat(mpi_status_size, this%senders%size))
! Null handles, so a skipped (zero-size) message leaves a null request.
270 rcv_req = mpi_request_null
271 snd_req = mpi_request_null
273 if (this%enable_monitor)
then
274 write (this%imon,
'(2x,a,*(i3))')
"process ids sending data: ", &
275 this%senders%get_values()
276 write (this%imon,
'(2x,a,*(i3))')
"process ids receiving data: ", &
277 this%receivers%get_values()
! Obtain the body datatypes: compose them when nothing is cached for this
! (unit, stage), otherwise load them (the else between the two calls is not
! visible in this listing).
281 from_cache = this%is_cached(unit, stage)
282 if (.not. from_cache)
then
283 call this%compose_messages(unit, stage, body_snd_t, body_rcv_t)
285 call this%load_messages(unit, stage, body_snd_t, body_rcv_t)
288 if (this%enable_monitor)
then
289 write (this%imon,
'(2x,a)')
"== communicating bodies =="
! Post one receive per sender, only when the datatype has non-zero size.
! The buffer is mpi_bottom with count 1, so the addresses come from the
! datatype itself.
293 do i = 1, this%senders%size
294 rnk = this%senders%at(i)
295 if (this%enable_monitor)
then
296 write (this%imon,
'(4x,a,i0)')
"receiving from process: ", rnk
299 call mpi_type_size(body_rcv_t(i), msg_size, ierr)
300 if (msg_size > 0)
then
301 call mpi_irecv(mpi_bottom, 1, body_rcv_t(i), rnk, stage, &
302 this%mpi_world%comm, rcv_req(i), ierr)
306 if (this%enable_monitor)
then
307 write (this%imon,
'(6x,a,i0)')
"message body size: ", msg_size
! Post one send per receiver, with the same zero-size guard.
312 do i = 1, this%receivers%size
313 rnk = this%receivers%at(i)
314 if (this%enable_monitor)
then
315 write (this%imon,
'(4x,a,i0)')
"sending to process: ", rnk
318 call mpi_type_size(body_snd_t(i), msg_size, ierr)
319 if (msg_size > 0)
then
320 call mpi_isend(mpi_bottom, 1, body_snd_t(i), rnk, stage, &
321 this%mpi_world%comm, snd_req(i), ierr)
325 if (this%enable_monitor)
then
326 write (this%imon,
'(6x,a,i0)')
"message body size: ", msg_size
328 call flush (this%imon)
! Timed wait on the receive requests.
! NOTE(review): no wait on snd_req is visible before it is deallocated;
! lines are missing from this listing, so confirm against the full file.
333 this%tmr_mpi_wait(stage, unit + 1))
334 call mpi_waitall(this%senders%size, rcv_req, rcv_stat, ierr)
335 call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
338 deallocate (rcv_req, snd_req, rcv_stat)
339 deallocate (body_rcv_t, body_snd_t)
! Fragment that builds the body datatypes through a negotiation with the
! peers (presumably compose_messages; header and some declarations are
! missing). This part covers the declarations, the allocations and the
! header exchange.
! The direction is the reverse of the body exchange: headers are received
! from the ranks in this%receivers and sent to the ranks in this%senders.
379 integer(I4B) :: stage
380 integer,
dimension(:) :: body_snd_t
381 integer,
dimension(:) :: body_rcv_t
383 integer(I4B) :: i, j, k
387 integer,
dimension(:),
allocatable :: rcv_req
388 integer,
dimension(:),
allocatable :: snd_req
389 integer,
dimension(:, :),
allocatable :: rcv_stat
391 integer(I4B) :: max_headers
! Datatype handles for the header messages and the received header counts.
393 integer,
dimension(:),
allocatable :: hdr_rcv_t
394 integer,
dimension(:),
allocatable :: hdr_snd_t
395 integer,
dimension(:),
allocatable :: hdr_rcv_cnt
! Datatype handles for the map messages.
398 integer,
dimension(:),
allocatable :: map_rcv_t
399 integer,
dimension(:),
allocatable :: map_snd_t
402 allocate (rcv_req(this%receivers%size))
403 allocate (snd_req(this%senders%size))
404 allocate (rcv_stat(mpi_status_size, this%receivers%size))
407 rcv_req = mpi_request_null
408 snd_req = mpi_request_null
! Upper bound on headers per peer: one per routed model plus one per routed
! exchange.
411 max_headers =
size(this%rte_models) +
size(this%rte_exchanges)
412 allocate (hdr_rcv_t(this%receivers%size))
413 allocate (hdr_snd_t(this%senders%size))
414 allocate (headers(max_headers, this%receivers%size))
415 allocate (hdr_rcv_cnt(this%receivers%size))
418 allocate (map_snd_t(this%senders%size))
419 allocate (map_rcv_t(this%receivers%size))
420 allocate (rcv_maps(max_headers, this%receivers%size))
422 if (this%enable_monitor)
then
423 write (this%imon,
'(2x,a)')
"== communicating headers =="
! Post a receive of up to max_headers headers from every receiver rank; the
! actual number is read from the status after the wait.
427 do i = 1, this%receivers%size
428 rnk = this%receivers%at(i)
429 if (this%enable_monitor)
then
430 write (this%imon,
'(4x,a,i0)')
"Ireceive header from process: ", rnk
432 call this%message_builder%create_header_rcv(hdr_rcv_t(i))
433 call mpi_irecv(headers(:, i), max_headers, hdr_rcv_t(i), rnk, stage, &
434 this%mpi_world%comm, rcv_req(i), ierr)
! Send our headers to every sender rank.
439 do i = 1, this%senders%size
440 rnk = this%senders%at(i)
441 if (this%enable_monitor)
then
442 write (this%imon,
'(4x,a,i0)')
"send header to process: ", rnk
444 call this%message_builder%create_header_snd(rnk, stage, hdr_snd_t(i))
445 call mpi_isend(mpi_bottom, 1, hdr_snd_t(i), rnk, stage, &
446 this%mpi_world%comm, snd_req(i), ierr)
! Timed wait on the header receives, then reset the handles for reuse in the
! map exchange.
452 this%tmr_mpi_wait(stage, unit + 1))
453 call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
454 call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
458 rcv_req = mpi_request_null
459 snd_req = mpi_request_null
! After the wait, read from each receive status how many headers the peer
! actually delivered.
462 do i = 1, this%receivers%size
463 call mpi_get_count(rcv_stat(:, i), hdr_rcv_t(i), hdr_rcv_cnt(i), ierr)
465 if (this%enable_monitor)
then
! The headers were received from the ranks in this%receivers (see the
! mpi_irecv loop earlier in this procedure), so the rank reported here must
! come from that same list. Reading this%senders with this index mislabels
! the log line and can run past the end of the senders list when the two
! lists differ.
466 rnk = this%receivers%at(i)
467 write (this%imon,
'(4x,a,i0)')
"received headers from process: ", rnk
468 write (this%imon,
'(6x,a)')
"expecting data for:"
! One line per received header, followed by its map sizes (the continuation
! of the first write is missing from this listing).
469 do j = 1, hdr_rcv_cnt(i)
470 write (this%imon,
'(6x,a,i0,a,a)')
"id: ", headers(j, i)%id, &
472 write (this%imon,
'(6x,a,99i6)')
"map sizes: ", headers(j, i)%map_sizes
! Continuation of the message composition: release the header datatypes,
! then exchange the element maps.
478 do i = 1, this%receivers%size
479 call mpi_type_free(hdr_rcv_t(i), ierr)
481 do i = 1, this%senders%size
482 call mpi_type_free(hdr_snd_t(i), ierr)
485 if (this%enable_monitor)
then
486 write (this%imon,
'(2x,a)')
"== communicating maps =="
! Create the receive-side maps from the sizes announced in the headers.
490 do i = 1, this%receivers%size
491 do j = 1, hdr_rcv_cnt(i)
492 call rcv_maps(j, i)%create(headers(j, i)%map_sizes)
! Post the map receives, one per receiver rank (the continuation of the
! create_map_rcv call is missing from this listing).
497 do i = 1, this%receivers%size
498 rnk = this%receivers%at(i)
499 if (this%enable_monitor)
then
500 write (this%imon,
'(4x,a,i0)')
"Ireceive maps from process: ", rnk
503 call this%message_builder%create_map_rcv(rcv_maps(:, i), hdr_rcv_cnt(i), &
505 call mpi_irecv(mpi_bottom, 1, map_rcv_t(i), rnk, stage, &
506 this%mpi_world%comm, rcv_req(i), ierr)
! Send our maps to every sender rank.
511 do i = 1, this%senders%size
512 rnk = this%senders%at(i)
513 if (this%enable_monitor)
then
514 write (this%imon,
'(4x,a,i0)')
"send map to process: ", rnk
517 call this%message_builder%create_map_snd(rnk, stage, map_snd_t(i))
518 call mpi_isend(mpi_bottom, 1, map_snd_t(i), rnk, stage, &
519 this%mpi_world%comm, snd_req(i), ierr)
! Timed wait on the map receives.
525 this%tmr_mpi_wait(stage, unit + 1))
526 call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
527 call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
! Monitor output: per peer and per header, the number of elements of every
! map and, when non-zero, the remote element shifts (the loop over k is not
! visible in this listing).
531 if (this%enable_monitor)
then
532 do i = 1, this%receivers%size
533 rnk = this%receivers%at(i)
534 write (this%imon,
'(4x,a,i0)')
"received maps from process: ", rnk
535 do j = 1, hdr_rcv_cnt(i)
536 write (this%imon,
'(6x,a,i0,a,a)')
"id: ", headers(j, i)%id, &
539 write (this%imon,
'(8x,i0, a,i0)') k,
" nr. elements: ", &
540 rcv_maps(j, i)%el_maps(k)%nr_virt_elems
541 if (rcv_maps(j, i)%el_maps(k)%nr_virt_elems > 0)
then
542 write (this%imon,
'(8x,*(i6))') &
543 rcv_maps(j, i)%el_maps(k)%remote_elem_shift
! Release the map datatypes.
551 do i = 1, this%receivers%size
552 call mpi_type_free(map_rcv_t(i), ierr)
554 do i = 1, this%senders%size
555 call mpi_type_free(map_snd_t(i), ierr)
! Final part of the message composition: build the body datatypes, store
! them in the cache, and release the temporary data.
558 if (this%enable_monitor)
then
559 write (this%imon,
'(2x,a)')
"== composing message bodies =="
! Receive bodies: one datatype per sender rank, cached under mpi_bdy_rcv.
563 do i = 1, this%senders%size
564 rnk = this%senders%at(i)
565 if (this%enable_monitor)
then
566 write (this%imon,
'(4x,a,i0)')
"build recv body for process: ", rnk
569 call this%message_builder%create_body_rcv(rnk, stage, body_rcv_t(i))
570 call this%msg_cache%put(unit, rnk, stage,
mpi_bdy_rcv, body_rcv_t(i))
! Send bodies: one datatype per receiver rank, built from the headers and
! maps that rank sent us, cached under mpi_bdy_snd.
574 do i = 1, this%receivers%size
575 rnk = this%receivers%at(i)
576 if (this%enable_monitor)
then
577 write (this%imon,
'(4x,a,i0)')
"build send body for process: ", rnk
580 call this%message_builder%create_body_snd( &
581 rnk, stage, headers(1:hdr_rcv_cnt(i), i), &
582 rcv_maps(:, i), body_snd_t(i))
583 call this%msg_cache%put(unit, rnk, stage,
mpi_bdy_snd, body_snd_t(i))
! Destroy only the maps that were created (j up to hdr_rcv_cnt(i)).
587 do i = 1, this%receivers%size
588 do j = 1, hdr_rcv_cnt(i)
589 call rcv_maps(j, i)%destroy()
593 deallocate (rcv_req, snd_req, rcv_stat)
594 deallocate (hdr_rcv_t, hdr_snd_t, hdr_rcv_cnt)
596 deallocate (map_rcv_t, map_snd_t)
597 deallocate (rcv_maps)
! Fragment that fills the body datatype arrays from the message cache
! instead of composing them (presumably load_messages; header not visible).
606 integer(I4B) :: stage
607 integer,
dimension(:),
allocatable :: body_snd_t
608 integer,
dimension(:),
allocatable :: body_rcv_t
610 integer(I4B) :: i, rnk
612 if (this%enable_monitor)
then
613 write (this%imon,
'(2x,a)')
"... running from cache ..."
! Send datatypes are looked up per receiver rank, receive datatypes per
! sender rank, using the same (unit, rank, stage, kind) key as the cache put.
616 do i = 1, this%receivers%size
617 rnk = this%receivers%at(i)
618 body_snd_t(i) = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_snd)
620 do i = 1, this%senders%size
621 rnk = this%senders%at(i)
622 body_rcv_t(i) = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_rcv)
! Fragment that rebuilds the list of ranks this process receives data from
! (presumably update_senders; header not visible). The list holds the
! orig_rank of every routed model and exchange that is active and not local,
! each rank added once.
633 call this%senders%clear()
635 do i = 1,
size(this%rte_models)
636 vdc => this%rte_models(i)%ptr
637 if (.not. vdc%is_local .and. vdc%is_active)
then
638 call this%senders%push_back_unique(vdc%orig_rank)
641 do i = 1,
size(this%rte_exchanges)
642 vdc => this%rte_exchanges(i)%ptr
643 if (.not. vdc%is_local .and. vdc%is_active)
then
644 call this%senders%push_back_unique(vdc%orig_rank)
! Fragment that rebuilds the list of ranks this process sends data to
! (presumably update_receivers; header not visible).
656 call this%receivers%clear()
! Without halo activation the receivers are a copy of the senders, in the
! same order.
658 if (.not. this%halo_activated)
then
660 do i = 1, this%senders%size
661 call this%receivers%push_back(this%senders%at(i))
! Otherwise (the else is not visible in this listing) collect the unique
! rcv_ranks of every routed model and exchange.
665 do i = 1,
size(this%rte_models)
666 vdc => this%rte_models(i)%ptr
667 do j = 1, vdc%rcv_ranks%size
668 call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
671 do i = 1,
size(this%rte_exchanges)
672 vdc => this%rte_exchanges(i)%ptr
673 do j = 1, vdc%rcv_ranks%size
674 call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
! Fragment that checks whether the body datatypes for (unit, stage) are in
! the message cache (presumably is_cached; header, initialization of the
! counter and the assignments to in_cache are missing from this listing).
687 integer(I4B) :: stage
688 logical(LGP) :: in_cache
690 integer(I4B) :: i, rnk
691 integer(I4B) :: no_cache_cnt
692 integer :: cached_type
! Count the cache misses over all send datatypes (per receiver rank) and all
! receive datatypes (per sender rank).
697 do i = 1, this%receivers%size
698 rnk = this%receivers%at(i)
699 cached_type = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_snd)
700 if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
702 do i = 1, this%senders%size
703 rnk = this%senders%at(i)
704 cached_type = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_rcv)
705 if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
! Three outcomes: everything missing, nothing missing, or a partial cache,
! which is treated as an internal error and stops the run.
! NOTE(review): the first two branches presumably set in_cache to .false.
! and .true. respectively - confirm against the full file.
709 if (no_cache_cnt == this%receivers%size + this%senders%size)
then
711 else if (no_cache_cnt == 0)
then
714 call ustop(
"Internal error: MPI message cache corrupt...")
! Only visible statement of this fragment (presumably mr_finalize): clear
! the message cache.
722 call this%msg_cache%clear()
! Fragment that tears the router down (presumably mr_destroy; header not
! visible): destroy the helper objects, then free the arrays allocated
! during initialization.
729 call this%msg_cache%destroy()
731 call this%senders%destroy()
732 call this%receivers%destroy()
734 deallocate (this%model_proc_ids)
735 deallocate (this%all_models)
736 deallocate (this%all_exchanges)
738 deallocate (this%tmr_mpi_wait)
This module contains simulation constants.
integer(i4b), parameter linelength
maximum length of a standard line
This module defines variable data types.
subroutine, public mem_print_detailed(iout)
integer(i4b), parameter, public mpi_bdy_snd
sending data (body) to ranks
integer(i4b), parameter, public mpi_bdy_rcv
receiving data (body) from ranks
subroutine mr_route_sln(this, virtual_sol, stage)
This will route all remote data from models and exchanges in a particular solution over MPI, for a given stage.
subroutine route_active(this, unit, stage)
Routes the models and exchanges over MPI, either constructing the message bodies in a sequence of communication steps, or loading them from cache.
subroutine compose_messages(this, unit, stage, body_snd_t, body_rcv_t)
Constructs the message bodies' MPI datatypes.
subroutine load_messages(this, unit, stage, body_snd_t, body_rcv_t)
Load the message body MPI datatypes from cache.
class(routerbasetype) function, pointer, public create_mpi_router()
Factory method to create MPI router.
logical(lgp) function is_cached(this, unit, stage)
Check if this stage is cached.
subroutine mr_route_all(this, stage)
This will route all remote data from the global models and exchanges over MPI, for a given stage.
subroutine activate(this, models, exchanges)
Activate models and exchanges for routing.
subroutine mr_finalize(this)
subroutine deactivate(this)
Deactivate data after routing.
subroutine update_receivers(this)
subroutine update_senders(this)
subroutine mr_destroy(this)
subroutine mr_initialize(this)
type(mpiworldtype) function, pointer, public get_mpi_world()
subroutine, public check_mpi(mpi_error_code)
Check the MPI error code, report, and terminate when it is not MPI_SUCCESS.
type(profilertype), public g_prof
the global timer object (to reduce trivial lines of code)
This module contains simulation methods.
subroutine, public ustop(stopmess, ioutlocal)
Stop the simulation.
subroutine, public store_error(msg, terminate)
Store an error message.
integer(i4b), parameter, public nr_sim_stages
before exchange formulate (per solution)
character(len=24) function, public stg_to_str(stage)
Converts a stage to its string representation.
This module contains simulation variables.
integer(i4b), parameter, public nr_vdc_element_maps
character(len=24) function, public vdc_type_to_str(cntr_type)
Converts a virtual container type to its string representation.
class(virtualdatacontainertype) function, pointer, public get_vdc_from_list(list, idx)
type(listtype), public virtual_model_list
type(listtype), public virtual_exchange_list
Facility to cache the constructed MPI datatypes. This will avoid having to construct them over and over.
Wrapper for virtual data containers.
Container (list) of virtual data items.
The Virtual Exchange is based on two Virtual Models and is therefore not always strictly local or remote.
This bundles all virtual data for a particular solution.