diff --git a/iuberdata_core/src/main/resources/python/spark_ec2.py b/iuberdata_core/src/main/resources/python/spark_ec2.py
index 7111514..dc7f388 100755
--- a/iuberdata_core/src/main/resources/python/spark_ec2.py
+++ b/iuberdata_core/src/main/resources/python/spark_ec2.py
@@ -169,12 +169,12 @@ class UsageError(Exception):
 # Configure and parse our command-line arguments
 def parse_args():
   parser = OptionParser(usage="spark-ec2 [options] <action> <cluster_name>"
-      + "\n\n<action> can be: launch, destroy, login, stop, start, get-master",
+      + "\n\n<action> can be: launch, destroy, login, stop, start, get-main",
       add_help_option=False)
   parser.add_option("-h", "--help", action="help",
                     help="Show this help message and exit")
-  parser.add_option("-s", "--slaves", type="int", default=1,
-                    help="Number of slaves to launch (default: 1)")
+  parser.add_option("-s", "--subordinates", type="int", default=1,
+                    help="Number of subordinates to launch (default: 1)")
   parser.add_option("-w", "--wait", type="int", default=120,
                     help="Seconds to wait for nodes to start (default: 120)")
   parser.add_option("-k", "--key-pair",
@@ -184,11 +184,11 @@ def parse_args():
   parser.add_option("-t", "--instance-type", default="m1.large",
                     help="Type of instance to launch (default: m1.large). " +
                          "WARNING: must be 64-bit; small instances won't work")
-  parser.add_option("-m", "--master-instance-type", default="",
-                    help="Master instance type (leave empty for same as instance-type)")
+  parser.add_option("-m", "--main-instance-type", default="",
+                    help="Main instance type (leave empty for same as instance-type)")
   parser.add_option("-r", "--region", help="EC2 region zone to launch instances in")
   parser.add_option("-z", "--zone",
                     help="Availability zone to launch instances in, or 'all' to spread " +
-                         "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+                         "subordinates across multiple (an additional $0.01/Gb for bandwidth" +
                          "between zones applies)")
   parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
   parser.add_option("-p", "--profile", help="AWS profile/role arn to use")
@@ -212,7 +212,7 @@ def parse_args():
   parser.add_option("--swap", metavar="SWAP", type="int", default=1024,
                     help="Swap space to set up per node, in MB (default: 1024)")
   parser.add_option("--spot-price", metavar="PRICE", type="float",
-                    help="If specified, launch slaves as spot instances with the given " +
+                    help="If specified, launch subordinates as spot instances with the given " +
                          "maximum price (in dollars)")
   parser.add_option("--ganglia", action="store_true", default=True,
                     help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
@@ -223,12 +223,12 @@ def parse_args():
                     help="The SSH user you want to connect as (default: root)")
   parser.add_option("--delete-groups", action="store_true", default=False,
                     help="When destroying a cluster, delete the security groups that were created")
-  parser.add_option("--use-existing-master", action="store_true", default=False,
-                    help="Launch fresh slaves, but use an existing stopped master if possible")
+  parser.add_option("--use-existing-main", action="store_true", default=False,
+                    help="Launch fresh subordinates, but use an existing stopped main if possible")
   parser.add_option("--worker-instances", type="int", default=1,
                     help="Number of instances per worker: variable SPARK_WORKER_INSTANCES (default: 1)")
-  parser.add_option("--master-opts", type="string", default="",
-                    help="Extra options to give to master through SPARK_MASTER_OPTS variable (e.g -Dspark.worker.timeout=180)")
+  parser.add_option("--main-opts", type="string", default="",
+                    help="Extra options to give to main through SPARK_MASTER_OPTS variable (e.g -Dspark.worker.timeout=180)")
 
   (opts, args) = parser.parse_args()
   if len(args) != 2:
@@ -406,7 +406,7 @@ def get_spark_ami(opts):
 
 # Launch a cluster of the given name, by setting up its security groups,
 # and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master and slaves
+# Returns a tuple of EC2 reservation objects for the main and subordinates
 # Fails if there already instances running in the cluster's groups.
 def launch_cluster(conn, opts, cluster_name):
 
@@ -433,52 +433,52 @@ def launch_cluster(conn, opts, cluster_name):
     echo '$public_key' >> ~ec2-user/.ssh/authorized_keys""").substitute(public_key=public_key)
 
   print("Setting up security groups...")
-  master_group = get_or_make_group(conn, cluster_name + "-master")
-  slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+  main_group = get_or_make_group(conn, cluster_name + "-main")
+  subordinate_group = get_or_make_group(conn, cluster_name + "-subordinates")
   security_group = os.popen("curl -s http://169.254.169.254/latest/meta-data/security-groups").read()
   sparknotebook_group = get_or_make_group(conn, security_group)
-  if master_group.rules == []: # Group was just now created
-    master_group.authorize(src_group=master_group)
-    master_group.authorize(src_group=slave_group)
-    master_group.authorize(src_group=sparknotebook_group)
-    master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
-    master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
-    master_group.authorize('tcp', 18080, 18080, '0.0.0.0/0')
-    master_group.authorize('tcp', 19999, 19999, '0.0.0.0/0')
-    master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
-    master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
-    master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
-    master_group.authorize('tcp', 4040, 4045, '0.0.0.0/0')
-    master_group.authorize('tcp', 7077, 7077, '0.0.0.0/0')
+  if main_group.rules == []: # Group was just now created
+    main_group.authorize(src_group=main_group)
+    main_group.authorize(src_group=subordinate_group)
+    main_group.authorize(src_group=sparknotebook_group)
+    main_group.authorize('tcp', 22, 22, '0.0.0.0/0')
+    main_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
+    main_group.authorize('tcp', 18080, 18080, '0.0.0.0/0')
+    main_group.authorize('tcp', 19999, 19999, '0.0.0.0/0')
+    main_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
+    main_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
+    main_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+    main_group.authorize('tcp', 4040, 4045, '0.0.0.0/0')
+    main_group.authorize('tcp', 7077, 7077, '0.0.0.0/0')
     if opts.ganglia:
-      master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
-  if slave_group.rules == []: # Group was just now created
-    slave_group.authorize(src_group=master_group)
-    slave_group.authorize(src_group=slave_group)
-    slave_group.authorize(src_group=sparknotebook_group)
-    slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
-    slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
-    slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
-    slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
-    slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
-    slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
-
-  if not any(r for r in sparknotebook_group.rules for g in r.grants if master_group.id == g.group_id):
-    sparknotebook_group.authorize(ip_protocol="tcp", from_port="1", to_port="65535", src_group=master_group)
-    sparknotebook_group.authorize(ip_protocol="icmp", from_port="-1", to_port="-1", src_group=master_group)
-
-  if not any(r for r in sparknotebook_group.rules for g in r.grants if slave_group.id == g.group_id):
-    sparknotebook_group.authorize(ip_protocol="tcp", from_port="1", to_port="65535", src_group=slave_group)
-    sparknotebook_group.authorize(ip_protocol="icmp", from_port="-1", to_port="-1", src_group=slave_group)
+      main_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
+  if subordinate_group.rules == []: # Group was just now created
+    subordinate_group.authorize(src_group=main_group)
+    subordinate_group.authorize(src_group=subordinate_group)
+    subordinate_group.authorize(src_group=sparknotebook_group)
+    subordinate_group.authorize('tcp', 22, 22, '0.0.0.0/0')
+    subordinate_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
+    subordinate_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
+    subordinate_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
+    subordinate_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
+    subordinate_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
+
+  if not any(r for r in sparknotebook_group.rules for g in r.grants if main_group.id == g.group_id):
+    sparknotebook_group.authorize(ip_protocol="tcp", from_port="1", to_port="65535", src_group=main_group)
+    sparknotebook_group.authorize(ip_protocol="icmp", from_port="-1", to_port="-1", src_group=main_group)
+
+  if not any(r for r in sparknotebook_group.rules for g in r.grants if subordinate_group.id == g.group_id):
+    sparknotebook_group.authorize(ip_protocol="tcp", from_port="1", to_port="65535", src_group=subordinate_group)
+    sparknotebook_group.authorize(ip_protocol="icmp", from_port="-1", to_port="-1", src_group=subordinate_group)
 
   # Check if instances are already running in our groups
-  existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
+  existing_mains, existing_subordinates = get_existing_cluster(conn, opts, cluster_name,
                                                            die_on_error=False)
-  if existing_slaves or (existing_masters and not opts.use_existing_master):
+  if existing_subordinates or (existing_mains and not opts.use_existing_main):
     print (("ERROR: There are already instances running in " +
-            "group %s or %s" % (master_group.name, slave_group.name)), file=sys.stderr)
+            "group %s or %s" % (main_group.name, subordinate_group.name)), file=sys.stderr)
     sys.exit(1)
 
   # Figure out Spark AMI
@@ -510,7 +510,7 @@ def launch_cluster(conn, opts, cluster_name):
       name = '/dev/sd' + string.ascii_letters[i + 1]
       block_map[name] = dev
 
-  # Launch slaves
+  # Launch subordinates
   if opts.spot_price != None:
     zones = get_zones(conn, opts)
 
@@ -521,26 +521,26 @@ def launch_cluster(conn, opts, cluster_name):
    for zone in zones:
      best_price = find_best_price(conn,opts.instance_type,zone, opts.spot_price)
      # Launch spot instances with the requested price
-      print(("Requesting %d slaves as spot instances with price $%.3f/hour each (total $%.3f/hour)" %
-             (opts.slaves, best_price, opts.slaves * best_price)), file=sys.stderr)
+      print(("Requesting %d subordinates as spot instances with price $%.3f/hour each (total $%.3f/hour)" %
+             (opts.subordinates, best_price, opts.subordinates * best_price)), file=sys.stderr)
 
-      num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
-      interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(subnet_id=subnetId(), groups=[slave_group.id], associate_public_ip_address=True)
+      num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i)
+      interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(subnet_id=subnetId(), groups=[subordinate_group.id], associate_public_ip_address=True)
      interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)
 
-      slave_reqs = conn.request_spot_instances(
+      subordinate_reqs = conn.request_spot_instances(
          price = best_price,
          image_id = opts.ami,
          launch_group = "launch-group-%s" % cluster_name,
          placement = zone,
-          count = num_slaves_this_zone,
+          count = num_subordinates_this_zone,
          key_name = opts.key_pair,
          instance_type = opts.instance_type,
          block_device_map = block_map,
          user_data = user_data,
          instance_profile_arn = opts.profile,
          network_interfaces = interfaces)
-      my_req_ids += [req.id for req in slave_reqs]
+      my_req_ids += [req.id for req in subordinate_reqs]
      i += 1
 
    print ("Waiting for spot instances to be granted", file=sys.stderr)
@@ -555,24 +555,24 @@ def launch_cluster(conn, opts, cluster_name):
        for i in my_req_ids:
          if i in id_to_req and id_to_req[i].state == "active":
            active_instance_ids.append(id_to_req[i].instance_id)
-        if len(active_instance_ids) == opts.slaves:
-          print ("All %d slaves granted" % opts.slaves, file=sys.stderr)
+        if len(active_instance_ids) == opts.subordinates:
+          print ("All %d subordinates granted" % opts.subordinates, file=sys.stderr)
          reservations = conn.get_all_instances(active_instance_ids)
-          slave_nodes = []
+          subordinate_nodes = []
          for r in reservations:
-            slave_nodes += r.instances
+            subordinate_nodes += r.instances
          break
        else:
          # print >> stderr, ".",
-          print("%d of %d slaves granted, waiting longer" % (
-            len(active_instance_ids), opts.slaves))
+          print("%d of %d subordinates granted, waiting longer" % (
+            len(active_instance_ids), opts.subordinates))
    except:
      print("Canceling spot instance requests", file=sys.stderr)
      conn.cancel_spot_instance_requests(my_req_ids)
      # Log a warning if any of these requests actually launched instances:
-      (master_nodes, slave_nodes) = get_existing_cluster(
+      (main_nodes, subordinate_nodes) = get_existing_cluster(
          conn, opts, cluster_name, die_on_error=False)
-      running = len(master_nodes) + len(slave_nodes)
+      running = len(main_nodes) + len(subordinate_nodes)
      if running:
        print(("WARNING: %d instances are still running" % running), file=sys.stderr)
      sys.exit(0)
@@ -581,59 +581,59 @@ def launch_cluster(conn, opts, cluster_name):
    zones = get_zones(conn, opts)
    num_zones = len(zones)
    i = 0
-    slave_nodes = []
+    subordinate_nodes = []
    for zone in zones:
-      num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
-      if num_slaves_this_zone > 0:
-        slave_res = image.run(key_name = opts.key_pair,
-                              security_group_ids = [slave_group.id],
+      num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i)
+      if num_subordinates_this_zone > 0:
+        subordinate_res = image.run(key_name = opts.key_pair,
+                              security_group_ids = [subordinate_group.id],
                              instance_type = opts.instance_type,
                              subnet_id = subnetId(),
                              placement = zone,
-                              min_count = num_slaves_this_zone,
-                              max_count = num_slaves_this_zone,
+                              min_count = num_subordinates_this_zone,
+                              max_count = num_subordinates_this_zone,
                              block_device_map = block_map,
                              user_data = user_data,
                              instance_profile_arn = opts.profile)
-        slave_nodes += slave_res.instances
-        print("Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
-                                                        zone, slave_res.id), file=sys.stderr)
+        subordinate_nodes += subordinate_res.instances
+        print("Launched %d subordinates in %s, regid = %s" % (num_subordinates_this_zone,
+                                                        zone, subordinate_res.id), file=sys.stderr)
      i += 1
 
-  # Launch or resume masters
-  if existing_masters:
-    print("Starting master...")
-    for inst in existing_masters:
+  # Launch or resume mains
+  if existing_mains:
+    print("Starting main...")
+    for inst in existing_mains:
      if inst.state not in ["shutting-down", "terminated"]:
        inst.start()
-    master_nodes = existing_masters
+    main_nodes = existing_mains
  else:
-    master_type = opts.master_instance_type
-    if master_type == "":
-      master_type = opts.instance_type
+    main_type = opts.main_instance_type
+    if main_type == "":
+      main_type = opts.instance_type
    if opts.zone == 'all':
      opts.zone = random.choice(conn.get_all_zones()).name
    if opts.spot_price != None:
-      best_price = find_best_price(conn,master_type,opts.zone,opts.spot_price)
+      best_price = find_best_price(conn,main_type,opts.zone,opts.spot_price)
      # Launch spot instances with the requested price
-      print(("Requesting master as spot instances with price $%.3f/hour" % (best_price)), file=sys.stderr)
+      print(("Requesting main as spot instances with price $%.3f/hour" % (best_price)), file=sys.stderr)
 
-      interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(subnet_id=subnetId(), groups=[master_group.id], associate_public_ip_address=True)
+      interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(subnet_id=subnetId(), groups=[main_group.id], associate_public_ip_address=True)
      interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)
 
-      master_reqs = conn.request_spot_instances(
+      main_reqs = conn.request_spot_instances(
          price = best_price,
          image_id = opts.ami,
          launch_group = "launch-group-%s" % cluster_name,
          placement = opts.zone,
          count = 1,
          key_name = opts.key_pair,
-          instance_type = master_type,
+          instance_type = main_type,
          block_device_map = block_map,
          user_data = user_data,
          instance_profile_arn = opts.profile,
          network_interfaces = interfaces)
-      my_req_ids = [r.id for r in master_reqs]
+      my_req_ids = [r.id for r in main_reqs]
 
      print("Waiting for spot instance to be granted", file=sys.stderr)
      try:
        while True:
@@ -648,30 +648,30 @@ def launch_cluster(conn, opts, cluster_name):
          if i in id_to_req and id_to_req[i].state == "active":
            active_instance_ids.append(id_to_req[i].instance_id)
          if len(active_instance_ids) == 1:
-            print ( "Master granted", file=sys.stderr)
+            print ( "Main granted", file=sys.stderr)
            reservations = conn.get_all_instances(active_instance_ids)
-            master_nodes = []
+            main_nodes = []
            for r in reservations:
-              master_nodes += r.instances
+              main_nodes += r.instances
            break
          else:
            # print >> stderr, ".",
-            print("%d of %d masters granted, waiting longer" % (
+            print("%d of %d mains granted, waiting longer" % (
              len(active_instance_ids), 1))
      except:
        print("Canceling spot instance requests", file=sys.stderr)
        conn.cancel_spot_instance_requests(my_req_ids)
        # Log a warning if any of these requests actually launched instances:
-        (master_nodes, master_nodes) = get_existing_cluster(
+        (main_nodes, subordinate_nodes) = get_existing_cluster(
            conn, opts, cluster_name, die_on_error=False)
-        running = len(master_nodes) + len(master_nodes)
+        running = len(main_nodes) + len(subordinate_nodes)
        if running:
          print(("WARNING: %d instances are still running" % running), file=sys.stderr)
        sys.exit(0)
    else:
-      master_res = image.run(key_name = opts.key_pair,
-                             security_group_ids = [master_group.id],
-                             instance_type = master_type,
+      main_res = image.run(key_name = opts.key_pair,
+                             security_group_ids = [main_group.id],
+                             instance_type = main_type,
                             subnet_id = subnetId(),
                             placement = opts.zone,
                             min_count = 1,
@@ -679,41 +679,41 @@ def launch_cluster(conn, opts, cluster_name):
                             block_device_map = block_map,
                             user_data = user_data,
                             instance_profile_arn = opts.profile)
-      master_nodes = master_res.instances
-      print("Launched master in %s, regid = %s" % (zone, master_res.id), file=sys.stderr)
+      main_nodes = main_res.instances
+      print("Launched main in %s, regid = %s" % (zone, main_res.id), file=sys.stderr)
 
  # Return all the instances
-  return (master_nodes, slave_nodes)
+  return (main_nodes, subordinate_nodes)
 
 
 # Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters and slaves
+# Returns a tuple of lists of EC2 instance objects for the mains and subordinates
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
  print("Searching for existing cluster %s ..." % cluster_name, file=sys.stderr)
  reservations = conn.get_all_instances()
-  master_nodes = []
-  slave_nodes = []
+  main_nodes = []
+  subordinate_nodes = []
  for res in reservations:
    active = [i for i in res.instances if is_active(i)]
    for inst in active:
      group_names = [g.name for g in inst.groups]
-      if (cluster_name + "-master") in group_names:
-        master_nodes.append(inst)
-      elif (cluster_name + "-slaves") in group_names:
-        slave_nodes.append(inst)
-  if any((master_nodes, slave_nodes)):
-    print("Spark standalone cluster started at http://%s:8080" % master_nodes[0].public_dns_name)
-    print("Spark private ip address %s" % master_nodes[0].private_dns_name)
-    print("Spark standalone cluster started at http://%s:8080" % master_nodes[0].public_dns_name, file=sys.stderr)
-    print(("Found %d master(s), %d slaves" %
-           (len(master_nodes), len(slave_nodes))), file=sys.stderr)
-    get_master_setup_files(master_nodes[0].private_dns_name, opts)
+      if (cluster_name + "-main") in group_names:
+        main_nodes.append(inst)
+      elif (cluster_name + "-subordinates") in group_names:
+        subordinate_nodes.append(inst)
+  if any((main_nodes, subordinate_nodes)):
+    print("Spark standalone cluster started at http://%s:8080" % main_nodes[0].public_dns_name)
+    print("Spark private ip address %s" % main_nodes[0].private_dns_name)
+    print("Spark standalone cluster started at http://%s:8080" % main_nodes[0].public_dns_name, file=sys.stderr)
+    print(("Found %d main(s), %d subordinates" %
+           (len(main_nodes), len(subordinate_nodes))), file=sys.stderr)
+    get_main_setup_files(main_nodes[0].private_dns_name, opts)
  if opts.ganglia:
-    print("Ganglia started at http://%s:5080/ganglia" % master_nodes[0].public_dns_name, file=sys.stderr)
-  if master_nodes != [] or not die_on_error:
-    return (master_nodes, slave_nodes)
+    print("Ganglia started at http://%s:5080/ganglia" % main_nodes[0].public_dns_name, file=sys.stderr)
+  if main_nodes != [] or not die_on_error:
+    return (main_nodes, subordinate_nodes)
  else:
-    if master_nodes == [] and slave_nodes != []:
-      print("ERROR: Could not find master in group %s-master" %cluster_name)
+    if main_nodes == [] and subordinate_nodes != []:
+      print("ERROR: Could not find main in group %s-main" %cluster_name)
    else:
      print("ERROR: Could not find any existing cluster")
    sys.exit(1)
@@ -721,24 +721,24 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
 
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
+def setup_cluster(conn, main_nodes, subordinate_nodes, opts, deploy_ssh_key):
 
-  master_nodes[0].update()
-  master = master_nodes[0]
-  print ("Spark private ip address %s" % master.private_dns_name)
+  main_nodes[0].update()
+  main = main_nodes[0]
+  print ("Spark private ip address %s" % main.private_dns_name)
  if deploy_ssh_key:
-    print("Generating cluster's SSH key on master...")
+    print("Generating cluster's SSH key on main...")
    key_setup = """
      [ -f ~/.ssh/id_rsa ] ||
        (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
         cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
    """
-    ssh(master.private_dns_name, opts, key_setup)
-    dot_ssh_tar = ssh_read(master.private_dns_name, opts, ['tar', 'c', '.ssh'])
-    print("Transferring cluster's SSH key to slaves...", file=sys.stderr)
-    for slave in slave_nodes:
-      slave.update()
-      ssh_write(slave.private_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
+    ssh(main.private_dns_name, opts, key_setup)
+    dot_ssh_tar = ssh_read(main.private_dns_name, opts, ['tar', 'c', '.ssh'])
+    print("Transferring cluster's SSH key to subordinates...", file=sys.stderr)
+    for subordinate in subordinate_nodes:
+      subordinate.update()
+      ssh_write(subordinate.private_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
 
  modules = ['mysql', 'spark', 'ephemeral-hdfs', 'persistent-hdfs',
@@ -753,7 +753,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
  # NOTE: We should clone the repository before running deploy_files to
  # prevent ec2-variables.sh from being overwritten
  ssh(
-    host=master.private_dns_name,
+    host=main.private_dns_name,
    opts=opts,
    command="rm -rf spark-ec2"
      + " && "
@@ -761,36 +761,36 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
  )
 
-  print("Deploying files to master... ", file=sys.stderr)
+  print("Deploying files to main... ", file=sys.stderr)
  (path, name) = os.path.split(__file__)
-  deploy_files(conn, path+"/deploy.generic", opts, master_nodes, slave_nodes, modules)
+  deploy_files(conn, path+"/deploy.generic", opts, main_nodes, subordinate_nodes, modules)
 
-  print("Running setup on master... ", file=sys.stderr)
-  setup_spark_cluster(master, opts)
-  get_master_setup_files(master.private_dns_name, opts)
+  print("Running setup on main... ", file=sys.stderr)
+  setup_spark_cluster(main, opts)
+  get_main_setup_files(main.private_dns_name, opts)
  print( stderr,"Done!", file=sys.stderr)
 
 
-def get_master_setup_files(master, opts):
-  scp(master, opts, "/root/spark/jars/datanucleus*.jar", "%s/lib" % SPARK_EC2_DIR)
-  scp(master, opts, "/root/spark/conf/*", "%s/conf" % SPARK_EC2_DIR)
+def get_main_setup_files(main, opts):
+  scp(main, opts, "/root/spark/jars/datanucleus*.jar", "%s/lib" % SPARK_EC2_DIR)
+  scp(main, opts, "/root/spark/conf/*", "%s/conf" % SPARK_EC2_DIR)
 
 
-def setup_spark_cluster(master, opts):
-  ssh(master.private_dns_name, opts, "chmod u+x spark-ec2/setup.sh")
-  ssh(master.private_dns_name, opts, "spark-ec2/setup.sh")
-  master.update()
-  print("Spark standalone cluster started at http://%s:8080" % master.public_dns_name)
-  print("Spark standalone cluster started at http://%s:8080" % master.public_dns_name, file=sys.stderr)
+def setup_spark_cluster(main, opts):
+  ssh(main.private_dns_name, opts, "chmod u+x spark-ec2/setup.sh")
+  ssh(main.private_dns_name, opts, "spark-ec2/setup.sh")
+  main.update()
+  print("Spark standalone cluster started at http://%s:8080" % main.public_dns_name)
+  print("Spark standalone cluster started at http://%s:8080" % main.public_dns_name, file=sys.stderr)
  if opts.ganglia:
-    print("Ganglia started at http://%s:5080/ganglia" % master.public_dns_name, file=sys.stderr)
+    print("Ganglia started at http://%s:5080/ganglia" % main.public_dns_name, file=sys.stderr)
 
 
-# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
+# Wait for a whole cluster (mains, subordinates and ZooKeeper) to start up
+def wait_for_cluster(conn, wait_secs, main_nodes, subordinate_nodes):
  print("Waiting for instances to start up...", file=sys.stderr)
  time.sleep(5)
-  wait_for_instances(conn, master_nodes)
-  wait_for_instances(conn, slave_nodes)
+  wait_for_instances(conn, main_nodes)
+  wait_for_instances(conn, subordinate_nodes)
 
 
 def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
@@ -956,11 +956,11 @@ def get_num_disks(instance_type):
 
 # Deploy the configuration file templates in a given local directory to
 # a cluster, filling in any template parameters with information about the
-# cluster (e.g. lists of masters and slaves). Files are only deployed to
-# the first master instance in the cluster, and we expect the setup
+# cluster (e.g. lists of mains and subordinates). Files are only deployed to
+# the first main instance in the cluster, and we expect the setup
 # script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
-  active_master = master_nodes[0].private_dns_name
+def deploy_files(conn, root_dir, opts, main_nodes, subordinate_nodes, modules):
+  active_main = main_nodes[0].private_dns_name
 
  num_disks = get_num_disks(opts.instance_type)
  hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -972,7 +972,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
      mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
      spark_local_dirs += ",/mnt%d/spark" % i
 
-  cluster_url = "%s:7077" % active_master
+  cluster_url = "%s:7077" % active_main
 
  if "." in opts.spark_version:
    # Pre-built Spark deploy
@@ -987,9 +987,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
  worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
 
  template_vars = {
-    "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
-    "active_master": active_master,
-    "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+    "main_list": '\n'.join([i.public_dns_name for i in main_nodes]),
+    "active_main": active_main,
+    "subordinate_list": '\n'.join([i.public_dns_name for i in subordinate_nodes]),
    "cluster_url": cluster_url,
    "hdfs_data_dirs": hdfs_data_dirs,
    "mapred_local_dirs": mapred_local_dirs,
@@ -1001,7 +1001,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
    "metastore_user": "hive",
    "metastore_passwd": ''.join(random.SystemRandom().choice(string.uppercase + string.digits) for _ in xrange(10)),
    "spark_worker_instances": worker_instances_str,
-    "spark_master_opts": opts.master_opts
+    "spark_main_opts": opts.main_opts
  }
 
  # Create a temp directory in which we will place all the files to be
@@ -1025,12 +1025,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
          text = text.replace("{{" + key + "}}", template_vars[key])
        dest.write(text)
        dest.close()
-  # rsync the whole directory over to the master machine
+  # rsync the whole directory over to the main machine
  command = [
      'rsync', '-rv',
      '-e', stringify_command(ssh_command(opts)),
      "%s/" % tmp_dir,
-      "%s@%s:/" % (opts.user, active_master)
+      "%s@%s:/" % (opts.user, active_main)
  ]
  subprocess.check_call(command)
  # Remove the temp directory we created above
@@ -1169,10 +1169,10 @@ def get_zones(conn, opts):
 
 # Gets the number of items in a partition
 def get_partition(total, num_partitions, current_partitions):
-  num_slaves_this_zone = total // num_partitions
+  num_subordinates_this_zone = total // num_partitions
  if (total % num_partitions) - current_partitions > 0:
-    num_slaves_this_zone += 1
-  return num_slaves_this_zone
+    num_subordinates_this_zone += 1
+  return num_subordinates_this_zone
 
 
 def real_main():
@@ -1189,40 +1189,40 @@ def real_main():
      opts.zone = random.choice(conn.get_all_zones()).name
 
  if action == "launch":
-    if opts.slaves <= 0:
-      print("ERROR: You have to start at least 1 slave", file=sys.stderr)
+    if opts.subordinates <= 0:
+      print("ERROR: You have to start at least 1 subordinate", file=sys.stderr)
      sys.exit(1)
    if opts.resume:
-      (master_nodes, slave_nodes) = get_existing_cluster(
+      (main_nodes, subordinate_nodes) = get_existing_cluster(
          conn, opts, cluster_name)
    else:
      start_secs = time.time()
-      (master_nodes, slave_nodes) = launch_cluster(
+      (main_nodes, subordinate_nodes) = launch_cluster(
         conn, opts, cluster_name)
-      wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+      wait_for_cluster(conn, opts.wait, main_nodes, subordinate_nodes)
      print("Provisioning took %.3f minutes" % ((time.time() - start_secs) / 60.0), file=sys.stderr)
    start_secs = time.time()
-    setup_cluster(conn, master_nodes, slave_nodes, opts, True)
+    setup_cluster(conn, main_nodes, subordinate_nodes, opts, True)
    print("Setup took %.3f minutes" % ((time.time() - start_secs)/60.0), file=sys.stderr)
 
  elif action == "destroy":
-    (master_nodes, slave_nodes) = get_existing_cluster(
+    (main_nodes, subordinate_nodes) = get_existing_cluster(
       conn, opts, cluster_name, die_on_error=False)
-    print("Terminating master...", file=sys.stderr)
-    for inst in master_nodes:
+    print("Terminating main...", file=sys.stderr)
+    for inst in main_nodes:
      inst.terminate()
-    print("Terminating slaves...", file=sys.stderr)
-    for inst in slave_nodes:
+    print("Terminating subordinates...", file=sys.stderr)
+    for inst in subordinate_nodes:
      inst.terminate()
 
    # Delete security groups as well
    if opts.delete_groups:
      print("Deleting security groups (this will take some time)...", file=sys.stderr)
-      group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+      group_names = [cluster_name + "-main", cluster_name + "-subordinates"]
      wait_for_cluster_state(
        conn=conn,
        opts=opts,
-        cluster_instances=(master_nodes + slave_nodes),
+        cluster_instances=(main_nodes + subordinate_nodes),
        cluster_state='terminated'
      )
      attempt = 1;
@@ -1263,36 +1263,36 @@ def real_main():
        print ("Try re-running in a few minutes.", file=sys.stderr)
 
  elif action == "login":
-    (master_nodes, slave_nodes) = get_existing_cluster(
+    (main_nodes, subordinate_nodes) = get_existing_cluster(
       conn, opts, cluster_name)
-    master = master_nodes[0].public_dns_name
-    print("Logging into master " + master + "...")
+    main = main_nodes[0].public_dns_name
+    print("Logging into main " + main + "...")
    proxy_opt = []
    if opts.proxy_port != None:
      proxy_opt = ['-D', opts.proxy_port]
    subprocess.check_call(
-      ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
+      ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, main)])
 
-  elif action == "get-master":
-    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-    print(master_nodes[0].public_dns_name)
+  elif action == "get-main":
+    (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+    print(main_nodes[0].public_dns_name)
 
  elif action == "stop":
    response = raw_input("Are you sure you want to stop the cluster " +
        cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
        "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
        "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
-        "All data on spot-instance slaves will be lost.\n" +
+        "All data on spot-instance subordinates will be lost.\n" +
        "Stop cluster " + cluster_name + " (y/N): ")
    if response == "y":
-      (master_nodes, slave_nodes) = get_existing_cluster(
+      (main_nodes, subordinate_nodes) = get_existing_cluster(
         conn, opts, cluster_name, die_on_error=False)
-      print("Stopping master...", file=sys.stderr)
-      for inst in master_nodes:
+      print("Stopping main...", file=sys.stderr)
+      for inst in main_nodes:
        if inst.state not in ["shutting-down", "terminated"]:
          inst.stop()
-      print("Stopping slaves...", file=sys.stderr)
-      for inst in slave_nodes:
+      print("Stopping subordinates...", file=sys.stderr)
+      for inst in subordinate_nodes:
        if inst.state not in ["shutting-down", "terminated"]:
          if inst.spot_instance_request_id:
            inst.terminate()
@@ -1300,17 +1300,17 @@ def real_main():
          inst.stop()
 
  elif action == "start":
-    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-    print("Starting slaves...", file=sys.stderr)
-    for inst in slave_nodes:
+    (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+    print("Starting subordinates...", file=sys.stderr)
+    for inst in subordinate_nodes:
      if inst.state not in ["shutting-down", "terminated"]:
        inst.start()
-    print("Starting master...", file=sys.stderr)
-    for inst in master_nodes:
+    print("Starting main...", file=sys.stderr)
+    for inst in main_nodes:
      if inst.state not in ["shutting-down", "terminated"]:
        inst.start()
-    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
-    setup_cluster(conn, master_nodes, slave_nodes, opts, False)
+    wait_for_cluster(conn, opts.wait, main_nodes, subordinate_nodes)
+    setup_cluster(conn, main_nodes, subordinate_nodes, opts, False)
 
  else:
    print("Invalid action: %s" % action, file=sys.stderr)
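
A note on the spot-request error path in launch_cluster: the pair returned by get_existing_cluster must be unpacked into two distinct names, because a repeated target on the left-hand side of a Python tuple unpack binds left to right and silently keeps only the last value. A minimal sketch of the pitfall, with made-up placeholder lists:

# Repeated targets in a tuple unpack bind left to right, so only the
# last assignment survives; the first list is silently lost.
(nodes, nodes) = (["main-1"], ["sub-1", "sub-2"])
print(nodes)  # ['sub-1', 'sub-2'] -- the main-node list is gone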
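
The get_partition helper above decides how many subordinate instances each availability zone receives when "-z all" spreads the cluster across zones: every zone gets the floor share of total // num_partitions, and the first total % num_partitions zones absorb one extra instance. A standalone sanity check, with the function body copied from the patch and arbitrary example numbers:

# Copied from the patch: splits `total` subordinates across zones.
def get_partition(total, num_partitions, current_partitions):
  num_subordinates_this_zone = total // num_partitions
  if (total % num_partitions) - current_partitions > 0:
    num_subordinates_this_zone += 1
  return num_subordinates_this_zone

# e.g. 5 subordinates over 3 zones -> [2, 2, 1], summing back to 5
print([get_partition(5, 3, zone_index) for zone_index in range(3)])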