-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathzmqnumpy.py
141 lines (129 loc) · 5.55 KB
/
zmqnumpy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (c) 2012 Marco Bartolini, marco.bartolini@gmail.com
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
"""
This module implements a series of functions used to exchange
numpy ndarrays between U{zeromq<http://www.zeromq.org>} sockets.
Serialization of numpy arrays happens using the numpy.ndarray.tostring method
which preserves portability to standard C binary format,
enabling data exchange with different programming languages.
A very simple protocol is defined in order to exchange array data, the
multipart messages will be composed of:
1. the sender identifier (a UUID in raw byte form when generated automatically)
2. identifier string name
3. the numpy array element type (dtype) in its string representation
4. numpy array shape encoded as a binary numpy.int32 array
5. the array data encoded as string using numpy.ndarray.tostring()
This protocol guarantees that numpy arrays can be carried around and
reconstructed uniquely without errors on both ends of a connected pair,
enabling an efficient interchange of data between processes and nodes.
@author: Marco Bartolini
@contact: marco.bartolini@gmail.com
@version: 0.1
"""
import numpy
import zmq
import functools
import uuid
def array_to_msg(nparray):
    """
    Convert a numpy ndarray to its multipart zeromq message representation.

    The returned list is composed of:
      0. The name of the array element type encoded as bytes, i.e. b'float32'
      1. The raw bytes of the array shape, stored as a numpy array with
         dtype int32
      2. The raw bytes of the array data, laid out in C (row-major) order

    Together these frames can be used by the receiver code to recreate
    the original array.

    @param nparray: A numpy ndarray
    @type nparray: numpy.ndarray
    @rtype: list
    @return: [dtype, shape, array]
    """
    _shape = numpy.array(nparray.shape, dtype=numpy.int32)
    # tobytes() is the supported spelling of the deprecated tostring() alias;
    # both produce the same C-ordered copy of the raw data.
    return [nparray.dtype.name.encode(),
            _shape.tobytes(),
            nparray.tobytes()]
def msg_to_array(msg):
    """
    Reverse the array_to_msg function: rebuild a numpy array from its
    three-frame serialized representation.

    @param msg: the array representation in a list as serialized by
        array_to_msg, i.e. [dtype name, shape bytes, array data bytes]
    @type msg: list
    @rtype: tuple
    @return: (dtype name as a string, shape as a numpy.int32 array,
        the numpy array reshaped to that shape)
    """
    _dtype_name = msg[0].decode()
    # Binary-mode numpy.fromstring is deprecated; numpy.frombuffer is its
    # replacement. frombuffer returns a read-only view on the frame, so copy
    # to keep returning writable arrays that own their data.
    _shape = numpy.frombuffer(msg[1], dtype=numpy.int32).copy()
    _array = numpy.frombuffer(msg[2], dtype=_dtype_name).copy()
    return (_dtype_name, _shape, _array.reshape(tuple(_shape)))
def sender_msg_to_array(msg):
    """
    Decode a complete multipart message whose first two frames are the
    sender UUID and the data stream name, followed by the three frames
    produced by L{array_to_msg}.

    @param msg: list of frames laid out as
        [uuid bytes, name, dtype, shape, array data]
    @type msg: list
    @rtype: tuple
    @return: (sender uuid.UUID, data name, dtype name, shape, numpy array)
    """
    # The array frames are decoded first, then the two header frames.
    dtype_name, shape, array = msg_to_array(msg[2:])
    sender = uuid.UUID(bytes=msg[0])
    data_name = msg[1].decode()
    return (sender, data_name, dtype_name, shape, array)
def numpy_array_sender(name, endpoint, sender_id="", socket_type=zmq.PUSH):
    """
    Decorator Factory

    The decorated function will have to return a numpy array, while the
    decorator will create a zmq socket of the specified socket type connected
    to the specified endpoint.
    Each time the function is called the numpy array will be sent over the
    instantiated transport after being converted to a multipart message using
    L{array_to_msg} function. The multipart message is prepended with a UUID
    and the given name as the first two elements.

    The socket is created and connected once, when the decorator factory is
    called. The decorated callable only sends the array: it returns None,
    not the array.

    #TODO: Would it be good to add the possibility of transimitting arbitrary
    metadata? --- Marco Bartolini 27/04/2012

    Usage example::

        import zmq
        import zmqnumpy
        import numpy

        @zmqnumpy.numpy_array_sender("mysender", "tcp://127.0.0.1:8765")
        def random_array_generator(min, max, width):
            return numpy.random.randint(min, max, width)

    @type name: string
    @param name: the label of the data stream
    @type endpoint: string
    @param endpoint: a zmq endpoint made as "protocol://host:port"
    @param sender_id: sender identifier, if not given a uuid will be generated
        automatically. May be a uuid.UUID, a textual UUID (anything accepted
        by the uuid.UUID constructor) or raw bytes, which are sent unchanged.
    @param socket_type: a zmq socket type such as zmq.PUSH or zmq.PUB
    @raise ValueError: if sender_id is a text string that is not a valid UUID
    """
    _context = zmq.Context.instance()
    _socket = _context.socket(socket_type)
    _socket.connect(endpoint)
    if not sender_id:
        _uuid = uuid.uuid4().bytes
    elif isinstance(sender_id, uuid.UUID):
        _uuid = sender_id.bytes
    elif isinstance(sender_id, str) and not isinstance(sender_id, bytes):
        # A text identifier cannot be sent as a frame as-is, and the receiver
        # decodes this frame with uuid.UUID(bytes=...), so convert it to the
        # 16 raw UUID bytes here.
        _uuid = uuid.UUID(sender_id).bytes
    else:
        # bytes-like identifiers are forwarded untouched, as before
        _uuid = sender_id

    def wrapper(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            _data = fn(*args, **kwargs)
            _socket.send_multipart([_uuid, name.encode()] + array_to_msg(_data))
        return wrapped
    return wrapper