
Commit d2f88af

v0.1.5
2 parents: 5a000d3 + 743becb

23 files changed, +523 -135 lines changed

pandaharvester/commit_timestamp.py (+1 -1)

@@ -1 +1 @@
-timestamp = "10-09-2019 13:14:16 on release (by fahui)"
+timestamp = "04-10-2019 11:57:38 on release (by fahui)"

pandaharvester/harvesterbody/monitor.py (+1 -1)

@@ -694,7 +694,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                     else:
                         newStatus = WorkSpec.ST_idle
             elif not workSpec.is_post_processed():
-                if not queue_config.is_no_heartbeat_status(newStatus):
+                if not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot:
                     # post processing unless heartbeat is suppressed
                     jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID,
                                                                     None, True,

pandaharvester/harvesterbody/submitter.py (+1 -1)

@@ -57,7 +57,7 @@ def run(self):
                 # get commands
                 comStr = '{0}:{1}'.format(CommandSpec.COM_setNWorkers, siteName)
                 commandSpecs = self.dbProxy.get_commands_for_receiver('submitter', comStr)
-                mainLog.debug('got {0} {1} commands'.format(commandSpecs, comStr))
+                mainLog.debug('got {0} {1} commands'.format(len(commandSpecs), comStr))
                 for commandSpec in commandSpecs:
                     newLimits = self.dbProxy.set_queue_limit(siteName, commandSpec.params)
                     for tmpResource, tmpNewVal in iteritems(newLimits):

pandaharvester/harvesterbody/sweeper.py (+11)

@@ -4,6 +4,7 @@
 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
 from pandaharvester.harvestercore.plugin_factory import PluginFactory
 from pandaharvester.harvesterbody.agent_base import AgentBase
+from pandaharvester.harvestercore.command_spec import CommandSpec

 # logger
 _logger = core_utils.setup_logger('sweeper')
@@ -25,6 +26,16 @@ def run(self):
         while True:
             sw_main = core_utils.get_stopwatch()
             mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run')
+            # get commands to kill
+            sw_getcomm = core_utils.get_stopwatch()
+            mainLog.debug('try to get commands')
+            comStr = CommandSpec.COM_killWorkers
+            commandSpecs = self.dbProxy.get_commands_for_receiver('sweeper', comStr)
+            mainLog.debug('got {0} {1} commands'.format(len(commandSpecs), comStr))
+            for commandSpec in commandSpecs:
+                n_to_kill = self.dbProxy.kill_workers_by_query(commandSpec.params)
+                mainLog.debug('will kill {0} workers with {1}'.format(n_to_kill, commandSpec.params))
+            mainLog.debug('done handling commands' + sw_getcomm.get_elapsed_time())
             # killing stage
             sw_kill = core_utils.get_stopwatch()
             mainLog.debug('try to get workers to kill')

pandaharvester/harvestercore/command_spec.py (+3 -1)

@@ -17,10 +17,12 @@ class CommandSpec(SpecBase):
     # commands
     COM_reportWorkerStats = 'REPORT_WORKER_STATS'
     COM_setNWorkers = 'SET_N_WORKERS'
+    COM_killWorkers = 'KILL_WORKERS'
     # mapping between command and receiver
     receiver_map = {
         COM_reportWorkerStats: 'propagator',
-        COM_setNWorkers: 'submitter'
+        COM_setNWorkers: 'submitter',
+        COM_killWorkers: 'sweeper',
     }

     # constructor

pandaharvester/harvestercore/core_utils.py (+1 -1)

@@ -277,7 +277,7 @@ def get_output_file_report(jobspec):
             chksum = fileSpec.chksum.split(':')[-1]
         else:
             chksum = fileSpec.chksum
-        xml += """"<File ID="{guid}">
+        xml += """<File ID="{guid}">
         <logical>
         <lfn name="{lfn}"/>
         </logical>

pandaharvester/harvestercore/db_proxy.py (+63 -2)

@@ -3032,7 +3032,7 @@ def refresh_cache(self, main_key, sub_key, new_info):
             return False

     # get a cached info
-    def get_cache(self, main_key, sub_key=None):
+    def get_cache(self, main_key, sub_key=None, from_local_cache=True):
         useDB = False
         try:
             # get logger
@@ -3045,7 +3045,7 @@ def get_cache(self, main_key, sub_key=None):
             # lock dict
             globalDict.acquire()
             # found
-            if cacheKey in globalDict:
+            if from_local_cache and cacheKey in globalDict:
                 # release dict
                 globalDict.release()
                 # make spec
@@ -5370,3 +5370,64 @@ def get_workers_from_ids(self, ids):
             core_utils.dump_error_message(_logger)
             # return
             return {}
+
+    # send kill command to workers by query
+    def kill_workers_by_query(self, params):
+        try:
+            # get logger
+            tmpLog = core_utils.make_logger(_logger, method_name='kill_workers_by_query')
+            tmpLog.debug('start')
+            # sql to set killTime
+            sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName)
+            sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) "
+            # sql to get workers
+            constraints_query_string_list = []
+            tmp_varMap = {}
+            constraint_map = {'status': params.get('status', [WorkSpec.ST_submitted]),
+                              'computingSite': params.get('computingSite', []),
+                              'computingElement': params.get('computingElement', []),
+                              'submissionHost': params.get('submissionHost', [])}
+            tmpLog.debug('query {0}'.format(constraint_map))
+            for attribute, match_list in iteritems(constraint_map):
+                if match_list == 'ALL':
+                    pass
+                elif not match_list:
+                    tmpLog.debug('{0} constraint is not specified in the query. Skipped'.format(attribute))
+                    return 0
+                else:
+                    one_param_list = [ ':param_{0}_{1}'.format(attribute, v_i) for v_i in range(len(match_list)) ]
+                    tmp_varMap.update(zip(one_param_list, match_list))
+                    params_string = '(' + ','.join(one_param_list) + ')'
+                    constraints_query_string_list.append('{0} IN {1}'.format(attribute, params_string))
+            constranits_query_string = ' AND '.join(constraints_query_string_list)
+            sqlW = "SELECT workerID FROM {0} ".format(workTableName)
+            sqlW += "WHERE {0} ".format(constranits_query_string)
+            # set an older time to trigger sweeper
+            setTime = datetime.datetime.utcnow() - datetime.timedelta(hours=6)
+            # get workers
+            varMap = dict()
+            varMap.update(tmp_varMap)
+            self.execute(sqlW, varMap)
+            resW = self.cur.fetchall()
+            nRow = 0
+            for workerID, in resW:
+                # set killTime
+                varMap = dict()
+                varMap[':workerID'] = workerID
+                varMap[':setTime'] = setTime
+                varMap[':st1'] = WorkSpec.ST_finished
+                varMap[':st2'] = WorkSpec.ST_failed
+                varMap[':st3'] = WorkSpec.ST_cancelled
+                self.execute(sqlL, varMap)
+                nRow += self.cur.rowcount
+            # commit
+            self.commit()
+            tmpLog.debug('set killTime to {0} workers'.format(nRow))
+            return nRow
+        except Exception:
+            # roll back
+            self.rollback()
+            # dump error
+            core_utils.dump_error_message(_logger)
+            # return
+            return None

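Taken together, the sweeper.py, command_spec.py, and db_proxy.py changes above add a KILL_WORKERS command path: the sweeper polls the command table for COM_killWorkers commands and passes each command's params to kill_workers_by_query(), which stamps matching workers with an old killTime so the normal killing stage picks them up. A minimal sketch of what such a params payload could look like, based on the constraints read in kill_workers_by_query(); the producer that inserts the command is not part of this commit, and the concrete values below are hypothetical:

# Hypothetical KILL_WORKERS payload; each constraint is either a list of
# values or the string 'ALL'. A constraint left empty makes
# kill_workers_by_query() return 0 without touching any worker.
params = {
    'status': ['submitted', 'running'],   # assumed WorkSpec status strings
    'computingSite': ['EXAMPLE_SITE'],    # hypothetical site name
    'computingElement': 'ALL',
    'submissionHost': 'ALL',
}

# The sweeper then consumes it roughly as shown in the diff:
#   commandSpecs = self.dbProxy.get_commands_for_receiver('sweeper', CommandSpec.COM_killWorkers)
#   for commandSpec in commandSpecs:
#       n_to_kill = self.dbProxy.kill_workers_by_query(commandSpec.params)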
pandaharvester/harvestercore/fifos.py (+1 -1)

@@ -27,7 +27,7 @@
 _attribute_list = ['id', 'item', 'score']

 # fifo object spec
-FifoObject = collections.namedtuple('FifoObject', _attribute_list, verbose=False, rename=False)
+FifoObject = collections.namedtuple('FifoObject', _attribute_list, rename=False)

 # logger
 _logger = core_utils.setup_logger('fifos')

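Note: dropping verbose=False here is presumably a Python 3 compatibility fix; collections.namedtuple removed the verbose keyword in Python 3.7, so passing it raises TypeError on newer interpreters.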
pandaharvester/harvestercore/queue_config_mapper.py (+34 -4)

@@ -1,6 +1,7 @@
 import os
 import json
 import copy
+import time
 import datetime
 import threading
 import importlib
@@ -178,6 +179,7 @@ def __init__(self, update_db=True):
             self.configFromCacher = harvester_config.qconf.configFromCacher
         except AttributeError:
             self.configFromCacher = False
+        self.updateInterval = 600

     # load config from DB cache of URL with validation
     def _load_config_from_cache(self):
@@ -242,15 +244,43 @@ def _get_resolver():
             resolver = None
         return resolver

+    # update last reload time
+    def _update_last_reload_time(self):
+        new_info = '{0:.3f}'.format(time.time())
+        return self.dbProxy.refresh_cache('_qconf_last_reload', '_universal', new_info)
+
+    # get last reload time
+    def _get_last_reload_time(self):
+        cacheSpec = self.dbProxy.get_cache('_qconf_last_reload', '_universal', from_local_cache=False)
+        if cacheSpec is None:
+            return None
+        timestamp = float(cacheSpec.data)
+        return timestamp
+
     # load data
     def load_data(self):
         mainLog = _make_logger(method_name='QueueConfigMapper.load_data')
-        # check interval
-        timeNow = datetime.datetime.utcnow()
-        if self.lastUpdate is not None and timeNow - self.lastUpdate < datetime.timedelta(minutes=10):
-            return
+        with self.lock:
+            # check if to update
+            timeNow_timestamp = time.time()
+            if self.lastUpdate is not None:
+                last_reload_timestamp = self._get_last_reload_time()
+                if (last_reload_timestamp is not None and self.lastUpdate is not None
+                        and datetime.datetime.utcfromtimestamp(last_reload_timestamp) < self.lastUpdate
+                        and timeNow_timestamp - last_reload_timestamp < self.updateInterval):
+                    return
         # start
         with self.lock:
+            # update timesatmp of last reload, lock with check interval
+            got_timesatmp_update_lock = self.dbProxy.get_process_lock('qconf_reload', 'qconf_universal', self.updateInterval)
+            if got_timesatmp_update_lock:
+                retVal = self._update_last_reload_time()
+                if retVal:
+                    mainLog.debug('updated last reload timestamp')
+                else:
+                    mainLog.warning('failed to update last reload timestamp. Skipped')
+            else:
+                mainLog.debug('did not get qconf_reload timestamp lock. Skipped to update last reload timestamp')
             # init
             newQueueConfig = dict()
             localTemplatesDict = dict()

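The load_data() rework above replaces the fixed 10-minute in-memory check with a reload timestamp shared through the DB cache ('_qconf_last_reload'), so multiple harvester processes can skip redundant reloads. A simplified restatement of the gating condition, with hypothetical helper names standing in for the real attributes:

import time

UPDATE_INTERVAL = 600  # seconds, mirrors self.updateInterval in the diff

def should_skip_reload(last_local_update_ts, last_shared_reload_ts):
    # Skip when some process reloaded recently AND this instance's own
    # last update is already newer than that shared reload time.
    if last_local_update_ts is None or last_shared_reload_ts is None:
        return False
    recently_reloaded = (time.time() - last_shared_reload_ts) < UPDATE_INTERVAL
    local_is_newer = last_shared_reload_ts < last_local_update_ts
    return recently_reloaded and local_is_newer

In the actual code self.lastUpdate is a datetime while the shared value is a Unix timestamp, hence the utcfromtimestamp() conversion in the diff.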
pandaharvester/harvestermiddleware/direct_ssh_herder.py (+3 -1)

@@ -119,6 +119,7 @@ def __init__(self, **kwarg):
         self.sockDir = getattr(self, 'sockDir', '/tmp')
         self.numMasters = getattr(self, 'numMasters', 1)
         self.execStr = getattr(self, 'execStr', '')
+        self.connectionLifetime = getattr(self, 'connectionLifetime', None)
         try:
             self._get_connection()
         except Exception as e:
@@ -151,7 +152,8 @@ def _get_connection(self):
         sshMasterPool.make_control_master(self.remoteHost, self.remotePort, self.numMasters,
                                           ssh_username=self.sshUserName, ssh_password=self.sshPassword,
                                           private_key=self.privateKey, pass_phrase=self.passPhrase,
-                                          jump_host=self.jumpHost, jump_port=self.jumpPort, sock_dir=self.sockDir)
+                                          jump_host=self.jumpHost, jump_port=self.jumpPort, sock_dir=self.sockDir,
+                                          connection_lifetime=self.connectionLifetime)
         conn = sshMasterPool.get_connection(self.remoteHost, self.remotePort, self.execStr)
         if conn is not None:
             tmpLog.debug('connected successfully')

pandaharvester/harvestermiddleware/ssh_master_pool.py (+19 -7)

@@ -2,6 +2,7 @@
 import threading
 import uuid
 import os
+import time

 import six
 import pexpect
@@ -39,7 +40,7 @@ def make_dict_key(self, host, port):
     def make_control_master(self, remote_host, remote_port, num_masters=1,
                             ssh_username=None, ssh_password=None, private_key=None, pass_phrase=None,
                             jump_host=None, jump_port=None, login_timeout=60, reconnect=False,
-                            with_lock=True, sock_dir=None):
+                            with_lock=True, sock_dir=None, connection_lifetime=None):
         dict_key = self.make_dict_key(remote_host, remote_port)
         if with_lock:
             self.lock.acquire()
@@ -56,7 +57,8 @@ def make_control_master(self, remote_host, remote_port, num_masters=1,
                                      'jump_host': jump_host,
                                      'jump_port': jump_port,
                                      'login_timeout': login_timeout,
-                                     'sock_dir': sock_dir
+                                     'sock_dir': sock_dir,
+                                     'connection_lifetime': connection_lifetime,
                                      }
         else:
             num_masters = self.params[dict_key]['num_masters']
@@ -68,6 +70,7 @@ def make_control_master(self, remote_host, remote_port, num_masters=1,
            jump_port = self.params[dict_key]['jump_port']
            login_timeout = self.params[dict_key]['login_timeout']
            sock_dir = self.params[dict_key]['sock_dir']
+           connection_lifetime = self.params[dict_key]['connection_lifetime']
        # make a master
        for i in range(num_masters - len(self.pool[dict_key])):
            # make a socket file
@@ -94,6 +97,7 @@ def make_control_master(self, remote_host, remote_port, num_masters=1,
                   loginString,
                   ]
            c = pexpect_spawn(com, echo=False)
+           baseLogger.debug('pexpect_spawn')
            c.logfile_read = baseLogger.handlers[0].stream
            isOK = False
            for iTry in range(3):
@@ -132,27 +136,35 @@ def make_control_master(self, remote_host, remote_port, num_masters=1,
                # exec to confirm login
                c.sendline('echo {0}'.format(loginString))
            if isOK:
-               self.pool[dict_key].append((sock_file, c))
+               conn_exp_time = (time.time() + connection_lifetime) if connection_lifetime is not None else None
+               self.pool[dict_key].append((sock_file, c, conn_exp_time))
        if with_lock:
            self.lock.release()

    # get a connection
    def get_connection(self, remote_host, remote_port, exec_string):
+       baseLogger.debug('get_connection start')
        dict_key = self.make_dict_key(remote_host, remote_port)
        self.lock.acquire()
        active_masters = []
        someClosed = False
-       for sock_file, child in self.pool[dict_key]:
-           if child.isalive():
-               active_masters.append((sock_file, child))
+       for sock_file, child, conn_exp_time in list(self.pool[dict_key]):
+           if child.isalive() and time.time() <= conn_exp_time:
+               active_masters.append((sock_file, child, conn_exp_time))
            else:
                child.close()
+               self.pool[dict_key].remove((sock_file, child, conn_exp_time))
                someClosed = True
+               if child.isalive():
+                   baseLogger.debug('a connection process is dead')
+               else:
+                   baseLogger.debug('a connection is expired')
        if someClosed:
            self.make_control_master(remote_host, remote_port, reconnect=True, with_lock=False)
            active_masters = [item for item in self.pool[dict_key] if os.path.exists(item[0])]
+           baseLogger.debug('reconnected; now {0} active connections'.format(len(active_masters)))
        if len(active_masters) > 0:
-           sock_file, child = random.choice(active_masters)
+           sock_file, child, conn_exp_time = random.choice(active_masters)
            con = subprocess.Popen(['ssh', 'dummy', '-S', sock_file, exec_string],
                                   shell=False,
                                   stdin=subprocess.PIPE,

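The new connectionLifetime setting (in seconds, default None) is passed from the direct_ssh_herder plugin into the SSH master pool, where each pooled control master now carries an absolute expiry time and is closed and recreated once it has expired or its process has died. A minimal sketch of the expiry bookkeeping, with a None guard added for the unconfigured case:

import time

connection_lifetime = 3600  # hypothetical value in seconds; None means no expiry

# when the control master is created (make_control_master)
conn_exp_time = time.time() + connection_lifetime if connection_lifetime is not None else None

# when a connection is picked (get_connection): usable only if the pexpect
# child process is still alive and the expiry time has not passed
def is_usable(child, conn_exp_time):
    not_expired = conn_exp_time is None or time.time() <= conn_exp_time
    return child.isalive() and not_expired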
pandaharvester/harvestermisc/htcondor_utils.py (+22 -5)

@@ -246,18 +246,33 @@ def wrapper(self, *args, **kwargs):
            # Make logger
            tmpLog = core_utils.make_logger(baseLogger, 'submissionHost={0}'.format(self.submissionHost), method_name='CondorClient.renew_session_if_error')
            func_name = func.__name__
+           try:
+               self.schedd
+           except AttributeError:
+               if self.lock.acquire(False):
+                   is_renewed = self.renew_session()
+                   self.lock.release()
+                   if not is_renewed:
+                       errStr = 'failed to communicate with {0}'.format(self.submissionHost)
+                       tmpLog.error(errStr)
+                       tmpLog.debug('got RuntimeError: {0}'.format(e))
+                       raise Exception(errStr)
            try:
                ret = func(self, *args, **kwargs)
            except RuntimeError as e:
                tmpLog.debug('got RuntimeError: {0}'.format(e))
                if self.lock.acquire(False):
-                   self.renew_session()
+                   is_renewed = self.renew_session()
                    self.lock.release()
-                   if to_retry:
-                       tmpLog.debug('condor session renewed. Retrying {0}'.format(func_name))
-                       ret = func(self, *args, **kwargs)
+                   if is_renewed:
+                       if to_retry:
+                           tmpLog.debug('condor session renewed. Retrying {0}'.format(func_name))
+                           ret = func(self, *args, **kwargs)
+                       else:
+                           tmpLog.debug('condor session renewed')
+                           raise
                    else:
-                       tmpLog.debug('condor session renewed')
+                       tmpLog.error('failed to renew condor session')
                        raise
                else:
                    tmpLog.debug('another thread is renewing condor session; skipped...')
@@ -324,11 +339,13 @@ def renew_session(self, retry=3, init=False):
                tmpLog.warning('Failed. Retry...')
            else:
                tmpLog.warning('Retry {0} times. Still failed. Skipped'.format(i_try))
+               return False
            i_try += 1
            self.secman.invalidateAllSessions()
            time.sleep(3)
        # Sleep
        time.sleep(3)
+       return True


 # Condor job query

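The decorator change above makes renew_session() report success (True/False) so the wrapper can retry the wrapped call only when the session was actually renewed, raise otherwise, and also attempt an up-front renewal when the client has no schedd attribute yet. A generic sketch of the retry-after-renew pattern, with illustrative names and independent of the HTCondor bindings:

import functools

def retry_after_renew(func):
    # Retry the wrapped call once after a successful session renewal;
    # only one thread performs the renewal, the others simply re-raise.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except RuntimeError:
            if self.lock.acquire(False):
                try:
                    renewed = self.renew_session()
                finally:
                    self.lock.release()
                if renewed:
                    return func(self, *args, **kwargs)
            raise
    return wrapper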