"""
=================================
The :mod:`mpi_array.comms` Module
=================================
MPI communicators and windows for locales.
Classes
=======
.. autosummary::
:toctree: generated/
LocaleCommsInfo - :obj:`LocaleComms` communicator related data members.
LocaleComms - Intra-locale and inter-locale communicators.
CartLocaleCommsInfo - :obj:`CartLocaleComms` communicator related data members.
CartLocaleComms - Intra-locale and cartesian-inter-locale communicators.
CommsAndDistribution - Tuple with :obj:`LocaleComms` and :obj:`Distribution`.
ThisLocaleInfo - The inter_locale_comm inter_locale_rank and corresponding peer_comm peer_rank.
RmaWindowBuffer - Container for locale array buffer and associated RMA windows.
Factory Functions
=================
.. autosummary::
:toctree: generated/
create_locale_comms_info - Creates a :obj:`LocaleCommsInfo` instance.
get_locale_comms_info - Finds or creates a :obj:`LocaleCommsInfo` instance.
create_cart_locale_comms_info - Creates a :obj:`CartLocaleCommsInfo` instance.
get_cart_locale_comms_info - Finds or creates a :obj:`CartLocaleCommsInfo` instance.
create_locale_comms - Factory function for creating :obj:`LocaleComms` instances.
create_block_distribution - Factory function for creating :obj:`BlockPartition` instances.
create_cloned_distribution - Factory function for creating :obj:`ClonedDistribution` instances.
create_single_locale_distribution - Creating :obj:`SingleLocaleDistribution` instances.
create_distribution - Factory function for creating :obj:`Distribution` instances.
Attributes
==========
.. autosummary::
:toctree: generated/
LT_PROCESS
LT_NODE
DT_BLOCK
DT_SLAB
DT_CLONED
DT_SINGLE_LOCALE
"""
from __future__ import absolute_import
from .license import license as _license, copyright as _copyright, version as _version
import sys as _sys
import copy as _copy
import mpi4py.MPI as _mpi
import numpy as _np
import collections as _collections
import array_split as _array_split
from . import logging as _logging
from .distribution import BlockPartition, ClonedDistribution, SingleLocaleDistribution
from .utils import log_shared_memory_alloc as _log_shared_memory_alloc
from .utils import log_memory_alloc as _log_memory_alloc
__author__ = "Shane J. Latham"
__license__ = _license()
__copyright__ = _copyright()
__version__ = _version()
def mpi_version():
    """
    Report the version of the MPI standard exposed by :mod:`mpi4py`.

    :rtype: :obj:`int`
    :return: The value of :attr:`mpi4py.MPI.VERSION` (MPI major version number).
    """
    major_version = _mpi.VERSION
    return major_version
class CommsCache(object):
    """
    Cache for re-using the MPI communicators (and associated info)
    when constructing :obj:`LocaleComms` (and :obj:`CartLocaleComms`)
    instances.

    Entries are kept in a plain :obj:`list` and searched linearly, because
    keys may contain elements (communicators, arrays) which are compared
    by identity or by value rather than by hash.
    """

    def __init__(self):
        """
        Construct an empty cache.
        """
        self._lookup = []
        self._rank_logger = \
            _logging.get_rank_logger(str(__name__ + "." + self.__class__.__name__))

    @property
    def lookup(self):
        """
        The :obj:`list` of cached items. Each entry is
        a pair :samp:`(comms_tuple, obj)` where the *key* :samp:`comms_tuple` is
        a sequence of :obj:`mpi4py.MPI.Comm` objects and :samp:`obj` is
        the *value* associated with this *key*.
        """
        return self._lookup

    def clear(self):
        """
        Clears the cache (replaces the look-up list with a new empty list).
        """
        self._lookup = []

    @staticmethod
    def _elements_equal(elem0, elem1):
        """
        Compares a single pair of key elements.

        :samp:`None` and :obj:`mpi4py.MPI.Comm` elements are compared
        by identity, all other elements are converted to :obj:`numpy.ndarray`
        and compared by size and by value.

        :rtype: :obj:`bool`
        :return: :samp:`True` if the elements are considered equal.
        """
        # The None tests come first so that None elements never
        # reach the isinstance checks.
        if (
            (elem0 is None)
            or
            (elem1 is None)
            or
            isinstance(elem0, _mpi.Comm)
            or
            isinstance(elem1, _mpi.Comm)
        ):
            return elem0 is elem1

        ary0 = _np.asanyarray(elem0)
        ary1 = _np.asanyarray(elem1)
        return (ary0.size == ary1.size) and bool(_np.all(ary0 == ary1))

    def create_key_equality_tuple(self, key0, key1):
        """
        Returns a :obj:`tuple` of :obj:`bool` elements indicating
        the per-element equality of the sequence :samp:`{key0}`
        and sequence :samp:`{key1}`. Assumes that :samp:`len({key0}) == len({key1})`.

        :type key0: :obj:`tuple`
        :param key0: A look-up key.
        :type key1: :obj:`tuple`
        :param key1: A look-up key.
        :rtype: :obj:`tuple` of :obj:`bool` elements
        :return: Per element equality of :samp:`{key0}` and :samp:`{key1}` keys.
        """
        return tuple(self._elements_equal(e0, e1) for e0, e1 in zip(key0, key1))

    def find(self, key):
        """
        Returns the *value* associated with the *key* :samp:`{key}`.

        :type key: sequence of :obj:`object`
        :param key: Key for look-up, typically a :samp:`tuple`
           of elements of type :obj:`mpi4py.MPI.Comm`, :samp:`None`, integer-scalar
           or integer-sequence.
        :rtype: :obj:`object` or :samp:`None`
        :return: The object associated with the key :samp:`{key}`,
           or :samp:`None` if the key :samp:`{key}` is not found.
        """
        for current_key, current_value in self._lookup:
            if len(current_key) != len(key):
                continue
            equality = self.create_key_equality_tuple(key, current_key)
            if _np.all(equality):
                # Log via the cache's own logger: cached values are arbitrary
                # objects and are not required to have a rank_logger attribute.
                self._rank_logger.debug(
                    "key:\n%s matched existing key:\n%s, value=\n%s\n%s",
                    key, current_key, current_value, equality
                )
                return current_value

        self._rank_logger.debug("No match for:\n%s", (key,))
        return None

    def add(self, key, value):
        """
        Adds a :samp:`({key}, {value})` pair to this cache.

        :type key: :obj:`tuple`
        :param key: Key for look-up.
        :type value: :obj:`object`
        :param value: Object associated with the :samp:`{key}` key.
        :raises ValueError: If :meth:`find` already returns a
           non-:samp:`None` value for :samp:`{key}`.
        """
        existing_value = self.find(key)
        if existing_value is not None:
            raise ValueError(
                "The key %s already has an associated value=%s"
                %
                (key, existing_value)
            )
        self._lookup.append((key, value))
ThisLocaleInfo = _collections.namedtuple("ThisLocaleInfo", ["inter_locale_rank", "peer_rank"])

# Docstrings are only attached for Python >= 3.5. Compare the version as a
# tuple: testing the major and minor numbers independently would wrongly
# skip the docstrings for any future X.0 - X.4 release.
if _sys.version_info >= (3, 5):
    ThisLocaleInfo.__doc__ = \
        """
        Pair of communicator rank values :samp:`(inter_locale_rank, peer_rank)` which
        indicates that the rank :samp:`inter_locale_rank` of the :samp:`inter_locale_comm`
        communicator corresponds to the :samp:`peer_rank` rank of the :samp:`peer_comm`
        communicator.
        """
    ThisLocaleInfo.inter_locale_rank.__doc__ = \
        "A :obj:`int` indicating the rank of :samp:`inter_locale_comm` communicator."
    ThisLocaleInfo.peer_rank.__doc__ = \
        "A :obj:`int` indicating the rank of :samp:`peer_comm` communicator."
class RmaWindowBuffer(object):
    """
    Details of the buffer allocated on a locale: the intra-locale
    window which owns the memory, plus the lazily created *peer*
    and *inter-locale* RMA windows which expose that same memory.
    """

    def __init__(
        self,
        is_shared,
        shape,
        dtype,
        itemsize,
        intra_locale_win,
        peer_comm=None,
        inter_locale_comm=None,
        peer_win=None,
        inter_locale_win=None,
        root_logger=None,
        rank_logger=None
    ):
        """
        Stores the arguments, no MPI calls are made here.

        :type is_shared: :obj:`bool`
        :param is_shared: Whether :samp:`{intra_locale_win}` memory was
           allocated as shared memory.
        :type shape: sequence of :obj:`int`
        :param shape: Shape of the locale array.
        :type dtype: :obj:`numpy.dtype`
        :param dtype: Array element type.
        :type itemsize: :obj:`int`
        :param itemsize: Number of bytes per element, used as the displacement
           unit when creating windows.
        :type intra_locale_win: :obj:`mpi4py.MPI.Win`
        :param intra_locale_win: Window which owns the buffer memory.
        :type peer_comm: :obj:`mpi4py.MPI.Comm`
        :param peer_comm: Communicator used to create :attr:`peer_win`.
        :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
        :param inter_locale_comm: Communicator used to
           create :attr:`inter_locale_win`.
        :type peer_win: :obj:`mpi4py.MPI.Win`
        :param peer_win: Existing peer window, if :samp:`None` the window
           is created on first access of :attr:`peer_win`.
        :type inter_locale_win: :obj:`mpi4py.MPI.Win`
        :param inter_locale_win: Existing inter-locale window, if :samp:`None`
           the window is created on first access of :attr:`inter_locale_win`.
        :type root_logger: :obj:`logging.Logger`
        :param root_logger: Logger for the root rank, created on demand
           if :samp:`None`.
        :type rank_logger: :obj:`logging.Logger`
        :param rank_logger: Logger for all ranks, created on demand
           if :samp:`None`.
        """
        object.__init__(self)
        self._is_shared = is_shared
        self._shape = shape
        self._dtype = dtype
        self._itemsize = itemsize
        self._intra_locale_root_rank = 0
        self._intra_locale_win = intra_locale_win
        self._peer_win = peer_win
        self._inter_locale_win = inter_locale_win
        self._peer_comm = peer_comm
        self._inter_locale_comm = inter_locale_comm
        self._root_logger = root_logger
        self._rank_logger = rank_logger

    @property
    def root_logger(self):
        """
        A :obj:`logging.Logger` for logging messages from the *root* MPI
        process of :attr:`peer_comm`. Created on first access if no logger
        was passed to the constructor.
        """
        if self._root_logger is None:
            self._root_logger = \
                _logging.get_root_logger(
                    __name__ + "." + self.__class__.__name__,
                    comm=self._peer_comm
                )
        return self._root_logger

    @property
    def rank_logger(self):
        """
        A :obj:`logging.Logger` for logging messages from all rank MPI
        processes of :attr:`peer_comm`. Created on first access if no logger
        was passed to the constructor.
        """
        if self._rank_logger is None:
            self._rank_logger = \
                _logging.get_rank_logger(
                    __name__ + "." + self.__class__.__name__,
                    comm=self._peer_comm
                )
        return self._rank_logger

    @property
    def is_shared(self):
        """
        A :obj:`bool`, if :samp:`True` then buffer memory was allocated
        using :meth:`mpi4py.MPI.Win.Allocate_shared`, otherwise memory
        was allocated using :meth:`mpi4py.MPI.Win.Allocate`.
        """
        return self._is_shared

    @property
    def shape(self):
        """
        A sequence of :obj:`int` indicating the shape of the locale array.
        """
        return self._shape

    @property
    def dtype(self):
        """
        A :obj:`numpy.dtype` indicating the data type of elements
        stored in the array.
        """
        return self._dtype

    @property
    def itemsize(self):
        """
        An :obj:`int` indicating the number of bytes per array element
        (same as :attr:`numpy.dtype.itemsize`).
        """
        return self._itemsize

    @property
    def peer_win(self):
        """
        The :obj:`mpi4py.MPI.Win` created from the :samp:`peer_comm` communicator
        which exposes :attr:`buffer` for inter-locale RMA access.
        Created (via :meth:`create_peer_win`) on first access.
        """
        if self._peer_win is None:
            self._peer_win = self.create_peer_win()
        return self._peer_win

    @property
    def intra_locale_win(self):
        """
        The :obj:`mpi4py.MPI.Win` created from the :samp:`intra_locale_comm` communicator
        which exposes :attr:`buffer` for intra-locale RMA access.
        When :samp:`{intra_locale_win}.group.size > 1` then :attr:`buffer` was
        allocated as shared memory (using :meth:`mpi4py.MPI.Win.Allocate_shared`).
        """
        return self._intra_locale_win

    @property
    def inter_locale_win(self):
        """
        The :obj:`mpi4py.MPI.Win` created from the :samp:`inter_locale_comm` communicator
        which exposes :attr:`buffer` for inter-locale RMA access.
        Created (via :meth:`create_inter_locale_win`) on first access.
        """
        if self._inter_locale_win is None:
            self._inter_locale_win = self.create_inter_locale_win()
        return self._inter_locale_win

    @property
    def peer_comm(self):
        """
        The *peer* :obj:`mpi4py.MPI.Comm` MPI communicator.
        """
        return self._peer_comm

    @property
    def inter_locale_comm(self):
        """
        The *inter-locale* :obj:`mpi4py.MPI.Comm` MPI communicator.
        """
        return self._inter_locale_comm

    @property
    def intra_locale_win_memory(self):
        """
        The memory allocated using one of :meth:`mpi4py.MPI.Win.Allocate`
        or :meth:`mpi4py.MPI.Win.Allocate_shared`. This memory is used to store
        elements of the globale array apportioned to a locale.
        """
        if self.is_shared:
            # Query the segment of intra-locale rank 0; Shared_query also
            # returns the displacement unit, which is not needed here.
            buffer, _ = self.intra_locale_win.Shared_query(0)
        else:
            buffer = get_win_memory(self.intra_locale_win)
        return buffer

    @property
    def buffer(self):
        """
        A :samp:`numpy.ndarray` of bytes (:samp:`dtype='B'`) wrapping the memory
        allocated using one of :meth:`mpi4py.MPI.Win.Allocate`
        or :meth:`mpi4py.MPI.Win.Allocate_shared`.

        .. seealso:: :attr:`intra_locale_win_memory`
        """
        return _np.array(self.intra_locale_win_memory, dtype='B', copy=False)

    def create_inter_locale_win(self):
        """
        Creates a :obj:`mpi4py.MPI.Win` window (over the :attr:`inter_locale_comm`
        communicator) exposing the :attr:`buffer` memory.

        :rtype: :obj:`mpi4py.MPI.Win`
        :return: Newly created window exposing the :attr:`buffer` memory.
           Returns :attr:`mpi4py.MPI.WIN_NULL` on processes which
           have :attr:`inter_locale_comm` equal to :attr:`mpi4py.MPI.COMM_NULL`
           (or equal to :samp:`None`).
        """
        inter_locale_win = _mpi.WIN_NULL
        if (self._inter_locale_comm != _mpi.COMM_NULL) and (self._inter_locale_comm is not None):
            self.rank_logger.debug("BEG: Win.Create for self.inter_locale_comm")
            inter_locale_win = \
                _mpi.Win.Create(
                    self.intra_locale_win_memory,
                    self.itemsize,
                    comm=self._inter_locale_comm
                )
            self.rank_logger.debug("END: Win.Create for self.inter_locale_comm")
        return inter_locale_win

    def create_peer_win(self):
        """
        Creates a :obj:`mpi4py.MPI.Win` window (over the :attr:`peer_comm`
        communicator) exposing the :attr:`buffer` memory.

        Processes whose :attr:`inter_locale_comm`
        is :attr:`mpi4py.MPI.COMM_NULL` pass :samp:`None` as the window memory.

        :rtype: :obj:`mpi4py.MPI.Win`
        :return: Newly created window exposing the :attr:`buffer` memory.
        """
        if self._inter_locale_comm != _mpi.COMM_NULL:
            peer_buffer = self.intra_locale_win_memory
            peer_buffer_nbytes = len(self.intra_locale_win_memory)
        else:
            peer_buffer = None
            peer_buffer_nbytes = 0
        self.rank_logger.debug(
            "BEG: Win.Create for self.peer_comm, buffer.nbytes=%s...",
            peer_buffer_nbytes
        )
        peer_win = _mpi.Win.Create(peer_buffer, self.itemsize, comm=self._peer_comm)
        self.rank_logger.debug(
            "END: Win.Create for self.peer_comm, buffer.nbytes=%s...",
            peer_buffer_nbytes
        )
        return peer_win

    @property
    def peer_win_initialised(self):
        """
        Returns :samp:`True` if *peer* RMA window (:attr:`peer_win`) has been
        created. :samp:`False` otherwise.
        """
        return self._peer_win is not None

    @property
    def inter_locale_win_initialised(self):
        """
        Returns :samp:`True` if *inter-locale* RMA window (:attr:`inter_locale_win`)
        has been created. :samp:`False` otherwise.
        """
        return self._inter_locale_win is not None

    @property
    def windows_initialised(self):
        """
        Returns :samp:`True` if both *peer* RMA window (:attr:`peer_win`)
        and *inter-locale* RMA window (:attr:`inter_locale_win`) have been
        created. :samp:`False` otherwise.
        """
        return (self._peer_win is not None) and (self._inter_locale_win is not None)

    def initialise_windows(self):
        """
        Creates both of the :attr:`peer_win`
        and :attr:`inter_locale_win` :obj:`mpi4py.MPI.Win` objects.

        :rtype: :obj:`tuple`
        :return: The :samp:`(peer_win, inter_locale_win)` pair.
        """
        return self.peer_win, self.inter_locale_win

    def free(self):
        """
        Free MPI windows and associated buffer memory.

        Windows which are :samp:`None` (never created)
        or :attr:`mpi4py.MPI.WIN_NULL` are skipped. The windows exposing
        the memory are freed before the window which owns it.
        """
        for attr_name in ("_inter_locale_win", "_peer_win", "_intra_locale_win"):
            win = getattr(self, attr_name)
            # Same guard for all three windows (intra_locale_win previously
            # lacked the None test and failed on None.Free()).
            if (win is not None) and (win != _mpi.WIN_NULL):
                win.Free()
                setattr(self, attr_name, _mpi.WIN_NULL)
LocaleCommsInfo = \
    _collections.namedtuple(
        "LocaleCommsInfo",
        [
            "peer_comm",
            "intra_locale_comm",
            "inter_locale_comm",
            "num_locales",
            "peer_ranks_per_locale",
            "rank_logger",
            "root_logger"
        ]
    )

# Docstring is only attached for Python >= 3.5. Compare the version as a
# tuple: testing the major and minor numbers independently would wrongly
# skip the docstring for any future X.0 - X.4 release.
if _sys.version_info >= (3, 5):
    LocaleCommsInfo.__doc__ = \
        """
        Communicators associated with a locale.
        """
# Module-level cache of comms-info objects, shared by get_locale_comms_info
# and get_cart_locale_comms_info.
_comms_info_cache = CommsCache()
def create_locale_comms_info(
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None
):
    """
    Creates a :obj:`LocaleCommsInfo` associated with
    the specified communicators. Collective over :samp:`{peer_comm}`.

    :type peer_comm: :obj:`mpi4py.MPI.Comm`
    :param peer_comm: Communicator from which the locale communicators
       are derived. If :samp:`None`, uses :attr:`mpi4py.MPI.COMM_WORLD`.
    :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param intra_locale_comm: Intra-locale communicator. If :samp:`None`,
       created
       with :samp:`{peer_comm}.Split_type(mpi4py.MPI.COMM_TYPE_SHARED, ...)`.
    :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param inter_locale_comm: Inter-locale communicator. If :samp:`None`,
       created by splitting :samp:`{peer_comm}` so that it contains
       the :samp:`{intra_locale_comm}.rank == 0` process of each locale.
    :rtype: :obj:`LocaleCommsInfo`
    :return: The communicators, the number of locales, the peer-ranks
       of each locale and the loggers.
    :raises ValueError: If an :samp:`{inter_locale_comm}` process is not
       rank :samp:`0` of its :samp:`{intra_locale_comm}`, or if the number
       of locales exceeds :samp:`{peer_comm}.size`.
    """
    if peer_comm is None:
        peer_comm = _mpi.COMM_WORLD
    rank_logger = _logging.get_rank_logger(__name__ + "." + "create_locale_comms_info", peer_comm)

    if intra_locale_comm is None:
        rank_logger.debug(
            "BEG: Splitting peer_comm with peer_comm.Split_type(COMM_TYPE_SHARED, ...)"
        )
        intra_locale_comm = peer_comm.Split_type(_mpi.COMM_TYPE_SHARED, key=peer_comm.rank)
        rank_logger.debug(
            "END: Splitting peer_comm with peer_comm.Split_type(COMM_TYPE_SHARED, ...)"
        )

    if inter_locale_comm is None:
        # Only the intra-locale root of each locale joins inter_locale_comm,
        # all other processes get COMM_NULL from the split.
        color = 0 if intra_locale_comm.rank == 0 else _mpi.UNDEFINED
        rank_logger.debug("BEG: peer_comm.Split to create inter_locale_comm.")
        inter_locale_comm = peer_comm.Split(color, peer_comm.rank)
        rank_logger.debug("END: peer_comm.Split to create inter_locale_comm.")

    peer_ranks_per_locale = None
    num_locales = None
    bad_inter_rank_to_intra_rank = tuple()
    if inter_locale_comm != _mpi.COMM_NULL:
        intra_rank_of_this_inter_rank = \
            _mpi.Group.Translate_ranks(
                inter_locale_comm.group,
                (inter_locale_comm.rank,),
                intra_locale_comm.group
            )[0]
        if intra_rank_of_this_inter_rank != 0:
            # Collect the inconsistent inter-locale-rank to intra-locale-rank
            # translations so that all peer_comm ranks can raise a ValueError
            # (i.e. avoid only raising ValueError on some inter_locale_comm ranks)
            bad_inter_rank_to_intra_rank = \
                (
                    (
                        peer_comm.rank,
                        inter_locale_comm.rank,
                        intra_rank_of_this_inter_rank
                    ),
                )

        num_locales = inter_locale_comm.size
        this_locale_peer_ranks = \
            _np.array(
                _mpi.Group.Translate_ranks(
                    intra_locale_comm.group,
                    _np.arange(0, intra_locale_comm.size),
                    peer_comm.group
                ),
                dtype="int64"
            ).reshape((intra_locale_comm.size,))

        rank_logger.debug("BEG: inter_locale_comm.allgather for peer-ranks-per-locale...")
        gathered = \
            inter_locale_comm.allgather(
                (bad_inter_rank_to_intra_rank, this_locale_peer_ranks)
            )
        rank_logger.debug("END: inter_locale_comm.allgather for peer-ranks-per-locale.")
        all_bad = tuple(pair[0] for pair in gathered)
        all_prpl = tuple(pair[1] for pair in gathered)

        # Assign peer_ranks_per_locale as 1D array of object
        # in case intra_locale_comm.size is not the same on
        # all locales.
        peer_ranks_per_locale = _np.ones((len(all_prpl),), dtype="object")
        for i, locale_peer_ranks in enumerate(all_prpl):
            peer_ranks_per_locale[i] = locale_peer_ranks

        num_peer_ranks_per_locale = len(peer_ranks_per_locale[0])
        if all(len(ranks) == num_peer_ranks_per_locale for ranks in peer_ranks_per_locale):
            # If all locales have the same intra_locale_comm.size,
            # then just use a 2D numpy.ndarray of 'int64' elements
            # (rather than a 1D array of variable-length 'object' elements).
            peer_ranks_per_locale = \
                _np.array(peer_ranks_per_locale.tolist(), dtype="int64")
            peer_ranks_per_locale = \
                peer_ranks_per_locale.reshape(
                    (num_locales, num_peer_ranks_per_locale)
                )

        # Flatten the per-rank tuples of bad translations into a single tuple.
        bad_inter_rank_to_intra_rank = sum(all_bad, ())

    # Only the inter_locale_comm processes hold the gathered data,
    # share it with the remaining processes of each locale.
    rank_logger.debug("BEG: intra_locale_comm.bcast for peer-ranks-per-locale...")
    bad_inter_rank_to_intra_rank, peer_ranks_per_locale, num_locales = \
        intra_locale_comm.bcast(
            (bad_inter_rank_to_intra_rank, peer_ranks_per_locale, num_locales),
            0
        )
    rank_logger.debug("END: intra_locale_comm.bcast for peer-ranks-per-locale...")

    if len(bad_inter_rank_to_intra_rank) > 0:
        # Header and joined lines are concatenated with an explicit "+":
        # adjacent string literals would merge first and turn the header
        # into the join separator.
        raise ValueError(
            "Got invalid inter_locale_comm rank to intra_locale_comm rank translation:\n"
            +
            "\n".join(
                (
                    (
                        "peer rank=%s, inter_locale_comm rank=%s translated to "
                        +
                        "intra_locale_comm rank=%s, should be intra_locale_comm rank=0"
                    )
                    %
                    bad_translation
                    for bad_translation in bad_inter_rank_to_intra_rank
                )
            )
        )

    if num_locales > peer_comm.size:
        raise ValueError(
            "Got number of locales=%s which is greater than peer_comm.size=%s"
            %
            (num_locales, peer_comm.size)
        )

    return \
        LocaleCommsInfo(
            peer_comm=peer_comm,
            intra_locale_comm=intra_locale_comm,
            inter_locale_comm=inter_locale_comm,
            num_locales=num_locales,
            peer_ranks_per_locale=peer_ranks_per_locale,
            rank_logger=_logging.get_rank_logger(
                __name__ + "." + LocaleComms.__name__,
                comm=peer_comm
            ),
            root_logger=_logging.get_root_logger(
                __name__ + "." + LocaleComms.__name__,
                comm=peer_comm
            )
        )
def get_locale_comms_info(
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None
):
    """
    Finds or creates a :obj:`LocaleCommsInfo` associated with
    the specified communicators. Collective over :samp:`{peer_comm}`.

    A newly created info object is cached under the key formed from the
    arguments as passed, and also under the key formed from the
    communicators stored in the created object (if not already cached).

    :rtype: :obj:`LocaleCommsInfo`
    :return: Cached or newly created communicator info.
    """
    requested_key = (peer_comm, intra_locale_comm, inter_locale_comm)
    cached_info = _comms_info_cache.find(requested_key)
    if cached_info is not None:
        return cached_info

    new_info = \
        create_locale_comms_info(
            peer_comm=peer_comm,
            intra_locale_comm=intra_locale_comm,
            inter_locale_comm=inter_locale_comm
        )
    _comms_info_cache.add(requested_key, new_info)

    resolved_key = (new_info.peer_comm, new_info.intra_locale_comm, new_info.inter_locale_comm)
    if _comms_info_cache.find(resolved_key) is None:
        _comms_info_cache.add(resolved_key, new_info)

    return new_info
def get_win_memory(win):
    """
    Fetches the memory buffer associated with the :samp:`{win}` MPI window.

    Uses the :samp:`{win}.memory` attribute when it exists, otherwise
    falls back to calling :samp:`{win}.tomemory()`.

    :type win: :obj:`mpi4py.MPI.Win`
    :param win: Return memory for this window.
    :return: The memory object exposed by :samp:`{win}`.
    """
    return win.memory if hasattr(win, "memory") else win.tomemory()
class LocaleComms(object):
    """
    MPI communicators for inter and intra locale data exchange. There are three
    communicators:

    :attr:`peer_comm`
       Typically this is :attr:`mpi4py.MPI.COMM_WORLD`. It is the group
       of processes which operate on (perform computations on portions of) a globale array.
    :attr:`intra_locale_comm`
       Can be :attr:`mpi4py.MPI.COMM_SELF`, but is more typically
       the communicator returned
       by :samp:`self.peer_comm.Split_type(mpi4py.MPI.COMM_TYPE_SHARED, key=self.peer_comm.rank)`.
       It is the communicator passed to the :func:`mpi4py.MPI.Win.Allocate_shared` function
       which allocates shared-memory and creates a shared-memory :obj:`mpi4py.MPI.Win` window.
    :attr:`inter_locale_comm`
       Typically this communicator is formed by selecting a single process from each locale.
       This communicator (and associated `mpi4py.MPI.Win` window) is used to exchange
       data between locales.
    """

    def __init__(
        self,
        peer_comm=None,
        intra_locale_comm=None,
        inter_locale_comm=None,
        comms_info=None
    ):
        """
        Construct, this is a collective call over the :samp:`{peer_comm}` communicator.

        :type peer_comm: :obj:`mpi4py.MPI.Comm`
        :param peer_comm: Communicator which is split according to
           shared memory allocation (uses :meth:`mpi4py.MPI.Comm.Split_type`).
           If :samp:`None`, uses :attr:`mpi4py.MPI.COMM_WORLD`.
        :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
        :param intra_locale_comm: Intra-locale communicator.
           Should be a subset of processes returned
           by :samp:`{peer_comm}.Split_type(mpi4py.MPI.COMM_TYPE_SHARED)`.
           If :samp:`None`, :samp:`{peer_comm}` is *split* into groups
           which can use a MPI window to allocate shared memory
           (i.e. in this case locale is a (possibly NUMA) node).
           Can also specify as :samp:`mpi4py.MPI.COMM_SELF`, in which case the
           locale is a single process and regular (non-shared) memory
           is allocated in :meth:`alloc_locale_buffer`.
        :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
        :param inter_locale_comm: Inter-locale communicator used to exchange
           data between different locales. If :samp:`None` then one process
           (the :samp:`{intra_locale_comm}.rank == 0`
           process) is selected from each locale to form the :samp:`{inter_locale_comm}`
           communicator group.
        :type comms_info: :obj:`LocaleCommsInfo`
        :param comms_info: Convenience construction without need for lookup
           via :func:`get_locale_comms_info`. When given, the communicator
           arguments are ignored.
        """
        object.__init__(self)
        if comms_info is None:
            comms_info = \
                get_locale_comms_info(
                    peer_comm=peer_comm,
                    intra_locale_comm=intra_locale_comm,
                    inter_locale_comm=inter_locale_comm
                )
        self._peer_comm = comms_info.peer_comm
        self._intra_locale_comm = comms_info.intra_locale_comm
        self._inter_locale_comm = comms_info.inter_locale_comm
        self._num_locales = comms_info.num_locales
        self._peer_ranks_per_locale = comms_info.peer_ranks_per_locale
        self._rank_logger = comms_info.rank_logger
        self._root_logger = comms_info.root_logger

    def free(self):
        """
        Does nothing, this method frees no communicators.
        """
        pass

    def alloc_locale_buffer(self, shape, dtype):
        """
        Allocates a buffer which provides storage for the elements of
        the locale multi-dimensional array.
        Uses :meth:`mpi4py.MPI.Win.Allocate_shared`
        when :samp:`self.intra_locale_comm.size > 1`,
        otherwise uses :meth:`mpi4py.MPI.Win.Allocate`.
        Only the :samp:`self.intra_locale_comm.rank == 0` process requests
        a non-zero number of bytes.

        :type shape: sequence of :obj:`int`
        :param shape: The shape of the locale array for which a buffer is allocated.
        :type dtype: :obj:`numpy.dtype`
        :param dtype: The array element type.
        :rtype: :obj:`RmaWindowBuffer`
        :returns: A :obj:`RmaWindowBuffer` holding the allocating window
           and the communicators for the associated RMA MPI windows.
        """
        self.rank_logger.debug("BEG: alloc_locale_buffer")
        dtype = _np.dtype(dtype)
        locale_shape = shape
        if self.intra_locale_comm.rank == 0:
            rank_shape = locale_shape
            # numpy.prod rather than the numpy.product alias, which was
            # removed in NumPy 2.0.
            num_rank_bytes = int(_np.prod(rank_shape) * dtype.itemsize)
        else:
            rank_shape = tuple(_np.zeros_like(locale_shape))
            num_rank_bytes = 0

        is_shared_alloc = self.intra_locale_comm.size > 1
        if is_shared_alloc:
            _log_shared_memory_alloc(
                self.rank_logger.debug, "BEG: ", num_rank_bytes, rank_shape, dtype
            )
            intra_locale_win = \
                _mpi.Win.Allocate_shared(
                    num_rank_bytes,
                    dtype.itemsize,
                    comm=self.intra_locale_comm
                )
            buffer, itemsize = intra_locale_win.Shared_query(0)
            _log_shared_memory_alloc(
                self.rank_logger.debug, "END: ", num_rank_bytes, rank_shape, dtype, buffer=buffer
            )
        else:
            _log_memory_alloc(
                self.rank_logger.debug, "BEG: ", num_rank_bytes, rank_shape, dtype
            )
            intra_locale_win = \
                _mpi.Win.Allocate(num_rank_bytes, dtype.itemsize, comm=self.peer_comm)
            buffer = get_win_memory(intra_locale_win)
            itemsize = dtype.itemsize
            _log_memory_alloc(
                self.rank_logger.debug, "END: ", num_rank_bytes, rank_shape, dtype, buffer=buffer
            )
        self.rank_logger.debug("END: alloc_locale_buffer")

        return \
            RmaWindowBuffer(
                is_shared=is_shared_alloc,
                shape=locale_shape,
                dtype=dtype,
                itemsize=itemsize,
                intra_locale_win=intra_locale_win,
                peer_comm=self.peer_comm,
                inter_locale_comm=self.inter_locale_comm,
                root_logger=self.root_logger,
                rank_logger=self.rank_logger
            )

    @property
    def num_locales(self):
        """
        An :samp:`int` indicating the number of *locales* over which an array is distributed.
        """
        return self._num_locales

    @property
    def peer_comm(self):
        """
        A :obj:`mpi4py.MPI.Comm` which is super-set of the :attr:`intra_locale_comm`
        and :attr:`inter_locale_comm` communicators.
        """
        return self._peer_comm

    @property
    def intra_locale_comm(self):
        """
        A :obj:`mpi4py.MPI.Comm` object which defines the group of processes
        which can allocate (and access) MPI window shared memory
        (allocated via :meth:`mpi4py.MPI.Win.Allocate_shared` if available).
        """
        return self._intra_locale_comm

    @property
    def inter_locale_comm(self):
        """
        A :obj:`mpi4py.MPI.Comm` communicator defining the group of processes
        which exchange data between locales.
        """
        return self._inter_locale_comm

    @inter_locale_comm.setter
    def inter_locale_comm(self, inter_locale_comm):
        self._inter_locale_comm = inter_locale_comm

    @property
    def have_valid_inter_locale_comm(self):
        """
        Is :samp:`True` if this *peer rank* has :samp:`{self}.inter_locale_comm`
        which is not :samp:`None` and is not :obj:`mpi4py.MPI.COMM_NULL`.
        """
        return \
            (
                (self.inter_locale_comm is not None)
                and
                (self.inter_locale_comm != _mpi.COMM_NULL)
            )

    @property
    def inter_locale_rank_to_peer_rank_map(self):
        """
        Returns sequence, :samp:`m` say, of :obj:`int`
        where :samp:`m[inter_r]` is the *peer rank* of :samp:`self.peer_comm`
        which corresponds to the *inter-locale rank* :samp:`inter_r`
        of :samp:`self.inter_locale_comm`.

        :rtype: :samp:`None` or sequence of :obj:`int`
        :return: Sequence of length :samp:`self.inter_locale_comm.size` on
           ranks for which :samp:`self.have_valid_inter_locale_comm is True`, :samp:`None`
           otherwise.
        """
        if not self.have_valid_inter_locale_comm:
            return None
        return \
            _mpi.Group.Translate_ranks(
                self.inter_locale_comm.group,
                range(0, self.inter_locale_comm.group.size),
                self.peer_comm.group
            )

    @property
    def peer_ranks_per_locale(self):
        """
        A :obj:`numpy.ndarray` of shape :samp:`(self.num_locales, num_peer_ranks_per_locale)`
        with :obj:`numpy.int64` elements.
        The :samp:`self.peer_ranks_per_locale[inter_locale_rank]` sequence of elements
        are the :attr:`peer_comm` ranks which make up locale associated
        with :attr:`inter_locale_comm` rank :samp:`inter_locale_rank`.
        """
        return self._peer_ranks_per_locale

    @property
    def this_locale_rank_info(self):
        """
        A :obj:`ThisLocaleInfo` object. Indicates the :samp:`self.inter_locale_comm.rank`
        and :samp:`self.peer_comm.rank` on processes for
        which :samp:`self.have_valid_inter_locale_comm is True`.
        Is :samp:`ThisLocaleInfo(0, 0)` when :samp:`self.inter_locale_comm is None`
        and is :attr:`mpi4py.MPI.UNDEFINED` on the remaining processes
        (those with :samp:`self.inter_locale_comm` equal
        to :attr:`mpi4py.MPI.COMM_NULL`).
        """
        if self.have_valid_inter_locale_comm:
            return ThisLocaleInfo(self.inter_locale_comm.rank, self.peer_comm.rank)
        if self.inter_locale_comm is None:
            return ThisLocaleInfo(0, 0)
        return _mpi.UNDEFINED

    @property
    def rank_logger(self):
        """
        A :attr:`peer_comm` :obj:`logging.Logger` (taken from
        the :samp:`rank_logger` field of the comms-info object).
        """
        return self._rank_logger

    @property
    def root_logger(self):
        """
        A :attr:`peer_comm` :obj:`logging.Logger` (taken from
        the :samp:`root_logger` field of the comms-info object).
        """
        return self._root_logger
# Extends the LocaleCommsInfo fields with the cartesian topology
# dimensions and the cartesian communicator.
CartLocaleCommsInfo = \
    _collections.namedtuple(
        "CartLocaleCommsInfo",
        LocaleCommsInfo._fields
        +
        (
            "dims",
            "cart_comm"
        )
    )

# Docstring is only attached for Python >= 3.5. Compare the version as a
# tuple: testing the major and minor numbers independently would wrongly
# skip the docstring for any future X.0 - X.4 release.
if _sys.version_info >= (3, 5):
    CartLocaleCommsInfo.__doc__ = \
        """
        Communicators associated with a cartesian topology locales.
        """
def get_cannonical_dims(num_locales, ndims=None, dims=None):
    """
    Computes the number of blocks per axis of a partition
    with :samp:`{num_locales}` blocks in total.

    :type num_locales: :obj:`int`
    :param num_locales: Total number of blocks in the partition.
    :type ndims: :obj:`int`
    :param ndims: The number of dimensions of the block partitioning.
       Only required when :samp:`{dims}` is :samp:`None`.
    :type dims: sequence of :obj:`int`
    :param dims: Constraint for the partitioning, positive values indicate
       the number of blocks for the associated axis, zero or negative values
       are chosen (replaced) so that :samp:`num_locales` total number of blocks
       are formed. If :samp:`None`, a length-:samp:`{ndims}` sequence of zeros
       is used.
    :rtype: :obj:`numpy.ndarray`
    :return: An array of :obj:`int` such that the :samp:`numpy.product` of the
       returned array is equal to :samp:`{num_locales}`.
    :raises ValueError: If both :samp:`{ndims}` and :samp:`{dims}`
       are :samp:`None`, or if :samp:`len({dims}) != {ndims}`.
    """
    if dims is None:
        if ndims is None:
            raise ValueError(
                "Must specify one of dims or ndims, got both ndims=None and dims=None."
            )
        # No constraint given: let every axis be chosen freely.
        dims = _np.zeros((ndims,), dtype="int64")
    elif (ndims is not None) and (len(dims) != ndims):
        raise ValueError(
            "Length of dims (len(dims)=%s) not equal to ndims=%s." % (len(dims), ndims)
        )

    return _array_split.split.calculate_num_slices_per_axis(dims, num_locales)
def create_cart_locale_comms_info(
    ndims=None,
    dims=None,
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None,
    cart_comm=None
):
    """
    Creates a :obj:`CartLocaleCommsInfo` associated with
    the specified communicators. Collective over :samp:`{peer_comm}`.

    The locale communicators are obtained
    from :func:`get_locale_comms_info`. When :samp:`{cart_comm}`
    is :samp:`None`, a non-periodic cartesian communicator is created from
    the inter-locale communicator (or :attr:`mpi4py.MPI.COMM_NULL` is used
    on processes where the inter-locale communicator
    is :attr:`mpi4py.MPI.COMM_NULL`).

    :rtype: :obj:`CartLocaleCommsInfo`
    :return: The locale communicator info extended with :samp:`dims`
       and :samp:`cart_comm`.
    :raises ValueError: If the size of the cartesian communicator group
       differs from the number of locales.
    """
    locale_info = \
        get_locale_comms_info(
            peer_comm=peer_comm,
            intra_locale_comm=intra_locale_comm,
            inter_locale_comm=inter_locale_comm
        )
    dims = get_cannonical_dims(num_locales=locale_info.num_locales, ndims=ndims, dims=dims)
    # All-False periods: no axis of the topology wraps around.
    periods = _np.zeros((len(dims),), dtype="bool")
    inter_locale_comm = locale_info.inter_locale_comm

    if cart_comm is None:
        if inter_locale_comm != _mpi.COMM_NULL:
            locale_info.rank_logger.debug(
                "BEG: inter_locale_comm.Create_cart to create cart_comm."
            )
            cart_comm = inter_locale_comm.Create_cart(dims, periods, reorder=True)
            locale_info.rank_logger.debug(
                "END: inter_locale_comm.Create_cart to create cart_comm."
            )
        elif inter_locale_comm == _mpi.COMM_NULL:
            cart_comm = _mpi.COMM_NULL

    if cart_comm != _mpi.COMM_NULL:
        cart_size = cart_comm.group.size
        if cart_size != locale_info.num_locales:
            raise ValueError(
                "Got cart_comm.group.size (=%s) != self._num_locales (=%s)."
                %
                (cart_comm.group.size, locale_info.num_locales)
            )

    # Copy over the fields shared with LocaleCommsInfo, then add the
    # cartesian specific ones.
    shared_fields = {
        field_name: getattr(locale_info, field_name)
        for field_name in LocaleCommsInfo._fields
    }
    return CartLocaleCommsInfo(dims=dims, cart_comm=cart_comm, **shared_fields)
def get_cart_locale_comms_info(
    ndims=None,
    dims=None,
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None,
    cart_comm=None
):
    """
    Finds or creates a :obj:`CartLocaleCommsInfo` associated with
    the specified communicators. Collective over :samp:`{peer_comm}`.

    A newly created info object is cached under the key formed from the
    arguments as passed, and (if not already cached) under two keys formed
    from the values stored in the created object: one
    with :samp:`len(dims)` as first element and one with :samp:`None`
    as first element.

    :rtype: :obj:`CartLocaleCommsInfo`
    :return: Cached or newly created cartesian communicator info.
    """
    requested_key = (ndims, dims, peer_comm, intra_locale_comm, inter_locale_comm, cart_comm)
    cached_info = _comms_info_cache.find(requested_key)
    if cached_info is not None:
        return cached_info

    new_info = \
        create_cart_locale_comms_info(
            ndims,
            dims,
            peer_comm=peer_comm,
            intra_locale_comm=intra_locale_comm,
            inter_locale_comm=inter_locale_comm,
            cart_comm=cart_comm
        )
    _comms_info_cache.add(requested_key, new_info)

    resolved_tail = \
        (
            new_info.dims,
            new_info.peer_comm,
            new_info.intra_locale_comm,
            new_info.inter_locale_comm,
            new_info.cart_comm
        )
    # Register both the explicit-ndims and the ndims=None spelling of the key.
    for leading_element in (len(new_info.dims), None):
        resolved_key = (leading_element,) + resolved_tail
        if _comms_info_cache.find(resolved_key) is None:
            _comms_info_cache.add(resolved_key, new_info)

    return new_info
[docs]class CartLocaleComms(LocaleComms):
"""
Defines cartesian communication topology for locales.
In addition to the :obj:`LocaleComms` communicators, defines:
:attr:`cart_comm`
Typically this communicator is created using
the call :samp:`{inter_locale_comm}.Create_cart(...)`.
This communicator (and associated `mpi4py.MPI.Win` window) is used to exchange
data between locales.
Note that :attr:`inter_locale_comm` property over-rides
the :attr:`LocaleComms.inter_locale_comm` property to return the :attr:`cart_comm`
communicator.
"""
[docs] def __init__(
self,
ndims=None,
dims=None,
peer_comm=None,
intra_locale_comm=None,
inter_locale_comm=None,
cart_comm=None
):
"""
Initialises cartesian communicator for inter-locale data exchange.
Need to specify at least one of the :samp:`{ndims}` or :samp:`{dims}`.
to indicate the dimension of the cartesian partitioning.
:type ndims: :obj:`int`
:param ndims: Dimension of the cartesian partitioning, e.g. 1D, 2D, 3D, etc.
If :samp:`None`, :samp:`{ndims}=len({dims})`.
:type dims: sequence of :obj:`int`
:param dims: The number of partitions along each array axis, zero elements
are replaced with positive integers such
that :samp:`numpy.product({dims}) == {peer_comm}.size`.
If :samp:`None`, :samp:`{dims} = (0,)*{ndims}`.
:type peer_comm: :obj:`mpi4py.MPI.Comm`
:param peer_comm: The MPI processes which will have access
(via a :obj:`mpi4py.MPI.Win` object) to the distributed array.
If :samp:`None` uses :obj:`mpi4py.MPI.COMM_WORLD`.
:type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
:param intra_locale_comm: The MPI communicator used to create a window which
can be used to allocate shared memory
via :meth:`mpi4py.MPI.Win.Allocate_shared`.
:type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
:param inter_locale_comm: Inter-locale communicator used to exchange
data between different locales.
:type cart_comm: :obj:`mpi4py.MPI.Comm`
:param cart_comm: Cartesian topology inter-locale communicator used to exchange
data between different locales.
"""
comms_info = \
get_cart_locale_comms_info(
ndims=ndims,
dims=dims,
peer_comm=peer_comm,
intra_locale_comm=intra_locale_comm,
inter_locale_comm=inter_locale_comm,
cart_comm=cart_comm
)
LocaleComms.__init__(
self,
comms_info=comms_info
)
self._dims = comms_info.dims
self._cart_comm = comms_info.cart_comm
@property
def cart_coord_to_cart_rank_map(self):
    """
    Lookup table from cartesian coordinate to :attr:`cart_comm` rank.

    :rtype: :obj:`numpy.ndarray` or :samp:`None`
    :return: An :samp:`"int64"` array of shape :samp:`{self}.cart_comm.dims`
       where the element at index :samp:`tuple(coord)` is the rank
       whose :meth:`mpi4py.MPI.CartComm.Get_coords` result is :samp:`coord`.
       Is :samp:`None` when :attr:`have_valid_cart_comm` is :samp:`False`.
    """
    if not self.have_valid_cart_comm:
        return None

    comm = self.cart_comm
    rank_of_coord = _np.zeros(comm.dims, dtype="int64")
    for rank in range(comm.size):
        rank_of_coord[tuple(comm.Get_coords(rank))] = rank
    return rank_of_coord
@property
def dims(self):
    """
    Partition counts, one per array axis, describing the cartesian
    topology over which an array is distributed. This is the :samp:`dims`
    value taken from the comms-info object at construction.
    """
    partition_counts = self._dims
    return partition_counts
@property
def ndim(self):
    """
    The dimension (an :obj:`int`) of the cartesian topology, read from
    the :samp:`size` attribute of the stored partition counts.
    """
    partition_counts = self._dims
    return partition_counts.size
@property
def have_valid_cart_comm(self):
    """
    Whether this peer_rank holds a usable cartesian communicator,
    i.e. :samp:`{self}.cart_comm` is neither :samp:`None`
    nor :obj:`mpi4py.MPI.COMM_NULL`.
    """
    comm = self.cart_comm
    if comm is None:
        return False
    return comm != _mpi.COMM_NULL
@property
def cart_comm(self):
    """
    The :obj:`mpi4py.MPI.CartComm` communicator which defines a cartesian
    topology of MPI processes (typically one process per locale) and is used
    for inter-locale exchange of array data.
    """
    comm = self._cart_comm
    return comm
@property
def inter_locale_comm(self):
    """
    The inter-locale communicator; this override
    of :attr:`LocaleComms.inter_locale_comm` returns :attr:`cart_comm`.
    """
    comm = self.cart_comm
    return comm
def shape_squeeze(shape):
    """
    Returns the elements of :samp:`{shape}` which are not equal to :samp:`1`.

    :type shape: sequence of :obj:`int`
    :param shape: An array shape; any sequence (:obj:`list`, :obj:`tuple`,
       :obj:`numpy.ndarray`) is accepted.
    :rtype: :obj:`numpy.ndarray`
    :return: The non-:samp:`1` elements of :samp:`{shape}`, in their original order.
    """
    # Convert before comparing: for a list or tuple, "shape != 1" is a single
    # Python bool rather than an element-wise mask.
    shape = _np.asarray(shape)
    return shape[shape != 1]
def reshape_comms_distribution(comms_distrib, new_globale_shape):
    """
    Attempts to build the :obj:`CommsAndDistribution` for a reshaped globale
    array by re-using the existing communicators and distribution.

    Re-use is only attempted when the new shape equals the current shape, or
    when both shapes have the same non-unit axis lengths (compared after
    sorting), in which case the communicators and distribution
    are passed through :samp:`expand_dims` or :samp:`squeeze`.

    :type comms_distrib: :obj:`CommsAndDistribution`
    :param comms_distrib: The existing comms and distribution.
    :type new_globale_shape: sequence of :obj:`int`
    :param new_globale_shape: The requested globale shape.
    :rtype: :obj:`CommsAndDistribution`
    :return: The comms and distribution for :samp:`{new_globale_shape}`,
       or :samp:`None` if the non-unit axis lengths of the two shapes differ.
    """
    locale_comms = comms_distrib.locale_comms
    distrib = comms_distrib.distribution
    this_locale = comms_distrib.this_locale
    globale_shape = _np.array(distrib.globale_extent.shape)
    new_globale_shape = _np.array(new_globale_shape)

    if (
        (new_globale_shape.size == globale_shape.size)
        and
        _np.all(new_globale_shape == globale_shape)
    ):
        # Shape is unchanged: same comms, independent copy of the distribution.
        return CommsAndDistribution(locale_comms, _copy.deepcopy(distrib), this_locale)

    # Axis lengths with the unit-length axes dropped.
    new_squeeze_shape = new_globale_shape[new_globale_shape != 1]
    org_squeeze_shape = globale_shape[globale_shape != 1]
    if len(new_squeeze_shape) != len(org_squeeze_shape):
        return None
    # The new shape must be compared with the original shape here. Comparing
    # the original with itself is always true and lets incompatible shapes
    # reach the index searches below, where they raise IndexError.
    # NOTE(review): an order-sensitive comparison of the two squeezed shapes
    # was previously evaluated and discarded; confirm whether permuted axis
    # lengths are really meant to be accepted.
    if not _np.all(_np.sort(new_squeeze_shape) == _np.sort(org_squeeze_shape)):
        return None

    if new_globale_shape.size >= globale_shape.size:
        # Position in the new shape of each original axis (first match by length).
        idx = [
            _np.nonzero(new_globale_shape == globale_shape[i])[0][0]
            for i in range(len(globale_shape))
        ]
        return CommsAndDistribution(
            locale_comms.expand_dims(
                ndim=new_globale_shape.size,
                orig_axis_new_pos=idx
            ),
            distrib.expand_dims(ndim=new_globale_shape.size, orig_axis_new_pos=idx),
            this_locale
        )

    # NOTE(review): the matches after the first are indices relative to the
    # slice globale_shape[idx[-1]:], and the setdiff1d arguments look
    # reversed for selecting the axes to remove. Left unchanged; TODO confirm.
    idx = [_np.nonzero(new_globale_shape[0] == globale_shape)[0][0], ]
    for i in range(1, new_globale_shape.size):
        idx.append(
            _np.nonzero(new_globale_shape[i] == globale_shape[idx[-1]:])[0][0]
        )
    idx = _np.setdiff1d(idx, _np.arange(0, new_globale_shape.size))
    return CommsAndDistribution(
        locale_comms.squeeze(axis=idx),
        distrib.squeeze(axis=idx),
        this_locale
    )
#: Hyper-block partition distribution type
DT_BLOCK = "block"
#: Hyper-slab partition distribution type
DT_SLAB = "slab"
#: Entire array repeated on each locale.
DT_CLONED = "cloned"
#: Entire array on single locale, no array elements on other locales.
DT_SINGLE_LOCALE = "single_locale"
#: List of valid :samp:`distrib_type` values.
_valid_distrib_types = [DT_BLOCK, DT_SLAB, DT_CLONED, DT_SINGLE_LOCALE]
#: Node (NUMA) locale type
LT_NODE = "node"
#: Single process locale type
LT_PROCESS = "process"
#: List of valid :samp:`locale_type` values.
_valid_locale_types = [LT_NODE, LT_PROCESS]
CommsAndDistribution = \
    _collections.namedtuple("CommsAndDistribution", ["locale_comms", "distribution", "this_locale"])

# Docstrings of namedtuple field properties are only writable from
# Python 3.5. Compare the version as a tuple so that any later major
# version (e.g. a hypothetical 4.0) also qualifies.
if _sys.version_info >= (3, 5):
    CommsAndDistribution.__doc__ = \
        """
        A 3 element tuple :samp:`(locale_comms, distribution, this_locale)`
        describing the apportionment of array elements over MPI processes.
        """
    CommsAndDistribution.locale_comms.__doc__ = \
        """
        A :obj:`LocaleComms` object containing communicators for exchanging
        data between locales.
        """
    CommsAndDistribution.distribution.__doc__ = \
        """
        A :obj:`mpi_array.distribution.Distribution` object describing the
        apportionment of array elements over locales.
        """
    CommsAndDistribution.this_locale.__doc__ = \
        """
        A :obj:`ThisLocaleInfo` with rank pair pertinent for this locale.
        """
def create_locale_comms(
    locale_type=None,
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None
):
    """
    Factory function which creates a :obj:`LocaleComms` object and gathers
    the rank information that goes with it.

    :type locale_type: :obj:`str`
    :param locale_type: One of :attr:`mpi_array.comms.LT_PROCESS`
       or :attr:`mpi_array.comms.LT_NODE`. If :samp:`None`,
       defaults to :attr:`mpi_array.comms.LT_NODE`.
    :type peer_comm: :obj:`mpi4py.MPI.Comm`
    :param peer_comm: See :obj:`LocaleComms`.
    :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param intra_locale_comm: See :obj:`LocaleComms`. For a process locale type
       this defaults to :obj:`mpi4py.MPI.COMM_SELF` when :samp:`None`.
    :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param inter_locale_comm: See :obj:`LocaleComms`.
    :rtype: :obj:`tuple`
    :return: The 3 element
       tuple :samp:`(locale_comms, inter_locale_rank_to_peer_rank, this_locale)`.
    :raises ValueError: If a process locale type is requested
       but :samp:`{intra_locale_comm}.size > 1`.
    """
    if locale_type is None:
        locale_type = LT_NODE

    if locale_type.lower() == LT_PROCESS:
        if intra_locale_comm is None:
            intra_locale_comm = _mpi.COMM_SELF
        elif intra_locale_comm.size > 1:
            raise ValueError(
                "Got locale_type=%s, but intra_locale_comm.size=%s"
                %
                (locale_type, intra_locale_comm.size)
            )

    comms = LocaleComms(
        peer_comm=peer_comm,
        intra_locale_comm=intra_locale_comm,
        inter_locale_comm=inter_locale_comm
    )
    rank_map = comms.inter_locale_rank_to_peer_rank_map
    locale_info = comms.this_locale_rank_info

    # Broadcast from rank 0 of intra_locale_comm so that every peer_comm
    # rank ends up with the peer_rank mapping.
    rank_map, locale_info = comms.intra_locale_comm.bcast((rank_map, locale_info), 0)
    comms.rank_logger.debug("inter_locale_rank_to_peer_rank=%s", rank_map)

    return comms, rank_map, locale_info
def create_cloned_distribution(
    shape,
    locale_type=None,
    halo=0,
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None
):
    """
    Factory function which creates
    a :obj:`mpi_array.distribution.ClonedDistribution` distribution together
    with its associated :obj:`LocaleComms`.

    :type shape: sequence of :obj:`int`
    :param shape: Passed to :obj:`ClonedDistribution` as the globale extent.
    :type locale_type: :obj:`str`
    :param locale_type: See :func:`create_locale_comms`.
    :param halo: Passed through to :obj:`ClonedDistribution`.
    :type peer_comm: :obj:`mpi4py.MPI.Comm`
    :param peer_comm: See :obj:`LocaleComms`.
    :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param intra_locale_comm: See :obj:`LocaleComms`.
    :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param inter_locale_comm: See :obj:`LocaleComms`.
    :rtype: :obj:`CommsAndDistribution`
    :return: A :obj:`CommsAndDistribution` :obj:`collections.namedtuple`.
    """
    comms, rank_map, locale_info = create_locale_comms(
        locale_type=locale_type,
        peer_comm=peer_comm,
        intra_locale_comm=intra_locale_comm,
        inter_locale_comm=inter_locale_comm
    )
    distrib = ClonedDistribution(
        globale_extent=shape,
        inter_locale_rank_to_peer_rank=rank_map,
        num_locales=comms.num_locales,
        halo=halo
    )
    # The distribution also carries the per-locale peer ranks from the comms.
    distrib.peer_ranks_per_locale = comms.peer_ranks_per_locale

    return CommsAndDistribution(comms, distrib, locale_info)
def create_single_locale_distribution(
    shape,
    locale_type=None,
    halo=0,
    inter_locale_rank=0,
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None
):
    """
    Factory function which creates
    a :obj:`mpi_array.distribution.SingleLocaleDistribution` distribution
    together with its associated :obj:`LocaleComms`.

    :type shape: sequence of :obj:`int`
    :param shape: Passed to :obj:`SingleLocaleDistribution` as the globale extent.
    :type locale_type: :obj:`str`
    :param locale_type: See :func:`create_locale_comms`.
    :param halo: Passed through to :obj:`SingleLocaleDistribution`.
    :type inter_locale_rank: :obj:`int`
    :param inter_locale_rank: Passed through to :obj:`SingleLocaleDistribution`.
    :type peer_comm: :obj:`mpi4py.MPI.Comm`
    :param peer_comm: See :obj:`LocaleComms`.
    :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param intra_locale_comm: See :obj:`LocaleComms`.
    :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param inter_locale_comm: See :obj:`LocaleComms`.
    :rtype: :obj:`CommsAndDistribution`
    :return: A :obj:`CommsAndDistribution` :obj:`collections.namedtuple`.
    """
    comms, rank_map, locale_info = create_locale_comms(
        locale_type=locale_type,
        peer_comm=peer_comm,
        intra_locale_comm=intra_locale_comm,
        inter_locale_comm=inter_locale_comm
    )
    distrib = SingleLocaleDistribution(
        globale_extent=shape,
        num_locales=comms.num_locales,
        inter_locale_rank=inter_locale_rank,
        inter_locale_rank_to_peer_rank=rank_map,
        halo=halo
    )
    # The distribution also carries the per-locale peer ranks from the comms.
    distrib.peer_ranks_per_locale = comms.peer_ranks_per_locale

    return CommsAndDistribution(comms, distrib, locale_info)
def create_block_distribution(
    shape,
    locale_type=None,
    dims=None,
    halo=0,
    peer_comm=None,
    intra_locale_comm=None,
    inter_locale_comm=None,
    cart_comm=None
):
    """
    Factory function which creates
    a :obj:`mpi_array.distribution.BlockPartition` distribution together with
    its associated :obj:`CartLocaleComms`.

    :type shape: sequence of :obj:`int`
    :param shape: Shape of the globale array.
    :type locale_type: :obj:`str`
    :param locale_type: One of :attr:`mpi_array.comms.LT_PROCESS`
       or :attr:`mpi_array.comms.LT_NODE`. Defines locales.
    :type dims: sequence of :obj:`int`
    :param dims: Defines the partitioning of the globale array axes.
       If :samp:`None`, an all-zero sequence with one element per
       element of :samp:`{shape}` is used.
    :param halo: Passed through to :obj:`BlockPartition`.
    :type peer_comm: :obj:`mpi4py.MPI.Comm`
    :param peer_comm: See :obj:`LocaleComms`.
    :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param intra_locale_comm: See :obj:`LocaleComms`.
    :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param inter_locale_comm: See :obj:`LocaleComms`.
    :type cart_comm: :obj:`mpi4py.MPI.Comm`
    :param cart_comm: See :obj:`CartLocaleComms`.
    :rtype: :obj:`CommsAndDistribution`
    :return: A :obj:`CommsAndDistribution` :obj:`collections.namedtuple`.
    """
    if dims is None:
        dims = _np.zeros_like(shape, dtype="int64")

    # Only the communicators of this plain LocaleComms are used below; the
    # rank map and locale info are recomputed from the cartesian comms.
    base_comms, _unused_rank_map, _unused_locale_info = create_locale_comms(
        locale_type=locale_type,
        peer_comm=peer_comm,
        intra_locale_comm=intra_locale_comm,
        inter_locale_comm=inter_locale_comm
    )
    cart_comms = CartLocaleComms(
        dims=dims,
        peer_comm=base_comms.peer_comm,
        intra_locale_comm=base_comms.intra_locale_comm,
        inter_locale_comm=base_comms.inter_locale_comm,
        cart_comm=cart_comm
    )

    coord_to_rank = cart_comms.cart_coord_to_cart_rank_map
    rank_to_peer_rank = cart_comms.inter_locale_rank_to_peer_rank_map
    locale_info = cart_comms.this_locale_rank_info

    # Broadcast from rank 0 of intra_locale_comm so that every peer_comm
    # rank ends up with the peer_rank mapping.
    coord_to_rank, rank_to_peer_rank, locale_info = cart_comms.intra_locale_comm.bcast(
        (coord_to_rank, rank_to_peer_rank, locale_info),
        0
    )
    cart_comms.rank_logger.debug("cart_rank_to_peer_rank=%s", rank_to_peer_rank)

    distrib = BlockPartition(
        globale_extent=shape,
        dims=cart_comms.dims,
        cart_coord_to_cart_rank=coord_to_rank,
        inter_locale_rank_to_peer_rank=rank_to_peer_rank,
        halo=halo
    )
    distrib.peer_ranks_per_locale = cart_comms.peer_ranks_per_locale

    return CommsAndDistribution(cart_comms, distrib, locale_info)
def check_distrib_type(distrib_type):
    """
    Validates that the lower-cased :samp:`{distrib_type}` is one of
    the entries of :samp:`_valid_distrib_types`.

    :type distrib_type: :obj:`str`
    :param distrib_type: String to check.
    :raises ValueError: If :samp:`{distrib_type}` is not a valid *distribution type* specifier.
    """
    if distrib_type.lower() in _valid_distrib_types:
        return

    valid_names = ", ".join(_valid_distrib_types)
    raise ValueError(
        "Invalid distrib_type=%s, valid types are: %s." % (distrib_type, valid_names)
    )
def check_locale_type(locale_type):
    """
    Validates that the lower-cased :samp:`{locale_type}` is one of
    the entries of :samp:`_valid_locale_types`.

    :type locale_type: :obj:`str`
    :param locale_type: String to check.
    :raises ValueError: If :samp:`{locale_type}` is not a valid *locale type* specifier.
    """
    if locale_type.lower() in _valid_locale_types:
        return

    valid_names = ", ".join(_valid_locale_types)
    raise ValueError(
        "Invalid locale_type=%s, valid types are: %s." % (locale_type, valid_names)
    )
def create_distribution(shape, distrib_type=None, locale_type=LT_NODE, **kwargs):
    """
    Factory function for creating :obj:`mpi_array.distribution.Distribution`
    and associated :obj:`LocaleComms`.

    :type shape: sequence of :obj:`int`
    :param shape: Shape of the globale array.
    :type distrib_type: :obj:`str`
    :param distrib_type: One
       of :attr:`mpi_array.comms.DT_BLOCK` or :attr:`mpi_array.comms.DT_SLAB`
       or :attr:`mpi_array.comms.DT_CLONED` or :attr:`mpi_array.comms.DT_SINGLE_LOCALE`
       (case-insensitive).
       Defines how the globale array is distributed over locales.
       If :samp:`None` defaults to :attr:`mpi_array.comms.DT_BLOCK`
       if :samp:`numpy.prod(shape) > 0` otherwise :attr:`mpi_array.comms.DT_CLONED`.
    :type locale_type: :obj:`str`
    :param locale_type: One of :attr:`mpi_array.comms.LT_PROCESS`
       or :attr:`mpi_array.comms.LT_NODE`. Defines locales.
       If :samp:`None` defaults to :attr:`mpi_array.comms.LT_NODE`.
    :type dims: sequence of :obj:`int`
    :param dims: Only relevant when :samp:`{distrib_type} == DT_BLOCK`.
       Defines the partitioning of the globale array axes.
    :type axis: :obj:`int`
    :param axis: Only relevant when :samp:`{distrib_type} == DT_SLAB`.
       Indicates the single axis of the globale array partitioned into slabs,
       defaults to :samp:`0`.
    :type peer_comm: :obj:`mpi4py.MPI.Comm`
    :param peer_comm: See :obj:`LocaleComms`.
    :type intra_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param intra_locale_comm: See :obj:`LocaleComms`.
    :type inter_locale_comm: :obj:`mpi4py.MPI.Comm`
    :param inter_locale_comm: See :obj:`LocaleComms`.
    :type cart_comm: :obj:`mpi4py.MPI.Comm`
    :param cart_comm: Only relevant when :samp:`{distrib_type} == DT_BLOCK`
       or :samp:`{distrib_type} == DT_SLAB`. See :obj:`CartLocaleComms`.
    :rtype: :obj:`CommsAndDistribution`
    :return: A :obj:`CommsAndDistribution` :obj:`collections.namedtuple`.
    :raises ValueError: If :samp:`{distrib_type}` or :samp:`{locale_type}` is not
       a valid specifier.

    See also:

       :func:`create_block_distribution`
       :func:`create_cloned_distribution`
       :func:`create_single_locale_distribution`
    """
    if distrib_type is None:
        # numpy.product was removed in NumPy 2.0; numpy.prod is the
        # long-standing equivalent.
        distrib_type = DT_BLOCK if _np.prod(shape) > 0 else DT_CLONED
    if locale_type is None:
        # The other factory functions accept None for locale_type; do the same here.
        locale_type = LT_NODE

    check_distrib_type(distrib_type)
    check_locale_type(locale_type)

    distrib_type = distrib_type.lower()
    if distrib_type == DT_BLOCK:
        return create_block_distribution(shape, locale_type, **kwargs)

    if distrib_type == DT_SLAB:
        # A slab is a block partition with one partition on every axis except
        # the slab axis, whose zero entry is left for the cartesian comms to fill.
        axis = kwargs.pop("axis", 0)
        dims = _np.ones_like(shape, dtype="int64")
        dims[axis] = 0
        return create_block_distribution(shape, locale_type, dims=dims, **kwargs)

    if distrib_type == DT_CLONED:
        return create_cloned_distribution(shape, locale_type, **kwargs)

    # check_distrib_type has validated the value, only DT_SINGLE_LOCALE remains.
    return create_single_locale_distribution(shape, locale_type, **kwargs)
# Export every module-level name which does not begin with an underscore.
__all__ = [name for name in dir() if not name.startswith("_")]