提前感谢您的任何帮助....我被卡住了! 我正在尝试编写一个脚本,该脚本将启动许多进程,每个进程执行以下操作:
1. 启动 AWS EC2 实例 2. 在该实例上执行脚本(脚本位于 AMI 上(
3. 脚本完成后
终止实例
使用单独的 VM 确实是必要的,对于本地其他类型的并行性来说,这不是一个好案例。
下面的脚本有效,但池工作线程停止响应。 我完成了target_cos列表的一部分,而游泳池工作人员什么都不做。 他们所在的进程仍在运行,并且 VM 在 AWS 上仍处于活动状态,但没有任何反应,事情似乎已挂起。
此外,当我使用调试器/IDE 和单个池辅助角色单步执行它时,它可以正常工作。 当我在 mp 池中与许多工作线程一起运行它时,它们在 ~2 次迭代后挂起。
import postgresql
import boto3
import paramiko
import time
import os
from multiprocessing import Pool
def spin_up(target_co):
#What would happen with paramiko import here?
ec2 = boto3.resource('ec2', region_name="us-west-2")
instances = ec2.create_instances(ImageId='AMI-HERE',
MinCount=1,
MaxCount=1,
KeyName="SECRET_KEY",
InstanceType="t2.micro",
Placement={
'AvailabilityZone': 'us-west-2c'
},
SecurityGroups=[
'Internal'
]
)
i = instances[0]
print('WAITING FOR INSTANCE AVAILABILITY....')
i.wait_until_running()
print('OK.')
i.load()
k = paramiko.RSAKey.from_private_key_file('KEY_FILE_PATH.pem')
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
success_flag = False
attempt_counter = 0
while success_flag == False and attempt_counter < 20:
try:
c.connect(hostname=i.public_dns_name, username="ubuntu", pkey=k)
success_flag = True
except:
print('SSH Error.....Retrying.')
attempt_counter += 1
time.sleep(5)
cmd = "python3 PATH_TO_EXECUTABLE_ON_AMI.py --target {} > LOG_FILE_ON_VM".format(target_co)
transport = c.get_transport()
channel = transport.open_session()
channel.exec_command(cmd)
while(channel.exit_status < 0):
print(str(os.getpid()) + ' sleeping...')
time.sleep(60)
print('Terminating Instance....')
i.terminate()
print('Exiting....')
if __name__ == '__main__':
target_cos = ['539',
'542',
'528',
'48',
'536',
'26',
'7',
'20572',
'10',
'20',
'101',
'10023']
# PARALLEL
with Pool(processes=2) as pool:
pool.map_async(spin_up, iter(target_cos))
#are these actually required?
pool.close()
pool.join()
您不需要打开频道或睡觉。只是做client.exec_command()
.它将阻止直到完成。
我像这样运行你的代码:
import paramiko
import time
from multiprocessing import Pool
def spin_up(target_co):
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
success_flag = False
attempt_counter = 0
while success_flag == False and attempt_counter < 20:
try:
c.connect(hostname='localhost', username="user", password='pass')
success_flag = True
except:
print('SSH Error.....Retrying.')
attempt_counter += 1
time.sleep(5)
cmd = "hostname"
stdin, stdout, stderr = c.exec_command(cmd)
print(stdout.read())
print(stderr.read())
print(stdout.channel.recv_exit_status())
if __name__ == '__main__':
target_cos = ['539',
'542',
'528',
'48',
'536',
'26',
'7',
'20572',
'10',
'20',
'101',
'10023']
with Pool(processes=2) as pool:
pool.map(spin_up, iter(target_cos))
我没有亚马逊的东西,所以我无法测试。您可以使用 map 而不是 map_async,因为在启动进程后无需执行任何操作。您可以阻止直到完成。
如果仍有问题,请尝试将泳池更改为 TreadPool。同时分叉和踩踏可能会导致问题。如果之后您仍然遇到问题,您可能需要查看 AsyncSSH。它将让你使用Asyncio来做到这一点,这是Python很酷的新Async库。
忘了提:您正在分叉和线程,因为多处理使用分叉,而 Paramiko 使用线程。