How to Do Near-Real-Time Sessionization with Spark Streaming and Hadoop
This Spark Streaming example is a good illustration of near-real-time sessionization that is persisted to Hadoop.
Spark Streaming is one of the most interesting components within Apache Spark. With Spark Streaming, you can create data pipelines that process streamed data using the same API you use for batch-loaded data. In addition, Spark Streaming's "micro-batching" approach provides decent resiliency should a job fail for some reason.
In this post, I will walk you through an example of near-real-time sessionization of website events to demonstrate some common and advanced Spark Streaming features, and then load the activity-related statistics into Apache HBase for charting and analysis with your favorite BI tool. (Sessionization refers to capturing all the clickstream activity of a single visitor within the time range of a website session.) You can find the code for this demo here.
A system like this is extremely useful for understanding visitor behavior, whether human or machine. With some additional work, it can also be designed in a windowing pattern to detect possible fraud asynchronously.
The Spark Streaming Code
The main class in our example is:
com.cloudera.sa.example.sparkstreaming.sessionization.SessionizeData
Let's look at this code in segments (ignoring lines 1-59, which contain imports and other boring stuff).
Lines 60 to 112: Setting up Spark Streaming. These lines are fairly basic: they set up Spark Streaming, with the option of receiving the data stream either from HDFS or from a socket. If you are new to Spark Streaming, I have added some detailed comments to help you understand the code. (I won't go into detail here, since it's all in the sample code.)
//This is just creating a Spark Config object. I don't do much here but
//add the app name. There are tons of options to put into the Spark config,
//but none are needed for this simple example.
val sparkConf = new SparkConf().
  setAppName("SessionizeData " + args(0)).
  set("spark.cleaner.ttl", "120000")

//These two lines will get us our SparkContext and our StreamingContext.
//These objects have all the root functionality we need to get started.
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))

//Here we are loading our HBase Configuration object. This will have
//all the information needed to connect to our HBase cluster.
//There is nothing different here from when you normally interact with HBase.
val conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

//This is an HBaseContext object. This is a nice abstraction that will hide
//any complex HBase stuff from us so we can focus on our business case.
//HBaseContext is from the SparkOnHBase project which can be found at
// https://github.com/tmalaska/SparkOnHBase
val hbaseContext = new HBaseContext(sc, conf);

//This creates a reference to our root DStream. DStreams are like RDDs but
//with the context of being in the micro-batch world. I set this to null now
//because I later give the option of populating this data from HDFS or from
//a socket. There is no reason this could not also be populated by Kafka,
//Flume, an MQ system, or anything else. I just focused on these because
//they are the easiest to set up.
var lines: DStream[String] = null

//Options for data load. Will be adding Kafka and Flume at some point
if (args(0).equals("socket")) {
  val host = args(FIXED_ARGS);
  val port = args(FIXED_ARGS + 1);

  println("host:" + host)
  println("port:" + Integer.parseInt(port))

  //Simple example of how you set up a receiver from a socket stream
  lines = ssc.socketTextStream(host, port.toInt)
} else if (args(0).equals("newFile")) {
  val directory = args(FIXED_ARGS)
  println("directory:" + directory)

  //Simple example of how you set up a receiver from an HDFS folder
  lines = ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t: Path) => true, true).map(_._2.toString)
} else {
  throw new RuntimeException("bad input type")
}
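One thing this excerpt does not show is that a Spark Streaming application only begins to receive and process data once its StreamingContext has been started. Below is a minimal, self-contained sketch of that overall skeleton, using only the standard Spark Streaming API; the "localhost"/9999 socket and the app name are placeholders of mine, not values from the original program.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object StreamingSkeleton {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingSkeleton")
    //Micro-batch interval of 10 seconds, matching the example above
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    //Placeholder socket source; the real program chooses socket or HDFS input
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    //...transformations on 'lines', such as the sessionization logic, go here...
    lines.print()

    //Nothing is received or processed until start() is called;
    //awaitTermination() keeps the driver alive while the streaming job runs
    ssc.start()
    ssc.awaitTermination()
  }
}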
Lines 114 to 124: String parsing. This is where Spark Streaming starts. Look at the following lines:
val ipKeyLines = lines.map[(String, (Long, Long, String))](eventRecord => {
  //Get the time and ip address out of the original event
  val time = dateFormat.parse(
    eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']'))).
    getTime()
  val ipAddress = eventRecord.substring(0, eventRecord.indexOf(' '))

  //We return the time twice because we will use the first as the start time
  //and the second as the end time
  (ipAddress, (time, time, eventRecord))
})
The first command above runs a map function over the "lines" DStream object, parsing the original events to separate out the IP address, the timestamp, and the event body. For those new to Spark Streaming: a DStream holds a batch of records to be processed. Those records are populated by the receiver object defined earlier, and this map function produces another DStream within the same micro-batch that holds the transformed records for additional processing.
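To make the parsing concrete, here is a small standalone sketch of what that map does to a single record. The log format and the dateFormat pattern below are my assumptions (an Apache access-log style timestamp); the real dateFormat is defined in the lines 1-59 we skipped.

import java.text.SimpleDateFormat
import java.util.Locale

//Assumed timestamp pattern; the actual dateFormat lives in the skipped lines 1-59
val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

//A hypothetical event record in Apache access-log style
val eventRecord =
  "66.249.66.1 - - [25/Aug/2014:10:15:32 -0400] \"GET /index.html HTTP/1.1\" 200 2326"

//Same extraction logic as the map function above
val time = dateFormat.parse(
  eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']'))).getTime()
val ipAddress = eventRecord.substring(0, eventRecord.indexOf(' '))

//The result is keyed by IP address, with (startTime, endTime, original event) as the value
val keyed = (ipAddress, (time, time, eventRecord))
//keyed._1 == "66.249.66.1"; both time fields hold the parsed epoch milliseconds

Keying by IP address means that, in the later steps, all events from the same visitor land under the same key, which is what makes it possible to group them into sessions.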