我正在尝试参照下面的示例，在 Dataflow 中从 GCP Pub/Sub 读取数据。
import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.time.Instant;import java.util.ArrayList;import java.util.List;import avro.shaded.com.google.common.collect.Lists;import com.google.auth.oauth2.GoogleCredentials;import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;import org.apache.beam.sdk.Pipeline;import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;import org.apache.beam.sdk.options.Default;import org.apache.beam.sdk.options.Description;import org.apache.beam.sdk.options.PipelineOptionsFactory;import org.apache.beam.sdk.transforms.DoFn;import org.apache.beam.sdk.transforms.ParDo;import org.apache.beam.sdk.transforms.Sum;import org.apache.beam.sdk.transforms.windowing.SlidingWindows;import org.apache.beam.sdk.transforms.windowing.Window;import org.joda.time.Duration;import com.google.api.services.bigquery.model.TableFieldSchema;import com.google.api.services.bigquery.model.TableRow;import com.google.api.services.bigquery.model.TableSchema;public class StreamDemoConsumer { public static interface MyOptions extends DataflowPipelineOptions { @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>") @Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2") String getOutput(); void setOutput(String s); @Description("Input topic") @Default.String("projects/coexon-seoul-dev/topics/trading") String getInput(); void setInput(String s); } @SuppressWarnings("serial") public static void main(String[] args) throws IOException { MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class); options.setStreaming(true); Pipeline p = Pipeline.create(options); String topic = options.getInput(); String output = options.getOutput();
添加回答
举报
0/150
提交
取消