mpi4py.util.pkl5

New in version 3.1.0.

Pickle protocol 5 (see PEP 574) introduced support for out-of-band buffers, allowing for more efficient handling of certain object types with large memory footprints.
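For reference, the sketch below shows out-of-band buffer handling with the standard pickle module alone (requires Python 3.8 or the pickle5 backport; the NumPy array merely stands in for any buffer-exporting object):

import pickle

import numpy as np

obj = np.arange(2**20, dtype='i4')

# With protocol 5, large buffers are handed to buffer_callback
# instead of being copied into the pickle stream (out-of-band).
buffers = []
data = pickle.dumps(obj, protocol=5, buffer_callback=buffers.append)

# The consumer must supply the same buffers back to loads().
new = pickle.loads(data, buffers=buffers)
assert np.array_equal(obj, new)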

MPI for Python uses the traditional in-band handling of buffers. This approach is appropriate for communicating non-buffer Python objects, or buffer-like objects with small memory footprints. For point-to-point communication, in-band buffer handling allows for the communication of a pickled stream with a single MPI message, at the expense of additional CPU and memory overhead in the pickling and unpickling steps.
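For contrast, the usual in-band exchange below moves one pickled stream per MPI message (a sketch assuming at least two processes):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    comm.send({'answer': 42}, dest=1, tag=0)  # object pickled in-band
elif rank == 1:
    obj = comm.recv(source=0, tag=0)
    assert obj == {'answer': 42}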

The mpi4py.util.pkl5 module provides communicator wrapper classes that reimplement pickle-based point-to-point communication methods using pickle protocol 5. Handling out-of-band buffers necessarily involves multiple MPI messages, which increases latency and hurts performance for small messages. For large messages, however, the zero-copy savings of out-of-band buffer handling more than offset the extra latency costs. Additionally, these wrapper methods overcome the infamous 2 GiB message count limit (MPI-1 to MPI-3).

Note

Support for pickle protocol 5 is available in the pickle module within the Python standard library since Python 3.8. Previous Python 3 releases can use the pickle5 backport, which is available on PyPI and can be installed with:

python -m pip install pickle5

The pickle5 backport is not available for Python 2. Using the mpi4py.util.pkl5 module on a Python 2 runtime provides no benefits and may hurt communication performance.
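A minimal sketch of how an application might select a protocol-5-capable pickle implementation across Python 3 versions (pickle5 is the PyPI backport mentioned above):

import sys

if sys.version_info >= (3, 8):
    import pickle              # standard library pickle supports protocol 5
else:
    import pickle5 as pickle   # PyPI backport for earlier Python 3 releases

assert pickle.HIGHEST_PROTOCOL >= 5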

class mpi4py.util.pkl5.Request

Custom request class for nonblocking communications.

Note

Request is not a subclass of MPI.Request

Free() → None
cancel() → None
get_status(status: Optional[MPI.Status] = None) → bool
test(status: Optional[MPI.Status] = None) → Tuple[bool, Optional[Any]]
wait(status: Optional[MPI.Status] = None) → Any
classmethod testall(requests: Sequence[Request], statuses: Optional[List[MPI.Status]] = None) → Tuple[bool, Optional[List[Any]]]
classmethod waitall(requests: Sequence[Request], statuses: Optional[List[MPI.Status]] = None) → List[Any]
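For illustration, the sketch below completes several nonblocking sends at once with the waitall classmethod (tag value 7 is arbitrary):

from mpi4py import MPI
from mpi4py.util import pkl5

comm = pkl5.Intracomm(MPI.COMM_WORLD)
size = comm.Get_size()
rank = comm.Get_rank()

# Post a nonblocking send to every other rank ...
reqs = [comm.isend(rank, dst, tag=7)
        for dst in range(size) if dst != rank]
# ... receive one object from every other rank ...
objs = [comm.recv(None, src, tag=7)
        for src in range(size) if src != rank]
# ... then complete all pending sends collectively.
pkl5.Request.waitall(reqs)
assert sorted(objs) == [r for r in range(size) if r != rank]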
class mpi4py.util.pkl5.Message

Custom message class for matching probes.

Note

Message is not a subclass of MPI.Message

recv(status: Optional[MPI.Status] = None) → Any
irecv() → Request
classmethod probe(comm: Comm, source: int = MPI.ANY_SOURCE, tag: int = MPI.ANY_TAG, status: Optional[MPI.Status] = None) → Message
classmethod iprobe(comm: Comm, source: int = MPI.ANY_SOURCE, tag: int = MPI.ANY_TAG, status: Optional[MPI.Status] = None) → Optional[Message]
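A small usage sketch (tag value 5 is arbitrary): the probe classmethod matches an incoming message on the wrapped communicator, and the returned Message is then received on its own:

from mpi4py import MPI
from mpi4py.util import pkl5

comm = pkl5.Intracomm(MPI.COMM_WORLD)
size = comm.Get_size()
rank = comm.Get_rank()
dst = (rank + 1) % size
src = (rank - 1) % size

sreq = comm.isend([rank] * 8, dst, tag=5)
msg = pkl5.Message.probe(comm, src, tag=5)  # match the incoming message
obj = msg.recv()                            # receive from the matched message
sreq.Free()
assert obj == [src] * 8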
class mpi4py.util.pkl5.Comm(comm: MPI.Comm = MPI.COMM_NULL)

Base communicator wrapper class.

send(obj: Any, dest: int, tag: int = 0) → None
bsend(obj: Any, dest: int, tag: int = 0) → None
ssend(obj: Any, dest: int, tag: int = 0) → None
isend(obj: Any, dest: int, tag: int = 0) → Request
ibsend(obj: Any, dest: int, tag: int = 0) → Request
issend(obj: Any, dest: int, tag: int = 0) → Request
recv(buf: Optional[Buffer] = None, source: int = MPI.ANY_SOURCE, tag: int = MPI.ANY_TAG, status: Optional[MPI.Status] = None) → Any
irecv(buf: Optional[Buffer] = None, source: int = MPI.ANY_SOURCE, tag: int = MPI.ANY_TAG) → Request

Warning

This method cannot be supported reliably and raises RuntimeError.

sendrecv(sendobj: Any, dest: int, sendtag: int = 0, recvbuf: Optional[Buffer] = None, source: int = MPI.ANY_SOURCE, recvtag: int = MPI.ANY_TAG, status: Optional[MPI.Status] = None) → Any
mprobe(source: int = MPI.ANY_SOURCE, tag: int = MPI.ANY_TAG, status: Optional[MPI.Status] = None) → Message
improbe(source: int = MPI.ANY_SOURCE, tag: int = MPI.ANY_TAG, status: Optional[MPI.Status] = None) → Optional[Message]
bcast(obj: Any, root: int = 0) → Any
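A minimal sketch of the pickle-5-based broadcast (the payload is arbitrary):

from mpi4py import MPI
from mpi4py.util import pkl5

comm = pkl5.Intracomm(MPI.COMM_WORLD)
rank = comm.Get_rank()

obj = {'payload': list(range(10))} if rank == 0 else None
obj = comm.bcast(obj, root=0)
assert obj['payload'][0] == 0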
class mpi4py.util.pkl5.Intracomm(comm: MPI.Intracomm = MPI.COMM_NULL)

Intracommunicator wrapper class.

class mpi4py.util.pkl5.Intercomm(comm: MPI.Intercomm = MPI.COMM_NULL)

Intercommunicator wrapper class.

Examples

test-pkl5-1.py
import numpy as np
from mpi4py import MPI
from mpi4py.util import pkl5

comm = pkl5.Intracomm(MPI.COMM_WORLD)  # comm wrapper
size = comm.Get_size()
rank = comm.Get_rank()
dst = (rank + 1) % size
src = (rank - 1) % size

sobj = np.full(1024**3, rank, dtype='i4')  # > 4 GiB
sreq = comm.isend(sobj, dst, tag=42)
robj = comm.recv(None, src, tag=42)
sreq.Free()

assert np.min(robj) == src
assert np.max(robj) == src
test-pkl5-2.py
import numpy as np
from mpi4py import MPI
from mpi4py.util import pkl5

comm = pkl5.Intracomm(MPI.COMM_WORLD)  # comm wrapper
size = comm.Get_size()
rank = comm.Get_rank()
dst = (rank + 1) % size
src = (rank - 1) % size

sobj = np.full(1024**3, rank, dtype='i4')  # > 4 GiB
sreq = comm.isend(sobj, dst, tag=42)

status = MPI.Status()
rmsg = comm.mprobe(status=status)
assert status.Get_source() == src
assert status.Get_tag() == 42
rreq = rmsg.irecv()
robj = rreq.wait()

sreq.Free()
assert np.max(robj) == src
assert np.min(robj) == src
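Both scripts are meant to be launched through the MPI process launcher, e.g. (assuming a standard mpiexec and enough memory per process for the 4 GiB payload):

mpiexec -n 2 python test-pkl5-1.py
mpiexec -n 2 python test-pkl5-2.py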