From be25b08a2949a229dff48ca26f3fb4bf07cc2a34 Mon Sep 17 00:00:00 2001 From: agibsonccc Date: Fri, 5 Mar 2021 19:41:55 +0900 Subject: [PATCH] Mark aeron tests not thread safe for more consistent execution --- libnd4j/tests_cpu/run_tests.sh | 2 +- nd4j/nd4j-serde/nd4j-aeron/pom.xml | 5 ++- .../java/org/nd4j/aeron/ipc/AeronUtil.java | 32 ++++++++++--------- .../nd4j/aeron/ipc/LowLatencyMediaDriver.java | 12 +++---- .../nd4j/aeron/ipc/AeronNDArraySerdeTest.java | 3 +- .../nd4j/aeron/ipc/LargeNdArrayIpcTest.java | 11 ++++--- .../nd4j/aeron/ipc/NDArrayMessageTest.java | 3 ++ .../org/nd4j/aeron/ipc/NdArrayIpcTest.java | 6 ++-- .../ipc/chunk/ChunkAccumulatorTests.java | 3 ++ .../ipc/chunk/NDArrayMessageChunkTests.java | 2 ++ .../response/AeronNDArrayResponseTest.java | 9 ++++-- 11 files changed, 53 insertions(+), 35 deletions(-) diff --git a/libnd4j/tests_cpu/run_tests.sh b/libnd4j/tests_cpu/run_tests.sh index c06a99e0a..592e643fd 100755 --- a/libnd4j/tests_cpu/run_tests.sh +++ b/libnd4j/tests_cpu/run_tests.sh @@ -56,7 +56,7 @@ if [ -n "$BUILD_PATH" ]; then export PATH="$PATH:$BUILD_PATH" fi -../blasbuild/${CHIP}/tests_cpu/layers_tests/runtests +../blasbuild/${CHIP}/te NdArrayIpcTeststs_cpu/layers_tests/runtests # Workaround to fix posix path conversion problem on Windows (http://mingw.org/wiki/Posix_path_conversion) [ -f "${GTEST_OUTPUT#*:}" ] && cp -a surefire-reports/ ../target && rm -rf surefire-reports/ diff --git a/nd4j/nd4j-serde/nd4j-aeron/pom.xml b/nd4j/nd4j-serde/nd4j-aeron/pom.xml index f868bbd89..236004a0c 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/pom.xml +++ b/nd4j/nd4j-serde/nd4j-aeron/pom.xml @@ -39,7 +39,7 @@ 1.8 1.8 1.5.4 - 1.4.0 + 1.32.0 @@ -93,7 +93,7 @@ - ${env.LD_LIBRARY_PATH}:${user.dir}:${libnd4jhome}/blasbuild/cpu/blas/ + ${env.LD_LIBRARY_PATH}${path.separator}${user.dir}${path.separator}${libnd4jhome}/blasbuild/cpu/blas/${path.separator}${libnd4jhome}/../nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/target/classes src/test/java @@ -119,7 +119,6 @@ For testing large zoo models, this may not be enough (so comment it out). --> - -Ddtype=float -Dfile.encoding=UTF-8 -Xmx8g diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/AeronUtil.java b/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/AeronUtil.java index c373e712e..97e83a5aa 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/AeronUtil.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/AeronUtil.java @@ -59,13 +59,15 @@ public class AeronUtil { ipcLength += 2; // System.setProperty("aeron.term.buffer.size",String.valueOf(ipcLength)); final MediaDriver.Context ctx = - new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirsDeleteOnStart(true) - /* .ipcTermBufferLength(ipcLength) - .publicationTermBufferLength(ipcLength) - .maxTermBufferLength(ipcLength)*/ - .conductorIdleStrategy(new BusySpinIdleStrategy()) - .receiverIdleStrategy(new BusySpinIdleStrategy()) - .senderIdleStrategy(new BusySpinIdleStrategy()); + new MediaDriver.Context().threadingMode(ThreadingMode.SHARED) + .dirDeleteOnStart(true) + .dirDeleteOnShutdown(true) + /* .ipcTermBufferLength(ipcLength) + .publicationTermBufferLength(ipcLength) + .maxTermBufferLength(ipcLength)*/ + .conductorIdleStrategy(new BusySpinIdleStrategy()) + .receiverIdleStrategy(new BusySpinIdleStrategy()) + .senderIdleStrategy(new BusySpinIdleStrategy()); return ctx; } @@ -92,7 +94,7 @@ public class AeronUtil { * @return loop function */ public static Consumer subscriberLoop(final FragmentHandler fragmentHandler, final int limit, - final AtomicBoolean running, final AtomicBoolean launched) { + final AtomicBoolean running, final AtomicBoolean launched) { final IdleStrategy idleStrategy = new BusySpinIdleStrategy(); return subscriberLoop(fragmentHandler, limit, running, idleStrategy, launched); } @@ -109,7 +111,7 @@ public class AeronUtil { * @return loop function */ public static Consumer subscriberLoop(final FragmentHandler fragmentHandler, final int limit, - final AtomicBoolean running, final IdleStrategy idleStrategy, final AtomicBoolean launched) { + final AtomicBoolean running, final IdleStrategy idleStrategy, final AtomicBoolean launched) { return (subscription) -> { try { while (running.get()) { @@ -134,7 +136,7 @@ public class AeronUtil { buffer.getBytes(offset, data); System.out.println(String.format("Message to stream %d from session %d (%d@%d) <<%s>>", streamId, - header.sessionId(), length, offset, new String(data))); + header.sessionId(), length, offset, new String(data))); }; } @@ -149,7 +151,7 @@ public class AeronUtil { * @param cause of the error */ public static void printError(final String channel, final int streamId, final int sessionId, final String message, - final HeaderFlyweight cause) { + final HeaderFlyweight cause) { System.out.println(message); } @@ -162,9 +164,9 @@ public class AeronUtil { * @param totalBytes being reported */ public static void printRate(final double messagesPerSec, final double bytesPerSec, final long totalMessages, - final long totalBytes) { + final long totalBytes) { System.out.println(String.format("%.02g msgs/sec, %.02g bytes/sec, totals %d messages %d MB", messagesPerSec, - bytesPerSec, totalMessages, totalBytes / (1024 * 1024))); + bytesPerSec, totalMessages, totalBytes / (1024 * 1024))); } /** @@ -175,7 +177,7 @@ public class AeronUtil { public static void printAvailableImage(final Image image) { final Subscription subscription = image.subscription(); System.out.println(String.format("Available image on %s streamId=%d sessionId=%d from %s", - subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity())); + subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity())); } /** @@ -186,7 +188,7 @@ public class AeronUtil { public static void printUnavailableImage(final Image image) { final Subscription subscription = image.subscription(); System.out.println(String.format("Unavailable image on %s streamId=%d sessionId=%d", subscription.channel(), - subscription.streamId(), image.sessionId())); + subscription.streamId(), image.sessionId())); } private static final AtomicInteger conductorCount = new AtomicInteger(); diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/LowLatencyMediaDriver.java b/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/LowLatencyMediaDriver.java index e366a0c9d..d5d1da518 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/LowLatencyMediaDriver.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/main/java/org/nd4j/aeron/ipc/LowLatencyMediaDriver.java @@ -34,8 +34,7 @@ public class LowLatencyMediaDriver { @SuppressWarnings("checkstyle:UncommentedMain") public static void main(final String... args) { - MediaDriver.loadPropertiesFiles(args); - + MediaDriver.main(args); setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); setProperty("aeron.mtu.length", "16384"); setProperty("aeron.socket.so_sndbuf", "2097152"); @@ -43,10 +42,11 @@ public class LowLatencyMediaDriver { setProperty("aeron.rcv.initial.window.length", "2097152"); final MediaDriver.Context ctx = - new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirsDeleteOnStart(true) - .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) - .receiverIdleStrategy(new BusySpinIdleStrategy()) - .senderIdleStrategy(new BusySpinIdleStrategy()); + new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true) + .dirDeleteOnShutdown(true) + .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) + .receiverIdleStrategy(new BusySpinIdleStrategy()) + .senderIdleStrategy(new BusySpinIdleStrategy()); try (MediaDriver ignored = MediaDriver.launch(ctx)) { new SigIntBarrier().await(); diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/AeronNDArraySerdeTest.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/AeronNDArraySerdeTest.java index eb0a400ff..b28937c62 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/AeronNDArraySerdeTest.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/AeronNDArraySerdeTest.java @@ -28,13 +28,14 @@ import org.nd4j.common.tests.BaseND4JTest; import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; +import javax.annotation.concurrent.NotThreadSafe; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - +@NotThreadSafe public class AeronNDArraySerdeTest extends BaseND4JTest { @Test diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/LargeNdArrayIpcTest.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/LargeNdArrayIpcTest.java index 832b2c9c5..af4bb515c 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/LargeNdArrayIpcTest.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/LargeNdArrayIpcTest.java @@ -31,11 +31,13 @@ import org.nd4j.common.tests.BaseND4JTest; import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; +import javax.annotation.concurrent.NotThreadSafe; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertFalse; @Slf4j +@NotThreadSafe public class LargeNdArrayIpcTest extends BaseND4JTest { private MediaDriver mediaDriver; private Aeron.Context ctx; @@ -73,9 +75,10 @@ public class LargeNdArrayIpcTest extends BaseND4JTest { int length = (int) 1e7; INDArray arr = Nd4j.ones(length); AeronNDArrayPublisher publisher; - ctx = new Aeron.Context().publicationConnectionTimeout(-1).availableImageHandler(AeronUtil::printAvailableImage) + ctx = new Aeron.Context() + .driverTimeoutMs(-1).availableImageHandler(AeronUtil::printAvailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage) - .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(10000) + .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(10000) .errorHandler(err -> err.printStackTrace()); final AtomicBoolean running = new AtomicBoolean(true); @@ -149,10 +152,10 @@ public class LargeNdArrayIpcTest extends BaseND4JTest { private Aeron.Context getContext() { if (ctx == null) - ctx = new Aeron.Context().publicationConnectionTimeout(-1) + ctx = new Aeron.Context().driverTimeoutMs(-1) .availableImageHandler(AeronUtil::printAvailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage) - .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(10000) + .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(10000) .errorHandler(err -> err.printStackTrace()); return ctx; } diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NDArrayMessageTest.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NDArrayMessageTest.java index ffc4e04e6..0a8b89277 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NDArrayMessageTest.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NDArrayMessageTest.java @@ -26,8 +26,11 @@ import org.nd4j.common.tests.BaseND4JTest; import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; +import javax.annotation.concurrent.NotThreadSafe; + import static org.junit.Assert.assertEquals; +@NotThreadSafe public class NDArrayMessageTest extends BaseND4JTest { @Test diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NdArrayIpcTest.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NdArrayIpcTest.java index 253df0082..6dac31259 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NdArrayIpcTest.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/NdArrayIpcTest.java @@ -32,12 +32,14 @@ import org.nd4j.linalg.factory.Nd4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.NotThreadSafe; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertFalse; +@NotThreadSafe public class NdArrayIpcTest extends BaseND4JTest { private MediaDriver mediaDriver; private static Logger log = LoggerFactory.getLogger(NdArrayIpcTest.class); @@ -223,10 +225,10 @@ public class NdArrayIpcTest extends BaseND4JTest { private Aeron.Context getContext() { if (ctx == null) - ctx = new Aeron.Context().publicationConnectionTimeout(1000) + ctx = new Aeron.Context().driverTimeoutMs(1000) .availableImageHandler(image -> System.out.println(image)) .unavailableImageHandler(AeronUtil::printUnavailableImage) - .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(1000) + .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(1000) .errorHandler(e -> log.error(e.toString(), e)); return ctx; } diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/ChunkAccumulatorTests.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/ChunkAccumulatorTests.java index 62b724760..d3b89ef48 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/ChunkAccumulatorTests.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/ChunkAccumulatorTests.java @@ -25,8 +25,11 @@ import org.nd4j.common.tests.BaseND4JTest; import org.nd4j.aeron.ipc.NDArrayMessage; import org.nd4j.linalg.factory.Nd4j; +import javax.annotation.concurrent.NotThreadSafe; + import static org.junit.Assert.assertEquals; +@NotThreadSafe public class ChunkAccumulatorTests extends BaseND4JTest { @Test diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/NDArrayMessageChunkTests.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/NDArrayMessageChunkTests.java index 8df55f0bd..8ff2eae34 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/NDArrayMessageChunkTests.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/chunk/NDArrayMessageChunkTests.java @@ -27,11 +27,13 @@ import org.nd4j.aeron.ipc.NDArrayMessage; import org.nd4j.aeron.util.BufferUtil; import org.nd4j.linalg.factory.Nd4j; +import javax.annotation.concurrent.NotThreadSafe; import java.nio.ByteBuffer; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +@NotThreadSafe public class NDArrayMessageChunkTests extends BaseND4JTest { @Test diff --git a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/response/AeronNDArrayResponseTest.java b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/response/AeronNDArrayResponseTest.java index 7a663e690..1c4c46acd 100644 --- a/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/response/AeronNDArrayResponseTest.java +++ b/nd4j/nd4j-serde/nd4j-aeron/src/test/java/org/nd4j/aeron/ipc/response/AeronNDArrayResponseTest.java @@ -33,12 +33,14 @@ import org.nd4j.aeron.ipc.*; import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; +import javax.annotation.concurrent.NotThreadSafe; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; @Slf4j +@NotThreadSafe public class AeronNDArrayResponseTest extends BaseND4JTest { private MediaDriver mediaDriver; @@ -51,7 +53,8 @@ public class AeronNDArrayResponseTest extends BaseND4JTest { public void before() { if(isIntegrationTests()) { final MediaDriver.Context ctx = - new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirsDeleteOnStart(true) + new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirDeleteOnShutdown(true) + .dirDeleteOnStart(true) .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) .receiverIdleStrategy(new BusySpinIdleStrategy()) .senderIdleStrategy(new BusySpinIdleStrategy()); @@ -69,10 +72,10 @@ public class AeronNDArrayResponseTest extends BaseND4JTest { int streamId = 10; int responderStreamId = 11; String host = "127.0.0.1"; - Aeron.Context ctx = new Aeron.Context().publicationConnectionTimeout(-1) + Aeron.Context ctx = new Aeron.Context().driverTimeoutMs(-1) .availableImageHandler(AeronUtil::printAvailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage) - .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(1000) + .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(1000) .errorHandler(e -> log.error(e.toString(), e)); int baseSubscriberPort = 40123 + new java.util.Random().nextInt(1000);