#!/usr/bin/env python
import os
import os.path
import re
import datetime
import unicodedata
import sys
import csv
from copy import copy
from enum import Enum
# Public API of this module: the names exported by ``from <module> import *``.
# NOTE(review): 'FileFormat' is listed here but is not defined in the part of
# the module visible in this section - presumably it is defined further down
# (``Enum`` is imported above); verify.
__all__ = [
    'check_column_names',
    'get_column_names',
    'get_type',
    'prepare_value',
    'clean_key',
    'col_number_to_name',
    'copy_cells',
    'print_sheet',
    'quoted',
    'rmdiacritics',
    'FileFormat',
    'HEADER_LOWER',
    'HEADER_UPPER',
    'HEADER_SKIP',
    'DEFAULT_FORMAT',
    'DEFAULT_FLOAT_FORMAT',
    'DEFAULT_CSV_DELIMITER',
    'DEFAULT_CSV_HEADER',
    'DEFAULT_TABLE_NAME',
    'INDEX_COLUMN_NAME',
    'TYPE_DOUBLE',
    'TYPE_INT',
    'TYPE_NULLABLE_INT',
    'TYPE_DATETIME',
    'TYPE_STRING',
    'NUMERIC_TYPES',
    'XLS_EPOC',
    'XLSX_EPOC',
    'VERSION',
]
# CSV header handling modes
HEADER_LOWER = "lower"
HEADER_UPPER = "upper"
HEADER_SKIP = "skip"

#: Default output format
DEFAULT_FORMAT = "parquet"

#: Default float format
DEFAULT_FLOAT_FORMAT = "%g"

#: Default CSV delimiter
DEFAULT_CSV_DELIMITER = ","

#: Default CSV header case
DEFAULT_CSV_HEADER = HEADER_LOWER

#: XLS epoch - see https://support.microsoft.com/en-us/help/214326/excel-incorrectly-assumes-that-the-year-1900-is-a-leap-year
XLS_EPOC = datetime.datetime(year=1899, month=12, day=30)

#: XLSX epoch
XLSX_EPOC = datetime.datetime(year=1900, month=1, day=1)

#: Default Query Operator table name
DEFAULT_TABLE_NAME = "xls"

#: Index column name
INDEX_COLUMN_NAME = "_index"

#: Double data type
TYPE_DOUBLE = "double"

#: Integer data type
TYPE_INT = "int64"

#: Integer data type with possibly missing values
TYPE_NULLABLE_INT = "Int64"

#: Datetime data type
TYPE_DATETIME = "datetime64[ns]"

#: String data type
TYPE_STRING = "str"

#: Numeric data types
NUMERIC_TYPES = (
    TYPE_INT,
    TYPE_NULLABLE_INT,
    TYPE_DOUBLE,
)
# Path of the ``VERSION`` file located in the same directory as this module.
VERSION_FILE = os.path.join(os.path.dirname(__file__), "VERSION")

# Read at import time; an explicit encoding keeps the result independent of
# the platform's default locale.
with open(VERSION_FILE, encoding="utf-8") as f:
    #: Plugin Version
    VERSION = f.read().strip()
def rmdiacritics(char):
    """
    Return the base character without diacritics (e.g. accents).

    The Unicode name of ``char`` is truncated at the first ``' WITH '`` and
    looked up again (``LATIN SMALL LETTER E WITH ACUTE`` becomes
    ``LATIN SMALL LETTER E``). The input is returned unchanged when it has no
    Unicode name, when its name contains no ``' WITH '``, or when the
    truncated name does not identify a character.

    :param char: a single character
    :type char: str
    :return: the base character, or ``char`` itself
    :rtype: str
    """
    # Control characters (newline, tab, ...) have no Unicode name; the default
    # value avoids the ValueError that unicodedata.name() raises for them.
    desc = unicodedata.name(char, '')
    base_name, marker, _ = desc.partition(' WITH ')
    if not marker:
        return char
    try:
        return unicodedata.lookup(base_name)
    except KeyError:
        # The truncated name is not a valid character name: keep the input.
        return char
def quoted(string):
    """
    Wrap a string in single quotes.

    Quote characters already present in ``string`` are not escaped.

    :param string: text to wrap
    :type string: str
    :return: ``string`` surrounded by single quotes
    :rtype: str
    """
    return "'" + string + "'"
# Runs of these characters (space ' < > ( ) . , / _) collapse into a single
# underscore. Written as a raw string so that no invalid escape sequences end
# up in the literal, and compiled once instead of on every call.
_KEY_SEPARATOR_RE = re.compile(r"[ '<>().,/_]+")


def clean_key(k):
    """
    Normalize a header value into a column name.

    The value is stripped, lower-cased, has ``-`` and the euro sign removed,
    has runs of separator characters replaced by a single ``_``, has
    diacritics removed character by character, and finally has leading and
    trailing underscores stripped.

    :param k: header value; non-string values (e.g. a numeric header) are
        converted with ``str()`` first
    :return: the normalized name
    :rtype: str
    """
    if not isinstance(k, str):
        # Header cells may hold numbers or dates rather than text.
        k = str(k)
    k = k.strip().lower().replace('-', '').replace('€', '')
    k = _KEY_SEPARATOR_RE.sub('_', k)
    k = ''.join(rmdiacritics(ch) for ch in k)
    return k.strip('_')
def col_number_to_name(col_number):
    """
    Convert a column number to its name
    (e.g. 0 -> '_index', 1 -> 'A', 2 -> 'B', 26 -> 'Z', 27 -> 'AA')

    :param col_number: column number; 0 designates the index column
    :type col_number: int
    :return: ``INDEX_COLUMN_NAME`` for 0, otherwise the column letters
    :rtype: str
    """
    if col_number == 0:
        return INDEX_COLUMN_NAME

    # Letters are produced from the least significant one to the most
    # significant one, working on a zero-based column offset.
    offset = col_number - 1
    letters = []
    while True:
        letters.append(chr(65 + (offset % 26)))
        if offset < 26:
            break
        # There is no "zero" letter, hence the extra -1 when moving on to the
        # next position.
        offset = (offset // 26) - 1
    return ''.join(reversed(letters))
def get_type(name, value, nullable_int=False):
    """
    Return the data type name matching a Python value.

    :param name: column name, only used in the error message
    :param value: value to inspect
    :param nullable_int: return ``TYPE_NULLABLE_INT`` instead of ``TYPE_INT``
        for integer values
    :type nullable_int: bool
    :return: one of ``TYPE_DOUBLE``, ``TYPE_INT``, ``TYPE_NULLABLE_INT``,
        ``TYPE_DATETIME`` or ``TYPE_STRING``
    :raises TypeError: if the value is of any other type (including ``None``)
    """
    if isinstance(value, float):
        return TYPE_DOUBLE
    if isinstance(value, int):
        # bool is a subclass of int, so booleans are reported as integers.
        return TYPE_NULLABLE_INT if nullable_int else TYPE_INT
    if isinstance(value, datetime.datetime):
        return TYPE_DATETIME
    if isinstance(value, str):
        return TYPE_STRING
    # TypeError is a subclass of Exception, so callers catching Exception
    # keep working.
    raise TypeError('unsupported data type {} {}'.format(name, type(value)))
def prepare_value(name, value):
    """
    Try to cast a string to int, then to float.

    Non-string values are returned unchanged. Strings are stripped of
    surrounding whitespace first; if neither conversion succeeds, the
    stripped string is returned.

    :param name: column name (not used by this function)
    :param value: value to convert
    :return: an ``int``, a ``float``, the stripped string, or the original
        non-string value
    """
    if not isinstance(value, str):
        return value
    value = value.strip()
    # int is tried first so that integral text does not become a float.
    for cast in (int, float):
        try:
            return cast(value)
        except (TypeError, ValueError):
            continue
    return value
def get_column_names(sheet, skip_rows=0):
    """
    Extract the column names from the header row of the worksheet

    The header is row ``1 + skip_rows``. Cells whose value is ``None`` are
    ignored; the remaining values are normalized with ``clean_key``. When a
    name was already produced by an earlier header cell, a column letter is
    appended to it (``name_b``). That letter is computed from the position
    among the non-empty header cells.

    :param sheet: worksheet
    :type sheet: Worksheet
    :param skip_rows: Number of input lines to skip
    :type skip_rows: int
    :return: the column names
    :rtype: list
    """
    header = sheet[1 + skip_rows]
    names = [clean_key(cell.value) for cell in header if cell.value is not None]

    # Append the column to the name if the name is not unique. A set of the
    # names met so far replaces repeated scans of the list prefix.
    seen = set()
    column_names = []
    for position, key in enumerate(names):
        if key in seen:
            suffix = col_number_to_name(position + 1).lower()
            column_names.append('{}_{}'.format(key, suffix))
        else:
            column_names.append(key)
        seen.add(key)
    return column_names
def check_column_names(column_names):
    """
    Check that the column names are unique.

    :param column_names: column names
    :type column_names: list
    :raises ValueError: if a name appears more than once; the message lists
        the duplicated names in the order their first repetition occurs
    """
    if len(set(column_names)) == len(column_names):
        return

    # Single pass that keeps a deterministic order for the error message.
    seen = set()
    duplicates = []
    for name in column_names:
        if name not in seen:
            seen.add(name)
        elif name not in duplicates:
            duplicates.append(name)
    # ValueError is a subclass of Exception, so callers catching Exception
    # keep working.
    raise ValueError('Columns names are not unique: {0}'.format(duplicates))
def print_sheet(sheet, fileobj=None):
    """
    Print a sheet as CSV, on standard output by default

    :param sheet: object whose ``values`` attribute is an iterable of rows
    :param fileobj: file-like object to write to; ``None`` (the default)
        means the current ``sys.stdout``
    """
    if fileobj is None:
        # Resolved at call time so that a redirected sys.stdout is honoured;
        # a ``fileobj=sys.stdout`` default would be bound once, at import.
        fileobj = sys.stdout
    csv_writer = csv.writer(fileobj, quoting=csv.QUOTE_MINIMAL, delimiter=',')
    csv_writer.writerows(sheet.values)
def copy_cells(source, target):
    """
    Copy cells from source worksheet to target

    For every cell of ``source``, the value and data type are copied to the
    cell at the same row and column of ``target``. The number format is copied
    as well when the source cell has a style.

    :param source: worksheet to read from
    :param target: worksheet to write to
    """
    # NOTE(review): ``_cells`` and ``_value`` are private attributes of the
    # worksheet / cell objects (presumably openpyxl) - confirm they are still
    # available when upgrading that dependency.
    for (row, col), source_cell in source._cells.items():
        target_cell = target.cell(column=col, row=row)
        target_cell._value = source_cell._value
        target_cell.data_type = source_cell.data_type
        if source_cell.has_style:
            target_cell.number_format = copy(source_cell.number_format)