Overview of the Google Cloud Platform (GCP) Dataproc Product

Below is an overview of the Google Cloud Platform (GCP) Dataproc product.

Dataproc is GCP's managed Hadoop product.

> Tips
- Create a cluster for each processing job.
- Shut down the cluster when it is not processing data. Auto-shutdown options: Cloud Dataproc Workflows, Cloud Composer, Cluster Scheduled Deletion.
- Use Cloud Storage instead of HDFS.
- Use custom machine types to closely match the CPU and RAM requirements of a job.
- Use PVMs (preemptible VMs) to cut costs and decrease job processing time - for non-critical jobs only.
- Send the output of Dataproc into BigQuery to benefit from its analysis capabilities.

> How to process a large amount of data (relative to memory size) coming from BigQuery, i.e. how to read it into an RDD or Spark DataFrame?
- Option 1 - Export the data from BigQuery into a sharded table in Cloud Storage, then use a ShardedRecordReader in Dataproc to read it into an RDD with ShardedInputSplit.
- Option 2 - Set up the BigQuery connector on the Dataproc cluster, which enables read/write access from Spark/Hadoop directly into BigQuery.
  - The connector is a Java library.
  - It cannot run queries; it can only read a whole table.
  - Typical usage: sc.newAPIHadoopRDD(); bq.load(); JSON encoding/decoding.
  - It is a collection of connectors covering MapReduce, Hadoop, Spark Scala and PySpark.
- Issue with BigQuery reading the output from Dataproc: each Dataproc worker wrote its output into its own shard, which would cause a separate BigQuery job to run to read each shard and ingest the data. This is overcome by a connector that uses Cloud Storage.
- The Cloud Dataproc BigQuery connectors use Cloud Storage: Dataproc workers write their shards as objects to Cloud Storage, and when the work is complete a single load job is issued in BigQuery as part of the connector shutdown process.
  - PySpark issues a bq shell command to run the BigQuery load job.
  - Spark Scala generates an API request to BigQuery directly.
- Integrate with Bigtable through its open source, industry-standard HBase API.

> Overview
- Hadoop clusters are now flexible, software-defined resources.

> Challenges of Hadoop
- On-premises - you manage everything: custom code, health/monitoring, dev integration, scaling, job submission, GCP connectivity, deployment, creation.
- Vendor Hadoop (IaaS) - you manage everything: custom code, health/monitoring, dev integration, scaling, job submission, GCP connectivity, deployment, creation.
- bdutil - you manage: custom code, health/monitoring, dev integration, scaling, job submission.
  - A free open-source toolkit/CLI (by Google) for managing Apache Hadoop and Apache Spark instances on Google Compute Engine.
  - DEPRECATED - use Dataproc instead (see GitHub - bdutil).
- Dataproc - you manage custom code only; GCP manages everything else.

> Features
- Scheduled Deletion - turn down clusters automatically; idle (minimum 10 minutes), timestamp, or duration (maximum 14 days, 1-second granularity).
- Auto-scaling - based on Hadoop YARN metrics; only works if data is stored off-cluster.
  - NOT designed to scale to zero, so it is not a good choice for idle / rarely used clusters - instead, terminate the cluster and create a new one when needed.
  - Setting the initial number of worker nodes higher than the nodes-minimum setting brings the cluster to basic capacity faster than waiting for autoscaling.
  - scale_up.factor - how many nodes to add when autoscaling determines it is time to scale up; typically 1.
  - Cooldown period - applied after a scale up or down, to let things settle before the next autoscaling evaluation occurs.
  - graceful_decommission_timeout - gives running jobs a chance to complete before a node goes out of service.
  - scale_down.factor
  - primary.max_workers
  - secondary.min_workers and secondary.max_workers (for preemptible workers)
  - A sketch of an autoscaling policy using these settings, and how to attach it, is shown below.
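A minimal sketch of the settings above expressed as a Dataproc autoscaling policy, assuming a recent gcloud SDK that supports autoscaling policies; the policy name, region, instance counts and factor values are illustrative placeholders, not recommendations:

      # autoscaling.yaml - illustrative values only
      cat > autoscaling.yaml <<'EOF'
      workerConfig:
        minInstances: 2
        maxInstances: 10
      secondaryWorkerConfig:
        minInstances: 0
        maxInstances: 20
      basicAlgorithm:
        cooldownPeriod: 4m
        yarnConfig:
          scaleUpFactor: 1.0
          scaleDownFactor: 1.0
          gracefulDecommissionTimeout: 1h
      EOF

      # Register the policy, then attach it to a cluster at creation time
      gcloud dataproc autoscaling-policies import my-autoscaling-policy \
          --source=autoscaling.yaml --region=us-central1
      gcloud dataproc clusters create my-autoscaled-cluster \
          --region=us-central1 \
          --autoscaling-policy=my-autoscaling-policy

The scaleUpFactor/scaleDownFactor, cooldownPeriod and gracefulDecommissionTimeout fields map onto the scale_up/scale_down, cooldown and graceful-decommission settings listed above, and the workerConfig/secondaryWorkerConfig blocks onto the primary and secondary (preemptible) worker limits.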
> Cluster Scheduled Deletion
- Option - a fixed amount of time after the cluster enters the idle state.
- Option - a timestamp for expiry, in RFC 3339 UTC "Zulu" format, accurate to nanoseconds.
- Option - a duration in seconds to wait before deleting the cluster - from 10 minutes to a maximum of 14 days; 1-second granularity; up to 9 fractional digits terminated with 's', e.g. '5.6s'.
- See the Google docs on Cluster Scheduled Deletion.
- Available from the command line and REST API, but not the Console.

> Reduce costs with Preemptible VMs (PVMs)
- Up to 80% cheaper than regular VMs - possible only because Hadoop jobs/workloads are fault tolerant, with data stored off-cluster.
- Use cases:
  - Non-critical processing (a node can be taken away at any time and lose work in progress!).
  - Huge clusters (to reduce cost).
- How?
  - Primary managed instance group - non-preemptible workers; 2-node minimum.
  - Secondary managed instance group - preemptible; processing only, no data storage; disk is used only for the system and cache, sized at MIN(worker node GB, 100 GB).
- Warning: the more PVMs you have, the higher the likelihood of failures. Going beyond a 50/50 ratio of PVMs may have diminishing returns.
- Warning: there may not be enough PVMs available in the region, so you may not get any!
- A hedged example of creating a cluster with preemptible secondary workers and scheduled deletion appears below.

A cluster starts in about 90 seconds.
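A minimal sketch combining the scheduled-deletion and preemptible-worker options above, assuming a recent gcloud SDK; the cluster name, region and counts are placeholders:

      # 2 primary workers, 4 preemptible secondary workers,
      # auto-delete after 30 minutes idle, hard lifetime cap of 8 hours
      gcloud dataproc clusters create ephemeral-cluster \
          --region=us-central1 \
          --num-workers=2 \
          --num-secondary-workers=4 \
          --max-idle=30m \
          --max-age=8h

Here --max-idle and --max-age correspond to the idle and duration options above; the timestamp option corresponds to the --expiration-time flag.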
> Cluster modes
- Single node (for experimentation).
- Standard (1 master only).
- High availability (3 masters) - better for long-running jobs, so the loss of a VM won't lose a job.
- Network setting - to set up firewall rules etc.
- Initialisation actions - to run initialisation scripts and install custom software on the workers and master.

> Machine/Instance Types
- Custom Compute instances - you select the exact number of vCPUs and amount of RAM.
  - Why? To overcome Hadoop's limitations.
  - Tip: use the UI Advanced View to choose the size and CPUs, then get the machineType value from the REST code.
  - CLI command (i.e. 6 vCPUs and 30 GB of RAM for the workers: 30 x 1024 = 30720 MB):

      gcloud dataproc clusters create my-cluster \
          --worker-machine-type custom-6-30720 \
          --master-machine-type custom-6-23040

> Standard Compute instances
- Named n1-[standard|highmem]-CpuAndRamMultiplier, where each block of memory is 3.75 GB and highmem doubles the total RAM of standard.
  - n1-standard-1 = 1 vCPU, 3.75 GB RAM
  - n1-standard-2 = 2 vCPUs, 7.5 GB RAM
  - n1-standard-4 = 4 vCPUs, 15 GB RAM
  - n1-standard-8 = 8 vCPUs, 30 GB RAM
  - n1-highmem-4 = 4 vCPUs, 30 GB RAM

Dataproc does NOT support Spark Structured Streaming (the streaming service on top of Spark SQL).

> Apache Bigtop
- An open source project that automates packaging, deployment and integration of other projects in the Hadoop ecosystem.
- Features:
  - Gathers the core Hadoop components and ensures the configuration works.
  - Uses Jenkins for continuous integration testing.
  - Ensures the default Dataproc clusters perform well, eliminating elements that are not needed and would waste resources.
  - Handles installation of the base software.

> Installation scripts (initialisation actions)
- For customising Dataproc clusters.
- Examples: make a custom alteration to the master node; make consistent changes to all worker nodes; install Cloud Datalab on a master or worker.
- Procedure: write an executable program (bash, Python, etc.); upload it to Cloud Storage (gsutil); specify the GCS location in the Dataproc creation command.
- Find example scripts on GitHub for your desired software, e.g. using apt-get:

      #!/bin/bash
      ROLE=$(/usr/share/google/get_metadata_value attributes/dataproc-role)
      if [[ "${ROLE}" == 'Master' ]]; then
        apt-get install -y blah   # master only
      else
        :                         # worker only
      fi

- GCP docs - example scripts: Dataproc Initialisation Actions; examples are available in the public bucket gs://dataproc-initialization-actions.
- The default --initialization-action-timeout is 10 minutes (10m).
- Modify cluster properties - e.g. common config files (core-site.xml) - by specifying file_prefix:property=value in the gcloud SDK.

> Cloud Dataproc Workflow Templates
- A YAML file processed through a Directed Acyclic Graph (DAG).
- Can handle a lifecycle of actions: create a cluster or select an existing one, submit jobs, hold jobs until their dependencies complete, delete the cluster.
- Available via gcloud and the REST API, but NOT the UI Console.
- Good if you are only doing Dataproc - consider Cloud Composer if the workflow is more complicated.
- A sketch of building and running a workflow template appears at the end of this section.

> How to start a cluster?
- Deployment Manager template.
- gcloud CLI commands, e.g.:

      gcloud dataproc clusters create my-cluster \
          --zone us-central1-a \
          --master-machine-type n1-standard-1 --master-boot-disk-size 50 \
          --num-workers 2 \
          --worker-machine-type n1-standard-1 --worker-boot-disk-size 50

- GCP web console.

> Why use Cloud Storage instead of HDFS?
- Cloud Storage over the petabit network is generally faster, depending on the IOPS of the VM you choose.
- It reduces the configuration complexity of sizing the cluster disks.
- It allows you to shut down your cluster without losing data.

> How to migrate code / lift and shift Hadoop workloads
- Use gs:// to access Cloud Storage instead of hdfs:// to access local cluster data (a DistCp sketch appears at the end of this section).
- Cloud Bigtable supports the HBase API.

> Concepts
- Clusters
  - Region and zone - where the compute nodes will live; you want this close to your data.
  - HDFS - storage on the cluster disks.
- Jobs - job types: Hadoop, Spark, SparkR, PySpark, Hive, SparkSql, Pig, Presto.
- Workflows
- Notebooks
- YARN Resource Manager - web access on port 8088 of the master node, to check the status of jobs.
- HDFS NameNode - web access on port 9870 of the master node, to look at HDFS insights.
- SSH to cluster nodes; you can use a SOCKS proxy to connect a browser through an SSH tunnel (see the sketch at the end of this section).
  - Why? To directly access software installed on the cluster, e.g. Hive, Pig, PySpark.

> Cluster Customisation
- Can install, for example: hadoop, hcatalog, hive, hue, kafka, mahout, oozie, pig, spark, sqoop, tez, zookeeper.
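As referenced under Cloud Dataproc Workflow Templates above, a minimal sketch of building a template from the gcloud CLI; the template name, bucket path and PySpark job are placeholder assumptions, and exact flags may vary by gcloud SDK version:

      # Define a workflow template with a managed (ephemeral) cluster and one PySpark step
      gcloud dataproc workflow-templates create my-workflow --region=us-central1

      gcloud dataproc workflow-templates set-managed-cluster my-workflow \
          --region=us-central1 \
          --cluster-name=ephemeral-cluster \
          --num-workers=2

      gcloud dataproc workflow-templates add-job pyspark gs://my-bucket/jobs/etl.py \
          --region=us-central1 \
          --workflow-template=my-workflow \
          --step-id=etl

      # Instantiate: creates the cluster, runs the job, then deletes the cluster
      gcloud dataproc workflow-templates instantiate my-workflow --region=us-central1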
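For the lift-and-shift note above, a hedged sketch of copying existing HDFS data to Cloud Storage with DistCp (the Cloud Storage connector is pre-installed on Dataproc; the bucket and paths are placeholders):

      # Run on the cluster: copy data from cluster HDFS to a Cloud Storage bucket,
      # then point job inputs/outputs at gs:// paths instead of hdfs:// paths
      hadoop distcp hdfs:///user/myuser/data gs://my-bucket/data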
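And for the SSH/SOCKS note under Concepts, a minimal sketch of reaching the YARN Resource Manager (port 8088) and HDFS NameNode (port 9870) web UIs through an SSH tunnel, assuming the default master-node naming (cluster name plus "-m"); the cluster name, zone, local port and browser invocation are placeholders:

      # Terminal 1: open an SSH tunnel to the master node with a dynamic (SOCKS) port forward
      gcloud compute ssh my-cluster-m --zone=us-central1-a -- -D 1080 -N

      # Terminal 2: start a browser that routes traffic through the tunnel
      # (Chrome shown; the binary name and flags vary by OS)
      google-chrome \
          --proxy-server="socks5://localhost:1080" \
          --user-data-dir=/tmp/my-cluster-m \
          http://my-cluster-m:8088    # YARN Resource Manager; use :9870 for the HDFS NameNode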