Source code for xlsx_provider.operators.from_xlsx_query_operator

#!/usr/bin/env python

import csv
import json
import sqlite3
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from xlsx_provider.commons import (
    check_column_names,
    col_number_to_name,
    get_column_names,
    get_type,
    prepare_value,
    quoted,
    FileFormat,
    INDEX_COLUMN_NAME,
    DEFAULT_CSV_DELIMITER,
    DEFAULT_CSV_HEADER,
    DEFAULT_FORMAT,
    DEFAULT_TABLE_NAME,
    HEADER_UPPER,
    HEADER_LOWER,
)
from xlsx_provider.operators.from_xlsx_operator import FromXLSXOperator

__all__ = ['FromXLSXQueryOperator']


class FromXLSXQueryOperator(FromXLSXOperator):
    """
    Execute an SQL query on an XLSX/XLS file and export the result
    into a Parquet, CSV, JSON or JSON Lines file.

    This operator loads an XLSX or XLS file into an in-memory SQLite
    database, executes a query on the db and stores the result into a
    Parquet, CSV, JSON or JSON Lines (one line per record) file.
    The output column names and types are determined by the SQL query output.

    :param source: Source filename (XLSX or XLS, templated)
    :type source: str
    :param target: Target filename (templated)
    :type target: str
    :param worksheet: Worksheet title or number (zero-based, templated)
    :type worksheet: str or int
    :param skip_rows: Number of input lines to skip (default: 0, templated)
    :type skip_rows: int
    :param types: force Parquet column types (dict or list of
        column='str', 'int64', 'double', 'datetime64[ns]' entries)
    :type types: str or dict of string key/value pairs
    :param file_format: Output file format (parquet, csv, json, jsonl)
    :type file_format: str
    :param csv_delimiter: CSV delimiter (default: ',')
    :type csv_delimiter: str
    :param csv_header: Convert CSV output header case ('lower', 'upper', 'skip')
    :type csv_header: str
    :param query: SQL query (templated)
    :type query: str
    :param table_name: Table name (default: 'xls', templated)
    :type table_name: str
    :param use_first_row_as_header: if true, use the first row as column
        names, otherwise use A, B, C, ... as column names
    :type use_first_row_as_header: bool
    :param nullable_int: nullable integer data type support
    :type nullable_int: bool
    """

    FileFormat = FileFormat
    template_fields = (
        'source',
        'target',
        'worksheet',
        'query',
        'table_name',
        'skip_rows',
    )
    ui_color = '#a934bd'

    @apply_defaults
    def __init__(
        self,
        source,
        target,
        worksheet=0,
        skip_rows=0,
        types=None,
        file_format=DEFAULT_FORMAT,
        csv_delimiter=DEFAULT_CSV_DELIMITER,
        csv_header=DEFAULT_CSV_HEADER,
        query=None,
        table_name=DEFAULT_TABLE_NAME,
        use_first_row_as_header=False,
        nullable_int=False,
        *args,
        **kwargs
    ):
        super(FromXLSXQueryOperator, self).__init__(
            *args,
            **kwargs,
            source=source,
            target=target,
            worksheet=worksheet,
            skip_rows=skip_rows,
            types=types,
            file_format=file_format,
            csv_delimiter=csv_delimiter,
            csv_header=csv_header
        )
        self.query = query
        self.table_name = table_name
        self.use_first_row_as_header = use_first_row_as_header
        self.nullable_int = nullable_int
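
    # Usage sketch (not part of the original source; the task_id, paths
    # and query below are hypothetical, shown only to illustrate how the
    # operator might be configured inside a DAG):
    #
    #     xlsx_to_parquet = FromXLSXQueryOperator(
    #         task_id='xlsx_to_parquet',
    #         source='/tmp/input.xlsx',
    #         target='/tmp/output.parquet',
    #         file_format='parquet',
    #         table_name='xls',
    #         use_first_row_as_header=True,
    #         query="select product, sum(quantity) as total from xls group by product",
    #         dag=dag,
    #     )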

    def write_parquet(self, result):
        "Write the results in Parquet format"
        import pandas as pd
        import pyarrow.parquet

        pd_data = {}
        for name in result.columns.keys():
            pd_data[name] = pd.Series(
                result.columns[name], dtype=result.datatypes[name]
            )
        df = pd.DataFrame(pd_data)
        pyarrow.parquet.write_table(
            table=pyarrow.Table.from_pandas(df),
            where=self.target,
            compression='SNAPPY',
            flavor='spark',
        )
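
    # Illustrative only (assumed, simplified values): with
    #     result.columns   == {'total': [3, 5]}
    #     result.datatypes == {'total': 'int64'}
    # the loop above builds one typed pd.Series per output column, and the
    # resulting DataFrame is written as Spark-flavoured, SNAPPY-compressed
    # Parquet to self.target.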

    def write_csv(self, result):
        "Write data to CSV file"
        data = list(zip(*[result.columns[x] for x in result.columns.keys()]))
        with open(self.target, 'w') as f:
            csv_writer = csv.writer(
                f, quoting=csv.QUOTE_MINIMAL, delimiter=self.csv_delimiter
            )
            if self.csv_header == HEADER_UPPER:
                csv_writer.writerow([x.upper() for x in result.columns.keys()])
            elif self.csv_header == HEADER_LOWER:
                csv_writer.writerow([x.lower() for x in result.columns.keys()])
            csv_writer.writerows(data)

    def write_json(self, result):
        "Write data to JSON file"
        data = list(
            dict(q)
            for q in zip(
                *(list((k, x) for x in v) for k, v in result.columns.items())
            )
        )
        with open(self.target, 'w') as f:
            f.write(json.dumps(data, indent=2))
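
    # The generator expression above pivots the column-oriented result into
    # one dict per row. Illustrative only (hypothetical values):
    #     result.columns == {'a': [1, 2], 'b': ['x', 'y']}
    #     => data        == [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
    # write_jsonl below applies the same transform, one JSON object per line.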

    def write_jsonl(self, result):
        "Write data to JSON Lines file"
        data = list(
            dict(q)
            for q in zip(
                *(list((k, x) for x in v) for k, v in result.columns.items())
            )
        )
        with open(self.target, 'w') as f:
            f.write('\n'.join(json.dumps(x) for x in data))

    def write(self, result):
        "Write data to file"
        if self.file_format == FileFormat.csv:
            self.write_csv(result)
        elif self.file_format == FileFormat.json:
            self.write_json(result)
        elif self.file_format == FileFormat.jsonl:
            self.write_jsonl(result)
        else:
            self.write_parquet(result)

    def execute(self, context):
        try:
            sheet = self.load_worksheet()
            if self.use_first_row_as_header:
                # Extract the column names from the first row of the spreadsheet
                column_names = get_column_names(sheet, skip_rows=self.skip_rows)
                # Check unique columns
                check_column_names(column_names)
            else:
                column_names = None
            result = Result(
                self.table_name, sheet, self.types, column_names, self.nullable_int
            )
            result.process(self.query)
            self.write(result)
        except Exception as e:
            raise AirflowException("FromXLSXQueryOperator error: {0}".format(str(e)))
        return True


class Result(object):
    def __init__(self, table_name, sheet, types, column_names=None, nullable_int=False):
        self.table_name = table_name
        self.sheet = sheet
        self.types = types
        self.column_names = column_names
        self.nullable_int = nullable_int

    def process_row(self, row):
        for i, name in enumerate(self.column_names):
            value = row[i]
            if isinstance(value, str):
                value = value.strip()
            value = prepare_value(name, value)
            self.columns[name].append(value)
            if self.datatypes[name] is None and value is not None:
                self.datatypes[name] = get_type(name, value, self.nullable_int)

    def process(self, query=None):
        detect_types = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
        with sqlite3.connect(':memory:', detect_types=detect_types) as conn:
            if self.column_names is not None:
                sql_columns = ','.join(
                    [quoted(x) for x in [INDEX_COLUMN_NAME] + self.column_names]
                )
                skip_rows = 1
            else:
                sql_columns = ','.join(
                    (
                        quoted(col_number_to_name(x))
                        for x in range(0, self.sheet.max_column + 1)
                    )
                )
                skip_rows = 0
            # Create table
            create_table_sql = 'create table {table}({columns})'.format(
                table=self.table_name, columns=sql_columns
            )
            conn.execute(create_table_sql)
            # Insert data
            insert_sql = 'insert into {table}({columns}) values({values})'.format(
                table=self.table_name,
                columns=sql_columns,
                values=','.join(['?' for x in range(0, self.sheet.max_column + 1)]),
            )
            conn.executemany(
                insert_sql,
                (
                    (i,) + x
                    for i, x in enumerate(self.sheet.values, start=1)
                    if i >= 1 + skip_rows
                ),
            )
            # Query
            result = conn.execute(query)
            # Process result
            self.column_names = list(x[0].lower() for x in result.description)
            self.datatypes = dict(
                [(name, self.types.get(name)) for name in self.column_names]
            )
            self.columns = dict([(name, []) for name in self.column_names])
            for row in result:
                self.process_row(row)
            result.close()
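

# The Result.process() flow, reduced to a standalone sketch (assumed and
# simplified; quoted()/prepare_value()/get_type() are replaced by literal
# SQL and values, and '_index' stands in for INDEX_COLUMN_NAME):
#
#     import sqlite3
#     conn = sqlite3.connect(':memory:')
#     # SQLite accepts untyped columns, just as process() creates them
#     conn.execute('create table xls("_index","product","quantity")')
#     conn.executemany(
#         'insert into xls("_index","product","quantity") values(?,?,?)',
#         [(1, 'apple', 3), (2, 'pear', 5), (3, 'apple', 2)],
#     )
#     cur = conn.execute(
#         'select product, sum(quantity) as total from xls group by product'
#     )
#     # cur.description supplies the output column names, which drive the
#     # schema of the Parquet/CSV/JSON file written by the operator
#     print([d[0] for d in cur.description], cur.fetchall())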