Mark aeron tests not thread safe for more consistent execution
This commit is contained in:
		
							parent
							
								
									fa8537f0c7
								
							
						
					
					
						commit
						be25b08a29
					
				| @ -56,7 +56,7 @@ if [ -n "$BUILD_PATH" ]; then | ||||
|     export PATH="$PATH:$BUILD_PATH" | ||||
| fi | ||||
| 
 | ||||
| ../blasbuild/${CHIP}/tests_cpu/layers_tests/runtests | ||||
| ../blasbuild/${CHIP}/te NdArrayIpcTeststs_cpu/layers_tests/runtests | ||||
| 
 | ||||
| # Workaround to fix posix path conversion problem on Windows (http://mingw.org/wiki/Posix_path_conversion) | ||||
| [ -f "${GTEST_OUTPUT#*:}" ] && cp -a surefire-reports/ ../target && rm -rf surefire-reports/ | ||||
|  | ||||
| @ -39,7 +39,7 @@ | ||||
|         <maven.compiler.source>1.8</maven.compiler.source> | ||||
|         <maven.compiler.target>1.8</maven.compiler.target> | ||||
|         <sbe.version>1.5.4</sbe.version> | ||||
|         <aeron.version>1.4.0</aeron.version> | ||||
|         <aeron.version>1.32.0</aeron.version> | ||||
|     </properties> | ||||
| 
 | ||||
|     <dependencies> | ||||
| @ -93,7 +93,7 @@ | ||||
|                         <configuration> | ||||
|                             <environmentVariables> | ||||
|                                 <LD_LIBRARY_PATH> | ||||
|                                     ${env.LD_LIBRARY_PATH}:${user.dir}:${libnd4jhome}/blasbuild/cpu/blas/ | ||||
|                                     ${env.LD_LIBRARY_PATH}${path.separator}${user.dir}${path.separator}${libnd4jhome}/blasbuild/cpu/blas/${path.separator}${libnd4jhome}/../nd4j/nd4j-backends/nd4j-backend-impls/nd4j-native/target/classes | ||||
|                                 </LD_LIBRARY_PATH> | ||||
|                             </environmentVariables> | ||||
|                             <testSourceDirectory>src/test/java</testSourceDirectory> | ||||
| @ -119,7 +119,6 @@ | ||||
| 
 | ||||
|                                 For testing large zoo models, this may not be enough (so comment it out). | ||||
|                             --> | ||||
|                             <argLine>-Ddtype=float -Dfile.encoding=UTF-8 -Xmx8g</argLine> | ||||
|                         </configuration> | ||||
|                     </plugin> | ||||
|                 </plugins> | ||||
|  | ||||
| @ -59,13 +59,15 @@ public class AeronUtil { | ||||
|             ipcLength += 2; | ||||
|         // System.setProperty("aeron.term.buffer.size",String.valueOf(ipcLength)); | ||||
|         final MediaDriver.Context ctx = | ||||
|                         new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirsDeleteOnStart(true) | ||||
|                                         /*  .ipcTermBufferLength(ipcLength) | ||||
|                                           .publicationTermBufferLength(ipcLength) | ||||
|                                           .maxTermBufferLength(ipcLength)*/ | ||||
|                                         .conductorIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                                         .receiverIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                                         .senderIdleStrategy(new BusySpinIdleStrategy()); | ||||
|                 new MediaDriver.Context().threadingMode(ThreadingMode.SHARED) | ||||
|                         .dirDeleteOnStart(true) | ||||
|                         .dirDeleteOnShutdown(true) | ||||
|                         /*  .ipcTermBufferLength(ipcLength) | ||||
|                           .publicationTermBufferLength(ipcLength) | ||||
|                           .maxTermBufferLength(ipcLength)*/ | ||||
|                         .conductorIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                         .receiverIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                         .senderIdleStrategy(new BusySpinIdleStrategy()); | ||||
|         return ctx; | ||||
|     } | ||||
| 
 | ||||
| @ -92,7 +94,7 @@ public class AeronUtil { | ||||
|      * @return loop function | ||||
|      */ | ||||
|     public static Consumer<Subscription> subscriberLoop(final FragmentHandler fragmentHandler, final int limit, | ||||
|                     final AtomicBoolean running, final AtomicBoolean launched) { | ||||
|                                                         final AtomicBoolean running, final AtomicBoolean launched) { | ||||
|         final IdleStrategy idleStrategy = new BusySpinIdleStrategy(); | ||||
|         return subscriberLoop(fragmentHandler, limit, running, idleStrategy, launched); | ||||
|     } | ||||
| @ -109,7 +111,7 @@ public class AeronUtil { | ||||
|      * @return loop function | ||||
|      */ | ||||
|     public static Consumer<Subscription> subscriberLoop(final FragmentHandler fragmentHandler, final int limit, | ||||
|                     final AtomicBoolean running, final IdleStrategy idleStrategy, final AtomicBoolean launched) { | ||||
|                                                         final AtomicBoolean running, final IdleStrategy idleStrategy, final AtomicBoolean launched) { | ||||
|         return (subscription) -> { | ||||
|             try { | ||||
|                 while (running.get()) { | ||||
| @ -134,7 +136,7 @@ public class AeronUtil { | ||||
|             buffer.getBytes(offset, data); | ||||
| 
 | ||||
|             System.out.println(String.format("Message to stream %d from session %d (%d@%d) <<%s>>", streamId, | ||||
|                             header.sessionId(), length, offset, new String(data))); | ||||
|                     header.sessionId(), length, offset, new String(data))); | ||||
|         }; | ||||
|     } | ||||
| 
 | ||||
| @ -149,7 +151,7 @@ public class AeronUtil { | ||||
|      * @param cause     of the error | ||||
|      */ | ||||
|     public static void printError(final String channel, final int streamId, final int sessionId, final String message, | ||||
|                     final HeaderFlyweight cause) { | ||||
|                                   final HeaderFlyweight cause) { | ||||
|         System.out.println(message); | ||||
|     } | ||||
| 
 | ||||
| @ -162,9 +164,9 @@ public class AeronUtil { | ||||
|      * @param totalBytes     being reported | ||||
|      */ | ||||
|     public static void printRate(final double messagesPerSec, final double bytesPerSec, final long totalMessages, | ||||
|                     final long totalBytes) { | ||||
|                                  final long totalBytes) { | ||||
|         System.out.println(String.format("%.02g msgs/sec, %.02g bytes/sec, totals %d messages %d MB", messagesPerSec, | ||||
|                         bytesPerSec, totalMessages, totalBytes / (1024 * 1024))); | ||||
|                 bytesPerSec, totalMessages, totalBytes / (1024 * 1024))); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
| @ -175,7 +177,7 @@ public class AeronUtil { | ||||
|     public static void printAvailableImage(final Image image) { | ||||
|         final Subscription subscription = image.subscription(); | ||||
|         System.out.println(String.format("Available image on %s streamId=%d sessionId=%d from %s", | ||||
|                         subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity())); | ||||
|                 subscription.channel(), subscription.streamId(), image.sessionId(), image.sourceIdentity())); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
| @ -186,7 +188,7 @@ public class AeronUtil { | ||||
|     public static void printUnavailableImage(final Image image) { | ||||
|         final Subscription subscription = image.subscription(); | ||||
|         System.out.println(String.format("Unavailable image on %s streamId=%d sessionId=%d", subscription.channel(), | ||||
|                         subscription.streamId(), image.sessionId())); | ||||
|                 subscription.streamId(), image.sessionId())); | ||||
|     } | ||||
| 
 | ||||
|     private static final AtomicInteger conductorCount = new AtomicInteger(); | ||||
|  | ||||
| @ -34,8 +34,7 @@ public class LowLatencyMediaDriver { | ||||
| 
 | ||||
|     @SuppressWarnings("checkstyle:UncommentedMain") | ||||
|     public static void main(final String... args) { | ||||
|         MediaDriver.loadPropertiesFiles(args); | ||||
| 
 | ||||
|         MediaDriver.main(args); | ||||
|         setProperty(DISABLE_BOUNDS_CHECKS_PROP_NAME, "true"); | ||||
|         setProperty("aeron.mtu.length", "16384"); | ||||
|         setProperty("aeron.socket.so_sndbuf", "2097152"); | ||||
| @ -43,10 +42,11 @@ public class LowLatencyMediaDriver { | ||||
|         setProperty("aeron.rcv.initial.window.length", "2097152"); | ||||
| 
 | ||||
|         final MediaDriver.Context ctx = | ||||
|                         new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirsDeleteOnStart(true) | ||||
|                                         .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                                         .receiverIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                                         .senderIdleStrategy(new BusySpinIdleStrategy()); | ||||
|                 new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true) | ||||
|                         .dirDeleteOnShutdown(true) | ||||
|                         .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                         .receiverIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                         .senderIdleStrategy(new BusySpinIdleStrategy()); | ||||
| 
 | ||||
|         try (MediaDriver ignored = MediaDriver.launch(ctx)) { | ||||
|             new SigIntBarrier().await(); | ||||
|  | ||||
| @ -28,13 +28,14 @@ import org.nd4j.common.tests.BaseND4JTest; | ||||
| import org.nd4j.linalg.api.ndarray.INDArray; | ||||
| import org.nd4j.linalg.factory.Nd4j; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| import java.io.BufferedOutputStream; | ||||
| import java.io.ByteArrayOutputStream; | ||||
| import java.io.DataOutputStream; | ||||
| 
 | ||||
| import static org.junit.Assert.assertEquals; | ||||
| import static org.junit.Assert.assertTrue; | ||||
| 
 | ||||
| @NotThreadSafe | ||||
| public class AeronNDArraySerdeTest extends BaseND4JTest { | ||||
| 
 | ||||
|     @Test | ||||
|  | ||||
| @ -31,11 +31,13 @@ import org.nd4j.common.tests.BaseND4JTest; | ||||
| import org.nd4j.linalg.api.ndarray.INDArray; | ||||
| import org.nd4j.linalg.factory.Nd4j; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||
| 
 | ||||
| import static org.junit.Assert.assertFalse; | ||||
| 
 | ||||
| @Slf4j | ||||
| @NotThreadSafe | ||||
| public class LargeNdArrayIpcTest extends BaseND4JTest { | ||||
|     private MediaDriver mediaDriver; | ||||
|     private Aeron.Context ctx; | ||||
| @ -73,9 +75,10 @@ public class LargeNdArrayIpcTest extends BaseND4JTest { | ||||
|         int length = (int) 1e7; | ||||
|         INDArray arr = Nd4j.ones(length); | ||||
|         AeronNDArrayPublisher publisher; | ||||
|         ctx = new Aeron.Context().publicationConnectionTimeout(-1).availableImageHandler(AeronUtil::printAvailableImage) | ||||
|         ctx = new Aeron.Context() | ||||
|                 .driverTimeoutMs(-1).availableImageHandler(AeronUtil::printAvailableImage) | ||||
|                         .unavailableImageHandler(AeronUtil::printUnavailableImage) | ||||
|                         .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(10000) | ||||
|                         .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(10000) | ||||
|                         .errorHandler(err -> err.printStackTrace()); | ||||
| 
 | ||||
|         final AtomicBoolean running = new AtomicBoolean(true); | ||||
| @ -149,10 +152,10 @@ public class LargeNdArrayIpcTest extends BaseND4JTest { | ||||
| 
 | ||||
|     private Aeron.Context getContext() { | ||||
|         if (ctx == null) | ||||
|             ctx = new Aeron.Context().publicationConnectionTimeout(-1) | ||||
|             ctx = new Aeron.Context().driverTimeoutMs(-1) | ||||
|                             .availableImageHandler(AeronUtil::printAvailableImage) | ||||
|                             .unavailableImageHandler(AeronUtil::printUnavailableImage) | ||||
|                             .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(10000) | ||||
|                             .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(10000) | ||||
|                             .errorHandler(err -> err.printStackTrace()); | ||||
|         return ctx; | ||||
|     } | ||||
|  | ||||
| @ -26,8 +26,11 @@ import org.nd4j.common.tests.BaseND4JTest; | ||||
| import org.nd4j.linalg.api.ndarray.INDArray; | ||||
| import org.nd4j.linalg.factory.Nd4j; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| 
 | ||||
| import static org.junit.Assert.assertEquals; | ||||
| 
 | ||||
| @NotThreadSafe | ||||
| public class NDArrayMessageTest extends BaseND4JTest { | ||||
| 
 | ||||
|     @Test | ||||
|  | ||||
| @ -32,12 +32,14 @@ import org.nd4j.linalg.factory.Nd4j; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Executors; | ||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||
| 
 | ||||
| import static org.junit.Assert.assertFalse; | ||||
| 
 | ||||
| @NotThreadSafe | ||||
| public class NdArrayIpcTest extends BaseND4JTest { | ||||
|     private MediaDriver mediaDriver; | ||||
|     private static Logger log = LoggerFactory.getLogger(NdArrayIpcTest.class); | ||||
| @ -223,10 +225,10 @@ public class NdArrayIpcTest extends BaseND4JTest { | ||||
| 
 | ||||
|     private Aeron.Context getContext() { | ||||
|         if (ctx == null) | ||||
|             ctx = new Aeron.Context().publicationConnectionTimeout(1000) | ||||
|             ctx = new Aeron.Context().driverTimeoutMs(1000) | ||||
|                             .availableImageHandler(image -> System.out.println(image)) | ||||
|                             .unavailableImageHandler(AeronUtil::printUnavailableImage) | ||||
|                             .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(1000) | ||||
|                             .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(1000) | ||||
|                             .errorHandler(e -> log.error(e.toString(), e)); | ||||
|         return ctx; | ||||
|     } | ||||
|  | ||||
| @ -25,8 +25,11 @@ import org.nd4j.common.tests.BaseND4JTest; | ||||
| import org.nd4j.aeron.ipc.NDArrayMessage; | ||||
| import org.nd4j.linalg.factory.Nd4j; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| 
 | ||||
| import static org.junit.Assert.assertEquals; | ||||
| 
 | ||||
| @NotThreadSafe | ||||
| public class ChunkAccumulatorTests extends BaseND4JTest { | ||||
| 
 | ||||
|     @Test | ||||
|  | ||||
| @ -27,11 +27,13 @@ import org.nd4j.aeron.ipc.NDArrayMessage; | ||||
| import org.nd4j.aeron.util.BufferUtil; | ||||
| import org.nd4j.linalg.factory.Nd4j; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| import java.nio.ByteBuffer; | ||||
| 
 | ||||
| import static org.junit.Assert.assertArrayEquals; | ||||
| import static org.junit.Assert.assertEquals; | ||||
| 
 | ||||
| @NotThreadSafe | ||||
| public class NDArrayMessageChunkTests extends BaseND4JTest { | ||||
| 
 | ||||
|     @Test | ||||
|  | ||||
| @ -33,12 +33,14 @@ import org.nd4j.aeron.ipc.*; | ||||
| import org.nd4j.linalg.api.ndarray.INDArray; | ||||
| import org.nd4j.linalg.factory.Nd4j; | ||||
| 
 | ||||
| import javax.annotation.concurrent.NotThreadSafe; | ||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||
| 
 | ||||
| import static org.junit.Assert.assertEquals; | ||||
| 
 | ||||
| @Slf4j | ||||
| @NotThreadSafe | ||||
| public class AeronNDArrayResponseTest extends BaseND4JTest { | ||||
|     private MediaDriver mediaDriver; | ||||
| 
 | ||||
| @ -51,7 +53,8 @@ public class AeronNDArrayResponseTest extends BaseND4JTest { | ||||
|     public void before() { | ||||
|         if(isIntegrationTests()) { | ||||
|             final MediaDriver.Context ctx = | ||||
|                     new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirsDeleteOnStart(true) | ||||
|                     new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirDeleteOnShutdown(true) | ||||
|                             .dirDeleteOnStart(true) | ||||
|                             .termBufferSparseFile(false).conductorIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                             .receiverIdleStrategy(new BusySpinIdleStrategy()) | ||||
|                             .senderIdleStrategy(new BusySpinIdleStrategy()); | ||||
| @ -69,10 +72,10 @@ public class AeronNDArrayResponseTest extends BaseND4JTest { | ||||
|         int streamId = 10; | ||||
|         int responderStreamId = 11; | ||||
|         String host = "127.0.0.1"; | ||||
|         Aeron.Context ctx = new Aeron.Context().publicationConnectionTimeout(-1) | ||||
|         Aeron.Context ctx = new Aeron.Context().driverTimeoutMs(-1) | ||||
|                         .availableImageHandler(AeronUtil::printAvailableImage) | ||||
|                         .unavailableImageHandler(AeronUtil::printUnavailableImage) | ||||
|                         .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveInterval(1000) | ||||
|                         .aeronDirectoryName(mediaDriver.aeronDirectoryName()).keepAliveIntervalNs(1000) | ||||
|                         .errorHandler(e -> log.error(e.toString(), e)); | ||||
| 
 | ||||
|         int baseSubscriberPort = 40123 + new java.util.Random().nextInt(1000); | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user