
Saturday, July 1, 2017

Creating a Sqoop Metastore in Postgres

     If you are using sqoop to load data from your source system relational databases into Hadoop, you probably want a way to track the deltas. That way you won't have to do a full load every time or build a delta mechanism on your own. The sqoop metastore gives your sqoop jobs a place to store the last value loaded, so that the next time a job kicks off it loads from after that value. So if you were loading based on some incremental value such as an auto-increment key, and the last value you loaded was 15, the next time the job ran it would add an autoincrementkey > 15 predicate to the WHERE clause when selecting.
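     As a quick illustration (a hedged sketch; the table, column, and connection details here are made up), an incremental import looks like this, and with --last-value 15 sqoop only selects rows where order_id > 15:

    sqoop import \
      --connect jdbc:mysql://<source server>/mydb \
      --username myuser --password mypass \
      --table orders \
      --check-column order_id \
      --incremental append \
      --last-value 15 \
      --target-dir /staging/orders \
      -m 1

     When the job definition lives in the metastore, sqoop updates the stored last value automatically after each successful run.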

    If your Hadoop vendor is Cloudera, they will want you to use a HyperSQL (HSQLDB) database for your sqoop metastore. It's ok and all, but if you already have MySQL or Postgres installed on your cluster, it's just one more database for you to maintain. If you don't want to use HyperSQL as your metastore, and have Postgres installed, you can follow the steps in this post for getting your sqoop metastore up and running on Postgres. (Just keep in mind that if something happens, Cloudera will not help you, since this is currently not supported.)




  Log into the CLI of the server running Postgres and su as postgres. 

   su postgres

  Then create a database that will act as our sqoop metastore.

   psql -c "create database sqoop"
  
   Next, open a psql shell (psql) and create a new role to operate under:

  CREATE ROLE sqoop WITH PASSWORD 'test';

    Change the owner of the sqoop database to sqoop:


     ALTER DATABASE sqoop OWNER TO sqoop;

         Make sure the role can login:

         ALTER ROLE sqoop WITH LOGIN;

         Verify:
    
     \list
    
        Now connect to the sqoop database:

          \connect sqoop
      
          Create a table:

     CREATE TABLE SQOOP_ROOT (
       VERSION INT,
       PROPNAME VARCHAR(128) NOT NULL,
       PROPVAL VARCHAR(256),
       CONSTRAINT SQOOP_ROOT_UNQ UNIQUE (VERSION, PROPNAME)
     );

          Insert storage data into the new table:

            INSERT INTO SQOOP_ROOT
            VALUES(
              NULL,
              'sqoop.hsqldb.job.storage.version',
              '0'
            );

          Insert job info into the new table:

            INSERT INTO SQOOP_ROOT
            VALUES(
              0,
              'sqoop.hsqldb.job.info.table',
              'SQOOP_SESSIONS'
            );
         Verify:

       SELECT * FROM SQOOP_ROOT;
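       Assuming both inserts above succeeded, psql should print something like:

        version |             propname             |    propval
       ---------+----------------------------------+----------------
                | sqoop.hsqldb.job.storage.version | 0
              0 | sqoop.hsqldb.job.info.table      | SQOOP_SESSIONS
       (2 rows)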
    
       
        Create table SQOOP_SESSIONS:

         CREATE TABLE SQOOP_SESSIONS (
           JOB_NAME VARCHAR(64),
           PROPNAME VARCHAR(128),
           PROPVAL VARCHAR(1024),
           PROPCLASS VARCHAR(32),
           CONSTRAINT SQOOP_SESSIONS_unq UNIQUE (JOB_NAME, PROPNAME, PROPCLASS)
         );
           
        Grant table perms:

        GRANT ALL PRIVILEGES ON SQOOP_ROOT TO sqoop;
        GRANT ALL PRIVILEGES ON SQOOP_SESSIONS TO sqoop;
          Verify the port postgres is running on (in this example it was 5432):

SELECT * FROM pg_settings WHERE name = 'port';
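          Alternatively (not part of the original steps, just a shortcut), psql can report the same setting directly:

   psql -c "SHOW port;"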


          Log into Cloudera Manager and go into the Sqoop 1 Client:

Figure 1. Cloudera Manager Sqoop Client
        


         In the Sqoop 1 Client screen, click on configuration and filter on:
       
         Sqoop 1 Client Client Advanced Configuration Snippet (Safety Valve) for sqoop-conf/sqoop-site.xml

          In that section add these 4 configs: 

                    Name: sqoop.metastore.client.enable.autoconnect
                    Value: false
                    Description: If true, Sqoop will connect to a local metastore for job management when no other metastore arguments are provided.

                    Name: sqoop.metastore.client.autoconnect.url
                    Value: jdbc:postgresql://<your postgres server>:5432/sqoop
                    Description: Connection to the sqoop metastore database

                    Name: sqoop.metastore.client.autoconnect.username
                    Value: sqoop
                    Description: Sqoop metastore database user name

                    Name: sqoop.metastore.client.autoconnect.password
                    Value: test
                    Description: Sqoop metastore database password

Figure 2. Sqoop Client Config 

        Save changes and go back to Cloudera Manager's main screen. You should see a new icon next to the Sqoop 1 Client:

Figure 3. Stale Sqoop Config

         Click that and then click deploy changes on the next screen. You should now be good to go. Make sure a postgres JDBC driver exists in: /var/lib/sqoop on whatever server is running your sqoop jobs.  
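        As a quick smoke test (hypothetical paths and hostname; adjust to your cluster), confirm the driver jar is present and that sqoop can reach the metastore. An empty job list, rather than a connection error, means the config took:

   ls /var/lib/sqoop/postgresql-*.jar
   sqoop job --list --meta-connect "jdbc:postgresql://<your postgres server>:5432/sqoop?user=sqoop&password=test"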


        Create a test job to make sure everything is working. Here is a sample:

    sqoop job --create sample_job --meta-connect "jdbc:postgresql://<postgres server>:5432/sqoop?user=sqoop&password=test" \
      -- import --connect jdbc:oracle:thin:@//<oracle server>:1521/<servicename> --username user --password pass \
      --table SUPPORT.APC_CODE --as-avrodatafile --target-dir /staging/hpm_support/apc_code \
      --check-column mycolumn --incremental append --last-value 0 -m 1


     Verify job was created:

   select * from SQOOP_SESSIONS;
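      You can also inspect the stored job definition itself. After an incremental run completes, the incremental.last.value property shown here is what sqoop will use as the starting point for the next delta:

    sqoop job --show sample_job --meta-connect "jdbc:postgresql://<postgres server>:5432/sqoop?user=sqoop&password=test"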


      To run the sample job:

       sqoop job --exec sample_job --meta-connect "jdbc:postgresql://<postgres server>:5432/sqoop?user=sqoop&password=test"


     Remove sample job:

   sqoop job --delete sample_job --meta-connect "jdbc:postgresql://<postgres server>:5432/sqoop?user=sqoop&password=test"


     Your Postgres metastore should now be all set to go!

   









    

Sunday, June 4, 2017

Installing and Configuring Hashicorp Vault to work with Streamsets Data Collector

     I've been using Streamsets Data Collector a lot lately in my work, and I'm really impressed with it. It has a really nice UI and lots of components that come out of the box with the product. As the name suggests, this product was built for streaming data, i.e. you turn on a pipeline and let it just run. Now that they have included the Pipeline Finisher Executor in version 2.5.0, loading data from RDBMSs in batch has become much easier.

    Since I use this product to connect to various source data systems, and since these pipelines can be exported to JSON files, the potential to have plaintext usernames/passwords lying around is present. Luckily, Data Collector has native integration with Hashicorp Vault, where we can store our source system usernames and passwords as "secrets". I'm going to walk through, step by step, how to configure and use Vault with Cloudera's Hadoop distribution and Streamsets.

  1. Download the binary from here and make sure to grab the 64 bit version for Linux
  2. Unzip the file to /usr/vault
  3. Create a symbolic link called vault in /usr/bin that points to /usr/vault/vault
  4. Allow the file to be executed: chmod u+x /usr/vault/vault
  5. Create a directory to store the config: mkdir /usr/vault/config.d
  6. Now we need to create a "Secret Backend", a.k.a. storage for the "secrets". In this example we will mount a backend in zookeeper. By using zookeeper we can take advantage of the high availability that comes with it and not have to worry about building out an HA solution using one of the other available backends. In the /usr/vault/config.d directory make a file called config.hcl with the following contents in the Hashicorp Configuration Language:
backend "zookeeper" {
     address = "<zookeepernode1>:2181,<zookeepernode2>:2181,<zookeepernode3>:2181"
     path = "vault/"
     redirect_addr ="http://<vaultserver>:8200"
     } 
     listener "tcp" {
     address = "<vaultserver>:8200"
     tls_disable = 1
     }  
    
       Put in all of your zookeeper nodes separated by a comma, as well as the name of the server that is going to run Vault. In this example we are not going to cover TLS, so I have it disabled.
  7. Now we can start up Vault: vault server -config=/usr/vault/config.d/config.hcl
  8. Hit ctrl z then enter bg to have Vault run in the background
  9. Next we initialize vault and get the keys (we are using a share of 3 and a threshold of 3. This means you need all 3 keys to unseal the vault in this example):

vault init -key-shares=3 -key-threshold=3  -address=http://<vaultserver>:8200  

        which produced:


      Unseal Key 1: <key1>
      Unseal Key 2: <key2>
      Unseal Key 3: <key3>
      Initial Root Token: <roottoken>
      

         Make sure to store these keys/token somewhere safe, because you won't be able to get these back again.

 10. Since we aren't using TLS, we need to overwrite an environment variable so Vault won't try to use https anymore:
      export VAULT_ADDR='http://<vaultserver>:8200'
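      (Side note, not in the original steps: with VAULT_ADDR exported you can check the seal state at any point; before unsealing, this should report Sealed: true.)

      vault status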
 11. You now have the master keys to the vault as well as an initial root token. Use the master key(s) to "unseal" the vault so we can start using it. Here are the commands using the REST API that will do this. Until all 3 keys are entered, the vault will remain sealed:

curl \
    -X PUT \
    -d '{"key": "<key1>"}' \
    http://<vaultserver>:8200/v1/sys/unseal

curl \
    -X PUT \
    -d '{"key": "<key2>"}' \
    http://<vaultserver>:8200/v1/sys/unseal

curl \
    -X PUT \
    -d '{"key": "<key3>"}' \
    http://<vaultserver>:8200/v1/sys/unseal

         After running all 3, the vault will "unseal" and will give us this JSON response, verifying we have indeed unsealed the vault:

  {"sealed":false,"t":3,"n":3,"progress":0,"nonce":"","version":"0.7.0","cluster_name":"vault-cluster-22ad02a3","cluster_id":"160eb791-8a19-dcad-115f-e741e561e4f4"}
[   root@<vaultserver> ~]# 2017/04/28 11:31:08.252492 [INFO ] core: acquired lock, enabling active operation
    2017/04/28 11:31:08.302087 [INFO ] core: post-unseal setup starting
    2017/04/28 11:31:08.302459 [INFO ] core: loaded wrapping token key
    2017/04/28 11:31:08.303398 [INFO ] core: successfully mounted backend: type=generic path=secret/
    2017/04/28 11:31:08.303521 [INFO ] core: successfully mounted backend: type=system path=sys/
    2017/04/28 11:31:08.303542 [INFO ] core: successfully mounted backend: type=cubbyhole path=cubbyhole/
    2017/04/28 11:31:08.303611 [INFO ] rollback: starting rollback manager
    2017/04/28 11:31:08.305776 [INFO ] expiration: restoring leases
    2017/04/28 11:31:08.307007 [INFO ] core: post-unseal setup complete
    2017/04/28 11:31:08.307025 [INFO ] core/startClusterListener: starting listener: listener_address=<vaultserver>:8201
    2017/04/28 11:31:08.307417 [INFO ] core/startClusterListener: serving cluster requests: cluster_listen_address=<vaultserver>:8201
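      You can also re-check the seal state at any time through the REST API (same assumptions as above: Vault on <vaultserver>:8200 with TLS disabled):

curl http://<vaultserver>:8200/v1/sys/seal-status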

 12. Now we need to authenticate Streamsets so it can be used with Vault. First we get the user id for Streamsets by running /usr/bin/streamsets show-vault-id, which will give us our <vaultid> for Streamsets.
 13. To authenticate, we enable the App ID authentication type using the root token as such:


curl -X POST -H "X-Vault-Token:<roottoken>" -d '{"type":"app-id"}' http://<vaultserver>:8200/v1/sys/auth/app-id


       The log data stream should report:  

core: enabled credential backend: path=app-id/ type=app-id 
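     (If you prefer the CLI, the pre-1.0 Vault syntax below should list the enabled auth backends, app-id included; treat it as a sketch since this flag changed in later versions:)

vault auth -methods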

     Full disclosure here...the App ID authentication type is deprecated and is being replaced by the AppRole authentication type. As of writing this, Streamsets does not yet support AppRole, so we have to go ahead with App ID.
 14. Now we need to create a policy for this app to use. This will be the policy used to read "secrets" out of Vault.

curl \
    -X PUT \
    -H "X-Vault-Token:<roottoken>" \
    -H "Content-Type: application/json" \
    -d '{"rules": "{\"path\":{\"secret/*\":{\"policy\":\"read\"}}}"}' \
    http://<vaultserver>:8200/v1/sys/policy/secret-policy

    We can verify the policy took effect by running:

curl \
  -X GET \
  -H "X-Vault-Token:<roottoken>" \
  http://<vaultserver>:8200/v1/sys/policy/secret-policy

 15. Now we need to create the App ID for Streamsets to use. Use a randomly generated UUID, like one from:

 uuidgen

      This produces the value <appid>, which we will use as our App ID.
 16. Now we will create our App ID authentication and associate it with the App ID we just generated and the secret policy:

curl -X POST -H "X-Vault-Token:<roottoken>" -d '{"value":"secret-policy", "display_name":"streamsets"}' \
http://<vaultserver>:8200/v1/auth/app-id/map/app-id/<appid>

 17. Next we will associate the Vault ID we got from streamsets (the user id) with this App ID:

curl -X POST -H "X-Vault-Token:<roottoken>" -d '{"value":"<appid>"}' http://<vaultserver>:8200/v1/auth/app-id/map/user-id/<vaultid>


 18. Let's verify everything was entered properly:

curl -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/auth/app-id/map/app-id/<appid>

curl -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/auth/app-id/map/user-id/<vaultid>


 19. With all of this behind us now, we can finally enter our first set of secrets. Let's store the username "user1" and password "pass1" for a source system we will call "source":



      curl \
          -H "X-Vault-Token: <roottoken>" \
          -H "Content-Type: application/json" \
          -X POST \
          -d '{"value":"user1"}' \
          http://<vaultserver>:8200/v1/secret/source/username

      curl \
          -H "X-Vault-Token: <roottoken>" \
          -H "Content-Type: application/json" \
          -X POST \
          -d '{"value":"pass1"}' \
          http://<vaultserver>:8200/v1/secret/source/password
      

 20. You can verify the "secrets" took by running something like this:
    
    curl -X GET -H "X-Vault-Token:<roottoken>"  http://<vaultserver>:8200/v1/secret/source/username 
  

        Which should return something like this:


{"request_id":"ded782b5-7f08-d2c0-5929-94b99039cc87","lease_id":"","renewable":false,"lease_duration":2764800,"data":{"value":"user1"},"wrap_info":null,"warnings":null,"auth":null} 
  
 21. Now we need to configure Streamsets. Go into Cloudera Manager, click on the Streamsets service on the main page. In the streamsets screen click on configuration on the top of the screen. In the search box type *sdc.pr to filter for Data Collector Advanced Configuration Snippet (Safety Valve) for sdc.properties. Type the following in the text box:

      vault.addr=http://<vaultserver>:8200
      vault.app.id=<appid>


Figure 1. Cloudera Manager Configuration for Streamsets


   Click Save Changes on the bottom of the page.

 22. Go back to the main page on Cloudera Manager. Restart the Streamsets service to load the config changes and we're done!

      Now to use Vault, here are a couple of examples:

Streamsets 

    When attempting to access the source username and password from a Streamsets pipeline, we simply use the following notation for credentials to access the secrets:

Figure 2. Streamsets Data Collector JDBC Origin Credentials Tab



            Here is an example of using Vault secrets for accessing a JDBC origin in Data Collector.
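     For reference, the expressions in the credentials tab use Data Collector's vault:read EL function; with the secrets stored above, they would look roughly like this (whether the leading secret/ mount is included in the path can vary by SDC version, so treat this as a sketch):

   ${vault:read("secret/source/username", "value")}
   ${vault:read("secret/source/password", "value")}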

   Bash/Sqoop

     usercontent=$(curl -s -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/secret/source/username)
     username=$(jq -r '.data.value' <<< "${usercontent}")
     pwdcontent=$(curl -s -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/secret/source/password)
     password=$(jq -r '.data.value' <<< "${pwdcontent}")
     # map-task heap via mapreduce.map.java.opts (assumed intent of the original's invalid -D-Xmx3500m flag)
     sqoop import -Dmapreduce.map.memory.mb=4096 -Dmapreduce.job.name=Import_<user>_<table> \
       -Dmapreduce.map.java.opts=-Xmx3500m \
       --connect jdbc:oracle:thin:@//<sourceserver>:1521/<servicename> --username ${username} --password ${password} \
       -m 1 --table <user>.<table> --as-avrodatafile --target-dir /staging/source/<table> --delete-target-dir
   
        This bash script will connect to our data source via sqoop, using the username and password stored in Vault, and store the data in avro format in hdfs.

   Java (w/ Jackson and Jersey)

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;

public class VaultTest {
      
              Secret secret;
              public static class Data {
      
                     public String value;
       }

              public static class Secret {

      
                     public String request_id;
                     public String lease_id;
                     public boolean renewable;
                     public int lease_duration;
                     public Data data;
                     public String wrap_info;
                     public String warnings;
                     public String auth;
       }
       public void executeTest() throws JsonParseException, JsonMappingException,
                     ClientHandlerException, UniformInterfaceException, IOException {

              Client client = Client.create();

              WebResource webResource =
                     client.resource("http://<vaultserver>:8200/v1/secret/source/username");
              ClientResponse response = webResource.accept("application/json")
                            .header("X-Vault-Token", "<roottoken>").get(ClientResponse.class);
              if (response.getStatus() != 500) {

                     if (response.getStatus() != 200) {
                            throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
                     }
                     response.bufferEntity();

                     ObjectMapper mapper = new ObjectMapper();
                     // deserialize the JSON secret payload into our Secret class
                     secret = mapper.readValue(response.getEntity(String.class), Secret.class);

                     System.out.println(secret.data.value);
              }
       }

       public static void main(String[] args) throws Exception {
              new VaultTest().executeTest();
       }
}

       Running the jar prints our stored secret:

   [root@<vaultserver> ~]# java -jar /var/tmp/vaulttest/VaultTest.jar
   user1

      In this example we're using a Java application with Jersey and Jackson to pull the secret from Vault's REST API, deserialize it, and print it out to the CLI.