怎样利用Spark Streaming和Hadoop实现近实时的会话连接

 我来答
就烦条0o
2016-07-17 · 知道合伙人软件行家
就烦条0o
知道合伙人软件行家
采纳数:33315 获赞数:46492
从事多年系统运维,喜欢编写各种小程序和脚本。

向TA提问 私信TA
展开全部
/**

* This function will be called for to union of keys in the Reduce DStream

* with the active sessions from the last micro batch with the ipAddress

* being the key

*

* To goal is that this produces a stateful RDD that has all the active

* sessions. So we add new sessions and remove sessions that have timed

* out and extend sessions that are still going

*/
def updateStatbyOfSessions(

//(sessionStartTime, sessionFinishTime, countOfEvents)
a: Seq[(Long, Long, Long)],

//(sessionStartTime, sessionFinishTime, countOfEvents, isNewSession)
b: Option[(Long, Long, Long, Boolean)]
): Option[(Long, Long, Long, Boolean)] = {

//This function will return a Optional value.

//If we want to delete the value we can return a optional "None".

//This value contains four parts

//(startTime, endTime, countOfEvents, isNewSession)
var result: Option[(Long, Long, Long, Boolean)] = null

// These if statements are saying if we didn't get a new event for

//this session's ip address for longer then the session

//timeout + the batch time then it is safe to remove this key value

//from the future Stateful DStream
if (a.size == 0) {
if (System.currentTimeMillis() - b.get._2 > SESSION_TIMEOUT + 11000) {
result = None
} else {
if (b.get._4 == false) {
result = b
} else {
result = Some((b.get._1, b.get._2, b.get._3, false))
}
}
}

//Now because we used the reduce function before this function we are

//only ever going to get at most one event in the Sequence.
a.foreach(c => {
if (b.isEmpty) {

//If there was no value in the Stateful DStream then just add it

//new, with a true for being a new session
result = Some((c._1, c._2, c._3, true))
} else {
if (c._1 - b.get._2 < SESSION_TIMEOUT) {

//If the session from the stateful DStream has not timed out

//then extend the session
result = Some((
Math.min(c._1, b.get._1),
//newStartTime
Math.max(c._2, b.get._2),
//newFinishTime
b.get._3 + c._3,
//newSumOfEvents
false
//This is not a new session
))
} else {

//Otherwise remove the old session with a new one
result = Some((
c._1,
//newStartTime
c._2,
//newFinishTime
b.get._3,
//newSumOfEvents
true
//new session
))
}
}
})
result
}
}

在这段代码做了很多事,而且通过很多方式,这是整个工作中最复杂的部分。总之,它跟踪活动的会话,所以你知道你是继续现有的会话还是启动一个新的。

126到207行:计数和HBase 这部分做了大多数计数工作。在这里有很多是重复的,让我们只看一个count的例子,然后一步步地我们把生成的同一个记录counts存储在HBase中。
val onlyActiveSessions = latestSessionInfo.filter(t => System.currentTimeMillis() - t._2._2 < SESSION_TIMEOUT)

val newSessionCount = onlyActiveSessions.filter(t => {

//is the session newer then that last micro batch

//and is the boolean saying this is a new session true
(System.currentTimeMillis() - t._2._2 > 11000 && t._2._4)
}).
count.
map[HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))


之,上面的代码是过滤除了活动的会话其他所有会话,对他们进行计数,并把该最终计记录到一个的HashMap实例中。它使用HashMap作为容
器,所以在所有的count做完后,我们可以调用下面的reduce函数把他们都到一个单一的记录。 (我敢肯定有更好的方法来实现这一点,但这种方法工
作得很好。)

接下来,下面的代码处理所有的那些HashMap,并把他们所有的值在一个HashMap中。
val allCounts = newSessionCount.
union(totalSessionCount).
union(totals).
union(totalEventsCount).
union(deadSessionsCount).
union(totalSessionEventCount).
reduce((a, b) => b ++ a)

用HBaseContext来使Spark Streaming与HBase交互超级简单。所有你需要做的就是用HashMap和函数将其转换为一个put对象提供给DSTREAM。
hbaseContext.streamBulkPut[HashMap[String, Long]](
allCounts,
//The input RDD
hTableName,
//The name of the table we want to put too
(t) => {

//Here we are converting our input record into a put

//The rowKey is C for Count and a backward counting time so the newest

//count show up first in HBase's sorted order
val put = new Put(Bytes.toBytes("C." + (Long.MaxValue - System.currentTimeMillis())))

//We are iterating through the HashMap to make all the columns with their counts
t.foreach(kv => put.add(Bytes.toBytes(hFamily), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2.toString)))
put
},
false)

现在,HBase的这些信息可以用Apache Hive table包起来,然后通过你喜欢的BI工具执行一个查询来获取像下面这样的图,它每次micro-batch会刷新。

209到215行:写入HDFS 最后的任务是把拥有事件数据的活动会话信息加入,然后把事件以会话的开始时间来持久化到HDFS。
//Persist to HDFS
ipKeyLines.join(onlyActiveSessions).
map(t => {

//Session root start time | Event message
dateFormat.format(new Date(t._2._2._1)) + "t" + t._2._1._3
}).
saveAsTextFiles(outputDir + "/session", "txt")
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式