Source code for mpi_array.globale

"""
===================================
The :mod:`mpi_array.globale` Module
===================================

Defines :obj:`gndarray` class and factory functions for
creating multi-dimensional distributed arrays (Partitioned Global Address Space).

Classes
=======

.. autosummary::
   :toctree: generated/

   gndarray - A :obj:`numpy.ndarray` like distributed array.
   PerAxisRmaHaloUpdater - Helper class for performing ghost element updates.
   RmaRedistributeUpdater - Helper class for redistributing elements between distributions.

Functions
=========

.. autosummary::
   :toctree: generated/

   copyto - Copy elements of one array to another array.


"""

from __future__ import absolute_import

import mpi4py.MPI as _mpi
import numpy as _np
from numpy.lib.mixins import NDArrayOperatorsMixin as _NDArrayOperatorsMixin

from .license import license as _license, copyright as _copyright, version as _version
from .update import UpdatesForRedistribute as _UpdatesForRedistribute
from .update import MpiUpdatesForGet as _MpiUpdatesForGet
from .update import MpiHalosUpdate as _MpiHalosUpdate
from .update import MpiPairExtentUpdate as _MpiPairExtentUpdate
from .update import MpiPairExtentUpdateDifferentDtypes as _MpiPairExtentUpdateDifferentDtypes
from .update import RmaUpdateExecutor as _RmaUpdateExecutor
from .locale import win_lndarray as _win_lndarray
from .distribution import LocaleExtent as _LocaleExtent
from .indexing import HaloIndexingExtent as _HaloIndexingExtent

__author__ = "Shane J. Latham"
__license__ = _license()
__copyright__ = _copyright()
__version__ = _version()

_builtin_slice = slice


class CommLogger:

    """
    """

    def __init__(self, rank_logger=None, root_logger=None):
        self._rank_logger = None
        self._root_logger = None

    @property
    def rank_logger(self):
        return self._rank_logger

    @rank_logger.setter
    def rank_logger(self, logger):
        self._rank_logger = logger

    @property
    def root_logger(self):
        return self._root_logger

    @root_logger.setter
    def root_logger(self, logger):
        self._root_logger = logger


[docs]class PerAxisRmaHaloUpdater(CommLogger): """ Helper class for performing halo data transfer using RMA via MPI windows (:obj:`mpi4py.MPI.Win` objects). """ #: Halo "low index" indices. LO = _HaloIndexingExtent.LO #: Halo "high index" indices. HI = _HaloIndexingExtent.HI
[docs] def __init__(self, locale_extents, dtype, order, inter_locale_win, dst_buffer): """ Initialise. :type locale_extents: sequence of :obj:`mpi_array.distributon.LocaleExtent` :param locale_extents: :samp:`locale_extents[r]` is the extent of the array elements which reside on rank :samp:`r` of the :samp:`inter_locale_comm` communicator. :type dtype: :obj:`numpy.dtype` :param dtype: Data type of elements in array. :type order: :obj:`str` :param order: The array order, :samp:`'C'` for C memory layout. :type inter_locale_win: :obj:`mpi4py.MPI.Win` :param inter_locale_win: The window used to exchange halo element data. :type dst_buffer: :obj:`memoryview` :param dst_buffer: The buffer into which the halo elements are written. """ CommLogger.__init__(self) self._locale_extents = locale_extents self._dtype = dtype self._order = order self._inter_locale_win = inter_locale_win self._dst_buffer = dst_buffer self._halo_updates = None self._have_axis_updates = None
@property def locale_extents(self): """ Sequence of :obj:`mpi_array.distribution.LocaleExtent` objects which define the partitioning of the array. """ return self._locale_extents @property def dtype(self): """ The :obj:`numpy.dtype` of the data to be exchanged in the halo update. """ return self._dtype @property def order(self): """ Array order :obj:`str`, :samp:`'C'` for C memory layout. """ return self._order
[docs] def calc_halo_updates(self): """ Calculates the per-axis halo-region updates for all inter-locale ranks (of the :samp:`inter_locale_comm`). :rtype: :obj:`tuple` pair :return: A :samp:`(rank_2_updates_dict, bool_sequence)` pair where `rank_2_updates` is a :obj:`dict` of :samp:`{inter_locale_rank, halos_update}`, where :samp:`inter_locale_rank` is an :obj:`int` indicating the rank of the process in :samp:`inter_locale_comm` and :samp:`halos_update` is a :obj:`mpi_array.update.MpiHalosUpdate` containing the description of regions which are required to be fetched from remote processes. The :samp:`bool_sequence` is of length :attr:`ndim` and :samp:`bool_sequence[a] is True` indicates that halo updates are required on axis :samp:`a`. """ halo_updates_dict = dict() ndim = self.locale_extents[0].ndim have_axis_updates = _np.zeros((ndim, ), dtype="bool") for inter_locale_rank in range(len(self.locale_extents)): rank_inter_locale_updates = \ _MpiHalosUpdate( inter_locale_rank, self.locale_extents ) halo_updates_dict[inter_locale_rank] = rank_inter_locale_updates have_axis_updates = \ _np.logical_or( have_axis_updates, _np.array( [ (rank_inter_locale_updates.updates_per_axis[a] is not None) and ( (len(rank_inter_locale_updates.updates_per_axis[a][self.LO]) > 0) or (len(rank_inter_locale_updates.updates_per_axis[a][self.HI]) > 0) ) for a in range(ndim) ], dtype="bool" ) ) if _np.any(have_axis_updates): for a in range(ndim): if not have_axis_updates[a]: for inter_locale_rank in range(len(self.locale_extents)): halo_updates_dict[inter_locale_rank].updates_per_axis[a] = None else: halo_updates_dict = None return halo_updates_dict, have_axis_updates
@property def dst_buffer(self): """ A :obj:`memoryview` which provides the buffer into which the halo data is written. """ return self._dst_buffer @property def halo_updates(self): """ The :samp:`(rank_2_updates_dict, bool_sequence)` pair calculated by :meth:`calc_halo_updates`. """ if self._halo_updates is None: self._halo_updates, self._have_axis_updates = self.calc_halo_updates() return self._halo_updates
[docs] def update_halos(self): """ Performs the data exchange required to update the halo (ghost) elements of the array buffer :attr:`dst_buffer`:samp:`.buffer`. Can be called :samp:`peer_comm` collectively. """ self.do_update_halos(self.halo_updates)
[docs] def do_update_halos(self, halo_updates): """ Performs the data exchange required to update the halo (ghost) elements of the array buffer :attr:`dst_buffer`:samp:`.buffer`. Can be called :samp:`peer_comm` collectively. :type halo_updates: :obj:`mpi_array.update.MpiHalosUpdate` :param halo_updates: A :obj:`dict` of per :samp:`inter_locale_rank` halo region updates. See :meth:`calc_halo_updates`. """ if halo_updates is not None: # Get the halo updates for this rank rank_inter_locale_updates = halo_updates[self._inter_locale_win.group.rank] # Get the updates separated into per-axis (hyper-slab) updates rank_updates_per_axis = rank_inter_locale_updates.updates_per_axis # rank_updates_per_axis is None, on *all* inter_locale_win.group ranks, # when there are no halos on any axis. if rank_updates_per_axis is not None: for a in range(len(rank_updates_per_axis)): lo_hi_updates_pair = rank_updates_per_axis[a] # When axis doesn't have a halo then lo_hi_updates_pair # is None on all inter_locale_comm ranks, and we avoid calling the Fence # in this case if lo_hi_updates_pair is not None: axis_inter_locale_rank_updates = \ lo_hi_updates_pair[rank_inter_locale_updates.LO] + \ lo_hi_updates_pair[rank_inter_locale_updates.HI] self.rank_logger.debug( "BEG: Fence(_mpi.MODE_NOPUT | _mpi.MODE_NOPRECEDE)..." ) self._inter_locale_win.Fence( _mpi.MODE_NOPUT | _mpi.MODE_NOPRECEDE) self.rank_logger.debug( "END: Fence(_mpi.MODE_NOPUT | _mpi.MODE_NOPRECEDE)..." ) for single_update in axis_inter_locale_rank_updates: single_update.initialise_data_types(self.dtype, self.order) self.rank_logger.debug( "BEG: Getting update:\n%s\n%s", single_update._header_str, single_update ) self._inter_locale_win.Get( [self._dst_buffer, 1, single_update.dst_data_type], single_update.src_extent.cart_rank, [0, 1, single_update.src_data_type] ) self.rank_logger.debug( "END: Getting update:\n%s\n%s", single_update._header_str, single_update ) self.rank_logger.debug( "BEG: Fence(_mpi.MODE_NOSUCCEED)." ) self._inter_locale_win.Fence(_mpi.MODE_NOSUCCEED) self.rank_logger.debug( "END: Fence(_mpi.MODE_NOSUCCEED)." )
class RankTranslator(object): """ Translate ranks between two `mpi4py.MPI.Group` objects. """ def __init__(self, dst_group, src_group): """ """ object.__init__(self) self._dst_group = dst_group self._src_group = src_group def dst_to_src(self, ranks): """ Returns :samp:`mpi4py.MPI.Group.Translate_ranks(self.dst_group, ranks, self.src_group)`. """ r = _np.array(ranks, copy=True) r.ravel()[...] = _mpi.Group.Translate_ranks(self.dst_group, r.ravel(), self.src_group) return r def src_to_dst(self, ranks): """ Returns :samp:`mpi4py.MPI.Group.Translate_ranks(self.src_group, ranks, self.dst_group)`. """ r = _np.array(ranks, copy=True) r.ravel()[...] = _mpi.Group.Translate_ranks(self.src_group, r.ravel(), self.dst_group) return r @property def dst_group(self): """ A :obj:`mpi4py.MPI.Group`. """ return self._dst_group @property def src_group(self): """ A :obj:`mpi4py.MPI.Group`. """ return self._src_group
[docs]class RmaRedistributeUpdater(_UpdatesForRedistribute): """ Helper class for redistributing array to new distribution. Calculates sequence of :obj:`mpi_array.distribution.ExtentUpdate` objects which are used to copy elements from remote :samp:`{src}` locales to local :samp:`{dst}` locales. """
[docs] def __init__(self, dst, src, casting="same_kind"): """ """ self._dst = dst self._src = src self._casting = casting self._mpi_pair_extent_update_type = _MpiPairExtentUpdate self._max_outstanding_requests = 32 * 32 self._min_outstanding_requests_per_proc = 2 self._max_ranks_per_inter_locale_sub_group = 128 if self._dst.dtype != self._src.dtype: self._mpi_pair_extent_update_type = _MpiPairExtentUpdateDifferentDtypes _UpdatesForRedistribute.__init__( self, dst.comms_and_distrib.distribution, src.comms_and_distrib.distribution, peer_rank_translator=RankTranslator( self._dst.locale_comms.peer_comm.group, self._src.locale_comms.peer_comm.group ) ) self._inter_win = self._src.rma_window_buffer.peer_win self._max_outstanding_requests_per_proc = \ _np.max( ( self._min_outstanding_requests_per_proc, self._max_outstanding_requests // self._max_ranks_per_inter_locale_sub_group ) ) seed_str = str(2 ** 31)[1:] rank_str = str(self._inter_win.group.rank + 1) seed_str = rank_str + seed_str[len(rank_str):] seed_str = seed_str[0:-len(rank_str)] + rank_str[::-1] self._random_state = _np.random.RandomState(seed=int(seed_str))
[docs] def calc_can_use_existing_src_peer_comm(self): """ Returns :samp:`True` if :samp:`self._src.locale_comms.peer_comm` can be used to redistribute to the distribution of the :samp:`self._dst` array. :rtype: :obj:`bool` :return: :samp:`True` if :samp:`self._src.locale_comms.peer_comm` is a super-set of the processes of :samp:`self._dst.locale_comms.peer_comm` """ can_use_existing_src_peer_comm = self._src.locale_comms.peer_comm is not None if self._dst.locale_comms.have_valid_inter_locale_comm: if self._src.locale_comms.peer_comm != _mpi.COMM_NULL: can_use_existing_src_peer_comm = \ ( ( _mpi.Group.Intersection( self._dst.locale_comms.inter_locale_comm.group, self._src.locale_comms.peer_comm.group ).size == self._dst.locale_comms.inter_locale_comm.group.size ) ) self._dst.rank_logger.debug( "BEG: self._dst_cad.locale_comms.intra_locale_comm.allreduce...") can_use_existing_src_peer_comm = \ self._dst.locale_comms.intra_locale_comm.allreduce( can_use_existing_src_peer_comm, _mpi.BAND ) self._dst.rank_logger.debug("END: self._dst_cad.locale_comms.intra_locale_comm.allreduce.") self._dst.rank_logger.debug( "can_use_existing_src_peer_comm = %s", can_use_existing_src_peer_comm ) return can_use_existing_src_peer_comm
[docs] def create_pair_extent_update( self, dst_extent, src_extent, intersection_extent ): """ Factory method which creates sequence of of :obj:`mpi_array.distribution.MpiPairExtentUpdate` objects. """ updates = \ [ self._mpi_pair_extent_update_type( self._dst.distribution.locale_extents[dst_extent.inter_locale_rank], self._src.distribution.locale_extents[src_extent.inter_locale_rank], intersection_extent, intersection_extent ), ] for update in updates: update.initialise_data_types( dst_dtype=self._dst.dtype, src_dtype=self._src.dtype, dst_order=self._dst.lndarray_proxy.md.order, src_order=self._src.lndarray_proxy.md.order ) update.casting = self._casting return updates
[docs] def wait_all(self, req_list): """ """ self._dst.rank_logger.debug( "BEG: Waiting for outstanding rget requests, len(req_list)=%s...", len(req_list) ) _mpi.Request.Waitall(req_list) self._dst.rank_logger.debug( "END: Waiting for outstanding rget requests, len(req_list)=%s.", len(req_list) )
[docs] def do_locale_cpy2_update(self): """ Performs direct copy updates. """ updates = self._dst_cpy2_updates[self._dst.this_locale.inter_locale_rank] my_dst_peer_rank = self._dst.locale_comms.peer_comm.rank my_src_peer_rank = self._src.locale_comms.peer_comm.rank src_lndarray = self._src.lndarray_proxy.lndarray dst_lndarray = self._dst.lndarray_proxy.lndarray for update in updates: src_translated_peer_ranks = \ self._src_translated_peer_ranks[update.src_extent.inter_locale_rank] dst_translated_peer_ranks = \ self._dst_translated_peer_ranks[update.dst_extent.inter_locale_rank] if ( ( (my_src_peer_rank == update.src_extent.peer_rank) and (update.src_extent.peer_rank in dst_translated_peer_ranks) ) or ( (my_dst_peer_rank == update.dst_extent.peer_rank) and (update.dst_extent.peer_rank in src_translated_peer_ranks) ) ): self._dst.rank_logger.debug( "Copying update: mdpr=%s, mspr=%s\nsrc_t_ranks=%s\ndst_t_ranks=%s\n%s\n%s", my_dst_peer_rank, my_src_peer_rank, src_translated_peer_ranks, dst_translated_peer_ranks, update._header_str, update ) update.copyto(dst_lndarray, src_lndarray, casting=self._casting)
[docs] def do_locale_rma_update(self): """ Performs RMA to get elements from remote locales to update the locale extent array. """ can_use_existing_src_peer_comm = self.calc_can_use_existing_src_peer_comm() self._dst.rank_logger.debug( "%s.%s: " + "can_use_existing_src_peer_comm=%s", self.__class__.__name__, "do_locale_rma_update", can_use_existing_src_peer_comm ) if can_use_existing_src_peer_comm: inter_win = _mpi.WIN_NULL if self._dst.locale_comms.have_valid_inter_locale_comm: inter_win = self._inter_win update_executor = \ _RmaUpdateExecutor( inter_win=inter_win, dst_lndarray=self._dst.lndarray_proxy.lndarray, src_inter_win_rank_attr="peer_rank", rank_logger=self._dst.rank_logger ) # Fetch remote data. updates = self._dst_rget_updates[self._dst.this_locale.inter_locale_rank] update_executor.do_locale_rma_update(updates) else: raise RuntimeError( ( "can_use_existing_src_peer_comm=%s: " + "incompatible dst inter_locale_comma and src peer_comm." ) % (can_use_existing_src_peer_comm,) ) self._dst.locale_comms.intra_locale_comm.barrier()
[docs] def do_locale_update(self): self.do_locale_rma_update() self.do_locale_cpy2_update()
[docs] def do_update(self): self.barrier() self._dst.locale_comms.rank_logger.debug( "%s: BEG: do_locale_cpy2_update()...", self.__class__.__name__ ) self.do_locale_cpy2_update() self._dst.locale_comms.rank_logger.debug( "%s: END: do_locale_cpy2_update().", self.__class__.__name__ ) self.barrier() self._dst.locale_comms.rank_logger.debug( "%s: BEG: do_locale_rma_update()...", self.__class__.__name__ ) self.do_locale_rma_update() self._dst.locale_comms.rank_logger.debug( "%s: END: do_locale_rma_update().", self.__class__.__name__ ) self.barrier()
[docs] def barrier(self): """ MPI barrier. """ self._dst.locale_comms.rank_logger.debug( "%s: BEG: self._src.locale_comms.peer_comm.barrier()...", self.__class__.__name__ ) self._src.locale_comms.peer_comm.barrier() self._dst.locale_comms.rank_logger.debug( "%s: END: self._src.locale_comms.peer_comm.barrier().", self.__class__.__name__ )
[docs]class gndarray(_NDArrayOperatorsMixin): """ A Partitioned Global Address Space array with :obj:`numpy.ndarray` API. """ def __new__( cls, comms_and_distrib, rma_window_buffer, lndarray_proxy ): """ Construct, at least one of :samp:{shape} or :samp:`comms_and_distrib` should be specified (i.e. at least one should not be :samp:`None`). :type comms_and_distrib: :obj:`mpi_array.distribution.Decomposition` :param comms_and_distrib: Array distribution info and used to allocate (possibly) shared memory. """ self = _NDArrayOperatorsMixin.__new__(cls) self._comms_and_distrib = comms_and_distrib self._rma_window_buffer = rma_window_buffer self._lndarray_proxy = lndarray_proxy self._halo_updater = None return self
[docs] def free(self): """ Collective (all samp:`peer_comm` processes) free of MPI windows (and locale array memory). """ self._halo_updater = None if self._comms_and_distrib is not None: self._comms_and_distrib = None if self._lndarray_proxy is not None: self._lndarray_proxy.free() self._lndarray_proxy = None if self._rma_window_buffer is not None: self._rma_window_buffer.free() self._rma_window_buffer = None
def __del__(self): """ Calls :meth:`free`. """ self.free() def __enter__(self): """ For use with :samp:`with` contexts. """ return self def __exit__(self, type, value, traceback): """ For use with :samp:`with` contexts. """ self.free() return False def __getitem__(self, i): """ """ self.rank_logger.debug("__getitem__: i=%s", i) return None def __setitem__(self, i, v): """ """ self.rank_logger.debug("__setitem__: i=%s, v=%s", i, v) def __array_ufunc__(self, *args, **kwargs): """ """ from . import globale_ufunc as _globale_ufunc return _globale_ufunc.gndarray_array_ufunc(self, *args, **kwargs) @property def this_locale(self): return self._comms_and_distrib.this_locale @property def locale_comms(self): return self._comms_and_distrib.locale_comms @property def distribution(self): return self._comms_and_distrib.distribution @property def comms_and_distrib(self): return self._comms_and_distrib @property def rma_window_buffer(self): return self._rma_window_buffer @property def lndarray_proxy(self): return self._lndarray_proxy @property def ndim(self): return len(self.shape) @property def num_locales(self): """ """ return self._comms_and_distrib.locale_comms.num_locales @property def shape(self): return self._comms_and_distrib.distribution.globale_extent.shape_n @property def dtype(self): return self._lndarray_proxy.dtype @property def order(self): return self._lndarray_proxy.md.order @property def view_n(self): return self._lndarray_proxy.view_n @property def view_h(self): return self._lndarray_proxy.view_h @property def rank_view_n(self): return self._lndarray_proxy.rank_view_n @property def rank_view_h(self): return self._lndarray_proxy.rank_view_h @property def rank_logger(self): """ """ return self._comms_and_distrib.locale_comms.rank_logger @property def root_logger(self): """ """ return self._comms_and_distrib.locale_comms.root_logger
[docs] def initialise_windows(self): """ Creates the RMA windows required for inter-locale (and peer) one-sided RMA comms. """ self.rma_window_buffer.initialise_windows()
[docs] def intra_locale_barrier(self): """ """ self.rank_logger.debug( "BEG: self.comms_and_distrib.locale_comms.intra_locale_comm.barrier()..." ) self.comms_and_distrib.locale_comms.intra_locale_comm.barrier() self.rank_logger.debug( "END: self.comms_and_distrib.locale_comms.intra_locale_comm.barrier()." )
[docs] def inter_locale_barrier(self): """ """ if self.comms_and_distrib.locale_comms.have_valid_inter_locale_comm: self.rank_logger.debug( "BEG: self.comms_and_distrib.locale_comms.inter_locale_comm.barrier()..." ) self.comms_and_distrib.locale_comms.inter_locale_comm.barrier() self.rank_logger.debug( "END: self.comms_and_distrib.locale_comms.inter_locale_comm.barrier()." )
@property def halo_updater(self): if self._halo_updater is None: self._halo_updater = \ PerAxisRmaHaloUpdater( locale_extents=self.distribution.locale_extents, dtype=self.dtype, order=self.order, inter_locale_win=self.rma_window_buffer.inter_locale_win, dst_buffer=self.lndarray_proxy.lndarray ) self._halo_updater.rank_logger = self.rank_logger self._halo_updater.root_logger = self.root_logger return self._halo_updater
[docs] def update(self): """ """ # If running on single locale then there are no halos to update. if self.comms_and_distrib.locale_comms.num_locales > 1: rank_logger = self.comms_and_distrib.locale_comms.rank_logger # Only communicate data between the ranks # of self.comms_and_distrib.locale_comms.inter_locale_comm self.comms_and_distrib.locale_comms.peer_comm.barrier() if ( self.comms_and_distrib.locale_comms.have_valid_inter_locale_comm ): rank_logger.debug( "BEG: update_halos..." ) self.halo_updater.update_halos() rank_logger.debug( "END: update_halos." ) self.intra_locale_barrier()
[docs] def calculate_copyfrom_updates(self, src, casting="same_kind"): return \ RmaRedistributeUpdater( self, src, casting )
[docs] def copyfrom(self, src, casting="same_kind"): """ Copy the elements of the :samp:`{src}` array to corresponding elements of the :samp:`{self}` array. :type src: :obj:`gndarray` :param src: Global array from which elements are copied. :type casting: :obj:`str` :param casting: See :samp:`{casting}` parameter in :func:`numpy.copyto`. """ if not isinstance(src, gndarray): raise ValueError( "Got type(src)=%s, expected %s." % (type(src), gndarray) ) redistribute_updater = self.calculate_copyfrom_updates(src, casting) redistribute_updater.do_update()
[docs] def all(self, **unused_kwargs): return \ self.locale_comms.peer_comm.allreduce( bool(self.lndarray_proxy.rank_view_n.all()), op=_mpi.BAND )
[docs] def fill(self, value): """ Fill the array (excluding ghost elements) with a scalar value. :type value: scalar :param value: All non-ghost elements will be assigned this value. """ self.lndarray_proxy.fill(value) self.intra_locale_barrier()
[docs] def fill_h(self, value): """ Fill all array elements (including ghost elements) with a scalar value. :type value: scalar :param value: All elements will be assigned this value. """ self.lndarray_proxy.fill_h(value) self.intra_locale_barrier()
[docs] def copy(self, order='C'): from . import globale_creation as _globale_creation ary_out = _globale_creation.empty_like(self, order=order) ary_out.lndarray_proxy.rank_view_partition_h[...] = \ self.lndarray_proxy.rank_view_partition_h[...] self.intra_locale_barrier() return ary_out
[docs] def get_view(self, slice=None, start=None, stop=None, halo=0): """ Returns :samp:`(ary, extent)` pair, where :samp:`ary` is a view from the locale extent array corresponding to the specified extent arguments. If any of the globale slice lies outside the locale extent, then :samp:`ary` is :samp:`None`. The :samp:`extent` element is a :obj:`mpi_array.distribution.LocaleExtent` instance which corresponds to the specified extent arguments. """ if slice is not None: tmp = _np.array(list([s.start, s.stop] for s in slice)) start = tmp[:, 0] stop = tmp[:, 1] # Create an extent object equivalent to the argument slice. locale_extent = self.lndarray_proxy.locale_extent dst_extent =\ _LocaleExtent( peer_rank=locale_extent.peer_rank, inter_locale_rank=locale_extent.inter_locale_rank, start=start, stop=stop, slice=slice, globale_extent=self.distribution.globale_extent, halo=halo, ) locale_ary = None if _np.all( _np.logical_and( dst_extent.start_h >= locale_extent.start_n, dst_extent.stop_h <= locale_extent.stop_n ) ): # Can return a view of the locale array data shape = dst_extent.shape_h lstart = locale_extent.globale_to_locale_h(dst_extent.start_h) lstop = lstart + shape slc = tuple(_builtin_slice(lstart[a], lstop[a]) for a in range(locale_extent.ndim)) locale_ary = self.lndarray_proxy.lndarray[slc] return locale_ary, dst_extent
[docs] def reshape(self, shape): """ Returns an array containing the same data with a new shape equal to :samp:`{shape}`. """ raise NotImplementedError()
[docs] def locale_get(self, slice=None, start=None, stop=None, halo=0): """ Collective over :samp:`{self}.comms.intra_locale_comm` to get a portion of the globale array. Returns a view from the locale extent of the array if possible, otherwise allocates shared memory and performs one-sided RMA to fetch data from remote locales. """ locale_ary, dst_extent = self.get_view(slice=slice, start=start, stop=stop, halo=halo) if locale_ary is None: # Need to fetch remote data if not self.rma_window_buffer.inter_locale_win_initialised: raise ValueError( "Attempting inter-locale one-sided RMA without having created" + " the inter-locale window, call the initialise_windows method" + " (all *peer* ranks)" + " to create windows before performing one-sided RMA." ) # Allocate (shared) memory for the data to be returned. locale_ary = \ _win_lndarray( shape=dst_extent.shape_h, dtype=self.dtype, comm=self.locale_comms.intra_locale_comm ) if self.locale_comms.have_valid_inter_locale_comm: # Calculate the update objects which indicate where to fetch the data. update_calculator = \ _MpiUpdatesForGet( dst_extent=dst_extent, src_distrib=self.distribution, dtype=self.dtype, order=self.order, update_dst_halo=True ) update_executor = \ _RmaUpdateExecutor( inter_win=self.rma_window_buffer.inter_locale_win, dst_lndarray=locale_ary, src_inter_win_rank_attr="inter_locale_rank", rank_logger=self.rank_logger ) # Perform the updates, copy locale array data to locale_ary first. updates = update_calculator._dst_cpy2_updates[dst_extent.inter_locale_rank] update_executor.do_direct_cpy2_update(updates, self.lndarray_proxy.lndarray) # Fetch remote data. updates = update_calculator._dst_rget_updates[dst_extent.inter_locale_rank] update_executor.do_locale_rma_update(updates) # All locale processes wait for data fetch to conclude self.intra_locale_barrier() return locale_ary
[docs] def peer_rank_get(self, slice=None, start=None, stop=None, halo=0): """ Non-collective, one-sided fetch of data to this peer rank process. Returns a view from the locale extent of the array if possible, otherwise allocates non-shared memory and performs one-sided RMA to fetch data from remote locales. """ locale_ary, dst_extent = self.get_view(slice=slice, start=start, stop=stop, halo=halo) if locale_ary is None: # Need to fetch remote data if not self.rma_window_buffer.peer_win_initialised: raise ValueError( "Attempting peer one-sided RMA without having created" + " the peer window, call the initialise_windows method (all *peer* ranks)" + " to create windows before performing one-sided RMA." ) # Allocate memory for the data to be returned. locale_ary = \ _win_lndarray( shape=dst_extent.shape_h, dtype=self.dtype, comm=_mpi.COMM_SELF ) update_calculator = \ _MpiUpdatesForGet( dst_extent=dst_extent, src_distrib=self.distribution, dtype=self.dtype, order=self.order, update_dst_halo=True ) update_executor = \ _RmaUpdateExecutor( inter_win=self.rma_window_buffer.peer_win, dst_lndarray=locale_ary, src_inter_win_rank_attr="peer_rank", rank_logger=self.rank_logger ) # Perform the updates, copy locale array data to locale_ary first. updates = update_calculator._dst_cpy2_updates[dst_extent.inter_locale_rank] update_executor.do_direct_cpy2_update(updates, self.lndarray_proxy.lndarray) # Fetch remote data. updates = update_calculator._dst_rget_updates[dst_extent.inter_locale_rank] update_executor.do_locale_rma_update(updates) return locale_ary
def free_all(objects): """ Call the :samp:`free` attribute on all arguments. :type objects: sequence of :obj:`object` :param objects: Call the :samp:`free` attribute for all objects in this sequence (if it exists and it is :obj:`callable`). """ for obj in objects: if hasattr(obj, "free") and hasattr(obj.free, "__call__"): obj.free()
[docs]def copyto(dst, src, casting="same_kind", **kwargs): """ Copy the elements of the :samp:`{src}` array to corresponding elements of the :samp:`dst` array. :type dst: :obj:`gndarray` :param dst: Global array which receives elements. :type src: :obj:`gndarray` :param src: Global array from which elements are copied. :type casting: :obj:`str` :param casting: See :samp:`{casting}` parameter in :func:`numpy.copyto`. """ if not isinstance(dst, gndarray): raise ValueError( "Got type(dst)=%s, expected %s." % (type(dst), gndarray) ) dst.copyfrom(src, casting=casting)
__all__ = [s for s in dir() if not s.startswith('_')]