From c396fcb9605999e6586b95d460bd2524e0f8589e Mon Sep 17 00:00:00 2001 From: raver119 Date: Wed, 13 May 2020 08:12:07 +0300 Subject: [PATCH] More pre-release fixes (#456) * - numPrefixBlocks fix for threshold_encoding - temparrays pointers fixed Signed-off-by: raver119@gmail.com * auto configuration of memory workspace for gradients sharing Signed-off-by: raver119@gmail.com * limit sparse encoding message size Signed-off-by: raver119@gmail.com * one more workspace test Signed-off-by: raver119@gmail.com * one more CUDA-specific test Signed-off-by: raver119@gmail.com * one more CUDA-specific workspace test Signed-off-by: raver119@gmail.com * one more CUDA-specific workspace test Signed-off-by: raver119@gmail.com * one more CUDA-specific workspace test Signed-off-by: raver119@gmail.com * add separate host/device reset for circular workspace mode Signed-off-by: raver119@gmail.com * new PW builder method for encoder memory amount Signed-off-by: raver119@gmail.com * "inplace" execution for threshold encoding Signed-off-by: raver119@gmail.com --- .../parallelism/ParallelWrapper.java | 32 +++++- .../helpers/cuda/compression/threshold.cu | 9 +- .../layers_tests/DeclarableOpsTests19.cpp | 47 ++++++++ .../api/ops/compression/EncodeThreshold.java | 6 + .../ops/executioner/DefaultOpExecutioner.java | 26 ++++- .../nd4j/jita/workspace/CudaWorkspace.java | 9 +- .../jita/workspace/CudaWorkspaceTest.java | 105 ++++++++++++++++++ .../java/org/nd4j/nativeblas/Nd4jCpu.java | 15 +++ .../linalg/workspace/BasicWorkspaceTests.java | 25 +++++ 9 files changed, 264 insertions(+), 10 deletions(-) create mode 100644 nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/test/java/org/nd4j/jita/workspace/CudaWorkspaceTest.java diff --git a/deeplearning4j/deeplearning4j-scaleout/deeplearning4j-scaleout-parallelwrapper/src/main/java/org/deeplearning4j/parallelism/ParallelWrapper.java b/deeplearning4j/deeplearning4j-scaleout/deeplearning4j-scaleout-parallelwrapper/src/main/java/org/deeplearning4j/parallelism/ParallelWrapper.java index 00ca839f7..842ac34b2 100644 --- a/deeplearning4j/deeplearning4j-scaleout/deeplearning4j-scaleout-parallelwrapper/src/main/java/org/deeplearning4j/parallelism/ParallelWrapper.java +++ b/deeplearning4j/deeplearning4j-scaleout/deeplearning4j-scaleout-parallelwrapper/src/main/java/org/deeplearning4j/parallelism/ParallelWrapper.java @@ -20,6 +20,8 @@ import lombok.*; import lombok.extern.slf4j.Slf4j; import org.deeplearning4j.core.storage.StatsStorageRouter; import org.deeplearning4j.core.storage.listener.RoutingIterationListener; +import org.deeplearning4j.optimize.solvers.accumulation.EncodingHandler; +import org.deeplearning4j.optimize.solvers.accumulation.encoding.threshold.AdaptiveThresholdAlgorithm; import org.nd4j.linalg.dataset.AsyncDataSetIterator;; import org.nd4j.linalg.dataset.AsyncMultiDataSetIterator; import org.deeplearning4j.datasets.iterator.DummyBlockDataSetIterator; @@ -688,6 +690,7 @@ public class ParallelWrapper implements AutoCloseable { protected Supplier updaterParamsSupplier; protected ThresholdAlgorithm thresholdAlgorithm; protected ResidualPostProcessor residualPostProcessor; + protected Long encoderMemory = -1L; protected GradientsAccumulator accumulator; @@ -872,6 +875,19 @@ public class ParallelWrapper implements AutoCloseable { return this; } + /** + * This method allows to define amount of temporary memory that will be used for gradients sharing. + * Typically it's safe to keep default value. + * + * Default value: -1, amount of temporary memory will be calculated automatically + * @param numBytes number of bytes to be used + * @return + */ + public Builder temporaryMemory(@NonNull Long numBytes) { + this.encoderMemory = numBytes; + return this; + } + /** * Set the residual post processor algorithm. Not used for single machine training (only for PW used in a * distributed setting), and should not be set by users in most cases. @@ -907,11 +923,23 @@ public class ParallelWrapper implements AutoCloseable { } break; case SHARED_GRADIENTS: { - Preconditions.checkState(thresholdAlgorithm != null, "Cannot use SHARED_GRADIENTS training mode without setting a threshold algorithm"); + if (thresholdAlgorithm == null) + thresholdAlgorithm = new AdaptiveThresholdAlgorithm(); + this.trainerContext = new SymmetricTrainerContext(); if (this.accumulator == null) { log.info("Creating new GradientsAccumulator instance with default threshold of [5e-4]"); - this.accumulator = new EncodedGradientsAccumulator(workers, thresholdAlgorithm, residualPostProcessor, false); + val numParams = model.numParams(); + + // we're limiting max size of updates for Sparse encoding to the size of bitmap encoded message + val maxUpdate = (int) (numParams / 16 + 5); + + // memory sie in number of bytes + long memorySize = encoderMemory == null || encoderMemory < 0 + ? maxUpdate * 4 * (workers + 3) + : encoderMemory; + + this.accumulator = new EncodedGradientsAccumulator(workers, new EncodingHandler(thresholdAlgorithm, residualPostProcessor, maxUpdate, false), memorySize, workers + 2, Integer.MAX_VALUE, false); } } break; diff --git a/libnd4j/include/ops/declarable/helpers/cuda/compression/threshold.cu b/libnd4j/include/ops/declarable/helpers/cuda/compression/threshold.cu index 19cd3f67c..bbe6d6881 100644 --- a/libnd4j/include/ops/declarable/helpers/cuda/compression/threshold.cu +++ b/libnd4j/include/ops/declarable/helpers/cuda/compression/threshold.cu @@ -163,14 +163,13 @@ namespace sd { // here we just calculate number of sumBlock arrays do { int numPrefixBlocks = sd::math::nd4j_max(1, sd::math::nd4j_ceil((float) numElts / (2.0f * prefixThreads))); - if (numBlocks > 1) { + if (numPrefixBlocks > 1) { level++; } numElts = numPrefixBlocks; } while (numElts > 1); - std::vector tempArrays(level); std::vector pointers(level); @@ -181,13 +180,13 @@ namespace sd { int numPrefixBlocks = sd::math::nd4j_max(1, sd::math::nd4j_ceil((float) numElts / (2.0f * prefixThreads))); if (numPrefixBlocks > 1) { tempArrays[level] = std::move(NDArrayFactory::create('c', {numPrefixBlocks})); - pointers[level] = tempArrays[level++].specialBuffer(); + pointers[level] = tempArrays[level].specialBuffer();; + level++; } numElts = numPrefixBlocks; } while (numElts > 1); PointersManager pm(LaunchContext::defaultContext(), "thresholdEncode"); - auto dptr = pm.replicatePointer(pointers.data(), pointers.size() * 8); auto offsets = NDArrayFactory::create('c', {numBlocks}); // we want to check, if we're hiting external limit on number of encoded elements @@ -200,7 +199,7 @@ namespace sd { NDArray::prepareSpecialUse({}, {&encoded, &updates}); // filling offsets - encodeThresholdP2Int_(reinterpret_cast(dptr), + encodeThresholdP2Int_(reinterpret_cast(pointers.data()), reinterpret_cast(blocks.specialBuffer()), numBlocks, reinterpret_cast(offsets.specialBuffer())); diff --git a/libnd4j/tests_cpu/layers_tests/DeclarableOpsTests19.cpp b/libnd4j/tests_cpu/layers_tests/DeclarableOpsTests19.cpp index 641728ad3..ce5038020 100644 --- a/libnd4j/tests_cpu/layers_tests/DeclarableOpsTests19.cpp +++ b/libnd4j/tests_cpu/layers_tests/DeclarableOpsTests19.cpp @@ -228,6 +228,53 @@ TEST_F(DeclarableOpsTests19, test_threshold_encode_decode) { ASSERT_EQ(exp, initial); } +TEST_F(DeclarableOpsTests19, test_threshold_encode_decode_2) { + // [2,1,135079944,1,1,8192,1,99] + auto initial = NDArrayFactory::create('c', {1, 135079944}); + initial = 1.0f; + auto exp = initial.dup(); + auto neg = initial.like(); + neg = 0.5f; + + sd::ops::encode_threshold enc; + auto enc_result = enc.evaluate({&initial}, {0.5f}); + auto encoded = enc_result.at(1); + + ASSERT_EQ(135079944 + 4, encoded->lengthOf()); + ASSERT_NE(exp, initial); +/* + for (int e = 0; e < initial.lengthOf(); e++) { + auto f = initial.e(e); + if (f != 0.5f) { + nd4j_printf("initial[%i] = %f\n", e, f); + throw std::runtime_error(""); + } + } + */ + ASSERT_EQ(neg, initial); + + // checking equality of all encoded bits + //for (int e = 5; e < encoded->lengthOf() - 1; e++) { + //if (encoded->e(e) != encoded->e(e - 1) + 1) + //nd4j_printf("Non equal encoded values at E[%i]: %i;\n", e, encoded->e(e)); + //} + + sd::ops::decode_threshold dec; + auto status = dec.execute({&initial, encoded}, {&initial}); + ASSERT_EQ(Status::OK(), status); + + // checking equality of all dedoded bits + /* + for (int e = 0; e < initial.lengthOf(); e++) { + auto f = initial.e(e); + if (f != 1.0f) + nd4j_printf("initial[%i] = %f\n", e, f); + } + */ + + ASSERT_EQ(exp, initial); +} + TEST_F(DeclarableOpsTests19, test_matmul_ccc) { auto x = NDArrayFactory::create('c', {10, 10}); diff --git a/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/compression/EncodeThreshold.java b/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/compression/EncodeThreshold.java index 621b459e9..fec9582be 100644 --- a/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/compression/EncodeThreshold.java +++ b/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/compression/EncodeThreshold.java @@ -41,6 +41,12 @@ public class EncodeThreshold extends DynamicCustomOp { this(updates, threshold, Integer.MAX_VALUE); } + public EncodeThreshold(@NonNull INDArray updates, @NonNull INDArray encoded, float threshold, @NonNull Integer boundary) { + this(updates, threshold, boundary); + + addOutputArgument(updates, encoded); + } + public EncodeThreshold(@NonNull INDArray updates, float threshold, @NonNull Integer boundary) { addInputArgument(updates); diff --git a/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/executioner/DefaultOpExecutioner.java b/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/executioner/DefaultOpExecutioner.java index af3ffb2c4..f4be7cc92 100644 --- a/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/executioner/DefaultOpExecutioner.java +++ b/nd4j/nd4j-backends/nd4j-api-parent/nd4j-api/src/main/java/org/nd4j/linalg/api/ops/executioner/DefaultOpExecutioner.java @@ -692,9 +692,33 @@ public abstract class DefaultOpExecutioner implements OpExecutioner { return thresholdEncode(input, threshold, Integer.MAX_VALUE); } + private long _length(long[] shape) { + // scalar case + if (shape.length == 0) + return 1; + else if (shape.length == 1) + return shape[0]; + else { + long length = 1; + for (int e = 0; e < shape.length; e++) + length *= shape[e]; + + return length; + } + } + @Override public INDArray thresholdEncode(INDArray input, double threshold, Integer boundary) { - val result = Nd4j.exec(new EncodeThreshold(input, (float) threshold, boundary))[1]; + val op_shape = new EncodeThreshold(input, (float) threshold, boundary); + val shapes = Nd4j.getExecutioner().calculateOutputShape(op_shape); + + if (_length(shapes.get(1).getShape()) < 2) + return null; + + val result = Nd4j.create(DataType.INT32, shapes.get(1).getShape()); + + op_shape.addOutputArgument(input, result); + Nd4j.exec(op_shape); return result.getInt(0) > 0 ? result : null; } diff --git a/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/main/java/org/nd4j/jita/workspace/CudaWorkspace.java b/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/main/java/org/nd4j/jita/workspace/CudaWorkspace.java index e8d9b2d18..0c4cf9ffa 100644 --- a/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/main/java/org/nd4j/jita/workspace/CudaWorkspace.java +++ b/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/main/java/org/nd4j/jita/workspace/CudaWorkspace.java @@ -191,7 +191,7 @@ public class CudaWorkspace extends Nd4jWorkspace { // spill if (workspaceConfiguration.getPolicyReset() == ResetPolicy.ENDOFBUFFER_REACHED && currentSize.get() > 0 && !trimmer && Nd4j.getWorkspaceManager().getDebugMode() != DebugMode.SPILL_EVERYTHING) { //log.info("End of space reached. Current offset: {}; requiredMemory: {}", deviceOffset.get(), requiredMemory); - reset(); + deviceOffset.set(0); resetPlanned.set(true); return alloc(requiredMemory, kind, type, initialize); } @@ -204,7 +204,6 @@ public class CudaWorkspace extends Nd4jWorkspace { if (isDebug.get()) { log.info("Workspace [{}] device_{}: spilled DEVICE array of {} bytes, capacity of {} elements", id, Nd4j.getAffinityManager().getDeviceForCurrentThread(), requiredMemory, numElements); } - //Nd4j.getWorkspaceManager().printAllocationStatisticsForCurrentThread(); val shape = new AllocationShape(requiredMemory / Nd4j.sizeOfDataType(type), Nd4j.sizeOfDataType(type), type); @@ -258,6 +257,12 @@ public class CudaWorkspace extends Nd4jWorkspace { return ptr; } else { // log.info("Spilled HOST array of {} bytes, capacity of {} elements", requiredMemory, numElements); + if (workspaceConfiguration.getPolicyReset() == ResetPolicy.ENDOFBUFFER_REACHED && currentSize.get() > 0 && !trimmer && Nd4j.getWorkspaceManager().getDebugMode() != DebugMode.SPILL_EVERYTHING) { + //log.info("End of space reached. Current offset: {}; requiredMemory: {}", deviceOffset.get(), requiredMemory); + hostOffset.set(0); + //resetPlanned.set(true); + return alloc(requiredMemory, kind, type, initialize); + } val shape = new AllocationShape(requiredMemory / Nd4j.sizeOfDataType(type), Nd4j.sizeOfDataType(type), type); diff --git a/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/test/java/org/nd4j/jita/workspace/CudaWorkspaceTest.java b/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/test/java/org/nd4j/jita/workspace/CudaWorkspaceTest.java new file mode 100644 index 000000000..7b572de9c --- /dev/null +++ b/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-cuda/src/test/java/org/nd4j/jita/workspace/CudaWorkspaceTest.java @@ -0,0 +1,105 @@ +/******************************************************************************* + * Copyright (c) 2020 Konduit K.k. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ******************************************************************************/ + +package org.nd4j.jita.workspace; + +import lombok.val; +import org.junit.Test; +import org.nd4j.linalg.api.buffer.DataType; +import org.nd4j.linalg.api.concurrency.AffinityManager; +import org.nd4j.linalg.api.memory.conf.WorkspaceConfiguration; +import org.nd4j.linalg.api.memory.enums.AllocationPolicy; +import org.nd4j.linalg.api.memory.enums.LearningPolicy; +import org.nd4j.linalg.api.memory.enums.ResetPolicy; +import org.nd4j.linalg.api.memory.enums.SpillPolicy; +import org.nd4j.linalg.factory.Nd4j; + +import static org.junit.Assert.*; + +public class CudaWorkspaceTest { + + @Test + public void testCircularWorkspaceAsymmetry_1() { + // circular workspace mode + val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024) + .policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT) + .policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build(); + + + try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) { + val array = Nd4j.create(DataType.FLOAT, 10, 10); + + assertEquals(0, ws.getHostOffset()); + assertNotEquals(0, ws.getDeviceOffset()); + + // we expect that this array has no data/buffer on HOST side + assertEquals(AffinityManager.Location.DEVICE, Nd4j.getAffinityManager().getActiveLocation(array)); + + // since this array doesn't have HOST buffer - it will allocate one now + array.getDouble(3L); + + assertEquals(ws.getHostOffset(), ws.getDeviceOffset()); + } + + try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) { + assertEquals(ws.getHostOffset(), ws.getDeviceOffset()); + } + + Nd4j.getWorkspaceManager().destroyAllWorkspacesForCurrentThread(); + } + + @Test + public void testCircularWorkspaceAsymmetry_2() { + // circular workspace mode + val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024) + .policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT) + .policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build(); + + val root = Nd4j.create(DataType.FLOAT, 1000000).assign(119); + + for (int e = 0; e < 100; e++) { + try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) { + val array = Nd4j.create(DataType.FLOAT, root.shape()); + array.assign(root); + + array.data().getInt(3); + + assertEquals(ws.getHostOffset(), ws.getDeviceOffset()); + } + } + } + + @Test + public void testCircularWorkspaceAsymmetry_3() { + // circular workspace mode + val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024) + .policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT) + .policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build(); + + val root = Nd4j.create(DataType.FLOAT, 1000000).assign(119); + + for (int e = 0; e < 100; e++) { + try (val ws = (CudaWorkspace) Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) { + val array = Nd4j.create(DataType.FLOAT, root.shape()); + array.assign(root); + + val second = Nd4j.create(DataType.FLOAT, root.shape()); + + array.data().getInt(3); + } + } + } +} \ No newline at end of file diff --git a/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/src/main/java/org/nd4j/nativeblas/Nd4jCpu.java b/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/src/main/java/org/nd4j/nativeblas/Nd4jCpu.java index b67949a54..b97274ba1 100644 --- a/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/src/main/java/org/nd4j/nativeblas/Nd4jCpu.java +++ b/nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/src/main/java/org/nd4j/nativeblas/Nd4jCpu.java @@ -16619,6 +16619,21 @@ public static final int TAD_THRESHOLD = TAD_THRESHOLD(); private native void allocate(); public native ShapeList calculateOutputShape(ShapeList inputShape, @ByRef Context block); } + @Namespace("sd::ops") public static class clipbyavgnorm_bp extends DeclarableCustomOp { + static { Loader.load(); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public clipbyavgnorm_bp(Pointer p) { super(p); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public clipbyavgnorm_bp(long size) { super((Pointer)null); allocateArray(size); } + private native void allocateArray(long size); + @Override public clipbyavgnorm_bp position(long position) { + return (clipbyavgnorm_bp)super.position(position); + } + + public clipbyavgnorm_bp() { super((Pointer)null); allocate(); } + private native void allocate(); + public native ShapeList calculateOutputShape(ShapeList inputShape, @ByRef Context block); + } // #endif // #if NOT_EXCLUDED(OP_cumsum) diff --git a/nd4j/nd4j-backends/nd4j-tests/src/test/java/org/nd4j/linalg/workspace/BasicWorkspaceTests.java b/nd4j/nd4j-backends/nd4j-tests/src/test/java/org/nd4j/linalg/workspace/BasicWorkspaceTests.java index 5b357d9a4..290b69723 100644 --- a/nd4j/nd4j-backends/nd4j-tests/src/test/java/org/nd4j/linalg/workspace/BasicWorkspaceTests.java +++ b/nd4j/nd4j-backends/nd4j-tests/src/test/java/org/nd4j/linalg/workspace/BasicWorkspaceTests.java @@ -26,6 +26,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.nd4j.linalg.BaseNd4jTest; import org.nd4j.linalg.api.buffer.DataType; +import org.nd4j.linalg.api.concurrency.AffinityManager; import org.nd4j.linalg.api.memory.MemoryWorkspace; import org.nd4j.linalg.api.memory.conf.WorkspaceConfiguration; import org.nd4j.linalg.api.memory.enums.*; @@ -1219,6 +1220,30 @@ public class BasicWorkspaceTests extends BaseNd4jTest { Nd4j.getWorkspaceManager().destroyAllWorkspacesForCurrentThread(); } + @Test + public void testCircularWorkspaceAsymmetry_1() { + // nothing to test on CPU here + if (Nd4j.getEnvironment().isCPU()) + return; + + // circular workspace mode + val configuration = WorkspaceConfiguration.builder().initialSize(10 * 1024 * 1024) + .policyReset(ResetPolicy.ENDOFBUFFER_REACHED).policyAllocation(AllocationPolicy.STRICT) + .policySpill(SpillPolicy.FAIL).policyLearning(LearningPolicy.NONE).build(); + + + try (val ws = Nd4j.getWorkspaceManager().getAndActivateWorkspace(configuration, "circular_ws")) { + val array = Nd4j.create(DataType.FLOAT, 10, 10); + + // we expect that this array has no data/buffer on HOST side + assertEquals(AffinityManager.Location.DEVICE, Nd4j.getAffinityManager().getActiveLocation(array)); + + // since this array doesn't have HOST buffer - it will allocate one now + array.getDouble(3L); + } + + Nd4j.getWorkspaceManager().destroyAllWorkspacesForCurrentThread(); + } @Override public char ordering() {