Project

General

Profile

redmine3162.txt

Skye Roseboom, 2012-09-05 03:17

Download (26.3 KB)

 
1
### Eclipse Workspace Patch 1.0
2
#P d1_cn_index_common
3
Index: src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java
4
===================================================================
5
--- src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java	(revision 8327)
6
+++ src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java	(working copy)
7
@@ -26,6 +26,7 @@
8
 import java.io.IOException;
9
 import java.io.InputStream;
10
 import java.math.BigInteger;
11
+import java.util.Calendar;
12
 import java.util.Date;
13
 import java.util.List;
14
 import java.util.UUID;
15
@@ -66,6 +67,162 @@
16
     }
17
 
18
     @Test
19
+    public void testTaskExecutionBackoffForRetry() {
20
+        repo.deleteAll();
21
+        // noise
22
+        saveIndexTaskWithStatus(UUID.randomUUID().toString(), IndexTask.STATUS_NEW);
23
+        String pidValue = "find by pid:" + UUID.randomUUID().toString();
24
+        IndexTask task = saveIndexTaskWithStatus(pidValue, IndexTask.STATUS_NEW);
25
+
26
+        task = simulateMarkNewProcessing(task);
27
+        List<IndexTask> itList = repo
28
+                .findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
29
+        Assert.assertEquals(2, itList.size());
30
+
31
+        Calendar cal = Calendar.getInstance();
32
+        cal.setTimeInMillis(System.currentTimeMillis());
33
+        cal.add(Calendar.MONTH, 1);
34
+        itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
35
+                cal.getTimeInMillis());
36
+        Assert.assertEquals(0, itList.size());
37
+
38
+        Calendar cal1 = Calendar.getInstance();
39
+        cal1.add(Calendar.MINUTE, 18);
40
+        Calendar cal2 = Calendar.getInstance();
41
+        cal2.add(Calendar.MINUTE, 22);
42
+        task = testNextBackoffForRetry(task, cal1, cal2);
43
+
44
+        cal1.setTimeInMillis(System.currentTimeMillis());
45
+        cal1.add(Calendar.MINUTE, 22);
46
+        cal2.setTimeInMillis(System.currentTimeMillis());
47
+        cal2.add(Calendar.MINUTE, 122);
48
+        task = testNextBackoffForRetry(task, cal1, cal2);
49
+
50
+        cal1.setTimeInMillis(System.currentTimeMillis());
51
+        cal1.add(Calendar.MINUTE, 122);
52
+        cal2.setTimeInMillis(System.currentTimeMillis());
53
+        cal2.add(Calendar.MINUTE, (60 * 8) + 2);
54
+        task = testNextBackoffForRetry(task, cal1, cal2);
55
+
56
+        cal1.setTimeInMillis(System.currentTimeMillis());
57
+        cal1.add(Calendar.MINUTE, (60 * 8) + 2);
58
+        cal2.setTimeInMillis(System.currentTimeMillis());
59
+        cal2.add(Calendar.MINUTE, (60 * 24) + 2);
60
+        task = testNextBackoffForRetry(task, cal1, cal2);
61
+
62
+        cal1.setTimeInMillis(System.currentTimeMillis());
63
+        cal1.add(Calendar.MINUTE, (60 * 8) + 2);
64
+        cal2.setTimeInMillis(System.currentTimeMillis());
65
+        cal2.add(Calendar.MINUTE, (60 * 24) + 2);
66
+        task = testNextBackoffForRetry(task, cal1, cal2);
67
+    }
68
+
69
+    @Test
70
+    public void testTaskExecutionBackoffForFailed() {
71
+        repo.deleteAll();
72
+        // noise
73
+        saveIndexTaskWithStatus(UUID.randomUUID().toString(), IndexTask.STATUS_NEW);
74
+        String pidValue = "find by pid:" + UUID.randomUUID().toString();
75
+        IndexTask task = saveIndexTaskWithStatus(pidValue, IndexTask.STATUS_NEW);
76
+
77
+        task = simulateMarkFailedProcessing(task);
78
+        List<IndexTask> itList = repo
79
+                .findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
80
+        Assert.assertEquals(1, itList.size());
81
+
82
+        Calendar cal = Calendar.getInstance();
83
+        cal.setTimeInMillis(System.currentTimeMillis());
84
+        cal.add(Calendar.MINUTE, 1);
85
+        itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
86
+                cal.getTimeInMillis());
87
+        Assert.assertEquals(1, itList.size());
88
+
89
+        Calendar cal1 = Calendar.getInstance();
90
+        cal1.add(Calendar.MINUTE, 18);
91
+        Calendar cal2 = Calendar.getInstance();
92
+        cal2.add(Calendar.MINUTE, 22);
93
+        task = testNextBackoffForFailed(task, cal1, cal2);
94
+
95
+        cal1.setTimeInMillis(System.currentTimeMillis());
96
+        cal1.add(Calendar.MINUTE, 22);
97
+        cal2.setTimeInMillis(System.currentTimeMillis());
98
+        cal2.add(Calendar.MINUTE, 122);
99
+        task = testNextBackoffForFailed(task, cal1, cal2);
100
+
101
+        cal1.setTimeInMillis(System.currentTimeMillis());
102
+        cal1.add(Calendar.MINUTE, 122);
103
+        cal2.setTimeInMillis(System.currentTimeMillis());
104
+        cal2.add(Calendar.MINUTE, (60 * 8) + 2);
105
+        task = testNextBackoffForFailed(task, cal1, cal2);
106
+
107
+        cal1.setTimeInMillis(System.currentTimeMillis());
108
+        cal1.add(Calendar.MINUTE, (60 * 8) + 2);
109
+        cal2.setTimeInMillis(System.currentTimeMillis());
110
+        cal2.add(Calendar.MINUTE, (60 * 24) + 2);
111
+        task = testNextBackoffForFailed(task, cal1, cal2);
112
+
113
+        cal1.setTimeInMillis(System.currentTimeMillis());
114
+        cal1.add(Calendar.MINUTE, (60 * 8) + 2);
115
+        cal2.setTimeInMillis(System.currentTimeMillis());
116
+        cal2.add(Calendar.MINUTE, (60 * 24) + 2);
117
+        task = testNextBackoffForFailed(task, cal1, cal2);
118
+    }
119
+
120
+    private IndexTask testNextBackoffForRetry(IndexTask task, Calendar previousTimeIncrement,
121
+            Calendar nextTimeIncrement) {
122
+
123
+        task = simulateMarkNewProcessing(task);
124
+        List<IndexTask> itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
125
+                previousTimeIncrement.getTimeInMillis());
126
+        Assert.assertEquals(0, itList.size());
127
+
128
+        itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
129
+                nextTimeIncrement.getTimeInMillis());
130
+        Assert.assertEquals(1, itList.size());
131
+
132
+        IndexTask it = itList.get(0);
133
+        Assert.assertNotNull(it);
134
+        Assert.assertTrue(task.getPid().equals(it.getPid()));
135
+        Assert.assertTrue(IndexTask.STATUS_FAILED.equals(it.getStatus()));
136
+        return task;
137
+    }
138
+
139
+    private IndexTask testNextBackoffForFailed(IndexTask task, Calendar previousTimeIncrement,
140
+            Calendar nextTimeIncrement) {
141
+
142
+        task = simulateMarkFailedProcessing(task);
143
+        List<IndexTask> itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
144
+                previousTimeIncrement.getTimeInMillis());
145
+        Assert.assertEquals(0, itList.size());
146
+
147
+        itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
148
+                nextTimeIncrement.getTimeInMillis());
149
+        Assert.assertEquals(1, itList.size());
150
+
151
+        IndexTask it = itList.get(0);
152
+        Assert.assertNotNull(it);
153
+        Assert.assertTrue(task.getPid().equals(it.getPid()));
154
+        Assert.assertTrue(IndexTask.STATUS_FAILED.equals(it.getStatus()));
155
+        return task;
156
+    }
157
+
158
+    private IndexTask simulateMarkNewProcessing(IndexTask task) {
159
+        task.markInProgress();
160
+        // not ready
161
+        task.markNew();
162
+        task = repo.save(task);
163
+        return task;
164
+    }
165
+
166
+    private IndexTask simulateMarkFailedProcessing(IndexTask task) {
167
+        task.markInProgress();
168
+        // not ready
169
+        task.markFailed();
170
+        task = repo.save(task);
171
+        return task;
172
+    }
173
+
174
+    @Test
175
     public void testAddOneTask() {
176
         int initialSize = repo.findAll().size();
177
         IndexTask task = saveIndexTask("pid1");
178
@@ -131,6 +288,11 @@
179
         String pidValue = "find by pid:" + UUID.randomUUID().toString();
180
         String status = "TEST-STATUS";
181
         saveIndexTaskWithStatus(pidValue, status);
182
+
183
+        String pidValue2 = "find by pid:" + UUID.randomUUID().toString();
184
+        String status2 = "TEST-STATUS2";
185
+        saveIndexTaskWithStatus(pidValue2, status2);
186
+
187
         List<IndexTask> itList = repo.findByPidAndStatus(pidValue, status);
188
         Assert.assertEquals(1, itList.size());
189
         IndexTask it = itList.get(0);
190
@@ -139,6 +301,29 @@
191
         Assert.assertTrue(status.equals(it.getStatus()));
192
     }
193
 
194
+    @Test
195
+    public void testFindByStatusAndNextExection() {
196
+        String pidValue = "find by pid:" + UUID.randomUUID().toString();
197
+        String status = "TEST-NEXT";
198
+        saveIndexTaskWithStatus(pidValue, status);
199
+
200
+        String pidValue2 = "find by pid:" + UUID.randomUUID().toString();
201
+        IndexTask task2 = saveIndexTaskWithStatus(pidValue2, "TEST-NEXT");
202
+        Calendar cal = Calendar.getInstance();
203
+        cal.setTimeInMillis(System.currentTimeMillis());
204
+        cal.add(Calendar.DATE, 1);
205
+        task2.setNextExection(cal.getTimeInMillis());
206
+        repo.save(task2);
207
+
208
+        List<IndexTask> itList = repo.findByStatusAndNextExecutionLessThan(status,
209
+                System.currentTimeMillis());
210
+        Assert.assertEquals(1, itList.size());
211
+        IndexTask it = itList.get(0);
212
+        Assert.assertNotNull(it);
213
+        Assert.assertTrue(pidValue.equals(it.getPid()));
214
+        Assert.assertTrue(status.equals(it.getStatus()));
215
+    }
216
+
217
     /**
218
      * Tests status narrowing and ordering of the task queue query
219
      */
220
@@ -148,6 +333,7 @@
221
                 "garbage status", 1);
222
 
223
         String status = "findQueue";
224
+        String status2 = "badStatus";
225
 
226
         repo.deleteAll();
227
 
228
@@ -163,6 +349,12 @@
229
         String pidValue3 = "3rd created task: " + UUID.randomUUID().toString();
230
         saveIndexTaskWithStatusAndPriority(pidValue3, status, 1);
231
 
232
+        String pidValue4 = "4th created task: " + UUID.randomUUID().toString();
233
+        saveIndexTaskWithStatusAndPriority(pidValue4, status2, 1);
234
+
235
+        String pidValue5 = "thrd created task: " + UUID.randomUUID().toString();
236
+        saveIndexTaskWithStatusAndPriority(pidValue5, status2, 1);
237
+
238
         List<IndexTask> queue = repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(status);
239
         Assert.assertEquals(3, queue.size());
240
 
241
Index: src/main/java/org/dataone/cn/index/task/IndexTask.java
242
===================================================================
243
--- src/main/java/org/dataone/cn/index/task/IndexTask.java	(revision 8794)
244
+++ src/main/java/org/dataone/cn/index/task/IndexTask.java	(working copy)
245
@@ -28,6 +28,7 @@
246
 import java.io.InputStream;
247
 import java.io.Serializable;
248
 import java.io.UnsupportedEncodingException;
249
+import java.util.Calendar;
250
 
251
 import javax.persistence.Column;
252
 import javax.persistence.Entity;
253
@@ -68,8 +69,7 @@
254
     private static Logger logger = Logger.getLogger(IndexTask.class.getName());
255
 
256
     @Transient
257
-    private final FastDateFormat format = FastDateFormat
258
-            .getInstance("MM/dd/yyyy:HH:mm:ss:SS");
259
+    private final FastDateFormat format = FastDateFormat.getInstance("MM/dd/yyyy:HH:mm:ss:SS");
260
 
261
     @Transient
262
     private static final String FORMAT_RESOURCE_MAP = "http://www.openarchives.org/ore/terms";
263
@@ -118,6 +118,10 @@
264
      */
265
     private long taskModifiedDate;
266
 
267
+    private long nextExecution = 0;
268
+
269
+    private int tryCount = 0;
270
+
271
     /**
272
      * Relative priority of this task. Some operations such as a change in
273
      * access control rules should be propagated to the index before others
274
@@ -318,6 +322,22 @@
275
         this.dateSysMetaModified = dateSysMetaModified;
276
     }
277
 
278
+    public long getNextExecution() {
279
+        return this.nextExecution;
280
+    }
281
+
282
+    public void setNextExection(long next) {
283
+        this.nextExecution = next;
284
+    }
285
+
286
+    public int getTryCount() {
287
+        return tryCount;
288
+    }
289
+
290
+    public void setTryCount(int count) {
291
+        this.tryCount = count;
292
+    }
293
+
294
     /**
295
      * Private method exposed due to JPA and unit testing requirements. Should
296
      * not use directly.
297
@@ -385,21 +405,63 @@
298
         return status;
299
     }
300
 
301
+    // Do not use this method, used by unit tests only.
302
+    // use the specific 'markNew, markFailed, markInProcess' methods.
303
     public void setStatus(String status) {
304
-        this.taskModifiedDate = System.currentTimeMillis();
305
-        this.status = status;
306
+        if (status != null) {
307
+            this.taskModifiedDate = System.currentTimeMillis();
308
+            this.status = status;
309
+        }
310
     }
311
 
312
+    private void setBackoffExectionTime() {
313
+        if (getTryCount() == 2) {
314
+            Calendar cal = Calendar.getInstance();
315
+            cal.setTimeInMillis(System.currentTimeMillis());
316
+            cal.add(Calendar.MINUTE, 20);
317
+            setNextExection(cal.getTimeInMillis());
318
+        } else if (getTryCount() == 3) {
319
+            Calendar cal = Calendar.getInstance();
320
+            cal.setTimeInMillis(System.currentTimeMillis());
321
+            cal.add(Calendar.HOUR, 2);
322
+            setNextExection(cal.getTimeInMillis());
323
+        } else if (getTryCount() == 4) {
324
+            Calendar cal = Calendar.getInstance();
325
+            cal.setTimeInMillis(System.currentTimeMillis());
326
+            cal.add(Calendar.HOUR, 8);
327
+            setNextExection(cal.getTimeInMillis());
328
+        } else if (getTryCount() > 4) {
329
+            Calendar cal = Calendar.getInstance();
330
+            cal.setTimeInMillis(System.currentTimeMillis());
331
+            cal.add(Calendar.HOUR, 24);
332
+            setNextExection(cal.getTimeInMillis());
333
+        }
334
+    }
335
+
336
+    private boolean timeForRetryBackoff(String status) {
337
+        return (getTryCount() >= 2) && (STATUS_COMPLETE.equals(status) == false)
338
+                && (STATUS_IN_PROCESS.equals(status) == false);
339
+    }
340
+
341
     public void markInProgress() {
342
         this.setStatus(STATUS_IN_PROCESS);
343
+        this.tryCount++;
344
     }
345
 
346
     public void markNew() {
347
         this.setStatus(STATUS_NEW);
348
+        if (timeForRetryBackoff(status)) {
349
+            this.status = STATUS_FAILED;
350
+            setBackoffExectionTime();
351
+        }
352
     }
353
 
354
     public void markFailed() {
355
         this.setStatus(STATUS_FAILED);
356
+        if (timeForRetryBackoff(status)) {
357
+            this.status = STATUS_FAILED;
358
+            setBackoffExectionTime();
359
+        }
360
     }
361
 
362
     public int getVersion() {
363
@@ -412,9 +474,9 @@
364
 
365
     @Override
366
     public String toString() {
367
-        return "IndexTask [id=" + id + ", pid=" + pid + ", formatid=" + formatId
368
-                + ", objectPath=" + objectPath + ", dateSysMetaModified="
369
-                + dateSysMetaModified + ", taskModifiedDate=" + taskModifiedDate
370
-                + ", priority=" + priority + ", status=" + status + "]";
371
+        return "IndexTask [id=" + id + ", pid=" + pid + ", formatid=" + formatId + ", objectPath="
372
+                + objectPath + ", dateSysMetaModified=" + dateSysMetaModified
373
+                + ", taskModifiedDate=" + taskModifiedDate + ", priority=" + priority + ", status="
374
+                + status + "]";
375
     }
376
 }
377
Index: src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java
378
===================================================================
379
--- src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java	(revision 8644)
380
+++ src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java	(working copy)
381
@@ -64,7 +64,8 @@
382
      * @param status
383
      * @return
384
      */
385
-    List<IndexTask> findByStatusOrderByPriorityAscTaskModifiedDateAsc(
386
-            String status);
387
+    List<IndexTask> findByStatusOrderByPriorityAscTaskModifiedDateAsc(String status);
388
 
389
+    List<IndexTask> findByStatusAndNextExecutionLessThan(String status, long time);
390
+
391
 }
392
#P d1_cn_index_generator
393
Index: src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java
394
===================================================================
395
--- src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java	(revision 8644)
396
+++ src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java	(working copy)
397
@@ -42,8 +42,7 @@
398
  */
399
 public class IndexTaskGenerator {
400
 
401
-    private static Logger logger = Logger.getLogger(IndexTaskGenerator.class
402
-            .getName());
403
+    private static Logger logger = Logger.getLogger(IndexTaskGenerator.class.getName());
404
     private static final String IGNOREPID = "OBJECT_FORMAT_LIST.1.1";
405
 
406
     @Autowired
407
@@ -56,8 +55,7 @@
408
      * @param SystemMetadata
409
      * @return IndexTask
410
      */
411
-    public IndexTask processSystemMetaDataAdd(SystemMetadata smd,
412
-            String objectPath) {
413
+    public IndexTask processSystemMetaDataAdd(SystemMetadata smd, String objectPath) {
414
         if (isNotIgnorePid(smd)) {
415
             removeDuplicateNewTasks(smd);
416
             IndexTask task = new IndexTask(smd, objectPath);
417
@@ -75,8 +73,7 @@
418
      * @param SystemMetadata
419
      * @return IndexTask
420
      */
421
-    public IndexTask processSystemMetaDataUpdate(SystemMetadata smd,
422
-            String objectPath) {
423
+    public IndexTask processSystemMetaDataUpdate(SystemMetadata smd, String objectPath) {
424
         if (isNotIgnorePid(smd)) {
425
             removeDuplicateNewTasks(smd);
426
             IndexTask task = new IndexTask(smd, objectPath);
427
@@ -101,14 +98,18 @@
428
      * @param SystemMetadata
429
      */
430
     private void removeDuplicateNewTasks(SystemMetadata smd) {
431
-        List<IndexTask> itList = repo.findByPidAndStatus(smd.getIdentifier()
432
-                .getValue(), IndexTask.STATUS_NEW);
433
+        removeDuplicateTasksByStatus(smd, IndexTask.STATUS_NEW);
434
+        // new update on this pid, so remove failure and try to reprocess.
435
+        removeDuplicateTasksByStatus(smd, IndexTask.STATUS_FAILED);
436
+    }
437
+
438
+    private void removeDuplicateTasksByStatus(SystemMetadata smd, String status) {
439
+        List<IndexTask> itList = repo.findByPidAndStatus(smd.getIdentifier().getValue(), status);
440
         for (IndexTask indexTask : itList) {
441
             try {
442
                 repo.delete(indexTask);
443
             } catch (HibernateOptimisticLockingFailureException e) {
444
-                logger.debug("Unable to delete existing index task for pid: "
445
-                        + indexTask.getPid()
446
+                logger.debug("Unable to delete existing index task for pid: " + indexTask.getPid()
447
                         + " prior to generating new index task.");
448
             }
449
         }
450
#P d1_cn_index_processor
451
Index: src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java
452
===================================================================
453
--- src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java	(revision 10046)
454
+++ src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java	(working copy)
455
@@ -22,7 +22,6 @@
456
 
457
 package org.dataone.cn.index.processor;
458
 
459
-import java.io.File;
460
 import java.io.IOException;
461
 import java.util.ArrayList;
462
 import java.util.List;
463
@@ -91,74 +90,51 @@
464
     }
465
 
466
     private void removeDataPackage(IndexTask task) throws Exception {
467
-        if (task.getObjectPath() == null) {
468
-            String objectPath = retrieveObjectPath(task.getPid());
469
-            task.setObjectPath(objectPath);
470
-        }
471
-        boolean resourceMapFileExists = false;
472
-        if (task.getObjectPath() != null) {
473
-            File objectPathFile = new File(task.getObjectPath());
474
-            if (objectPathFile.exists()) {
475
-                resourceMapFileExists = true;
476
-            } else {
477
-                logger.info("Object path exists: " + task.getObjectPath() + " for pid: "
478
-                        + task.getPid() + " but file location does not yet exist.");
479
+        Document resourceMapDoc = getXPathDocumentParser().loadDocument(task.getObjectPath());
480
+        ResourceMap resourceMap = new ResourceMap(resourceMapDoc);
481
+        List<String> documentIds = resourceMap.getAllDocumentIDs();
482
+        List<SolrDoc> indexDocuments = httpService.getDocuments(solrQueryUri, documentIds);
483
+        removeFromIndex(task);
484
+        List<SolrDoc> docsToUpdate = new ArrayList<SolrDoc>();
485
+        // for each document in data package:
486
+        for (SolrDoc indexDoc : indexDocuments) {
487
+
488
+            if (indexDoc.getIdentifier().equals(task.getPid())) {
489
+                continue; // skipping the resource map, no need update
490
+                          // it.
491
+                          // will
492
+                          // be removed.
493
             }
494
-        } else {
495
-            logger.info("Object path not yet set for pid: " + task.getPid());
496
-        }
497
-        if (resourceMapFileExists) {
498
-            Document resourceMapDoc = getXPathDocumentParser().loadDocument(task.getObjectPath());
499
-            if (resourceMapDoc != null) {
500
-                ResourceMap resourceMap = new ResourceMap(resourceMapDoc);
501
-                List<String> documentIds = resourceMap.getAllDocumentIDs();
502
-                List<SolrDoc> indexDocuments = httpService.getDocuments(solrQueryUri, documentIds);
503
-                removeFromIndex(task);
504
-                List<SolrDoc> docsToUpdate = new ArrayList<SolrDoc>();
505
-                // for each document in data package:
506
-                for (SolrDoc indexDoc : indexDocuments) {
507
 
508
-                    if (indexDoc.getIdentifier().equals(task.getPid())) {
509
-                        continue; // skipping the resource map, no need update
510
-                                  // it.
511
-                                  // will
512
-                                  // be removed.
513
-                    }
514
+            // Remove resourceMap reference
515
+            indexDoc.removeFieldsWithValue(SolrElementField.FIELD_RESOURCEMAP,
516
+                    resourceMap.getIdentifier());
517
 
518
-                    // Remove resourceMap reference
519
-                    indexDoc.removeFieldsWithValue(SolrElementField.FIELD_RESOURCEMAP,
520
-                            resourceMap.getIdentifier());
521
-
522
-                    // // Remove documents/documentedby values for this resource
523
-                    // map
524
-                    for (ResourceEntry entry : resourceMap.getMappedReferences()) {
525
-                        if (indexDoc.getIdentifier().equals(entry.getIdentifier())) {
526
-                            for (String documentedBy : entry.getDocumentedBy()) {
527
-                                // Using removeOneFieldWithValue in-case same
528
-                                // documents
529
-                                // are in more than one data package. just
530
-                                // remove
531
-                                // one
532
-                                // instance of data package info.
533
-                                indexDoc.removeOneFieldWithValue(
534
-                                        SolrElementField.FIELD_ISDOCUMENTEDBY, documentedBy);
535
-                            }
536
-                            for (String documents : entry.getDocuments()) {
537
-                                indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_DOCUMENTS,
538
-                                        documents);
539
-                            }
540
-                            break;
541
-                        }
542
+            // // Remove documents/documentedby values for this resource
543
+            // map
544
+            for (ResourceEntry entry : resourceMap.getMappedReferences()) {
545
+                if (indexDoc.getIdentifier().equals(entry.getIdentifier())) {
546
+                    for (String documentedBy : entry.getDocumentedBy()) {
547
+                        // Using removeOneFieldWithValue in-case same
548
+                        // documents
549
+                        // are in more than one data package. just
550
+                        // remove
551
+                        // one
552
+                        // instance of data package info.
553
+                        indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_ISDOCUMENTEDBY,
554
+                                documentedBy);
555
                     }
556
-                    docsToUpdate.add(indexDoc);
557
+                    for (String documents : entry.getDocuments()) {
558
+                        indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_DOCUMENTS,
559
+                                documents);
560
+                    }
561
+                    break;
562
                 }
563
-                SolrElementAdd addCommand = new SolrElementAdd(docsToUpdate);
564
-                httpService.sendUpdate(solrIndexUri, addCommand);
565
             }
566
-        } else {
567
-            task.markFailed();
568
-            saveTask(task);
569
+            docsToUpdate.add(indexDoc);
570
         }
571
+        SolrElementAdd addCommand = new SolrElementAdd(docsToUpdate);
572
+        httpService.sendUpdate(solrIndexUri, addCommand);
573
     }
574
 
575
     private void removeFromDataPackage(IndexTask task) throws Exception {
576
Index: src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java
577
===================================================================
578
--- src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java	(revision 10044)
579
+++ src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java	(working copy)
580
@@ -111,6 +111,13 @@
581
             processTask(task);
582
             task = getNextIndexTask(queue);
583
         }
584
+
585
+        List<IndexTask> retryQueue = getIndexTaskRetryQueue();
586
+        task = getNextIndexTask(retryQueue);
587
+        while (task != null) {
588
+            processTask(task);
589
+            task = getNextIndexTask(queue);
590
+        }
591
     }
592
 
593
     private void processTask(IndexTask task) {
594
@@ -141,10 +148,6 @@
595
             task.markInProgress();
596
             task = saveTask(task);
597
 
598
-            if (task != null && task.isDeleteTask()) {
599
-                return task;
600
-            }
601
-
602
             if (task != null && !isObjectPathReady(task)) {
603
                 task.markNew();
604
                 saveTask(task);
605
@@ -152,7 +155,13 @@
606
                 continue;
607
             }
608
 
609
+            if (task != null && task.isDeleteTask()) {
610
+                return task;
611
+            }
612
+
613
             if (task != null && !isResourceMapReadyToIndex(task, queue)) {
614
+                task.markNew();
615
+                saveTask(task);
616
                 task = null;
617
             }
618
         }
619
@@ -166,8 +175,6 @@
620
             if (docObject == null) {
621
                 logger.debug("unable to load resource at object path: " + task.getObjectPath()
622
                         + ".  Marking new and continuing...");
623
-                task.markNew();
624
-                saveTask(task);
625
                 ready = false;
626
             } else if (docObject != null) {
627
                 ResourceMap rm = null;
628
@@ -181,8 +188,6 @@
629
                 if (areAllReferencedDocsIndexed(referencedIds) == false) {
630
                     logger.info("Not all map resource references indexed for map: " + task.getPid()
631
                             + ".  Marking new and continuing...");
632
-                    task.markNew();
633
-                    saveTask(task);
634
                     ready = false;
635
                 }
636
             }
637
@@ -294,6 +299,11 @@
638
         return repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
639
     }
640
 
641
+    private List<IndexTask> getIndexTaskRetryQueue() {
642
+        return repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
643
+                System.currentTimeMillis());
644
+    }
645
+
646
     private XPathDocumentParser getXPathDocumentParser() {
647
         return documentParsers.get(0);
648
     }
Add picture from clipboard (Maximum size: 14.8 MB)