spark-scala cannot serialize
import org.json4s._
import org.json4s.jackson.JsonMethods._

// The optional JSON payload carried in the 8th pipe-separated field.
case class JsonLong(fdn: String, typ: String, vid: String, version: String, device_id: String, ip: String, timestamp: String)

// One parsed log record.
case class log(log_version: String, log_ip: String, log_from: String, SDK: String, action_time: String, action: String, sn: String, JsonClass: JsonLong) extends Serializable

// Needed by json4s extract; assumed to be in scope in the original code.
implicit val formats = DefaultFormats

val RDD = input.map { line =>
  val p = line.split("\\|")
  val log_version = p(0)
  val log_ip = p(1)
  val log_from = p(2)
  val SDK = p(3)
  val action_time = p(4)
  val action = p(5)
  val sn = p(6)
  // The JSON payload is optional: parse it only when an 8th field is present.
  val JsonMap: JsonLong = if (p.length == 8) {
    val jsontest = parse(p(7), useBigDecimalForDouble = true)
    val x = jsontest.extract[Map[String, String]]
    JsonLong(x.getOrElse("fdn", "NULL"), x.getOrElse("typ", "NULL"), x.getOrElse("vid", "NULL"),
      x.getOrElse("version", "NULL"), x.getOrElse("device_id", "NULL"), x.getOrElse("ip", "NULL"),
      x.getOrElse("timestamp", "NULL"))
  } else null
  log(log_version, log_ip, log_from, SDK, action_time, action, sn, JsonMap)
}.toDF()  // toDF() requires import sqlContext.implicits._ (Spark 1.x) to be in scope
The error is as follows:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.
Can any expert help me figure this out?
1 answer
2016-06-01
The error shows up because data has to be serialized and deserialized when it is shuffled between partitions, so GC can easily take a long time.
I suggest using Kryo:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
It saves a lot of space and is more efficient than the default Java serialization.
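A minimal sketch of wiring this up, assuming a SparkConf built before the SparkContext and the JsonLong/log case classes from the question (the app name is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("log-parse")  // placeholder app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Registering the classes that travel through shuffles lets Kryo avoid
// writing the full class name with every record.
conf.registerKryoClasses(Array(classOf[JsonLong], classOf[log]))
val sc = new SparkContext(conf)

Registering the classes up front is optional: unregistered classes still work with Kryo, it just has to store their full names alongside each serialized object.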