12
12
import com .google .common .util .concurrent .ListeningExecutorService ;
13
13
import com .google .common .util .concurrent .RateLimiter ;
14
14
15
+ import com .instaclustr .esop .impl .hash .HashService ;
16
+ import com .instaclustr .esop .impl .hash .HashServiceImpl ;
15
17
import org .slf4j .Logger ;
16
18
import org .slf4j .LoggerFactory ;
17
19
@@ -45,17 +47,17 @@ public class UploadTracker extends AbstractTracker<UploadUnit, UploadSession, Ba
45
47
@ Inject
46
48
public UploadTracker (final @ UploadingFinisher ListeningExecutorService finisherExecutorService ,
47
49
final OperationsService operationsService ,
48
- final HashSpec hashSpec ) {
49
- super (finisherExecutorService , operationsService , hashSpec );
50
+ final HashService hashService ) {
51
+ super (finisherExecutorService , operationsService , hashService );
50
52
}
51
53
52
54
@ Override
53
55
public UploadUnit constructUnitToSubmit (final Backuper backuper ,
54
56
final ManifestEntry manifestEntry ,
55
57
final AtomicBoolean shouldCancel ,
56
58
final String snapshotTag ,
57
- final HashSpec hashSpec ) {
58
- return new UploadUnit (backuper , manifestEntry , shouldCancel , snapshotTag , hashSpec );
59
+ final HashService hashService ) {
60
+ return new UploadUnit (backuper , manifestEntry , shouldCancel , snapshotTag , hashService );
59
61
}
60
62
61
63
@ Override
@@ -96,8 +98,8 @@ public UploadUnit(final Backuper backuper,
96
98
final ManifestEntry manifestEntry ,
97
99
final AtomicBoolean shouldCancel ,
98
100
final String snapshotTag ,
99
- final HashSpec hashSpec ) {
100
- super (manifestEntry , shouldCancel , hashSpec );
101
+ final HashService hashService ) {
102
+ super (manifestEntry , shouldCancel , hashService );
101
103
this .backuper = backuper ;
102
104
this .snapshotTag = snapshotTag ;
103
105
}
@@ -118,23 +120,29 @@ public Void call() {
118
120
final RemoteObjectReference ref = getRemoteObjectReference (manifestEntry .objectKey );
119
121
120
122
// try to refresh object / decide if it is required to upload it
121
- Callable <Boolean > condition = () -> {
123
+ Callable <Backuper . RefreshingOutcome > condition = () -> {
122
124
try {
123
- return backuper .freshenRemoteObject (manifestEntry , ref ) == FRESHENED ;
125
+
126
+ return backuper .freshenRemoteObject (manifestEntry , ref );
124
127
} catch (final Exception ex ) {
125
128
throw new RetriableException ("Failed to refresh remote object" + ref .objectKey , ex );
126
129
}
127
130
};
128
131
129
- if (manifestEntry .type != MANIFEST_FILE && getRetrier (backuper .request .retry ).submit (condition )) {
130
- logger .info ("{}skipping the upload of alredy uploaded file {}" ,
132
+ Backuper .RefreshingOutcome refreshmentOutcome = getRetrier (backuper .request .retry ).submit (condition );
133
+
134
+ if (manifestEntry .type != MANIFEST_FILE && refreshmentOutcome .result == FRESHENED ) {
135
+ logger .info ("{}skipping the upload of already uploaded file {}" ,
131
136
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "" ,
132
137
ref .canonicalPath );
133
138
134
139
state = State .FINISHED ;
135
140
return null ;
136
141
}
137
142
143
+ if (refreshmentOutcome .hash != null )
144
+ manifestEntry .hash = refreshmentOutcome .hash ;
145
+
138
146
// do the upload
139
147
getRetrier (backuper .request .retry ).submit (() -> {
140
148
try (final InputStream fileStream = new BufferedInputStream (new FileInputStream (manifestEntry .localFile .toFile ()))) {
@@ -144,6 +152,10 @@ public Void call() {
144
152
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "" ,
145
153
manifestEntry .objectKey ,
146
154
DataSize .bytesToHumanReadable (manifestEntry .size )));
155
+
156
+ if (manifestEntry .hash == null )
157
+ manifestEntry .hash = hashService .hash (manifestEntry .localFile );
158
+
147
159
// never encrypt manifest
148
160
if (manifestEntry .type == MANIFEST_FILE ) {
149
161
backuper .uploadFile (manifestEntry , rateLimitedStream , ref );
0 commit comments