Unable to get Application Default Credentials. Running locally

梵蒂冈之花 2021-10-13 16:15:10
I am trying to read data from GCP Pub/Sub with Dataflow using this example.

    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;

    import avro.shaded.com.google.common.collect.Lists;
    import com.google.auth.oauth2.GoogleCredentials;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;

    public class StreamDemoConsumer {

        public static interface MyOptions extends DataflowPipelineOptions {
            @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
            @Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
            String getOutput();

            void setOutput(String s);

            @Description("Input topic")
            @Default.String("projects/coexon-seoul-dev/topics/trading")
            String getInput();

            void setInput(String s);
        }

        @SuppressWarnings("serial")
        public static void main(String[] args) throws IOException {
            MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
            options.setStreaming(true);
            Pipeline p = Pipeline.create(options);
            String topic = options.getInput();
            String output = options.getOutput();
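Since the error in the title concerns Application Default Credentials on a local machine, a small diagnostic may help narrow it down. The sketch below mirrors the first two steps of the documented local lookup order: the `GOOGLE_APPLICATION_CREDENTIALS` environment variable, then the file written by `gcloud auth application-default login`. The class `AdcCheck` and its methods are hypothetical names introduced for illustration; they are not part of Beam or the Google auth library. The sketch uses only the JDK, does not validate the credentials themselves, and assumes the default gcloud configuration directory.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

// Hypothetical helper for illustration; not part of Beam or google-auth-library.
class AdcCheck {

    static final String FILE_NAME = "application_default_credentials.json";

    // Location of the file written by `gcloud auth application-default login`,
    // assuming the default gcloud configuration directory.
    static Path wellKnownFile(String osName, String home, String appData) {
        boolean windows = osName.toLowerCase(Locale.ROOT).startsWith("windows");
        if (windows && appData != null) {
            return Paths.get(appData, "gcloud", FILE_NAME);
        }
        return Paths.get(home, ".config", "gcloud", FILE_NAME);
    }

    // Reports which local credential source, if any, is present on disk.
    static String diagnose(String envValue, Path wellKnown) {
        if (envValue != null && !envValue.isEmpty()) {
            // When the variable is set, it takes precedence over the gcloud file.
            Path keyFile = Paths.get(envValue);
            return Files.isRegularFile(keyFile)
                    ? "GOOGLE_APPLICATION_CREDENTIALS points to an existing file: " + keyFile
                    : "GOOGLE_APPLICATION_CREDENTIALS is set but no file exists at: " + keyFile;
        }
        if (Files.isRegularFile(wellKnown)) {
            return "Found gcloud user credentials at: " + wellKnown;
        }
        return "No local credentials found. Checked GOOGLE_APPLICATION_CREDENTIALS and " + wellKnown;
    }

    public static void main(String[] args) {
        Path wellKnown = wellKnownFile(
                System.getProperty("os.name"),
                System.getProperty("user.home"),
                System.getenv("APPDATA"));
        System.out.println(diagnose(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"), wellKnown));
    }
}
```

If the program reports that nothing was found, the usual options are to run `gcloud auth application-default login` or to point `GOOGLE_APPLICATION_CREDENTIALS` at a service account key file before starting the pipeline. Note that an environment variable set in a terminal is not necessarily visible to a pipeline launched from an IDE.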
