为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Flink (v1.6.0) 验证 Elasticsearch Sink (v6.4)

Apache Flink (v1.6.0) 验证 Elasticsearch Sink (v6.4)

ibeautiful 2021-09-12 15:50:38
我正在使用 Apache Flink v1.6.0 并且我正在尝试写入 Elasticsearch v6.4.0,它托管在Elastic Cloud 中。我在对 Elastic Cloud 集群进行身份验证时遇到问题。我已经能够使用以下代码让 Flink 写入本地 Elasticsearch v6.4.0 节点,该节点没有加密:/*    Elasticsearch Configuration*/List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));// use a ElasticsearchSink.Builder to create an ElasticsearchSinkElasticsearchSink.Builder<ObjectNode> esSinkBuilder = new ElasticsearchSink.Builder<>(        httpHosts,        new ElasticsearchSinkFunction<ObjectNode>() {            private IndexRequest createIndexRequest(ObjectNode payload) {                // remove the value node so the fields are at the base of the json payload                JsonNode jsonOutput = payload.get("value");                return Requests.indexRequest()                        .index("raw-payload")                        .type("payload")                        .source(jsonOutput.toString(), XContentType.JSON);            }            @Override            public void process(ObjectNode payload, RuntimeContext ctx, RequestIndexer indexer) {                indexer.add(createIndexRequest(payload));            }        });// set number of events to be seen before writing to ElasticsearchesSinkBuilder.setBulkFlushMaxActions(1);// finally, build and add the sink to the job's pipelinestream.addSink(esSinkBuilder.build());然而,当我尝试添加验证到代码库中,作为记录在这里的弗林克文档和这里对应Elasticsearch Java文档上。看起来像这样:// provide a RestClientFactory for custom configuration on the internally created REST clientHeader[] defaultHeaders = new Header[]{new BasicHeader("username", "password")};esSinkBuilder.setRestClientFactory(        restClientBuilder -> {            restClientBuilder.setDefaultHeaders(defaultHeaders);        });谁能帮忙指出我哪里出错了?
查看完整描述

2 回答

?
侃侃尔雅

TA贡献1801条经验 获得超16个赞

override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {

        // TODO Additional rest client args go here - authentication headers for secure connections etc...

      }

    })

我希望这可以帮助你。


查看完整回答
反对 回复 2021-09-12
  • 2 回答
  • 0 关注
  • 169 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信