大数据指导Flink整合ElasticSearch
2023-03-12 12:16:26
//3、透过下回用的时间较宽。对于基准型则对此应在的可有
config.put("bulk.flush.backoff.delay", "2");
//4、收场下回用的次数
config.put("bulk.flush.backoff.retries", "3");
其他的一些一般而言式:
bulk.flush.max.actions: 大批量读取时的最大读取条数 bulk.flush.max.size.mb: 大批量读取时的最大数据集量 bulk.flush.interval.ms: 大批量读取的时间较宽,一般而言式后则都会按照该时间较宽明文规定,不能接受上都会的两个大批量读取一般而言式
三、收场处置器
读取ES的时候很多时候由于ES集群缓冲区满了,或者路由挂丢,时常都会所致了读取操作分派收场。考虑到这样的收场读取场景,EsSink为应用程序提供了收场处置器组态,创建Sink普通人的时候,同时可以传入一个收场处置器,一旦出现读取收场的情况则都会程序在所传入的处置器用于错误完全恢复。具体的辞汇为:
DataStream input = ...;
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
// 将收场劝告再次加入缓冲区,后续透过下回用读取
indexer.add(action);
} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
// 添加快捷键的处置逻辑
} else {
throw failure;
}
}
}));
如果仅仅只是想再次做收场下回用,也可以直接用到官方提供的预设的RetryRejectedExecutionFailureHandler,该处置器都会对EsRejectedExecutionException所致了到收场读取再次做下回用处置。
四、其他同样点
1. EsSink文档块必须用到try-catch-Exception来猎取
先前在用到EsSink的时候,为了防止某次读取收场造转成计算机系统中会断,对ElasticsearchSinkFunction的 process() 方式将用到try-catch-exception句子块透过了猎取,但实际运行的时候挖掘出计算机系统跑步着跑步着还是被一个 EsRejectedException 极度中会断丢了。让人显然明明对极度透过了猎取,为什么这个极度还是必需掀开来,原地通过检视软件包挖掘出,如果在初始化EsSink普通人的时候不能传入 ActionRequestFailureHandler 则都会用到预设的 ActionRequestFailureHandler ,这个处置器的软件包如下:
public class NoOpFailureHandler implements ActionRequestFailureHandler {
private static final long serialVersionUID = 737941343410827885L;
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
// 这里掀开的是一个throwable
throw failure;
}
可以看到,在遭遇极度的时候,预设的处置器都会将极度套装转成一个 Throw 普通人掀开,这就是直接用到 try-Exception 无法猎取到的原因。
解决方式将:
实现自己的收场处置器小肠丢极度 用到 throw 来猎取极度该问题一定要重点项目同样,负责都会所致了实时侦查终止丢!
2. 收场下回用组态依赖于checkpoint
如果想尽办法用到EsSink的收场下回用组态,www.atguigu.com则需通过 env.enableCheckpoint() 方式将来触发Flink侦查对checkpoint的支持,如果不能触发checkpoint组态的话,则收场下回用策略是无法生效的。这个是通过跟踪 ElasticsearchSinkBase 类软件包的时候挖掘出的,框架的文档如下:
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// no initialization needed
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkErrorAndRethrow();
//如果不能触发checkPoint组态,则该codice_为false,也就所致了下面的flush下回用文档不都会分派到
if (flushOnCheckpoint) {
do {
//收场下回用的良机是遭遇在计算机系统在打checkpoint的时候
bulkProcessor.flush();
checkErrorAndRethrow();
} while (numPendingRequests.get() != 0);
}
}
3. 总结
可以通过第二点贴出的软件包挖掘出,虽然EsSink实现了 CheckpointedFunction 接口,并且重写了checkPoint的系统性方式将,但其并不能墨守转成规的借助于checkpoint定义的那样借助于State组态用于损坏完全恢复。而是借助于了checkpoint的马尔季尼夫卡,定时分派的框架来实现了自己的一套收场下回用组态。
书评刊载来源于大数据集技术与Core
中会选阅读:
大数据集整合之Flink sql 的基础性辞汇
大数据集招聘Flink基础性知识分享
大数据集招聘Flink应征者用帝皇
大数据集招聘应征者用Flink读书人分享
。石家庄哪家专科医院治疗包皮过长好北京看妇科医院哪家最好
上海哪家专科医院治早泄阳痿好
杭州男科医院哪里比较好
天津看牛皮癣哪个医院好
- “真供不起母亲上大学了”,甘肃妈妈公布母亲大学4年最低花费!
- 《知否》老太太让明兰管家,从不待见明兰的盛紘为什么可能会同意
- 去年险资举牌依旧保持个位数,整体已呈稳定下来迹象
- 张薇事件调查结果:河南大学只不过在拖延什么?
- 故事:民间故事:冯工头
- 起航2024权益的产品 险资绘制哑铃型配置“航海图”
- 北清招生摊档门可罗雀,考生是“高攀不起”,还是“不屑一顾”?
- 为什么不能得罪基层小人?这三点太真实,让你后背发凉
- 第32家银行理财子来了!浙银理财苦等3年头拿批文,为去年唯一获批筹建银行理财子公司
- 15岁女生因有纹身被考上上热搜:为什么大众无法容忍有纹身的人?
- 知否:明兰的丫鬟中,小桃单纯,丹橘贴心,翠微全家留在明兰偷偷
- 18K金备用价格多少钱一克(2023年12月29日)
- 清华大学班级遇冷,多名考生选择投身国防,拒绝清华大学
- 日本的野心,藏在28字“国歌”中,翻译成中文与秦始皇的话都是
- 2023年外汇商品表现大比拼:美元告一段落连涨势头 黄金价格再创新高
- 2000年,河南14岁神童高考750分上南开大学,赴美留学后如今怎么样了
- 一图看懂|全球大类资产年终简介:最赚钱竟然是它!
- 西楚霸王项梁为啥不称帝,而是选择和其它的诸侯一起平分天下?
- 一边清北忙辟谣,一边国防七子受推崇:追逐人造卫星梦,报效祖国!
- 2023黄金价格屡创新很高,2024还值得投资黄金吗?