Source code for gismo.filesource

import zstandard as zstd
import json
import io
import dill as pickle
from pathlib import Path
from gismo.corpus import toy_source_dict


def create_file_source(source=None, filename="mysource", path="."):
    """
    Write a source (list of dict) to files in the same format used by FileSource.

    Only useful to transfer from a computer with a lot of RAM to a computer
    with less RAM. For more complex cases, e.g. when the initial source itself
    is a very large file, a dedicated converter has to be provided.

    Parameters
    ----------
    source: list of dict
        The source to write
    filename: str
        Stem of the file. Two files will be created, with suffixes
        *.index* and *.data*.
    path: str or Path
        Destination directory
    """
    if source is None:
        source = toy_source_dict
    path = Path(path)
    # Honor the `filename` stem for both companion files (the previous text
    # carried a hard-coded placeholder, which silently ignored `filename`).
    data_file = path / Path(f"{filename}.data")
    index_file = path / Path(f"{filename}.index")
    # indices[i] is the byte offset where item i starts in the data file;
    # a trailing sentinel (the final f.tell()) marks the end of the last item.
    indices = [0]
    cctx = zstd.ZstdCompressor(level=3)
    with open(data_file, "wb") as f:
        for item in source:
            # Each item is JSON-serialized then individually compressed, so
            # FileSource can later read any single item back by offset.
            f.write(cctx.compress(json.dumps(item).encode("utf8")))
            indices.append(f.tell())
    with open(index_file, "wb") as f:
        pickle.dump(indices, f)
class FileSource:
    """
    Yield a file source as a list. Assumes the existence of two files:

    The *mysource*.data file contains the stacked items. Each item is
    compressed with :py:mod:`zstandard`;
    The *mysource*.index file contains the list of pointers to seek items
    in the data file.

    The resulting source object is fully compatible with the
    :class:`~gismo.corpus.Corpus` class:

    * It can be iterated (``[item for item in source]``);
    * It can yield single items (``source[i]``);
    * It has a length (``len(source)``).

    More advanced functionalities like slices are not implemented.

    Parameters
    ----------
    path: str
        Location of the files
    filename: str
        Stem of the file
    load_source: bool
        Should the data be loaded in RAM

    Examples
    --------
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as dirname:
    ...    create_file_source(filename='mysource', path=dirname)
    ...    source = FileSource(filename='mysource', path=dirname, load_source=True)
    ...    content = [e['content'] for e in source]
    >>> content[:3]
    ['Gizmo is a Mogwaï.', 'This is a sentence about Blade.', 'This is another sentence about Shadoks.']

    Note: when source is read from file (``load_source=False``, default behavior),
    you need to close the source afterward to avoid pending file handles.
    Or use a context manager.

    >>> with tempfile.TemporaryDirectory() as dirname:
    ...    create_file_source(filename='mysource', path=dirname)
    ...    with FileSource(filename='mysource', path=dirname) as source:
    ...        size = len(source)
    ...        items = [source[i] for i in range(0, size, 2)]
    >>> size
    5
    >>> items # doctest: +NORMALIZE_WHITESPACE
    [{'title': 'First Document', 'content': 'Gizmo is a Mogwaï.'},
    {'title': 'Third Document', 'content': 'This is another sentence about Shadoks.'},
    {'title': 'Fifth Document', 'content': 'In chinese folklore, a Mogwaï is a demon.'}]
    """

    def __init__(self, filename="mysource", path=".", load_source=False):
        path = Path(path)
        # Honor the `filename` stem for both companion files (the previous
        # text carried a hard-coded placeholder, which ignored `filename`
        # and broke round-tripping with create_file_source).
        index = path / Path(f"{filename}.index")
        data = path / Path(f"{filename}.data")
        self.dctx = zstd.ZstdDecompressor()
        # Load index: self.index[i] is the byte offset of item i in the data
        # file; the final entry is the end of the last item, hence n = len - 1.
        with open(index, "rb") as f:
            self.index = pickle.load(f)
        self.n = len(self.index) - 1
        if load_source:
            # Pull the whole data file into RAM; the BytesIO keeps the same
            # seek/read interface as a real file handle.
            with open(data, "rb") as f:
                self.f = io.BytesIO(f.read())
        else:
            self.f = open(data, "rb")

    def __getitem__(self, i):
        # Seek to item i and read exactly its compressed span.
        self.f.seek(self.index[i])
        line = self.dctx.decompress(
            self.f.read(self.index[i + 1] - self.index[i])
        ).decode("utf8")
        return json.loads(line)

    def __iter__(self):
        # Reset cursor; items are stored back-to-back so sequential reads
        # need no per-item seek.
        self.i = 0
        self.f.seek(0)
        return self

    def __next__(self):
        if self.i == self.n:
            raise StopIteration
        line = self.dctx.decompress(
            self.f.read(self.index[self.i + 1] - self.index[self.i])
        ).decode("utf8")
        self.i += 1
        return json.loads(line)

    def __len__(self):
        return self.n

    def close(self):
        # Idempotent: safe to call twice (handle is set to None after closing).
        if self.f:
            self.f.close()
            self.f = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()