MapReduce + Oracle = Tablefunctions
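One of the things we did at OpenWorld was a nice demonstration of how to implement a MapReduce system on top of a general-purpose Oracle database. It builds on an earlier post on this blog, which showed a nice implementation using table functions, mappers, and so on.
But then we thought, why not walk through the table function code and map it onto the MapReduce paradigm, to show which building blocks exist in Oracle and how to use Oracle to create a data processing/analytics pipeline... So here is some of the code we used at OpenWorld.
To set the stage, we first discuss the header, highlighting the interesting pieces with code comments, and then we discuss the body with the actual (simple) mapper and reducer code. The comments will hopefully make this self-explanatory...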
Scenario
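What we do here is fairly simple. We create a simple table with a few records and loop over those records. The reducer then does an aggregation. The steps are as follows: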
CREATE TABLE sls (salesman VARCHAR2(30), quantity number)
/
INSERT INTO sls VALUES('Tom', 100);
INSERT INTO sls VALUES('Chu', 200);
INSERT INTO sls VALUES('Tom', 300);
INSERT INTO sls VALUES('Mike', 100);
INSERT INTO sls VALUES('Scott', 300);
INSERT INTO sls VALUES('Tom', 250);
INSERT INTO sls VALUES('Scott', 100);
commit;
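As a reference point for the rest of the post, the aggregation the reducer performs should match what a plain GROUP BY returns on this data. The query below is only a sanity check added for illustration and is not part of the OpenWorld demo code; the column aliases are chosen to match the record type defined in the header.

```sql
-- Baseline: the per-salesman totals that the MapReduce-style
-- pipeline is expected to reproduce.
SELECT salesman      AS name,
       SUM(quantity) AS total
FROM   sls
GROUP  BY salesman
ORDER  BY salesman;

-- With the seven rows inserted above this returns:
--   Chu    200
--   Mike   100
--   Scott  400
--   Tom    650
```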
Header
create or replace package oracle_map_reduce
is
  -- The types we define here are similar to the input files
  -- and output files that are used in MR code and are used to
  -- store data while we run the actual package.
  -- The big advantage is that we do not need to write to disk for
  -- intermediate results.
  type sales_t is table of sls%rowtype;
  type sale_cur_t is ref cursor return sls%rowtype;
  type sale_rec_t is record (name varchar2(30), total number);
  type total_sales_t is table of sale_rec_t;
  -- Next we define the functions that do the work and make them known
  -- to the outside world
  -- Note that both mapper and reducer are tablefunctions!
  -- Both mapper and reducer are pipelined and executable in parallel
  -- the parallel degree is driven from the database side and is not
  -- scheduled by the actual program
  function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any);
  -- the pipelined keyword tells the caller that this function acts as
  -- a row source
  --
  -- parallel_enable indicates that this function can be executed in parallel
  -- by the parallel query framework.
  function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))
    -- Finally we can cluster the results so that similar rows are chunked
    -- together when used (note this does not drive distribution over the
    -- parallel slaves, which is done by the partition clause shown in the
    -- mapper and reducer)
    cluster in_cur by (salesman);
end;
/
-- The body of the package holds the mapper and the reducer code.
-- The header shown here only defines the signature of the package
-- and declares the types and variables to be used in the package.
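Once the header compiles, the data dictionary can be used to check how the two functions were registered. This is a small sketch, assuming the package was created in the current schema; USER_PROCEDURES is the standard dictionary view, and its PIPELINED and PARALLEL columns report whether each function was declared with those keywords.

```sql
-- List the functions declared in the package together with their
-- pipelined and parallel_enable flags.
SELECT procedure_name,
       pipelined,
       parallel
FROM   user_procedures
WHERE  object_name = 'ORACLE_MAP_REDUCE'
AND    procedure_name IS NOT NULL   -- skip the row for the package itself
ORDER  BY procedure_name;
```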
Body
create or replace package oracle_map_reduce
is
  type sales_t is table of sls%rowtype;
  type sale_cur_t is ref cursor return sls%rowtype;
  type sale_rec_t is record (name varchar2(30), total number);
  type total_sales_t is table of sale_rec_t;
  function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any);
  function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))
    cluster in_cur by (salesman);
end;
/
-- The upper part is the header, the following part is the body
-- Note the difference in the create statement below as compared
-- to the header
create or replace package body oracle_map_reduce
is
  function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any)
  is
    -- construct a record to hold an entire row from the SLS table
    sales_rec sls%ROWTYPE;
  begin
    -- First loop over all records in the table
    loop
      -- Place the fetched record from SLS into the variable and
      -- end the loop when there are no more rows to loop over
      fetch inp_cur into sales_rec;
      exit when inp_cur%notfound;
      -- by using pipe row here we are giving back rows in streaming
      -- fashion as you would expect from a table
      -- this in combination with pipelined in the definition allows
      -- the pipelining (i.e. handing over data as it arrives) of
      -- a table function
      pipe row (sales_rec);
    end loop;
    -- Return is a mandatory piece that allows the consumer of data (our
    -- reducer in this case) to ensure all data has been sent. After return
    -- the rowsource is exhausted and no more data comes from this function.
    return;
  end mapper;
  -- The above mapper does in effect nothing other than streaming data
  -- partitioned over to the next step. In MR the stream would be written
  -- to a file and then redistributed to the reducers
  -- The reducer below computes and emits the sales figures
  function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))
    -- The partition by clause indicates that all instances of a particular
    -- salesman must be sent to one instance of the reducer function
    cluster in_cur by (salesman)
    -- The cluster by clause tells the parallel query framework to cluster
    -- all instances of a particular salesman together.
  is
    -- two containers are created, one as input the other as output
    sale_rec       sls%ROWTYPE;
    total_sale_rec sale_rec_t;
  begin
    -- reset the values to initial values
    total_sale_rec.total := 0;
    total_sale_rec.name  := NULL;
    loop
      fetch in_cur into sale_rec;
      exit when in_cur%notfound;
      -- some if-then logic to ensure we pipe a row once all rows of a
      -- salesman are processed
      if (total_sale_rec.name is null) then
        -- The first instance is arriving, set the salesman value to that
        -- input value
        -- update 0 plus the incoming value for total
        total_sale_rec.name  := sale_rec.salesman;
        total_sale_rec.total := total_sale_rec.total + sale_rec.quantity;
      elsif (total_sale_rec.name <> sale_rec.salesman) then
        -- We now switch salesman, and are done with the previous
        -- salesman (as rows are partitioned and clustered)
        -- First pipe out the result of the previous salesman we
        -- processed
        -- then update the information to work on this new salesman
        pipe row (total_sale_rec);
        total_sale_rec.name  := sale_rec.salesman;
        total_sale_rec.total := sale_rec.quantity;
      else
        -- We get here when we work on the same salesman and just add
        -- the totals, then move on to the next record
        total_sale_rec.total := total_sale_rec.total + sale_rec.quantity;
      end if;
    end loop;
    -- The next piece of code ensures that the last salesman, who has
    -- not been piped out yet, is piped out to the consumer. If there is
    -- a single salesman, he is only piped out in this piece of logic, as
    -- the loop above only pipes out upon a change of salesman.
    -- We test the name rather than the total, so that a salesman whose
    -- quantities add up to 0 is still emitted and nothing is emitted
    -- when no rows arrived at all.
    if total_sale_rec.name is not null then
      pipe row (total_sale_rec);
    end if;
    -- Again, we are now done and have piped all rows to our consumer
    return;
  end reducer;
end;
/
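Before chaining the two functions, it can help to run the mapper on its own and confirm that it really does nothing more than stream the rows of SLS through. The queries below are a minimal sketch added for illustration; they assume the package body above compiled without errors and that SLS holds only the sample rows from the Scenario section.

```sql
-- Run the mapper by itself: every row of SLS should come back unchanged.
SELECT salesman, quantity
FROM   TABLE(oracle_map_reduce.mapper(CURSOR(SELECT * FROM sls)));

-- The number of mapped rows should equal the number of rows in SLS
-- (7 for the sample data inserted earlier).
SELECT COUNT(*) AS mapped_rows
FROM   TABLE(oracle_map_reduce.mapper(CURSOR(SELECT * FROM sls)));
```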
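Using it in a SQL query
It took a while to get here, but once you see the query, you can see how to build up a set of predefined functions that you can then string together to produce a result set.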
select *
from table(oracle_map_reduce.reducer(cursor(
select * from table(oracle_map_reduce.mapper(cursor(
select * from sls))) map_result)));
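All of the logic pipes its data on to the next consumer, and all of it runs in parallel. That makes this approach suitable for things like heavy in-database ETL (which is what we invented it for in the first place) and for anything that needs to apply a lot of logic to records (such as analytical processing).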
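To make the ETL use case concrete, the same pipeline can feed a target table directly. The following is only a sketch under stated assumptions: the table sales_totals is hypothetical and not part of the original demo, and the PARALLEL hint with a degree of 4 is an arbitrary choice for illustration. Whether the database actually runs the functions in parallel depends on its configuration and on the optimizer.

```sql
-- Hypothetical target table for the aggregated results
-- (an assumption for this example, not part of the original post).
CREATE TABLE sales_totals (name VARCHAR2(30), total NUMBER);

-- Load the reducer output into the target table. The hint asks for a
-- parallel scan of SLS; the degree of 4 is purely illustrative.
INSERT INTO sales_totals (name, total)
SELECT name, total
FROM   TABLE(oracle_map_reduce.reducer(CURSOR(
         SELECT *
         FROM   TABLE(oracle_map_reduce.mapper(CURSOR(
                  SELECT /*+ PARALLEL(s, 4) */ * FROM sls s))) map_result)));

COMMIT;
```

The column names name and total come from the sale_rec_t record type declared in the package header, which is why they can be selected by name from the reducer output.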