What data types can a Spark DataFrame field have?
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode
object SimpleDemo extends App {
  val sc = new SparkContext("local[*]", "test")
  val sqlc = new SQLContext(sc)
  val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
  val tableName = "tbaclusterresult"

  // Load a JSON file into a DataFrame and register it as a temporary table
  val df = sqlc.read.json("G:/data/json.txt")
  df.registerTempTable("user")
  val res = sqlc.sql("select * from user")
  println(res.count() + "---------------------------")
  res.collect().foreach(row => println(row.toString()))
  // Read a table from MySQL through the JDBC data source
  val jdbcDF = sqlc.read
    .format("jdbc")
    .options(Map(
      "url" -> driverUrl,
      // "user" -> "root",
      // "password" -> "root",
      "dbtable" -> tableName))
    .load()
  println(jdbcDF.count() + "---------------------------")
  jdbcDF.collect().foreach(row => println(row.toString()))
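  // Minimal alternative sketch: the JDBC user/password can also be supplied through a
  // java.util.Properties object to DataFrameReader.jdbc instead of being embedded in
  // the URL or the options map (credentials below are assumed to match driverUrl).
  val connProps = new Properties()
  connProps.setProperty("user", "root")
  connProps.setProperty("password", "root")
  val jdbcDF2 = sqlc.read.jdbc(driverUrl, tableName, connProps)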
  // Write data to MySQL: define a schema, build a DataFrame from an RDD[Row],
  // then save it through the JDBC writer
  val schema = StructType(
    StructField("name", StringType) ::
    StructField("age", IntegerType) :: Nil)
  val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
      ("com", 40), ("bt", 33), ("www", 23)))
    .map(item => Row(item._1, item._2))
  import sqlc.implicits._
  val df1 = sqlc.createDataFrame(data1, schema)
  // df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
  df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)
  // The DataFrame class also provides an insertIntoJDBC method; the target table
  // must already exist, and the method only inserts data. Its signature is:
  // def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  // Insert data into MySQL with plain JDBC, one connection per partition
  val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
  data.foreachPartition(myFun)
  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection(driverUrl, "root", "root")
      // Prepare the statement once and reuse it for every row in this partition
      ps = conn.prepareStatement(sql)
      iterator.foreach { data =>
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}
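To answer the original question more directly: a DataFrame field can use any of the types defined in org.apache.spark.sql.types, namely the numeric types (ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType), plus StringType, BinaryType, BooleanType, DateType, TimestampType, and the complex types ArrayType, MapType, and StructType for nested rows. A minimal sketch of a schema mixing several of them (field names here are illustrative only):

import org.apache.spark.sql.types._

val exampleSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType),
  StructField("score", DoubleType),
  StructField("active", BooleanType),
  StructField("createdAt", TimestampType),
  StructField("tags", ArrayType(StringType)),
  StructField("attrs", MapType(StringType, StringType)),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType))))))

// Prints each field name with its data type
exampleSchema.printTreeString()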