More pre-release fixes (#456)

* - numPrefixBlocks fix for threshold_encoding
- temparrays pointers fixed

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* auto configuration of memory workspace for gradients sharing

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* limit sparse encoding message size

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* one more workspace test

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* one more CUDA-specific test

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* one more CUDA-specific workspace test

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* one more CUDA-specific workspace test

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* one more CUDA-specific workspace test

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* add separate host/device reset for circular workspace mode

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* new PW builder method for encoder memory amount

Signed-off-by: raver119@gmail.com <raver119@gmail.com>

* "inplace" execution for threshold encoding

Signed-off-by: raver119@gmail.com <raver119@gmail.com>
master
raver119 2020-05-13 08:12:07 +03:00 committed by GitHub
parent f547f783d9
commit c396fcb960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 264 additions and 10 deletions

View File

@ -20,6 +20,8 @@ import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.deeplearning4j.core.storage.StatsStorageRouter;
import org.deeplearning4j.core.storage.listener.RoutingIterationListener;
import org.deeplearning4j.optimize.solvers.accumulation.EncodingHandler;
import org.deeplearning4j.optimize.solvers.accumulation.encoding.threshold.AdaptiveThresholdAlgorithm;
import org.nd4j.linalg.dataset.AsyncDataSetIterator;;
import org.nd4j.linalg.dataset.AsyncMultiDataSetIterator;
import org.deeplearning4j.datasets.iterator.DummyBlockDataSetIterator;
@ -688,6 +690,7 @@ public class ParallelWrapper implements AutoCloseable {
protected Supplier<INDArray> updaterParamsSupplier;
protected ThresholdAlgorithm thresholdAlgorithm;
protected ResidualPostProcessor residualPostProcessor;
protected Long encoderMemory = -1L;
protected GradientsAccumulator accumulator;
@ -872,6 +875,19 @@ public class ParallelWrapper implements AutoCloseable {
return this;
}
/**
* This method allows to define amount of temporary memory that will be used for gradients sharing.
* Typically it's safe to keep default value.
*
* Default value: -1, amount of temporary memory will be calculated automatically
* @param numBytes number of bytes to be used
* @return
*/
public Builder temporaryMemory(@NonNull Long numBytes) {
this.encoderMemory = numBytes;
return this;
}
/**
* Set the residual post processor algorithm. Not used for single machine training (only for PW used in a
* distributed setting), and should not be set by users in most cases.
@ -907,11 +923,23 @@ public class ParallelWrapper implements AutoCloseable {
}
break;
case SHARED_GRADIENTS: {
Preconditions.checkState(thresholdAlgorithm != null, "Cannot use SHARED_GRADIENTS training mode without setting a threshold algorithm");
if (thresholdAlgorithm == null)
thresholdAlgorithm = new AdaptiveThresholdAlgorithm();
this.trainerContext = new SymmetricTrainerContext();
if (this.accumulator == null) {
log.info("Creating new GradientsAccumulator instance with default threshold of [5e-4]");
this.accumulator = new EncodedGradientsAccumulator(workers, thresholdAlgorithm, residualPostProcessor, false);
val numParams = model.numParams();
// we're limiting max size of updates for Sparse encoding to the size of bitmap encoded message
val maxUpdate = (int) (numParams / 16 + 5);
// memory sie in number of bytes
long memorySize = encoderMemory == null || encoderMemory < 0
? maxUpdate * 4 * (workers + 3)
: encoderMemory;
this.accumulator = new EncodedGradientsAccumulator(workers, new EncodingHandler(thresholdAlgorithm, residualPostProcessor, maxUpdate, false), memorySize, workers + 2, Integer.MAX_VALUE, false);
}
}
break;

View File

@ -163,14 +163,13 @@ namespace sd {
// here we just calculate number of sumBlock arrays
do {
int numPrefixBlocks = sd::math::nd4j_max<int>(1, sd::math::nd4j_ceil<float, int>((float) numElts / (2.0f * prefixThreads)));
if (numBlocks > 1) {
if (numPrefixBlocks > 1) {
level++;
}
numElts = numPrefixBlocks;
} while (numElts > 1);
std::vector<NDArray> tempArrays(level);
std::vector<Nd4jPointer> pointers(level);
@ -181,13 +180,13 @@ namespace sd {
int numPrefixBlocks = sd::math::nd4j_max<int>(1, sd::math::nd4j_ceil<float, int>((float) numElts / (2.0f * prefixThreads)));
if (numPrefixBlocks > 1) {
tempArrays[level] = std::move(NDArrayFactory::create<int>('c', {numPrefixBlocks}));
pointers[level] = tempArrays[level++].specialBuffer();
pointers[level] = tempArrays[level].specialBuffer();;
level++;
}
numElts = numPrefixBlocks;
} while (numElts > 1);
PointersManager pm(LaunchContext::defaultContext(), "thresholdEncode");
auto dptr = pm.replicatePointer(pointers.data(), pointers.size() * 8);
auto offsets = NDArrayFactory::create<int>('c', {numBlocks});
// we want to check, if we're hiting external limit on number of encoded elements
@ -200,7 +199,7 @@ namespace sd {
NDArray::prepareSpecialUse({}, {&encoded, &updates});
// filling offsets
encodeThresholdP2Int_(reinterpret_cast<void **>(dptr),
encodeThresholdP2Int_(reinterpret_cast<void **>(pointers.data()),
reinterpret_cast<int*>(blocks.specialBuffer()),
numBlocks,
reinterpret_cast<int*>(offsets.specialBuffer()));

View File

@ -228,6 +228,53 @@ TEST_F(DeclarableOpsTests19, test_threshold_encode_decode) {
ASSERT_EQ(exp, initial);
}
TEST_F(DeclarableOpsTests19, test_threshold_encode_decode_2) {
// [2,1,135079944,1,1,8192,1,99]
auto initial = NDArrayFactory::create<float>('c', {1, 135079944});
initial = 1.0f;
auto exp = initial.dup();
auto neg = initial.like();
neg = 0.5f;
sd::ops::encode_threshold enc;
auto enc_result = enc.evaluate({&initial}, {0.5f});
auto encoded = enc_result.at(1);
ASSERT_EQ(135079944 + 4, encoded->lengthOf());
ASSERT_NE(exp, initial);
/*
for (int e = 0; e < initial.lengthOf(); e++) {
auto f = initial.e<float>(e);
if (f != 0.5f) {
nd4j_printf("initial[%i] = %f\n", e, f);
throw std::runtime_error("");
}
}
*/
ASSERT_EQ(neg, initial);
// checking equality of all encoded bits
//for (int e = 5; e < encoded->lengthOf() - 1; e++) {
//if (encoded->e<int>(e) != encoded->e<int>(e - 1) + 1)
//nd4j_printf("Non equal encoded values at E[%i]: %i;\n", e, encoded->e<int>(e));
//}
sd::ops::decode_threshold dec;
auto status = dec.execute({&initial, encoded}, {&initial});
ASSERT_EQ(Status::OK(), status);
// checking equality of all dedoded bits
/*
for (int e = 0; e < initial.lengthOf(); e++) {
auto f = initial.e<float>(e);
if (f != 1.0f)
nd4j_printf("initial[%i] = %f\n", e, f);
}
*/
ASSERT_EQ(exp, initial);
}
TEST_F(DeclarableOpsTests19, test_matmul_ccc) {
auto x = NDArrayFactory::create<float>('c', {10, 10});

View File

@ -41,6 +41,12 @@ public class EncodeThreshold extends DynamicCustomOp {
this(updates, threshold, Integer.MAX_VALUE);
}
public EncodeThreshold(@NonNull INDArray updates, @NonNull INDArray encoded, float threshold, @NonNull Integer boundary) {
this(updates, threshold, boundary);
addOutputArgument(updates, encoded);
}
public EncodeThreshold(@NonNull INDArray updates, float threshold, @NonNull Integer boundary) {
addInputArgument(updates);

View File

@ -692,9 +692,33 @@ public abstract class DefaultOpExecutioner implements OpExecutioner {
return thresholdEncode(input, threshold, Integer.MAX_VALUE);
}
private long _length(long[] shape) {
// scalar case
if (shape.length == 0)
return 1;
else if (shape.length == 1)
return shape[0];
else {
long length = 1;
for (int e = 0; e < shape.length; e++)
length *= shape[e];
return length;
}
}
@Override
public INDArray thresholdEncode(INDArray input, double threshold, Integer boundary) {
val result = Nd4j.exec(new EncodeThreshold(input, (float) threshold, boundary))[1];
val op_shape = new EncodeThreshold(input, (float) threshold, boundary);
val shapes = Nd4j.getExecutioner().calculateOutputShape(op_shape);
if (_length(shapes.get(1).getShape()) < 2)
return null;
val result = Nd4j.create(DataType.INT32, shapes.get(1).getShape());
op_shape.addOutputArgument(input, result);
Nd4j.exec(op_shape);
return result.getInt(0) > 0 ? result : null;
}

View File

@ -191,7 +191,7 @@ public class CudaWorkspace extends Nd4jWorkspace {
// spill
if (workspaceConfiguration.getPolicyReset() == ResetPolicy.ENDOFBUFFER_REACHED && currentSize.get() > 0 && !trimmer && Nd4j.getWorkspaceManager().getDebugMode() != DebugMode.SPILL_EVERYTHING) {
//log.info("End of space reached. Current offset: {}; requiredMemory: {}", deviceOffset.get(), requiredMemory);
reset();
deviceOffset.set(0);
resetPlanned.set(true);
return alloc(requiredMemory, kind, type, initialize);
}
@ -204,7 +204,6 @@ public class CudaWorkspace extends Nd4jWorkspace {
if (isDebug.get()) {
log.info("Workspace [{}] device_{}: spilled DEVICE array of {} bytes, capacity of {} elements", id, Nd4j.getAffinityManager().getDeviceForCurrentThread(), requiredMemory, numElements);
}
//Nd4j.getWorkspaceManager().printAllocationStatisticsForCurrentThread();
val shape = new AllocationShape(requiredMemory / Nd4j.sizeOfDataType(type), Nd4j.sizeOfDataType(type), type);
@ -258,6 +257,12 @@ public class CudaWorkspace extends Nd4jWorkspace {
return ptr;
} else {
// log.info("Spilled HOST array of {} bytes, capacity of {} elements", requiredMemory, numElements);
if (workspaceConfiguration.getPolicyReset() == ResetPolicy.ENDOFBUFFER_REACHED && currentSize.get() > 0 && !trimmer && Nd4j.getWorkspaceManager().getDebugMode() != DebugMode.SPILL_EVERYTHING) {
//log.info("End of space reached. Current offset: {}; requiredMemory: {}", deviceOffset.get(), requiredMemory);
hostOffset.set(0);
//resetPlanned.set(true);
return alloc(requiredMemory, kind, type, initialize);
}
val shape = new AllocationShape(requiredMemory / Nd4j.sizeOfDataType(type), Nd4j.sizeOfDataType(type), type);

View File

@ -0,0 +1,105 @@
/*******************************************************************************
* Copyright (c) 2020 Konduit K.k.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/
package org.nd4j.jita.workspace;
import lombok.val;
import org.junit.Test;
import org.nd4j.linalg.api.buffer.DataType;
import org.nd4j.linalg.api.concurrency.AffinityManager;
import org.nd4j.linalg.api.memory.conf.WorkspaceConfiguration;
import org.nd4j.linalg.api.memory.enums.AllocationPolicy;
import org.nd4j.linalg.api.memory.enums.LearningPolicy;
import org.nd4j.linalg.api.memory.enums.ResetPolicy;
import org.nd4j.linalg.api.memory.enums.SpillPolicy;
import org.nd4j.linalg.factory.Nd4j;
import static org.junit.Assert.*;
public class CudaWorkspaceTest {
@Test
public void testCircularWorkspaceAsymmetry_1() {
// circular workspace mode
val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024)
.policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT)
.policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build();
try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) {
val array = Nd4j.create(DataType.FLOAT, 10, 10);
assertEquals(0, ws.getHostOffset());
assertNotEquals(0, ws.getDeviceOffset());
// we expect that this array has no data/buffer on HOST side
assertEquals(AffinityManager.Location.DEVICE, Nd4j.getAffinityManager().getActiveLocation(array));
// since this array doesn't have HOST buffer - it will allocate one now
array.getDouble(3L);
assertEquals(ws.getHostOffset(), ws.getDeviceOffset());
}
try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) {
assertEquals(ws.getHostOffset(), ws.getDeviceOffset());
}
Nd4j.getWorkspaceManager().destroyAllWorkspacesForCurrentThread();
}
@Test
public void testCircularWorkspaceAsymmetry_2() {
// circular workspace mode
val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024)
.policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT)
.policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build();
val root = Nd4j.create(DataType.FLOAT, 1000000).assign(119);
for (int e = 0; e < 100; e++) {
try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) {
val array = Nd4j.create(DataType.FLOAT, root.shape());
array.assign(root);
array.data().getInt(3);
assertEquals(ws.getHostOffset(), ws.getDeviceOffset());
}
}
}
@Test
public void testCircularWorkspaceAsymmetry_3() {
// circular workspace mode
val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024)
.policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT)
.policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build();
val root = Nd4j.create(DataType.FLOAT, 1000000).assign(119);
for (int e = 0; e < 100; e++) {
try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) {
val array = Nd4j.create(DataType.FLOAT, root.shape());
array.assign(root);
val second = Nd4j.create(DataType.FLOAT, root.shape());
array.data().getInt(3);
}
}
}
}

View File

@ -16619,6 +16619,21 @@ public static final int TAD_THRESHOLD = TAD_THRESHOLD();
private native void allocate();
public native ShapeList calculateOutputShape(ShapeList inputShape, @ByRef Context block);
}
@Namespace("sd::ops") public static class clipbyavgnorm_bp extends DeclarableCustomOp {
static { Loader.load(); }
/** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */
public clipbyavgnorm_bp(Pointer p) { super(p); }
/** Native array allocator. Access with {@link Pointer#position(long)}. */
public clipbyavgnorm_bp(long size) { super((Pointer)null); allocateArray(size); }
private native void allocateArray(long size);
@Override public clipbyavgnorm_bp position(long position) {
return (clipbyavgnorm_bp)super.position(position);
}
public clipbyavgnorm_bp() { super((Pointer)null); allocate(); }
private native void allocate();
public native ShapeList calculateOutputShape(ShapeList inputShape, @ByRef Context block);
}
// #endif
// #if NOT_EXCLUDED(OP_cumsum)

View File

@ -26,6 +26,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.nd4j.linalg.BaseNd4jTest;
import org.nd4j.linalg.api.buffer.DataType;
import org.nd4j.linalg.api.concurrency.AffinityManager;
import org.nd4j.linalg.api.memory.MemoryWorkspace;
import org.nd4j.linalg.api.memory.conf.WorkspaceConfiguration;
import org.nd4j.linalg.api.memory.enums.*;
@ -1219,6 +1220,30 @@ public class BasicWorkspaceTests extends BaseNd4jTest {
Nd4j.getWorkspaceManager().destroyAllWorkspacesForCurrentThread();
}
@Test
public void testCircularWorkspaceAsymmetry_1() {
// nothing to test on CPU here
if (Nd4j.getEnvironment().isCPU())
return;
// circular workspace mode
val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024)
.policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT)
.policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build();
try (val ws = Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) {
val array = Nd4j.create(DataType.FLOAT, 10, 10);
// we expect that this array has no data/buffer on HOST side
assertEquals(AffinityManager.Location.DEVICE, Nd4j.getAffinityManager().getActiveLocation(array));
// since this array doesn't have HOST buffer - it will allocate one now
array.getDouble(3L);
}
Nd4j.getWorkspaceManager().destroyAllWorkspacesForCurrentThread();
}
@Override
public char ordering() {