相关文章推荐

Repository files navigation

Flink-cos-fs

Flink-cos-fs 是腾讯云对象存储系统COS针对 Flink 的文件系统实现,并且支持了 recoverwriter 接口。 Flink 可以基于该文件系统实现读写 COS 上的数据以及作为流应用的状态后端。

支持 Linux 和 Mac OS 系统

Flink 1.10

编译或获取 Flink-cos-fs 发行包

编译 flink-cos-fs

flink-cos-fs 默认依赖的是 hadoop-3.1.0 的版本(即是于 flink-1.10 保持一致),使用如下命令即可编译打包:

mvn clean package -DskipTests

如果需要编译依赖 hadoop 其他版本的发行包,则需要手动修改项目根路径 pom.xml 中的 ${fs.hadoopshaded.version} 或者在编译命令中指定 -Dfs.hadoopshaded.version=3.x.x ,同时修改 flink-fs-hadoop-shaded 模块下的 org.apache.hadoop.util.VersionInfo.java 文件中的 hadoop 版本信息:

// ...
		if ("common".equals(component)) {
			info.setProperty("version", "3.1.0");
			info.setProperty("revision", "16b70619a24cdcf5d3b0fcf4b58ca77238ccbe6d");
			info.setProperty("branch", "branch-3.1.0");
			info.setProperty("user", "wtan");
			info.setProperty("date", "2018-04-03T04:00Z");
			info.setProperty("url", "[email protected]:hortonworks/hadoop-common-trunk.git");
			info.setProperty("srcChecksum", "14182d20c972b3e2105580a1ad6990");
			info.setProperty("protocVersion", "2.5.0");
// ...

特别地,如果需要编译 hadoop 2.x 的版本,除了上述操作以外,还需要在编译命令中,指定 maven 编译使用 hadoop-2 的配置:

mvn clean package -DskipTests -Phadoop-2 -Dfs.hadoopshaded.version=2.x.x

最后,编译完成以后,在 ${FLINK_COS_FS}/flink-cos-fs-hadoop/target 就可以得到 flink-cos-fs-hadoop-${flink.version}-{version}.jar 的依赖包。

下载地址: Flink-cos-fs release

从 Maven 中央仓库中依赖:

<dependencies>
    <dependency>
        <groupId>com.qcloud.cos</groupId>
        <artifactId>flink-cos-fs-hadoop</artifactId>
        <version>${flink.version>-${version></version>
    </dependency>
</dependencies>

安装Flink-cos-fs依赖

1.执行mkdir ${FLINK_HOME}/plugins/cos-fs-hadoop,在${FLINK_HOME}/plugins目录下创建flink-cos-fs-hadoop插件目录;

2.将对应版本的预编译包(flink-cos-fs-{flink.version}-{version}.jar)拷贝到${FLINK_HOME}/plugins/cos-fs-hadoop目录下;

3.在 ${FLINK_HOME}/conf/flink-conf.yaml 中添加一些 CosN 相关配置以确保 Flink 能够访问到 COS 存储桶,这里的配置键与 CosN 完全兼容,可参考hadoop-cos,必须配置信息如下:

fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
fs.cosn.userinfo.secretId: AKIDXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
fs.cosn.userinfo.secretKey: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
fs.cosn.bucket.region: ap-guangzhou
fs.cosn.bucket.endpoint_suffix: cos.ap-guangzhou.myqcloud.com

4.在作业的 write 或 sink 路径中填写格式为:cosn://bucket-appid/path的路径信息即可,例如:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 采用 Streaming File Sink 写入的话,必须启用 checkpoint,这里使用 COS 作为 StateBackend 举例子,也可以使用其他 checkpoint storage。
        env.setStateBackend(new FsStateBackend("cosn://bucket-name-125xxxxxx/checkpoint"));
        env.enableCheckpointing(1000);
        // 构造 Streaming File Sink 写入
        StreamingFileSink<String> streamingFileSink =
        StreamingFileSink.forRowFormat(
        new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
        DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.SECONDS.toMillis(5))
        .withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
        .withMaxPartSize(1024)
        .build())
        .build();

⚠️注意:如果使用 Streaming File Sink 方式写入,需要同时启用 Flink 的 checkpoint,否则写入的数据始终处于 inprogress 不可见状态,无法被读取。

以下给出 Flink Job 读写 COS 的示例代码:

// Read from COS bucket
env.readTextFile("cosn://<bucket-appid>/<object-name>");
// Write to COS bucket
stream.writeAsText("cosn://<bucket-appid>/<object-name>");
// Use COS as FsStatebackend
env.setStateBackend(new FsStateBackend("cosn://<bucket-appid>/<object-name>"));
// Use the streamingFileSink which supports the recoverable writer
StreamingFileSink<String> fileSink  =  StreamingFileSink.forRowFormat(
                new Path("cosn://<bucket-appid>/<object-name>"),new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(build).build();

所有配置说明

fs.cosn.bucket.endpoint_suffix 指定要连接的COS endpoint,该项为非必填项目。对于公有云COS用户而言,只需要正确填写上述的region配置即可。兼容原配置项:fs.cosn.userinfo.endpoint_suffix。 fs.cosn.userinfo.secretId/secretKey 填写您账户的API 密钥信息。可通过 云 API 密钥 控制台 查看。 fs.cosn.impl cosn对FileSystem的实现类,固定为 org.apache.hadoop.fs.CosFileSystem。 fs.AbstractFileSystem.cosn.impl cosn对AbstractFileSy stem的实现类,固定为org.apache.hadoop.fs.CosN。 fs.cosn.bucket.region 请填写您的地域信息,枚举值为 可用地域 中的地域简称,如ap-beijing、ap-guangzhou等。 兼容原配置项:fs.cosn.userinfo.region。 fs.cosn.tmp.dir 请设置一个实际存在的本地目录,运行过程中产生的临时文件会暂时放于此处。 /tmp/hadoop_cos fs.cosn.block.size CosN文件系统每个block的大小,默认为128MB ‭134217728‬(128MB) fs.cosn.upload.buffer CosN文件系统上传时依赖的缓冲区类型。当前支持三种类型的缓冲区:非直接内存缓冲区(non_direct_memory),直接内存缓冲区(direct_memory),磁盘映射缓冲区(mapped_disk)。非直接内存缓冲区使用的是JVM堆内存,直接内存缓冲区使用的是堆外内存,而磁盘映射缓冲区则是基于内存文件映射得到的缓冲区。 mapped_disk fs.cosn.upload.buffer.size CosN文件系统上传时依赖的缓冲区大小,如果指定为-1,则表示不限制。若不限制缓冲区大小,则缓冲区类型必须为mapped_disk。如果指定大小大于0,则要求该值至少大于等于一个block的大小。兼容原配置项:fs.cosn.buffer.size。 134217728(128MB) fs.cosn.upload.part.size 分块上传时每个part的大小。由于 COS 的分块上传最多只能支持10000块,因此需要预估最大可能使用到的单文件大小。例如,part size 为8MB时,最大能够支持78GB的单文件上传。 part size 最大可以支持到2GB,即单文件最大可支持19TB。 8388608(8MB) fs.cosn.upload_thread_pool 文件流式上传到COS时,并发上传的线程数目 fs.cosn.copy_thread_pool 目录拷贝操作时,可用于并发拷贝和删除文件的线程数目 fs.cosn.read.ahead.block.size 预读块的大小 ‭1048576‬(1MB) fs.cosn.read.ahead.queue.size 预读队列的长度 fs.cosn.maxRetries 访问COS出现错误时,最多重试的次数 fs.cosn.retry.interval.seconds 每次重试的时间间隔 fs.cosn.max.connection.num 配置COS连接池中维持的最大连接数目,这个数目与单机读写COS的并发有关,建议至少大于或等于单机读写COS的并发数 fs.cosn.customer.domain 配置COS的自定义域名,默认为空 fs.cosn.server-side-encryption.algorithm 配置COS服务端加密算法,支持SSE-C和SSE-COS,默认为空,不加密 fs.cosn.server-side-encryption.key 当开启COS的SSE-C服务端加密算法时,必须配置SSE-C的密钥,密钥格式为base64编码的AES-256密钥,默认为空,不加密 fs.cosn.crc64.checksum.enabled 是否开启CRC64校验。默认不开启,此时无法使用hadoop fs -checksum命令获取文件的CRC64校验值。 false fs.cosn.traffic.limit 上传下载带宽的控制选项,819200 ~ 838860800,单位为bits/s。默认值为-1,表示不限制。

使用POSIX Bucket配置说明

fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl 元数据加速实现类,com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter fs.cosn.trsf.fs.ofs.impl 元数据加速实现类,com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter fs.cosn.trsf.fs.ofs.tmp.cache.dir 元数据加速临时目录,给足空间及权限,例如/data/emr/hdfs/tmp/chdfs/ fs.cosn.trsf.fs.ofs.user.appid 客户bucket对应的appid fs.cosn.trsf.fs.ofs.bucket.region 客户bucket所在的园区 fs.cosn.trsf.fs.ofs.upload.flush.flag 客户端flush开关,flink on cos场景必须设置为 true false
  • Flink 既可以通过hadoop-cos读写 COS 中的对象文件,也可以通过 flink-cos-fs 来读写,这两种有什么区别?
  • hadoop-cos 实现了 Hadoop 的兼容文件系统语义,Flink 可以通过写 Hadoop 兼容文件系统的形式写入数据到 COS 中,但是这种方式不支持的 Flink 的 recoverable writer 写入,当你使用 streamingFileSink 写入数据时,要求底层文件系统支持recoverable writer。 因此,flink-cos-fs 基于 Hadoop-COS (CosN) 扩展实现了 Flink 的recoverable writer,完整地支持了 Flink 文件系统的语义,因此推荐使用它来访问 COS 对象。

    目前由于元数据加速桶静态包使用loadjar方式,如何shade了org.apache.hadoop 会导致network和loadjar的pkg路径不同 1)解决方式通过COSNFileSystemFactory的useFlinkShade开启,pom.xml of flink-cos-fs-hadoop开启shade 2)或者关闭useFlinkShade,去掉shade 3)todo: 完善ofs sdk flink或其他框架可以指定shade加载不同动态包实现all shade模式

     
    推荐文章