㈠ 刚学习spark,想上传文件给hdfs,是不是需要hadoop然后java编程这样是用eclip
spark会把hdfs当做一个数据源来处理, 所以数据存储都要做, 之后编程是从Hadoop改成spark就可以了. 是否用eclipse无所谓, 只要能编译运行就可以
㈡ 用java向hdfs上传文件时,如何实现断点续传
@Component("javaLargeFileUploaderServlet") @WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" }) public class UploadServlet extends HttpRequestHandlerServlet implements HttpRequestHandler { private static final Logger log = LoggerFactory.getLogger(UploadServlet.class); @Autowired UploadProcessor uploadProcessor; @Autowired FileUploaderHelper fileUploaderHelper; @Autowired ExceptionCodeMappingHelper exceptionCodeMappingHelper; @Autowired Authorizer authorizer; @Autowired StaticStateIdentifierManager staticStateIdentifierManager; @Override public void handleRequest(HttpServletRequest request, HttpServletResponse response) throws IOException { log.trace("Handling request"); Serializable jsonObject = null; try { // extract the action from the request UploadServletAction actionByParameterName = UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action)); // check authorization checkAuthorization(request, actionByParameterName); // then process the asked action jsonObject = processAction(actionByParameterName, request); // if something has to be written to the response if (jsonObject != null) { fileUploaderHelper.writeToResponse(jsonObject, response); } } // If exception, write it catch (Exception e) { exceptionCodeMappingHelper.processException(e, response); } } private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName) throws MissingParameterException, AuthorizationException { // check authorization // if its not get progress (because we do not really care about authorization for get // progress and it uses an array of file ids) if (!actionByParameterName.equals(UploadServletAction.getProgress)) { // extract uuid final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false); // if this is init, the identifier is the one in parameter UUID clientOrJobId; String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false); if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) { clientOrJobId = UUID.fromString(parameter); } // if not, get it from manager else { clientOrJobId = staticStateIdentifierManager.getIdentifier(); } // call authorizer authorizer.getAuthorization( request, actionByParameterName, clientOrJobId, fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null); } } private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request) throws Exception { log.debug("Processing action " + actionByParameterName.name()); Serializable returnObject = null; switch (actionByParameterName) { case getConfig: String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false); returnObject = uploadProcessor.getConfig( parameterValue != null ? UUID.fromString(parameterValue) : null); break; case verifyCrcOfUncheckedPart: returnObject = verifyCrcOfUncheckedPart(request); break; case prepareUpload: returnObject = prepareUpload(request); break; case clearFile: uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId))); break; case clearAll: uploadProcessor.clearAll(); break; case pauseFile: List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)); uploadProcessor.pauseFile(uuids); break; case resumeFile: returnObject = uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId))); break; case setRate: uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)), Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate))); break; case getProgress: returnObject = getProgress(request); break; } return returnObject; } List<UUID> getFileIdsFromString(String fileIds) { String[] splittedFileIds = fileIds.split(","); List<UUID> uuids = Lists.newArrayList(); for (int i = 0; i < splittedFileIds.length; i++) { uuids.add(UUID.fromString(splittedFileIds[i])); } return uuids; } private Serializable getProgress(HttpServletRequest request) throws MissingParameterException { Serializable returnObject; String[] ids = new Gson() .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class); Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() { @Override public UUID apply(String input) { return UUID.fromString(input); } }); returnObject = Maps.newHashMap(); for (UUID fileId : uuids) { try { ProgressJson progress = uploadProcessor.getProgress(fileId); ((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress); } catch (FileNotFoundException e) { log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage()); } } return returnObject; } private Serializable prepareUpload(HttpServletRequest request) throws MissingParameterException, IOException { // extract file information PrepareUploadJson[] fromJson = new Gson() .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class); // prepare them final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson); // return them return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() { public String apply(UUID input) { return input.toString(); }; })); } private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request) throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException { UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)); try { uploadProcessor.verifyCrcOfUncheckedPart(fileId, fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc)); } catch (InvalidCrcException e) { // no need to log this exception, a fallback behaviour is defined in the // throwing method. // but we need to return something! return Boolean.FALSE; } return Boolean.TRUE; } }
㈢ java用org.apache.hadoop.fs 这个api向hdfs上传文件是根据什么协议
HTTP是很常见的协议,虽然用得很多,但对细节的了解却是很浅,这回通过向服务端上传文件信息来理解细节。网络库的选择:1、WinHTTP是windows下常用的库;2、CURL是广受喜爱的开源库。对于我来说,libcurl最大的优点是使用方便,可以把注意力更多的集中到业务层上,提高工作效率,避免重造轮子;缺点是略大(MD编译有264KB,MT编译有340KB),不像WinHTTP可以由windows操作系统集成。下边展示如何使用这两种网络库实现表单POST文件。
㈣ hdfs工作流程
1. hdfs基本工作流程
1. hdfs初始化目录结构
hdfs namenode -format 只是初始化了namenode的工作目录而datanode的工作目录是在datanode启动后自己初始化的
namenode在format初始化的时候会形成两个标识:blockPoolId:clusterId:
新的datanode加入时,会获取这两个标识作为自己工作目录中的标识
一旦namenode重新format后,namenode的身份标识已变,而datanode如果依然持有原来的id,就不会被namenode识别
2. hdfs的工作机制
hdfs集群分为两大角色:NameNode,DataNode (Secondary NameNode)
NameNode负责管理整个文件的元数据(命名空间信息,块信息) 相当于Master
DataNode负责管理用户的文件数据块 相当于Salve
文件会按照固定的大小(block=128M)切成若干块后分布式存储在若干个datanode节点上
每一个文件块有多个副本(默认是三个),存在不同的datanode上
DataNode会定期向NameNode汇报自身所保存的文件block信息,而namenode则会负责保持文件副本数量
hdfs的内部工作机制会对客户的保持透明,客户端请求方法hdfs都是通过向namenode申请来进行访问
SecondaryNameNode有两个作用,一是镜像备份,二是日志与镜像的定期合并
3. hdfs写入数据流程
1.客户端要向hdfs写入数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后,客户端按照顺序将文件block逐个传给相应datanode,并由接收到block的datanode负责向其他datanode复制block副本
4. 写入数据步骤详细解析
客户端向namenode通信,请求上传文件,namenode检查目标文件是否已经存在,父目录是否存在
namenode返回给客户端,告知是否可以上传
客户端请求第一个block该传输到那些datanode服务器上
namenode返回3个datanode服务器abc
客户端请求3台datanode的一台a上传数据(本质上是一个rpc调用,建立pipeline),A收到请求后会继续调用b,然后b调用c,将整个pipeline建立完成,逐级返回客户端。
客户端开始忘a上传第一个block(先从磁盘读取数据放入本地内存缓存),以packet为单位,a收到一个packet将会传给b,b传给c,a每传一个packet会放入一个应答队列等待应答
宕一个block传输完之后,客户端再次请求namenode上传第二个block的服务器
㈤ hdfs的特点有哪些
hdfs的特点一、hdfs的优点1.支持海量数据的存储:一般来说,HDFS存储的文件可以支持TB和PB级别的数据。2.检测和快速应对硬件故障:在集群环境中,硬件故障是常见性问题。因为有上千台服务器连在一起,故障率很高,因此故障检测和自动恢复hdfs文件系统的一个设计目标。假设某一个datanode挂掉之后,因为数据是有备份的,还可以从其他节点里找到。namenode通过心跳机制来检测datanode是否还存活。3.流式数据访问:(HDFS不能做到低延迟的数据访问,但是HDFS的吞吐量大)=》Hadoop适用于处理离线数据,不适合处理实时数据。HDFS的数据处理规模比较大,应用一次需要大量的数据,同时这些应用一般都是批量处理,而不是用户交互式处理。应用程序能以流的形式访问数据库。主要的是数据的吞吐量,而不是访问速度。访问速度最终是要受制于网络和磁盘的速度,机器节点再多,也不能突破物理的局限。4.简化的一致性模型:对于外部使用用户,不需要了解hadoop底层细节,比如文件的切块,文件的存储,节点的管理。一个文件存储在HDFS上后,适合一次写入,多次读取的场景。因为存储在HDFS上的文件都是超大文件,当上传完这个文件到hadoop集群后,会进行文件切块,分发,复制等操作。如果文件被修改,会导致重新触发这个过程,而这个过程耗时是最长的。所以在hadoop里,2.0版本允许数据的追加,单不允许数据的修改。5.高容错性:数据自动保存多个副本,副本丢失后自动恢复。可构建在廉价的机器上,实现线性扩展。当集群增加新节点之后,namenode也可以感知,将数据分发和备份到相应的节点上。6.商用硬件:Hadoop并不需要运行在昂贵且高可靠的硬件上。它是设计运行在商用硬件(在各种零售店都能买到的普通硬件)的集群上的,因此至少对于庞大的集群来说,节点故障的几率还是非常高的。HDFS遇到上述故障时,被设计成能够继续运行且不让用户察觉到明显的中断。二、HDFS缺点(局限性)1、不能做到低延迟数据访问:由于hadoop针对高数据吞吐量做了优化,牺牲了获取数据的延迟,所以对于低延迟数据访问,不适合hadoop。对于低延迟的访问需求,HBase是更好的选择。2、不适合大量的小文件存储 :由于namenode将文件系统的元数据存储在内存中,因此该文件系统所能存储的文件总数受限于namenode的内存容量。根据经验,每个文件、目录和数据块的存储信息大约占150字节。因此,如果有一百万个小文件,每个小文件都会占一个数据块,那至少需要300MB内存。如果是上亿级别的,就会超出当前硬件的能力。3、修改文件:对于上传到HDFS上的文件,不支持修改文件。Hadoop2.0虽然支持了文件的追加功能,但是还是不建议对HDFS上的文件进行修改。因为效率低下。HDFS适合一次写入,然后多次读取的场景。4、不支持用户的并行写:同一时间内,只能有一个用户执行写操作。
㈥ 关于用java写程序把本地文件上传到HDFS中的问题
第一个错误是参数不对第二错误时你没有能够创建目录,不知道是不是没有权限
㈦ hdfs在上传文件的时候,如果其中一个块突然损坏了怎么办
我感觉您问的因该是:dn突然挂掉了怎么办。首先pipeline会关闭。其次acker也不会返回到从cli。其他的dn正常写入,当其他dn写入完成后会通知cli,cli会通知nn。这时候nn发现,不对啊,我的副本数是3,怎么返回的dn写入数不对啊,之后,nn会派dn去复制数据,复制完成后。也就结束了。不知道是不是你想要的!!
㈧ 如何实现让用户在网页中上传下载文件到HDFS中
hadoop计算需要在hdfs文件系统上进行,文件上传到hdfs上通常有三种方法:a hadoop自带的dfs服务,put;b hadoop的API,Writer对象可以实现这一功能;c 调用OTL可执行程序,数据从数据库直接进入hadoop hadoop计算需要在hdfs文件系统上进行,因此每次计算之前必须把需要用到的文件(我们称为原始文件)都上传到hdfs上。文件上传到hdfs上通常有三种方法: a hadoop自带的dfs服务,put; b hadoop的API,Writer对象可以实现这一功能; c 调用OTL可执行程序,数据从数据库直接进入hadoop 由于存在ETL层,因此第三种方案不予考虑 将a、b方案进行对比,如下: 1 空间:方案a在hdfs上占用空间同本地,因此假设只上传日志文件,则保存一个月日志文件将消耗掉约10T空间,如果加上这期间的各种维表、事实表,将占用大约25T空间 方案b经测试,压缩比大约为3~4:1,因此假设hdfs空间为100T,原来只能保存约4个月的数据,现在可以保存约1年 2 上传时间:方案a的上传时间经测试,200G数据上传约1小时 方案b的上传时间,程序不做任何优化,大约是以上的4~6倍,但存在一定程度提升速度的余地 3 运算时间:经过对200G数据,大约4亿条记录的测试,如果程序以IO操作为主,则压缩数据的计算可以提高大约50%的速度,但如果程序以内存操作为主,则只能提高5%~10%的速度 4 其它:未压缩的数据还有一个好处是可以直接在hdfs上查看原始数据。压缩数据想看原始数据只能用程序把它导到本地,或者利用本地备份数据 压缩格式:按照hadoop api的介绍,压缩格式分两种:BLOCK和RECORD,其中RECORD是只对value进行压缩,一般采用BLOCK进行压缩。 对压缩文件进行计算,需要用SequenceFileInputFormat类来读入压缩文件,以下是计算程序的典型配置代码:JobConf conf = new JobConf(getConf(), log.class); conf.setJobName(”log”); conf.setOutputKeyClass(Text.class);//set the map output key type conf.setOutputValueClass(Text.class);//set the map output value type conf.setMapperClass(MapClass.class); //conf.setCombinerClass(Rece.class);//set the combiner class ,if havenot, use Recuce class for default conf.setRecerClass(Rece.class); conf.setInputFormat(SequenceFileInputFormat.class);//necessary if use compress 接下来的处理与非压缩格式的处理一样
㈨ 如何远程上传文件到hadoop中
全用以下命令来上传文件源到Hadoop上: hadoop fs -put local_file_name /user/hadoop/其中,/user/hadoop/为HDFS上的路径。local_file_name为需要上传的文件名。