Source code for wolframclient.serializers.wxfencoder.utils

# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

from wolframclient.serializers.wxfencoder.constants import (
    ARRAY_TYPES, VALID_PACKED_ARRAY_TYPES, WXF_CONSTANTS, StructDouble,
    StructInt8LE, StructInt16LE, StructInt32LE, StructInt64LE)
from wolframclient.utils import six

if six.JYTHON:
    import jarray


def write_varint(int_value, stream):
    """Serialize `int_value` into varint bytes and write them to `stream`.

    :param int_value: value passed to :func:`varint_bytes` for encoding.
    :param stream: object exposing a ``write`` method that accepts the
        encoded bytes.
    :return: the `stream` that was written to, so calls can be chained.
    """
    stream.write(varint_bytes(int_value))
    # The documented contract is to hand the stream back to the caller.
    return stream
def varint_bytes(int_value):
    """Serialize `int_value` into varint bytes and return them as a bytearray.

    Each output byte carries seven bits of the value, least-significant
    group first. The high bit (``0x80``) is set on every byte except the
    last one to signal that more bytes follow.

    The working buffer holds nine bytes, so a value that would need a tenth
    byte (``2 ** 63`` or larger) fails with :class:`IndexError`.

    :raises TypeError: if `int_value` is negative.
    """
    if int_value < 0:
        raise TypeError('Negative values cannot be encoded as varint.')

    encoded = bytearray(9)
    length = 0
    remaining = int_value
    more = True
    while more:
        low_bits = remaining & 0x7f
        remaining >>= 7
        # Anything left after dropping seven bits means another byte follows.
        more = bool(remaining)
        encoded[length] = (low_bits | 0x80) if more else low_bits
        length += 1

    return encoded[:length]
# Values looked up directly by `integer_size` before it falls back to the
# bit-length table below. The negative entries are the most negative value of
# each width: `bit_length() + 1` would overshoot for them (for instance
# (-128).bit_length() + 1 == 9, which maps to the 16-bit entry).
_exceptions = {
    0: (WXF_CONSTANTS.Integer8, 1),
    -(1 << 7): (WXF_CONSTANTS.Integer8, 1),
    -(1 << 15): (WXF_CONSTANTS.Integer16, 2),
    -(1 << 31): (WXF_CONSTANTS.Integer32, 4),
    -(1 << 63): (WXF_CONSTANTS.Integer64, 8),
}

# Maps a bit count (1..64) to the pair (WXF_CONSTANTS entry, width in bytes)
# of the smallest of the 8/16/32/64-bit ranges that contains that bit count.
_size = {
    bit_count: (WXF_CONSTANTS['Integer%i' % width_bits], width_bits // 8)
    for first_bits, width_bits in ((1, 8), (9, 16), (17, 32), (33, 64))
    for bit_count in range(first_bits, width_bits + 1)
}
def integer_size(value):
    """Return the ``(WXF_CONSTANTS entry, byte count)`` pair for `value`.

    `value` is first looked up in `_exceptions`; otherwise the pair is
    selected from `_size` using ``value.bit_length() + 1``.

    :raises ValueError: if that bit count has no entry in `_size`.
    """
    special_case = _exceptions.get(value, None)
    if special_case:
        return special_case
    try:
        return _size[value.bit_length() + 1]
    except KeyError:
        raise ValueError('Value %i is not a machine-sized integer.' % value)
# Struct packers keyed by the byte width passed as `int_size`.
_packing = {
    1: StructInt8LE,
    2: StructInt16LE,
    4: StructInt32LE,
    8: StructInt64LE,
}

# One implementation of `integer_to_bytes` is selected per interpreter.
if six.JYTHON:

    def integer_to_bytes(value, int_size):
        """Pack `value` with the `int_size`-byte packer (Jython variant).

        Packs into an 8-element ``jarray`` of type ``'c'`` and returns the
        first `int_size` elements converted with ``tostring()``.
        """
        scratch = jarray.zeros(8, 'c')
        packer = _packing.get(int_size)
        packer.pack_into(scratch, 0, value)
        return scratch[:int_size].tostring()

elif six.PY2:

    def integer_to_bytes(value, int_size):
        """Pack `value` with the `int_size`-byte packer (Python 2 variant).

        Packs into an 8-byte ``bytearray`` and returns its first `int_size`
        bytes as a ``bytearray``.
        """
        scratch = bytearray(8)
        packer = _packing.get(int_size)
        packer.pack_into(scratch, 0, value)
        return scratch[:int_size]

else:

    def integer_to_bytes(value, int_size):
        """Return `value` as `int_size` little-endian signed bytes.

        Delegates to ``value.to_bytes``.
        """
        return value.to_bytes(int_size, signed=True, byteorder='little')
# One implementation of `float_to_bytes` is selected per interpreter.
if six.JYTHON:

    def float_to_bytes(value):
        """Pack `value` with `StructDouble` (Jython variant).

        Packs into an 8-element ``jarray`` of type ``'c'`` and returns it
        converted with ``tostring()``.
        """
        packed = jarray.zeros(8, 'c')
        StructDouble.pack_into(packed, 0, value)
        return packed.tostring()

else:

    def float_to_bytes(value):
        """Pack `value` with `StructDouble` into an 8-byte ``bytearray``.

        :return: the ``bytearray`` the value was packed into.
        """
        packed = bytearray(8)
        StructDouble.pack_into(packed, 0, value)
        return packed
def numeric_array_to_wxf(data, dimensions, wl_type):
    """Yield the pieces of a numeric array, in order.

    The generator produces, one item at a time:

    1. ``WXF_CONSTANTS.NumericArray``;
    2. ``ARRAY_TYPES[wl_type]``;
    3. the number of dimensions, varint-encoded;
    4. each entry of `dimensions`, varint-encoded, in iteration order;
    5. `data`, passed through unchanged.

    Each piece is computed only when the generator reaches it.
    """
    yield WXF_CONSTANTS.NumericArray
    yield ARRAY_TYPES[wl_type]
    rank = len(dimensions)
    yield varint_bytes(rank)
    for extent in dimensions:
        encoded_extent = varint_bytes(extent)
        yield encoded_extent
    yield data