Source code for xlsx_provider.commons

#!/usr/bin/env python

import os
import os.path
import re
import datetime
import unicodedata
import sys
import csv
from copy import copy
from enum import Enum

__all__ = [
    'check_column_names',
    'get_column_names',
    'get_type',
    'prepare_value',
    'clean_key',
    'col_number_to_name',
    'copy_cells',
    'print_sheet',
    'quoted',
    'rmdiacritics',
    'FileFormat',
    'HEADER_LOWER',
    'HEADER_UPPER',
    'HEADER_SKIP',
    'DEFAULT_FORMAT',
    'DEFAULT_FLOAT_FORMAT',
    'DEFAULT_CSV_DELIMITER',
    'DEFAULT_CSV_HEADER',
    'DEFAULT_TABLE_NAME',
    'INDEX_COLUMN_NAME',
    'TYPE_DOUBLE',
    'TYPE_INT',
    'TYPE_NULLABLE_INT',
    'TYPE_DATETIME',
    'TYPE_STRING',
    'NUMERIC_TYPES',
    'XLS_EPOC',
    'XLSX_EPOC',
    'VERSION',
]

HEADER_LOWER = 'lower'
HEADER_UPPER = 'upper'
HEADER_SKIP = 'skip'
#: Default output format
DEFAULT_FORMAT = 'parquet'
#: Default float format
DEFAULT_FLOAT_FORMAT = '%g'
#: Default CSV delimiter
DEFAULT_CSV_DELIMITER = ','
#: Default CSV header case
DEFAULT_CSV_HEADER = HEADER_LOWER
#: XLS Epoc - see https://support.microsoft.com/en-us/help/214326/excel-incorrectly-assumes-that-the-year-1900-is-a-leap-year
XLS_EPOC = datetime.datetime(1899, 12, 30)
#: XLSX Epoc
XLSX_EPOC = datetime.datetime(1900, 1, 1)
#: Default Query Operator table name
DEFAULT_TABLE_NAME = 'xls'
#: Index colummn name
INDEX_COLUMN_NAME = '_index'

#: Double data type
TYPE_DOUBLE = 'double'
#: Integer data type
TYPE_INT = 'int64'
#: Integer data type with possibly missing value
TYPE_NULLABLE_INT = 'Int64'
#: Datetime data type
TYPE_DATETIME = 'datetime64[ns]'
#: String data type
TYPE_STRING = 'str'
#: Numeric data type
NUMERIC_TYPES = (TYPE_INT, TYPE_NULLABLE_INT, TYPE_DOUBLE)

VERSION_FILE = os.path.join(os.path.dirname(__file__), "VERSION")
with open(VERSION_FILE) as f:
    #: Plugin Version
    VERSION = f.read().strip()


[docs]def rmdiacritics(char): "Return the base character without diacritics (eg. accents)" desc = unicodedata.name(char) cutoff = desc.find(' WITH ') if cutoff != -1: desc = desc[:cutoff] try: char = unicodedata.lookup(desc) except KeyError: pass return char
def quoted(string): return "'" + string + "'" def clean_key(k): k = k.strip().lower().replace('-', '').replace('€', '') k = re.sub('[\ \'\<\>\(\)\.\,\/\_]+', '_', k) k = ''.join([rmdiacritics(x) for x in k]) k = k.strip('_') return k
[docs]def col_number_to_name(col_number): """ Convert a column number to name (e.g. 0 -> '_index', 0 -> A, 1 -> B) :param col_number: column number :type col_number: int """ def _col_number_to_name(x): return (_col_number_to_name((x // 26) - 1) if x >= 26 else '') + chr( 65 + (x % 26) ) if col_number == 0: return INDEX_COLUMN_NAME else: return _col_number_to_name(col_number - 1)
def get_type(name, value, nullable_int=False): if isinstance(value, float): return TYPE_DOUBLE elif isinstance(value, int): return TYPE_NULLABLE_INT if nullable_int else TYPE_INT elif isinstance(value, datetime.datetime): return TYPE_DATETIME elif isinstance(value, str): return TYPE_STRING else: raise Exception('unsupported data type {} {}'.format(name, type(value)))
[docs]def prepare_value(name, value): "Try cast string to int and float" if isinstance(value, str): value = value.strip() try: return int(value) except (TypeError, ValueError): try: return float(value) except (TypeError, ValueError): pass return value
[docs]def get_column_names(sheet, skip_rows=0): """ Extract the column names from the first row of the worksheet :param sheet: worksheet :type sheet: Worksheet :param skip_rows: Number of input lines to skip :type skip_rows: int """ header = sheet[1 + skip_rows] names = [clean_key(x.value) for x in header if x.value is not None] # Append the column to the name if the name is not unique return [ x if (i == 0 or x not in names[:i]) else '{}_{}'.format(x, col_number_to_name(i + 1).lower()) for i, x in enumerate(names) ]
def check_column_names(column_names): # Check unique columns if len(set(column_names)) != len(column_names): duplicates = list(set([x for x in column_names if column_names.count(x) > 1])) raise Exception('Columns names are not unique: {0}'.format(duplicates))
[docs]def copy_cells(source, target): "Copy cells from source worksheet to target" for (row, col), source_cell in source._cells.items(): target_cell = target.cell(column=col, row=row) target_cell._value = source_cell._value target_cell.data_type = source_cell.data_type if source_cell.has_style: target_cell.number_format = copy(source_cell.number_format)
[docs]class FileFormat(Enum): "File format enumerator (parquet/csv/json/jsonl)" parquet = 'parquet' csv = 'csv' json = 'json' jsonl = 'jsonl' # JSON lines (newline-delimited JSON) @classmethod def lookup(cls, file_format): if not file_format: return DEFAULT_FORMAT elif isinstance(file_format, FileFormat): return file_format else: return cls[file_format.lower()]