Google Cloud Platform (GCP) Dataproc Product Overview
Google Cloud Platform (GCP)
Managed Hadoop product
>
Tips
Create a cluster for each processing job
Shutdown the cluster when it is not processing data
Auto shutdown options
Cloud Dataproc Workflows
Cloud Composer
Cluster Scheduled Deletion
Use Cloud Storage instead of HDFS
Use custom machine types to closely match the CPU and RAM requirements of a job
Use PVMs to cut costs and decrease job processing time, for non-critical jobs only
Send the output results of DataProc into BigQuery to benefit from its analysis capabilities
How to process a large amount of data (large in memory-size terms) coming from BigQuery? How to read the data into an RDD or Spark DataFrame?
Option 1 - Export the table from BigQuery into sharded files in Cloud Storage; use ShardedInputSplit / ShardedRecordReader in Dataproc to read them into an RDD (export sketch after this list)
Option 2 - Setup BigQuery connector on the Dataproc cluster - enables read/write access from Spark/Hadoop directly into BigQuery
Connector is a Java library
Can't query, can only read the whole table
sc.newAPIHadoopRDD(); bq.load(); JSON encoding/decoding
Collection of connectors for MapReduce, Hadoop, Spark (Scala) and PySpark
Issue with BigQuery ingesting Dataproc output - each Dataproc worker writes its output into its own shard, which would otherwise trigger a separate BigQuery load job per shard. Overcome by the connector staging output through Cloud Storage (next point).
Cloud Dataproc BigQuery connectors use Cloud Storage - Dataproc workers write their shards as objects to Cloud Storage; when work is complete, as part of the connector shutdown process, a single load job is issued in BigQuery
PySpark issues a bq shell command to run the BigQuery job
Spark Scala directly generates an API request to BigQuery
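A minimal sketch of Option 1 using the bq CLI, assuming a hypothetical table mydataset.mytable and bucket gs://my-bucket: export the table as sharded newline-delimited JSON that a Dataproc job can then read.
# export to multiple shards - the * wildcard makes BigQuery split the output
bq extract --destination_format=NEWLINE_DELIMITED_JSON mydataset.mytable gs://my-bucket/exports/mytable-*.json
The sharded JSON objects under gs://my-bucket/exports/ can then be read by the Dataproc job.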
Integrate with Bigtable through its open source, industry standard HBase API
>
Overview
Hadoop clusters are now flexible, software-defined resources
>
Challenges of Hadoop
On-Premises - you manage everything: custom code, health/monitoring, dev integration, scaling, job submission, GCP connectivity, deployment, creation
Vendor Hadoop (IaaS) - you manage everything: custom code, health/monitoring, dev integration, scaling, job submission, GCP connectivity, deployment, creation
bdutil - you manage: custom code, health/monitoring, dev integration, scaling, job submission
A free open-source toolkit/CLI (by Google) for managing Apache Hadoop and Apache Spark instances on Google Compute Engine
DEPRECATED - use Dataproc instead
GitHub - bdutil
Dataproc - you manage: custom code only; GCP manages everything else
>
Features
Scheduled Deletion - turn down clusters automatically; idle (min 10 minutes), timestamp (max 14 days), duration (granularity 1 second)
Auto-scaling - based on Hadoop YARN metrics - only works if data is stored off-cluster (policy sketch after this list)
NOT designed to scale to zero - not a good choice for idle / rarely used clusters - instead terminate and create when needed
Set the Worker Nodes count above the Nodes Minimum setting - the cluster starts with basic capacity faster than waiting for autoscaling to add nodes
scale_up.factor - how aggressively to scale up when the autoscaler decides more capacity is needed (a fraction of the pending YARN demand). Typically 1.0
Cooldown period - after a scale up or down - lets things settle before the next autoscaling evaluation
graceful_decommission_timeout - gives running jobs a chance to complete before the node goes out of service
scale_down.factor
Primary.max_workers
Secondary.min_workers (for Preemptible workers)
Secondary.max_workers (for Preemptible workers)
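A minimal sketch of how the autoscaling settings above fit together in a policy file and get attached to a cluster; the policy id, region and values are assumptions, and the field names follow the Dataproc autoscaling-policy schema.
# hypothetical policy - write it, import it, attach it
cat > policy.yaml <<'EOF'
workerConfig:
  minInstances: 2
  maxInstances: 10
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 20
basicAlgorithm:
  cooldownPeriod: 120s
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 3600s
EOF
gcloud dataproc autoscaling-policies import my-policy --source=policy.yaml --region=us-central1
gcloud dataproc clusters create my-cluster --region=us-central1 --autoscaling-policy=my-policy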
Cluster Scheduled Deletion
Option - a fixed amount of time after the cluster enters the Idle state
Option - a timestamp for expiry - RFC 3339 UTC Zulu format accurate to nanoseconds
Option - duration in seconds to wait before deleting the cluster - from 10 minutes to 14 days max; 1 second granularity; up to 9 fraction digits terminated with 's'. e.g. '5.6s'
Google docs - Cluster Scheduled Deletion
Available from command line and REST API, but not the Console
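A minimal sketch of the scheduled-deletion options above as gcloud flags at creation time (cluster name, region and durations are assumptions):
# delete after 30 minutes idle, or after 8 hours at the latest
gcloud dataproc clusters create my-cluster --region=us-central1 --max-idle=30m --max-age=8h
# or delete at an absolute RFC 3339 timestamp
gcloud dataproc clusters create my-cluster --region=us-central1 --expiration-time=2030-01-01T00:00:00Z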
>
Reduce costs with Preemptible VMs (PVMs) - up to 80% cheaper than regular VMs - viable only because Hadoop jobs/workloads are fault tolerant and data is stored off-cluster (creation flags sketched at the end of this section)
Use Cases
Non-critical processing (the node can be taken away at any time and lose work in progress!)
Huge clusters (to reduce cost)
>
How?
Primary Managed Instance Group - non-preemptible workers; 2 node minimum
Secondary Managed Instance Group - preemptible; processing only - no data storage; disk only for system and cache; disk is MIN(worker node GB, 100GB)
Warning: The more PVMs you have, the higher the likelihood of failures. More than a 50/50 ratio of PVMs may have diminishing returns
Warning: There may not be enough PVMs in the region, so you may not get any!
Cluster starts in 90 seconds
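A minimal sketch of creating a cluster with a secondary group of preemptible workers (names and counts are assumptions):
# 2 regular workers (primary group) plus 6 preemptible workers (secondary group)
gcloud dataproc clusters create my-cluster --region=us-central1 --num-workers=2 --num-preemptible-workers=6
Newer gcloud releases call the same flag --num-secondary-workers.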
>
Cluster modes
Single node (for experimentation)
Standard (1 master only)
High availability (3 masters) - better for long-running jobs, since the loss of a master VM won't lose the job (gcloud sketches for the three modes follow this list)
Network settings - to set up firewall rules etc
Initialisation actions - to run initialisation scripts and install custom software on the workers and master
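Hedged gcloud sketches of the three cluster modes above (names, region and worker counts are assumptions):
gcloud dataproc clusters create sandbox --region=us-central1 --single-node
gcloud dataproc clusters create standard-cluster --region=us-central1 --num-masters=1 --num-workers=2
gcloud dataproc clusters create ha-cluster --region=us-central1 --num-masters=3 --num-workers=2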
>
Machine/Instance Types
Custom Compute instances - you select the exact number of VCPUs and RAM
Why? To overcome Hadoop's limitations
Tips
Use the UI Advanced View to choose the CPUs and memory, then copy the machineType value from the equivalent REST code
CLI command: gcloud dataproc clusters create my-cluster --worker-machine-type custom-6-30720 --master-machine-type custom-6-23040 (i.e. custom-6-30720 = 6 VCPUs and 30GB of RAM, since 30GB * 1024 = 30720MB)
>
Standard Compute instances
n1-[standard|highmem]-N, where N is the VCPU count (standard gives 3.75GB of RAM per VCPU; highmem gives 6.5GB per VCPU)
n1-standard-1 = 1 VCPU, 3.75GB RAM
n1-standard-2 = 2 VCPUs, 7.5GB RAM
n1-standard-4 = 4 VCPUs, 15GB RAM
n1-standard-8 = 8 VCPUs, 30GB RAM
n1-highmem-4 = 4 VCPUs, 26GB RAM
Does NOT support Spark Structured Streaming (streaming service on top of Spark SQL)
Apache Bigtop
Open source project that automates packaging, deployment and integration of other projects in the Hadoop ecosystem
Features
Gathers the core Hadoop components and ensures the configuration works
Uses Jenkins for continuous integration testing
Ensures the default Dataproc clusters perform well - and eliminates elements that are not needed and are wasting resources
Handles installation of base software.
Installation scripts
For customising Dataproc clusters
Examples: make custom alterations to the master node; make consistent changes to all worker nodes; install Cloud Datalab on a master or worker
Procedure: write an executable script (bash, python, etc.); upload it to Cloud Storage (gsutil); specify the GCS location in the Dataproc creation command (gcloud sketch at the end of this list)
Find example scripts on Github for your desired software
e.g. use apt-get together with the node's role metadata:
#!/bin/bash
ROLE=$(/usr/share/google/get_metadata_value attributes/dataproc-role)
if [[ "${ROLE}" == 'Master' ]]; then
  apt-get install -y blah   # master-only package ('blah' is the placeholder from the original note)
else
  : # worker-only setup goes here
fi
GCP doco - example scripts: Dataproc Initialisation Actions
Example scripts available in public bucket gs://dataproc-initialization-actions
Default --initialization-action-timeout is 10 minutes (10m)
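A minimal sketch of attaching an initialisation script at creation time (the bucket, script name and timeout value are assumptions):
gcloud dataproc clusters create my-cluster --region=us-central1 --initialization-actions=gs://my-bucket/scripts/install-extras.sh --initialization-action-timeout=15m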
Modify cluster properties - values in common config files (e.g. core-site.xml); specified as file_prefix:property=value in the gcloud SDK (example below)
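A hedged example of the file_prefix:property=value syntax; the property values are illustrative only.
# set a core-site.xml value and a Spark value at cluster creation
gcloud dataproc clusters create my-cluster --region=us-central1 --properties=core:hadoop.tmp.dir=/mnt/tmp,spark:spark.executor.memory=4g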
Cloud DataProc Workflow Template
YAML file defining a Directed Acyclic Graph (DAG) of jobs (minimal sketch after this list)
Can do lifecycle of actions - create cluster, select existing cluster, submit jobs, hold jobs for submission until dependencies can complete, delete cluster
gcloud and REST; NOT UI Console
Good if only doing Dataproc - think of Cloud Composer if more complicated
Cloud Composer
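A minimal workflow template sketch instantiated from a YAML file; the cluster name, zone and PySpark job are assumptions, not the full schema.
cat > workflow.yaml <<'EOF'
placement:
  managedCluster:
    clusterName: ephemeral-cluster
    config:
      gceClusterConfig:
        zoneUri: us-central1-a
jobs:
  - stepId: wordcount
    pysparkJob:
      mainPythonFileUri: gs://my-bucket/jobs/wordcount.py
EOF
gcloud dataproc workflow-templates instantiate-from-file --file=workflow.yaml --region=us-central1
The template creates the ephemeral cluster, runs the jobs in dependency order, then deletes the cluster.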
>
How to start a cluster?
Deployment Manager Template
gcloud CLI commands
gcloud dataproc clusters create my-cluster --zone us-central1-a --master-machine-type n1-standard-1 --master-boot-disk-size 50 --num-workers 2 --worker-machine-type n1-standard-1 --worker-boot-disk-size 50
GCP web console
>
Why use Cloud Storage instead of HDFS?
Cloud Storage, served over Google's petabit network, is generally faster, depending on the IOPS of the VM types you choose
it reduces configuration complexity - no need to size the cluster disks around the data
it allows you to shut down your cluster without losing data
>
How to migrate code / lift and shift Hadoop workloads
Use gs:// to access Cloud Storage instead of hdfs:// to access local cluster data (before/after sketch below)
Cloud Bigtable supports the HBase API, so HBase workloads can be pointed at it
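A hedged before/after sketch of the path change (bucket and paths are assumptions):
# before: data on the cluster's HDFS
hadoop fs -ls hdfs:///user/me/input/
# after: the same command against Cloud Storage via the preinstalled GCS connector
hadoop fs -ls gs://my-bucket/input/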
>
Concepts
Clusters
Region
Zone - where the compute nodes will live - want this close to your data
HDFS - storage on the cluster disks
Jobs
Job types: Hadoop, Spark, SparkR, PySpark, Hive, SparkSql, Pig, Presto
Workflows
Notebooks
Yarn Resource Manager - web access on port 8088 of master node - to check the status of jobs
HDFS Name Node - web access on port 9870 of master node - to look at HDFS insights
SSH to cluster nodes; can use a SOCKS proxy to connect a browser through an SSH tunnel (sketch below)
Why? To directly access software installed on the cluster - e.g. Hive, Pig, PySpark
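A minimal sketch of reaching those web UIs through an SSH tunnel and SOCKS proxy (cluster name, zone, local port and browser path are assumptions):
# open a SOCKS proxy on local port 1080 via the master node
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -D 1080 -N
# point a browser at the proxy, then browse to http://my-cluster-m:8088 (YARN) or http://my-cluster-m:9870 (HDFS)
google-chrome --proxy-server="socks5://localhost:1080" --user-data-dir=/tmp/my-cluster-m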
Cluster Customisation
Can install for example: hadoop, hcatalog, hive, hue, kafka, mahout, oozie, pig, spark, sqoop, tez, zookeeper