# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import decimal
import re
from wolframclient.exception import WolframParserException
from wolframclient.language.expression import WLFunction, WLSymbol
from wolframclient.serializers.wxfencoder import constants
from wolframclient.utils.api import numpy
__all__ = ['WXFConsumer', 'WXFConsumerNumpy']
class WXFConsumer(object):
    """Map WXF types to Python object generating functions.

    This class exposes a comprehensive list of methods consuming WXF types.
    Subclasses can override these members to implement custom parsing logic.

    Example implementing a consumer that maps any function with head
    :wl:`DirectedInfinity` to float('inf')::

        class ExampleConsumer(WXFConsumer):

            Infinity = wl.DirectedInfinity

            def build_function(self, head, arg_list, **kwargs):
                if head == self.Infinity:
                    return float('inf')
                else:
                    return super().build_function(head, arg_list, **kwargs)

    Test the new consumer::

        >>> wxf = export({'-inf': wl.DirectedInfinity(-1), '+inf': wl.DirectedInfinity(1)}, target_format='wxf')
        >>> binary_deserialize(wxf, consumer=ExampleConsumer())
        {'-inf': inf, '+inf': inf}

    Compare with default result::

        >>> binary_deserialize(wxf)
        {'-inf': DirectedInfinity[-1], '+inf': DirectedInfinity[1]}

    Once initialized, the entry point of a consumer is the method
    :func:`~wolframclient.deserializers.wxf.wxfconsumer.WXFConsumer.next_expression`.
    It takes a token generator and returns a Python object. This method is particularly
    useful when building nested expressions, e.g:
    :func:`~wolframclient.deserializers.wxf.wxfconsumer.WXFConsumer.build_function`,
    :func:`~wolframclient.deserializers.wxf.wxfconsumer.WXFConsumer.consume_association`, etc,
    in order to fetch sub-expressions.
    """

    # Dispatch table: WXF token type -> *name* of the consuming method.
    # Names (rather than functions) are stored so that the lookup goes through
    # ``getattr(self, ...)`` and therefore honours overrides made in subclasses.
    _mapping = {
        constants.WXF_CONSTANTS.Function: 'consume_function',
        constants.WXF_CONSTANTS.Symbol: 'consume_symbol',
        constants.WXF_CONSTANTS.String: 'consume_string',
        constants.WXF_CONSTANTS.BinaryString: 'consume_binary_string',
        constants.WXF_CONSTANTS.Integer8: 'consume_integer8',
        constants.WXF_CONSTANTS.Integer16: 'consume_integer16',
        constants.WXF_CONSTANTS.Integer32: 'consume_integer32',
        constants.WXF_CONSTANTS.Integer64: 'consume_integer64',
        constants.WXF_CONSTANTS.Real64: 'consume_real64',
        constants.WXF_CONSTANTS.BigInteger: 'consume_bigint',
        constants.WXF_CONSTANTS.BigReal: 'consume_bigreal',
        constants.WXF_CONSTANTS.PackedArray: 'consume_packed_array',
        constants.WXF_CONSTANTS.NumericArray: 'consume_numeric_array',
        constants.WXF_CONSTANTS.Association: 'consume_association',
        constants.WXF_CONSTANTS.Rule: 'consume_rule',
        constants.WXF_CONSTANTS.RuleDelayed: 'consume_rule_delayed',
    }

    def next_expression(self, tokens, **kwargs):
        """Deserialize the next expression starting at the next token yielded by `tokens`.

        :param tokens: iterator of WXF tokens; one token is pulled with :func:`next`
            and the consumer selected from its ``wxf_type`` may pull more.
        :param kwargs: options forwarded untouched to the selected consumer method.
        :return: the Python object built by the selected consumer method.
        :raises WolframParserException: if no consumer is registered for the token type.
        """
        token = next(tokens)
        consumer = self._consumer_from_type(token.wxf_type)
        return consumer(token, tokens, **kwargs)

    def _consumer_from_type(self, wxf_type):
        """Return the bound method in charge of tokens of type `wxf_type`.

        :raises WolframParserException: if `wxf_type` is missing from the dispatch table.
        """
        try:
            func = self._mapping[wxf_type]
        except KeyError:
            raise WolframParserException(
                'Class %s does not implement any consumer method for WXF token %s'
                % (self.__class__.__name__, wxf_type))
        return getattr(self, func)

    # Head compared against in consume_function to turn `List[...]` into a Python list.
    _LIST = WLSymbol('List')

    def consume_function(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *function*.

        Return a :class:`list` if the head is symbol `List`, otherwise returns the result of
        :func:`~wolframclient.deserializers.wxf.wxfconsumer.WXFConsumer.build_function`
        applied to the head and arguments.

        Usually custom parsing rules target Functions, but not List. To do so, it is recommended to override
        :func:`~wolframclient.deserializers.wxf.wxfconsumer.WXFConsumer.build_function`.
        """
        # The head comes first in the token stream, followed by `length` arguments.
        head = self.next_expression(tokens, **kwargs)
        args = [
            self.next_expression(tokens, **kwargs)
            for _ in range(current_token.length)
        ]
        if head == self._LIST:
            return args
        return self.build_function(head, args, **kwargs)

    def build_function(self, head, arg_list, **kwargs):
        """Create a Python object from head and args.

        This function can be conveniently overloaded to create specific Python objects
        from various heads. e.g: DateObject, Complex, etc.

        :param head: the already deserialized head of the function.
        :param arg_list: list of the already deserialized arguments.
        :return: a :class:`~wolframclient.language.expression.WLFunction`.
        """
        return WLFunction(head, *arg_list)

    def consume_association(self,
                            current_token,
                            tokens,
                            dict_class=dict,
                            **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *association*.

        By default, return a :class:`dict` made from the rules.
        The named option `dict_class` can be set to any type in which case an instance of
        :class:`dict_class` is returned. The option is forwarded to sub-expressions so that
        associations nested inside this one are built with the same class.
        """
        # Forward only a non-default class: the default call path then passes
        # exactly the same keyword arguments to sub-consumers as it always did.
        if dict_class is not dict:
            kwargs['dict_class'] = dict_class
        # Collect the rules in a list rather than a generator expression: a
        # StopIteration raised by an exhausted token stream inside a generator
        # is either swallowed (silently truncated result, Python < 3.7) or
        # converted to RuntimeError (PEP 479). In a list comprehension it
        # propagates unchanged.
        rules = [
            self.next_expression(tokens, **kwargs)
            for _ in range(current_token.length)
        ]
        return dict_class(rules)

    def consume_rule(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *rule* as a tuple"""
        # Evaluation order matters: the left-hand side precedes the right-hand side.
        lhs = self.next_expression(tokens, **kwargs)
        rhs = self.next_expression(tokens, **kwargs)
        return (lhs, rhs)

    def consume_rule_delayed(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *rule delayed* as a tuple"""
        lhs = self.next_expression(tokens, **kwargs)
        rhs = self.next_expression(tokens, **kwargs)
        return (lhs, rhs)

    #: Symbols converted to native Python values instead of
    #: :class:`~wolframclient.language.expression.WLSymbol` instances.
    #: See documentation of :func:`~wolframclient.serializers.encoders.builtin.encode_none`
    #: for more information about the mapping of None and Null.
    BUILTIN_SYMBOL = {
        'True': True,
        'False': False,
        'Null': None,
        'Indeterminate': float('NaN'),
    }

    def consume_symbol(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *symbol*.

        Symbols listed in :attr:`BUILTIN_SYMBOL` are returned as their Python counterpart,
        any other symbol as a :class:`~wolframclient.language.expression.WLSymbol`.
        """
        # ``dict.get`` is not usable here: None is a legitimate mapped value ('Null').
        try:
            return self.BUILTIN_SYMBOL[current_token.data]
        except KeyError:
            return WLSymbol(current_token.data)

    def consume_bigint(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *big integer* as a :class:`int`.

        :raises WolframParserException: if the token data is not a valid integer literal.
        """
        try:
            return int(current_token.data)
        except ValueError:
            raise WolframParserException(
                'Invalid big integer value: %s' % current_token.data)

    # Groups: (1) mantissa, (2) optional precision mark "`digits", (3) optional
    # exponent "*^digits". The exponent accepts a sign, otherwise a value such as
    # "1.5`20.*^-10" would match up to the precision mark only and silently lose
    # its exponent. '*' is excluded from the mantissa so that an exponent is
    # still recognised when no precision mark is present.
    BIGREAL_RE = re.compile(r'([^`*]+)(`[0-9.]+){0,1}(\*\^[+-]?[0-9]+){0,1}')

    def consume_bigreal(self, current_token, tokens, **kwargs):
        """Parse a WXF big real as a WXF serializable big real.

        There is no such thing as a big real, in Wolfram Language notation, in Python. This
        wrapper ensures round tripping of big reals without the need of `ToExpression`.
        Introducing `ToExpression` would imply to marshall the big real data to avoid malicious
        code from being introduced in place of an actual real.

        :return: a :class:`decimal.Decimal` built from the mantissa and exponent;
            the precision mark is parsed but not used.
        :raises WolframParserException: if the token data is not a valid big real.
        """
        match = self.BIGREAL_RE.match(current_token.data)
        if match:
            num, _, exp = match.groups()
            # exp looks like "*^-10": drop the two-character "*^" prefix.
            literal = '%se%s' % (num, exp[2:]) if exp else num
            try:
                return decimal.Decimal(literal)
            except decimal.InvalidOperation:
                # Fall through to the parser exception below, as for big integers.
                pass
        raise WolframParserException(
            'Invalid big real value: %s' % current_token.data)

    def consume_string(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *string* as a string of unicode utf8 encoded."""
        return current_token.data

    def consume_binary_string(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *binary string* as a string of bytes."""
        return current_token.data

    def consume_integer8(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *integer* as a :class:`int`."""
        return current_token.data

    def consume_integer16(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *integer* as a :class:`int`."""
        return current_token.data

    def consume_integer32(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *integer* as a :class:`int`."""
        return current_token.data

    def consume_integer64(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *integer* as a :class:`int`."""
        return current_token.data

    def consume_real64(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *real* as a :class:`float`."""
        return current_token.data

    def consume_numeric_array(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *numeric array*.

        This method returns a :class:`list`, and makes the assumption that the system is little endian.
        """
        return self._array_to_list(current_token, tokens)

    def consume_packed_array(self, current_token, tokens, **kwargs):
        """Consume a :class:`~wolframclient.deserializers.wxf.wxfparser.WXFToken` of type *packed array*.

        This method returns a :class:`list`, and makes the assumption that the system is little endian.
        """
        return self._array_to_list(current_token, tokens)

    # Array types whose elements are stored as two consecutive reals (re, im).
    _COMPLEX_ARRAY_TYPES = (
        constants.ARRAY_TYPES.ComplexReal32,
        constants.ARRAY_TYPES.ComplexReal64,
    )

    # Two implementations of `_array_to_list` are provided; which one is defined
    # is decided once, when the class body is executed.
    # memoryview.cast was introduced in Python 3.3.
    if hasattr(memoryview, 'cast'):
        # Array type -> memoryview/struct format character (native byte order).
        # Complex types use the format of their real component.
        unpack_mapping = {
            constants.ARRAY_TYPES.Integer8: 'b',
            constants.ARRAY_TYPES.UnsignedInteger8: 'B',
            constants.ARRAY_TYPES.Integer16: 'h',
            constants.ARRAY_TYPES.UnsignedInteger16: 'H',
            constants.ARRAY_TYPES.Integer32: 'i',
            constants.ARRAY_TYPES.UnsignedInteger32: 'I',
            constants.ARRAY_TYPES.Integer64: 'q',
            constants.ARRAY_TYPES.UnsignedInteger64: 'Q',
            constants.ARRAY_TYPES.Real32: 'f',
            constants.ARRAY_TYPES.Real64: 'd',
            constants.ARRAY_TYPES.ComplexReal32: 'f',
            constants.ARRAY_TYPES.ComplexReal64: 'd',
        }

        def _to_complex(self, array, max_depth, curr_depth):
            """Replace, in place, every innermost ``[re, im]`` pair of `array` with a complex.

            :param array: nested list whose innermost level holds pairs of reals.
            :param max_depth: number of dimensions of the complex array (pairs excluded).
            :param curr_depth: depth of `array` in the recursion, 0 at the top level.
            """
            # Recursively traverse the array until the last (real) dimension is reached;
            # it corresponds to a list of (fake) arrays of two elements (real and im parts).
            if curr_depth < max_depth - 1:
                for sub in array:
                    self._to_complex(sub, max_depth, curr_depth + 1)
                return
            # Iterate over the pairs.
            for index, complex_pair in enumerate(array):
                array[index] = complex(*complex_pair)

        def _array_to_list(self, current_token, tokens):
            """Return the data of an array token as nested lists, using ``memoryview.cast``."""
            view = memoryview(current_token.data)
            fmt = self.unpack_mapping[current_token.array_type]
            if current_token.array_type not in self._COMPLEX_ARRAY_TYPES:
                return view.cast(fmt, shape=current_token.dimensions).tolist()
            # In the given array, 2 reals give one complex,
            # adding one last dimension to represent it.
            dimensions = list(current_token.dimensions)
            dimensions.append(2)
            as_list = view.cast(fmt, shape=dimensions).tolist()
            self._to_complex(as_list, len(current_token.dimensions), 0)
            return as_list
    else:
        # Array type -> struct object used to unpack one value at a time.
        # Complex types use the struct of their real component.
        unpack_mapping = {
            constants.ARRAY_TYPES.Integer8: constants.StructInt8LE,
            constants.ARRAY_TYPES.UnsignedInteger8: constants.StructUInt8LE,
            constants.ARRAY_TYPES.Integer16: constants.StructInt16LE,
            constants.ARRAY_TYPES.UnsignedInteger16: constants.StructUInt16LE,
            constants.ARRAY_TYPES.Integer32: constants.StructInt32LE,
            constants.ARRAY_TYPES.UnsignedInteger32: constants.StructUInt32LE,
            constants.ARRAY_TYPES.Integer64: constants.StructInt64LE,
            constants.ARRAY_TYPES.UnsignedInteger64: constants.StructUInt64LE,
            constants.ARRAY_TYPES.Real32: constants.StructFloat,
            constants.ARRAY_TYPES.Real64: constants.StructDouble,
            constants.ARRAY_TYPES.ComplexReal32: constants.StructFloat,
            constants.ARRAY_TYPES.ComplexReal64: constants.StructDouble,
        }

        def _array_to_list(self, current_token, tokens):
            """Return the data of an array token as nested lists, unpacking value by value."""
            value, _ = self._build_array_from_bytes(
                current_token.data, 0, current_token.array_type,
                current_token.dimensions, 0)
            return value

        def _build_array_from_bytes(self, data, offset, array_type, dimensions,
                                    current_dim):
            """Recursively build the (nested) list for dimension `current_dim`.

            :param data: raw bytes of the whole array.
            :param offset: position in `data` of the next value to read.
            :param array_type: key of :attr:`unpack_mapping`.
            :param dimensions: sizes of every dimension of the array.
            :param current_dim: index in `dimensions` of the level being built.
            :return: a tuple ``(list, offset)`` where `offset` points right after
                the last byte consumed.
            """
            new_array = list()
            if current_dim < len(dimensions) - 1:
                # Not yet at the innermost dimension: build each sub-array in turn,
                # threading the read offset through the recursive calls.
                for _ in range(dimensions[current_dim]):
                    new_elem, offset = self._build_array_from_bytes(
                        data, offset, array_type, dimensions, current_dim + 1)
                    new_array.append(new_elem)
                return new_array, offset

            unpacker = self.unpack_mapping[array_type]
            step = unpacker.size
            if array_type in self._COMPLEX_ARRAY_TYPES:
                # Complex values, need two reals for each.
                for _ in range(dimensions[-1]):
                    # unpack_from returns a tuple, here of one element.
                    real_part = unpacker.unpack_from(data, offset=offset)
                    offset = offset + step
                    imag_part = unpacker.unpack_from(data, offset=offset)
                    offset = offset + step
                    new_array.append(complex(real_part[0], imag_part[0]))
            else:
                for _ in range(dimensions[-1]):
                    # unpack_from returns a tuple, here of one element.
                    value = unpacker.unpack_from(data, offset=offset)
                    offset = offset + step
                    new_array.append(value[0])
            return new_array, offset
class WXFConsumerNumpy(WXFConsumer):
    """A WXF consumer that maps WXF array types to NumPy arrays."""

    #: NumPy dtype name associated to each WXF array element type.
    WXF_TYPE_TO_DTYPE = {
        constants.ARRAY_TYPES.Integer8: 'int8',
        constants.ARRAY_TYPES.Integer16: 'int16',
        constants.ARRAY_TYPES.Integer32: 'int32',
        constants.ARRAY_TYPES.Integer64: 'int64',
        constants.ARRAY_TYPES.UnsignedInteger8: 'uint8',
        constants.ARRAY_TYPES.UnsignedInteger16: 'uint16',
        constants.ARRAY_TYPES.UnsignedInteger32: 'uint32',
        constants.ARRAY_TYPES.UnsignedInteger64: 'uint64',
        constants.ARRAY_TYPES.Real32: 'float32',
        constants.ARRAY_TYPES.Real64: 'float64',
        constants.ARRAY_TYPES.ComplexReal32: 'complex64',
        constants.ARRAY_TYPES.ComplexReal64: 'complex128',
    }

    def consume_array(self, current_token, tokens, **kwargs):
        """Build a NumPy array from an array token.

        The token data is interpreted with the dtype associated to
        ``current_token.array_type`` in :attr:`WXF_TYPE_TO_DTYPE`, then reshaped to
        ``current_token.dimensions``.

        :raises KeyError: if the array type has no entry in :attr:`WXF_TYPE_TO_DTYPE`.
        """
        # Pin the byte order to little-endian instead of relying on the host's
        # native order, matching how array payloads are treated elsewhere in
        # this module. On little-endian hosts this is the native dtype, so the
        # result is unchanged there.
        dtype = numpy.dtype(
            self.WXF_TYPE_TO_DTYPE[current_token.array_type]).newbyteorder('<')
        arr = numpy.frombuffer(current_token.data, dtype=dtype)
        return numpy.reshape(arr, tuple(current_token.dimensions))

    # Build a numpy array from a PackedArray.
    consume_packed_array = consume_array

    # Build a numpy array from a NumericArray.
    consume_numeric_array = consume_array