From ca881a987a63fc59a636107dde7ab99241166872 Mon Sep 17 00:00:00 2001
From: Max Pumperla
Date: Wed, 30 Oct 2019 15:59:54 +0100
Subject: [PATCH] non-inplace pydatavec transform processes (#8326)

* non inplace pydatavec tps

* pep8 linting

* logging and clean up

* update version
---
 pydatavec/pydatavec/executors/local.py   |  2 -
 pydatavec/pydatavec/executors/spark.py   |  3 +-
 pydatavec/pydatavec/java_classes.py      | 21 +++++---
 pydatavec/pydatavec/transform_process.py | 63 +++++++++++++++++++-----
 pydatavec/pydatavec/utils.py             | 14 +++---
 pydatavec/setup.py                       |  2 +-
 pydatavec/tests/test_reduce.py           | 11 +++--
 7 files changed, 82 insertions(+), 34 deletions(-)

diff --git a/pydatavec/pydatavec/executors/local.py b/pydatavec/pydatavec/executors/local.py
index 267955b38..344c0c60b 100644
--- a/pydatavec/pydatavec/executors/local.py
+++ b/pydatavec/pydatavec/executors/local.py
@@ -17,7 +17,6 @@
 
 import os
 
-
 class Writable(object):
 
     def __init__(self, j_w):
@@ -77,4 +76,3 @@ class LocalExecutor(object):
             data.add(rr.next())
         processed_data = LocalTransformExecutor.execute(data, tp)
         return Writable(processed_data)
-
diff --git a/pydatavec/pydatavec/executors/spark.py b/pydatavec/pydatavec/executors/spark.py
index 63fb7c42e..db61a7121 100644
--- a/pydatavec/pydatavec/executors/spark.py
+++ b/pydatavec/pydatavec/executors/spark.py
@@ -16,6 +16,7 @@
 
 import os
+import logging
 
 _JVM_RUNNING = False
 
 
@@ -76,7 +77,7 @@ class SparkExecutor(object):
             while(os.path.isdir(path)):
                 tempid += 1
                 path = 'temp_' + str(tempid)
-            print('Converting pyspark RDD to JavaRDD...')
+            logging.info('Converting pyspark RDD to JavaRDD...')
             source.saveAsTextFile(path)
             string_data = self.spark_context.textFile(path)
         else:
diff --git a/pydatavec/pydatavec/java_classes.py b/pydatavec/pydatavec/java_classes.py
index f62ce67a1..4031c9257 100644
--- a/pydatavec/pydatavec/java_classes.py
+++ b/pydatavec/pydatavec/java_classes.py
@@ -75,17 +75,23 @@ try:
     SparkConf = autoclass('org.apache.spark.SparkConf')
     SparkContext = autoclass('org.apache.spark.api.java.JavaSparkContext')
     JavaRDD = autoclass('org.apache.spark.api.java.JavaRDD')
-    SparkTransformExecutor = autoclass('org.datavec.spark.transform.SparkTransformExecutor')
-    StringToWritablesFunction = autoclass('org.datavec.spark.transform.misc.StringToWritablesFunction')
-    WritablesToStringFunction = autoclass('org.datavec.spark.transform.misc.WritablesToStringFunction')
+    SparkTransformExecutor = autoclass(
+        'org.datavec.spark.transform.SparkTransformExecutor')
+    StringToWritablesFunction = autoclass(
+        'org.datavec.spark.transform.misc.StringToWritablesFunction')
+    WritablesToStringFunction = autoclass(
+        'org.datavec.spark.transform.misc.WritablesToStringFunction')
     spark_available = True
 except:
     spark_available = False
 
-CSVRecordReader = autoclass('org.datavec.api.records.reader.impl.csv.CSVRecordReader')
-CSVRecordWriter = autoclass('org.datavec.api.records.writer.impl.csv.CSVRecordWriter')
+CSVRecordReader = autoclass(
+    'org.datavec.api.records.reader.impl.csv.CSVRecordReader')
+CSVRecordWriter = autoclass(
+    'org.datavec.api.records.writer.impl.csv.CSVRecordWriter')
 
-LocalTransformExecutor = autoclass('org.datavec.local.transforms.LocalTransformExecutor')
+LocalTransformExecutor = autoclass(
+    'org.datavec.local.transforms.LocalTransformExecutor')
 
 ChangeCaseStringTransform = autoclass(
     'org.datavec.api.transform.transform.string.ChangeCaseStringTransform')
@@ -112,4 +118,5 @@
 FileSplit = autoclass('org.datavec.api.split.FileSplit')
 JFile = autoclass('java.io.File')
 ArrayList = autoclass('java.util.ArrayList')
-NumberOfRecordsPartitioner = autoclass('org.datavec.api.split.partition.NumberOfRecordsPartitioner')
+NumberOfRecordsPartitioner = autoclass(
+    'org.datavec.api.split.partition.NumberOfRecordsPartitioner')
diff --git a/pydatavec/pydatavec/transform_process.py b/pydatavec/pydatavec/transform_process.py
index e11baaa89..041bb6bc4 100644
--- a/pydatavec/pydatavec/transform_process.py
+++ b/pydatavec/pydatavec/transform_process.py
@@ -19,6 +19,7 @@ from collections import OrderedDict
 from .conditions import *
 from .schema import Schema
 import warnings
+import logging
 
 
 def _dq(x):
@@ -47,11 +48,12 @@ def _dict_to_jmap(d, JMap):
 
 class TransformProcess(object):
 
-    def __init__(self, schema):
+    def __init__(self, schema, inplace=True):
         self.schema = schema
         self.final_schema = schema.copy()
         self.steps = []
         self.executors = {}
+        self.inplace = inplace
 
     def add_step(self, step, *args):
         self.steps.append((step,) + args)
@@ -70,34 +72,38 @@ class TransformProcess(object):
             self.add_step("removeColumns", *columns)
             for c in columns:
                 del self.final_schema.columns[c]
+        if not self.inplace:
+            return self
 
     def remove_columns_except(self, *columns):
         if len(columns) == 1:
             columns = columns[0]
             if type(columns) in (list, tuple):
                 self.add_step("removeAllColumnsExceptFor", *columns)
-                todel = []
+                to_del = []
                 for c in self.final_schema.columns:
                     if c not in columns:
-                        todel.append(c)
-                for c in todel:
+                        to_del.append(c)
+                for c in to_del:
                     del self.final_schema.columns[c]
             else:
                 self.add_step("removeAllColumnsExceptFor", columns)
-                todel = []
+                to_del = []
                 for c in self.final_schema.columns:
                     if c != columns:
-                        todel.append(c)
-                for c in todel:
+                        to_del.append(c)
+                for c in to_del:
                     del self.final_schema.columns[c]
         else:
             self.add_step("removeAllColumnsExceptFor", *columns)
-            todel = []
+            to_del = []
             for c in self.final_schema.columns:
                 if c not in columns:
-                    todel.append(c)
-            for c in todel:
+                    to_del.append(c)
+            for c in to_del:
                 del self.final_schema.columns[c]
+        if not self.inplace:
+            return self
 
     def filter(self, condition):
         col_name = condition.column
@@ -112,6 +118,8 @@ class TransformProcess(object):
         code = code.format(col_type, _dq(col_name),
                            condition.name, condition.value)
         self.add_step("exec", code)
+        if not self.inplace:
+            return self
 
     def replace(self, column, value, condition):
         # there are 2 columns involved
@@ -131,6 +139,8 @@ class TransformProcess(object):
         code = code.format(_dq(column), column1_type, value, column2_type, _dq(
             column2), condition.name, condition.value)
         self.add_step("exec", code)
+        if not self.inplace:
+            return self
 
     def rename_column(self, column, new_name):
         new_d = OrderedDict()
@@ -142,11 +152,15 @@ class TransformProcess(object):
             new_d[k] = old_d[k]
         self.final_schema.columns = new_d
         self.add_step("renameColumn", column, new_name)
+        if not self.inplace:
+            return self
 
     def string_to_time(self, column, format="YYY-MM-DD HH:mm:ss.SSS", time_zone="UTC"):
         self.final_schema.columns[column][0] = "DateTime"
         self.add_step("exec", "stringToTimeTransform({}, {}, {})".format(
             _dq(column), _dq(format), "DateTimeZone." + time_zone))
+        if not self.inplace:
+            return self
 
     def derive_column_from_time(self, source_column, new_column, field):
         code = 'transform(DeriveColumnsFromTimeTransformBuilder({}).addIntegerDerivedColumn({}, DateTimeFieldType.{}()).build())'
@@ -154,6 +168,8 @@ class TransformProcess(object):
             new_column), _to_camel(field))
         self.add_step("exec", code)
         self.final_schema.add_column("integer", new_column)
+        if not self.inplace:
+            return self
 
     def categorical_to_integer(self, column):
         if self.final_schema.columns[column][0] != 'categorical':
@@ -161,12 +177,16 @@ class TransformProcess(object):
             ' transform on column \"{}\" because it is not a categorical column.'.format(column))
         self.final_schema.columns[column][0] = 'integer'
         self.add_step('categoricalToInteger', column)
+        if not self.inplace:
+            return self
 
     def append_string(self, column, string):
         if self.final_schema.columns[column][0] != 'string':
             raise Exception(
                 'Can not apply append_string transform to column {} because it is not a string column'.format(column))
         self.add_step('appendStringColumnTransform', column, string)
+        if not self.inplace:
+            return self
 
     def lower(self, column):
         if self.final_schema.columns[column][0] != 'string':
@@ -174,6 +194,8 @@ class TransformProcess(object):
             'Can not apply lower transform to column {} because it is not a string column'.format(column))
         self.add_step(
             'exec', 'transform(ChangeCaseStringTransform({}, ChangeCaseStringTransformCaseType.LOWER))'.format(_dq(column)))
+        if not self.inplace:
+            return self
 
     def upper(self, column):
         if self.final_schema.columns[column][0] != 'string':
@@ -181,6 +203,8 @@ class TransformProcess(object):
             'Can not apply upper transform to column {} because it is not a string column'.format(column))
         self.add_step(
             'exec', 'transform(ChangeCaseStringTransform({}, ChangeCaseStringTransformCaseType.UPPER))'.format(_dq(column)))
+        if not self.inplace:
+            return self
 
     def concat(self, columns, new_column=None, delimiter=','):
         for column in columns:
@@ -196,6 +220,8 @@ class TransformProcess(object):
             self.final_schema.add_string_column(new_column)
         self.add_step('exec', 'transform(ConcatenateStringColumns({}, {}, Arrays.asList({})))'.format(
             _dq(new_column), _dq(delimiter), ', '.join(columns)))
+        if not self.inplace:
+            return self
 
     def remove_white_spaces(self, column):
         if self.final_schema.columns[column][0] != 'string':
@@ -203,6 +229,8 @@ class TransformProcess(object):
             'Can not apply remove_white_spaces transform to column {} because it is not a string column'.format(column))
         self.add_step(
             'exec', 'transform(RemoveWhiteSpaceTransform({}))'.format(_dq(column)))
+        if not self.inplace:
+            return self
 
     def replace_empty_string(self, column, value):
         if self.final_schema.columns[column][0] != 'string':
@@ -210,6 +238,8 @@ class TransformProcess(object):
             'Can not apply replace_empty_string transform to column {} because it is not a string column'.format(column))
         self.add_step('exec', 'transform(ReplaceEmptyStringTransform({}, {}))'.format(
             _dq(column), _dq(value)))
+        if not self.inplace:
+            return self
 
     def replace_string(self, column, *args):
         if self.final_schema.columns[column][0] != 'string':
@@ -228,6 +258,8 @@ class TransformProcess(object):
                 'Invalid argument. Possible signatures are replace(str, str, str) and replace(str, dict)')
         self.add_step('exec', 'transform(ReplaceStringTransform({}, _dict_to_jmap({}, JMap)))'.format(
             _dq(column), str(args)))
+        if not self.inplace:
+            return self
 
     def map_string(self, column, mapping):
         if self.final_schema.columns[column][0] != 'string':
@@ -235,6 +267,8 @@ class TransformProcess(object):
             'Can not apply map_string transform to column {} because it is not a string column'.format(column))
         self.add_step('exec', 'transform(StringMapTransform({}, _dict_to_jmap({}, JMap)))'.format(
             _dq(column), str(mapping)))
+        if not self.inplace:
+            return self
 
     def one_hot(self, column):
         if self.final_schema.columns[column][0] != 'categorical':
@@ -251,6 +285,8 @@ class TransformProcess(object):
             new_schema[k] = self.final_schema.columns[k]
         self.final_schema.columns = new_schema
         self.add_step('categoricalToOneHot', column)
+        if not self.inplace:
+            return self
 
     def reduce(self, key, *args, **kwargs):
         # possible signatures:
@@ -328,6 +364,8 @@ class TransformProcess(object):
             new_type = reduction_to_type.get(reduction, old_type)
             new_schema[k] = [new_type, new_name]
         self.final_schema.columns = new_schema
+        if not self.inplace:
+            return self
 
     def serialize(self):
         config = {'steps': self.steps, 'schema': self.schema.serialize()}
@@ -375,7 +413,7 @@ class TransformProcess(object):
         for step in self.steps:
             if step[0] == "exec":
                 code = step[1]
-                print(code)
+                logging.info(code)
                 exec("builder." + code)
             else:
                 f = getattr(builder, step[0])
@@ -389,7 +427,8 @@ class TransformProcess(object):
         if executor == 'spark':
             from .java_classes import spark_available
             if not spark_available:
-                warnings.warn('Spark not available. Running local executor instead.')
+                warnings.warn(
+                    'Spark not available. Running local executor instead.')
                 from .executors import LocalExecutor
                 executor = LocalExecutor()
                 self.executors['local'] = executor
diff --git a/pydatavec/pydatavec/utils.py b/pydatavec/pydatavec/utils.py
index 1a26825a3..ec84168d6 100644
--- a/pydatavec/pydatavec/utils.py
+++ b/pydatavec/pydatavec/utils.py
@@ -20,7 +20,8 @@ import requests
 import sys
 import time
 import math
-
+import logging
+import warnings
 
 def _mean(x):
     s = float(sum(x))
@@ -142,7 +143,6 @@
 
 
 def download_file(url, file_name):
-    #u = urlopen(url)
     r = requests.get(url, stream=True)
     file_size = int(r.headers['Content-length'])
     '''
@@ -157,15 +157,15 @@
         if local_file_size == file_size:
             file_exists = True
         else:
-            print("File corrupt. Downloading again.")
+            warnings.warn("File corrupt. Downloading again.")
Downloading again.") os.remove(file_name) if not file_exists: factor = int(math.floor(math.log(file_size)/math.log(1024))) display_file_size = str(file_size / 1024 ** factor) + \ ['B', 'KB', 'MB', 'GB', 'TB', 'PB'][factor] - print("Source: " + url) - print("Destination " + file_name) - print("Size: " + display_file_size) + logging.info("Source: " + url) + logging.info("Destination " + file_name) + logging.info("Size: " + display_file_size) file_size_dl = 0 block_sz = 8192 f = open(file_name, 'wb') @@ -182,5 +182,5 @@ def download_file(url, file_name): # print(status) f.close() else: - print("File already exists - " + file_name) + logging.info("File already exists - " + file_name) return True diff --git a/pydatavec/setup.py b/pydatavec/setup.py index 6e69cad5e..cfe14da33 100644 --- a/pydatavec/setup.py +++ b/pydatavec/setup.py @@ -22,7 +22,7 @@ from setuptools import find_packages setup(name='pydatavec', - version='0.1', + version='0.1.1', description='Python interface for DataVec', long_description='Python interface for DataVec', diff --git a/pydatavec/tests/test_reduce.py b/pydatavec/tests/test_reduce.py index e29511651..145cbe902 100644 --- a/pydatavec/tests/test_reduce.py +++ b/pydatavec/tests/test_reduce.py @@ -63,7 +63,8 @@ def test_reduce_3(): def test_reduce_4(): - reductions = ['first', 'last', 'append', 'prepend', 'count', 'count_unique'] + reductions = ['first', 'last', 'append', + 'prepend', 'count', 'count_unique'] for red in reductions: schema = Schema() schema.add_string_column('col1') @@ -76,7 +77,8 @@ def test_reduce_4(): def test_reduce_5(): - reductions = ['first', 'last', 'append', 'prepend', 'count', 'count_unique'] + reductions = ['first', 'last', 'append', + 'prepend', 'count', 'count_unique'] for red1 in reductions: for red2 in reductions: schema = Schema() @@ -90,7 +92,8 @@ def test_reduce_5(): def test_reduce_6(): - reductions = ['first', 'last', 'append', 'prepend', 'count', 'count_unique'] + reductions = ['first', 'last', 'append', + 'prepend', 'count', 'count_unique'] for red1 in reductions: for red2 in reductions: schema = Schema() @@ -105,4 +108,4 @@ def test_reduce_6(): if __name__ == '__main__': - pytest.main([__file__]) + pytest.main([__file__])