1
|
### Eclipse Workspace Patch 1.0
|
2
|
#P d1_cn_index_common
|
3
|
Index: src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java
|
4
|
===================================================================
|
5
|
--- src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java (revision 8327)
|
6
|
+++ src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java (working copy)
|
7
|
@@ -26,6 +26,7 @@
|
8
|
import java.io.IOException;
|
9
|
import java.io.InputStream;
|
10
|
import java.math.BigInteger;
|
11
|
+import java.util.Calendar;
|
12
|
import java.util.Date;
|
13
|
import java.util.List;
|
14
|
import java.util.UUID;
|
15
|
@@ -66,6 +67,162 @@
|
16
|
}
|
17
|
|
18
|
@Test
|
19
|
+ public void testTaskExecutionBackoffForRetry() {
|
20
|
+ repo.deleteAll();
|
21
|
+ // noise
|
22
|
+ saveIndexTaskWithStatus(UUID.randomUUID().toString(), IndexTask.STATUS_NEW);
|
23
|
+ String pidValue = "find by pid:" + UUID.randomUUID().toString();
|
24
|
+ IndexTask task = saveIndexTaskWithStatus(pidValue, IndexTask.STATUS_NEW);
|
25
|
+
|
26
|
+ task = simulateMarkNewProcessing(task);
|
27
|
+ List<IndexTask> itList = repo
|
28
|
+ .findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
|
29
|
+ Assert.assertEquals(2, itList.size());
|
30
|
+
|
31
|
+ Calendar cal = Calendar.getInstance();
|
32
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
33
|
+ cal.add(Calendar.MONTH, 1);
|
34
|
+ itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
35
|
+ cal.getTimeInMillis());
|
36
|
+ Assert.assertEquals(0, itList.size());
|
37
|
+
|
38
|
+ Calendar cal1 = Calendar.getInstance();
|
39
|
+ cal1.add(Calendar.MINUTE, 18);
|
40
|
+ Calendar cal2 = Calendar.getInstance();
|
41
|
+ cal2.add(Calendar.MINUTE, 22);
|
42
|
+ task = testNextBackoffForRetry(task, cal1, cal2);
|
43
|
+
|
44
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
45
|
+ cal1.add(Calendar.MINUTE, 22);
|
46
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
47
|
+ cal2.add(Calendar.MINUTE, 122);
|
48
|
+ task = testNextBackoffForRetry(task, cal1, cal2);
|
49
|
+
|
50
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
51
|
+ cal1.add(Calendar.MINUTE, 122);
|
52
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
53
|
+ cal2.add(Calendar.MINUTE, (60 * 8) + 2);
|
54
|
+ task = testNextBackoffForRetry(task, cal1, cal2);
|
55
|
+
|
56
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
57
|
+ cal1.add(Calendar.MINUTE, (60 * 8) + 2);
|
58
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
59
|
+ cal2.add(Calendar.MINUTE, (60 * 24) + 2);
|
60
|
+ task = testNextBackoffForRetry(task, cal1, cal2);
|
61
|
+
|
62
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
63
|
+ cal1.add(Calendar.MINUTE, (60 * 8) + 2);
|
64
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
65
|
+ cal2.add(Calendar.MINUTE, (60 * 24) + 2);
|
66
|
+ task = testNextBackoffForRetry(task, cal1, cal2);
|
67
|
+ }
|
68
|
+
|
69
|
+ @Test
|
70
|
+ public void testTaskExecutionBackoffForFailed() {
|
71
|
+ repo.deleteAll();
|
72
|
+ // noise
|
73
|
+ saveIndexTaskWithStatus(UUID.randomUUID().toString(), IndexTask.STATUS_NEW);
|
74
|
+ String pidValue = "find by pid:" + UUID.randomUUID().toString();
|
75
|
+ IndexTask task = saveIndexTaskWithStatus(pidValue, IndexTask.STATUS_NEW);
|
76
|
+
|
77
|
+ task = simulateMarkFailedProcessing(task);
|
78
|
+ List<IndexTask> itList = repo
|
79
|
+ .findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
|
80
|
+ Assert.assertEquals(1, itList.size());
|
81
|
+
|
82
|
+ Calendar cal = Calendar.getInstance();
|
83
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
84
|
+ cal.add(Calendar.MINUTE, 1);
|
85
|
+ itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
86
|
+ cal.getTimeInMillis());
|
87
|
+ Assert.assertEquals(1, itList.size());
|
88
|
+
|
89
|
+ Calendar cal1 = Calendar.getInstance();
|
90
|
+ cal1.add(Calendar.MINUTE, 18);
|
91
|
+ Calendar cal2 = Calendar.getInstance();
|
92
|
+ cal2.add(Calendar.MINUTE, 22);
|
93
|
+ task = testNextBackoffForFailed(task, cal1, cal2);
|
94
|
+
|
95
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
96
|
+ cal1.add(Calendar.MINUTE, 22);
|
97
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
98
|
+ cal2.add(Calendar.MINUTE, 122);
|
99
|
+ task = testNextBackoffForFailed(task, cal1, cal2);
|
100
|
+
|
101
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
102
|
+ cal1.add(Calendar.MINUTE, 122);
|
103
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
104
|
+ cal2.add(Calendar.MINUTE, (60 * 8) + 2);
|
105
|
+ task = testNextBackoffForFailed(task, cal1, cal2);
|
106
|
+
|
107
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
108
|
+ cal1.add(Calendar.MINUTE, (60 * 8) + 2);
|
109
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
110
|
+ cal2.add(Calendar.MINUTE, (60 * 24) + 2);
|
111
|
+ task = testNextBackoffForFailed(task, cal1, cal2);
|
112
|
+
|
113
|
+ cal1.setTimeInMillis(System.currentTimeMillis());
|
114
|
+ cal1.add(Calendar.MINUTE, (60 * 8) + 2);
|
115
|
+ cal2.setTimeInMillis(System.currentTimeMillis());
|
116
|
+ cal2.add(Calendar.MINUTE, (60 * 24) + 2);
|
117
|
+ task = testNextBackoffForFailed(task, cal1, cal2);
|
118
|
+ }
|
119
|
+
|
120
|
+ private IndexTask testNextBackoffForRetry(IndexTask task, Calendar previousTimeIncrement,
|
121
|
+ Calendar nextTimeIncrement) {
|
122
|
+
|
123
|
+ task = simulateMarkNewProcessing(task);
|
124
|
+ List<IndexTask> itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
125
|
+ previousTimeIncrement.getTimeInMillis());
|
126
|
+ Assert.assertEquals(0, itList.size());
|
127
|
+
|
128
|
+ itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
129
|
+ nextTimeIncrement.getTimeInMillis());
|
130
|
+ Assert.assertEquals(1, itList.size());
|
131
|
+
|
132
|
+ IndexTask it = itList.get(0);
|
133
|
+ Assert.assertNotNull(it);
|
134
|
+ Assert.assertTrue(task.getPid().equals(it.getPid()));
|
135
|
+ Assert.assertTrue(IndexTask.STATUS_FAILED.equals(it.getStatus()));
|
136
|
+ return task;
|
137
|
+ }
|
138
|
+
|
139
|
+ private IndexTask testNextBackoffForFailed(IndexTask task, Calendar previousTimeIncrement,
|
140
|
+ Calendar nextTimeIncrement) {
|
141
|
+
|
142
|
+ task = simulateMarkFailedProcessing(task);
|
143
|
+ List<IndexTask> itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
144
|
+ previousTimeIncrement.getTimeInMillis());
|
145
|
+ Assert.assertEquals(0, itList.size());
|
146
|
+
|
147
|
+ itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
148
|
+ nextTimeIncrement.getTimeInMillis());
|
149
|
+ Assert.assertEquals(1, itList.size());
|
150
|
+
|
151
|
+ IndexTask it = itList.get(0);
|
152
|
+ Assert.assertNotNull(it);
|
153
|
+ Assert.assertTrue(task.getPid().equals(it.getPid()));
|
154
|
+ Assert.assertTrue(IndexTask.STATUS_FAILED.equals(it.getStatus()));
|
155
|
+ return task;
|
156
|
+ }
|
157
|
+
|
158
|
+ private IndexTask simulateMarkNewProcessing(IndexTask task) {
|
159
|
+ task.markInProgress();
|
160
|
+ // not ready
|
161
|
+ task.markNew();
|
162
|
+ task = repo.save(task);
|
163
|
+ return task;
|
164
|
+ }
|
165
|
+
|
166
|
+ private IndexTask simulateMarkFailedProcessing(IndexTask task) {
|
167
|
+ task.markInProgress();
|
168
|
+ // not ready
|
169
|
+ task.markFailed();
|
170
|
+ task = repo.save(task);
|
171
|
+ return task;
|
172
|
+ }
|
173
|
+
|
174
|
+ @Test
|
175
|
public void testAddOneTask() {
|
176
|
int initialSize = repo.findAll().size();
|
177
|
IndexTask task = saveIndexTask("pid1");
|
178
|
@@ -131,6 +288,11 @@
|
179
|
String pidValue = "find by pid:" + UUID.randomUUID().toString();
|
180
|
String status = "TEST-STATUS";
|
181
|
saveIndexTaskWithStatus(pidValue, status);
|
182
|
+
|
183
|
+ String pidValue2 = "find by pid:" + UUID.randomUUID().toString();
|
184
|
+ String status2 = "TEST-STATUS2";
|
185
|
+ saveIndexTaskWithStatus(pidValue2, status2);
|
186
|
+
|
187
|
List<IndexTask> itList = repo.findByPidAndStatus(pidValue, status);
|
188
|
Assert.assertEquals(1, itList.size());
|
189
|
IndexTask it = itList.get(0);
|
190
|
@@ -139,6 +301,29 @@
|
191
|
Assert.assertTrue(status.equals(it.getStatus()));
|
192
|
}
|
193
|
|
194
|
+ @Test
|
195
|
+ public void testFindByStatusAndNextExection() {
|
196
|
+ String pidValue = "find by pid:" + UUID.randomUUID().toString();
|
197
|
+ String status = "TEST-NEXT";
|
198
|
+ saveIndexTaskWithStatus(pidValue, status);
|
199
|
+
|
200
|
+ String pidValue2 = "find by pid:" + UUID.randomUUID().toString();
|
201
|
+ IndexTask task2 = saveIndexTaskWithStatus(pidValue2, "TEST-NEXT");
|
202
|
+ Calendar cal = Calendar.getInstance();
|
203
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
204
|
+ cal.add(Calendar.DATE, 1);
|
205
|
+ task2.setNextExection(cal.getTimeInMillis());
|
206
|
+ repo.save(task2);
|
207
|
+
|
208
|
+ List<IndexTask> itList = repo.findByStatusAndNextExecutionLessThan(status,
|
209
|
+ System.currentTimeMillis());
|
210
|
+ Assert.assertEquals(1, itList.size());
|
211
|
+ IndexTask it = itList.get(0);
|
212
|
+ Assert.assertNotNull(it);
|
213
|
+ Assert.assertTrue(pidValue.equals(it.getPid()));
|
214
|
+ Assert.assertTrue(status.equals(it.getStatus()));
|
215
|
+ }
|
216
|
+
|
217
|
/**
|
218
|
* Tests status narrowing and ordering of the task queue query
|
219
|
*/
|
220
|
@@ -148,6 +333,7 @@
|
221
|
"garbage status", 1);
|
222
|
|
223
|
String status = "findQueue";
|
224
|
+ String status2 = "badStatus";
|
225
|
|
226
|
repo.deleteAll();
|
227
|
|
228
|
@@ -163,6 +349,12 @@
|
229
|
String pidValue3 = "3rd created task: " + UUID.randomUUID().toString();
|
230
|
saveIndexTaskWithStatusAndPriority(pidValue3, status, 1);
|
231
|
|
232
|
+ String pidValue4 = "4th created task: " + UUID.randomUUID().toString();
|
233
|
+ saveIndexTaskWithStatusAndPriority(pidValue4, status2, 1);
|
234
|
+
|
235
|
+ String pidValue5 = "thrd created task: " + UUID.randomUUID().toString();
|
236
|
+ saveIndexTaskWithStatusAndPriority(pidValue5, status2, 1);
|
237
|
+
|
238
|
List<IndexTask> queue = repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(status);
|
239
|
Assert.assertEquals(3, queue.size());
|
240
|
|
241
|
Index: src/main/java/org/dataone/cn/index/task/IndexTask.java
|
242
|
===================================================================
|
243
|
--- src/main/java/org/dataone/cn/index/task/IndexTask.java (revision 8794)
|
244
|
+++ src/main/java/org/dataone/cn/index/task/IndexTask.java (working copy)
|
245
|
@@ -28,6 +28,7 @@
|
246
|
import java.io.InputStream;
|
247
|
import java.io.Serializable;
|
248
|
import java.io.UnsupportedEncodingException;
|
249
|
+import java.util.Calendar;
|
250
|
|
251
|
import javax.persistence.Column;
|
252
|
import javax.persistence.Entity;
|
253
|
@@ -68,8 +69,7 @@
|
254
|
private static Logger logger = Logger.getLogger(IndexTask.class.getName());
|
255
|
|
256
|
@Transient
|
257
|
- private final FastDateFormat format = FastDateFormat
|
258
|
- .getInstance("MM/dd/yyyy:HH:mm:ss:SS");
|
259
|
+ private final FastDateFormat format = FastDateFormat.getInstance("MM/dd/yyyy:HH:mm:ss:SS");
|
260
|
|
261
|
@Transient
|
262
|
private static final String FORMAT_RESOURCE_MAP = "http://www.openarchives.org/ore/terms";
|
263
|
@@ -118,6 +118,10 @@
|
264
|
*/
|
265
|
private long taskModifiedDate;
|
266
|
|
267
|
+ private long nextExecution = 0;
|
268
|
+
|
269
|
+ private int tryCount = 0;
|
270
|
+
|
271
|
/**
|
272
|
* Relative priority of this task. Some operations such as a change in
|
273
|
* access control rules should be propagated to the index before others
|
274
|
@@ -318,6 +322,22 @@
|
275
|
this.dateSysMetaModified = dateSysMetaModified;
|
276
|
}
|
277
|
|
278
|
+ public long getNextExecution() {
|
279
|
+ return this.nextExecution;
|
280
|
+ }
|
281
|
+
|
282
|
+ public void setNextExection(long next) {
|
283
|
+ this.nextExecution = next;
|
284
|
+ }
|
285
|
+
|
286
|
+ public int getTryCount() {
|
287
|
+ return tryCount;
|
288
|
+ }
|
289
|
+
|
290
|
+ public void setTryCount(int count) {
|
291
|
+ this.tryCount = count;
|
292
|
+ }
|
293
|
+
|
294
|
/**
|
295
|
* Private method exposed due to JPA and unit testing requirements. Should
|
296
|
* not use directly.
|
297
|
@@ -385,21 +405,63 @@
|
298
|
return status;
|
299
|
}
|
300
|
|
301
|
+ // Do not use this method, used by unit tests only.
|
302
|
+ // use the specific 'markNew, markFailed, markInProcess' methods.
|
303
|
public void setStatus(String status) {
|
304
|
- this.taskModifiedDate = System.currentTimeMillis();
|
305
|
- this.status = status;
|
306
|
+ if (status != null) {
|
307
|
+ this.taskModifiedDate = System.currentTimeMillis();
|
308
|
+ this.status = status;
|
309
|
+ }
|
310
|
}
|
311
|
|
312
|
+ private void setBackoffExectionTime() {
|
313
|
+ if (getTryCount() == 2) {
|
314
|
+ Calendar cal = Calendar.getInstance();
|
315
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
316
|
+ cal.add(Calendar.MINUTE, 20);
|
317
|
+ setNextExection(cal.getTimeInMillis());
|
318
|
+ } else if (getTryCount() == 3) {
|
319
|
+ Calendar cal = Calendar.getInstance();
|
320
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
321
|
+ cal.add(Calendar.HOUR, 2);
|
322
|
+ setNextExection(cal.getTimeInMillis());
|
323
|
+ } else if (getTryCount() == 4) {
|
324
|
+ Calendar cal = Calendar.getInstance();
|
325
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
326
|
+ cal.add(Calendar.HOUR, 8);
|
327
|
+ setNextExection(cal.getTimeInMillis());
|
328
|
+ } else if (getTryCount() > 4) {
|
329
|
+ Calendar cal = Calendar.getInstance();
|
330
|
+ cal.setTimeInMillis(System.currentTimeMillis());
|
331
|
+ cal.add(Calendar.HOUR, 24);
|
332
|
+ setNextExection(cal.getTimeInMillis());
|
333
|
+ }
|
334
|
+ }
|
335
|
+
|
336
|
+ private boolean timeForRetryBackoff(String status) {
|
337
|
+ return (getTryCount() >= 2) && (STATUS_COMPLETE.equals(status) == false)
|
338
|
+ && (STATUS_IN_PROCESS.equals(status) == false);
|
339
|
+ }
|
340
|
+
|
341
|
public void markInProgress() {
|
342
|
this.setStatus(STATUS_IN_PROCESS);
|
343
|
+ this.tryCount++;
|
344
|
}
|
345
|
|
346
|
public void markNew() {
|
347
|
this.setStatus(STATUS_NEW);
|
348
|
+ if (timeForRetryBackoff(status)) {
|
349
|
+ this.status = STATUS_FAILED;
|
350
|
+ setBackoffExectionTime();
|
351
|
+ }
|
352
|
}
|
353
|
|
354
|
public void markFailed() {
|
355
|
this.setStatus(STATUS_FAILED);
|
356
|
+ if (timeForRetryBackoff(status)) {
|
357
|
+ this.status = STATUS_FAILED;
|
358
|
+ setBackoffExectionTime();
|
359
|
+ }
|
360
|
}
|
361
|
|
362
|
public int getVersion() {
|
363
|
@@ -412,9 +474,9 @@
|
364
|
|
365
|
@Override
|
366
|
public String toString() {
|
367
|
- return "IndexTask [id=" + id + ", pid=" + pid + ", formatid=" + formatId
|
368
|
- + ", objectPath=" + objectPath + ", dateSysMetaModified="
|
369
|
- + dateSysMetaModified + ", taskModifiedDate=" + taskModifiedDate
|
370
|
- + ", priority=" + priority + ", status=" + status + "]";
|
371
|
+ return "IndexTask [id=" + id + ", pid=" + pid + ", formatid=" + formatId + ", objectPath="
|
372
|
+ + objectPath + ", dateSysMetaModified=" + dateSysMetaModified
|
373
|
+ + ", taskModifiedDate=" + taskModifiedDate + ", priority=" + priority + ", status="
|
374
|
+ + status + "]";
|
375
|
}
|
376
|
}
|
377
|
Index: src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java
|
378
|
===================================================================
|
379
|
--- src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java (revision 8644)
|
380
|
+++ src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java (working copy)
|
381
|
@@ -64,7 +64,8 @@
|
382
|
* @param status
|
383
|
* @return
|
384
|
*/
|
385
|
- List<IndexTask> findByStatusOrderByPriorityAscTaskModifiedDateAsc(
|
386
|
- String status);
|
387
|
+ List<IndexTask> findByStatusOrderByPriorityAscTaskModifiedDateAsc(String status);
|
388
|
|
389
|
+ List<IndexTask> findByStatusAndNextExecutionLessThan(String status, long time);
|
390
|
+
|
391
|
}
|
392
|
#P d1_cn_index_generator
|
393
|
Index: src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java
|
394
|
===================================================================
|
395
|
--- src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java (revision 8644)
|
396
|
+++ src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java (working copy)
|
397
|
@@ -42,8 +42,7 @@
|
398
|
*/
|
399
|
public class IndexTaskGenerator {
|
400
|
|
401
|
- private static Logger logger = Logger.getLogger(IndexTaskGenerator.class
|
402
|
- .getName());
|
403
|
+ private static Logger logger = Logger.getLogger(IndexTaskGenerator.class.getName());
|
404
|
private static final String IGNOREPID = "OBJECT_FORMAT_LIST.1.1";
|
405
|
|
406
|
@Autowired
|
407
|
@@ -56,8 +55,7 @@
|
408
|
* @param SystemMetadata
|
409
|
* @return IndexTask
|
410
|
*/
|
411
|
- public IndexTask processSystemMetaDataAdd(SystemMetadata smd,
|
412
|
- String objectPath) {
|
413
|
+ public IndexTask processSystemMetaDataAdd(SystemMetadata smd, String objectPath) {
|
414
|
if (isNotIgnorePid(smd)) {
|
415
|
removeDuplicateNewTasks(smd);
|
416
|
IndexTask task = new IndexTask(smd, objectPath);
|
417
|
@@ -75,8 +73,7 @@
|
418
|
* @param SystemMetadata
|
419
|
* @return IndexTask
|
420
|
*/
|
421
|
- public IndexTask processSystemMetaDataUpdate(SystemMetadata smd,
|
422
|
- String objectPath) {
|
423
|
+ public IndexTask processSystemMetaDataUpdate(SystemMetadata smd, String objectPath) {
|
424
|
if (isNotIgnorePid(smd)) {
|
425
|
removeDuplicateNewTasks(smd);
|
426
|
IndexTask task = new IndexTask(smd, objectPath);
|
427
|
@@ -101,14 +98,18 @@
|
428
|
* @param SystemMetadata
|
429
|
*/
|
430
|
private void removeDuplicateNewTasks(SystemMetadata smd) {
|
431
|
- List<IndexTask> itList = repo.findByPidAndStatus(smd.getIdentifier()
|
432
|
- .getValue(), IndexTask.STATUS_NEW);
|
433
|
+ removeDuplicateTasksByStatus(smd, IndexTask.STATUS_NEW);
|
434
|
+ // new update on this pid, so remove failure and try to reprocess.
|
435
|
+ removeDuplicateTasksByStatus(smd, IndexTask.STATUS_FAILED);
|
436
|
+ }
|
437
|
+
|
438
|
+ private void removeDuplicateTasksByStatus(SystemMetadata smd, String status) {
|
439
|
+ List<IndexTask> itList = repo.findByPidAndStatus(smd.getIdentifier().getValue(), status);
|
440
|
for (IndexTask indexTask : itList) {
|
441
|
try {
|
442
|
repo.delete(indexTask);
|
443
|
} catch (HibernateOptimisticLockingFailureException e) {
|
444
|
- logger.debug("Unable to delete existing index task for pid: "
|
445
|
- + indexTask.getPid()
|
446
|
+ logger.debug("Unable to delete existing index task for pid: " + indexTask.getPid()
|
447
|
+ " prior to generating new index task.");
|
448
|
}
|
449
|
}
|
450
|
#P d1_cn_index_processor
|
451
|
Index: src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java
|
452
|
===================================================================
|
453
|
--- src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java (revision 10046)
|
454
|
+++ src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java (working copy)
|
455
|
@@ -22,7 +22,6 @@
|
456
|
|
457
|
package org.dataone.cn.index.processor;
|
458
|
|
459
|
-import java.io.File;
|
460
|
import java.io.IOException;
|
461
|
import java.util.ArrayList;
|
462
|
import java.util.List;
|
463
|
@@ -91,74 +90,51 @@
|
464
|
}
|
465
|
|
466
|
private void removeDataPackage(IndexTask task) throws Exception {
|
467
|
- if (task.getObjectPath() == null) {
|
468
|
- String objectPath = retrieveObjectPath(task.getPid());
|
469
|
- task.setObjectPath(objectPath);
|
470
|
- }
|
471
|
- boolean resourceMapFileExists = false;
|
472
|
- if (task.getObjectPath() != null) {
|
473
|
- File objectPathFile = new File(task.getObjectPath());
|
474
|
- if (objectPathFile.exists()) {
|
475
|
- resourceMapFileExists = true;
|
476
|
- } else {
|
477
|
- logger.info("Object path exists: " + task.getObjectPath() + " for pid: "
|
478
|
- + task.getPid() + " but file location does not yet exist.");
|
479
|
+ Document resourceMapDoc = getXPathDocumentParser().loadDocument(task.getObjectPath());
|
480
|
+ ResourceMap resourceMap = new ResourceMap(resourceMapDoc);
|
481
|
+ List<String> documentIds = resourceMap.getAllDocumentIDs();
|
482
|
+ List<SolrDoc> indexDocuments = httpService.getDocuments(solrQueryUri, documentIds);
|
483
|
+ removeFromIndex(task);
|
484
|
+ List<SolrDoc> docsToUpdate = new ArrayList<SolrDoc>();
|
485
|
+ // for each document in data package:
|
486
|
+ for (SolrDoc indexDoc : indexDocuments) {
|
487
|
+
|
488
|
+ if (indexDoc.getIdentifier().equals(task.getPid())) {
|
489
|
+ continue; // skipping the resource map, no need update
|
490
|
+ // it.
|
491
|
+ // will
|
492
|
+ // be removed.
|
493
|
}
|
494
|
- } else {
|
495
|
- logger.info("Object path not yet set for pid: " + task.getPid());
|
496
|
- }
|
497
|
- if (resourceMapFileExists) {
|
498
|
- Document resourceMapDoc = getXPathDocumentParser().loadDocument(task.getObjectPath());
|
499
|
- if (resourceMapDoc != null) {
|
500
|
- ResourceMap resourceMap = new ResourceMap(resourceMapDoc);
|
501
|
- List<String> documentIds = resourceMap.getAllDocumentIDs();
|
502
|
- List<SolrDoc> indexDocuments = httpService.getDocuments(solrQueryUri, documentIds);
|
503
|
- removeFromIndex(task);
|
504
|
- List<SolrDoc> docsToUpdate = new ArrayList<SolrDoc>();
|
505
|
- // for each document in data package:
|
506
|
- for (SolrDoc indexDoc : indexDocuments) {
|
507
|
|
508
|
- if (indexDoc.getIdentifier().equals(task.getPid())) {
|
509
|
- continue; // skipping the resource map, no need update
|
510
|
- // it.
|
511
|
- // will
|
512
|
- // be removed.
|
513
|
- }
|
514
|
+ // Remove resourceMap reference
|
515
|
+ indexDoc.removeFieldsWithValue(SolrElementField.FIELD_RESOURCEMAP,
|
516
|
+ resourceMap.getIdentifier());
|
517
|
|
518
|
- // Remove resourceMap reference
|
519
|
- indexDoc.removeFieldsWithValue(SolrElementField.FIELD_RESOURCEMAP,
|
520
|
- resourceMap.getIdentifier());
|
521
|
-
|
522
|
- // // Remove documents/documentedby values for this resource
|
523
|
- // map
|
524
|
- for (ResourceEntry entry : resourceMap.getMappedReferences()) {
|
525
|
- if (indexDoc.getIdentifier().equals(entry.getIdentifier())) {
|
526
|
- for (String documentedBy : entry.getDocumentedBy()) {
|
527
|
- // Using removeOneFieldWithValue in-case same
|
528
|
- // documents
|
529
|
- // are in more than one data package. just
|
530
|
- // remove
|
531
|
- // one
|
532
|
- // instance of data package info.
|
533
|
- indexDoc.removeOneFieldWithValue(
|
534
|
- SolrElementField.FIELD_ISDOCUMENTEDBY, documentedBy);
|
535
|
- }
|
536
|
- for (String documents : entry.getDocuments()) {
|
537
|
- indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_DOCUMENTS,
|
538
|
- documents);
|
539
|
- }
|
540
|
- break;
|
541
|
- }
|
542
|
+ // // Remove documents/documentedby values for this resource
|
543
|
+ // map
|
544
|
+ for (ResourceEntry entry : resourceMap.getMappedReferences()) {
|
545
|
+ if (indexDoc.getIdentifier().equals(entry.getIdentifier())) {
|
546
|
+ for (String documentedBy : entry.getDocumentedBy()) {
|
547
|
+ // Using removeOneFieldWithValue in-case same
|
548
|
+ // documents
|
549
|
+ // are in more than one data package. just
|
550
|
+ // remove
|
551
|
+ // one
|
552
|
+ // instance of data package info.
|
553
|
+ indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_ISDOCUMENTEDBY,
|
554
|
+ documentedBy);
|
555
|
}
|
556
|
- docsToUpdate.add(indexDoc);
|
557
|
+ for (String documents : entry.getDocuments()) {
|
558
|
+ indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_DOCUMENTS,
|
559
|
+ documents);
|
560
|
+ }
|
561
|
+ break;
|
562
|
}
|
563
|
- SolrElementAdd addCommand = new SolrElementAdd(docsToUpdate);
|
564
|
- httpService.sendUpdate(solrIndexUri, addCommand);
|
565
|
}
|
566
|
- } else {
|
567
|
- task.markFailed();
|
568
|
- saveTask(task);
|
569
|
+ docsToUpdate.add(indexDoc);
|
570
|
}
|
571
|
+ SolrElementAdd addCommand = new SolrElementAdd(docsToUpdate);
|
572
|
+ httpService.sendUpdate(solrIndexUri, addCommand);
|
573
|
}
|
574
|
|
575
|
private void removeFromDataPackage(IndexTask task) throws Exception {
|
576
|
Index: src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java
|
577
|
===================================================================
|
578
|
--- src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java (revision 10044)
|
579
|
+++ src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java (working copy)
|
580
|
@@ -111,6 +111,13 @@
|
581
|
processTask(task);
|
582
|
task = getNextIndexTask(queue);
|
583
|
}
|
584
|
+
|
585
|
+ List<IndexTask> retryQueue = getIndexTaskRetryQueue();
|
586
|
+ task = getNextIndexTask(retryQueue);
|
587
|
+ while (task != null) {
|
588
|
+ processTask(task);
|
589
|
+ task = getNextIndexTask(queue);
|
590
|
+ }
|
591
|
}
|
592
|
|
593
|
private void processTask(IndexTask task) {
|
594
|
@@ -141,10 +148,6 @@
|
595
|
task.markInProgress();
|
596
|
task = saveTask(task);
|
597
|
|
598
|
- if (task != null && task.isDeleteTask()) {
|
599
|
- return task;
|
600
|
- }
|
601
|
-
|
602
|
if (task != null && !isObjectPathReady(task)) {
|
603
|
task.markNew();
|
604
|
saveTask(task);
|
605
|
@@ -152,7 +155,13 @@
|
606
|
continue;
|
607
|
}
|
608
|
|
609
|
+ if (task != null && task.isDeleteTask()) {
|
610
|
+ return task;
|
611
|
+ }
|
612
|
+
|
613
|
if (task != null && !isResourceMapReadyToIndex(task, queue)) {
|
614
|
+ task.markNew();
|
615
|
+ saveTask(task);
|
616
|
task = null;
|
617
|
}
|
618
|
}
|
619
|
@@ -166,8 +175,6 @@
|
620
|
if (docObject == null) {
|
621
|
logger.debug("unable to load resource at object path: " + task.getObjectPath()
|
622
|
+ ". Marking new and continuing...");
|
623
|
- task.markNew();
|
624
|
- saveTask(task);
|
625
|
ready = false;
|
626
|
} else if (docObject != null) {
|
627
|
ResourceMap rm = null;
|
628
|
@@ -181,8 +188,6 @@
|
629
|
if (areAllReferencedDocsIndexed(referencedIds) == false) {
|
630
|
logger.info("Not all map resource references indexed for map: " + task.getPid()
|
631
|
+ ". Marking new and continuing...");
|
632
|
- task.markNew();
|
633
|
- saveTask(task);
|
634
|
ready = false;
|
635
|
}
|
636
|
}
|
637
|
@@ -294,6 +299,11 @@
|
638
|
return repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
|
639
|
}
|
640
|
|
641
|
+ private List<IndexTask> getIndexTaskRetryQueue() {
|
642
|
+ return repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
|
643
|
+ System.currentTimeMillis());
|
644
|
+ }
|
645
|
+
|
646
|
private XPathDocumentParser getXPathDocumentParser() {
|
647
|
return documentParsers.get(0);
|
648
|
}
|