4
4
from pandaharvester .harvestercore import core_utils
5
5
from pandaharvester .harvestercore .work_spec import WorkSpec
6
6
from pandaharvester .harvestercore .plugin_base import PluginBase
7
+ from pandaharvester .harvestercore .worker_errors import WorkerErrors
7
8
from pandaharvester .harvesterconfig import harvester_config
8
9
9
10
from act .common .aCTConfig import aCTConfigARC
@@ -24,7 +25,11 @@ def __init__(self, **kwarg):
24
25
25
26
# Set up aCT DB connection
26
27
self .log = core_utils .make_logger (baseLogger , 'aCT submitter' , method_name = '__init__' )
27
- self .actDB = aCTDBPanda (self .log )
28
+ try :
29
+ self .actDB = aCTDBPanda (self .log )
30
+ except Exception as e :
31
+ self .log .error ('Could not connect to aCT database: {0}' .format (str (e )))
32
+ self .actDB = None
28
33
29
34
# get access point
30
35
def get_access_point (self , workspec , panda_id ):
@@ -68,10 +73,11 @@ def check_workers(self, workspec_list):
68
73
method_name = 'check_workers' )
69
74
try :
70
75
tmpLog .debug ('Querying aCT for id {0}' .format (workSpec .batchID ))
71
- columns = ['actpandastatus' , 'pandastatus' , 'computingElement' , 'node' ]
76
+ columns = ['actpandastatus' , 'pandastatus' , 'computingElement' , 'node' , 'error' ]
72
77
actjobs = self .actDB .getJobs ("id={0}" .format (workSpec .batchID ), columns )
73
78
except Exception as e :
74
- tmpLog .error ("Failed to query aCT DB: {0}" .format (str (e )))
79
+ if self .actDB :
80
+ tmpLog .error ("Failed to query aCT DB: {0}" .format (str (e )))
75
81
# send back current status
76
82
retList .append ((workSpec .status , '' ))
77
83
continue
@@ -85,12 +91,16 @@ def check_workers(self, workspec_list):
85
91
actstatus = actjobs [0 ]['actpandastatus' ]
86
92
workSpec .nativeStatus = actstatus
87
93
newStatus = WorkSpec .ST_running
94
+ errorMsg = ''
88
95
if actstatus in ['waiting' , 'sent' , 'starting' ]:
89
96
newStatus = WorkSpec .ST_submitted
90
97
elif actstatus == 'done' :
91
98
newStatus = self .check_pilot_status (workSpec , tmpLog )
92
99
elif actstatus == 'donefailed' :
93
100
newStatus = WorkSpec .ST_failed
101
+ errorMsg = actjobs [0 ]['error' ] or 'Unknown error'
102
+ error_code = WorkerErrors .error_codes .get ('GENERAL_ERROR' )
103
+ workSpec .set_supplemental_error (error_code = error_code , error_diag = errorMsg )
94
104
elif actstatus == 'donecancelled' :
95
105
newStatus = WorkSpec .ST_cancelled
96
106
@@ -108,6 +118,6 @@ def check_workers(self, workspec_list):
108
118
except :
109
119
tmpLog .warning ('Could not extract panda ID for worker {0}' .format (workSpec .batchID ))
110
120
111
- retList .append ((newStatus , '' ))
121
+ retList .append ((newStatus , errorMsg ))
112
122
113
123
return True , retList
0 commit comments