My First Flink Program

2022-05-05

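I previously followed a video tutorial on writing a first Flink word count, but I had partly forgotten how to use a socket that sends data as the data source. On top of that, a project release has kept me busy lately, so I kept putting this write-up off.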

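1. First we need a data source. Whether the job is stream processing or batch processing, it needs a source; the data then goes through transformations into the form we want and is written out somewhere. Here we simply print the result to the console.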

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTalkServer {

    public static void main(String[] args) {
        try {
            ServerSocket server = null;
            // Create a ServerSocket that listens for client requests on port 9000
            try {
                server = new ServerSocket(9000);
                System.out.println("Server started on port 9000");
            } catch (IOException e) {
                // The port could not be bound: report the error and stop,
                // because nothing below can work without a server socket
                System.out.println("can not listen to:" + e);
                return;
            }

            Socket serverSocket = null;
            try {
                // accept() blocks until a client connects, then returns a Socket for it
                serverSocket = server.accept();
                System.out.println("Client connected: "
                        + serverSocket.getInetAddress() + ":" + serverSocket.getPort());
            } catch (IOException e) {
                // The client connection failed
                System.out.println(e);
                server.close();
                return;
            }

            // Build a PrintWriter on the socket's output stream
            PrintWriter serverPrintWriter = new PrintWriter(serverSocket.getOutputStream());
            // Build a BufferedReader on the console input
            BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in));
            // The line typed on the server console is the source data
            String serverLine = serverInput.readLine();
            // Stop when the user types "bye" (or when console input ends)
            while (serverLine != null && !serverLine.equals("bye")) {
                // Send the line to the client
                serverPrintWriter.println(serverLine);
                // Flush the output stream
                serverPrintWriter.flush();
                // Echo the input on the server console
                System.out.println("Server:" + serverLine);
                // Read the next line
                serverLine = serverInput.readLine();
            }
            serverPrintWriter.close();
            serverSocket.close();
            server.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
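Before connecting Flink to this server, it can help to confirm that the server really sends one line per console entry. The following is only a minimal sketch of such a check, not part of the original tutorial: the class name `SocketLineClientSketch` and the method `readLines` are made up for illustration, and it assumes the server above is already running on 127.0.0.1:9000.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or of the original post.
public class SocketLineClientSketch {

    /**
     * Connects to host:port and returns up to maxLines lines,
     * stopping early if the server closes the connection.
     */
    public static List<String> readLines(String host, int port, int maxLines) {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            // readLine() returns null once the server side closes the stream
            while (lines.size() < maxLines && (line = in.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Assumption: SocketTalkServer is already listening on 127.0.0.1:9000
        for (String line : readLines("127.0.0.1", 9000, 3)) {
            System.out.println("received: " + line);
        }
    }
}
```

Note that the server accepts a single client, so this check and the Flink job cannot be connected to the same server run at the same time; restart the server before starting the job.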

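2. Write the Flink job, which is also my first Flink program. It follows a few steps; I thought the instructor in the video laid them out very well, so I copied them here. They are easy to follow: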

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        // Port to connect to
        int port = 9000;
        // try {
        //     ParameterTool parameterTool = ParameterTool.fromArgs(args);
        //     port = parameterTool.getInt("port");
        // } catch (Exception e) {
        //     System.err.println("no port specified. use default 9000");
        //     port = 9000;
        // }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "127.0.0.1";
        String delimiter = "\n";

        // Connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // Note: keyBy(String) and timeWindow(...) come from older Flink releases
        // and are deprecated in newer ones
        DataStream<WordIsCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordIsCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordIsCount> out) throws Exception {
                        // Split each line on whitespace and emit (word, 1) for every word
                        String[] words = value.split("\\s");
                        for (String word : words) {
                            out.collect(new WordIsCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // Sliding window: size 2 seconds, sliding every 1 second
                .timeWindow(Time.seconds(2), Time.seconds(1))
                // Either sum or reduce works here
                .sum("count");

        // Print the result to the console with parallelism 1
        windowCounts.print().setParallelism(1);

        // This call is required; without it the job is never executed
        env.execute("socket window count");
    }

    public static class WordIsCount {
        public String word;
        public long count;

        public WordIsCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public WordIsCount() {
        }

        @Override
        public String toString() {
            return "WordIsCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
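Because the window is 2 seconds long but slides every 1 second, each word falls into two overlapping windows and is therefore counted twice across the printed results. The sketch below tries to make that concrete in plain Java, without any Flink dependency. It is only an illustration under stated assumptions: the class `SlidingWindowSketch`, its methods, and the sample timestamps are invented here, and windows are assumed to be aligned to multiples of the slide interval. It is not how Flink itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of sliding-window assignment; not Flink code.
public class SlidingWindowSketch {

    /** Start times (ms) of every sliding window that contains the given timestamp. */
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (assumed aligned to the slide)
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards while the window [start, start + size) still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Counts words per window; outer key is the window start, inner map is word -> count. */
    public static Map<Long, Map<String, Long>> countPerWindow(
            long[] timestampsMs, String[] words, long sizeMs, long slideMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            for (long start : windowStarts(timestampsMs[i], sizeMs, slideMs)) {
                result.computeIfAbsent(start, k -> new TreeMap<>())
                      .merge(words[i], 1L, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up arrival times in milliseconds, one per word
        long[] timestamps = {1500, 2200, 2700};
        String[] words = {"flink", "flink", "java"};
        countPerWindow(timestamps, words, 2000, 1000).forEach((start, counts) ->
                System.out.println("[" + start + ", " + (start + 2000) + ") " + counts));
    }
}
```

With a size of 2000 ms and a slide of 1000 ms, `windowStarts` always returns two windows per timestamp, which is why a word typed once on the server console can show up in two consecutive outputs of the job.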

 

Reposted from: https://www.cnblogs.com/AlanWilliamWalker/p/10444116.html
