Merge pull request #9208 from eclipse/ag_aeron_fixes

Mark aeron tests not thread safe for more consistent execution, update aeron version
master
Adam Gibson 2021-03-06 09:19:55 +09:00 committed by GitHub
commit 4852cb975f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 53 additions and 35 deletions

View File

@ -56,7 +56,7 @@ if [ -n "$BUILD_PATH" ]; then
export PATH="$PATH:$BUILD_PATH" export PATH="$PATH:$BUILD_PATH"
fi fi
../blasbuild/${CHIP}/tests_cpu/layers_tests/runtests ../blasbuild/${CHIP}/te NdArrayIpcTeststs_cpu/layers_tests/runtests
# Workaround to fix posix path conversion problem on Windows (http://mingw.org/wiki/Posix_path_conversion) # Workaround to fix posix path conversion problem on Windows (http://mingw.org/wiki/Posix_path_conversion)
[ -f "${GTEST_OUTPUT#*:}" ] && cp -a surefire-reports/ ../target && rm -rf surefire-reports/ [ -f "${GTEST_OUTPUT#*:}" ] && cp -a surefire-reports/ ../target && rm -rf surefire-reports/

View File

@ -39,7 +39,7 @@
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<sbe.version>1.5.4</sbe.version> <sbe.version>1.5.4</sbe.version>
<aeron.version>1.4.0</aeron.version> <aeron.version>1.32.0</aeron.version>
</properties> </properties>
<dependencies> <dependencies>
@ -93,7 +93,7 @@
<configuration> <configuration>
<environmentVariables> <environmentVariables>
<LD_LIBRARY_PATH> <LD_LIBRARY_PATH>
${env.LD_LIBRARY_PATH}:${user.dir}:${libnd4jhome}/blasbuild/cpu/blas/ ${env.LD_LIBRARY_PATH}${path.separator}${user.dir}${path.separator}${libnd4jhome}/blasbuild/cpu/blas/${path.separator}${libnd4jhome}/../nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/target/classes
</LD_LIBRARY_PATH> </LD_LIBRARY_PATH>
</environmentVariables> </environmentVariables>
<testSourceDirectory>src/test/java</testSourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory>
@ -119,7 +119,6 @@
For testing large zoo models, this may not be enough (so comment it out). For testing large zoo models, this may not be enough (so comment it out).
--> -->
<argLine>-Ddtype=float -Dfile.encoding=UTF-8 -Xmx8g</argLine>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>

View File

@ -59,13 +59,15 @@ public class AeronUtil {
ipcLength += 2; ipcLength += 2;
// System.setProperty("aeron.term.buffer.size",String.valueOf(ipcLength)); // System.setProperty("aeron.term.buffer.size",String.valueOf(ipcLength));
final MediaDriver.Context ctx = final MediaDriver.Context ctx =
new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirsDeleteOnStart(true) new MediaDriver.Context().threadingMode(ThreadingMode.SHARED)
/* .ipcTermBufferLength(ipcLength) .dirDeleteOnStart(true)
.publicationTermBufferLength(ipcLength) .dirDeleteOnShutdown(true)
.maxTermBufferLength(ipcLength)*/ /* .ipcTermBufferLength(ipcLength)
.conductorIdleStrategy(new BusySpinIdleStrategy()) .publicationTermBufferLength(ipcLength)
.receiverIdleStrategy(new BusySpinIdleStrategy()) .maxTermBufferLength(ipcLength)*/
.senderIdleStrategy(new BusySpinIdleStrategy()); .conductorIdleStrategy(new BusySpinIdleStrategy())
.receiverIdleStrategy(new BusySpinIdleStrategy())
.senderIdleStrategy(new BusySpinIdleStrategy());
return ctx; return ctx;
} }
@ -92,7 +94,7 @@ public class AeronUtil {
* @return loop function * @return loop function
*/ */
public static Consumer<Subscription> subscriberLoop(final FragmentHandler fragmentHandler, final int limit, public static Consumer<Subscription> subscriberLoop(final FragmentHandler fragmentHandler, final int limit,
final AtomicBoolean running, final AtomicBoolean launched) { final AtomicBoolean running, final AtomicBoolean launched) {
final IdleStrategy idleStrategy = new BusySpinIdleStrategy(); final IdleStrategy idleStrategy = new BusySpinIdleStrategy();
return subscriberLoop(fragmentHandler, limit, running, idleStrategy, launched); return subscriberLoop(fragmentHandler, limit, running, idleStrategy, launched);
} }
@ -109,7 +111,7 @@ public class AeronUtil {
* @return loop function * @return loop function
*/ */
public static Consumer<Subscription> subscriberLoop(final FragmentHandler fragmentHandler, final int limit, public static Consumer<Subscription> subscriberLoop(final FragmentHandler fragmentHandler, final int limit,
final AtomicBoolean running, final IdleStrategy idleStrategy, final AtomicBoolean launched) { final AtomicBoolean running, final IdleStrategy idleStrategy, final AtomicBoolean launched) {
return (subscription) -> { return (subscription) -> {
try { try {
while (running.get()) { while (running.get()) {
@ -134,7 +136,7 @@ public class AeronUtil {
buffer.getBytes(offset, data); buffer.getBytes(offset, data);
System.out.println(String.format("Message to stream %d from session %d (%d@%d) <<%s>>", streamId, System.out.println(String.format("Message to stream %d from session %d (%d@%d) <<%s>>", streamId,
header.sessionId(), length, offset, new String(data))); header.sessionId(), length, offset, new String(data)));
}; };
} }
@ -149,7 +151,7 @@ public class AeronUtil {
* @param cause of the error * @param cause of the error
*/ */
public static void printError(final String channel, final int streamId, final int sessionId, final String message, public static void printError(final String channel, final int streamId, final int sessionId, final String message,
final HeaderFlyweight cause) { final HeaderFlyweight cause) {
System.out.println(message); System.out.println(message);
} }
@ -162,9 +164,9 @@ public class AeronUtil {
* @param totalBytes being reported * @param totalBytes being reported
*/ */
public static void printRate(final double messagesPerSec, final double bytesPerSec, final long totalMessages, public static void printRate(final double messagesPerSec, final double bytesPerSec, final long totalMessages,
final long totalBytes) { final long totalBytes) {
System.out.println(String.format("%.02g msgs/sec, %.02g bytes/sec, totals %d messages %d MB", messagesPerSec, System.out.println(String.format("%.02g msgs/sec, %.02g bytes/sec, totals %d messages %d MB", messagesPerSec,
bytesPerSec, totalMessages, totalBytes / (1024 * 1024))); bytesPerSec, totalMessages, totalBytes / (1024 * 1024)));
} }
/** /**
@ -175,7 +177,7 @@ public class AeronUtil {
public static void printAvailableImage(final Image image) { public static void printAvailableImage(final Image image) {
final Subscription subscription = image.subscription(); final Subscription subscription = image.subscription();
System.out.println(String.format("Available image on %s streamId=%d sessionId=%d from %s", System.out.println(String.format("Available image on %s streamId=%d sessionId=%d from %s",
subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity())); subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity()));
} }
/** /**
@ -186,7 +188,7 @@ public class AeronUtil {
public static void printUnavailableImage(final Image image) { public static void printUnavailableImage(final Image image) {
final Subscription subscription = image.subscription(); final Subscription subscription = image.subscription();
System.out.println(String.format("Unavailable image on %s streamId=%d sessionId=%d", subscription.channel(), System.out.println(String.format("Unavailable image on %s streamId=%d sessionId=%d", subscription.channel(),
subscription.streamId(), image.sessionId())); subscription.streamId(), image.sessionId()));
} }
private static final AtomicInteger conductorCount = new AtomicInteger(); private static final AtomicInteger conductorCount = new AtomicInteger();

View File

@ -34,8 +34,7 @@ public class LowLatencyMediaDriver {
@SuppressWarnings("checkstyle:UncommentedMain") @SuppressWarnings("checkstyle:UncommentedMain")
public static void main(final String... args) { public static void main(final String... args) {
MediaDriver.loadPropertiesFiles(args); MediaDriver.main(args);
setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true");
setProperty("aeron.mtu.length", "16384"); setProperty("aeron.mtu.length", "16384");
setProperty("aeron.socket.so_sndbuf", "2097152"); setProperty("aeron.socket.so_sndbuf", "2097152");
@ -43,10 +42,11 @@ public class LowLatencyMediaDriver {
setProperty("aeron.rcv.initial.window.length", "2097152"); setProperty("aeron.rcv.initial.window.length", "2097152");
final MediaDriver.Context ctx = final MediaDriver.Context ctx =
new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirsDeleteOnStart(true) new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true)
.termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) .dirDeleteOnShutdown(true)
.receiverIdleStrategy(new BusySpinIdleStrategy()) .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy())
.senderIdleStrategy(new BusySpinIdleStrategy()); .receiverIdleStrategy(new BusySpinIdleStrategy())
.senderIdleStrategy(new BusySpinIdleStrategy());
try (MediaDriver ignored = MediaDriver.launch(ctx)) { try (MediaDriver ignored = MediaDriver.launch(ctx)) {
new SigIntBarrier().await(); new SigIntBarrier().await();

View File

@ -28,13 +28,14 @@ import org.nd4j.common.tests.BaseND4JTest;
import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j; import org.nd4j.linalg.factory.Nd4j;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@NotThreadSafe
public class AeronNDArraySerdeTest extends BaseND4JTest { public class AeronNDArraySerdeTest extends BaseND4JTest {
@Test @Test

View File

@ -31,11 +31,13 @@ import org.nd4j.common.tests.BaseND4JTest;
import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j; import org.nd4j.linalg.factory.Nd4j;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@Slf4j @Slf4j
@NotThreadSafe
public class LargeNdArrayIpcTest extends BaseND4JTest { public class LargeNdArrayIpcTest extends BaseND4JTest {
private MediaDriver mediaDriver; private MediaDriver mediaDriver;
private Aeron.Context ctx; private Aeron.Context ctx;
@ -73,9 +75,10 @@ public class LargeNdArrayIpcTest extends BaseND4JTest {
int length = (int) 1e7; int length = (int) 1e7;
INDArray arr = Nd4j.ones(length); INDArray arr = Nd4j.ones(length);
AeronNDArrayPublisher publisher; AeronNDArrayPublisher publisher;
ctx = new Aeron.Context().publicationConnectionTimeout(-1).availableImageHandler(AeronUtil::printAvailableImage) ctx = new Aeron.Context()
.driverTimeoutMs(-1).availableImageHandler(AeronUtil::printAvailableImage)
.unavailableImageHandler(AeronUtil::printUnavailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage)
.aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(10000) .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(10000)
.errorHandler(err -> err.printStackTrace()); .errorHandler(err -> err.printStackTrace());
final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean running = new AtomicBoolean(true);
@ -149,10 +152,10 @@ public class LargeNdArrayIpcTest extends BaseND4JTest {
private Aeron.Context getContext() { private Aeron.Context getContext() {
if (ctx == null) if (ctx == null)
ctx = new Aeron.Context().publicationConnectionTimeout(-1) ctx = new Aeron.Context().driverTimeoutMs(-1)
.availableImageHandler(AeronUtil::printAvailableImage) .availableImageHandler(AeronUtil::printAvailableImage)
.unavailableImageHandler(AeronUtil::printUnavailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage)
.aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(10000) .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(10000)
.errorHandler(err -> err.printStackTrace()); .errorHandler(err -> err.printStackTrace());
return ctx; return ctx;
} }

View File

@ -26,8 +26,11 @@ import org.nd4j.common.tests.BaseND4JTest;
import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j; import org.nd4j.linalg.factory.Nd4j;
import javax.annotation.concurrent.NotThreadSafe;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@NotThreadSafe
public class NDArrayMessageTest extends BaseND4JTest { public class NDArrayMessageTest extends BaseND4JTest {
@Test @Test

View File

@ -32,12 +32,14 @@ import org.nd4j.linalg.factory.Nd4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@NotThreadSafe
public class NdArrayIpcTest extends BaseND4JTest { public class NdArrayIpcTest extends BaseND4JTest {
private MediaDriver mediaDriver; private MediaDriver mediaDriver;
private static Logger log = LoggerFactory.getLogger(NdArrayIpcTest.class); private static Logger log = LoggerFactory.getLogger(NdArrayIpcTest.class);
@ -223,10 +225,10 @@ public class NdArrayIpcTest extends BaseND4JTest {
private Aeron.Context getContext() { private Aeron.Context getContext() {
if (ctx == null) if (ctx == null)
ctx = new Aeron.Context().publicationConnectionTimeout(1000) ctx = new Aeron.Context().driverTimeoutMs(1000)
.availableImageHandler(image -> System.out.println(image)) .availableImageHandler(image -> System.out.println(image))
.unavailableImageHandler(AeronUtil::printUnavailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage)
.aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(1000) .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(1000)
.errorHandler(e -> log.error(e.toString(), e)); .errorHandler(e -> log.error(e.toString(), e));
return ctx; return ctx;
} }

View File

@ -25,8 +25,11 @@ import org.nd4j.common.tests.BaseND4JTest;
import org.nd4j.aeron.ipc.NDArrayMessage; import org.nd4j.aeron.ipc.NDArrayMessage;
import org.nd4j.linalg.factory.Nd4j; import org.nd4j.linalg.factory.Nd4j;
import javax.annotation.concurrent.NotThreadSafe;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@NotThreadSafe
public class ChunkAccumulatorTests extends BaseND4JTest { public class ChunkAccumulatorTests extends BaseND4JTest {
@Test @Test

View File

@ -27,11 +27,13 @@ import org.nd4j.aeron.ipc.NDArrayMessage;
import org.nd4j.aeron.util.BufferUtil; import org.nd4j.aeron.util.BufferUtil;
import org.nd4j.linalg.factory.Nd4j; import org.nd4j.linalg.factory.Nd4j;
import javax.annotation.concurrent.NotThreadSafe;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@NotThreadSafe
public class NDArrayMessageChunkTests extends BaseND4JTest { public class NDArrayMessageChunkTests extends BaseND4JTest {
@Test @Test

View File

@ -33,12 +33,14 @@ import org.nd4j.aeron.ipc.*;
import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j; import org.nd4j.linalg.factory.Nd4j;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@Slf4j @Slf4j
@NotThreadSafe
public class AeronNDArrayResponseTest extends BaseND4JTest { public class AeronNDArrayResponseTest extends BaseND4JTest {
private MediaDriver mediaDriver; private MediaDriver mediaDriver;
@ -51,7 +53,8 @@ public class AeronNDArrayResponseTest extends BaseND4JTest {
public void before() { public void before() {
if(isIntegrationTests()) { if(isIntegrationTests()) {
final MediaDriver.Context ctx = final MediaDriver.Context ctx =
new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirsDeleteOnStart(true) new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirDeleteOnShutdown(true)
.dirDeleteOnStart(true)
.termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy())
.receiverIdleStrategy(new BusySpinIdleStrategy()) .receiverIdleStrategy(new BusySpinIdleStrategy())
.senderIdleStrategy(new BusySpinIdleStrategy()); .senderIdleStrategy(new BusySpinIdleStrategy());
@ -69,10 +72,10 @@ public class AeronNDArrayResponseTest extends BaseND4JTest {
int streamId = 10; int streamId = 10;
int responderStreamId = 11; int responderStreamId = 11;
String host = "127.0.0.1"; String host = "127.0.0.1";
Aeron.Context ctx = new Aeron.Context().publicationConnectionTimeout(-1) Aeron.Context ctx = new Aeron.Context().driverTimeoutMs(-1)
.availableImageHandler(AeronUtil::printAvailableImage) .availableImageHandler(AeronUtil::printAvailableImage)
.unavailableImageHandler(AeronUtil::printUnavailableImage) .unavailableImageHandler(AeronUtil::printUnavailableImage)
.aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(1000) .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(1000)
.errorHandler(e -> log.error(e.toString(), e)); .errorHandler(e -> log.error(e.toString(), e));
int baseSubscriberPort = 40123 + new java.util.Random().nextInt(1000); int baseSubscriberPort = 40123 + new java.util.Random().nextInt(1000);