Source code for xlsx_provider.operators.from_xlsx_operator

#!/usr/bin/env python

import json
import datetime
import dateutil.parser
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from xlsx_provider.loader import load_worksheet
from xlsx_provider.commons import (
    check_column_names,
    get_type,
    prepare_value,
    get_column_names,
    FileFormat,
    DEFAULT_CSV_DELIMITER,
    DEFAULT_CSV_HEADER,
    DEFAULT_FORMAT,
    DEFAULT_FLOAT_FORMAT,
    INDEX_COLUMN_NAME,
    TYPE_INT,
    TYPE_NULLABLE_INT,
    TYPE_DOUBLE,
    TYPE_DATETIME,
    NUMERIC_TYPES,
    HEADER_UPPER,
    HEADER_LOWER,
    XLSX_EPOC,
)

__all__ = ['FromXLSXOperator']


class FromXLSXOperator(BaseOperator):
    """
    Convert an XLSX/XLS file into a Parquet, CSV, JSON, or JSON Lines file

    Read an XLSX or XLS file and convert it into a Parquet, CSV, JSON, or
    JSON Lines (one line per record) file.

    :param source: Source filename (XLSX or XLS, templated)
    :type source: str
    :param target: Target filename (templated)
    :type target: str
    :param worksheet: Worksheet title or number (zero-based, templated)
    :type worksheet: str or int
    :param skip_rows: Number of input lines to skip (default: 0, templated)
    :type skip_rows: int
    :param limit: Row limit (default: None, templated)
    :type limit: int
    :param drop_columns: List of columns to be dropped
    :type drop_columns: list of str
    :param add_columns: Columns to be added (dict, or list of column=value)
    :type add_columns: list of str or dict of str key/value pairs
    :param types: Force Parquet column types (dict, or list of
        column='str', 'int64', 'double', 'datetime64[ns]')
    :type types: str or dict of str key/value pairs
    :param column_names: Force column names (list)
    :type column_names: list of str
    :param file_format: Output file format (parquet, csv, json, jsonl)
    :type file_format: str
    :param csv_delimiter: CSV delimiter (default: ',')
    :type csv_delimiter: str
    :param csv_header: Convert CSV output header case ('lower', 'upper', 'skip')
    :type csv_header: str
    :param float_format: Format string for floating point numbers (default: '%g')
    :type float_format: str
    :param nullable_int: Enable nullable integer data type support
    :type nullable_int: bool
    """

    FileFormat = FileFormat
    template_fields = ('source', 'target', 'worksheet', 'limit', 'skip_rows')
    ui_color = '#a934bd'

    @apply_defaults
    def __init__(
        self,
        source,
        target,
        worksheet=0,
        skip_rows=0,
        limit=None,
        drop_columns=None,
        add_columns=None,
        types=None,
        column_names=None,
        file_format=DEFAULT_FORMAT,
        csv_delimiter=DEFAULT_CSV_DELIMITER,
        csv_header=DEFAULT_CSV_HEADER,
        float_format=DEFAULT_FLOAT_FORMAT,
        nullable_int=False,
        *args,
        **kwargs
    ):
        super(FromXLSXOperator, self).__init__(*args, **kwargs)
        self.source = source
        self.target = target
        try:
            # A numeric worksheet argument is a zero-based index;
            # anything else is treated as a worksheet title
            self.worksheet = int(worksheet)
        except (TypeError, ValueError):
            self.worksheet = worksheet
        self.skip_rows = skip_rows
        self.limit = limit
        self.drop_columns = drop_columns or []
        if isinstance(add_columns, list):
            # Accept a list of 'column=value' strings
            self.add_columns = dict(x.split('=') for x in add_columns)
        else:
            self.add_columns = add_columns or {}
        if isinstance(types, list):
            # Accept a list of 'column=type' strings
            self.types = dict(x.split('=') for x in types)
        else:
            self.types = types or {}
        self.names = column_names
        self.file_format = FileFormat.lookup(file_format)
        self.csv_delimiter = csv_delimiter
        self.csv_header = csv_header
        self.float_format = float_format
        self.nullable_int = nullable_int

    def load_worksheet(self, sheet=None):
        "Load a worksheet from the source file"
        return load_worksheet(
            filename=self.source,
            sheet=sheet,
            worksheet=self.worksheet,
            skip_rows=self.skip_rows,
            csv_delimiter=self.csv_delimiter,
        )

    def get_value_and_type(self, cel, name, datatypes):
        "Extract a cell value, updating the inferred column type in datatypes"
        value = cel.value
        if value is not None:
            value = prepare_value(name, value)
            type_ = get_type(name, value, self.nullable_int)
            if datatypes[name] is None:
                datatypes[name] = type_
            elif (
                datatypes[name] in (TYPE_INT, TYPE_NULLABLE_INT)
                and type_ == TYPE_DOUBLE
            ):
                # Promote an integer column to double when a float shows up
                datatypes[name] = type_
        # If the column is numeric, replace empty strings with None
        if value == '' and datatypes[name] in NUMERIC_TYPES:
            value = None
        if datatypes[name] == TYPE_DATETIME:
            if not value:
                value = None
            elif isinstance(value, (int, float)):
                # Numeric datetimes are day counts from the XLSX epoch
                value = XLSX_EPOC + datetime.timedelta(days=value)
            elif not isinstance(value, datetime.datetime) and not isinstance(
                value, datetime.date
            ):
                value = dateutil.parser.parse(value)
        return value
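    # A minimal illustration of the serial-date branch above, assuming the
    # conventional spreadsheet epoch (the exact value of XLSX_EPOC is defined
    # in xlsx_provider.commons; 1899-12-30 is the usual choice):
    #
    #   >>> datetime.datetime(1899, 12, 30) + datetime.timedelta(days=43831)
    #   datetime.datetime(2020, 1, 1, 0, 0)
    #
    # i.e. a numeric cell like 43831 in a datetime column decodes to 2020-01-01.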
    def execute(self, context):
        "Read the source worksheet and write the target file"
        try:
            sheet = self.load_worksheet()
            rows = list(sheet)
            if self.names is not None:
                names = self.names
            else:
                names = get_column_names(sheet, skip_rows=self.skip_rows)
            empty_lines = 0
            # Check that the column names are unique
            check_column_names(names)
            datatypes = dict([(name, self.types.get(name)) for name in names])
            if INDEX_COLUMN_NAME in datatypes:
                datatypes[INDEX_COLUMN_NAME] = 'double'
            columns = dict([(name, []) for name in names])
            # Add the additional (fixed value) columns
            for name, value in self.add_columns.items():
                datatypes[name] = get_type(name, value, self.nullable_int)
                columns[name] = []
            for _index, row in enumerate(rows[1:]):
                if self.limit is not None and _index >= self.limit:
                    break
                # Skip empty lines
                empty = True
                for i, name in enumerate(names):
                    if name != INDEX_COLUMN_NAME and row[i].value is not None:
                        empty = False
                if empty:
                    empty_lines = empty_lines + 1
                    continue
                for i, name in enumerate(names):
                    if name == INDEX_COLUMN_NAME:
                        columns[INDEX_COLUMN_NAME].append(_index)
                        continue
                    cel = row[i]
                    value = self.get_value_and_type(cel, name, datatypes)
                    columns[name].append(value)
                for name, value in self.add_columns.items():
                    if name not in names:
                        columns[name].append(value)
            row_num = sheet.max_row - 1
            if self.limit is not None:
                row_num = min(row_num, self.limit)
            for i, name in enumerate(names):
                # Check the number of rows (the header is skipped)
                assert len(columns[name]) == row_num - empty_lines
            self.write(names, columns, datatypes)
        except Exception as e:
            raise AirflowException('FromXLSXOperator error: {0}'.format(str(e)))
        return True
    def to_dataframe(self, names, columns, datatypes):
        "Convert the collected columns into a Pandas DataFrame"
        import pandas as pd

        all_names = names + [x for x in self.add_columns.keys() if x not in names]
        pd_data = {}
        for name in all_names:
            if name not in self.drop_columns:
                pd_data[name] = pd.Series(columns[name], dtype=datatypes[name])
        return pd.DataFrame(pd_data)
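    # Why the nullable_int option matters here: a plain 'int64' Series cannot
    # hold missing values, while pandas' nullable integer extension dtype can
    # (assuming TYPE_NULLABLE_INT maps to pandas' 'Int64'):
    #
    #   >>> pd.Series([1, None, 3], dtype='Int64')
    #   0       1
    #   1    <NA>
    #   2       3
    #   dtype: Int64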
    def write_parquet(self, names, columns, datatypes):
        "Write the results in Parquet format"
        import pyarrow.parquet

        df = self.to_dataframe(names, columns, datatypes)
        pyarrow.parquet.write_table(
            table=pyarrow.Table.from_pandas(df),
            where=self.target,
            compression='SNAPPY',
            flavor='spark',
        )
    def write_csv(self, names, columns, datatypes):
        "Write data to a CSV file"
        with open(self.target, 'w') as f:
            # Header (skip dropped columns, so it matches the data columns)
            header_names = [x for x in datatypes.keys() if x not in self.drop_columns]
            if self.csv_header == HEADER_UPPER:
                f.write(self.csv_delimiter.join([x.upper() for x in header_names]))
                f.write('\n')
            elif self.csv_header == HEADER_LOWER:
                f.write(self.csv_delimiter.join(header_names))
                f.write('\n')
            # Data
            df = self.to_dataframe(names, columns, datatypes)
            df.to_csv(
                path_or_buf=f,
                sep=self.csv_delimiter,
                header=False,
                index=False,
                date_format='%Y-%m-%d %H:%M:%S',
                float_format=self.float_format,
            )
    def write_json(self, names, columns, datatypes):
        "Write data to a JSON file"
        data = list(
            dict(q)
            for q in zip(*(list((k, x) for x in v) for k, v in columns.items()))
        )
        with open(self.target, 'w') as f:
            f.write(json.dumps(data, indent=2, default=str))
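    # The zip(*) expression in write_json above (and write_jsonl below)
    # transposes the column-oriented dict into one dict per row, e.g.
    # {'a': [1, 2], 'b': [3, 4]} becomes [{'a': 1, 'b': 3}, {'a': 2, 'b': 4}].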
    def write_jsonl(self, names, columns, datatypes):
        "Write data to a JSON Lines file"
        data = list(
            dict(q)
            for q in zip(*(list((k, x) for x in v) for k, v in columns.items()))
        )
        with open(self.target, 'w') as f:
            f.write('\n'.join(json.dumps(x, default=str) for x in data))
    def write(self, names, columns, datatypes):
        "Write data to the target file in the configured format"
        if self.file_format == FileFormat.csv:
            self.write_csv(names, columns, datatypes)
        elif self.file_format == FileFormat.json:
            self.write_json(names, columns, datatypes)
        elif self.file_format == FileFormat.jsonl:
            self.write_jsonl(names, columns, datatypes)
        else:
            self.write_parquet(names, columns, datatypes)
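# A minimal usage sketch (hypothetical dag_id, dates and paths), showing how
# the operator is typically wired into an Airflow DAG:
#
#   from airflow import DAG
#   from xlsx_provider.operators.from_xlsx_operator import FromXLSXOperator
#
#   with DAG(
#       dag_id='xlsx_to_parquet',
#       start_date=datetime.datetime(2021, 1, 1),
#       schedule_interval=None,
#   ) as dag:
#       convert = FromXLSXOperator(
#           task_id='convert',
#           source='/tmp/input.xlsx',
#           target='/tmp/output.parquet',
#           file_format='parquet',
#       )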
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('filename')
    parser.add_argument('-w', '--worksheet', dest='worksheet', default=0)
    parser.add_argument('-a', '--add_col', dest='add_columns', action='append')
    parser.add_argument('-d', '--drop_col', dest='drop_columns', action='append')
    parser.add_argument('-t', '--type', dest='types', action='append')
    parser.add_argument('--float_format', dest='float_format', default='%g')
    parser.add_argument(
        '--delimiter', dest='csv_delimiter', default=DEFAULT_CSV_DELIMITER
    )
    parser.add_argument('-o', '--output', dest='output')
    parser.add_argument(
        '--header',
        dest='csv_header',
        choices=['lower', 'upper', 'skip'],
        default=DEFAULT_CSV_HEADER,
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--csv', dest='file_format_csv', action='store_true')
    group.add_argument(
        '--parquet', dest='file_format_csv', action='store_false', default=False
    )
    parser.add_argument('-n', '--nullable_int', action=argparse.BooleanOptionalAction)
    args = parser.parse_args()
    file_format = 'csv' if args.file_format_csv else 'parquet'
    so = FromXLSXOperator(
        task_id='test',
        source=args.filename,
        target=args.output or (args.filename + '.' + file_format),
        drop_columns=args.drop_columns,
        add_columns=args.add_columns,
        types=args.types,
        file_format=file_format,
        worksheet=args.worksheet,
        csv_delimiter=args.csv_delimiter,
        csv_header=args.csv_header,
        float_format=args.float_format,
        nullable_int=args.nullable_int,
    )
    so.execute({})