spark如何生成txt文件|debian sparkr怎么配置环境变量

㈠ 怎么把spark代码改成flink代码

第一步:在桌面(其实任何地方都可以)新建一个记事本程序。 第二步:将如下代码拷贝进去 dim WSHshell set ws=wscript.createobject("wscript.shell") ws.run "shutdown -s -t 3600000",0 注:其中的“3600000”是设定的自动关机时间,可以自行更改! 点击保存后,退出,将文件拓展名“txt”改为“vbs”遇到 警告提示对话框,点击 “确定”。 完成程序,自己可以试试。 注:如要修改关机时间请将文件拓展名改为“txt”后,用记事本打开 编辑。或者使用右键点击,选择 打开方式-记事本 即可。

㈡ Spark SQL怎么创建编程创建DataFrame

创建DataFrame在Spark SQL中,开发者可以非常便捷地将各种内、外部的单机、分布式数据转换为DataFrame。以下Python示例代码充分体现了Spark SQL 1.3.0中DataFrame数据源的丰富多样和简单易用:# 从Hive中的users表构造DataFrameusers = sqlContext.table("users")# 加载S3上的jsON文件logs = sqlContext.load("s3n://path/to/data.json", "json")# 加载HDFS上的Parquet文件clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")# 通过JDBC访问MySQLcomments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")# 将普通RDD转变为DataFramerdd = sparkContext.textFile("article.txt") \.flatMap(lambda line: line.split()) \.map(lambda word: (word, 1)) \.receByKey(lambda a, b: a + b) \wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])# 将本地数据容器转变为DataFramedata = [("Alice", 21), ("Bob", 24)]people = sqlContext.createDataFrame(data, ["name", "age"])# 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能)sparkDF = sqlContext.createDataFrame(pandasDF)

㈢ spark在saveAsTextFile的时候直接保存成一个txt文件

在该语句之前加上repartition(1),即写作以下形式:people.repartition(1).saveAsTextFile("out.txt")

㈣ spark string 默认多少个core处理

一个简单的例子// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次rece操作。也就是说,需要对一份数据执行两次算子操作。 // 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。// 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。// 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")rdd1.map(…)val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")rdd2.rece(…) // 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。// 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。// 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行rece操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。// 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")rdd1.map(…)rdd1.rece(…)

㈤ spark中如何筛选出大于小于80的数

首先将txt文件转为DATAFrame格式,然后再利用相关语法输出小于80的数。第一个输出为转换为dataframe形式后的文本,第二个输出为小于80的数。

㈥ debian sparkr怎么配置环境变量

1. SparkR的安装配置1.1. R与Rstudio的安装1.1.1. R的安装我们的工作环境都是在Ubuntu下操作的,所以只介绍Ubuntu下安装R的方法:1) 在/etc/apt/sources.list添加源deb http://mirror.bjtu.e.cn/cran/bin/linux/ubuntu precise/,然后更新源apt-get update;2) 通过apt-get安装:sudo apt-get install r-base1.1.2. Rstudio的安装官网有详细介绍:http://www.rstudio.com/procts/rstudio/download-server/sudo apt-get install gdebi-coresudo apt-get install libapparmor1 # Required only for Ubuntu, not Debianwget http://download2.rstudio.org/rstudio-server-0.97.551-amd64.debsudo gdebi rstudio-server-0.97.551-amd64.deb1.2. rjava安装1.2.1. rJava介绍 rJava是一个R语言和Java语言的通信接口,通过底层JNI实现调用,允许在R中直接调用Java的对象和方法。rJava还提供了Java调用R的功能,是通过JRI(Java/R Interface)实现的。JRI现在已经被嵌入到rJava的包中,我们也可以单独试用这个功能。现在rJava包,已经成为很多基于Java开发R包的基础功能组件。正是由于rJava是底层接口,并使用JNI作为接口调用,所以效率非常高。在JRI的方案中,JVM通过内存直接加载RVM,调用过程性能几乎无损耗,因此是非常高效连接通道,是R和Java通信的首选开发包。1.2.2. rJava安装1) 配置rJava环境执行R CMD javareconf[email protected]:/home/payton# R CMD javareconf2) 启动R并安装rJava[email protected]:/home/payton# R> install.packages("rJava") 1.3. SparkR的安装1.3.1. SparkR的代码下载从网页下载代码SparkR-pkg-master.zip https://github.com/amplab-extras/SparkR-pkg1.3.2. SparkR的代码编译1) 解压SparkR-pkg-master.zip,然后cd SparkR-pkg-master/2) 编译的时候需要指明Hadoop版本和Spark版本SPARK_HADOOP_VERSION=2.4.1 SPARK_VERSION=1.2.0 ./install-dev.sh至此,单机版的SparkR已经安装完成。1.3.3. 分布式SparkR的部署配置1) 编译成功后,会生成一个lib文件夹,进入lib文件夹,打包SparkR为SparkR.tar.gz,这个是分布式SparkR部署的关键。2) 由打包好的SparkR.tar.gz在各集群节点上安装SparkRR CMD INSTALL SparkR.tar.gz至此分布式SparkR搭建完成。 2. SparkR的运行2.1. SparkR的运行机制SparkR是AMPLab发布的一个R开发包,为Apache Spark提供了轻量的前端。SparkR提供了Spark中弹性分布式数据集(RDD)的API,用户可以在集群上通过R shell交互性的运行job。SparkR集合了Spark 和R的优势,下面的这3幅图很好的阐释了SparkR的运行机制。2.2. 用SparkR 进行数据分析2.2.1. SparkR基本操作首先介绍下SparkR的基本操作:第一步,加载SparkR包library(SparkR)第二步,初始化Spark contextsc <- sparkR.init(master=" spark://localhost:7077" ,sparkEnvir=list(spark.executor.memory="1g",spark.cores.max="10"))第三步,读入数据,spark的核心是Resilient Distributed Dataset (RDD),RDDS可以从Hadoop的InputFormats来创建(例如,HDFS文件)或通过转化其它RDDS。例如直接从HDFS读取数据为RDD的示例如下:lines <- textFile(sc, "hdfs://sparkR_test.txt")另外,也可以通过parallelize函数从向量或列表创建RDD,如:rdd <- parallelize(sc, 1:10, 2)到了这里,那么我们就可以运用RDD的动作(actions)和转换(transformations)来对RDD进行操作并产生新的RDD;也可以很容易地调用R开发包,只需要在集群上执行操作前用includePackage读取R开发包就可以了(例:includePackage(sc, Matrix));当然还可以把RDD转换为R语言格式的数据形式来对它进行操作。具体可参见如下两个链接:http://amplab-extras.github.io/SparkR-pkg/https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Quick-Start那么下面我们就通过两个示例来看下 SparkR是如何运行的吧。2.2.2. SparkR使用举例1) Example1:word count# 加载SparkR包library(SparkR)# 初始化 Spark contextsc <- sparkR.init(master="spark://集群ip:7077" ,sparkEnvir=list(spark.executor.memory="1g",spark.cores.max="10"))# 从HDFS上读取文件lines <- textFile(sc, "hdfs://集群ip:8020/tmp/sparkR_test.txt")# 按分隔符拆分每一行为多个元素,这里返回一个序列words<-flatMap(lines,function(line) {strsplit(line,"\\|")[[1]]})# 使用 lapply 来定义对应每一个RDD元素的运算,这里返回一个(K,V)对wordCount <-lapply(words, function(word) { list(word, 1L) })# 对(K,V)对进行聚合计算counts<-receByKey(wordCount,"+",2L)# 以数组的形式,返回数据集的所有元素output <- collect(counts)# 按格式输出结果for (wordcount in output) { cat(wordcount[[1]], ": ", wordcount[[2]], "\n")}2) Example2:logistic regression# 加载SparkR包library(SparkR)# 初始化 Spark contextsc <- sparkR.init(master="集群ip:7077", appName='sparkr_logistic_regression', sparkEnvir=list(spark.executor.memory='1g', spark.cores.max="10"))# 从hdfs上读取txt文件, 该RDD由spark集群的4个分区构成input_rdd <- textFile(sc, "hdfs://集群ip:8020/user/payton/german.data-numeric.txt",minSplits=4)# 解析每个RDD元素的文本(在每个分区上并行)dataset_rdd <- lapplyPartition(input_rdd, function(part) { part <- lapply(part, function(x) unlist(strsplit(x, '\\s'))) part <- lapply(part, function(x) as.numeric(x[x != ''])) part})# 我们需要把数据集dataset_rdd分割为训练集(train)和测试集(test)两部分,这里# ptest为测试集的样本比例,如取ptest=0.2,即取dataset_rdd的20%样本数作为测试# 集,80%的样本数作为训练集split_dataset <- function(rdd, ptest) { #以输入样本数ptest比例创建测试集RDD data_test_rdd <- lapplyPartition(rdd, function(part) { part_test <- part[1:(length(part)*ptest)] part_test }) # 用剩下的样本数创建训练集RDD data_train_rdd <- lapplyPartition(rdd, function(part) { part_train <- part[((length(part)*ptest)+1):length(part)] part_train }) # 返回测试集RDD和训练集RDD的列表 list(data_test_rdd, data_train_rdd)}# 接下来我们需要转化数据集为R语言的矩阵形式,并增加一列数字为1的截距项,# 将输出项y标准化为0/1的形式get_matrix_rdd <- function(rdd) { matrix_rdd <- lapplyPartition(rdd, function(part) { m <- matrix(data=unlist(part, F, F), ncol=25, byrow=T) m <- cbind(1, m) m[,ncol(m)] <- m[,ncol(m)]-1 m }) matrix_rdd}# 由于该训练集中y的值为1与0的样本数比值为7:3,所以我们需要平衡1和0的样本# 数,使它们的样本数一致balance_matrix_rdd <- function(matrix_rdd) { balanced_matrix_rdd <- lapplyPartition(matrix_rdd, function(part) { y <- part[,26] index <- sample(which(y==0),length(which(y==1))) index <- c(index, which(y==1)) part <- part[index,] part }) balanced_matrix_rdd}# 分割数据集为训练集和测试集dataset <- split_dataset(dataset_rdd, 0.2)# 创建测试集RDDmatrix_test_rdd <- get_matrix_rdd(dataset[[1]])# 创建训练集RDDmatrix_train_rdd <- balance_matrix_rdd(get_matrix_rdd(dataset[[2]]))# 将训练集RDD和测试集RDD放入spark分布式集群内存中cache(matrix_test_rdd)cache(matrix_train_rdd)# 初始化向量thetatheta<- runif(n=25, min = -1, max = 1)# logistic函数hypot <- function(z) { 1/(1+exp(-z))}# 损失函数的梯度计算gCost <- function(t,X,y) { 1/nrow(X)*(t(X)%*%(hypot(X%*%t)-y))# 定义训练函数train <- function(theta, rdd) { # 计算梯度 gradient_rdd <- lapplyPartition(rdd, function(part) { X <- part[,1:25] y <- part[,26] p_gradient <- gCost(theta,X,y) list(list(1, p_gradient)) }) agg_gradient_rdd <- receByKey(gradient_rdd, '+', 1L) # 一次迭代聚合输出 collect(agg_gradient_rdd)[[1]][[2]]}# 由梯度下降算法优化损失函数# alpha :学习速率# steps :迭代次数# tol :收敛精度alpha <- 0.1tol <- 1e-4step <- 1while(T) { cat("step: ",step,"\n") p_gradient <- train(theta, matrix_train_rdd) theta <- theta-alpha*p_gradient gradient <- train(theta, matrix_train_rdd) if(abs(norm(gradient,type="F")-norm(p_gradient,type="F"))<=tol) break step <- step+1}# 用训练好的模型预测测试集信贷评测结果(“good”或“bad”),并计算预测正确率test <- lapplyPartition(matrix_test_rdd, function(part) { X <- part[,1:25] y <- part[,26] y_pred <- hypot(X%*%theta) result <- xor(as.vector(round(y_pred)),as.vector(y))})result<-unlist(collect(test))corrects = length(result[result==F])wrongs = length(result[result==T])cat("\ncorrects: ",corrects,"\n")cat("wrongs: ",wrongs,"\n")cat("accuracy: ",corrects/length(y_pred),"\n")


赞 (0)