Memory optimizations for distributed executors in big data clouds
The amount of data generated from software and hardware sensors continues to grow exponentially as the world becomes more instrumented and interconnected. Our ability to analyze this huge and growing amount of data is critical. Real-time processing of big data enables us to identify frequent patterns, gain a better understanding of what is happening around us, and increase the accuracy of our predictions of future activities, events, and trends. Hadoop and Spark have been the dominant distributed computing platforms for big data processing and analytics on clusters of commodity servers. Distributed executors are widely used as the computation abstraction for providing data parallelism and computation parallelism in large computing clusters. On Spark clusters, each executor is typically a multi-threaded Java Virtual Machine (JVM) instance. The Spark runtime supports memory-intensive parallel computation for iterative machine learning applications by launching multiple executors on every cluster node and by enabling explicit caching of intermediate data as Resilient Distributed Datasets (RDDs). It is well known that JVM executors may not use the available memory effectively to improve application runtime performance because of the high cost of garbage collection (GC). The situation can get worse when the dataset contains a large number of small objects, which leads to frequent GC and high GC overhead. Spark addresses such problems by relying on multi-threaded executors that support three fundamental storage modes for RDDs: memory-only, disk-only, and memory-and-disk. When RDD partitions are fully cached in the available DRAM, Spark applications enjoy excellent performance for iterative big data analytics workloads, as expected. However, these applications start to experience drastic performance degradation when they have heterogeneous tasks or highly skewed datasets, or when their RDD working sets can no longer be fully cached in memory.
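To make the three storage modes concrete, the toy model below places a sequence of cached partitions against a fixed storage-memory budget. The mode strings mirror Spark's `StorageLevel.MEMORY_ONLY`, `MEMORY_AND_DISK`, and `DISK_ONLY`, but the function `place_partitions` and the partition sizes are hypothetical, and the model deliberately ignores eviction, serialization, and replication. It reflects the documented Spark behavior that a memory-only partition which does not fit is not cached and is recomputed from its lineage when needed, whereas memory-and-disk stores such a partition on disk.

```python
# Hypothetical toy model of Spark RDD storage modes (not Spark code).
MODES = {"MEMORY_ONLY", "MEMORY_AND_DISK", "DISK_ONLY"}


def place_partitions(sizes_mb, memory_mb, mode):
    """Return 'memory', 'disk', or 'recompute' for each partition, in order.

    Assumes partitions arrive one by one and nothing is ever evicted.
    """
    if mode not in MODES:
        raise ValueError(f"unknown storage mode: {mode}")
    free = memory_mb
    placement = []
    for size in sizes_mb:
        if mode == "DISK_ONLY":
            placement.append("disk")
        elif size <= free:
            placement.append("memory")
            free -= size
        elif mode == "MEMORY_AND_DISK":
            placement.append("disk")       # spill the partition that does not fit
        else:
            placement.append("recompute")  # MEMORY_ONLY: not cached, rebuilt from lineage
    return placement


if __name__ == "__main__":
    sizes = [400, 400, 400]  # hypothetical partition sizes in MB
    for mode in sorted(MODES):
        print(mode, place_partitions(sizes, 1000, mode))
    # DISK_ONLY ['disk', 'disk', 'disk']
    # MEMORY_AND_DISK ['memory', 'memory', 'disk']
    # MEMORY_ONLY ['memory', 'memory', 'recompute']
```

Once the working set exceeds the budget, every further access to the third partition costs either a disk read or a full recomputation, which is the degradation the abstract describes.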
In these scenarios, we identify three serious performance bottlenecks. (1) As the amount of cached data increases, application performance suffers from high garbage collection overhead. (2) Depending on the heterogeneity of the application or the non-uniformity of the data, tasks may be distributed unevenly over executors, leading to different memory utilization across executors. Such temporal imbalance in memory usage can cause out-of-memory errors in executors under memory pressure, even though other executors on the same host or in the same cluster have sufficient unused memory. (3) Depending on the task granularity, the partitions of data to be cached may be too large for the working memory available at runtime, causing executor thrashing and out-of-memory errors, even though plenty of memory is unused on the Spark nodes and the total physical memory of the node or the cluster is not fully utilized. This dissertation research takes a holistic approach to these problems along three dimensions. First, we analyze the JVM heap structure, the components of garbage collection, and different garbage collection policies and mechanisms. Then, using a variety of memory-intensive benchmarks, we perform an extensive evaluation of how JVM configurations affect application performance under different memory sizes, heap structures, and garbage collection algorithms. This comprehensive measurement and comparative analysis give us an in-depth understanding of the inherent problems of JVM GC and of the opportunities for introducing effective optimizations. Second, we conduct a systematic study of the benefits and the hidden performance bottlenecks of distributed executors and their use of RDDs in the Spark runtime, which stem from inefficient utilization of memory resources on both individual Spark nodes and the Spark cluster as a whole.
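Bottleneck (2) reduces to simple arithmetic. The sketch below assumes, hypothetically, three executors on one host, each with a fixed 4096 MB heap, and made-up per-executor cache demands; the helper `stranded_memory` is illustrative only. For every executor whose demand exceeds its own heap, it reports the overflow and the idle memory sitting on the other executors of the same host.

```python
def stranded_memory(demand_mb, heap_mb):
    """For each executor whose demand exceeds its fixed heap, return
    (executor index, overflow in MB, idle MB on the other executors)."""
    idle = [max(h - d, 0) for d, h in zip(demand_mb, heap_mb)]
    report = []
    for i, (d, h) in enumerate(zip(demand_mb, heap_mb)):
        if d > h:
            peers_idle = sum(idle) - idle[i]
            report.append((i, d - h, peers_idle))
    return report


if __name__ == "__main__":
    heaps = [4096, 4096, 4096]    # hypothetical fixed executor heaps (MB)
    demands = [6000, 1500, 1000]  # hypothetical skewed cache demands (MB)
    print(stranded_memory(demands, heaps))  # [(0, 1904, 5692)]
```

Executor 0 is 1904 MB short and risks an out-of-memory error, while 5692 MB sits idle on its two neighbors; with per-executor heaps fixed at launch time, that idle memory cannot be lent to it.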
Through extensive measurement and analytical study, we identify several inherent problems of Spark RDDs when the partition granularity of RDDs exceeds the working memory available to the application running on Spark. To improve the performance of distributed executors for big data analytics workloads, we develop a lightweight, cooperative RDD caching framework for Spark executors. We implement the first prototype of this framework, named DAHI. DAHI is novel in three respects. First, DAHI introduces a Node Manager that coordinates the memory operations of JVM instances. Second, DAHI develops d-store instances that are attached to JVMs so that data caching can be coordinated by the DAHI Node Manager. The combination of the Node Manager and d-store enables DAHI to effectively reduce the garbage collection overhead caused by data caching. Third, Node Manager-coordinated RDD cache memory management provides efficient sharing of RDD caches among all JVM instances on each node of the Spark cluster. This design offers a seamless way to prevent drastic performance degradation in situations where one or more executors suffer out-of-memory errors even though other executors on the same node have plenty of unused memory. Moreover, DAHI can effectively consolidate RDD cache memory for all JVM instances on a node and achieve high memory utilization for each executor and each cluster node by reducing or avoiding out-of-memory errors in executors through dynamic and graceful management of RDD caching. Finally, we propose to extend DAHI with a DAHI-Remote capability. Many data centers, such as those operated by Google and Facebook, have reported temporal imbalance of memory utilization across the nodes of a cluster.
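The cooperative caching idea can be sketched as a node-wide pool that a coordinator hands out to per-executor stores. The classes `NodeManager` and `DStore` below are hypothetical stand-ins named after the components in the text; they track only block sizes in MB and do not model DAHI's actual interfaces, its off-heap storage, or its JVM integration. The block ids imitate Spark's `rdd_<rddId>_<partition>` naming purely for illustration.

```python
class NodeManager:
    """Hypothetical node-level coordinator that owns one shared cache pool."""

    def __init__(self, pool_mb):
        self.pool_mb = pool_mb
        self.used_mb = 0
        self.blocks = {}  # (executor_id, block_id) -> size in MB

    def allocate(self, executor_id, block_id, size_mb):
        key = (executor_id, block_id)
        if key in self.blocks:  # already cached: nothing new to charge
            return True
        if self.used_mb + size_mb > self.pool_mb:
            return False  # node-wide pool exhausted
        self.blocks[key] = size_mb
        self.used_mb += size_mb
        return True

    def release(self, executor_id, block_id):
        self.used_mb -= self.blocks.pop((executor_id, block_id), 0)

    def free_mb(self):
        return self.pool_mb - self.used_mb


class DStore:
    """Hypothetical per-executor store that requests space from the node
    manager instead of caching blocks on its own JVM heap."""

    def __init__(self, executor_id, manager):
        self.executor_id = executor_id
        self.manager = manager
        self.cached = set()

    def cache(self, block_id, size_mb):
        if self.manager.allocate(self.executor_id, block_id, size_mb):
            self.cached.add(block_id)
            return True
        return False

    def evict(self, block_id):
        if block_id in self.cached:
            self.cached.discard(block_id)
            self.manager.release(self.executor_id, block_id)


if __name__ == "__main__":
    manager = NodeManager(pool_mb=3 * 4096)  # one pool for three executors
    stores = [DStore(i, manager) for i in range(3)]
    print(stores[0].cache("rdd_1_0", 6000))  # True: larger than a 4096 MB share
    print(stores[1].cache("rdd_1_1", 1500))  # True
    print(stores[2].cache("rdd_1_2", 1000))  # True
    print(stores[2].cache("rdd_1_3", 4000))  # False: only 3788 MB left
    print(manager.free_mb())                 # 3788
```

The design choice illustrated here is that capacity limits are enforced per node rather than per executor, so a skewed executor can borrow memory its neighbors are not using.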
To address potential memory contention on some compute nodes of a cluster, DAHI-Remote aims to alleviate the high memory pressure of executors that cannot find sufficient idle memory on their local nodes by creating and providing opportunities to share remote memory from other nodes in the cluster. The DAHI-Remote framework will enable memory coordination and caching among JVM instances across the entire cluster through a hierarchical RDD caching and memory sharing protocol. First, we enable JVM instances to access the native memory of their local node, under local node and d-store management, when it is available. Second, upon detecting insufficient memory for executors on the local node, DAHI-Remote creates and establishes channels to remote native memory for RDD caching. For large clusters, DAHI-Remote will create group-based sharing such that each node can select, and belong to, one DAHI-Remote sharing group. The formation of DAHI-Remote sharing groups is guided by load balance, availability, and the overall performance of the applications on the cluster to ensure high throughput and low latency. DAHI-Remote also achieves low garbage collection overhead for JVM instances and enables efficient memory coordination among local and distributed executors across the cluster. In addition, DAHI-Remote investigates policies for selecting remote cache node(s) and RDD partitioning schemes, aiming for high throughput and fast data transfer over RDMA.
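The local-first, then-remote hierarchy can be illustrated with a simple placement rule. The sketch below is an illustration under stated assumptions, not the DAHI-Remote protocol: `Node`, `place_block`, the integer group ids, and the rule "pick the peer in my sharing group with the most free memory" are hypothetical choices, and the RDMA transfer itself is not modeled.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    group: int    # id of the (hypothetical) sharing group this node belongs to
    free_mb: int  # native memory currently available for RDD caching


def place_block(local, nodes, size_mb):
    """Place one cached block: local memory first, then the peer in the same
    sharing group with the most free memory, otherwise fall back to disk."""
    if local.free_mb >= size_mb:
        local.free_mb -= size_mb
        return ("local", local.name)
    peers = [n for n in nodes
             if n is not local and n.group == local.group and n.free_mb >= size_mb]
    if peers:
        target = max(peers, key=lambda n: n.free_mb)
        target.free_mb -= size_mb
        return ("remote", target.name)
    return ("disk", None)


if __name__ == "__main__":
    a, b, c = Node("a", 0, 500), Node("b", 0, 3000), Node("c", 0, 2000)
    d = Node("d", 1, 8000)  # different group: never chosen by nodes in group 0
    cluster = [a, b, c, d]
    print(place_block(a, cluster, 400))   # ('local', 'a')
    print(place_block(a, cluster, 1000))  # ('remote', 'b')
    print(place_block(a, cluster, 2500))  # ('disk', None)
```

Restricting the search to one sharing group keeps each node's set of candidate peers, and hence the number of remote channels it must maintain, bounded as the cluster grows; the cost is that idle memory in other groups (node `d` above) stays unused.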