Flink入门级应用域名处理示例

 更新时间:2022年03月21日 17:54:48   作者:andandan  
这篇文章主要介绍了一个比较简单的入门级Flink应用,代码很容易写,主要用到的算子有FlatMap、KeyBy、Reduce,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

概述

最近做了一个小任务,要使用Flink处理域名数据,在4GB的域名文档中求出每个域名的顶级域名,最后输出每个顶级域名下的前10个子级域名。一个比较简单的入门级Flink应用,代码很容易写,主要用到的算子有FlatMap、KeyBy、Reduce。但是由于Maven打包问题,总是提示找不到入口类,卡了好久,最后也是成功解决了。

主体代码如下:

public class FlinkStreamingTopDomain {
    public static void main(String[] args) throws Exception{
        // 获取流处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取kafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = FlinkUtil.getKafkaConsumer("ahl_test1", "console-consumer-72096");
        // 从当前消费组下标开始读取
        kafkaConsumer.setStartFromEarliest();
        DataStreamSource text = env.addSource(kafkaConsumer);

        // 算子
        DataStream<Tuple2<String,String>> windowCount = text.flatMap(new FlatMap())
                .keyBy(0).reduce(new Reduce());
        //把数据打印到控制台
        windowCount.print()
                .setParallelism(16);//使用16个并行度
        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming topDomain calculate");
    }
}

算子

FlatMap

Flatmap是对一行字符进行处理的,官网上的解释如下

FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

其实和Hadoop的Map差不多,都是把一行字符串进行处理,得到我们想要的<key,value>,不同之处在于Map处理后得到的是<key,values[]>。即Hadoop的Map操作会按key自动的将value处理成数组的形式,而Flink的FlatMap算子只会把每行数据处理成key、value。

下面是我处理业务的FlatMap代码

    // FlatMap分割域名,并输出二元组<顶级域名,域名>
    public static class FlatMap implements FlatMapFunction<String, Tuple2<String,String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> out) throws Exception {
            String[] values = s.split("\\^");   // 按字符^分割
            if(values.length - 1 < 2) {
                return;
            }
            String domain = values[2];
            out.collect(new Tuple2<String,String>(ToolUtil.getTopDomain(domain),domain));
        }
    }

我这里把数据处理成了二元组形式,之后reduce也是对这个二元组进行处理。

KeyBy

先来看看官网的解释

KeyBy
DataStream → KeyedStream
    
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy(value -&gt; value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -&gt; value.f0) // Key by the first element of a Tuple

Attention:A type cannot be a key if:
    1.it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
    2.it is an array of any type.   

keyBy会按照一个keySelector定义的方式进行哈希分区,会将一个流分成多个Partition,相同key的会被分在同一个分区,经过keyBy的流变成KeyedStream。

需要注意的有两点:

1.pojo类型作为key,必须重写hashcode()方法

2.数组类型不能作为key

Reduce

官网的解释如下

Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

reduce是进行”滚动“处理的,即reduce方法的第一个参数是当前已经得到的结果记为currentResult,第二个参数是当前要处理的<key,value>。流式计算会一条一条的处理数据,每处理完一条数据就得到新的currentResult。

业务处理代码如下

    // 拼接同一分区下的ip
    public static class Reduce implements ReduceFunction<Tuple2<String,String>>{
        @Override
        public Tuple2<String,String> reduce(Tuple2 t1, Tuple2 t2) throws Exception {
            String[] domains = t1.f1.toString().split("\\^");
            if(domains.length == 10){
                return t1;
            }
            t1.f1 = t1.f1.toString() + "^" + t2.f1.toString();
            System.out.println(t1.f1 );
            return t1;
        }
   }

连接socket测试

1.将主体代码里的kafka获取数据,改成socket获取数据

//        int port;
//        try {
//            ParameterTool parameterTool = ParameterTool.fromArgs(args);
//            port = parameterTool.getInt("port");
//        } catch (Exception e){
//            System.out.println("没有指定port参数,使用默认值1112");
//            port = 1112;
//        }

        // 连接socket获取输入数据
//        DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);

2.在服务器开启一个端口号:nc -l -p 1112

3.运行代码

4.服务器输入测试数据就可以实时的获取处理结果

连接kafka

正式

使用kafka命令创建主题

kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test

kafka建立topic需要先开启zookeeper

运行生产者jar包,用生产者读取数据

java -jar $jar包路径  $topic $path

测试

另外,还可以使用测试生产者实现和socket测试相同的效果

/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1

打包上传服务器

打包上传服务器注意不要使用idea提供的build方式,反正我使用build会一直报错找不到主类,即便我反编译jar包发现主类在里面,并且MF文件也有配置主类信息。这个问题卡了我很久,最后我使用mvn pakage的方式打包并运行成功,把我的打包插件贴出来帮助遇到和我相同问题的人

<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<!--							<createDependencyReducedPom>false</createDependencyReducedPom>-->
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

Flink运行指令为:

/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar

或者可以访问Flink集群的8081端口,在提供的UI页面上传运行

以上就是Flink入门级应用域名处理示例的详细内容,更多关于Flink域名处理的资料请关注程序员之家其它相关文章!

相关文章

  • Java的枚举,注解和反射(一)

    Java的枚举,注解和反射(一)

    今天小编就为大家分享一篇关于Java枚举,注解与反射原理说明,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2021-07-07
  • 浅谈@RequestBody和@RequestParam可以同时使用

    浅谈@RequestBody和@RequestParam可以同时使用

    这篇文章主要介绍了@RequestBody和@RequestParam可以同时使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Java实现调用MySQL存储过程详解

    Java实现调用MySQL存储过程详解

    相信大家都知道存储过程是在大型数据库系统中,一组为了完成特定功能的SQL语句集。存储过程是数据库中的一个重要对象,任何一个设计良好的数据库应用程序都应该用到存储过程。Java调用mysql存储过程,实现如下,有需要的朋友们可以参考借鉴,下面来一起看看吧。
    2016-11-11
  • 浅析java异常栈

    浅析java异常栈

    给大家通过一个简单的代码实例给大家分型了java异常栈问题,需要的朋友参考一下吧。
    2017-12-12
  • java开放地址法和链地址法解决hash冲突的方法示例

    java开放地址法和链地址法解决hash冲突的方法示例

    这篇文章主要介绍了java开放地址法和链地址法解决hash冲突的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-12-12
  • Java中判断字符串是否相等的实现

    Java中判断字符串是否相等的实现

    这篇文章主要介绍了Java中判断字符串是否相等的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • MyBatis?Plus如何实现获取自动生成主键值

    MyBatis?Plus如何实现获取自动生成主键值

    这篇文章主要介绍了MyBatis?Plus如何实现获取自动生成主键值问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Java实现简单聊天机器人

    Java实现简单聊天机器人

    这篇文章主要为大家详细介绍了Java实现简单聊天机器人,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • Mybatis中updateBatch实现批量更新

    Mybatis中updateBatch实现批量更新

    本文主要介绍了Mybatis中updateBatch实现批量更新,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • MyBatisPlus中@TableField注解的基本使用

    MyBatisPlus中@TableField注解的基本使用

    这篇文章主要介绍了MyBatisPlus中@TableField注解的基本使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07

最新评论

?


http://www.vxiaotou.com