Thursday, May 28, 2015

Loading JSON Files with Nested Arrays from Azure Blob Storage into Hive Tables in HDInsight

     In my previous post I wrote about how to upload JSON files into Azure blob storage. In this post, I'd like to expand upon that and show how to load these files into Hive in Azure HDInsight.

     
     The 2 JSON files I'm going to load are up in my blob storage, acronym/abc.txt and acronym/def.txt:


Figure 1. JSON Files in Azure Blob Storage

 Both of these files came from a public RESTful API:

http://www.nactem.ac.uk/software/acromine/dictionary.py?sf=abc

and

http://www.nactem.ac.uk/software/acromine/dictionary.py?sf=def


The format for these 2 files looks something like this:


{
   "a":[
      {
         "sf":"ABC",
         "lfs":[
            {
               "lf":"ATP-binding cassette",
               "freq":1437,
               "since":1990,
               "vars":[
                  {
                     "lf":"ATP-binding cassette",
                     "freq":1057,
                     "since":1990
                  }
               ]
            }
         ]
      }
   ]
}
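Before moving into Hive, it can help to see the nesting in plain terms. This short Python sketch (my own illustration, outside the Hive workflow) parses a document of the same shape and walks down through the three levels:

```python
import json

# A document with the same shape as the Acromine response shown above
sample = ('{"a": [{"sf": "ABC", "lfs": [{"lf": "ATP-binding cassette", '
          '"freq": 1437, "since": 1990, "vars": [{"lf": "ATP-binding cassette", '
          '"freq": 1057, "since": 1990}]}]}]}')

doc = json.loads(sample)

# Level 1: the "a" array holds one object per acronym (sf = short form)
acronym = doc["a"][0]
print(acronym["sf"])                      # ABC

# Level 2: each acronym holds an array of long forms (lfs)
longform = acronym["lfs"][0]
print(longform["lf"], longform["freq"])   # ATP-binding cassette 1437

# Level 3: each long form holds an array of spelling variants (vars)
variant = longform["vars"][0]
print(variant["freq"], variant["since"])  # 1057 1990
```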

     The first thing I'd like to do is create an external table in Hive where I can "load" the raw JSON files, so we can play around a little with some of Hive's out-of-the-box JSON functions. In the Hive Query Editor:


DROP TABLE IF EXISTS AcronymRaw;

CREATE EXTERNAL TABLE AcronymRaw(json string)

STORED AS TEXTFILE LOCATION 'wasb://jymbo@jymbo.blob.core.windows.net/acronym/';

If we execute the following we can see the raw JSON:


SET hive.execution.engine=tez;


SELECT * FROM  AcronymRaw;

We will get back the raw JSON from the files. (I have to set the Tez switch since I did not configure my cluster to use that engine by default.) If I want to dive into the first array of my JSON objects and see the acronyms, I can use a lateral view to flatten out the hierarchy, combined with the json_tuple function:

SET hive.execution.engine=tez;

SELECT
  b.sf
FROM AcronymRaw AS a LATERAL VIEW json_tuple(get_json_object(a.json, '$.a[0]'), 'sf') b AS sf;

This will give me:

Figure 2. JSON Acronym Results
If we want to step into the first array of the next level in the hierarchy, lfs, we can add another lateral view to the statement:

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
  b.sf,
  c.lf,
  c.freq,
  c.since
FROM AcronymRaw AS a
LATERAL VIEW json_tuple(get_json_object(a.json, '$.a[0]'), 'sf', 'lfs') b AS sf, lfs
LATERAL VIEW json_tuple(get_json_object(concat('{"a":', b.lfs, '}'), '$.a[0]'), 'lf', 'freq', 'since') c AS lf, freq, since;


Figure 3. First Nested JSON Array
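If the stacked lateral views are hard to picture, their effect is just a nested flatten: one output row per (acronym, long form) pair. Here is a rough Python analogy (the second long form is invented for illustration; this is not how Hive executes it):

```python
# One parsed JSON document per file; the second long form is made up for illustration
docs = [
    {"a": [{"sf": "ABC",
            "lfs": [{"lf": "ATP-binding cassette", "freq": 1437, "since": 1990},
                    {"lf": "ABC transporter",      "freq": 9999, "since": 1992}]}]}
]

# Equivalent of: lateral view over $.a[0], then a second lateral view over lfs
rows = [(doc["a"][0]["sf"], lf["lf"], lf["freq"], lf["since"])
        for doc in docs
        for lf in doc["a"][0]["lfs"]]

for row in rows:
    print(row)
```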


     Now, to get this JSON loaded into an external table that matches the structure of the JSON, we're going to incorporate a third-party Java library that can deserialize the JSON for us. This will make querying the data much easier. You can download and build the project using Maven from this GitHub link, or you can be lazy, like me, and get the JARs from here. Using Visual Studio you can upload these files to blob storage so they can be referenced in Hive:


Figure 4. Visual Studio Server Explorer
       
     After adding these files to blob storage, we can reference them in our Hive statements when creating and querying tables that use this library:


ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

DROP TABLE IF EXISTS AcronymNonNormalized;

CREATE EXTERNAL TABLE AcronymNonNormalized (
  a array<
      struct<sf:string, lfs:array<
        struct<lf:string, freq:int, since:int, vars:array<
          struct<lf:string, freq:int, since:int>
        >>
      >>
    >
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE LOCATION 'wasb://jymbo@jymbo.blob.core.windows.net/acronym/';

     Now that this structure is in a Hive table, we can query it much more easily than in the raw JSON format. In this query we explode the lfs arrays and project them alongside sf:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
 
    a[0].sf,
    lfs.lf,
    lfs.freq,
    lfs.since
 
FROM AcronymNonNormalized LATERAL VIEW EXPLODE( a[0].lfs) lfstable AS lfs;

Figure 5. Exploded Results
     We can even drill into the second-tier array, vars, using another lateral view and explode:


ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
 
    a[0].sf,
    lfs.lf as mainlf,
    vars.lf,
    vars.freq,
    vars.since
 
  FROM AcronymNonNormalized LATERAL VIEW EXPLODE( a[0].lfs) lfstable AS lfs
  LATERAL VIEW EXPLODE(lfstable.lfs.vars) vartable AS vars;

Figure 6. Second Nested Array
     
     So now that we've figured out how to traverse the arrays in the table, we could potentially normalize this table and create a schema similar to this:


Figure 7. Normalized Schema


 The first table we need to populate will be AcronymSF:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

DROP TABLE IF EXISTS AcronymSF;

CREATE TABLE AcronymSF
(
     sf string -->pk
);

INSERT OVERWRITE TABLE AcronymSF
SELECT

    a[0].sf

FROM AcronymNonNormalized;

SELECT * FROM AcronymSF;

Figure 8. Acronym SF Contents


     For us to populate the rest of the tables, we will need to produce unique values for the arrays in these hierarchies that will be part of a composite primary key. To do this we will take advantage of a Hive function called posexplode, which emits each array element along with its position, so we can uniquely identify each row. The first table we want to create and populate like this will be the AcronymLFS table:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

DROP TABLE IF EXISTS AcronymLFS;

CREATE TABLE AcronymLFS
(
     sf string, -->pk
     lfs int, -->pk
     lf string,
     freq int,
     since int

);

INSERT OVERWRITE TABLE AcronymLFS
SELECT

    a[0].sf,
    lfstable.seq AS lfs,
    lfs.lf,
    lfs.freq,
    lfs.since

FROM AcronymNonNormalized LATERAL VIEW POSEXPLODE( a[0].lfs) lfstable AS seq, lfs;

SELECT * FROM AcronymLFS;

Figure 9. AcronymLFS Contents
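Conceptually, posexplode works like Python's enumerate: it emits each array element together with its zero-based position. A small sketch of the surrogate-key idea (the second long form is invented for illustration):

```python
doc = {"a": [{"sf": "ABC",
              "lfs": [{"lf": "ATP-binding cassette", "freq": 1437, "since": 1990},
                      {"lf": "ABC transporter",      "freq": 9999, "since": 1992}]}]}

sf = doc["a"][0]["sf"]

# POSEXPLODE pairs every element with its index; (sf, seq) becomes the composite key
rows = [(sf, seq, lf["lf"], lf["freq"], lf["since"])
        for seq, lf in enumerate(doc["a"][0]["lfs"])]

for row in rows:
    print(row)
```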
     
     To load the AcronymVARS table we need to use the same method we used for AcronymLFS, but one level down on the JSON hierarchy:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

DROP TABLE IF EXISTS AcronymVARS;

CREATE TABLE AcronymVARS
(
     sf string, -->pk
     lfs int, -->pk
     vars int, -->pk
     lf string,
     freq int,
     since int

);

INSERT OVERWRITE TABLE AcronymVARS
SELECT

    a[0].sf,
    lfstable.seq as lfs,
    vartable.seq as vars,
    vars.lf,
    vars.freq,
    vars.since

FROM AcronymNonNormalized LATERAL VIEW POSEXPLODE( a[0].lfs) lfstable AS seq, lfs
LATERAL VIEW POSEXPLODE(lfstable.lfs.vars) vartable AS seq, vars;

SELECT * FROM AcronymVARS;

Figure 10. AcronymVARS Content

     So with this we have normalized our JSON. If we execute a standard SQL query in Hive like this:

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
 
     tsf.sf,
     tlfs.lf AS mainlf,
     tvars.lf,
     tvars.freq,
     tvars.since
      
FROM AcronymSF tsf
JOIN AcronymLFS tlfs ON tsf.sf=tlfs.sf
JOIN AcronymVARS tvars ON tlfs.sf=tvars.sf AND tlfs.lfs=tvars.lfs;

We get this:

Figure 11. SQL Join Result
     
     As you can see, these results exactly match the results from the query in Figure 6. For performance purposes you would want to leave all this data in the same table (nested arrays and all), but if you wanted to set this data up to be moved to a relational database with Sqoop, this technique may be useful.
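As a final sanity check on the normalization, here is a miniature Python sketch (tiny hand-built tables using the sample values from the JSON above, not the real Hive tables) showing that joining the three tables on their composite keys reconstructs the variant-level rows:

```python
# Miniature versions of the three normalized tables, keyed sf / (sf, lfs) / (sf, lfs, vars)
acronym_sf   = [("ABC",)]
acronym_lfs  = [("ABC", 0, "ATP-binding cassette", 1437, 1990)]
acronym_vars = [("ABC", 0, 0, "ATP-binding cassette", 1057, 1990)]

# Equivalent of joining AcronymSF -> AcronymLFS -> AcronymVARS on the composite keys
joined = [(sf, main_lf, lf, freq, since)
          for (sf,) in acronym_sf
          for (sf2, lfs_id, main_lf, _f, _s) in acronym_lfs if sf2 == sf
          for (sf3, lfs_id2, var_id, lf, freq, since) in acronym_vars
          if sf3 == sf2 and lfs_id2 == lfs_id]

print(joined)  # one row per variant
```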
