# Basic Usage

# Data Types

Official documentation: LanguageManual Types

# Complex Type Examples

CREATE TABLE movies(
    movie_name string,
    participants ARRAY<string>,
    release_dates MAP<string,timestamp>,
    studio_addr STRUCT<state:string,city:string,zip:string,streetnbr:int,streetname:string,unit:string>,
    complex_participants MAP<string,STRUCT<address:string,attributes:MAP<string,string>>>,
    misc UNIONTYPE<int,string,ARRAY<double>>
);
select movie_name,
  release_dates["USA"],
  studio_addr.zip,
  complex_participants["Leonardo Dicaprio"].attributes["fav_color"],
  misc
from movies;

# Array

Create a table containing Array columns

create table mobilephones (
    id string,
    title string,
    cost float,
    colors array<string>,
    screen_size array<float>
);

insert into table mobilephones
select 
    "redminote7", "Redmi Note 7", 300,
    array("white", "silver", "black"), array(float(4.5))
UNION ALL
select 
    "motoGplus", "Moto G Plus", 200, array("black", "gold"),
    array(float(4.5), float(5.5));

select * from mobilephones;

select id, colors from mobilephones;

select id, colors[0] from mobilephones;

Specify the delimiters for array columns and load data from a file

create table mobilephones (
    id string,
    title string,
    cost float,
    colors array<string>,
    screen_size array<float>
)
row format delimited fields terminated by ','
collection items terminated by '#';

load data local inpath 'mobilephones.csv'
into table mobilephones;

Query array elements

SELECT name,subordinates[0] FROM employees;

collect_set (removes duplicates) vs. collect_list (keeps duplicates)

    select cookie_id,collect_set(ad_id) as orders
    from click_log
    group by cookie_id;

-- Output
    cookie_id    orders
    11           ["ad_101","ad_104"]
    22           ["ad_104","ad_102","ad_103"]
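
For contrast, collect_list keeps duplicates; a minimal sketch against the same click_log table:

    select cookie_id,collect_list(ad_id) as orders
    from click_log
    group by cookie_id;
    -- a cookie that clicked ad_104 twice keeps both entries in the list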

Impala does not support arrays, but the group_concat function achieves the same effect.

select 
    cookie_id,group_concat(ad_id,'|') as orders
from click_log
group by cookie_id;

LATERAL VIEW

Here catalogs is an array; the LATERAL VIEW clause expands it into rows.

select click.cookie_id,ad.catalog from click_log click
left outer join (
  select ad_id,catalog from ad_list LATERAL VIEW OUTER explode(catalogs) t AS catalog
) ad
on (click.ad_id = ad.ad_id);

Sort an array

select ad_id,sort_array(catalogs) from ad_list;

Check whether an array contains a given value

select ad_id,catalogs from ad_list where array_contains(catalogs,'catalog1');

# Map

create table mobilephones (
    id string,
    title string,
    cost float,
    colors array<string>,
    screen_size array<float>,
    features map<string, boolean>
)
row format delimited fields terminated by ','
collection items terminated by '#'
map keys terminated by ':';

Edit the file to add features

load data local inpath 'mobilephones.csv'
into table mobilephones;

select id, features['camera'] from mobilephones;

Map quick reference. Create a map: map(), str_to_map(). Get keys/values: map_keys(), map_values(). Expand: LATERAL VIEW explode(). Get the size: size(Map<K,V>). Check for a key: array_contains(map_keys(m),'test').
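
A minimal sketch exercising these helpers; the literal map string is illustrative:

select size(m), map_keys(m), map_values(m), array_contains(map_keys(m), 'a')
from (select str_to_map('a:1,b:2', ',', ':') as m) t;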

Find rows whose map keys include item8

select * from f_orders where array_contains(map_keys(items),'item8');

Expand items with LATERAL VIEW

select user_id,order_id,item,amount from f_orders LATERAL VIEW explode(items) t AS item,amount;

Expand and also show each element's position in the original collection; this apparently works only with arrays.

select username, pos, ts, page_id from ts_int
lateral view posexplode(visits) t as pos, ts, page_id;
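
A simpler sketch on a plain array literal, showing the (pos, val) pair that posexplode emits:

select pos, val
from (select array('a','b','c') as letters) t
lateral view posexplode(letters) p as pos, val;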

Expand the numeric range between two fields into rows

--space(n) yields a string of n spaces; splitting on ' ' gives n+1 elements, so pos runs 0..n
select t.f1,
    t.start_r - pe.i as seq_no
from (select 'ABC' as f1, 62971 as start_r, 62937 as end_r) t
lateral view posexplode(split(space(start_r - end_r), ' ')) pe as i, s;

# Struct

create table mobilephones (
    id string,
    title string,
    cost float,
    colors array<string>,
    screen_size array<float>,
    features map<string, boolean>,
    information struct<battery:string,camera:string>
)
row format delimited fields terminated by ','
collection items terminated by '#'
map keys terminated by ':';

--Edit the file to add information

load data local inpath 'mobilephones.csv'
into table mobilephones;

select id, features, information
from mobilephones;
select id, features['camera'], information.battery
from mobilephones;
select id, features['camera'] as CameraPresent, information.battery
from mobilephones;

# Database Operations

Create a database with a comment

CREATE DATABASE <database name> COMMENT
'Hold all secret information';

Show the current database at the CLI prompt

set hive.cli.print.current.db=true;

Create a database with properties

CREATE DATABASE bihell WITH DBPROPERTIES
('creator'='haseo','Date'='2016-07-10');

Show database information

DESCRIBE DATABASE <database_name>;

Show extended database information

DESCRIBE DATABASE EXTENDED <database_name>;

Drop a database

DROP DATABASE IF EXISTS <database_name>;

Drop a database that still contains tables

DROP DATABASE IF EXISTS <database_name> CASCADE;

Alter database properties

ALTER DATABASE bihell SET DBPROPERTIES
('edited-by'='Haseo','Date'='2016-07-10');

# Table Operations

Create a table

CREATE TABLE IF NOT EXISTS mydb.employee(
  Name STRING COMMENT 'Employee name',
  Salary FLOAT COMMENT 'Employee salary',
  Subordinates ARRAY<STRING> COMMENT 'Names of subordinates',
  deductions MAP<STRING,FLOAT> COMMENT 'deductions',
  address STRUCT<street:STRING,city:STRING,state:STRING,zip:INT> COMMENT 'Address')
COMMENT 'Description of the table'
PARTITIONED BY (dt STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
COLLECTION ITEMS TERMINATED BY '|' --delimiter for collection (array) items
MAP KEYS TERMINATED BY '\t'
STORED AS TEXTFILE;
--Create an external table
CREATE EXTERNAL TABLE IF NOT EXISTS mydb.employee (
  Name STRING COMMENT 'Employee name',
  Salary FLOAT COMMENT 'Employee salary',
  Address STRING COMMENT 'Address')
COMMENT 'Description of the table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/cloudera/hdfs_dir';

Skip the header row when creating a table

CREATE TABLE Employee (Emp_Number INT, Emp_Name STRING, Emp_sal INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
TBLPROPERTIES ('skip.header.line.count'='1');

Create a new table from an existing table's schema

CREATE TABLE IF NOT EXISTS mydb.new_employee LIKE mydb.employee;

Show the schema of a single column

DESCRIBE <table name>.<column name>;

Show detailed table information

DESCRIBE FORMATTED <table name>;

Drop a table

DROP TABLE IF EXISTS <table_name>;

Rename a table

ALTER TABLE <table_name> RENAME TO <new table_name>;

Change a column

--Syntax
ALTER TABLE <table_name> CHANGE COLUMN <column name> <new column name> <data type>;
--Move the id column after title. Only the column's position in the schema changes; the data itself does not move.
alter table freshproducts change column id id string after title;

Add columns

ALTER TABLE employee ADD COLUMNS (app_name STRING,session_id BIGINT);

Replace columns

ALTER TABLE employee REPLACE COLUMNS (app_name STRING COMMENT 'old application name',app_id String COMMENT 'new application id');

Set table properties

ALTER TABLE employee SET TBLPROPERTIES(
'notes'='The process id is no longer captured');

Add a comment to a table

ALTER TABLE Hive_Test_table SET TBLPROPERTIES ('comment' = 'This is a new comment');

Change the table storage format

ALTER TABLE employee SET FILEFORMAT SEQUENCEFILE;

Create a table from a query

CREATE TABLE target
AS
SELECT col1, col2
FROM source;

Create a CSV table

DROP TABLE IF EXISTS default.customphone;
CREATE TABLE customphone(EMpID string,EMPName string,showedPhone string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='1');

LOAD DATA LOCAL INPATH 'customPhone.csv' OVERWRITE INTO TABLE default.customphone;
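
OpenCSVSerde also accepts SERDEPROPERTIES for non-default formats; a sketch, assuming a tab-separated variant of the same file:

CREATE TABLE customphone_tsv(EMpID string,EMPName string,showedPhone string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ('separatorChar'='\t','quoteChar'='"')
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='1');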

# View Operations

Create a view

CREATE VIEW IF NOT EXISTS shipments(firstname,lastname)
COMMENT 'firstname and lastname'
TBLPROPERTIES('creator'='Haseo')
AS SELECT...;

Alter a view

ALTER VIEW shipments
SET TBLPROPERTIES
('created_at' = 'some_timestamp');

# Importing and Exporting Data

# Merge a text table's files

hadoop fs -text path_* > 000000

# Insert data

insert into table score1
partition (openingtime=201509)
values (21,1,'76'),(22,2,'45');

# Insert data selected from an existing table into another

INSERT OVERWRITE TABLE target
PARTITION (dt='2001-01-01')
SELECT col1, col2
FROM source;

# Multi-table insert

FROM records2
INSERT OVERWRITE TABLE stations_by_year
SELECT year, COUNT(DISTINCT station)
GROUP BY year
INSERT OVERWRITE TABLE records_by_year
SELECT year, COUNT(1)
GROUP BY year
INSERT OVERWRITE TABLE good_records_by_year
SELECT year, COUNT(1)
WHERE temperature != 9999 AND quality IN (0, 1, 4, 5, 9)
GROUP BY year;

# Load data from files

--Load from the local filesystem
load data local inpath '/data/tmp/score_7.txt'
overwrite into table score PARTITION (openingtime=201507);
--Load from HDFS
load data inpath '/tmp/input/score_8.txt'
overwrite into table score partition(openingtime=201508);

# Export data

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name,salary,address
FROM employees WHERE state='CA';

# Write CLI query results to a file

hive -S -e 'select a,b from t1' > results.txt

# Queries

# Grouping Sets, Cube, Rollup

Grouping Sets

    SELECT a, b, SUM(c) FROM t1 GROUP BY a, b GROUPING SETS ((a,b), a)
    --Equivalent to
    SELECT a, b, SUM(c) FROM t1 GROUP BY a, b
    UNION ALL
    SELECT a, NULL, SUM(c) FROM t1 GROUP BY a

    SELECT a, b, SUM(c) FROM t1 GROUP BY a, b GROUPING SETS (a,b,())
    --Equivalent to
    SELECT a, NULL, SUM(c) FROM t1 GROUP BY a
    UNION ALL
    SELECT NULL, b, SUM(c) FROM t1 GROUP BY b
    UNION ALL
    SELECT NULL, NULL, SUM(c) FROM t1

Cube

    SELECT a, b, c, SUM(d) FROM t1 GROUP BY a, b, c WITH CUBE
    --Equivalent to
    SELECT a, b, c, SUM(d) FROM t1 GROUP BY a, b, c GROUPING SETS
    ((a,b,c),(a,b),(b,c),(a,c),a,b,c,())

Rollup

    SELECT a, b, c, SUM(d) FROM t1 GROUP BY a, b, c WITH ROLLUP
    --Equivalent to
    SELECT a, b, c, SUM(d) FROM t1 GROUP BY a, b, c GROUPING SETS
    ((a,b,c),(a,b),a,())

# Show functions

SHOW FUNCTIONS;
DESCRIBE FUNCTION length;

# Lateral View

LATERAL VIEW applies a UDTF to each input row and joins the generated rows back to the base table.

--Syntax
SELECT a,b,columnAlias
FROM baseTable
LATERAL VIEW UDTF(expression) tableAlias AS columnAlias;

--Example
SELECT a,b,col1,col2
FROM baseTable
LATERAL VIEW UDTF(x) t1 AS col1
LATERAL VIEW UDTF(col1) t2 AS col2;
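
A concrete sketch with explode as the UDTF, assuming a table pageAds(pageid string, adid_list array<int>):

SELECT pageid, adid
FROM pageAds
LATERAL VIEW explode(adid_list) adTable AS adid;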

# RLIKE

RLIKE lets us filter Hive queries with Java regular expressions.

SELECT name,address.street
FROM employees WHERE address.street RLIKE '.*(Chicago|Ontario).*';

Select columns with a regular expression

SELECT supports picking columns with a Java regular expression (you can validate expressions at http://www.fileformat.info/tool/regex.htm). The statement below selects every column except ds and hr. Note that on Hive 0.13+ this requires set hive.support.quoted.identifiers=none.

SELECT `(ds|hr)?+.+` FROM sales

TIP

Not recommended in practice; such queries are hard to understand and maintain.

# Partitions

Load into a specified partition

LOAD DATA LOCAL INPATH 'input/hive/partitions/file1'
INTO TABLE logs
PARTITION (dt='2001-01-01', country='GB');

Dynamic partition insert. The feature is disabled by default; enable it with the following settings:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE target
PARTITION (dt)
SELECT col1, col2, dt
FROM source;

Show partition information

show partitions logs;

Repair a partitioned table

msck repair table order_created_partition;

Add partitions

ALTER TABLE page_views ADD IF NOT EXISTS
PARTITION (dt='2013-09-09', applicationtype='iPhone') LOCATION '/somewhere/on/hdfs/data/iphone/current'
PARTITION (dt='2013-09-08', applicationtype='iPhone') LOCATION '/somewhere/on/hdfs/data/prev1/iphone';

Drop a partition

ALTER TABLE log_messages DROP IF EXISTS PARTITION(year=2015,month=1,day=2);

Take a partition offline to block queries against it

ALTER TABLE log_messages
PARTITION(year=2015,month=1,day=1)
ENABLE OFFLINE;

Prevent a partition from being dropped

ALTER TABLE log_messages
PARTITION(year=2015,month=1,day=1)
ENABLE NO_DROP;

Change a partition's file format

ALTER TABLE XXX PARTITION (EVENT_MONTH='2014-06') SET FILEFORMAT TEXTFILE;

# JOIN

Hive supports many join types: inner join, LEFT OUTER JOIN, RIGHT OUTER JOIN, FULL OUTER JOIN, LEFT SEMI JOIN, Cartesian product join, and map-side join. A few of them deserve a closer look.

# LEFT SEMI JOIN

Returns records from the left table that satisfy the ON predicate; it is more efficient than an inner join.

SELECT a.val FROM a LEFT SEMI JOIN b ON (a.key = b.key);

--Equivalent to the following, which older Hive versions do not support:
SELECT a.val FROM a WHERE a.key IN (SELECT b.key FROM b) - Not Supported
SELECT a.val FROM a WHERE EXISTS (SELECT 1 FROM b WHERE b.key = a.key) - Not Supported

# Cartesian Product Join

Cartesian product of the two sides

SELECT * FROM CUSTOMERS JOIN ORDERS;

# Map-side Join

A map join reads the small table entirely into memory, and during the map phase matches the other table's rows directly against the in-memory copy.

SELECT /*+MAPJOIN(o)*/ c.ID,c.NAME,o.AMOUNT
FROM CUSTOMERS c JOIN ORDERS o
ON (c.ID=o.CUSTOMER_ID);
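
Newer Hive versions can also convert qualifying joins to map joins automatically instead of relying on the hint; a sketch of the relevant settings (values shown are the usual defaults):

set hive.auto.convert.join=true;
--tables smaller than this many bytes count as the small side
set hive.mapjoin.smalltable.filesize=25000000;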

# Performance Tuning

# DISTRIBUTE BY with SORT BY

ORDER BY produces a fully sorted result, but it does so with a single reducer, which is very inefficient on large datasets. A global sort is often unnecessary, and Hive's nonstandard SORT BY extension can be used instead: it produces one sorted file per reducer. Sometimes you also need to control which reducer a particular row goes to, typically for a subsequent aggregation; Hive's DISTRIBUTE BY clause does exactly that.

--Sort weather records by year and temperature so that all rows with the same year end up in the same reducer partition
from records2
select year, temperature
distribute by year
sort by year asc, temperature desc;

# CLUSTER BY

CLUSTER BY combines DISTRIBUTE BY and SORT BY in one clause, but it can only sort ascending; a descending order cannot be specified. It is a shorthand for the pattern above; the expanded equivalent is shown after the example below.

SELECT s.emp_id,s.emp_name,s.emp_mobile,s.city
FROM employee3 s
CLUSTER BY s.city;
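
For reference, the query above is equivalent to this expanded form:

SELECT s.emp_id,s.emp_name,s.emp_mobile,s.city
FROM employee3 s
DISTRIBUTE BY s.city
SORT BY s.city;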

# Enable vectorization

Introduced in Hive 0.13; when enabled, Hive processes batches of 1024 rows at a time instead of single rows.

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

# Enable CBO (Cost-Based Optimization)

Before execution, Hive optimizes each query's logical and physical execution plan. Historically these optimizations were not based on query cost. Hive has since added CBO, which performs further cost-based optimization: how to order joins, which join type to execute, the degree of parallelism, and so on.

To use CBO, set the following parameters at the beginning of the query:

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

Hive relies on the ANALYZE command to prepare data for CBO: run it to collect statistics on each table you want CBO to consider (sketched below).
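
A sketch of the analyze commands, using the employee table from earlier as an illustration:

--basic table statistics
ANALYZE TABLE employee COMPUTE STATISTICS;
--column-level statistics, which hive.stats.fetch.column.stats reads
ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS;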

# GROUP BY optimization

set hive.groupby.skewindata=true; --set to true when the GROUP BY stage is skewed
set hive.groupby.mapaggr.checkinterval=100000; --optimization kicks in once a group key exceeds this many records

# Merging files in jobs

Merging small input files

The following parameters bound the split size handed to each map:

set mapreduce.input.fileinputformat.split.maxsize=5000;
set mapreduce.input.fileinputformat.split.minsize=5000;
--The lines above are for newer Hive versions; the two below are the legacy parameters
set mapred.max.split.size=5000;
set mapred.min.split.size=5000;

With these settings, a 10,000-byte file makes the job spawn two maps.

If the following parameter is true, small output files are merged after a map-only job completes, reducing the number of files downstream:

set hive.merge.mapfiles=true;

The hive.input.format property can also reduce the number of maps, with one caveat: small files scattered across different nodes cannot be combined.

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
--This is the default; Hive combines all files smaller than the minimum split size
--The old value org.apache.hadoop.hive.ql.io.HiveInputFormat is essentially deprecated now

In addition, set mapreduce.job.maps = xx sets directly how many maps a query uses, although the setting is ignored when mapreduce.jobtracker.address is local.

Merging small output files

--Start an extra job to merge output files when their average size falls below this value
set hive.merge.smallfiles.avgsize=256000000;
--Size of the merged files
set hive.merge.size.per.task=64000000;

# Data skew settings

If you run into data skew, try the following parameters; if they do not help, fall back to tricks such as salting keys with random numbers (sketched below).

set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;
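
A hedged sketch of the random-number salting trick for a skewed GROUP BY; the table and columns are illustrative:

--spread a hot key over 10 buckets, pre-aggregate, then aggregate again
SELECT split(salted, '_')[0] AS key, SUM(cnt) AS cnt
FROM (
  SELECT concat(key, '_', cast(rand() * 10 as int)) AS salted, COUNT(1) AS cnt
  FROM skewed_table
  GROUP BY concat(key, '_', cast(rand() * 10 as int))
) t
GROUP BY split(salted, '_')[0];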

# Predicate pushdown

Normally a query like the one sketched below runs the join first and filters on id afterwards. If few ids actually match in the join, the late filter accomplishes little. With predicate pushdown enabled, the id filter is applied before the join, improving performance.

set hive.optimize.ppd=true;
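
An illustrative query (tables a and b are hypothetical); with ppd enabled, the a.id filter is applied while scanning a, before the join:

SELECT a.val, b.val
FROM a JOIN b ON (a.id = b.id)
WHERE a.id > 100;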

# Settings for out-of-memory issues

set hive.exec.parallel=true;
set mapred.max.split.size=128000000;
set mapreduce.map.memory.mb=6144;
set mapreduce.map.java.opts=-Xmx6144m;
set hive.exec.reducers.bytes.per.reducer=536870912;
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx8192m;

# Miscellaneous

# Run SQL from a file

beeline -u "jdbc:hive2://xxx.xxx.com:10000" --hiveconf rptdate=`date --date='1 days ago' "+%Y%m%d"` -n hdfs -f /xx/xx/xx.SQL

Here --hiveconf rptdate passes a variable into the SQL script; inside the script, read it with ${hiveconf:rptdate}.
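
A minimal sketch of the script side; the table name is illustrative:

--contents of /xx/xx/xx.SQL
SELECT * FROM daily_report WHERE dt = '${hiveconf:rptdate}';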

# Hive Variables

Hive has four variable namespaces:

hivevar: -d, --define, --hivevar; set hivevar:name=value
hiveconf: --hiveconf; set hiveconf:property=value
system: set system:property=value
env: set env:property=value

$ hive -d srctable=movies
hive> set hivevar:cond=123;
hive> select a,b,c from pluralsight.${hivevar:srctable} where a = ${hivevar:cond};

$ hive -v -d src=movies -d db=pluralsight -e 'select * from ${hivevar:db}.${hivevar:src} LIMIT 100;'

TIP

In practice it is more convenient to wrap the Hive connection in a host language and build queries with that language's string variables.

# Create Hive directories and set permissions

hadoop fs -mkdir /tmp
hadoop fs -chmod a+w /tmp
hadoop fs -mkdir /user/hive/warehouse
hadoop fs -chmod a+w /user/hive/warehouse

# Restrict user queries

With the following option set, users must specify a partition filter and limit the number of rows returned when querying.

In the configuration file:

<property>
  <name>hive.mapred.mode</name>
  <value>strict</value>
</property>

or

set hive.mapred.mode=strict;
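
For example, against the partitioned logs table from earlier, strict mode rejects a full scan; a sketch:

--rejected in strict mode: no partition filter
--SELECT * FROM logs;
--accepted:
SELECT * FROM logs WHERE dt = '2001-01-01' LIMIT 100;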

# Compression

Check which compression codecs are enabled on the cluster

hbase org.apache.hadoop.util.NativeLibraryChecker

Enable Snappy compression

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

or

<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>