hive insert overwrite into

Blog category: Hive

This post traces how Hive executes LOAD DATA LOCAL INPATH ... OVERWRITE INTO TABLE: first the explain plan, then the source code path from CliDriver through LoadSemanticAnalyzer down to the CopyTask and MoveTask that actually move the files.
CREATE TABLE records (year STRING, temperature INT, quality INT)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
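Given the DDL above, sample.txt must be tab-delimited with one record per line: a year string followed by two integers. For illustration (hypothetical values, not necessarily the book's actual file):

1950	0	1
1949	111	1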


hive> explain
    > LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
  (TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)

STAGE DEPENDENCIES:
  Stage-0 is a root stage
  Stage-1 depends on stages: Stage-0

STAGE PLANS:
  Stage: Stage-0
    Copy
      source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
      destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_00-10-28_984_2803781885868135028/-ext-10000

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: records


Time taken: 0.333 seconds



The same AST written with one token per line: child 0 is the source path, child 1 is TOK_TAB (the target table), and the optional LOCAL and OVERWRITE keywords appear as extra children:

(TOK_LOAD
  '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
  (TOK_TAB records)
  LOCAL
  OVERWRITE
)


hive> explain extended
    > LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
  (TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)

STAGE DEPENDENCIES:
  Stage-0 is a root stage
  Stage-1 depends on stages: Stage-0

STAGE PLANS:
  Stage: Stage-0
    Copy
      source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
      destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                bucket_count -1
                columns year,temperature,quality
                columns.types string:int:int
                field.delim
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://localhost:54310/user/hive/warehouse/records
                name records
                serialization.ddl struct records { string year, i32 temperature, i32 quality}
                serialization.format
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                transient_lastDdlTime 1313975965
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: records
          tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10001





The call chain from the CLI down to the Driver (simplified):

CliDriver:
CliDriver.main() {
   ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) {
   ret = processCmd(command);
}
CliDriver:
public int processCmd(String cmd) {
   CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
   Driver qp = (Driver) proc;
   ret = qp.run(cmd).getResponseCode();
}
Driver:
public CommandProcessorResponse run(String command) {
   // command = LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records

   int ret = compile(command);
   ret = execute();
}
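
The same path can also be driven programmatically. A minimal sketch, assuming a Hive-0.7-era classpath with hive-site.xml on it (the class name LoadDemo is made up for this example):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class LoadDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(LoadDemo.class);
    SessionState.start(new SessionState(conf)); // Driver needs an active session
    Driver driver = new Driver(conf);
    // same statement as above; compile() and execute() run inside run()
    int ret = driver.run("LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records").getResponseCode();
    System.out.println("response code: " + ret);
  }
}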

Driver:
  public int compile(String command) {
    ctx = new Context(conf);

    SemanticAnalyzerFactory sfactory = new SemanticAnalyzerFactory(conf);
    BaseSemanticAnalyzer sem = sfactory.get(tree); // returns new LoadSemanticAnalyzer(conf) for TOK_LOAD
    sem.analyze(tree, ctx);
  }


  public Context(Configuration conf) throws IOException {
    this(conf, generateExecutionId());
  }

  /**
   * Create a Context with a given executionId.  ExecutionId, together with
   * user name and conf, will determine the temporary directory locations.
   */
  public Context(Configuration conf, String executionId) throws IOException {
    this.conf = conf;
    this.executionId = executionId; //hive_2011-08-21_00-02-22_445_7799135143086468923

    // non-local tmp location is configurable. however it is the same across
    // all external file systems
    nonLocalScratchPath =
        new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
                executionId);  // /tmp/hive-tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923

    // local tmp location is not configurable for now
    localScratchDir = System.getProperty("java.io.tmpdir")
           + Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
           + executionId;  // /tmp/tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
  }
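
ctx.getExternalTmpFileURI(toURI), used by the analyzer below, allocates its temporary directories under this scratch path on the destination file system; that is where the -ext-10000 and -ext-10001 directories in the explain output come from:

nonLocalScratchPath = /tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697
getExternalTmpFileURI(toURI) -> hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000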


public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
  public void analyzeInternal(ASTNode ast) throws SemanticException {
    isLocal = false;
    isOverWrite = false;
    Tree fromTree = ast.getChild(0); //  '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
    Tree tableTree = ast.getChild(1); // TOK_TAB

    if (ast.getChildCount() == 4) { // true: both LOCAL and OVERWRITE are present
      isLocal = true;
      isOverWrite = true;
    }

    if (ast.getChildCount() == 3) {
      if (ast.getChild(2).getText().toLowerCase().equals("local")) {
        isLocal = true;
      } else {
        isOverWrite = true;
      }
    }
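
The child counts map directly onto the statement variants (with '/p' as a placeholder path; the first two children are always the path and TOK_TAB):

LOAD DATA INPATH '/p' INTO TABLE t;                  -- 2 children
LOAD DATA LOCAL INPATH '/p' INTO TABLE t;            -- 3 children, third is LOCAL
LOAD DATA INPATH '/p' OVERWRITE INTO TABLE t;        -- 3 children, third is OVERWRITE
LOAD DATA LOCAL INPATH '/p' OVERWRITE INTO TABLE t;  -- 4 children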

    // initialize load path
    URI fromURI;
    try {
      String fromPath = stripQuotes(fromTree.getText()); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
      fromURI = initializeFromURI(fromPath);  //  file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
    } catch (IOException e) {
      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
          .getMessage()), e);
    } catch (URISyntaxException e) {
      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
          .getMessage()), e);
    }

    // initialize destination table/partition
    tableSpec ts = new tableSpec(db, conf, (ASTNode) tableTree);
    Table tbl = ts.tableHandle;
    if (tbl.isView()) {
      throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
    }
    if (tbl.isNonNative()) {
      throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
    }

    genAuthorizeEntry(db.getCurrentDatabase(), tbl.getTableName(), null, Privilege.INSERT_PRIV);

    URI toURI = (ts.partHandle != null) ? ts.partHandle.getDataLocation()
        : ts.tableHandle.getDataLocation(); // hdfs://localhost:54310/user/hive/warehouse/records

    List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
    if (isOverWrite && (parts != null && parts.size() > 0)
        && (ts.partSpec == null || ts.partSpec.size() == 0)) {
      throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
    }

    // make sure the arguments make sense
    applyConstraints(fromURI, toURI, fromTree, isLocal);

    Task<? extends Serializable> rTask = null;

    // create copy work
    if (isLocal) {  //true
      // if the local keyword is specified - we will always make a copy. this
      // might seem redundant in the case
      // that the hive warehouse is also located in the local file system - but
      // that's just a test case.
      String copyURIStr = ctx.getExternalTmpFileURI(toURI); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
      URI copyURI = URI.create(copyURIStr);
      rTask = TaskFactory.get(new CopyWork(fromURI.toString(), copyURIStr),
          conf);
      fromURI = copyURI;
    }

    // create final load/move work

    String loadTmpPath = ctx.getExternalTmpFileURI(toURI);  // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
    Map<String, String> partSpec = ts.getPartSpec();
    if (partSpec == null) {
      partSpec = new LinkedHashMap<String, String>();
    }
    LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
        loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);

    if (rTask != null) {
      rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),
          getOutputs(), loadTableWork, null, true), conf));
    } else {
      rTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(),
          loadTableWork, null, true), conf);
    }

    rootTasks.add(rTask);
}
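
For our statement, then, the analyzer emits exactly the two-stage plan that explain showed: a CopyTask with a dependent MoveTask.

Stage-0 CopyTask: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
                  -> hdfs://localhost:54310/tmp/hive-tianzhao/<executionId>/-ext-10000
Stage-1 MoveTask: -ext-10000 -> table records (replace: true)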



  private URI initializeFromURI(String fromPath) throws IOException,
      URISyntaxException {
    URI fromURI = new Path(fromPath).toUri();  //  /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt

    String fromScheme = fromURI.getScheme(); // null
    String fromAuthority = fromURI.getAuthority(); // null
    String path = fromURI.getPath(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt

    // generate absolute path relative to current directory or hdfs home
    // directory
    if (!path.startsWith("/")) {  // not an absolute path; make it absolute first
      if (isLocal) {
        path = new Path(System.getProperty("user.dir"), path).toString();
      } else {
        path = new Path(new Path("/user/" + System.getProperty("user.name")),
            path).toString();
      }
    }

    // set correct scheme and authority
    if (StringUtils.isEmpty(fromScheme)) { // true
      if (isLocal) { //true
        // file for local
        fromScheme = "file"; //
      } else {
        // use default values from fs.default.name
        URI defaultURI = FileSystem.get(conf).getUri();
        fromScheme = defaultURI.getScheme();
        fromAuthority = defaultURI.getAuthority();
      }
    }

    // if scheme is specified but not authority then use the default authority
    if (fromScheme.equals("hdfs") && StringUtils.isEmpty(fromAuthority)) {
      URI defaultURI = FileSystem.get(conf).getUri();
      fromAuthority = defaultURI.getAuthority();
    }

    LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
    return new URI(fromScheme, fromAuthority, path, null, null); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
  }
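
Applying these rules to a few inputs (a sketch; assumes fs.default.name = hdfs://localhost:54310):

'sample.txt' with LOCAL     -> file:<user.dir>/sample.txt
'sample.txt' without LOCAL  -> hdfs://localhost:54310/user/<user.name>/sample.txt
'/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' with LOCAL
                            -> file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt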



    public tableSpec(Hive db, HiveConf conf, ASTNode ast)
        throws SemanticException {

      assert (ast.getToken().getType() == HiveParser.TOK_TAB);
      int childIndex = 0;
      numDynParts = 0;

      try {
        // get table metadata
        tableName = unescapeIdentifier(ast.getChild(0).getText());  //  records
        boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);  // false
        if (testMode) {
          tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX)
              + tableName;
        }

        tableHandle = db.getTable(tableName);
      } catch (InvalidTableException ite) {
        throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(ast
            .getChild(0)), ite);
      } catch (HiveException e) {
        throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg(ast
            .getChild(childIndex), e.getMessage()), e);
      }
      // get partition metadata if partition specified
      if (ast.getChildCount() == 2) {  // false here: our statement has no PARTITION clause
        childIndex = 1;
        ASTNode partspec = (ASTNode) ast.getChild(1);
        // partSpec is a mapping from partition column name to its value.
        partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
        for (int i = 0; i < partspec.getChildCount(); ++i) {
          ASTNode partspec_val = (ASTNode) partspec.getChild(i);
          String val = null;
          if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
            ++numDynParts;
          } else { // in the form of T partition (ds="2010-03-03")
            val = stripQuotes(partspec_val.getChild(1).getText());
          }
          partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
        }
        // check if the partition spec is valid
        if (numDynParts > 0) {
          List<FieldSchema> parts = tableHandle.getPartitionKeys();
          int numStaPart = parts.size() - numDynParts;
          if (numStaPart == 0 &&
              conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
            throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
          }
          for (FieldSchema fs : parts) {
            if (partSpec.get(fs.getName().toLowerCase()) == null) {
              if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
                throw new SemanticException(
                    ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
              }
              break;
            } else {
              --numStaPart;
            }
          }
          partHandle = null;
        } else {
          try {
            // this doesn't create the partition; the partition is created in MoveTask
            partHandle = new Partition(tableHandle, partSpec, null);
          } catch (HiveException e) {
            throw new SemanticException(
                ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
          }
        }
      }
    }
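
The two partition-spec forms the loop distinguishes, for a hypothetical table T partitioned by (ds, hr):

... PARTITION (ds='2010-03-03', hr='12')  -- static: every key has a value; partHandle is created
... PARTITION (ds, hr)                    -- dynamic: keys only; each one bumps numDynParts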




Driver.execute() {
  // runs the root tasks in dependency order: Stage-0 CopyTask first,
  // then its dependent Stage-1 MoveTask
}

CopyTask:
  public int execute(DriverContext driverContext) {
    FileSystem dstFs = null;
    Path toPath = null;
    try {
      Path fromPath = new Path(work.getFromPath()); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
      toPath = new Path(work.getToPath()); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000

      console.printInfo("Copying data from " + fromPath.toString(), " to "
          + toPath.toString());

      FileSystem srcFs = fromPath.getFileSystem(conf); //org.apache.hadoop.fs.LocalFileSystem@1c9e4d2
      dstFs = toPath.getFileSystem(conf);

      FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);

      if (srcs == null || srcs.length == 0) {
        console.printError("No files matching path: " + fromPath.toString());
        errorMessage = "No files matching path: " + fromPath.toString();
        return 3;
      }

      if (!dstFs.mkdirs(toPath)) {
        console
            .printError("Cannot make target directory: " + toPath.toString());
        errorMessage = "Cannot make target directory: " + toPath.toString();
        return 2;
      }

      for (FileStatus oneSrc : srcs) {
        LOG.debug("Copying file: " + oneSrc.getPath().toString());
        if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
            false, // do not delete the source
            true,  // overwrite the destination
            conf)) {
          console.printError("Failed to copy: '" + oneSrc.getPath().toString()
              + "' to: '" + toPath.toString() + "'");
          errorMessage = "Failed to copy: '" + oneSrc.getPath().toString()
              + "' to: '" + toPath.toString() + "'";
          return 1;
        }
      }
      return 0;

    } catch (Exception e) {
      console.printError("Failed with exception " + e.getMessage(), "\n"
          + StringUtils.stringifyException(e));
      errorMessage = "Failed with exception " + e.getMessage()+ "\n"
          + StringUtils.stringifyException(e);
      return (1);
    }
  }




MoveTask:
  public int execute(DriverContext driverContext) {

    try {
      // Do any hive related operations like moving tables and files
      // to appropriate locations
      LoadFileDesc lfd = work.getLoadFileWork();  // null
      if (lfd != null) {
        Path targetPath = new Path(lfd.getTargetDir());
        Path sourcePath = new Path(lfd.getSourceDir());
        FileSystem fs = sourcePath.getFileSystem(conf);
        if (lfd.getIsDfsDir()) {
          // Just do a rename on the URIs, they belong to the same FS
          String mesg = "Moving data to: " + lfd.getTargetDir();
          String mesg_detail = " from " + lfd.getSourceDir();
          console.printInfo(mesg, mesg_detail);

          // delete the output directory if it already exists
          fs.delete(targetPath, true);
          // if source exists, rename. Otherwise, create an empty directory
          if (fs.exists(sourcePath)) {
            if (!fs.rename(sourcePath, targetPath)) {
              throw new HiveException("Unable to rename: " + sourcePath
                  + " to: " + targetPath);
            }
          } else if (!fs.mkdirs(targetPath)) {
            throw new HiveException("Unable to make directory: " + targetPath);
          }
        } else {
          // This is a local file
          String mesg = "Copying data to local directory " + lfd.getTargetDir();
          String mesg_detail = " from " + lfd.getSourceDir();
          console.printInfo(mesg, mesg_detail);

          // delete the existing dest directory
          LocalFileSystem dstFs = FileSystem.getLocal(conf);

          if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
            console.printInfo(mesg, mesg_detail);
            // if source exists, rename. Otherwise, create an empty directory
            if (fs.exists(sourcePath)) {
              fs.copyToLocalFile(sourcePath, targetPath);
            } else {
              if (!dstFs.mkdirs(targetPath)) {
                throw new HiveException("Unable to make local directory: "
                    + targetPath);
              }
            }
          } else {
            throw new AccessControlException(
                "Unable to delete the existing destination directory: "
                + targetPath);
          }
        }
      }

      // Next we do this for tables and partitions
      LoadTableDesc tbd = work.getLoadTableWork();
      if (tbd != null) {
        StringBuilder mesg = new StringBuilder("Loading data to table ")
            .append( tbd.getTable().getTableName());
        if (tbd.getPartitionSpec().size() > 0) {
          mesg.append(" partition (");
          Map<String, String> partSpec = tbd.getPartitionSpec();
          for (String key: partSpec.keySet()) {
            mesg.append(key).append('=').append(partSpec.get(key)).append(", ");
          }
          mesg.setLength(mesg.length()-2);
          mesg.append(')');
        }
        String mesg_detail = " from " + tbd.getSourceDir();
        console.printInfo(mesg.toString(), mesg_detail);  //11/08/21 21:58:44 INFO exec.MoveTask: Loading data to table records from hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000

        Table table = db.getTable(db.getCurrentDatabase(), tbd
            .getTable().getTableName());

        if (work.getCheckFileFormat()) {
          // Get all files from the src directory
          FileStatus[] dirs;
          ArrayList<FileStatus> files;
          FileSystem fs;
          try {
            fs = FileSystem.get(table.getDataLocation(), conf);
            dirs = fs.globStatus(new Path(tbd.getSourceDir()));
            files = new ArrayList<FileStatus>();
            for (int i = 0; (dirs != null && i < dirs.length); i++) {
              files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
              // We only check one file, so exit the loop when we have at least
              // one.
              if (files.size() > 0) {
                break;
              }
            }
          } catch (IOException e) {
            throw new HiveException(
                "addFiles: filesystem error in check phase", e);
          }
          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
            // Check if the file format of the file matches that of the table.
            boolean flag = HiveFileFormatUtils.checkInputFormat(
                fs, conf, tbd.getTable().getInputFileFormatClass(), files);
            if (!flag) {
              throw new HiveException(
                  "Wrong file format. Please check the file's format.");
            }
          }
        }

        // Create a data container
        DataContainer dc = null;
        if (tbd.getPartitionSpec().size() == 0) {
          dc = new DataContainer(table.getTTable());
          db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
              .getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));  // replace = true for OVERWRITE
          if (work.getOutputs() != null) {
            work.getOutputs().add(new WriteEntity(table));
          }
        } else {
          LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
          // deal with dynamic partitions
          DynamicPartitionCtx dpCtx = tbd.getDPCtx();
          if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
            // load the list of DP partitions and return the list of partition specs
            ArrayList<LinkedHashMap<String, String>> dp =
                db.loadDynamicPartitions(
                    new Path(tbd.getSourceDir()),
                    tbd.getTable().getTableName(),
                    tbd.getPartitionSpec(),
                    tbd.getReplace(),
                    new Path(tbd.getTmpDir()),
                    dpCtx.getNumDPCols());
            // for each partition spec, get the partition
            // and put it to WriteEntity for post-exec hook
            for (LinkedHashMap<String, String> partSpec: dp) {
              Partition partn = db.getPartition(table, partSpec, false);

              WriteEntity enty = new WriteEntity(partn);
              if (work.getOutputs() != null) {
                work.getOutputs().add(enty);
              }
              // Need to update the queryPlan's output as well so that post-exec hook get executed.
              // This is only needed for dynamic partitioning since for SP the WriteEntity is
              // constructed at compile time and the queryPlan already contains that.
              // For DP, WriteEntity creation is deferred at this stage so we need to update
              // queryPlan here.
              if (queryPlan.getOutputs() == null) {
                queryPlan.setOutputs(new HashSet<WriteEntity>());
              }
              queryPlan.getOutputs().add(enty);

              // update columnar lineage for each partition
              dc = new DataContainer(table.getTTable(), partn.getTPartition());

              if (SessionState.get() != null) {
                SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
                    table.getCols());
              }

              console.printInfo("\tLoading partition " + partSpec);
            }
            dc = null; // reset data container to prevent it being added again.
          } else { // static partitions
            db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
                tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
            dc = new DataContainer(table.getTTable(), partn.getTPartition());
            // add this partition to post-execution hook
            if (work.getOutputs() != null) {
              work.getOutputs().add(new WriteEntity(partn));
            }
          }
        }
        if (SessionState.get() != null && dc != null) {
          SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
              table.getCols());
        }
      }

      return 0;
    } catch (Exception e) {
      console.printError("Failed with exception " + e.getMessage(), "\n"
          + StringUtils.stringifyException(e));
      errorMessage = "Failed with exception " + e.getMessage() + "\n"
          + StringUtils.stringifyException(e);
      return (1);
    }
  }


Hive.java:
  public void loadTable(Path loadPath, String tableName, boolean replace,
      Path tmpDirPath) throws HiveException {
    Table tbl = getTable(tableName); //records

    if (replace) {  //true
      tbl.replaceFiles(loadPath, tmpDirPath); 
// loadPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
// tmpDirPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
    } else {
      tbl.copyFiles(loadPath);
    }
  }
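
The replace flag is exactly what separates the two LOAD variants:

LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;  -- replace = true  -> replaceFiles
LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' INTO TABLE records;            -- replace = false -> copyFiles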

Table:
  protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
    FileSystem fs;
    try {
      fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
      // getDataLocation() = hdfs://localhost:54310/user/hive/warehouse/records
      Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
    } catch (IOException e) {
      throw new HiveException("addFiles: filesystem error in check phase", e);
    }
  }



  static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
      Path tmppath) throws HiveException {
    FileStatus[] srcs;
    try {
      srcs = fs.globStatus(srcf);  // srcf = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
    } catch (IOException e) {
      throw new HiveException("addFiles: filesystem error in check phase", e);
    }
    if (srcs == null) {
      LOG.info("No sources specified to move: " + srcf);
      return;
      // srcs = new FileStatus[0]; Why is this needed?
    }
    checkPaths(fs, srcs, destf, true);   //   destf = /user/hive/warehouse/records

    try {
      fs.mkdirs(tmppath);  //  tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
      for (FileStatus src : srcs) {
        FileStatus[] items = fs.listStatus(src.getPath());
        for (int j = 0; j < items.length; j++) {
          if (!fs.rename(items[j].getPath(), new Path(tmppath, items[j]
              .getPath().getName()))) {
//
//   public boolean rename(Path src, Path dst) throws IOException {
//    return dfs.rename(getPathName(src), getPathName(dst));
//  }
// src = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000/sample.txt
// dst = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001/sample.txt

            throw new HiveException("Error moving: " + items[j].getPath()
                + " into: " + tmppath);
          }
        }
      }

      // point of no return
      boolean b = fs.delete(destf, true);  // destf = /user/hive/warehouse/records
      LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);

      // create the parent directory otherwise rename can fail if the parent
      // doesn't exist
      if (!fs.mkdirs(destf.getParent())) {
        throw new HiveException("Unable to create destination directory: "
            + destf.getParent().toString());
      }

      b = fs.rename(tmppath, destf); 
// tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
// destf = /user/hive/warehouse/records
      if (!b) {
        throw new HiveException("Unable to move results from " + tmppath
            + " to destination directory: " + destf.getParent().toString());
      }
      LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);

    } catch (IOException e) {
      throw new HiveException("replaceFiles: error while moving files from "
          + tmppath + " to " + destf + "!!!", e);
    }
    // In case of error, we should leave the temporary data there, so
    // that user can recover the data if necessary.
  }
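
So OVERWRITE ends up as a three-step shuffle through the second temporary directory:

1. rename every file under -ext-10000 into -ext-10001
2. delete the table directory /user/hive/warehouse/records (the "point of no return")
3. rename -ext-10001 to /user/hive/warehouse/records

If the final rename fails, the files are still sitting in -ext-10001, which is why the code deliberately leaves temporary data in place on error.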


  static private void checkPaths(FileSystem fs, FileStatus[] srcs, Path destf,
      boolean replace) throws HiveException {
    try {
      for (FileStatus src : srcs) {
        FileStatus[] items = fs.listStatus(src.getPath());
        for (FileStatus item : items) {

          if (Utilities.isTempPath(item)) {
            // This check is redundant because temp files are removed by
            // execution layer before
            // calling loadTable/Partition. But leaving it in just in case.
            fs.delete(item.getPath(), true);
            continue;
          }
          if (item.isDir()) {
            throw new HiveException("checkPaths: " + src.getPath()
                + " has nested directory" + item.getPath());
          }
          Path tmpDest = new Path(destf, item.getPath().getName()); // /user/hive/warehouse/records/sample.txt
          if (!replace && fs.exists(tmpDest)) {   //  replace = true
            throw new HiveException("checkPaths: " + tmpDest
                + " already exists");
          }
        }
      }
    } catch (IOException e) {
      throw new HiveException("checkPaths: filesystem error in check phase", e);
    }
  }

