| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- import json
- from c import struct
- import builtins
- _BASIC_TYPES = [int, float, str, bool, type(None)]
- _MOD_T_SEP = "@"
- def _find_class(path: str):
- if _MOD_T_SEP not in path:
- return builtins.__dict__[path]
- modpath, name = path.split(_MOD_T_SEP)
- return __import__(modpath).__dict__[name]
- class _Pickler:
- def __init__(self, obj) -> None:
- self.obj = obj
- self.raw_memo = {} # id -> int
- self.memo = [] # int -> object
- @staticmethod
- def _type_id(t: type):
- assert type(t) is type
- name = t.__name__
- mod = t.__module__
- if mod is not None:
- name = mod.__path__ + _MOD_T_SEP + name
- return name
- def wrap(self, o):
- o_t = type(o)
- if o_t in _BASIC_TYPES:
- return o
- if o_t is type:
- return ["type", self._type_id(o)]
- index = self.raw_memo.get(id(o), None)
- if index is not None:
- return [index]
-
- ret = []
- index = len(self.memo)
- self.memo.append(ret)
- self.raw_memo[id(o)] = index
- if o_t is tuple:
- ret.append("tuple")
- ret.append([self.wrap(i) for i in o])
- return [index]
- if o_t is bytes:
- ret.append("bytes")
- ret.append([o[j] for j in range(len(o))])
- return [index]
- if o_t is list:
- ret.append("list")
- ret.append([self.wrap(i) for i in o])
- return [index]
- if o_t is dict:
- ret.append("dict")
- ret.append([[self.wrap(k), self.wrap(v)] for k,v in o.items()])
- return [index]
-
- _0 = self._type_id(o_t)
- if getattr(o_t, '__struct__', False):
- ret.append(_0)
- ret.append(o.tostruct().hex())
- return [index]
- if hasattr(o, "__getnewargs__"):
- _1 = o.__getnewargs__() # an iterable
- _1 = [self.wrap(i) for i in _1]
- else:
- _1 = None
- if o.__dict__ is None:
- _2 = None
- else:
- _2 = {k: self.wrap(v) for k,v in o.__dict__.items()}
- ret.append(_0) # type id
- ret.append(_1) # newargs
- ret.append(_2) # state
- return [index]
-
- def run_pipe(self):
- o = self.wrap(self.obj)
- return [o, self.memo]
- class _Unpickler:
- def __init__(self, obj, memo: list) -> None:
- self.obj = obj
- self.memo = memo
- self._unwrapped = [None] * len(memo)
- def tag(self, index, o):
- assert self._unwrapped[index] is None
- self._unwrapped[index] = o
- def unwrap(self, o, index=None):
- if type(o) in _BASIC_TYPES:
- return o
- assert type(o) is list
- if o[0] == "type":
- return _find_class(o[1])
- # reference
- if type(o[0]) is int:
- assert index is None # index should be None
- index = o[0]
- if self._unwrapped[index] is None:
- o = self.memo[index]
- assert type(o) is list
- assert type(o[0]) is str
- self.unwrap(o, index)
- assert self._unwrapped[index] is not None
- return self._unwrapped[index]
-
- # concrete reference type
- if o[0] == "tuple":
- ret = tuple([self.unwrap(i) for i in o[1]])
- self.tag(index, ret)
- return ret
- if o[0] == "bytes":
- ret = bytes(o[1])
- self.tag(index, ret)
- return ret
- if o[0] == "list":
- ret = []
- self.tag(index, ret)
- for i in o[1]:
- ret.append(self.unwrap(i))
- return ret
- if o[0] == "dict":
- ret = {}
- self.tag(index, ret)
- for k,v in o[1]:
- ret[self.unwrap(k)] = self.unwrap(v)
- return ret
-
- # generic object
- cls = _find_class(o[0])
- if getattr(cls, '__struct__', False):
- inst = cls.fromstruct(struct.fromhex(o[1]))
- self.tag(index, inst)
- return inst
- else:
- _, newargs, state = o
- # create uninitialized instance
- new_f = getattr(cls, "__new__")
- if newargs is not None:
- newargs = [self.unwrap(i) for i in newargs]
- inst = new_f(cls, *newargs)
- else:
- inst = new_f(cls)
- self.tag(index, inst)
- # restore state
- if state is not None:
- for k,v in state.items():
- setattr(inst, k, self.unwrap(v))
- return inst
- def run_pipe(self):
- return self.unwrap(self.obj)
- def _wrap(o):
- return _Pickler(o).run_pipe()
- def _unwrap(packed: list):
- return _Unpickler(*packed).run_pipe()
- def dumps(o) -> bytes:
- o = _wrap(o)
- return json.dumps(o).encode()
- def loads(b) -> object:
- assert type(b) is bytes
- o = json.loads(b.decode())
- return _unwrap(o)
|