3636import org .apache .beam .sdk .coders .KvCoder ;
3737import org .apache .beam .sdk .coders .MapCoder ;
3838import org .apache .beam .sdk .coders .StringUtf8Coder ;
39- import org .apache .beam .sdk .io .common .NetworkTestHelper ;
4039import org .apache .beam .sdk .io .range .ByteKey ;
4140import org .apache .beam .sdk .io .redis .RedisIO .Write .Method ;
4241import org .apache .beam .sdk .testing .PAssert ;
4847import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableMap ;
4948import org .junit .AfterClass ;
5049import org .junit .BeforeClass ;
50+ import org .junit .ClassRule ;
5151import org .junit .Rule ;
5252import org .junit .Test ;
5353import org .junit .runner .RunWith ;
5454import org .junit .runners .JUnit4 ;
55+ import org .testcontainers .containers .GenericContainer ;
56+ import org .testcontainers .utility .DockerImageName ;
5557import redis .clients .jedis .Jedis ;
5658import redis .clients .jedis .StreamEntryID ;
5759import redis .clients .jedis .resps .StreamEntry ;
58- import redis .embedded .RedisServer ;
5960
6061/** Test on the Redis IO. */
6162@ RunWith (JUnit4 .class )
6263public class RedisIOTest {
6364
64- private static final String REDIS_HOST = "localhost" ;
65+ private static final DockerImageName REDIS_IMAGE_NAME = DockerImageName . parse ( "redis:7-alpine" ) ;
6566 private static final Long NO_EXPIRATION = -1L ;
67+ private static final int REDIS_PORT = 6379 ;
6668
6769 @ Rule public TestPipeline p = TestPipeline .create ();
6870
69- private static RedisServer server ;
71+ @ ClassRule
72+ public static final GenericContainer <?> REDIS_CONTAINER =
73+ new GenericContainer <>(REDIS_IMAGE_NAME ).withExposedPorts (REDIS_PORT );
74+
75+ private static String redisHost ;
7076 private static int port ;
7177
7278 private static Jedis client ;
7379
7480 @ BeforeClass
7581 public static void beforeClass () throws Exception {
76- port = NetworkTestHelper . getAvailableLocalPort ();
77- server = new RedisServer ( port );
78- server . start ();
79- client = RedisConnectionConfiguration . create ( REDIS_HOST , port ). connect ();
82+ redisHost = REDIS_CONTAINER . getHost ();
83+ port = REDIS_CONTAINER . getMappedPort ( REDIS_PORT );
84+ client = RedisConnectionConfiguration . create ( redisHost , port ). connect ();
85+ client . ping ();
8086 }
8187
8288 @ AfterClass
8389 public static void afterClass () {
84- client .close ();
85- server .stop ();
90+ if (client != null ) {
91+ client .close ();
92+ }
8693 }
8794
8895 @ Test
@@ -94,7 +101,7 @@ public void testRead() {
94101 p .apply (
95102 "Read" ,
96103 RedisIO .read ()
97- .withEndpoint (REDIS_HOST , port )
104+ .withEndpoint (redisHost , port )
98105 .withKeyPattern ("bulkread*" )
99106 .withBatchSize (10 ));
100107 PAssert .that (read ).containsInAnyOrder (data );
@@ -110,7 +117,7 @@ public void testReadSplitBig() {
110117 p .apply (
111118 "Read" ,
112119 RedisIO .read ()
113- .withEndpoint (REDIS_HOST , port )
120+ .withEndpoint (redisHost , port )
114121 .withKeyPattern ("bigset*" )
115122 .withBatchSize (8 ));
116123 PAssert .that (read ).containsInAnyOrder (data );
@@ -126,7 +133,7 @@ public void testReadSplitSmall() {
126133 p .apply (
127134 "Read" ,
128135 RedisIO .read ()
129- .withEndpoint (REDIS_HOST , port )
136+ .withEndpoint (redisHost , port )
130137 .withKeyPattern ("smallset*" )
131138 .withBatchSize (20 ));
132139 PAssert .that (read ).containsInAnyOrder (data );
@@ -139,13 +146,12 @@ public void testReadWithKeyPattern() {
139146 data .forEach (kv -> client .set (kv .getKey (), kv .getValue ()));
140147
141148 PCollection <KV <String , String >> read =
142- p .apply ("Read" , RedisIO .read ().withEndpoint (REDIS_HOST , port ).withKeyPattern ("pattern*" ));
149+ p .apply ("Read" , RedisIO .read ().withEndpoint (redisHost , port ).withKeyPattern ("pattern*" ));
143150 PAssert .that (read ).containsInAnyOrder (data );
144151
145152 PCollection <KV <String , String >> readNotMatch =
146153 p .apply (
147- "ReadNotMatch" ,
148- RedisIO .read ().withEndpoint (REDIS_HOST , port ).withKeyPattern ("foobar*" ));
154+ "ReadNotMatch" , RedisIO .read ().withEndpoint (redisHost , port ).withKeyPattern ("foobar*" ));
149155 PAssert .thatSingleton (readNotMatch .apply (Count .globally ())).isEqualTo (0L );
150156
151157 p .run ();
@@ -158,7 +164,7 @@ public void testWriteWithMethodSet() {
158164
159165 String newValue = "newValue" ;
160166 PCollection <KV <String , String >> write = p .apply (Create .of (KV .of (key , newValue )));
161- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .SET ));
167+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .SET ));
162168 p .run ();
163169
164170 assertEquals (newValue , client .get (key ));
@@ -174,7 +180,7 @@ public void testWriteWithMethodSetWithExpiration() {
174180 PCollection <KV <String , String >> write = p .apply (Create .of (KV .of (key , newValue )));
175181 write .apply (
176182 RedisIO .write ()
177- .withEndpoint (REDIS_HOST , port )
183+ .withEndpoint (redisHost , port )
178184 .withMethod (Method .SET )
179185 .withExpireTime (10_000L ));
180186 p .run ();
@@ -193,7 +199,7 @@ public void testWriteWithMethodLPush() {
193199
194200 String newValue = "newValue" ;
195201 PCollection <KV <String , String >> write = p .apply (Create .of (KV .of (key , newValue )));
196- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .LPUSH ));
202+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .LPUSH ));
197203 p .run ();
198204
199205 List <String > values = client .lrange (key , 0 , -1 );
@@ -208,7 +214,7 @@ public void testWriteWithMethodRPush() {
208214
209215 String newValue = "newValue" ;
210216 PCollection <KV <String , String >> write = p .apply (Create .of (KV .of (key , newValue )));
211- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .RPUSH ));
217+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .RPUSH ));
212218 p .run ();
213219
214220 List <String > values = client .lrange (key , 0 , -1 );
@@ -222,7 +228,7 @@ public void testWriteWithMethodSAdd() {
222228 List <KV <String , String >> data = buildConstantKeyList (key , values );
223229
224230 PCollection <KV <String , String >> write = p .apply (Create .of (data ));
225- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .SADD ));
231+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .SADD ));
226232 p .run ();
227233
228234 Set <String > expected = new HashSet <>(values );
@@ -237,7 +243,7 @@ public void testWriteWithMethodPFAdd() {
237243 List <KV <String , String >> data = buildConstantKeyList (key , values );
238244
239245 PCollection <KV <String , String >> write = p .apply (Create .of (data ));
240- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .PFADD ));
246+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .PFADD ));
241247 p .run ();
242248
243249 long count = client .pfcount (key );
@@ -254,7 +260,7 @@ public void testWriteWithMethodPFAddWithExpireTime() {
254260 PCollection <KV <String , String >> write = p .apply (Create .of (data ));
255261 write .apply (
256262 RedisIO .write ()
257- .withEndpoint (REDIS_HOST , port )
263+ .withEndpoint (redisHost , port )
258264 .withMethod (Method .PFADD )
259265 .withExpireTime (10_000L ));
260266 p .run ();
@@ -273,7 +279,7 @@ public void testWriteUsingINCRBY() {
273279 List <KV <String , String >> data = buildConstantKeyList (key , values );
274280
275281 PCollection <KV <String , String >> write = p .apply (Create .of (data ));
276- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .INCRBY ));
282+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .INCRBY ));
277283
278284 p .run ();
279285
@@ -289,7 +295,7 @@ public void testWriteUsingDECRBY() {
289295 List <KV <String , String >> data = buildConstantKeyList (key , values );
290296
291297 PCollection <KV <String , String >> write = p .apply (Create .of (data ));
292- write .apply (RedisIO .write ().withEndpoint (REDIS_HOST , port ).withMethod (Method .DECRBY ));
298+ write .apply (RedisIO .write ().withEndpoint (redisHost , port ).withMethod (Method .DECRBY ));
293299
294300 p .run ();
295301
@@ -319,7 +325,7 @@ public void testWriteStreams() {
319325 KvCoder .of (
320326 StringUtf8Coder .of (),
321327 MapCoder .of (StringUtf8Coder .of (), StringUtf8Coder .of ()))));
322- write .apply (RedisIO .writeStreams ().withEndpoint (REDIS_HOST , port ));
328+ write .apply (RedisIO .writeStreams ().withEndpoint (redisHost , port ));
323329 p .run ();
324330
325331 for (String key : redisKeys ) {
@@ -353,7 +359,7 @@ public void testWriteStreamsWithTruncation() {
353359 MapCoder .of (StringUtf8Coder .of (), StringUtf8Coder .of ()))));
354360 write .apply (
355361 RedisIO .writeStreams ()
356- .withEndpoint (REDIS_HOST , port )
362+ .withEndpoint (redisHost , port )
357363 .withMaxLen (1 )
358364 .withApproximateTrim (false ));
359365 p .run ();
0 commit comments