non-inplace pydatavec transform processes (#8326)

* non-inplace pydatavec transform processes

* PEP 8 linting

* logging and cleanup

* update version
Branch: master
Max Pumperla, 2019-10-30 15:59:54 +01:00, committed by GitHub
parent cea72582b8
commit ca881a987a
7 changed files with 82 additions and 34 deletions
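
This PR makes TransformProcess transforms chainable: constructed with inplace=False, every transform method returns self instead of None, while the default inplace=True preserves the old mutate-in-place behavior. A minimal sketch of the new style, assuming pydatavec and its JVM backend (pyjnius plus the DataVec JARs) are installed; the column names are made up for illustration:

from pydatavec import Schema, TransformProcess

schema = Schema()
schema.add_string_column('name')
schema.add_string_column('city')

# With inplace=False each transform returns the process itself, so calls chain.
tp = TransformProcess(schema, inplace=False)
tp.lower('name').upper('city').remove_white_spaces('city')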

File 1 of 7

@@ -17,7 +17,6 @@
 import os

 class Writable(object):

     def __init__(self, j_w):

@@ -77,4 +76,3 @@ class LocalExecutor(object):
             data.add(rr.next())
         processed_data = LocalTransformExecutor.execute(data, tp)
         return Writable(processed_data)

File 2 of 7

@@ -16,6 +16,7 @@
 import os
+import logging

 _JVM_RUNNING = False

@@ -76,7 +77,7 @@ class SparkExecutor(object):
            while(os.path.isdir(path)):
                tempid += 1
                path = 'temp_' + str(tempid)
-           print('Converting pyspark RDD to JavaRDD...')
+           logging.info('Converting pyspark RDD to JavaRDD...')
            source.saveAsTextFile(path)
            string_data = self.spark_context.textFile(path)
        else:

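Note that switching from print() to logging.info() makes these messages silent by default: Python's root logger emits only WARNING and above until the application configures it. A minimal sketch to surface the new INFO-level output:

import logging

# Without this, 'Converting pyspark RDD to JavaRDD...' never reaches the console.
logging.basicConfig(level=logging.INFO)
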
File 3 of 7

@@ -75,17 +75,23 @@ try:
     SparkConf = autoclass('org.apache.spark.SparkConf')
     SparkContext = autoclass('org.apache.spark.api.java.JavaSparkContext')
     JavaRDD = autoclass('org.apache.spark.api.java.JavaRDD')
-    SparkTransformExecutor = autoclass('org.datavec.spark.transform.SparkTransformExecutor')
-    StringToWritablesFunction = autoclass('org.datavec.spark.transform.misc.StringToWritablesFunction')
-    WritablesToStringFunction = autoclass('org.datavec.spark.transform.misc.WritablesToStringFunction')
+    SparkTransformExecutor = autoclass(
+        'org.datavec.spark.transform.SparkTransformExecutor')
+    StringToWritablesFunction = autoclass(
+        'org.datavec.spark.transform.misc.StringToWritablesFunction')
+    WritablesToStringFunction = autoclass(
+        'org.datavec.spark.transform.misc.WritablesToStringFunction')
     spark_available = True
 except:
     spark_available = False

-CSVRecordReader = autoclass('org.datavec.api.records.reader.impl.csv.CSVRecordReader')
-CSVRecordWriter = autoclass('org.datavec.api.records.writer.impl.csv.CSVRecordWriter')
+CSVRecordReader = autoclass(
+    'org.datavec.api.records.reader.impl.csv.CSVRecordReader')
+CSVRecordWriter = autoclass(
+    'org.datavec.api.records.writer.impl.csv.CSVRecordWriter')

-LocalTransformExecutor = autoclass('org.datavec.local.transforms.LocalTransformExecutor')
+LocalTransformExecutor = autoclass(
+    'org.datavec.local.transforms.LocalTransformExecutor')

 ChangeCaseStringTransform = autoclass(
     'org.datavec.api.transform.transform.string.ChangeCaseStringTransform')

@@ -112,4 +118,5 @@ FileSplit = autoclass('org.datavec.api.split.FileSplit')
 JFile = autoclass('java.io.File')
 ArrayList = autoclass('java.util.ArrayList')
-NumberOfRecordsPartitioner = autoclass('org.datavec.api.split.partition.NumberOfRecordsPartitioner')
+NumberOfRecordsPartitioner = autoclass(
+    'org.datavec.api.split.partition.NumberOfRecordsPartitioner')

File 4 of 7

@@ -19,6 +19,7 @@ from collections import OrderedDict
 from .conditions import *
 from .schema import Schema
 import warnings
+import logging

@@ -47,11 +48,12 @@ def _dict_to_jmap(d, JMap):
 class TransformProcess(object):

-    def __init__(self, schema):
+    def __init__(self, schema, inplace=True):
         self.schema = schema
         self.final_schema = schema.copy()
         self.steps = []
         self.executors = {}
+        self.inplace = inplace

     def add_step(self, step, *args):
         self.steps.append((step,) + args)
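
Since each transform now ends with "if not self.inplace: return self" (see the hunks below), the default inplace=True still returns None from every transform, so existing call sites are unaffected; only inplace=False opts into chaining. A short sketch of the contract, assuming Schema and TransformProcess are importable from the package root as in the tests:

from pydatavec import Schema, TransformProcess

schema = Schema()
schema.add_string_column('col')

assert TransformProcess(schema).lower('col') is None  # default: mutate in place

tp = TransformProcess(schema, inplace=False)
assert tp.lower('col') is tp                          # non-inplace: returns self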
@@ -70,34 +72,38 @@ class TransformProcess(object):
         self.add_step("removeColumns", *columns)
         for c in columns:
             del self.final_schema.columns[c]
+        if not self.inplace:
+            return self

     def remove_columns_except(self, *columns):
         if len(columns) == 1:
             columns = columns[0]
             if type(columns) in (list, tuple):
                 self.add_step("removeAllColumnsExceptFor", *columns)
-                todel = []
+                to_del = []
                 for c in self.final_schema.columns:
                     if c not in columns:
-                        todel.append(c)
-                for c in todel:
+                        to_del.append(c)
+                for c in to_del:
                     del self.final_schema.columns[c]
             else:
                 self.add_step("removeAllColumnsExceptFor", columns)
-                todel = []
+                to_del = []
                 for c in self.final_schema.columns:
                     if c != columns:
-                        todel.append(c)
-                for c in todel:
+                        to_del.append(c)
+                for c in to_del:
                     del self.final_schema.columns[c]
         else:
             self.add_step("removeAllColumnsExceptFor", *columns)
-            todel = []
+            to_del = []
             for c in self.final_schema.columns:
                 if c not in columns:
-                    todel.append(c)
-            for c in todel:
+                    to_del.append(c)
+            for c in to_del:
                 del self.final_schema.columns[c]
+        if not self.inplace:
+            return self

     def filter(self, condition):
         col_name = condition.column
@@ -112,6 +118,8 @@ class TransformProcess(object):
         code = code.format(col_type, _dq(col_name),
                            condition.name, condition.value)
         self.add_step("exec", code)
+        if not self.inplace:
+            return self

     def replace(self, column, value, condition):
         # there are 2 columns involved

@@ -131,6 +139,8 @@ class TransformProcess(object):
         code = code.format(_dq(column), column1_type, value, column2_type, _dq(
             column2), condition.name, condition.value)
         self.add_step("exec", code)
+        if not self.inplace:
+            return self

     def rename_column(self, column, new_name):
         new_d = OrderedDict()

@@ -142,11 +152,15 @@ class TransformProcess(object):
                 new_d[k] = old_d[k]
         self.final_schema.columns = new_d
         self.add_step("renameColumn", column, new_name)
+        if not self.inplace:
+            return self

     def string_to_time(self, column, format="YYY-MM-DD HH:mm:ss.SSS", time_zone="UTC"):
         self.final_schema.columns[column][0] = "DateTime"
         self.add_step("exec", "stringToTimeTransform({}, {}, {})".format(
             _dq(column), _dq(format), "DateTimeZone." + time_zone))
+        if not self.inplace:
+            return self

     def derive_column_from_time(self, source_column, new_column, field):
         code = 'transform(DeriveColumnsFromTimeTransformBuilder({}).addIntegerDerivedColumn({}, DateTimeFieldType.{}()).build())'

@@ -154,6 +168,8 @@ class TransformProcess(object):
             new_column), _to_camel(field))
         self.add_step("exec", code)
         self.final_schema.add_column("integer", new_column)
+        if not self.inplace:
+            return self

     def categorical_to_integer(self, column):
         if self.final_schema.columns[column][0] != 'categorical':
@@ -161,12 +177,16 @@ class TransformProcess(object):
                 ' transform on column \"{}\" because it is not a categorical column.'.format(column))
         self.final_schema.columns[column][0] = 'integer'
         self.add_step('categoricalToInteger', column)
+        if not self.inplace:
+            return self

     def append_string(self, column, string):
         if self.final_schema.columns[column][0] != 'string':
             raise Exception(
                 'Can not apply append_string transform to column {} because it is not a string column'.format(column))
         self.add_step('appendStringColumnTransform', column, string)
+        if not self.inplace:
+            return self

     def lower(self, column):
         if self.final_schema.columns[column][0] != 'string':

@@ -174,6 +194,8 @@ class TransformProcess(object):
                 'Can not apply lower transform to column {} because it is not a string column'.format(column))
         self.add_step(
             'exec', 'transform(ChangeCaseStringTransform({}, ChangeCaseStringTransformCaseType.LOWER))'.format(_dq(column)))
+        if not self.inplace:
+            return self

     def upper(self, column):
         if self.final_schema.columns[column][0] != 'string':

@@ -181,6 +203,8 @@ class TransformProcess(object):
                 'Can not apply upper transform to column {} because it is not a string column'.format(column))
         self.add_step(
             'exec', 'transform(ChangeCaseStringTransform({}, ChangeCaseStringTransformCaseType.UPPER))'.format(_dq(column)))
+        if not self.inplace:
+            return self

     def concat(self, columns, new_column=None, delimiter=','):
         for column in columns:

@@ -196,6 +220,8 @@ class TransformProcess(object):
         self.final_schema.add_string_column(new_column)
         self.add_step('exec', 'transform(ConcatenateStringColumns({}, {}, Arrays.asList({})))'.format(
             _dq(new_column), _dq(delimiter), ', '.join(columns)))
+        if not self.inplace:
+            return self

     def remove_white_spaces(self, column):
         if self.final_schema.columns[column][0] != 'string':

@@ -203,6 +229,8 @@ class TransformProcess(object):
                 'Can not apply remove_white_spaces transform to column {} because it is not a string column'.format(column))
         self.add_step(
             'exec', 'transform(RemoveWhiteSpaceTransform({}))'.format(_dq(column)))
+        if not self.inplace:
+            return self

     def replace_empty_string(self, column, value):
         if self.final_schema.columns[column][0] != 'string':

@@ -210,6 +238,8 @@ class TransformProcess(object):
                 'Can not apply replace_empty_string transform to column {} because it is not a string column'.format(column))
         self.add_step('exec', 'transform(ReplaceEmptyStringTransform({}, {}))'.format(
             _dq(column), _dq(value)))
+        if not self.inplace:
+            return self

     def replace_string(self, column, *args):
         if self.final_schema.columns[column][0] != 'string':

@@ -228,6 +258,8 @@ class TransformProcess(object):
                 'Invalid argument. Possible signatures are replace(str, str, str) and replace(str, dict)')
         self.add_step('exec', 'transform(ReplaceStringTransform({}, _dict_to_jmap({}, JMap)))'.format(
             _dq(column), str(args)))
+        if not self.inplace:
+            return self

     def map_string(self, column, mapping):
         if self.final_schema.columns[column][0] != 'string':

@@ -235,6 +267,8 @@ class TransformProcess(object):
                 'Can not apply map_string transform to column {} because it is not a string column'.format(column))
         self.add_step('exec', 'transform(StringMapTransform({}, _dict_to_jmap({}, JMap)))'.format(
             _dq(column), str(mapping)))
+        if not self.inplace:
+            return self

     def one_hot(self, column):
         if self.final_schema.columns[column][0] != 'categorical':

@@ -251,6 +285,8 @@ class TransformProcess(object):
                 new_schema[k] = self.final_schema.columns[k]
         self.final_schema.columns = new_schema
         self.add_step('categoricalToOneHot', column)
+        if not self.inplace:
+            return self

     def reduce(self, key, *args, **kwargs):
         # possible signatures:

@@ -328,6 +364,8 @@ class TransformProcess(object):
             new_type = reduction_to_type.get(reduction, old_type)
             new_schema[k] = [new_type, new_name]
         self.final_schema.columns = new_schema
+        if not self.inplace:
+            return self

     def serialize(self):
         config = {'steps': self.steps, 'schema': self.schema.serialize()}
@@ -375,7 +413,7 @@ class TransformProcess(object):
         for step in self.steps:
             if step[0] == "exec":
                 code = step[1]
-                print(code)
+                logging.info(code)
                 exec("builder." + code)
             else:
                 f = getattr(builder, step[0])

@@ -389,7 +427,8 @@ class TransformProcess(object):
         if executor == 'spark':
             from .java_classes import spark_available
             if not spark_available:
-                warnings.warn('Spark not available. Running local executor instead.')
+                warnings.warn(
+                    'Spark not available. Running local executor instead.')
                 from .executors import LocalExecutor
                 executor = LocalExecutor()
                 self.executors['local'] = executor

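For context on the replay loop above: recorded steps are applied to the Java-side builder either by exec-ing a stored code string ('exec' steps) or by looking the step name up as a builder method. A stand-in sketch of that dispatch pattern; FakeBuilder is hypothetical and only illustrates the mechanism:

class FakeBuilder:
    def removeColumns(self, *cols):
        print('removeColumns', cols)

builder = FakeBuilder()
steps = [('removeColumns', 'a', 'b'), ('exec', "removeColumns('c')")]

for step in steps:
    if step[0] == 'exec':
        exec('builder.' + step[1])            # replay a recorded code string
    else:
        getattr(builder, step[0])(*step[1:])  # direct builder-method dispatch
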
File 5 of 7

@@ -20,7 +20,8 @@ import requests
 import sys
 import time
 import math
+import logging
+import warnings

 def _mean(x):
     s = float(sum(x))

@@ -142,7 +143,6 @@ class ProgressBar(object):
 def download_file(url, file_name):
-    #u = urlopen(url)
     r = requests.get(url, stream=True)
     file_size = int(r.headers['Content-length'])
     '''

@@ -157,15 +157,15 @@ def download_file(url, file_name):
         if local_file_size == file_size:
             file_exists = True
         else:
-            print("File corrupt. Downloading again.")
+            warnings.warn("File corrupt. Downloading again.")
             os.remove(file_name)
     if not file_exists:
         factor = int(math.floor(math.log(file_size)/math.log(1024)))
         display_file_size = str(file_size / 1024 ** factor) + \
             ['B', 'KB', 'MB', 'GB', 'TB', 'PB'][factor]
-        print("Source: " + url)
-        print("Destination " + file_name)
-        print("Size: " + display_file_size)
+        logging.info("Source: " + url)
+        logging.info("Destination " + file_name)
+        logging.info("Size: " + display_file_size)
         file_size_dl = 0
         block_sz = 8192
         f = open(file_name, 'wb')

@@ -182,5 +182,5 @@ def download_file(url, file_name):
             # print(status)
         f.close()
     else:
-        print("File already exists - " + file_name)
+        logging.info("File already exists - " + file_name)
     return True

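Calling the helper is unchanged; only the channel of its status messages moved from stdout to the logging module (and to warnings for the corrupt-file case). A usage sketch; the module path pydatavec.utils and the URL are assumptions for illustration:

import logging
logging.basicConfig(level=logging.INFO)  # surface the Source/Destination/Size messages

from pydatavec.utils import download_file  # module path assumed from this diff

download_file('https://example.com/data.csv', 'data.csv')
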
File 6 of 7

@@ -22,7 +22,7 @@ from setuptools import find_packages
 setup(name='pydatavec',
-      version='0.1',
+      version='0.1.1',
       description='Python interface for DataVec',
       long_description='Python interface for DataVec',

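One way to confirm the bump after reinstalling, using stdlib package metadata (importlib.metadata needs Python 3.8+; older interpreters can use pkg_resources instead):

from importlib.metadata import version

print(version('pydatavec'))  # expected: 0.1.1
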
File 7 of 7

@@ -63,7 +63,8 @@ def test_reduce_3():
 def test_reduce_4():
-    reductions = ['first', 'last', 'append', 'prepend', 'count', 'count_unique']
+    reductions = ['first', 'last', 'append',
+                  'prepend', 'count', 'count_unique']
     for red in reductions:
         schema = Schema()
         schema.add_string_column('col1')

@@ -76,7 +77,8 @@ def test_reduce_4():
 def test_reduce_5():
-    reductions = ['first', 'last', 'append', 'prepend', 'count', 'count_unique']
+    reductions = ['first', 'last', 'append',
+                  'prepend', 'count', 'count_unique']
     for red1 in reductions:
         for red2 in reductions:
             schema = Schema()

@@ -90,7 +92,8 @@ def test_reduce_5():
 def test_reduce_6():
-    reductions = ['first', 'last', 'append', 'prepend', 'count', 'count_unique']
+    reductions = ['first', 'last', 'append',
+                  'prepend', 'count', 'count_unique']
     for red1 in reductions:
         for red2 in reductions:
             schema = Schema()
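
The test edits are purely cosmetic (PEP 8 line-length wrapping); each test still builds a Schema and applies one reduction per column. A sketch of what a single iteration amounts to, assuming the two-argument reduce(key, reduction) form these tests exercise:

from pydatavec import Schema, TransformProcess

schema = Schema()
schema.add_string_column('col1')
schema.add_string_column('col2')

tp = TransformProcess(schema)
tp.reduce('col1', 'first')  # group by col1, reduce the remaining columns with 'first'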