How to do near-real-time sessionization with Spark Streaming and Hadoop
/**
 * This function will be called for the union of keys in the reduced DStream
 * and the active sessions from the last micro batch, with the ipAddress
 * being the key.
 *
 * The goal is to produce a stateful RDD that has all the active
 * sessions: we add new sessions, remove sessions that have timed
 * out, and extend sessions that are still going.
 */
def updateStatbyOfSessions(
    //(sessionStartTime, sessionFinishTime, countOfEvents)
    a: Seq[(Long, Long, Long)],
    //(sessionStartTime, sessionFinishTime, countOfEvents, isNewSession)
    b: Option[(Long, Long, Long, Boolean)]
  ): Option[(Long, Long, Long, Boolean)] = {

  //This function returns an Option value.
  //If we want to delete the state we can return None.
  //The value contains four parts:
  //(startTime, endTime, countOfEvents, isNewSession)
  var result: Option[(Long, Long, Long, Boolean)] = null

  //These if statements say: if we didn't get a new event for
  //this session's ip address for longer than the session
  //timeout + the batch time, then it is safe to remove this key/value
  //from the future stateful DStream
  if (a.size == 0) {
    if (System.currentTimeMillis() - b.get._2 > SESSION_TIMEOUT + 11000) {
      result = None
    } else {
      if (b.get._4 == false) {
        result = b
      } else {
        result = Some((b.get._1, b.get._2, b.get._3, false))
      }
    }
  }

  //Because we used a reduce function before this one, we will
  //only ever get at most one event in the Seq.
  a.foreach(c => {
    if (b.isEmpty) {
      //If there was no value in the stateful DStream then just add it
      //as new, with true for being a new session
      result = Some((c._1, c._2, c._3, true))
    } else {
      if (c._1 - b.get._2 < SESSION_TIMEOUT) {
        //If the session from the stateful DStream has not timed out,
        //then extend the session
        result = Some((
          Math.min(c._1, b.get._1), //newStartTime
          Math.max(c._2, b.get._2), //newFinishTime
          b.get._3 + c._3,          //newSumOfEvents
          false                     //this is not a new session
        ))
      } else {
        //Otherwise replace the old session with a new one
        result = Some((
          c._1,     //newStartTime
          c._2,     //newFinishTime
          b.get._3, //newSumOfEvents
          true      //new session
        ))
      }
    }
  })
  result
}
This code does a lot of things, and in many ways it is the most complex part of the whole job. In summary, it keeps track of active sessions, so you know whether you are continuing an existing session or starting a new one.
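The excerpt above doesn't show where this update function gets plugged in. As a rough sketch (reconstructed here, not the author's exact code), it would be attached through Spark Streaming's updateStateByKey, assuming ipKeyLines is a DStream[(String, (Long, Long, String))] of (ipAddress, (eventTime, eventTime, eventMessage)) and that a checkpoint directory has already been set on the StreamingContext:
val latestSessionInfo = ipKeyLines.
  //keep only (startTime, finishTime, eventCount) for each event
  map[(String, (Long, Long, Long))](a => (a._1, (a._2._1, a._2._2, 1L))).
  //collapse this micro batch to at most one record per ipAddress
  reduceByKey((a, b) => (Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3)).
  //merge with the session state carried over from previous micro batches
  updateStateByKey(updateStatbyOfSessions)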
Lines 126 through 207: Counts and HBase. This section does most of the counting work. There is a lot of repetition here, so let's look at just one count example and then, step by step, at how the counts generated for the same record are stored in HBase.
val onlyActiveSessions = latestSessionInfo.filter(t => System.currentTimeMillis() - t._2._2 < SESSION_TIMEOUT)
…
val newSessionCount = onlyActiveSessions.filter(t => {
    //is the session newer than the last micro batch,
    //and is the boolean saying this is a new session true?
    (System.currentTimeMillis() - t._2._2 > 11000 && t._2._4)
  }).
  count.
  map[HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))
In summary, the code above filters out all sessions except the active ones, counts them, and records that final count in a HashMap instance. It uses a HashMap as the container so that, once all the counts are done, we can call the reduce function below to merge them all into a single record. (I'm sure there are better ways to do this, but this approach works quite well.)
Next, the following code takes all of those HashMaps and combines all their values into a single HashMap.
val allCounts = newSessionCount.
  union(totalSessionCount).
  union(totals).
  union(totalEventsCount).
  union(deadSessionsCount).
  union(totalSessionEventCount).
  reduce((a, b) => b ++ a)
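As a quick note on the reduce((a, b) => b ++ a) line: each of the individual count streams produces a single-entry HashMap with its own key, so the ++ merge simply collects all of them into one map, and there are no duplicate keys to worry about. A tiny standalone illustration, using made-up literal keys in place of the NEW_SESSION_COUNTS-style constants:
import scala.collection.mutable.HashMap

//Hypothetical literal keys standing in for the count-name constants in the job
val newSessions = HashMap("NEW_SESSION_COUNTS" -> 3L)
val totalSessions = HashMap("TOTAL_SESSION_COUNTS" -> 10L)

//Same shape as reduce((a, b) => b ++ a): the result holds both entries
val merged = totalSessions ++ newSessions
//merged now contains NEW_SESSION_COUNTS -> 3 and TOTAL_SESSION_COUNTS -> 10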
Using HBaseContext makes interacting with HBase from Spark Streaming super simple. All you have to do is give it the DStream of HashMaps along with a function that converts each one into a Put object.
hbaseContext.streamBulkPut[HashMap[String, Long]](
  allCounts,  //The input DStream
  hTableName, //The name of the table we want to put to
  (t) => {
    //Here we are converting our input record into a Put.
    //The rowKey is "C" for Count plus a backward-counting time, so the newest
    //counts show up first in HBase's sorted order
    val put = new Put(Bytes.toBytes("C." + (Long.MaxValue - System.currentTimeMillis())))
    //We iterate through the HashMap to make all the columns with their counts
    t.foreach(kv => put.add(Bytes.toBytes(hFamily), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2.toString)))
    put
  },
  false)
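One thing this excerpt doesn't show is where hbaseContext comes from. A minimal construction sketch follows; it assumes the Cloudera Labs SparkOnHBase project (the same class later moved into the upstream hbase-spark module as org.apache.hadoop.hbase.spark.HBaseContext), so adjust the import to whichever build you are on:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
//Assumed package for the Cloudera Labs SparkOnHBase class; it differs in the
//upstream hbase-spark module
import com.cloudera.spark.hbase.HBaseContext

//Picks up hbase-site.xml from the classpath
val hbaseConf: Configuration = HBaseConfiguration.create()
//sc is the job's SparkContext (the same one backing the StreamingContext)
val hbaseContext = new HBaseContext(sc, hbaseConf)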
Now this information in HBase can be wrapped with an Apache Hive table, and a query can then be executed through your favorite BI tool to get a chart of the counts that refreshes with every micro-batch.
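As a rough illustration of that Hive wrapping (not part of the original job), the counts table can be exposed through Hive's HBase storage handler. Everything below is a placeholder sketch: the Hive table name, the hypothetical column family f, the qualifier strings, and the HBase table name all have to match whatever hTableName, hFamily, and the count-key constants actually resolve to:
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
//External Hive table over the HBase counts table; column names and mappings
//are illustrative placeholders
hiveContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS session_counts (
    rowkey STRING,
    new_session_counts BIGINT,
    total_session_counts BIGINT
  )
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES (
    "hbase.columns.mapping" = ":key,f:NEW_SESSION_COUNTS,f:TOTAL_SESSION_COUNTS"
  )
  TBLPROPERTIES ("hbase.table.name" = "sessionCounts")
""")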
Lines 209 through 215: Persist to HDFS. The final task is to join the active session information with the event data, and then persist the events to HDFS keyed by each session's start time.
//Persist to HDFS
ipKeyLines.join(onlyActiveSessions).
  map(t => {
    //Session root start time | Event message, tab separated
    dateFormat.format(new Date(t._2._2._1)) + "\t" + t._2._1._3
  }).
  //saveAsTextFiles writes one output directory per micro batch,
  //named "<outputDir>/session-<batchTimeInMs>.txt"
  saveAsTextFiles(outputDir + "/session", "txt")