什么是apache flink?开发人员入门指南,新的遗物账户体验团队建立了我们的第一个Flink应用程序,我们称之为使用计算器。此应用程序从中收到数百万条消息New Relic APM基础设施, 和合成词每天代理商。消息进来了卡夫卡主题并包含来自代理的关于其检入频率的数据;具体来说,我们感兴趣的是用户每天使用New Relic的时间。

使用计算器接收消息并在将数据发送到新的遗物数据库(此时,客户可以查询并搜索)之前24小时收到消息并在其上持续24小时。我们在每24小时内收集和持有的数据是我们所谓的状态,一种不间断的数据记录。例如,一个APM代理检入New Relic, Usage Calculator会查看我们是否已经有该代理的数据,如果有,它会将新消息与它已经拥有的任何消息结合起来,只产生一个记录——这就是状态。如果我们的应用程序重新启动——或者更糟,下降——我们仍然有数据记录。但更重要的是,我们只存储一条记录,这有助于我们优化状态。

为什么优化状态如此重要?

当我们在状态下保存更少的数据块时,我们可以安全地对应用程序进行更多更改,而不必担心数据丢失。当我们部署使用计算器时,Flink自动从备份文件中恢复状态。只要我们部署的更改不改变状态,我们就可以在不丢失数据的情况下进行部署。此外,保持我们需要的最少数据可以使状态存储保持在可管理的大小,这减少了Flink写出备份文件(称为check/savepoints)所需的时间。减少写入检查/保存点的时间是至关重要的,因为它必须在下一次检查/保存点开始之前完成。

数据中的使用计算器流在数据中,转换和折叠它,并用地图折叠并减少操作,然后将其退回。最小化状态的地图和减少操作是维持应用程序灵活性和可靠性的关键。

在我们的地图中并减少操作

优化状态的最佳方法是在传入数据模型和传出数据模型之间引入状态模型。在我们早期的尝试中,我们专注于直接将传入数据转换为传出数据格式,然后重新转换。但每次向传出数据格式添加新字段时,我们都必须在部署我们的应用程序时删除状态。

在我们最终的、有效的解决方案中,我们开发了一个模式,在这个模式中我们接受传入的数据,实现一个状态模型,它只有减少所需的最小属性,减少数据,最后装饰和转换它,以适应传出的模型。这种状态模型方法最大限度地减少了CPU使用,因为装饰和转换只在每条减少的消息上发生一次,而不是在每条传入的消息上。

以下是代码中的方式:

公共静态孔隙declareStream (StreamExecutionEnvironment streamEnv) {SingleOutputStreamOperator < NrDailyHost > infrastructureHostStream = streamEnv.addSource (generateConsumer ()) .assignTimestampsAndWatermarks(新BoundedOutOfOrdernessTimestampExtractor < AgentMessage > ())) . map (HostState::新).keyBy(“consumingAccountId”,reduce(new ReduceHostUsageFunction()) .map(new MapHostStateToNrDailyHostFunction());declareKafkaWriter (infrastructureHostStream);}

首先,我们在流中读取并将消息转换为AgentMessage.,传入的线框模型。然后我们将其映衬到Hoststate.,最小化状态模型。我们在最终填充到消息之前执行减少步骤nrdailyhost.,传出线框模型。

让我们更仔细地检查每一个步骤。

线框模型

这个模型与输入的数据完全匹配。

public class AgentMessage {private long timestamp milliseconds ssinceepoch;私人字符串agentVersion;私人字符串agentLang;私人int accountId;私人字符串主机名;私人整数logicalProcessors;私人长totalRamBytes;私人int metadataVersion;}

状态模型

接下来我们实例化状态模型,该模型由我们需要的属性组成。在这种情况下,我们关键消费accountid.主机名,我们跟踪每条新信息的使用时间。我们也挂在了AgentMessage.这样我们就可以在以后的步骤中使用它。

公共类Hoststate {Private Int ConsualIngountID;私有哈希集 epochhoursused = new hashset (24);私人AgentMessage Agentmessage;私人字符串主机名;public hoststate(AgentMessage AgentMessage){this.agentmessage = AgentMessage;epochhoursused.add(AgentMessage.getTimestampmilliseCondsSinnePoch());fourceingAccountID = AgentMessage.getAccountId();hostname = AgentMessage.gethostName();}}

使用状态模型进行减少

我们聚合的唯一数据是使用New Relic的小时数,这是我们与Hoststate..这个简化是Flink中的有状态操作符,因此它与输入和输出模型(Hoststate.).从输出线框中隔离减少(nrdailyhost.)允许我们在不对状态产生负面影响的情况下突变输出线框。

公共宿舍减少(hoststate value1,hoststate value2)抛出异常{if(value1!= null){value2.getepochhoursused()。addall(value1.getopochhoursused());}返回value2;}

传出线框模型

最后,我们可以使用映射函数将消息展开并注释到传出数据模型中。现在,我们已经有了聚合字段以及创建实例所需的所有内容nrdailyhost.可以写入Kafka主题。在这个例子中,nrdailyhost.还有许多需要计算的其他字段,因此我们只有在邮件减少而不是每个传入消息时才获得执行此操作的好处。

在路上让事情变得更容易

为了满足我们在第一个Flink应用中的需求,使用map和reduce操作来接收传入数据、实现状态,并将数据转换为适合输出模式,这非常容易实现,并且值得我们节省处理时间。随着时间的推移,我们可能会对这个应用程序进行更改,特别是处理传入或传出模型的管道部分。但是如果我们确实需要进行更改,我们可以在不丢失状态的情况下轻松地部署它们。