CREATE TABLE records (year STRING, temperature INT, quality INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
hive> explain
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_00-10-28_984_2803781885868135028/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
Time taken: 0.333 seconds
(TOK_LOAD
'/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
(TOK_TAB records)
LOCAL
OVERWRITE
)
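In this tree, TOK_LOAD always has the path and TOK_TAB as its first two children. LOCAL and OVERWRITE are optional trailing children. `analyzeInternal` (below) recovers the two flags from the child count alone, and when there are exactly three children it also checks the text of the third. The class below is a hypothetical JDK-only sketch of that decoding, assuming the children are already available as strings. `LoadFlags` and `decode` are illustrative names, not Hive API.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical stand-in for the flag decoding in LoadSemanticAnalyzer.analyzeInternal. */
final class LoadFlags {
    final boolean isLocal;
    final boolean isOverwrite;

    private LoadFlags(boolean isLocal, boolean isOverwrite) {
        this.isLocal = isLocal;
        this.isOverwrite = isOverwrite;
    }

    /** children: texts of the TOK_LOAD children (path, TOK_TAB, then optional LOCAL / OVERWRITE). */
    static LoadFlags decode(List<String> children) {
        boolean local = false;
        boolean overwrite = false;
        if (children.size() == 4) {
            // both optional keywords are present
            local = true;
            overwrite = true;
        } else if (children.size() == 3) {
            // exactly one keyword: decide which one from its text
            if (children.get(2).toLowerCase(Locale.ROOT).equals("local")) {
                local = true;
            } else {
                overwrite = true;
            }
        }
        return new LoadFlags(local, overwrite);
    }

    public static void main(String[] args) {
        LoadFlags f = decode(List.of(
                "'/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'",
                "TOK_TAB", "LOCAL", "OVERWRITE"));
        System.out.println("isLocal=" + f.isLocal + " isOverwrite=" + f.isOverwrite);
    }
}
```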
hive> explain extended
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns year,temperature,quality
columns.types string:int:int
field.delim
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/records
name records
serialization.ddl struct records { string year, i32 temperature, i32 quality}
serialization.format
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1313975965
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10001
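LOAD DATA never parses the rows of sample.txt. It only copies and moves files, so the table properties above (`columns`, `columns.types` and the tab stored in `field.delim`) only come into play when the table is read later. As a rough illustration of what those properties mean at read time, here is a hypothetical JDK-only parser for a single row. It assumes that a missing or non-numeric int field is read as NULL, which is how LazySimpleSerDe is commonly described to behave. It is not the SerDe's actual code, and the sample lines in `main` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified reader for one row of the `records` table; not LazySimpleSerDe. */
final class RecordsRowParser {
    // Mirrors "columns year,temperature,quality" and "columns.types string:int:int"
    static final List<String> COLUMNS = List.of("year", "temperature", "quality");
    static final List<String> TYPES = List.of("string", "int", "int");
    static final String FIELD_DELIM = "\t"; // FIELDS TERMINATED BY '\t'

    /** Splits one text line and converts each field; missing or malformed ints become null. */
    static List<Object> parse(String line) {
        String[] fields = line.split(FIELD_DELIM, -1);
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < COLUMNS.size(); i++) {
            String raw = i < fields.length ? fields[i] : null;
            if (raw == null) {
                row.add(null);                  // fewer fields than columns
            } else if (TYPES.get(i).equals("int")) {
                try {
                    row.add(Integer.parseInt(raw));
                } catch (NumberFormatException e) {
                    row.add(null);              // assumed: unparsable int reads as NULL
                }
            } else {
                row.add(raw);                   // string column kept as-is
            }
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(parse("1950\t22\t1"));  // [1950, 22, 1]
        System.out.println(parse("1950\t-11\tx")); // [1950, -11, null]
    }
}
```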
CliDriver:
CliDriver.main() {
ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) {
ret = processCmd(command);
}
CliDriver:
public int processCmd(String cmd) {
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
Driver qp = (Driver) proc;
ret = qp.run(cmd).getResponseCode();
}
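`processCmd` selects a CommandProcessor using only the first whitespace-separated token. Anything that is not a built-in command such as `set` or `dfs` ends up in `Driver`, and that is where LOAD DATA goes. The sketch below models the dispatch with a hypothetical enum. The keyword list (set, reset, dfs, add, delete) is an assumption about CommandProcessorFactory in Hive releases of this era. Cases that `processCmd` handles before reaching the factory, such as quit/exit, are left out.

```java
import java.util.Locale;

/** Hypothetical model of how the first token selects a command processor. */
final class CommandDispatch {
    enum Processor { SET, RESET, DFS, ADD, DELETE, DRIVER }

    /** Returns the processor for a CLI command, or null for a blank command. */
    static Processor select(String cmd) {
        String trimmed = cmd.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        // like processCmd: split on whitespace and look only at tokens[0]
        String first = trimmed.split("\\s+")[0].toLowerCase(Locale.ROOT);
        switch (first) {
            case "set":    return Processor.SET;
            case "reset":  return Processor.RESET;
            case "dfs":    return Processor.DFS;
            case "add":    return Processor.ADD;
            case "delete": return Processor.DELETE;
            default:       return Processor.DRIVER; // HiveQL such as LOAD DATA
        }
    }

    public static void main(String[] args) {
        System.out.println(select("LOAD DATA LOCAL INPATH '/tmp/x' OVERWRITE INTO TABLE records"));
        System.out.println(select("set hive.exec.scratchdir=/tmp/hive"));
    }
}
```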
Driver:
public CommandProcessorResponse run(String command) {
// command = LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records
int ret = compile(command);
ret = execute();
}
Driver:
public int compile(String command) {
ctx = new Context(conf); // per-query scratch directories, see Context below
SemanticAnalyzerFactory sfactory = new SemanticAnalyzerFactory(conf);
BaseSemanticAnalyzer sem = sfactory.get(tree); // return new LoadSemanticAnalyzer(conf);
sem.analyze(tree, ctx);
}
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
/**
* Create a Context with a given executionId. ExecutionId, together with
* user name and conf, will determine the temporary directory locations.
*/
public Context(Configuration conf, String executionId) throws IOException {
this.conf = conf;
this.executionId = executionId; //hive_2011-08-21_00-02-22_445_7799135143086468923
// non-local tmp location is configurable. however it is the same across
// all external file systems
nonLocalScratchPath =
new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
executionId); // /tmp/hive-tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
// local tmp location is not configurable for now
localScratchDir = System.getProperty("java.io.tmpdir")
+ Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
+ executionId; // /tmp/tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
}
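`Context` fixes the scratch locations for one query. The non-local scratch path is `hive.exec.scratchdir` plus the execution id. The tmp URIs in the explain output (`-ext-10000`, `-ext-10001`) are numbered directories under that path, presumably qualified with the scheme and authority of the target table location. The sketch below models this bookkeeping with a hypothetical `ScratchContext` class. The id format is inferred from the ids printed above. The counter starting at 10000 is an assumption based on the `-ext-10000`/`-ext-10001` pair. Neither is copied from Hive's source.

```java
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/** Hypothetical model of Context's scratch-path bookkeeping; not Hive's Context class. */
final class ScratchContext {
    final String executionId;
    final String nonLocalScratchPath; // <hive.exec.scratchdir>/<executionId>
    final String localScratchDir;     // <java.io.tmpdir>/<user.name>/<executionId>
    private int pathId = 10000;       // assumption: first directory is -ext-10000

    ScratchContext(String scratchDir, String javaTmpDir, String userName, String executionId) {
        this.executionId = executionId;
        this.nonLocalScratchPath = scratchDir + "/" + executionId;
        this.localScratchDir = javaTmpDir + "/" + userName + "/" + executionId;
    }

    /** Builds an id shaped like hive_2011-08-21_00-02-22_445_7799135143086468923. */
    static String generateExecutionId(Date now, Random rand) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS");
        long suffix = rand.nextLong() & Long.MAX_VALUE; // non-negative random suffix
        return "hive_" + format.format(now) + "_" + suffix;
    }

    /** Next -ext-NNNNN directory, qualified with the scheme and authority of the target. */
    String nextExternalTmpUri(URI target) {
        String authority = target.getAuthority();
        String prefix = authority == null
                ? target.getScheme() + ":"
                : target.getScheme() + "://" + authority;
        return prefix + nonLocalScratchPath + "/-ext-" + (pathId++);
    }

    public static void main(String[] args) {
        String id = generateExecutionId(new Date(), new Random());
        ScratchContext ctx = new ScratchContext("/tmp/hive-tianzhao", "/tmp", "tianzhao", id);
        URI table = URI.create("hdfs://localhost:54310/user/hive/warehouse/records");
        System.out.println(ctx.nextExternalTmpUri(table)); // copy target, ends in -ext-10000
        System.out.println(ctx.nextExternalTmpUri(table)); // load tmp dir, ends in -ext-10001
        System.out.println(ctx.localScratchDir);
    }
}
```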
public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
public void analyzeInternal(ASTNode ast) throws SemanticException {
isLocal = false;
isOverWrite = false;
Tree fromTree = ast.getChild(0); // '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
Tree tableTree = ast.getChild(1); // TOK_TAB
if (ast.getChildCount() == 4) { // true here: path, TOK_TAB, LOCAL, OVERWRITE
isLocal = true;
isOverWrite = true;
}
if (ast.getChildCount() == 3) {
if (ast.getChild(2).getText().toLowerCase().equals("local")) {
isLocal = true;
} else {
isOverWrite = true;
}
}
// initialize load path
URI fromURI;
try {
String fromPath = stripQuotes(fromTree.getText()); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
fromURI = initializeFromURI(fromPath); // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
} catch (IOException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
} catch (URISyntaxException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
}
// initialize destination table/partition
tableSpec ts = new tableSpec(db, conf, (ASTNode) tableTree); // table records, no PARTITION clause
Table tbl = ts.tableHandle;
if (tbl.isView()) {
throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
}
if (tbl.isNonNative()) {
throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
}
genAuthorizeEntry(db.getCurrentDatabase(), tbl.getTableName(), null, Privilege.INSERT_PRIV);
URI toURI = (ts.partHandle != null) ? ts.partHandle.getDataLocation()
: ts.tableHandle.getDataLocation(); // hdfs://localhost:54310/user/hive/warehouse/records
List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
if (isOverWrite && (parts != null && parts.size() > 0)
&& (ts.partSpec == null || ts.partSpec.size() == 0)) {
throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
}
// make sure the arguments make sense
applyConstraints(fromURI, toURI, fromTree, isLocal);
Task<? extends Serializable> rTask = null;
// create copy work
if (isLocal) { //true
// if the local keyword is specified - we will always make a copy. this
// might seem redundant in the case
// that the hive warehouse is also located in the local file system - but
// that's just a test case.
String copyURIStr = ctx.getExternalTmpFileURI(toURI); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
URI copyURI = URI.create(copyURIStr);
rTask = TaskFactory.get(new CopyWork(fromURI.toString(), copyURIStr),
conf);
fromURI = copyURI;
}
// create final load/move work
String loadTmpPath = ctx.getExternalTmpFileURI(toURI); // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
Map<String, String> partSpec = ts.getPartSpec();
if (partSpec == null) {
partSpec = new LinkedHashMap<String, String>();
}
LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
if (rTask != null) {
rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),
getOutputs(), loadTableWork, null, true), conf));
} else {
rTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(),
loadTableWork, null, true), conf);
}
rootTasks.add(rTask);
}
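For a LOCAL load, the analyzer therefore builds two tasks: a CopyTask as the only root task, and a MoveTask added as its dependent. This is the "Stage-1 depends on stages: Stage-0" line in the explain output. Without LOCAL, the MoveTask is the root on its own. The following hypothetical, single-threaded sketch shows that wiring and the run order it implies. It is a simplification: the real Driver drives tasks through its own runnable queue and can launch independent tasks in parallel. `TaskGraph` and `plan` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified task graph; the real classes are Hive's Task/TaskFactory. */
final class TaskGraph {
    static final class Task {
        final String name;
        final List<Task> children = new ArrayList<>();
        final List<Task> parents = new ArrayList<>();
        boolean done;

        Task(String name) {
            this.name = name;
        }

        void addDependentTask(Task child) {
            children.add(child);
            child.parents.add(this);
        }
    }

    /** Runs root tasks first; a child becomes runnable once all of its parents are done. */
    static List<String> run(List<Task> rootTasks) {
        List<String> order = new ArrayList<>();
        Deque<Task> runnable = new ArrayDeque<>(rootTasks);
        while (!runnable.isEmpty()) {
            Task t = runnable.poll();
            if (t.done) {
                continue;
            }
            order.add(t.name); // a real task would do its work here
            t.done = true;
            for (Task child : t.children) {
                if (child.parents.stream().allMatch(p -> p.done)) {
                    runnable.add(child);
                }
            }
        }
        return order;
    }

    /** Mirrors analyzeInternal: with LOCAL, Copy is the root and Move depends on it. */
    static List<Task> plan(boolean isLocal) {
        Task move = new Task("Move");
        if (isLocal) {
            Task copy = new Task("Copy");
            copy.addDependentTask(move);
            return List.of(copy);
        }
        return List.of(move);
    }

    public static void main(String[] args) {
        System.out.println(run(plan(true)));  // [Copy, Move]
        System.out.println(run(plan(false))); // [Move]
    }
}
```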
private URI initializeFromURI(String fromPath) throws IOException,
URISyntaxException {
URI fromURI = new Path(fromPath).toUri(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
String fromScheme = fromURI.getScheme(); // null
String fromAuthority = fromURI.getAuthority(); // null
String path = fromURI.getPath(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
// generate absolute path relative to current directory or hdfs home
// directory
if (!path.startsWith("/")) { // not an absolute path: resolve it as follows
if (isLocal) {
path = new Path(System.getProperty("user.dir"), path).toString();
} else {
path = new Path(new Path("/user/" + System.getProperty("user.name")),
path).toString();
}
}
// set correct scheme and authority
if (StringUtils.isEmpty(fromScheme)) { // true
if (isLocal) { //true
// file for local
fromScheme = "file";
} else {
// use default values from fs.default.name
URI defaultURI = FileSystem.get(conf).getUri();
fromScheme = defaultURI.getScheme();
fromAuthority = defaultURI.getAuthority();
}
}
// if scheme is specified but not authority then use the default authority
if (fromScheme.equals("hdfs") && StringUtils.isEmpty(fromAuthority)) {
URI defaultURI = FileSystem.get(conf).getUri();
fromAuthority = defaultURI.getAuthority();
}
LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
return new URI(fromScheme, fromAuthority, path, null, null); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
}
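`initializeFromURI` fills in whatever the user left out of the path:

- A relative path is anchored at `user.dir` (LOCAL) or at `/user/<user.name>` (non-local).
- A missing scheme becomes `file` for LOCAL, or the default file system otherwise.
- An `hdfs` URI without host:port borrows the default authority.

Below is a hypothetical JDK-only version in which `java.net.URI` stands in for Hadoop's `Path`. The values of `fs.default.name`, `user.dir` and `user.name` are passed in as parameters so the logic can be exercised without a cluster. Hadoop's path normalization, such as collapsing duplicate slashes, is not reproduced, and paths that are not valid URIs (for example ones with spaces) are rejected.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical JDK-only version of initializeFromURI; java.net.URI stands in for Hadoop's Path. */
final class LoadPathResolver {
    /**
     * @param defaultFs stands in for fs.default.name, e.g. hdfs://localhost:54310
     * @param userDir   stands in for System.getProperty("user.dir")
     * @param userName  stands in for System.getProperty("user.name")
     */
    static URI resolve(String fromPath, boolean isLocal, URI defaultFs,
                       String userDir, String userName) {
        URI fromUri = URI.create(fromPath); // IllegalArgumentException on bad syntax
        String scheme = fromUri.getScheme();
        String authority = fromUri.getAuthority();
        String path = fromUri.getPath();
        if (path == null) {
            throw new IllegalArgumentException("opaque URI not supported: " + fromPath);
        }
        // relative path: LOCAL resolves against the working directory,
        // otherwise against the user's home directory on the default file system
        if (!path.startsWith("/")) {
            path = (isLocal ? userDir : "/user/" + userName) + "/" + path;
        }
        // no scheme: "file" for LOCAL, otherwise scheme and authority of the default FS
        if (scheme == null || scheme.isEmpty()) {
            if (isLocal) {
                scheme = "file";
            } else {
                scheme = defaultFs.getScheme();
                authority = defaultFs.getAuthority();
            }
        }
        // hdfs scheme but no host:port: use the default authority
        if (scheme.equals("hdfs") && (authority == null || authority.isEmpty())) {
            authority = defaultFs.getAuthority();
        }
        try {
            return new URI(scheme, authority, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI defaultFs = URI.create("hdfs://localhost:54310");
        // prints file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
        System.out.println(resolve("/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt",
                true, defaultFs, "/home/tianzhao", "tianzhao"));
    }
}
```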
public tableSpec(Hive db, HiveConf conf, ASTNode ast)
throws SemanticException {
assert (ast.getToken().getType() == HiveParser.TOK_TAB);
int childIndex = 0;
numDynParts = 0;
try {
// get table metadata
tableName = unescapeIdentifier(ast.getChild(0).getText()); // records
boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); // false
if (testMode) {
tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX)
+ tableName;
}
tableHandle = db.getTable(tableName);
} catch (InvalidTableException ite) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(ast
.getChild(0)), ite);
} catch (HiveException e) {
throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg(ast
.getChild(childIndex), e.getMessage()), e);
}
// get partition metadata if partition specified
if (ast.getChildCount() == 2) { // false
childIndex = 1;
ASTNode partspec = (ASTNode) ast.getChild(1);
// partSpec is a mapping from partition column name to its value.
partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
for (int i = 0; i < partspec.getChildCount(); ++i) {
ASTNode partspec_val = (ASTNode) partspec.getChild(i);
String val = null;
if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
++numDynParts;
} else { // in the form of T partition (ds="2010-03-03")
val = stripQuotes(partspec_val.getChild(1).getText());
}
partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
}
// check if the partition spec is valid
if (numDynParts > 0) {
List<FieldSchema> parts = tableHandle.getPartitionKeys();
int numStaPart = parts.size() - numDynParts;
if (numStaPart == 0 &&
conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
}
for (FieldSchema fs: parts) {
if (partSpec.get(fs.getName().toLowerCase()) == null) {
if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
throw new SemanticException(
ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
}
break;
} else {
--numStaPart;
}
}
partHandle = null;
} else {
try {
// this doesn't create partition. partition is created in MoveTask
partHandle = new Partition(tableHandle, partSpec, null);
} catch (HiveException e) {
throw new SemanticException(
ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
}
}
}
}
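For LOAD DATA into `records` there is no PARTITION clause, so `tableSpec` stops after looking up the table. When a clause is present, a column without a value, such as `hr` in `PARTITION (ds="2010-03-03", hr)`, is a dynamic partition. The loop above then enforces two rules: strict mode (`hive.exec.dynamic.partition.mode`) needs at least one static column, and no static column may follow a dynamic one. The following is a hypothetical restatement of those checks. A null value marks a dynamic column, and `IllegalArgumentException` stands in for `SemanticException`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical restatement of tableSpec's dynamic-partition checks; not Hive code. */
final class PartitionSpecCheck {
    /**
     * @param partitionKeys partition columns of the table, in declaration order
     * @param spec          PARTITION clause; a null value marks a dynamic column
     * @param strictMode    true when hive.exec.dynamic.partition.mode is "strict"
     */
    static void validate(List<String> partitionKeys, Map<String, String> spec, boolean strictMode) {
        long numDynParts = spec.values().stream().filter(v -> v == null).count();
        if (numDynParts == 0) {
            return; // all static: tableSpec builds a Partition handle instead
        }
        long numStaPart = partitionKeys.size() - numDynParts;
        if (numStaPart == 0 && strictMode) {
            throw new IllegalArgumentException("strict mode requires at least one static partition column");
        }
        for (String key : partitionKeys) {
            if (spec.get(key) == null) {
                // first dynamic column reached: every static column must already be consumed
                if (numStaPart > 0) {
                    throw new IllegalArgumentException("dynamic column " + key + " precedes a static one");
                }
                break;
            }
            numStaPart--;
        }
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2010-03-03"); // static
        spec.put("hr", null);         // dynamic
        validate(List.of("ds", "hr"), spec, true);
        System.out.println("PARTITION (ds=\"2010-03-03\", hr) accepted");
    }
}
```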
Driver.execute() {
}
CopyTask.execute()
public int execute(DriverContext driverContext) {
FileSystem dstFs = null;
Path toPath = null;
try {
Path fromPath = new Path(work.getFromPath()); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
toPath = new Path(work.getToPath()); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
console.printInfo("Copying data from " + fromPath.toString(), " to "
+ toPath.toString());
FileSystem srcFs = fromPath.getFileSystem(conf); //org.apache.hadoop.fs.LocalFileSystem@1c9e4d2
dstFs = toPath.getFileSystem(conf);
FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
if (srcs == null || srcs.length == 0) {
console.printError("No files matching path: " + fromPath.toString());
errorMessage = "No files matching path: " + fromPath.toString();
return 3;
}
if (!dstFs.mkdirs(toPath)) {
console
.printError("Cannot make target directory: " + toPath.toString());
errorMessage = "Cannot make target directory: " + toPath.toString();
return 2;
}
for (FileStatus oneSrc : srcs) {
LOG.debug("Copying file: " + oneSrc.getPath().toString());
if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
false, // delete source
true, // overwrite destination
conf)) {
console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+ "' to: '" + toPath.toString() + "'");
errorMessage = "Failed to copy: '" + oneSrc.getPath().toString()
+ "' to: '" + toPath.toString() + "'";
return 1;
}
}
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
errorMessage = "Failed with exception " + e.getMessage()+ "\n"
+ StringUtils.stringifyException(e);
return (1);
}
}
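CopyTask copies every file matched by the source path into the target scratch directory. It keeps the source and overwrites existing targets. The sketch below is a hypothetical local-disk analogue that uses `java.nio.file` instead of Hadoop's `FileSystem`. It keeps the same return codes (3 no files, 2 cannot create the target, 1 copy failure, 0 success). Like `matchFilesOrDir`, it treats a directory source as the files inside it. It does not expand glob patterns, and unlike `FileUtil.copy` it copies only regular files, not whole directory trees.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical local-disk analogue of CopyTask.execute; java.nio replaces Hadoop's FileSystem. */
final class LocalCopyTask {
    /** Returns 0 on success, 1 if a copy fails, 2 if the target cannot be created, 3 if nothing matches. */
    static int execute(Path fromPath, Path toPath) {
        List<Path> srcs = new ArrayList<>();
        try {
            if (Files.isDirectory(fromPath)) {
                // a directory source contributes the regular files directly inside it
                try (DirectoryStream<Path> children = Files.newDirectoryStream(fromPath)) {
                    for (Path child : children) {
                        if (Files.isRegularFile(child)) {
                            srcs.add(child);
                        }
                    }
                }
            } else if (Files.isRegularFile(fromPath)) {
                srcs.add(fromPath);
            }
        } catch (IOException e) {
            return 1;
        }
        if (srcs.isEmpty()) {
            return 3;
        }
        try {
            Files.createDirectories(toPath);
        } catch (IOException e) {
            return 2;
        }
        for (Path src : srcs) {
            try {
                // keep the source file, overwrite an existing file of the same name
                Files.copy(src, toPath.resolve(src.getFileName()), StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                return 1;
            }
        }
        return 0;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("copytask");
        Path sample = Files.writeString(dir.resolve("sample.txt"), "illustrative row\n");
        System.out.println(execute(sample, dir.resolve("-ext-10000"))); // 0
    }
}
```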
MoveTask:
public int execute(DriverContext driverContext) {
try {
// Do any hive related operations like moving tables and files
// to appropriate locations
LoadFileDesc lfd = work.getLoadFileWork(); // null
if (lfd != null) {
Path targetPath = new Path(lfd.getTargetDir());
Path sourcePath = new Path(lfd.getSourceDir());
FileSystem fs = sourcePath.getFileSystem(conf);
if (lfd.getIsDfsDir()) {
// Just do a rename on the URIs, they belong to the same FS
String mesg = "Moving data to: " + lfd.getTargetDir();
String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the output directory if it already exists
fs.delete(targetPath, true);
// if source exists, rename. Otherwise, create an empty directory
if (fs.exists(sourcePath)) {
if (!fs.rename(sourcePath, targetPath)) {
throw new HiveException("Unable to rename: " + sourcePath
+ " to: " + targetPath);
}
} else if (!fs.mkdirs(targetPath)) {
throw new HiveException("Unable to make directory: " + targetPath);
}
} else {
// This is a local file
String mesg = "Copying data to local directory " + lfd.getTargetDir();
String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the existing dest directory
LocalFileSystem dstFs = FileSystem.getLocal(conf);
if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
console.printInfo(mesg, mesg_detail);
// if source exists, rename. Otherwise, create an empty directory
if (fs.exists(sourcePath)) {
fs.copyToLocalFile(sourcePath, targetPath);
} else {
if (!dstFs.mkdirs(targetPath)) {
throw new HiveException("Unable to make local directory: "
+ targetPath);
}
}
} else {
throw new AccessControlException(
"Unable to delete the existing destination directory: "
+ targetPath);
}
}
}
// Next we do this for tables and partitions
LoadTableDesc tbd = work.getLoadTableWork();
if (tbd != null) {
StringBuilder mesg = new StringBuilder("Loading data to table ")
.append( tbd.getTable().getTableName());
if (tbd.getPartitionSpec().size() > 0) {
mesg.append(" partition (");
Map<String, String> partSpec = tbd.getPartitionSpec();
for (String key: partSpec.keySet()) {
mesg.append(key).append('=').append(partSpec.get(key)).append(", ");
}
mesg.setLength(mesg.length()-2);
mesg.append(')');
}
String mesg_detail = " from " + tbd.getSourceDir();
console.printInfo(mesg.toString(), mesg_detail); //11/08/21 21:58:44 INFO exec.MoveTask: Loading data to table records from hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
Table table = db.getTable(db.getCurrentDatabase(), tbd
.getTable().getTableName());
if (work.getCheckFileFormat()) {
// Get all files from the src directory
FileStatus[] dirs;
ArrayList<FileStatus> files;
FileSystem fs;
try {
fs = FileSystem.get(table.getDataLocation(), conf);
dirs = fs.globStatus(new Path(tbd.getSourceDir()));
files = new ArrayList<FileStatus>();
for (int i = 0; (dirs != null && i < dirs.length); i++) {
files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
// We only check one file, so exit the loop when we have at least
// one.
if (files.size() > 0) {
break;
}
}
} catch (IOException e) {
throw new HiveException(
"addFiles: filesystem error in check phase", e);
}
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
// Check if the file format of the file matches that of the table.
boolean flag = HiveFileFormatUtils.checkInputFormat(
fs, conf, tbd.getTable().getInputFileFormatClass(), files);
if (!flag) {
throw new HiveException(
"Wrong file format. Please check the file's format.");
}
}
}
// Create a data container
DataContainer dc = null;
if (tbd.getPartitionSpec().size() == 0) {
dc = new DataContainer(table.getTTable());
db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
.getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir())); // replace = true for OVERWRITE
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(table));
}
} else {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
// deal with dynamic partitions
DynamicPartitionCtx dpCtx = tbd.getDPCtx();
if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
// load the list of DP partitions and return the list of partition specs
ArrayList<LinkedHashMap<String, String>> dp =
db.loadDynamicPartitions(
new Path(tbd.getSourceDir()),
tbd.getTable().getTableName(),
tbd.getPartitionSpec(),
tbd.getReplace(),
new Path(tbd.getTmpDir()),
dpCtx.getNumDPCols());
// for each partition spec, get the partition
// and put it to WriteEntity for post-exec hook
for (LinkedHashMap<String, String> partSpec: dp) {
Partition partn = db.getPartition(table, partSpec, false);
WriteEntity enty = new WriteEntity(partn);
if (work.getOutputs() != null) {
work.getOutputs().add(enty);
}
// Need to update the queryPlan's output as well so that post-exec hook get executed.
// This is only needed for dynamic partitioning since for SP the WriteEntity is
// constructed at compile time and the queryPlan already contains that.
// For DP, WriteEntity creation is deferred at this stage so we need to update
// queryPlan here.
if (queryPlan.getOutputs() == null) {
queryPlan.setOutputs(new HashSet<WriteEntity>());
}
queryPlan.getOutputs().add(enty);
// update columnar lineage for each partition
dc = new DataContainer(table.getTTable(), partn.getTPartition());
if (SessionState.get() != null) {
SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
table.getCols());
}
console.printInfo("\tLoading partition " + partSpec);
}
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
dc = new DataContainer(table.getTTable(), partn.getTPartition());
// add this partition to post-execution hook
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(partn));
}
}
}
if (SessionState.get() != null && dc != null) {
SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
table.getCols());
}
}
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
errorMessage = "Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e);
return (1);
}
}
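For the `records` example the partition spec is empty, so MoveTask logs "Loading data to table records" and takes the `db.loadTable` branch. A static spec goes to `loadPartition`, and a spec with dynamic columns goes to `loadDynamicPartitions`. The hypothetical helper below reproduces the message format and this three-way choice. The branch is returned as a plain method-name string for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper mirroring MoveTask's log message and its choice of Hive load method. */
final class MoveTaskSketch {
    static String message(String tableName, Map<String, String> partSpec) {
        StringBuilder mesg = new StringBuilder("Loading data to table ").append(tableName);
        if (!partSpec.isEmpty()) {
            mesg.append(" partition (");
            for (Map.Entry<String, String> e : partSpec.entrySet()) {
                mesg.append(e.getKey()).append('=').append(e.getValue()).append(", ");
            }
            mesg.setLength(mesg.length() - 2); // drop the trailing ", "
            mesg.append(')');
        }
        return mesg.toString();
    }

    /** numDpCols: number of dynamic partition columns (0 when there is no DP context). */
    static String loadMethod(Map<String, String> partSpec, int numDpCols) {
        if (partSpec.isEmpty()) {
            return "loadTable";
        }
        return numDpCols > 0 ? "loadDynamicPartitions" : "loadPartition";
    }

    public static void main(String[] args) {
        Map<String, String> none = new LinkedHashMap<>();
        System.out.println(message("records", none) + " -> " + loadMethod(none, 0));
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2010-03-03");
        System.out.println(message("records", spec) + " -> " + loadMethod(spec, 0));
    }
}
```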
Hive.java:
public void loadTable(Path loadPath, String tableName, boolean replace,
Path tmpDirPath) throws HiveException {
Table tbl = getTable(tableName); //records
if (replace) { //true
tbl.replaceFiles(loadPath, tmpDirPath);
// loadPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
// tmpDirPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
} else {
tbl.copyFiles(loadPath);
}
}
Table:
protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
FileSystem fs;
try {
fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
}
getDataLocation() // hdfs://localhost:54310/user/hive/warehouse/records
static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
Path tmppath) throws HiveException {
FileStatus[] srcs;
try {
srcs = fs.globStatus(srcf); // srcf = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
if (srcs == null) {
LOG.info("No sources specified to move: " + srcf);
return;
// srcs = new FileStatus[0]; Why is this needed?
}
checkPaths(fs, srcs, destf, true); // destf = /user/hive/warehouse/records
try {
fs.mkdirs(tmppath); // tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
for (FileStatus src : srcs) {
FileStatus[] items = fs.listStatus(src.getPath());
for (int j = 0; j < items.length; j++) {
if (!fs.rename(items[j].getPath(), new Path(tmppath, items[j]
.getPath().getName()))) {
//
// public boolean rename(Path src, Path dst) throws IOException {
// return dfs.rename(getPathName(src), getPathName(dst));
// }
// src = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000/sample.txt
// dst = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001/sample.txt
throw new HiveException("Error moving: " + items[j].getPath()
+ " into: " + tmppath);
}
}
}
// point of no return
boolean b = fs.delete(destf, true); // destf = /user/hive/warehouse/records
LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
// create the parent directory otherwise rename can fail if the parent
// doesn't exist
if (!fs.mkdirs(destf.getParent())) {
throw new HiveException("Unable to create destination directory: "
+ destf.getParent().toString());
}
b = fs.rename(tmppath, destf);
// tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
// destf = /user/hive/warehouse/records
if (!b) {
throw new HiveException("Unable to move results from " + tmppath
+ " to destination directory: " + destf.getParent().toString());
}
LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
} catch (IOException e) {
throw new HiveException("replaceFiles: error while moving files from "
+ tmppath + " to " + destf + "!!!", e);
}
// In case of error, we should leave the temporary data there, so
// that user can recover the data if necessary.
}
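Stripped of Hadoop types, the replace protocol has four steps:

1. Move each source file into the tmp directory (`-ext-10001`).
2. Delete the table directory.
3. Recreate its parent.
4. Rename the tmp directory to the table directory.

Only the delete is destructive, which is why the code calls it the point of no return. If a later step fails, the data is still in the tmp directory. A hypothetical local-disk sketch with `java.nio.file` follows. On HDFS the final step is a single NameNode rename. Here it relies on `Files.move` being able to rename a directory within one file system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical local-disk version of Hive.replaceFiles; java.nio replaces Hadoop's FileSystem. */
final class ReplaceFiles {
    static void replaceFiles(Path srcDir, Path destDir, Path tmpDir) throws IOException {
        if (!Files.isDirectory(srcDir)) {
            return; // corresponds to "No sources specified to move"
        }
        // 1. move each file of the source directory into the tmp directory
        Files.createDirectories(tmpDir);
        try (DirectoryStream<Path> items = Files.newDirectoryStream(srcDir)) {
            for (Path item : items) {
                Files.move(item, tmpDir.resolve(item.getFileName()));
            }
        }
        // 2. point of no return: remove the current table directory
        deleteRecursively(destDir);
        // 3. recreate the parent so the rename cannot fail on a missing parent
        Files.createDirectories(destDir.getParent());
        // 4. rename the tmp directory into place; on failure the data stays in tmpDir
        Files.move(tmpDir, destDir);
    }

    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            // reverse order visits children before their parent directories
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("replace");
        Path ext10000 = Files.createDirectories(base.resolve("ext-10000"));
        Files.writeString(ext10000.resolve("sample.txt"), "new data\n");
        Path table = Files.createDirectories(base.resolve("warehouse").resolve("records"));
        Files.writeString(table.resolve("old.txt"), "old data\n");
        replaceFiles(ext10000, table, base.resolve("ext-10001"));
        try (Stream<Path> files = Files.list(table)) {
            files.forEach(System.out::println); // only .../records/sample.txt remains
        }
    }
}
```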
static private void checkPaths(FileSystem fs, FileStatus[] srcs, Path destf,
boolean replace) throws HiveException {
try {
for (FileStatus src : srcs) {
FileStatus[] items = fs.listStatus(src.getPath());
for (FileStatus item : items) {
if (Utilities.isTempPath(item)) {
// This check is redundant because temp files are removed by
// execution layer before
// calling loadTable/Partition. But leaving it in just in case.
fs.delete(item.getPath(), true);
continue;
}
if (item.isDir()) {
throw new HiveException("checkPaths: " + src.getPath()
+ " has nested directory" + item.getPath());
}
Path tmpDest = new Path(destf, item.getPath().getName()); ///user/hive/warehouse/records/sample.txt
if (!replace && fs.exists(tmpDest)) { // replace = true
throw new HiveException("checkPaths: " + tmpDest
+ " already exists");
}
}
}
} catch (IOException e) {
throw new HiveException("checkPaths: filesystem error in check phase", e);
}
}
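`checkPaths` validates the source files before anything is moved. Leftover temporary files are deleted, and a nested directory is an error. When `replace` is false (appending rather than OVERWRITE, presumably via `copyFiles`), a file whose name already exists in the table directory is also an error. The sketch below restates these rules over plain names. How `Utilities.isTempPath` recognises a temporary file is not shown here, so the `_tmp.` prefix test is an assumption. Temporary entries are simply skipped instead of being deleted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical restatement of Hive.checkPaths over plain file names. */
final class CheckPaths {
    static final class Item {
        final String name;
        final boolean isDir;

        Item(String name, boolean isDir) {
            this.name = name;
            this.isDir = isDir;
        }
    }

    // Assumption for this sketch only: a "_tmp." prefix marks a leftover temporary file.
    static boolean isTempName(String name) {
        return name.startsWith("_tmp.");
    }

    /** Returns the names that would be moved, or throws as checkPaths does. */
    static List<String> check(List<Item> srcItems, Set<String> existingDestNames, boolean replace) {
        List<String> accepted = new ArrayList<>();
        for (Item item : srcItems) {
            if (isTempName(item.name)) {
                continue; // Hive deletes such entries; the sketch just skips them
            }
            if (item.isDir) {
                throw new IllegalStateException("checkPaths: nested directory " + item.name);
            }
            if (!replace && existingDestNames.contains(item.name)) {
                throw new IllegalStateException("checkPaths: " + item.name + " already exists");
            }
            accepted.add(item.name);
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Item> ext10000 = List.of(new Item("sample.txt", false));
        // OVERWRITE (replace = true): an existing sample.txt in records is not a problem
        System.out.println(check(ext10000, Set.of("sample.txt"), true)); // [sample.txt]
    }
}
```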
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
hive> explain
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_00-10-28_984_2803781885868135028/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
Time taken: 0.333 seconds
(TOK_LOAD
'/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
(TOK_TAB records)
LOCAL
OVERWRITE
)
hive> explain extended
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns year,temperature,quality
columns.types string:int:int
field.delim
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/records
name records
serialization.ddl struct records { string year, i32 temperature, i32 quality}
serialization.format
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1313975965
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10001
CliDriver:
CliDriver.main() {
ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) {
ret = processCmd(command);
}
CliDriver:
public int processCmd(String cmd) {
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
Driver qp = (Driver) proc;
ret = qp.run(cmd).getResponseCode();
}
Driver:
public CommandProcessorResponse run(String command) {
// command = LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records
int ret = compile(command);
ret = execute();
}
Driver:
public int compile(String command) {
ctx = new Context(conf); //
SemanticAnalyzerFactory sfactory = new SemanticAnalyzerFactory(conf);
BaseSemanticAnalyzer sem = sfactory.get(tree); // return new LoadSemanticAnalyzer(conf);
sem.analyze(tree, ctx);
}
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
/**
* Create a Context with a given executionId. ExecutionId, together with
* user name and conf, will determine the temporary directory locations.
*/
public Context(Configuration conf, String executionId) throws IOException {
this.conf = conf;
this.executionId = executionId; //hive_2011-08-21_00-02-22_445_7799135143086468923
// non-local tmp location is configurable. however it is the same across
// all external file systems
nonLocalScratchPath =
new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
executionId); // /tmp/hive-tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
// local tmp location is not configurable for now
localScratchDir = System.getProperty("java.io.tmpdir")
+ Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
+ executionId; // /tmp/tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
}
public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
public void analyzeInternal(ASTNode ast) throws SemanticException {
isLocal = false;
isOverWrite = false;
Tree fromTree = ast.getChild(0); // '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
Tree tableTree = ast.getChild(1); // TOK_TAB
if (ast.getChildCount() == 4) { // true
isLocal = true; //
isOverWrite = true; //
}
if (ast.getChildCount() == 3) {
if (ast.getChild(2).getText().toLowerCase().equals("local")) {
isLocal = true;
} else {
isOverWrite = true;
}
}
// initialize load path
URI fromURI;
try {
String fromPath = stripQuotes(fromTree.getText()); ///home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
fromURI = initializeFromURI(fromPath); // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
} catch (IOException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
} catch (URISyntaxException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
}
  // initialize destination table/partition
  tableSpec ts = new tableSpec(db, conf, (ASTNode) tableTree);
  Table tbl = ts.tableHandle;
  if (tbl.isView()) {
    throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
  }
  if (tbl.isNonNative()) {
    throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
  }
  genAuthorizeEntry(db.getCurrentDatabase(), tbl.getTableName(), null, Privilege.INSERT_PRIV);
  URI toURI = (ts.partHandle != null) ? ts.partHandle.getDataLocation()
      : ts.tableHandle.getDataLocation(); // hdfs://localhost:54310/user/hive/warehouse/records
  List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
  if (isOverWrite && (parts != null && parts.size() > 0)
      && (ts.partSpec == null || ts.partSpec.size() == 0)) {
    throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
  }
  // make sure the arguments make sense
  applyConstraints(fromURI, toURI, fromTree, isLocal);
  Task<? extends Serializable> rTask = null;
  // create copy work
  if (isLocal) { // true
    // if the local keyword is specified - we will always make a copy. this
    // might seem redundant in the case
    // that the hive warehouse is also located in the local file system - but
    // that's just a test case.
    String copyURIStr = ctx.getExternalTmpFileURI(toURI); // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
    URI copyURI = URI.create(copyURIStr);
    rTask = TaskFactory.get(new CopyWork(fromURI.toString(), copyURIStr),
        conf);
    fromURI = copyURI;
  }
  // create final load/move work
  String loadTmpPath = ctx.getExternalTmpFileURI(toURI); // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
  Map<String, String> partSpec = ts.getPartSpec();
  if (partSpec == null) {
    partSpec = new LinkedHashMap<String, String>();
  }
  LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
      loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
  if (rTask != null) {
    rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),
        getOutputs(), loadTableWork, null, true), conf));
  } else {
    rTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(),
        loadTableWork, null, true), conf);
  }
  rootTasks.add(rTask);
}
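The end of the method decides how many stages the load needs. The sketch below models only that wiring; it is not Hive's Task/TaskFactory API, and `SimpleTask`, `planLoad` and `runAll` are hypothetical names. With LOCAL, a copy task becomes the root and the move task hangs off it as a dependent (Stage-0 and Stage-1 in the explain output); without LOCAL, the move task is the only root. `runAll` simply visits each root and then its dependents, a simplification of how the Driver actually schedules tasks.

```java
import java.util.ArrayList;
import java.util.List;

public class LoadPlanSketch {
    // Hypothetical stand-in for a Hive task: a name plus its dependent tasks.
    static final class SimpleTask {
        final String name;
        final List<SimpleTask> dependents = new ArrayList<>();

        SimpleTask(String name) {
            this.name = name;
        }

        void addDependentTask(SimpleTask t) {
            dependents.add(t);
        }
    }

    // Same shape as the analyzer code: with LOCAL, copy first and attach the
    // move task as a dependent; otherwise the move task is the root.
    static List<SimpleTask> planLoad(boolean isLocal) {
        SimpleTask rTask = null;
        if (isLocal) {
            rTask = new SimpleTask("Copy");
        }
        SimpleTask move = new SimpleTask("Move");
        if (rTask != null) {
            rTask.addDependentTask(move);
        } else {
            rTask = move;
        }
        List<SimpleTask> rootTasks = new ArrayList<>();
        rootTasks.add(rTask);
        return rootTasks;
    }

    // Depth-first walk: a task runs before its dependents. Returns the visit order.
    static List<String> runAll(List<SimpleTask> roots) {
        List<String> order = new ArrayList<>();
        for (SimpleTask t : roots) {
            run(t, order);
        }
        return order;
    }

    private static void run(SimpleTask t, List<String> order) {
        order.add(t.name);
        for (SimpleTask d : t.dependents) {
            run(d, order);
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(planLoad(true)));  // [Copy, Move]
        System.out.println(runAll(planLoad(false))); // [Move]
    }
}
```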
private URI initializeFromURI(String fromPath) throws IOException,
    URISyntaxException {
  URI fromURI = new Path(fromPath).toUri(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
  String fromScheme = fromURI.getScheme(); // null
  String fromAuthority = fromURI.getAuthority(); // null
  String path = fromURI.getPath(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
  // generate absolute path relative to current directory or hdfs home
  // directory
  if (!path.startsWith("/")) { // not an absolute path, so resolve it as follows
    if (isLocal) {
      path = new Path(System.getProperty("user.dir"), path).toString();
    } else {
      path = new Path(new Path("/user/" + System.getProperty("user.name")),
          path).toString();
    }
  }
  // set correct scheme and authority
  if (StringUtils.isEmpty(fromScheme)) { // true
    if (isLocal) { // true
      // file for local
      fromScheme = "file";
    } else {
      // use default values from fs.default.name
      URI defaultURI = FileSystem.get(conf).getUri();
      fromScheme = defaultURI.getScheme();
      fromAuthority = defaultURI.getAuthority();
    }
  }
  // if scheme is specified but not authority then use the default authority
  if (fromScheme.equals("hdfs") && StringUtils.isEmpty(fromAuthority)) {
    URI defaultURI = FileSystem.get(conf).getUri();
    fromAuthority = defaultURI.getAuthority();
  }
  LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
  return new URI(fromScheme, fromAuthority, path, null, null); // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
}
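The path rules of initializeFromURI can be checked without a Hadoop classpath. This is a minimal sketch that swaps `org.apache.hadoop.fs.Path` for `java.net.URI`, so no path normalization happens; `userDir`, `userName` and `defaultFs` are hypothetical parameters standing in for the `user.dir` and `user.name` system properties and for `FileSystem.get(conf).getUri()`.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class FromUriSketch {
    // Re-creates the rules of initializeFromURI with java.net.URI only.
    // userDir, userName and defaultFs are hypothetical stand-ins for
    // System.getProperty("user.dir"), System.getProperty("user.name")
    // and FileSystem.get(conf).getUri().
    static URI resolve(String fromPath, boolean isLocal, String userDir,
                       String userName, URI defaultFs) {
        URI fromURI = URI.create(fromPath); // unlike Hadoop's Path, no normalization
        String scheme = fromURI.getScheme();
        String authority = fromURI.getAuthority();
        String path = fromURI.getPath();

        // relative path: anchor it at the working directory (LOCAL) or /user/<name>
        if (!path.startsWith("/")) {
            String base = isLocal ? userDir : "/user/" + userName;
            path = base + "/" + path;
        }
        // no scheme: "file" for LOCAL, otherwise the default file system's scheme and authority
        if (scheme == null || scheme.isEmpty()) {
            if (isLocal) {
                scheme = "file";
            } else {
                scheme = defaultFs.getScheme();
                authority = defaultFs.getAuthority();
            }
        }
        // "hdfs" given without an authority: borrow the default one
        if (scheme.equals("hdfs") && (authority == null || authority.isEmpty())) {
            authority = defaultFs.getAuthority();
        }
        try {
            return new URI(scheme, authority, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI defaultFs = URI.create("hdfs://localhost:54310");
        System.out.println(resolve("/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt",
                true, "/home/tianzhao", "tianzhao", defaultFs));
        // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
        System.out.println(resolve("sample.txt", false, "/home/tianzhao", "tianzhao", defaultFs));
        // hdfs://localhost:54310/user/tianzhao/sample.txt
    }
}
```

A missing scheme is the only case where LOCAL matters for the scheme; an explicit `hdfs:///path` keeps its scheme and only borrows the default authority.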
public tableSpec(Hive db, HiveConf conf, ASTNode ast)
    throws SemanticException {
  assert (ast.getToken().getType() == HiveParser.TOK_TAB);
  int childIndex = 0;
  numDynParts = 0;
  try {
    // get table metadata
    tableName = unescapeIdentifier(ast.getChild(0).getText()); // records
    boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); // false
    if (testMode) {
      tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX)
          + tableName;
    }
    tableHandle = db.getTable(tableName);
  } catch (InvalidTableException ite) {
    throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(ast
        .getChild(0)), ite);
  } catch (HiveException e) {
    throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg(ast
        .getChild(childIndex), e.getMessage()), e);
  }
  // get partition metadata if partition specified
  if (ast.getChildCount() == 2) { // false
    childIndex = 1;
    ASTNode partspec = (ASTNode) ast.getChild(1);
    // partSpec is a mapping from partition column name to its value.
    partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
    for (int i = 0; i < partspec.getChildCount(); ++i) {
      ASTNode partspec_val = (ASTNode) partspec.getChild(i);
      String val = null;
      if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
        ++numDynParts;
      } else { // in the form of T partition (ds="2010-03-03")
        val = stripQuotes(partspec_val.getChild(1).getText());
      }
      partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
    }
    // check if the partition spec is valid
    if (numDynParts > 0) {
      List<FieldSchema> parts = tableHandle.getPartitionKeys();
      int numStaPart = parts.size() - numDynParts;
      if (numStaPart == 0 &&
          conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
        throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
      }
      for (FieldSchema fs: parts) {
        if (partSpec.get(fs.getName().toLowerCase()) == null) {
          if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
            throw new SemanticException(
                ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
          }
          break;
        } else {
          --numStaPart;
        }
      }
      partHandle = null;
    } else {
      try {
        // this doesn't create partition. partition is created in MoveTask
        partHandle = new Partition(tableHandle, partSpec, null);
      } catch (HiveException e) {
        throw new SemanticException(
            ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
      }
    }
  }
}
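The partition-spec checks at the end of the tableSpec constructor reduce to a small function. In this sketch (class and method names are hypothetical), a null value in `partSpec` marks a dynamic partition column, as in the constructor, and the result is the name of the ErrorMsg constant the real code would throw, or null when the spec is accepted. The `strict` flag stands in for `hive.exec.dynamic.partition.mode=strict`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartSpecSketch {
    // partKeys: the table's partition columns in declaration order.
    // partSpec: column -> value; a null value marks a dynamic partition column.
    // Returns null if accepted, else the name of the ErrorMsg constant Hive would raise.
    static String validate(List<String> partKeys, Map<String, String> partSpec, boolean strict) {
        int numDynParts = 0;
        for (String v : partSpec.values()) {
            if (v == null) {
                numDynParts++;
            }
        }
        if (numDynParts == 0) {
            return null; // fully static spec: Hive builds a Partition handle instead
        }
        int numStaPart = partKeys.size() - numDynParts;
        if (numStaPart == 0 && strict) {
            return "DYNAMIC_PARTITION_STRICT_MODE";
        }
        for (String key : partKeys) {
            if (partSpec.get(key.toLowerCase()) == null) {
                // first dynamic column: no static column may come after it
                if (numStaPart > 0) {
                    return "PARTITION_DYN_STA_ORDER";
                }
                break;
            }
            numStaPart--;
        }
        return null;
    }

    // Helper: spec("ds", "2010-03-03", "hr", null) builds {ds=2010-03-03, hr=null}.
    static Map<String, String> spec(String... kv) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("ds", "hr");
        System.out.println(validate(keys, spec("ds", "2010-03-03", "hr", null), true)); // null
        System.out.println(validate(keys, spec("ds", null, "hr", "12"), true));         // PARTITION_DYN_STA_ORDER
        System.out.println(validate(keys, spec("ds", null, "hr", null), true));         // DYNAMIC_PARTITION_STRICT_MODE
        System.out.println(validate(keys, spec("ds", null, "hr", null), false));        // null
    }
}
```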
Driver.execute() then runs the root task (CopyTask, Stage-0) and, after it succeeds, its dependent MoveTask (Stage-1).
CopyTask.execute():
public int execute(DriverContext driverContext) {
  FileSystem dstFs = null;
  Path toPath = null;
  try {
    Path fromPath = new Path(work.getFromPath()); // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
    toPath = new Path(work.getToPath()); // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
    console.printInfo("Copying data from " + fromPath.toString(), " to "
        + toPath.toString());
    FileSystem srcFs = fromPath.getFileSystem(conf); // org.apache.hadoop.fs.LocalFileSystem@1c9e4d2
    dstFs = toPath.getFileSystem(conf);
    FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
    if (srcs == null || srcs.length == 0) {
      console.printError("No files matching path: " + fromPath.toString());
      errorMessage = "No files matching path: " + fromPath.toString();
      return 3;
    }
    if (!dstFs.mkdirs(toPath)) {
      console
          .printError("Cannot make target directory: " + toPath.toString());
      errorMessage = "Cannot make target directory: " + toPath.toString();
      return 2;
    }
    for (FileStatus oneSrc : srcs) {
      LOG.debug("Copying file: " + oneSrc.getPath().toString());
      if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
          false, // delete source
          true, // overwrite destination
          conf)) {
        console.printError("Failed to copy: '" + oneSrc.getPath().toString()
            + "' to: '" + toPath.toString() + "'");
        errorMessage = "Failed to copy: '" + oneSrc.getPath().toString()
            + "' to: '" + toPath.toString() + "'";
        return 1;
      }
    }
    return 0;
  } catch (Exception e) {
    console.printError("Failed with exception " + e.getMessage(), "\n"
        + StringUtils.stringifyException(e));
    errorMessage = "Failed with exception " + e.getMessage() + "\n"
        + StringUtils.stringifyException(e);
    return (1);
  }
}
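CopyTask boils down to: match the source, create the target directory, then copy each file without deleting the source and overwriting any existing destination. The sketch below reproduces that flow and its return codes with `java.nio.file` on the local disk only; it is not Hadoop's `FileUtil.copy`, and its matching rule is a simplification (a file matches itself, a directory matches its immediate files, no globs), unlike `matchFilesOrDir`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class CopySketch {
    // Return codes follow CopyTask.execute(): 0 ok, 1 copy failure,
    // 2 target directory not created, 3 nothing matched the source path.
    static int copy(Path from, Path to) {
        List<Path> srcs = new ArrayList<>();
        try {
            if (Files.isRegularFile(from)) {
                srcs.add(from);
            } else if (Files.isDirectory(from)) {
                try (Stream<Path> s = Files.list(from)) {
                    s.filter(Files::isRegularFile).forEach(srcs::add);
                }
            }
        } catch (IOException e) {
            return 1;
        }
        if (srcs.isEmpty()) {
            return 3;
        }
        try {
            Files.createDirectories(to);
        } catch (IOException e) {
            return 2;
        }
        for (Path src : srcs) {
            try {
                // keep the source, overwrite an existing destination file
                Files.copy(src, to.resolve(src.getFileName()), StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                return 1;
            }
        }
        return 0;
    }

    // Demo on a throwaway directory. Returns {code for an existing file,
    // 1 if the copy landed and the source survived else 0, code for a missing path}.
    static int[] demo() {
        try {
            Path root = Files.createTempDirectory("copy-sketch");
            Path file = Files.write(root.resolve("sample.txt"), "x\n".getBytes(StandardCharsets.UTF_8));
            Path target = root.resolve("ext-10000");
            int ok = copy(file, target);
            boolean landed = Files.exists(target.resolve("sample.txt")) && Files.exists(file);
            int missing = copy(root.resolve("no-such-file"), target);
            return new int[] {ok, landed ? 1 : 0, missing};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [0, 1, 3]
    }
}
```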
MoveTask:
public int execute(DriverContext driverContext) {
  try {
    // Do any hive related operations like moving tables and files
    // to appropriate locations
    LoadFileDesc lfd = work.getLoadFileWork(); // null
    if (lfd != null) {
      Path targetPath = new Path(lfd.getTargetDir());
      Path sourcePath = new Path(lfd.getSourceDir());
      FileSystem fs = sourcePath.getFileSystem(conf);
      if (lfd.getIsDfsDir()) {
        // Just do a rename on the URIs, they belong to the same FS
        String mesg = "Moving data to: " + lfd.getTargetDir();
        String mesg_detail = " from " + lfd.getSourceDir();
        console.printInfo(mesg, mesg_detail);
        // delete the output directory if it already exists
        fs.delete(targetPath, true);
        // if source exists, rename. Otherwise, create an empty directory
        if (fs.exists(sourcePath)) {
          if (!fs.rename(sourcePath, targetPath)) {
            throw new HiveException("Unable to rename: " + sourcePath
                + " to: " + targetPath);
          }
        } else if (!fs.mkdirs(targetPath)) {
          throw new HiveException("Unable to make directory: " + targetPath);
        }
      } else {
        // This is a local file
        String mesg = "Copying data to local directory " + lfd.getTargetDir();
        String mesg_detail = " from " + lfd.getSourceDir();
        console.printInfo(mesg, mesg_detail);
        // delete the existing dest directory
        LocalFileSystem dstFs = FileSystem.getLocal(conf);
        if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
          console.printInfo(mesg, mesg_detail);
          // if source exists, rename. Otherwise, create an empty directory
          if (fs.exists(sourcePath)) {
            fs.copyToLocalFile(sourcePath, targetPath);
          } else {
            if (!dstFs.mkdirs(targetPath)) {
              throw new HiveException("Unable to make local directory: "
                  + targetPath);
            }
          }
        } else {
          throw new AccessControlException(
              "Unable to delete the existing destination directory: "
              + targetPath);
        }
      }
    }
    // Next we do this for tables and partitions
    LoadTableDesc tbd = work.getLoadTableWork();
    if (tbd != null) {
      StringBuilder mesg = new StringBuilder("Loading data to table ")
          .append(tbd.getTable().getTableName());
      if (tbd.getPartitionSpec().size() > 0) {
        mesg.append(" partition (");
        Map<String, String> partSpec = tbd.getPartitionSpec();
        for (String key: partSpec.keySet()) {
          mesg.append(key).append('=').append(partSpec.get(key)).append(", ");
        }
        mesg.setLength(mesg.length() - 2);
        mesg.append(')');
      }
      String mesg_detail = " from " + tbd.getSourceDir();
      console.printInfo(mesg.toString(), mesg_detail); // 11/08/21 21:58:44 INFO exec.MoveTask: Loading data to table records from hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
      Table table = db.getTable(db.getCurrentDatabase(), tbd
          .getTable().getTableName());
      if (work.getCheckFileFormat()) {
        // Get all files from the src directory
        FileStatus[] dirs;
        ArrayList<FileStatus> files;
        FileSystem fs;
        try {
          fs = FileSystem.get(table.getDataLocation(), conf);
          dirs = fs.globStatus(new Path(tbd.getSourceDir()));
          files = new ArrayList<FileStatus>();
          for (int i = 0; (dirs != null && i < dirs.length); i++) {
            files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
            // We only check one file, so exit the loop when we have at least
            // one.
            if (files.size() > 0) {
              break;
            }
          }
        } catch (IOException e) {
          throw new HiveException(
              "addFiles: filesystem error in check phase", e);
        }
        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
          // Check if the file format of the file matches that of the table.
          boolean flag = HiveFileFormatUtils.checkInputFormat(
              fs, conf, tbd.getTable().getInputFileFormatClass(), files);
          if (!flag) {
            throw new HiveException(
                "Wrong file format. Please check the file's format.");
          }
        }
      }
      // Create a data container
      DataContainer dc = null;
      if (tbd.getPartitionSpec().size() == 0) {
        dc = new DataContainer(table.getTTable());
        db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
            .getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir())); // replace
        if (work.getOutputs() != null) {
          work.getOutputs().add(new WriteEntity(table));
        }
      } else {
        LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
        // deal with dynamic partitions
        DynamicPartitionCtx dpCtx = tbd.getDPCtx();
        if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
          // load the list of DP partitions and return the list of partition specs
          ArrayList<LinkedHashMap<String, String>> dp =
              db.loadDynamicPartitions(
                  new Path(tbd.getSourceDir()),
                  tbd.getTable().getTableName(),
                  tbd.getPartitionSpec(),
                  tbd.getReplace(),
                  new Path(tbd.getTmpDir()),
                  dpCtx.getNumDPCols());
          // for each partition spec, get the partition
          // and put it to WriteEntity for post-exec hook
          for (LinkedHashMap<String, String> partSpec: dp) {
            Partition partn = db.getPartition(table, partSpec, false);
            WriteEntity enty = new WriteEntity(partn);
            if (work.getOutputs() != null) {
              work.getOutputs().add(enty);
            }
            // Need to update the queryPlan's output as well so that post-exec hook get executed.
            // This is only needed for dynamic partitioning since for SP the WriteEntity is
            // constructed at compile time and the queryPlan already contains that.
            // For DP, WriteEntity creation is deferred at this stage so we need to update
            // queryPlan here.
            if (queryPlan.getOutputs() == null) {
              queryPlan.setOutputs(new HashSet<WriteEntity>());
            }
            queryPlan.getOutputs().add(enty);
            // update columnar lineage for each partition
            dc = new DataContainer(table.getTTable(), partn.getTPartition());
            if (SessionState.get() != null) {
              SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
                  table.getCols());
            }
            console.printInfo("\tLoading partition " + partSpec);
          }
          dc = null; // reset data container to prevent it being added again.
        } else { // static partitions
          db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
              tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
          Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
          dc = new DataContainer(table.getTTable(), partn.getTPartition());
          // add this partition to post-execution hook
          if (work.getOutputs() != null) {
            work.getOutputs().add(new WriteEntity(partn));
          }
        }
      }
      if (SessionState.get() != null && dc != null) {
        SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
            table.getCols());
      }
    }
    return 0;
  } catch (Exception e) {
    console.printError("Failed with exception " + e.getMessage(), "\n"
        + StringUtils.stringifyException(e));
    errorMessage = "Failed with exception " + e.getMessage() + "\n"
        + StringUtils.stringifyException(e);
    return (1);
  }
}
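MoveTask builds its progress message by appending `key=value, ` for each partition column and then cutting the trailing two characters. A standalone version of just that string logic follows; the partitioned table name `records_part` and its spec are hypothetical, since `records` in this walkthrough is unpartitioned and therefore prints only the table name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LoadMessageSketch {
    // Same construction as in MoveTask.execute(): one "key=value, " per
    // partition column, then drop the final ", " and close the parenthesis.
    static String loadingMessage(String tableName, Map<String, String> partSpec) {
        StringBuilder mesg = new StringBuilder("Loading data to table ").append(tableName);
        if (partSpec.size() > 0) {
            mesg.append(" partition (");
            for (Map.Entry<String, String> e : partSpec.entrySet()) {
                mesg.append(e.getKey()).append('=').append(e.getValue()).append(", ");
            }
            mesg.setLength(mesg.length() - 2);
            mesg.append(')');
        }
        return mesg.toString();
    }

    public static void main(String[] args) {
        System.out.println(loadingMessage("records", new LinkedHashMap<>()));
        // Loading data to table records
        Map<String, String> spec = new LinkedHashMap<>(); // insertion order = column order
        spec.put("ds", "2010-03-03");
        spec.put("hr", "12");
        System.out.println(loadingMessage("records_part", spec));
        // Loading data to table records_part partition (ds=2010-03-03, hr=12)
    }
}
```

A `LinkedHashMap` matters here: it keeps the partition columns in the order they were put, which is also why `tableSpec` and `LoadSemanticAnalyzer` create the spec as a `LinkedHashMap`.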
Hive.java:
public void loadTable(Path loadPath, String tableName, boolean replace,
    Path tmpDirPath) throws HiveException {
  Table tbl = getTable(tableName); // records
  if (replace) { // true
    tbl.replaceFiles(loadPath, tmpDirPath);
    // loadPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
    // tmpDirPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
  } else {
    tbl.copyFiles(loadPath);
  }
}
Table:
protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
  FileSystem fs;
  try {
    fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
    Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
  } catch (IOException e) {
    throw new HiveException("addFiles: filesystem error in check phase", e);
  }
}
getDataLocation() // hdfs://localhost:54310/user/hive/warehouse/records
static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
    Path tmppath) throws HiveException {
  FileStatus[] srcs;
  try {
    srcs = fs.globStatus(srcf); // srcf = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
  } catch (IOException e) {
    throw new HiveException("addFiles: filesystem error in check phase", e);
  }
  if (srcs == null) {
    LOG.info("No sources specified to move: " + srcf);
    return;
    // srcs = new FileStatus[0]; Why is this needed?
  }
  checkPaths(fs, srcs, destf, true); // destf = /user/hive/warehouse/records
  try {
    fs.mkdirs(tmppath); // tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
    for (FileStatus src : srcs) {
      FileStatus[] items = fs.listStatus(src.getPath());
      for (int j = 0; j < items.length; j++) {
        if (!fs.rename(items[j].getPath(), new Path(tmppath, items[j]
            .getPath().getName()))) {
          // DistributedFileSystem.rename():
          //   public boolean rename(Path src, Path dst) throws IOException {
          //     return dfs.rename(getPathName(src), getPathName(dst));
          //   }
          // src = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000/sample.txt
          // dst = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001/sample.txt
          throw new HiveException("Error moving: " + items[j].getPath()
              + " into: " + tmppath);
        }
      }
    }
    // point of no return
    boolean b = fs.delete(destf, true); // destf = /user/hive/warehouse/records
    LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
    // create the parent directory otherwise rename can fail if the parent
    // doesn't exist
    if (!fs.mkdirs(destf.getParent())) {
      throw new HiveException("Unable to create destination directory: "
          + destf.getParent().toString());
    }
    b = fs.rename(tmppath, destf);
    // tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
    // destf = /user/hive/warehouse/records
    if (!b) {
      throw new HiveException("Unable to move results from " + tmppath
          + " to destination directory: " + destf.getParent().toString());
    }
    LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
  } catch (IOException e) {
    throw new HiveException("replaceFiles: error while moving files from "
        + tmppath + " to " + destf + "!!!", e);
  }
  // In case of error, we should leave the temporary data there, so
  // that user can recover the data if necessary.
}
static private void checkPaths(FileSystem fs, FileStatus[] srcs, Path destf,
    boolean replace) throws HiveException {
  try {
    for (FileStatus src : srcs) {
      FileStatus[] items = fs.listStatus(src.getPath());
      for (FileStatus item : items) {
        if (Utilities.isTempPath(item)) {
          // This check is redundant because temp files are removed by
          // execution layer before
          // calling loadTable/Partition. But leaving it in just in case.
          fs.delete(item.getPath(), true);
          continue;
        }
        if (item.isDir()) {
          throw new HiveException("checkPaths: " + src.getPath()
              + " has nested directory " + item.getPath());
        }
        Path tmpDest = new Path(destf, item.getPath().getName()); // /user/hive/warehouse/records/sample.txt
        if (!replace && fs.exists(tmpDest)) { // replace = true
          throw new HiveException("checkPaths: " + tmpDest
              + " already exists");
        }
      }
    }
  } catch (IOException e) {
    throw new HiveException("checkPaths: filesystem error in check phase", e);
  }
}
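Hive.replaceFiles together with checkPaths implements OVERWRITE in four steps: validate the staged files, move them into a second temp directory (-ext-10001), delete the table directory, then rename the temp directory into its place. The sketch below replays that order on the local file system with `java.nio.file`; it is an illustration under assumptions, not Hive code. The temp-file rule (a `_tmp.` name prefix) is an assumption standing in for `Utilities.isTempPath`, and `srcf` is a plain directory rather than a glob.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReplaceFilesSketch {
    // Assumption: temp files start with "_tmp." (Hive's own rule is not reproduced).
    static boolean isTemp(Path p) {
        return p.getFileName().toString().startsWith("_tmp.");
    }

    static List<Path> list(Path dir) throws IOException {
        try (Stream<Path> s = Files.list(dir)) {
            return s.collect(Collectors.toList());
        }
    }

    static void deleteRecursively(Path p) throws IOException {
        if (!Files.exists(p)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(p)) {
            // a parent sorts before its children, so reverse order deletes leaves first
            for (Path q : walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList())) {
                Files.delete(q);
            }
        }
    }

    // checkPaths with replace == true: drop temp files, reject nested directories.
    static List<Path> checkPaths(Path src) throws IOException {
        List<Path> items = new ArrayList<>();
        for (Path item : list(src)) {
            if (isTemp(item)) {
                deleteRecursively(item);
                continue;
            }
            if (Files.isDirectory(item)) {
                throw new IOException("checkPaths: " + src + " has nested directory " + item);
            }
            items.add(item);
        }
        return items;
    }

    static void replaceFiles(Path srcf, Path destf, Path tmppath) throws IOException {
        if (!Files.isDirectory(srcf)) {
            return; // Hive logs "No sources specified to move" and returns
        }
        List<Path> items = checkPaths(srcf);
        Files.createDirectories(tmppath);
        for (Path item : items) {
            Files.move(item, tmppath.resolve(item.getFileName())); // stage into tmppath
        }
        deleteRecursively(destf);                   // point of no return
        Files.createDirectories(destf.getParent()); // the rename needs an existing parent
        Files.move(tmppath, destf);                 // tmppath becomes the table directory
    }

    // Demo: returns the sorted file names left in the table directory.
    static List<String> demo() {
        try {
            Path root = Files.createTempDirectory("replace-sketch");
            Path src = Files.createDirectories(root.resolve("ext-10000"));
            Files.write(src.resolve("sample.txt"), "x\n".getBytes(StandardCharsets.UTF_8));
            Files.write(src.resolve("_tmp.partial"), new byte[0]);
            Path dest = Files.createDirectories(root.resolve("warehouse").resolve("records"));
            Files.write(dest.resolve("old.txt"), new byte[0]);
            replaceFiles(src, dest, root.resolve("ext-10001"));
            List<String> names = new ArrayList<>();
            for (Path p : list(dest)) {
                names.add(p.getFileName().toString());
            }
            Collections.sort(names);
            deleteRecursively(root);
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [sample.txt]
    }
}
```

The old file disappears with the deleted table directory, and the staged temp file is dropped by the check step, so only the newly loaded file remains. As in Hive, a failure before the delete leaves the data in the temp directory for recovery.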