Source code for csvw.dsv

"""Support for reading delimiter-separated value files.

This module contains unicode aware replacements for :func:`csv.reader`
and :func:`csv.writer`. It was stolen/extracted from the ``csvkit``
project to allow re-use when the whole ``csvkit`` package isn't
required.

The original implementations were largely copied from
`examples in the csv module documentation <http://docs.python.org/library/csv.html\
#examples>`_.

.. seealso:: http://en.wikipedia.org/wiki/Delimiter-separated_values
"""
import io
import csv
import codecs
import shutil
import typing
import pathlib
import tempfile
import warnings
import functools
import collections

from . import utils
from .dsv_dialects import Dialect

__all__ = [
    'UnicodeWriter',
    'UnicodeReader', 'UnicodeReaderWithLineNumber', 'UnicodeDictReader', 'NamedTupleReader',
    'iterrows',
    'rewrite', 'add_rows', 'filter_rows_as_dict',
]

LINES_OR_PATH = typing.Union[str, pathlib.Path, typing.IO, typing.Iterable[str]]


def normalize_encoding(encoding: str) -> str:
    return codecs.lookup(encoding).name


[docs]class UnicodeWriter: """ Write Unicode data to a csv file. :param f: The target to which to write the data; a local path specified as `str` or \ `pathlib.Path` or `None`, in which case the data, formatted as DSV can be retrieved \ via :meth:`~UnicodeWriter.read` :param dialect: Either a dialect name as recognized by `csv.writer` or a \ :class:`~Dialect` instance for dialect customization beyond what can be done with \ `csv.writer`. :param kw: Keyword arguments passed through to `csv.writer`. .. code-block:: python >>> from csvw import UnicodeWriter >>> with UnicodeWriter('data.tsv', delimiter='\t') as writer: ... writer.writerow(['ä', 'ö', 'ü']) """ def __init__( self, f: typing.Optional[typing.Union[str, pathlib.Path]] = None, dialect: typing.Optional[typing.Union[Dialect, str]] = None, **kw): self.f = f self.encoding = kw.pop('encoding', 'utf-8') if isinstance(dialect, Dialect): self.encoding = dialect.python_encoding self.kw = dialect.as_python_formatting_parameters() self.kw.update(kw) else: self.kw = kw if dialect: self.kw['dialect'] = dialect self.encoding = normalize_encoding(self.encoding) self.escapechar = self.kw.get('escapechar') if self.escapechar and self.kw.get('quoting') != csv.QUOTE_NONE: # work around https://bugs.python.org/issue12178 # (csv.writer doesn't escape escapechar while csv.reader expects it) def _escapedoubled(row, _type=str, _old=self.escapechar, _new=2 * self.escapechar): return [s.replace(_old, _new) if isinstance(s, _type) else s for s in row] else: def _escapedoubled(row): return row self._escapedoubled = _escapedoubled self._close = False def __enter__(self): if isinstance(self.f, (str, pathlib.Path)): if isinstance(self.f, pathlib.Path): self.f = str(self.f) self.f = io.open(self.f, 'wt', encoding=self.encoding, newline='') self._close = True elif self.f is None: self.f = io.StringIO(newline='') self.writer = csv.writer(self.f, **self.kw) return self
[docs] def read(self) -> typing.Optional[bytes]: """ If the writer has been initialized passing `None` as target, the CSV data as `bytes` can be retrieved calling this method. """ if hasattr(self.f, 'seek'): self.f.seek(0) if hasattr(self.f, 'read'): return self.f.read().encode('utf-8')
def __exit__(self, type, value, traceback): if self._close: self.f.close() def writerow(self, row: typing.Union[tuple, list]): self.writer.writerow(self._escapedoubled(row)) def writerows(self, rows: typing.Iterable[typing.Union[tuple, list]]): for row in rows: self.writerow(row)
[docs]class UnicodeReader: """ Read Unicode data from a csv file. :param f: The source from which to read the data; a local path specified as `str` or \ `pathlib.Path`, a file-like object or a `list` of lines. :param dialect: Either a dialect name as recognized by `csv.reader` or a \ :class:`~Dialect` instance for dialect customization beyond what can be done with \ `csv.writer`. :param kw: Keyword arguments passed through to `csv.reader`. .. code-block:: python >>> with UnicodeReader('tests/fixtures/frictionless-data.csv', delimiter='|') as reader: ... for row in reader: ... print(row) ... break ... ['FK', 'Year', 'Location name', 'Value', 'binary', 'anyURI', 'email', 'boolean', 'array', 'geojson'] """ def __init__( self, f: LINES_OR_PATH, dialect: typing.Optional[typing.Union[Dialect, str]] = None, **kw): self.f = f self.encoding = normalize_encoding(kw.pop('encoding', 'utf-8-sig')) self.newline = kw.pop('lineterminator', None) self.dialect = dialect if isinstance(dialect, Dialect) else None if self.dialect: self.encoding = self.dialect.python_encoding self.kw = dialect.as_python_formatting_parameters() self.kw.update(kw) else: self.kw = kw if dialect: self.kw['dialect'] = dialect self._close = False self.comments = [] # We potentially screw people with valid CSV files where the content - presumably the # header - starts with 0xfeff. But the chance of irritating people trying to read Excel # exported CSV with the defaults seems way bigger - and anyone with CSV column names # starting with 0xfeff will run into more trouble down the line anyway ... if self.encoding == 'utf-8': self.encoding = 'utf-8-sig' # encoding of self.reader rows: differs from source encoding # where we need to recode from non-8bit clean source encoding # to utf-8 first to feed into the (byte-based) PY2 csv.reader self._reader_encoding = self.encoding def __enter__(self): if isinstance(self.f, (str, pathlib.Path)): if isinstance(self.f, pathlib.Path): self.f = str(self.f) self.f = io.open(self.f, mode='rt', encoding=self.encoding, newline=self.newline or '') self._close = True elif not hasattr(self.f, 'read'): lines = [] for line in self.f: lines.append(line.decode(self.encoding) if isinstance(line, bytes) else line) self.f = lines self.reader = csv.reader(self.f, **self.kw) self.lineno = -1 return self def _next_row(self): self.lineno += 1 row = [ s if isinstance(s, str) else s.decode(self._reader_encoding) for s in next(self.reader)] self.lineno += sum([list(s).count('\n') for s in row]) return row def __next__(self): row = self._next_row() if self.dialect: while (row and self.dialect.commentPrefix and # noqa: W504 row[0].startswith(self.dialect.commentPrefix)) or \ ((not row or set(row) == {''}) and self.dialect.skipBlankRows) or \ (self.lineno < self.dialect.skipRows): if (row and self.dialect.commentPrefix and # noqa: W504 row[0].startswith(self.dialect.commentPrefix)) or \ (row and self.lineno < self.dialect.skipRows): self.comments.append(( self.lineno, self.dialect.delimiter.join(row).lstrip(self.dialect.commentPrefix).strip(), )) row = self._next_row() row = [self.dialect.trimmer(s) for s in row][self.dialect.skipColumns:] return row def __exit__(self, exc_type, exc_val, exc_tb): if self._close: self.f.close() def __iter__(self): return self
[docs]class UnicodeReaderWithLineNumber(UnicodeReader): """ A `UnicodeReader` yielding (lineno, row) pairs, where "lineno" is the 1-based number of the the **text line** where the (possibly multi-line) row data starts in the DSV file. """ def __next__(self): """ :return: a pair (1-based line number in the input, row) """ # Retrieve the row, thereby incrementing the line number: row = super(UnicodeReaderWithLineNumber, self).__next__() return self.lineno + 1, row
[docs]class UnicodeDictReader(UnicodeReader): """ A `UnicodeReader` yielding one `dict` per row. :param f: As for :class:`UnicodeReader` :param fieldnames: .. code-block:: python >>> with UnicodeDictReader( ... 'tests/fixtures/frictionless-data.csv', ... dialect=Dialect(delimiter='|', header=False), ... fieldnames=[str(i) for i in range(1, 11)]) as reader: ... for row in reader: ... print(row) ... break ... OrderedDict([('1', 'FK'), ('2', 'Year'), ('3', 'Location name'), ('4', 'Value'), ('5', 'binary'), ('6', 'anyURI'), ('7', 'email'), ('8', 'boolean'), ('9', 'array'), ('10', 'geojson')]) """ def __init__(self, f, fieldnames=None, restkey=None, restval=None, **kw): self._fieldnames = fieldnames # list of keys for the dict self.restkey = restkey # key to catch long rows self.restval = restval # default value for short rows self.line_num = 0 super(UnicodeDictReader, self).__init__(f, **kw) @property def fieldnames(self): if self._fieldnames is None: try: self._fieldnames = super(UnicodeDictReader, self).__next__() except StopIteration: pass self.line_num = self.reader.line_num if self._fieldnames: if len(set(self._fieldnames)) != len(self._fieldnames): warnings.warn('Duplicate column names!') return self._fieldnames def __next__(self): if self.line_num == 0: # Used only for its side effect. self.fieldnames row = super(UnicodeDictReader, self).__next__() self.line_num = self.reader.line_num # unlike the basic reader, we prefer not to return blanks, # because we will typically wind up with a dict full of None # values while row == []: row = super(UnicodeDictReader, self).__next__() return self.item(row) def item(self, row): d = collections.OrderedDict((k, v) for k, v in zip(self.fieldnames, row)) lf = len(self.fieldnames) lr = len(row) if lf < lr: d[self.restkey] = row[lf:] elif lf > lr: for key in self.fieldnames[lr:]: d[key] = self.restval return d
[docs]class NamedTupleReader(UnicodeDictReader): """ A `UnicodeReader` yielding one `namedtuple` per row. .. note:: This reader has some limitations, notably that fieldnames must be normalized to be admissible Python names, but also bad performance (compared with `UnicodeDictReader`). """ _normalize_fieldname = staticmethod(utils.normalize_name) @functools.cached_property def cls(self): fieldnames = list(map(self._normalize_fieldname, self.fieldnames)) return collections.namedtuple('Row', fieldnames) def item(self, row): d = UnicodeDictReader.item(self, row) for name in self.fieldnames: d.setdefault(name, None) return self.cls( **{self._normalize_fieldname(k): v for k, v in d.items() if k in self.fieldnames})
[docs]def iterrows(lines_or_file: LINES_OR_PATH, namedtuples: typing.Optional[bool] = False, dicts: typing.Optional[bool] = False, encoding: typing.Optional[str] = 'utf-8', **kw) -> typing.Generator: """Convenience factory function for csv reader. :param lines_or_file: Content to be read. Either a file handle, a file path or a list\ of strings. :param namedtuples: Yield namedtuples. :param dicts: Yield dicts. :param encoding: Encoding of the content. :param kw: Keyword parameters are passed through to csv.reader. :return: A generator over the rows. """ if namedtuples and dicts: raise ValueError('either namedtuples or dicts can be chosen as output format') elif namedtuples: _reader = NamedTupleReader elif dicts: _reader = UnicodeDictReader else: _reader = UnicodeReader with _reader(lines_or_file, encoding=encoding, **kw) as r: for item in r: yield item
reader = iterrows
[docs]def rewrite(fname: typing.Union[str, pathlib.Path], visitor: typing.Callable[[int, typing.List[str]], typing.Union[None, typing.List[str]]], **kw): """Utility function to rewrite rows in dsv files. :param fname: Path of the dsv file to operate on. :param visitor: A callable that takes a line-number and a row as input and returns a \ (modified) row or None to filter out the row. :param kw: Keyword parameters are passed through to csv.reader/csv.writer. """ fname = utils.ensure_path(fname) assert fname.is_file() with tempfile.NamedTemporaryFile(delete=False) as fp: tmp = pathlib.Path(fp.name) with UnicodeReader(fname, **kw) as reader_: with UnicodeWriter(tmp, **kw) as writer: for i, row in enumerate(reader_): row = visitor(i, row) if row is not None: writer.writerow(row) shutil.move(str(tmp), str(fname)) # Path.replace is Python 3.3+
def add_rows(fname: typing.Union[str, pathlib.Path], *rows: typing.List[str]): with tempfile.NamedTemporaryFile(delete=False) as fp: tmp = pathlib.Path(fp.name) fname = utils.ensure_path(fname) with UnicodeWriter(tmp) as writer: if fname.exists(): with UnicodeReader(fname) as reader_: for row in reader_: writer.writerow(row) writer.writerows(rows) shutil.move(str(tmp), str(fname)) # Path.replace is Python 3.3+
[docs]def filter_rows_as_dict(fname: typing.Union[str, pathlib.Path], filter_: typing.Callable[[dict], bool], **kw) -> int: """Rewrite a dsv file, filtering the rows. :param fname: Path to dsv file :param filter_: callable which accepts a `dict` with a row's data as single argument\ returning a `Boolean` indicating whether to keep the row (`True`) or to discard it \ `False`. :param kw: Keyword arguments to be passed `UnicodeReader` and `UnicodeWriter`. :return: The number of rows that have been removed. """ filter_ = DictFilter(filter_) rewrite(fname, filter_, **kw) return filter_.removed
class DictFilter(object): def __init__(self, filter_): self.header = None self.filter = filter_ self.removed = 0 def __call__(self, i, row): if i == 0: self.header = row return row if row: item = dict(zip(self.header, row)) if self.filter(item): return row else: self.removed += 1