Tuesday, July 4, 2017

Recovering From an Avro to Parquet Conversion Failure in Streamsets Data Collector

     In version 2.6 of Streamsets Data Collector, they added data drift support for the Parquet file format. This means that if your source system changes, e.g. a new column is added to the table/flat file/etc., Data Collector will update the target Hive table with the new column before the pipeline starts sending data to it. They have a walkthrough here that explains how it all works. In previous versions this was only available for the Avro file format.

     To make the data drift solution work for Parquet, Data Collector first writes the data in Avro format, then uses a MapReduce executor to kick off a MapReduce job, via YARN, that converts the Avro file to Parquet. It does this asynchronously, so the pipeline reports it has finished while the MapReduce job goes off to run on the cluster. This is fine and all when it works. But... sometimes it doesn't. There are a myriad of reasons why your MapReduce job could fail, none of which matter at the moment you realize you just finished an initial load of some monster from the source system that may have taken hours to days to pull down into Hadoop, and now you have to run it all over again. Well my friend... put down the gun... we have one more shot to save the patient here.

     For this fix, we are going to convert the file from Avro to Parquet "manually". To do this we are going to use the code from this GitHub project. The unfortunate thing is that the project is no longer maintained, and you will hit dependency issues when attempting to build it with Maven. So I had to do some surgery on the project and update some of the Avro and Hadoop dependencies to get it working for CDH 5.11. If you want to build the project yourself, here is the pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cloudera.science</groupId>
  <artifactId>avro2parquet</artifactId>
  <version>0.1.0</version>
  <packaging>jar</packaging>
  <name>avro2parquet</name>

  <properties>
    <hadoop.version>2.0.0-cdh4.1.2</hadoop.version>
    <guava.version>13.0.1</guava.version>
    <junit.version>4.8.2</junit.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <classifier>hadoop2</classifier>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
      <version>1.9.12</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>parquet-column</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0-mr1-cdh5.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0-cdh5.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.11.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>cloudera-repos</id>
      <name>Cloudera Repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
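To build it, here is a minimal sketch, assuming Maven 3 and a JDK are installed on the build machine; the assembly plugin above bundles all dependencies into a single jar under target/, and the jar-with-dependencies name below follows the standard assembly naming convention:

# From the project root, build and bundle all dependencies into one jar
mvn clean package
# Copy the bundled jar to the shorter name used later in this post
cp target/avro2parquet-0.1.0-jar-with-dependencies.jar avro2parquet.jar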

   If you are way too busy to be building Java projects, you can download the jar from here. Now that we have the jar built, here is a walkthrough of how to use it.


   In this example we will use this pipeline, which takes data from a single table in a relational database and moves it into an external table in Hive, i.e. it creates Parquet files in HDFS.

Figure 1. Streamsets Pipeline

     When the pipeline finishes and the MapReduce executor kicks off, the conversion fails mid-flight and your HDFS directory ends up with something like this in it:

Figure 2. Failed Parquet Conversion
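You can also spot this from the shell instead of the Hue file browser; a quick sketch, using the table directory from this example:

# List the table's HDFS directory; a failed conversion leaves the Avro file plus an avro_to_parquet_tmp* artifact behind
hadoop fs -ls /sources/clarity_dbo/order_results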
     
     If the conversion had been successful, the Avro file would be gone (if you made that selection in the MapReduce executor) and you would see a file with a .parquet extension. Instead you see the avro_to_parquet_tmp… file, a relic of the failed MapReduce job. From here, we need to look at the Data Collector log for this pipeline to retrieve the YARN application id of the MapReduce job:

Figure 3. Streamsets Data Collector Log
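Once you have the application id, you can also pull the job's logs straight from the YARN CLI instead of (or in addition to) the Hue steps below; a sketch, assuming log aggregation is enabled and <application_id> is the id from the Data Collector log:

# Dump the aggregated logs for the failed conversion job
yarn logs -applicationId <application_id> > app_logs.txt
# The Avro schema shows up as a single JSON line in the syslog output
grep '"type":"record"' app_logs.txt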

Log into Hue and click on the Job Browser in the top right of the screen:

Figure 4. Hue Job Browser


In that screen, filter on Username sdc and set the Text filter to the application id from the log:


Figure 5. Hue Job Browser Application Filter


Click on the log icon to access the logs for this failed job. On the syslog tab you should see something like this:


Figure 6. Syslog for Application


This is the JSON representation of the Avro schema for the file. We will need to copy this text:
{"type":"record","name":"order_results_target","namespace":"clarity_dbo","fields":[{"name":"line","type":["null","int"],"default":null},{"name":"ord_date_real","type":["null","double"],"default":null},{"name":"order_proc_id","type":["null","double"],"default":null},{"name":"cm_ct_owner_id","type":["null","string"],"default":null},{"name":"comp_anl_inst_tm","type":["null","double"],"default":null},{"name":"comp_obs_inst_tm","type":["null","double"],"default":null},{"name":"comp_res_technicia","type":["null","string"],"default":null},{"name":"comp_snomed_src_c","type":["null","int"],"default":null},{"name":"compon_lnc_id","type":["null","double"],"default":null},{"name":"compon_lnc_src_c","type":["null","int"],"default":null},{"name":"component_comment","type":["null","string"],"default":null},{"name":"component_id","type":["null","double"],"default":null},{"name":"component_type_c","type":["null","int"],"default":null},{"name":"data_type_c","type":["null","int"],"default":null},{"name":"interface_yn","type":["null","string"],"default":null},{"name":"lab_status_c","type":["null","int"],"default":null},{"name":"lrr_based_organ_id","type":["null","double"],"default":null},{"name":"numeric_precision","type":["null","double"],"default":null},{"name":"ord_end_date_real","type":["null","double"],"default":null},{"name":"ord_num_value","type":["null","double"],"default":null},{"name":"ord_raw_value","type":["null","string"],"default":null},{"name":"ord_value","type":["null","string"],"default":null},{"name":"organism_quantity","type":["null","string"],"default":null},{"name":"organism_quantity_unit","type":["null","string"],"default":null},{"name":"pat_enc_csn_id","type":["null","double"],"default":null},{"name":"pat_enc_date_real","type":["null","double"],"default":null},{"name":"pat_id","type":["null","string"],"default":null},{"name":"raw_high","type":["null","string"],"default":null},{"name":"raw_low","type":["null","string"],"default":null},{"name":"raw_ref_vals","type":["null","string"],"default":null},{"name":"ref_normal_vals","type":["null","string"],"default":null},{"name":"ref_range_type","type":["null","string"],"default":null},{"name":"ref_unit_uom_id","type":["null","double"],"default":null},{"name":"reference_high","type":["null","string"],"default":null},{"name":"reference_low","type":["null","string"],"default":null},{"name":"reference_unit","type":["null","string"],"default":null},{"name":"result_cmt_end_ln","type":["null","int"],"default":null},{"name":"result_cmt_start_ln","type":["null","int"],"default":null},{"name":"result_date","type":["null","double"],"default":null},{"name":"result_flag_c","type":["null","int"],"default":null},{"name":"result_in_range_yn","type":["null","string"],"default":null},{"name":"result_status_c","type":["null","int"],"default":null},{"name":"result_sub_idn","type":["null","string"],"default":null},{"name":"result_time","type":["null","double"],"default":null},{"name":"result_val_end_ln","type":["null","int"],"default":null},{"name":"result_val_start_ln","type":["null","int"],"default":null},{"name":"resulting_lab_id","type":["null","double"],"default":null},{"name":"rslt_reportable_yn","type":["null","string"],"default":null},{"name":"serv_area_id","type":["null","double"],"default":null},{"name":"value_normalized","type":["null","string"],"default":null},{"name":"verify_user_id","type":["null","string"],"default":null},{"name":"sys_change_version","type":["null","long"],"default":null},{"name":"sys_change_operation","type":["null","string"],"default":null},{"name":"loaddate","type":["null","long"],"default":null}]}

Now that we have our Avro schema, we need to create an Avro schema file. For this example the table is clarity_dbo.order_results, so we will create a file called order_results.avsc, copy the Avro schema into it, change the file to Unix line endings (in Notepad++: Edit->EOL Conversion->Unix/OSX Format), and upload it to a directory in HDFS such as /schemas/clarity_dbo/order_results:


Figure 7. Avro Schema File in HDFS

Figure 8. Avro Schema File Text
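The upload itself can also be done from the shell instead of Hue; a minimal sketch, assuming order_results.avsc sits in your current local directory:

# Create the schema directory in HDFS if it is not already there
hadoop fs -mkdir -p /schemas/clarity_dbo/order_results
# Upload the schema file; -f overwrites any earlier copy
hadoop fs -put -f order_results.avsc /schemas/clarity_dbo/order_results/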
If you cannot find the schema in the logs, you can generate it with Avro Tools. The jar file is available here. From the directory on your local Linux file system where you downloaded Avro Tools, run this command:
hadoop jar avro-tools-1.8.2.jar getschema /tmp/parquetfix/sdc-4dbc5be5-5809-4faa-9e58-8e9a8b77de84_4c546953-99fd-43f1-a1f4-2eea27b7bbe5 | hadoop fs -put -f - /schemas/clarity_dbo/order_results/order_results.avsc

Parameter breakdown:
1. getschema tells Avro Tools to grab the schema from...
2. The full path, including the file name, of the Avro file we are generating the schema from
3. The path in Hadoop where we want to put the Avro schema file
Now we need to copy the Avro file from /sources/clarity_dbo/order_results to a directory we can use for the repair; in this example we use /tmp/parquetfix. (If there are any files already in that directory, delete them before attempting the repair!) Check the box next to the file and click Actions->Copy:


Figure 9. HDFS Copy Screen

Figure 10. Avro File in Tmp Dir
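The same copy can be done from the shell; a sketch using the file name from this example (substitute your own sdc-* file name):

# Make sure the scratch directory exists, then check that it is otherwise empty
hadoop fs -mkdir -p /tmp/parquetfix
hadoop fs -ls /tmp/parquetfix
# Copy the Avro file left behind by the failed conversion into the scratch directory
hadoop fs -cp /sources/clarity_dbo/order_results/sdc-4dbc5be5-5809-4faa-9e58-8e9a8b77de84_4c546953-99fd-43f1-a1f4-2eea27b7bbe5 /tmp/parquetfix/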
We're good to begin the conversion now. Log on to a server in the cluster where you put the conversion jar. If you have Kerberos on your cluster, make sure to kinit as a user who has rights to execute this job. In this example I put the jar in a sub-directory of /var/lib, so:
cd /var/lib/avro2parquet
This is where I put the avro2parquet.jar file. For the files/directories cited in this example, this is the command you would want to execute:
hadoop jar avro2parquet.jar \
-D mapreduce.map.memory.mb=27120 \
hdfs:///schemas/clarity_dbo/order_results/order_results.avsc \
hdfs:///tmp/parquetfix \
hdfs:///sources/clarity_dbo/order_results/fix
The first part of the command calls the jar.
  • The first parameter sets how much memory is allocated to each map task. In this example we give it 27GB of memory (which is overkill for this, but I had a lot of memory free on the server at the time). Make sure the job has enough memory to run or it will fail. You can see the current memory load for the server in Cloudera Manager-->Hosts.
  • The second parameter is the path to the Avro schema file we created in the previous step.
  • The third parameter is the location of the Avro file we want converted.
  • The fourth parameter is the location where we want the converted Parquet file(s) written. Make sure this directory doesn't already exist or the job will fail (a pre-flight check sketch follows this list).
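Here is a quick pre-flight check before launching, using the paths from this example; it confirms the scratch directory holds only the Avro file and that the output directory does not exist yet:

# The scratch directory should contain nothing but the Avro file to convert
hadoop fs -ls /tmp/parquetfix
# The output directory must not exist yet; -test -e succeeds (and prints the warning) if it does
hadoop fs -test -e /sources/clarity_dbo/order_results/fix && echo "fix dir already exists - remove it first"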

Kick off the job. When it is done, you should (hopefully) see a success message:

Figure 11. Mapreduce Job Results
Now we go to /sources/clarity_dbo/order_results/fix to see our parquet files:
Figure 12. Parquet Files
Check the checkbox next to every file with the .parquet extension, click Action->Move, and enter the parent folder:

Figure 13. Move Parquet Files
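If you would rather do the move from the shell, a sketch using the paths from this example:

# Move the converted Parquet files up into the table's directory
hadoop fs -mv '/sources/clarity_dbo/order_results/fix/*.parquet' /sources/clarity_dbo/order_results/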

Now that the parquet files are there, you can delete the Avro file, the tmp parquet file, and the fix directory (a command-line sketch of this cleanup follows below). This should leave just the .parquet files. To verify the fix took, go to Hive and select some records from the table. If you can see the data now, congratulations!! You saved the data!
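For the command-line inclined, the cleanup and the sanity check look roughly like this; a sketch, where the Avro file name comes from this example, the avro_to_parquet_tmp* glob is assumed to match the leftover temp file, and the hive CLI is assumed to be available (use beeline with your own JDBC URL otherwise):

# Remove the original Avro file and the temp artifact from the failed conversion
hadoop fs -rm /sources/clarity_dbo/order_results/sdc-4dbc5be5-5809-4faa-9e58-8e9a8b77de84_4c546953-99fd-43f1-a1f4-2eea27b7bbe5
hadoop fs -rm '/sources/clarity_dbo/order_results/avro_to_parquet_tmp*'
# Remove the now-empty fix directory and the scratch directory
hadoop fs -rm -r /sources/clarity_dbo/order_results/fix
hadoop fs -rm -r /tmp/parquetfix
# Sanity check: the external table should return rows again
hive -e "SELECT * FROM clarity_dbo.order_results LIMIT 10;"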
