您当前的位置:首页 >> 家居优品

大数据指导Flink整合ElasticSearch

2023-03-12 12:16:26

ype", "CONSTANT");

//3、透过下回用的时间较宽。对于基准型则对此应在的可有

config.put("bulk.flush.backoff.delay", "2");

//4、收场下回用的次数

config.put("bulk.flush.backoff.retries", "3");

其他的一些一般而言式:

bulk.flush.max.actions: 大批量读取时的最大读取条数 bulk.flush.max.size.mb: 大批量读取时的最大数据集量 bulk.flush.interval.ms: 大批量读取的时间较宽,一般而言式后则都会按照该时间较宽明文规定,不能接受上都会的两个大批量读取一般而言式

三、收场处置器

读取ES的时候很多时候由于ES集群缓冲区满了,或者路由挂丢,时常都会所致了读取操作分派收场。考虑到这样的收场读取场景,EsSink为应用程序提供了收场处置器组态,创建Sink普通人的时候,同时可以传入一个收场处置器,一旦出现读取收场的情况则都会程序在所传入的处置器用于错误完全恢复。具体的辞汇为:

DataStream input = ...;

input.addSink(new ElasticsearchSink<>(

config, transportAddresses,

new ElasticsearchSinkFunction() {...},

new ActionRequestFailureHandler() {

@Override

void onFailure(ActionRequest action,

Throwable failure,

int restStatusCode,

RequestIndexer indexer) throw Throwable {

if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {

// 将收场劝告再次加入缓冲区,后续透过下回用读取

indexer.add(action);

} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {

// 添加快捷键的处置逻辑

} else {

throw failure;

}

}

}));

如果仅仅只是想再次做收场下回用,也可以直接用到官方提供的预设的RetryRejectedExecutionFailureHandler,该处置器都会对EsRejectedExecutionException所致了到收场读取再次做下回用处置。

四、其他同样点

1. EsSink文档块必须用到try-catch-Exception来猎取

先前在用到EsSink的时候,为了防止某次读取收场造转成计算机系统中会断,对ElasticsearchSinkFunction的 process() 方式将用到try-catch-exception句子块透过了猎取,但实际运行的时候挖掘出计算机系统跑步着跑步着还是被一个 EsRejectedException 极度中会断丢了。让人显然明明对极度透过了猎取,为什么这个极度还是必需掀开来,原地通过检视软件包挖掘出,如果在初始化EsSink普通人的时候不能传入 ActionRequestFailureHandler 则都会用到预设的 ActionRequestFailureHandler ,这个处置器的软件包如下:

public class NoOpFailureHandler implements ActionRequestFailureHandler {

private static final long serialVersionUID = 737941343410827885L;

@Override

public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {

// 这里掀开的是一个throwable

throw failure;

}

可以看到,在遭遇极度的时候,预设的处置器都会将极度套装转成一个 Throw 普通人掀开,这就是直接用到 try-Exception 无法猎取到的原因。

解决方式将:

实现自己的收场处置器小肠丢极度 用到 throw 来猎取极度

该问题一定要重点项目同样,负责都会所致了实时侦查终止丢!

2. 收场下回用组态依赖于checkpoint

如果想尽办法用到EsSink的收场下回用组态,www.atguigu.com则需通过 env.enableCheckpoint() 方式将来触发Flink侦查对checkpoint的支持,如果不能触发checkpoint组态的话,则收场下回用策略是无法生效的。这个是通过跟踪 ElasticsearchSinkBase 类软件包的时候挖掘出的,框架的文档如下:

@Override

public void initializeState(FunctionInitializationContext context) throws Exception {

// no initialization needed

}

@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {

checkErrorAndRethrow();

//如果不能触发checkPoint组态,则该codice_为false,也就所致了下面的flush下回用文档不都会分派到

if (flushOnCheckpoint) {

do {

//收场下回用的良机是遭遇在计算机系统在打checkpoint的时候

bulkProcessor.flush();

checkErrorAndRethrow();

} while (numPendingRequests.get() != 0);

}

}

3. 总结

可以通过第二点贴出的软件包挖掘出,虽然EsSink实现了 CheckpointedFunction 接口,并且重写了checkPoint的系统性方式将,但其并不能墨守转成规的借助于checkpoint定义的那样借助于State组态用于损坏完全恢复。而是借助于了checkpoint的马尔季尼夫卡,定时分派的框架来实现了自己的一套收场下回用组态。

书评刊载来源于大数据集技术与Core

中会选阅读:

大数据集整合之Flink sql 的基础性辞汇

大数据集招聘Flink基础性知识分享

大数据集招聘Flink应征者用帝皇

大数据集招聘应征者用Flink读书人分享

石家庄哪家专科医院治疗包皮过长好
北京看妇科医院哪家最好
上海哪家专科医院治早泄阳痿好
杭州男科医院哪里比较好
天津看牛皮癣哪个医院好
相关阅读
友情链接