1+ /*
2+ * Copyright 2025 Google LLC
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package com .example .dataflow ;
18+
19+ import org .apache .beam .sdk .Pipeline ;
20+ import org .apache .beam .sdk .io .gcp .pubsub .PubsubIO ;
21+ import org .apache .beam .sdk .io .gcp .pubsub .PubsubMessage ;
22+ import org .apache .beam .sdk .options .Description ;
23+ import org .apache .beam .sdk .options .PipelineOptionsFactory ;
24+ import org .apache .beam .sdk .options .SdkHarnessOptions ;
25+ import org .apache .beam .sdk .transforms .DoFn ;
26+ import org .apache .beam .sdk .transforms .ParDo ;
27+ import org .slf4j .Logger ;
28+ import org .slf4j .LoggerFactory ;
29+ import org .slf4j .MDC ;
30+
31+ public class MdcSample {
32+
33+ public interface MdcSampleJobOptions extends SdkHarnessOptions {
34+ @ Description ("The Pub/Sub subscription to read from." )
35+ String getInputSubscription ();
36+
37+ void setInputSubscription (String value );
38+ }
39+
40+ public static class MessageReaderFn extends DoFn <PubsubMessage , Void > {
41+
42+ private transient Logger logger ;
43+
44+ @ Setup
45+ public void setup () {
46+ logger = LoggerFactory .getLogger (MessageReaderFn .class );
47+ }
48+
49+ @ ProcessElement
50+ public void processElement (ProcessContext c ) {
51+ PubsubMessage message = c .element ();
52+ String messageId = message .getMessageId ();
53+
54+ try (MDC .MDCCloseable ignored = MDC .putCloseable ("messageId" , messageId )) {
55+ String payload = new String (message .getPayload (), java .nio .charset .StandardCharsets .UTF_8 );
56+ logger .info ("Received message with payload: {}" , payload );
57+
58+ // This is the example task
59+ logger .info ("Executing example task..." );
60+ } catch (Exception e ) {
61+ logger .error ("Failed to process message" , e );
62+ }
63+ }
64+ }
65+
66+ public static void main (String [] args ) {
67+ MdcSampleJobOptions options =
68+ PipelineOptionsFactory .fromArgs (args ).withValidation ().as (MdcSampleJobOptions .class );
69+ // options.setRunner(DirectRunner.class);
70+
71+ options .setLogMdc (true );
72+
73+
74+ Pipeline p = Pipeline .create (options );
75+
76+ p .apply (
77+ "Read Messages from Pub/Sub" ,
78+ PubsubIO .readMessagesWithAttributes ().fromSubscription (options .getInputSubscription ()))
79+ .apply ("Process Message" , ParDo .of (new MessageReaderFn ()));
80+
81+ p .run ();
82+ }
83+ }
0 commit comments