最近更新时间 : 2023.08.02 14:36:25
首次发布时间 : 2022.05.26 14:18:32
Flink 是一个面向有限流和无限流有状态计算的分布式计算框架,它能够支持流处理和批处理两种应用类型。 本文介绍如何配置 EMR 中的 Flink 服务使用和访问 CloudFS。
说明
集群所有节点都要修改如下配置。
连接 E-MapReduce 集群,连接方式如下:
下载 CloudFS 的 SDK 包至 E-MapReduce 集群指定存储位置。下载地址: inf.hdfs.cfs_sdk_deploy_1.4.1.tar.gz
解压后将 SDK 目录下的 cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar 文件复制到 Hadoop 的 /hadoop/hdfs 目录下。
cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar
/hadoop/hdfs
cp {Directory}/cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar /{Directory}/hadoop/hdfs/
配置 core-site.xml 文件。
core-site.xml
vim {hadoop_安装目录}/hadoop/conf/core-site.xml
<property> <name>fs.defaultFS</name> <value>cfs://xxxx.cfs-cn-beijing.ivolces.com</value> <!-- 填写获取的挂载点地址 --> </property> <property> <name>fs.cfs.impl</name> <value>com.volcengine.cloudfs.fs.CfsFileSystem</value> </property> <property> <name>fs.AbstractFileSystem.cfs.impl</name> <value>com.volcengine.cloudfs.fs.CFS</value> </property> <property> <name>cfs.access.key</name> <value>AKxxxxxxxxxxx</value> <!-- 填写访问密钥ID --> </property> <property> <name>cfs.secret.key</name> <value>SKxxxxxxxxxxx</value> <!-- 填写私有访问密钥--> </property> <!-- 可选:如果使用的是 STS Token,需要填写 --> <property> <name>cfs.security.token</name> <value>STSTokenxxxxxxxx</value> </property> <!-- 可选:如果开启缓存加速,需要配置缓存加速接入的 VPC 的网段 --> <property> <name>cfs.client.network.segment</name> <value><VPC 网段,例如 192.168.0.0/16></value> </property>
将解压后的 SDK 目录下的 cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar 文件复制到 Flink lib 文件夹下。
Flink lib
cp {Directory}/cloudfs-hadoop-with-dependencies-cfs-1.4.1.jar /{Directory}/flink/lib/
重启 Flink 服务。
需自行上传测试数据。
准备测试数据。
hdfs dfs -ls cfs://`{`Directory`}/`{document} <!-- 查看上传目录下的文件 -->
执行 Flink 计算命令。
export HADOOP_CONF_DIR=/`{`Directory`}`/hadoop/conf <!-- 该命令为指定环境使用于你的hadoop下的conf文件夹 --> flink run -t local /`{`Directory`}`/flink/examples/batch/WordCount.jar \ ---input cfs://`{`Directory`}`/{document} \ <!-- 此处文件夹下的数据作为输入 --> --output cfs://`{`Directory`}`/{document} <!-- 此处为经过WordCount计算后的输出文件 -->
查看计算结果。
hdfs dfs -cat cfs://`{`Directory`}`/{document} <!-- 此处为计算后的文件路径 -->
返回如下图类似信息,则表示配置 Flink 成功。
前提条件
步骤一:配置 CloudFS 服务
步骤二:配置 Flink
步骤三:验证 Flink 配置