pig学习笔记

jopen 10年前

Pig是一种探索大规模数据集的脚本语言。

pig是在HDFSMapReduce之上的数据流处理语言,它将数据流处理翻译成多个mapreduce函数,提供更高层次的抽象将程序员从具体的编

程中解放出来。

 

Pig包括两部分:用于描述数据流的语言,称为Pig Latin;和用于运行Pig Latin程序的执行环境。

 

Pig Latin程序有一系列的operationtransformation组成。每个操作或变换对输入进行数据处理,然后产生输出结果。这些操作整体上

描述了一个数据流。Pig内部,这些变换操作被转换成一系列的MapReduce作业。

 

Pig不适合所有的数据处理任务,和MapReduce一样,它是为数据批处理而设计的。如果只想查询大数据集中的一小部分数据,pig的实现

不会很好,因为它要扫描整个数据集或绝大部分。

 

1. Pig的运行

 

Pig是作为客户端运行的程序,你需要将其连接到本地Hadoop或者集群上。当安装Pig之后,有三种执行pig程序的方法:pig脚本

(将程序写入.pig文件中),Grunt(运行Pig命令的交互式shell环境)和嵌入式方式。

 

records = Load ‘sample.txt’ as (year:chararray, temperature:int, quality:int);

 

filter_records = FILTER records BY temperature != 9999 AND quality == 0;

 

group_records = GROUP filter_records BY year;

 

max_temp = FOREACH group_records GENERATE group, MAX(filter_records.temperature);

 

DUMP max_temp;

 

生成上面程序的创建的数据集结构: grunt> ILLUSTRATE max_temp;

 

Pig和数据库的比较:

 

1Pig是数据流编程语言,而SQL是一种描述型编程语言。Pig是相对于输入的一步步操作,其中每一步都是对数据的一个简单的变换;

   SQL语句是一个约束的集合,这些约束结合在一起定义了输出。Pig更像RDBMS中的查询规划器。

 

2RDBMS把数据存储在严格定义了模式的表内,但pig对数据的要求更宽松,可以在运行时定义模式,而且是可选的。

 

3pig对复杂、嵌套数据结构的支持更强;

 

4Pig不支持事务和索引,也不支持随机读和几十毫秒级别的查询,它是针对数据批量处理的。

 

5Hive是介于PigRDBMS之间的系统。HiveHDFS为存储,但是查询语言是基于SQL的,而且Hive要求所有数据必须存储在表中,

   表必须有模式,而模式由Hive管理。但Hive允许为预先存在HDFS中的数据关联一个模式,因此数据加载步骤是可选的。

 

2 .Pig Latin

 

程序有一系列语句构成。操作和命令是大小写无关的,而别名和函数名是大小写敏感的。

 

Pig处理多行语句时,在整个程序逻辑计划没有构造完毕前,pig并不处理数据。

 

Pig Latin关系操作

 

类型 操作 描述

 

加载与存储

  LOAD             将数据从外部文件或其它存储中加载数据,存入关系

  STORE            将一个关系存放到文件系统或其它存储中

  DUMP             将关系打印到控制台

过滤              

  FILTER           从关系中删除不需要的行

  DISTINCT         从关系中删除重复的行

  FOREACHGENERATE 对于集合的每个元素,生成或删除字段

  STREAM           使用外部程序对关系进行变换

  SAMPLE           从关系中随机取样

分组与连接

  JOIN             连接两个或多个关系

  COGROUP          在两个或多个关系中分组

  GROUP            在一个关系中对数据分组

  CROSS            获取两个或更多关系的乘积(叉乘)

排序

  ORDER            根据一个或多个字段对某个关系进行排序

  LIMIT            限制关系的元组个数

合并与分割

  UNION            合并两个或多个关系

  SPLIT            把某个关系切分成两个或多个关系

 

Pig Latin的诊断操作

 

操作 描述

DESCRIBE           打印关系的模式

EXPLAIN            打印逻辑和物理计划

ILLUSTRATE         使用生成的输入子集显示逻辑计划的试运行结果

 

Pig Latin UDF语句

 

REGISTER           Pig运行时环境中注册一个JAR文件

 

DEFINE             UDF、流式脚本或命令规范新建别名

 

Pig Latin命令类型

 

kill               中止某个MapReduce任务

 

exec               在一个新的Grunt shell程序中以批处理模式运行一个脚本

 

run                在当前Grunt外壳程序中运行程序

 

quit               退出解释器

 

set                设置Pig选项

 

Pig Latin表达式

 

类型               表达式                   描述                                示例

字段                 $n                   n个字段                              $0

字段                 f                     字段名f                              year

投影                c.$n,          c.f 在关系、包或元组中的字段        records.$0, records.year

Map查找             m#k               在映射m中键k对应的值                   itemsCoat

类型转换           (t)f               将字段t转换成f类型                     (int)year

函数型平面化  fn(f1, f2, )           在字段上应用函数                  fn isGood(quality)

                 FLATTEN(f)         从包和元组中去除嵌套                   flatten(group)

 

其它的表达式,如算术、条件、比较和布尔型类似其它语言,不详述.

 

Pig Latin类型

 

数据类型包括int (32位有符号整数), long64位有符号整数), float32位浮点数), double64位浮点数),

chararrayUTF16格式的字符数组), Bytearray(字节数组), tuple(元组), bag(包), map(键值对).

 

tuple: (1, hello)         //任何类型的字段序列

 

bag: {(1, hello), (2)}    //元组的无序多重集合(允许重复元组)

 

map: [a hello]          //一组键值对,键必须是字符数组

 

关系和包在概念上是相同的,但是有细微差别。关系是顶层构造结构,只能从上表中的关系操作中创建关系,包必须在某个关系中。

举例:

 

A = {(1, 2), (3, 4)}       //错,使用load语句从文件中加载数据

 

B = A.$0                   //错, B = foreach A generate $0;

 

模式(Schema

 

Pig的一个关系可以有一个关联的模式,模式为关系的字段指定名称和类型。Pig的这种模式声明方式和SQL数据库要求数据加载前必须

先声明模式截然不同,Pig设计的目的是用于分析不包含数据类型信息的纯文本输入文件的。但是尽量定义模式,会让程序运行地更高效。

 

缺点:在查询中声明模式的方式是灵活的,但不利于模式重用。每个查询中维护重复出现的模式会很困难。处理这一问题的办法是写

自己的加载函数来封装模式。

 

SQL数据库在加载数据时,会强制检查表模式中的约束。在pig中,如果一个值无法被强制转换为模式中申明的类型,pig会用空值null代替,

显示一个空位。大数据集普遍都有被损坏的值、无效值或意料之外的值,简单的方法是过滤掉无效值:

 

grunt>good_records = filter records by temperature is not null;

 

另一种技巧是使用SPLIT操作把数据划分成好和坏两个关系,然后在分别进行分析:

 

grunt> split records into good_records if temperature is not null,

 

                          bad_records if temperature is null;

 

grunt> dump good_records;

 

Pig中,不用为数据流中的每个新产生的关系声明模式。大多数情况下,Pig能够根据关系操作的输入关系的模式来确定输出结果的模式。

有些操作不改变模式,如Limit。而Union会自动生成新的模式。

 

如果要重新定义一个关系的模式,可以使用带as子句的FOREACHGENERATE操作来定义输入关系的一部分或全部字段的模式。

 

函数

 

Pig的函数分为计算函数,过滤函数,加载函数和存储函数。

 

计算函数:     AVG, COUNT, CONCAT, COUNTSTAR, DIFF, MAX, MIN, SIZE, SUM, TOKENIZE

 

过滤函数:     IsEmpty

 

加载/存储函数:PigStorage, BinStorage, BinaryStorage, TextLoader, PigDump

 

用户自定义函数(UDF

 

public abstract class EvalFunc<T> {

 

  public abstract T exec(Tuple input) throws IOException;

 

  public List<FuncSpec> getAvgToFuncMapping() throws FrontendException;

 

  public FuncSpec outputSchema() throws FrontendException;

 

}

 

输入元组的字段包含传递给函数的表达式,输出是泛型;对于过滤函数输出就是Boolean类型。建议尽量在

getAvgToFuncMapping()/outputSchema()申明输入和输出数据的类型,以便Pig进行类型转换或过滤不匹配类型的错误值。

 

Grunt>REGISTER pig-examples.jar;

 

          DEFINE isGood org.hadoopbook.pig.IsGoodQuality();

 

加载UDF

 

public LoadFunc {

 

    public void setLocation(String location, Job job);

 

    public InputFormat getInputFormat();

 

    public void prepareToRead(RecordReader reader, PigSplit split);

 

    public Tuple next() throws IOException;

 

}

 

类似HadoopPig的数据加载先于mapper的运行,所以保证数据可以被分割成能被各个mapper独立处理的部分非常重要。从Pig 0.7开始,

加载和存储函数接口已经进行了大幅修改,以便与HadoopInputFormatOutputFormat类基本一致。

 

Grunt>Register loadfunc.jar

 

          Define customLoad org.hadoopbook.pig.loadfunc()

 

          records = load ‘input/sample.txt’ using customLoad(‘16-19, 88-92, 93-93’)

 

                         as (year:int, temperature:int, quality:int);

 

数据处理操作

 

加载和存储数据: store A into out using pigStorage(:) ;  // 将元组存储为以分号分隔的纯文本值

 

过滤数据

 

Foreach  generate  // 逐个处理一个关系中的行,来生成一个新的关系包含部分或全部的字段

 

例子: B = foreach A generate $0, $2+1, Constant;

 

分组与连接数据

 

Join 连接

 

C = join A by $0, B by $1; // 默认为内连接,将A的第一个字段和B的第二个字段连接,输出匹配的字段

 

// 连接后新关系的字段为输入关系的字段和

 

C = join A by $0, B by $1 using replicated; // 分段复制链接,B表中的数据将会放在内存中

 

C= join A by $0 left outer, B by $1;  // 左外连接,左边的没有匹配项也输出

 

Cogroup 多关系分组

 

类似于Join,但默认是外连接,连接键为第一个字段,第二个字段为匹配的第一个关系中的所有元组的包,第三个字段为第二个表中匹配的

所有元组的包。

 

D = COGROUP A by $0, B by $1; // 新的关系的元组个数为连接键的并集(去除重复);

 

D= COGROUP A by $0 inner, B by $1 inner; // 新关系的元组个数是连接键取交集的个数(只输出匹配的)。每个元组中的第二个和

第三个字段都是一个包含一个元组的包

 

COGROUPinnerflatten组合使用相当于实现了内连接:

 

G = COGROUP A by $0 innner, B by $1 inner;

 

H = foreach G generate flatten($1), flatten($2)

 

// Hjoin A by $0, B by $1相同

 

cross叉乘

 

I = cross A, B; // 所有可能m*n

 

Group 分组

 

B = group A by $0; // 第一个字段为group字段,第二个字段为一个包,包含元组的其它字段

 

B = group A by size($1); // 长度为第一个字段,第二个字段为一个包,包含所有长度为第一个字段的元组

 

C = group A all; // 只有一行,第一个字段为all,第二个字段为A中所有元组的包

 

D = group A any; // 对关系中的元组随机分组,对取样非常有用

 

排序数据

 

Pig按什么顺序来处理关系的行是不确定的,只能在输出前排序。

 

B = order A by $0, $1 DESC;

 

C = Limit B 2;

 

组合和切分数据

 

Union可以将几个关系合在一起,即所有元组的集合,当关系的模式不匹配时,新关系就没有模式。

 

C = union A, B;

 

Split 可以将一个关系的元组按某种条件分成几个子集。

 

Split A into B if $0 is null, C if $0 is not null;

 

5 pig实用技巧

 

并行处理: 可以在很多语句中指定reducer的数量

 

   group, join, cogroup, cross, distinct, order

 

(复习:reduce的任务个数设置为稍小于集群中的reduce任务槽数)

 

参数替换:在pig语句中使用$加变量名的方式使用外部定义的变量值,在运行时可以通过"-param input=”设置变量的值,

或者通过"-param_file ”来指定参数文件。

 

动态参数:很多Unix shell用反引号引用的命令来替换实际值,如`date +%Y-%m-%d `会按规定格式输出日期。

这个可以放在-param或参数文件中来动态得到一个值。

 

=====================================================================

                                                    各种SQLPIG中实现

 

我这里以Mysql 5.1.x为例,Pig的版本是0.8

 

同时我将数据放在了两个文件,存放在/tmp/data_file_1/tmp/data_file_2.文件内容如下:

 

tmp_file_1:

Txt代码 

zhangsan    23  1 

lisi    24  1 

wangmazi    30  1 

meinv   18  0 

dama    55  0 

 

tmp_file_2:

Txt代码 

1       a

23     bb

50     ccc

30     dddd

66     eeeee 

 

1.从文件导入数据

 

   1)Mysql (Mysql需要先创建表).

 

       CREATE TABLE TMP_TABLE(USER VARCHAR(32),AGE INT,IS_MALE BOOLEAN);

 

       CREATE TABLE TMP_TABLE_2(AGE INT,OPTIONS VARCHAR(50));   -- 用于Join

 

       LOAD DATA LOCAL INFILE '/tmp/data_file_1'  INTO TABLE TMP_TABLE ;

 

       LOAD DATA LOCAL INFILE '/tmp/data_file_2'  INTO TABLE TMP_TABLE_2;

 

   2)Pig

 

        tmp_table = LOAD '/tmp/data_file_1' USING PigStorage('\t') AS (user:chararray, age:int,is_male:int);

 

        tmp_table_2= LOAD '/tmp/data_file_2' USING PigStorage('\t') AS (age:int,options:chararray);

 

 

 

2.查询整张表

 

   1)Mysql

 

      SELECT * FROM TMP_TABLE;

 

   2)Pig

 

      DUMP tmp_table;

 

3. 查询前50

 

   1)Mysql

 

      SELECT * FROM TMP_TABLE LIMIT 50;

 

   2)Pig

 

       tmp_table_limit = LIMIT tmp_table 50;

 

       DUMP tmp_table_limit;

 

                  

                  

4.查询某些列

 

   1)Mysql

 

       SELECT USER FROM TMP_TABLE;

 

   2)Pig

 

       tmp_table_user = FOREACH tmp_table GENERATE user;

 

       DUMP tmp_table_user;

 

5. 给列取别名

 

    1)Mysql

 

       SELECT USER AS USER_NAME,AGE AS USER_AGE FROM TMP_TABLE;

 

    2)Pig

 

       tmp_table_column_alias = FOREACH tmp_table GENERATE user AS user_name,age AS user_age;

 

       DUMP tmp_table_column_alias;

 

 

 

 6.排序

 

    1)Mysql

 

       SELECT * FROM TMP_TABLE ORDER BY AGE;

 

    2)Pig

 

        tmp_table_order = ORDER tmp_table BY age ASC;

 

        DUMP tmp_table_order;

 

 

 

 7.条件查询

 

    1Mysql

 

        SELECT * FROM TMP_TABLE WHERE AGE>20;

 

    2) Pig

 

        tmp_table_where = FILTER tmp_table by age > 20;

 

        DUMP tmp_table_where;

 

 

 

 8.内连接Inner Join

 

    1)Mysql

 

       SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;

 

    2)Pig

 

        tmp_table_inner_join = JOIN tmp_table BY age,tmp_table_2 BY age;

 

        DUMP tmp_table_inner_join;

 

9.左连接Left  Join

 

   1)Mysql

 

       SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;

 

   2)Pig

 

      tmp_table_left_join = JOIN tmp_table BY age LEFT OUTER,tmp_table_2 BY age;

 

      DUMP tmp_table_left_join;

 

10.右连接Right Join

 

     1)Mysql

 

        SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;

 

     2)Pig

 

        tmp_table_right_join = JOIN tmp_table BY age RIGHT OUTER,tmp_table_2 BY age;

 

        DUMP tmp_table_right_join;

 

11.全连接Full Join

 

     1)Mysql

 

        SELECT * FROM TMP_TABLE A  JOIN TMP_TABLE_2 B ON A.AGE=B.AGE

 

            UNION SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE

 

            UNION SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;

 

     2)Pig

 

        tmp_table_full_join = JOIN tmp_table BY age FULL OUTER,tmp_table_2 BY age;

 

        DUMP tmp_table_full_join;

 

 

 

2.同时对多张表交叉查询

 

    1)Mysql

 

       SELECT * FROM TMP_TABLE,TMP_TABLE_2;

 

    2)Pig

 

       tmp_table_cross = CROSS tmp_table,tmp_table_2;

 

       DUMP tmp_table_cross;

 

 

 

3.分组GROUP BY

 

   1)Mysql

 

      SELECT * FROM TMP_TABLE GROUP BY IS_MALE;

 

   2)Pig

 

      tmp_table_group = GROUP tmp_table BY is_male;

 

      DUMP tmp_table_group;

 

14.分组并统计

 

     1)Mysql

 

       SELECT IS_MALE,COUNT(*) FROM TMP_TABLE GROUP BY IS_MALE;

 

     2)Pig

 

        tmp_table_group_count = GROUP tmp_table BY is_male;

 

        tmp_table_group_count = FOREACH tmp_table_group_count GENERATE group,COUNT($1);

 

        DUMP tmp_table_group_count;

 

 

15.查询去重DISTINCT

 

     1)MYSQL

 

        SELECT DISTINCT IS_MALE FROM TMP_TABLE;

 

     2)Pig

 

        tmp_table_distinct = FOREACH tmp_table GENERATE is_male;

 

        tmp_table_distinct = DISTINCT tmp_table_distinct;

 

        DUMP  tmp_table_distinct;