### Eclipse Workspace Patch 1.0 #P d1_cn_index_common Index: src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java =================================================================== --- src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java (revision 8327) +++ src/test/java/org/dataone/cn/index/test/IndexTaskJpaRepositoryTest.java (working copy) @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; +import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.UUID; @@ -66,6 +67,162 @@ } @Test + public void testTaskExecutionBackoffForRetry() { + repo.deleteAll(); + // noise + saveIndexTaskWithStatus(UUID.randomUUID().toString(), IndexTask.STATUS_NEW); + String pidValue = "find by pid:" + UUID.randomUUID().toString(); + IndexTask task = saveIndexTaskWithStatus(pidValue, IndexTask.STATUS_NEW); + + task = simulateMarkNewProcessing(task); + List itList = repo + .findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW); + Assert.assertEquals(2, itList.size()); + + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.MONTH, 1); + itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + cal.getTimeInMillis()); + Assert.assertEquals(0, itList.size()); + + Calendar cal1 = Calendar.getInstance(); + cal1.add(Calendar.MINUTE, 18); + Calendar cal2 = Calendar.getInstance(); + cal2.add(Calendar.MINUTE, 22); + task = testNextBackoffForRetry(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, 22); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, 122); + task = testNextBackoffForRetry(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, 122); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, (60 * 8) + 2); + task = 
testNextBackoffForRetry(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, (60 * 8) + 2); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, (60 * 24) + 2); + task = testNextBackoffForRetry(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, (60 * 8) + 2); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, (60 * 24) + 2); + task = testNextBackoffForRetry(task, cal1, cal2); + } + + @Test + public void testTaskExecutionBackoffForFailed() { + repo.deleteAll(); + // noise + saveIndexTaskWithStatus(UUID.randomUUID().toString(), IndexTask.STATUS_NEW); + String pidValue = "find by pid:" + UUID.randomUUID().toString(); + IndexTask task = saveIndexTaskWithStatus(pidValue, IndexTask.STATUS_NEW); + + task = simulateMarkFailedProcessing(task); + List itList = repo + .findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW); + Assert.assertEquals(1, itList.size()); + + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.MINUTE, 1); + itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + cal.getTimeInMillis()); + Assert.assertEquals(1, itList.size()); + + Calendar cal1 = Calendar.getInstance(); + cal1.add(Calendar.MINUTE, 18); + Calendar cal2 = Calendar.getInstance(); + cal2.add(Calendar.MINUTE, 22); + task = testNextBackoffForFailed(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, 22); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, 122); + task = testNextBackoffForFailed(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, 122); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, (60 * 8) + 2); + task = testNextBackoffForFailed(task, cal1, cal2); + + 
cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, (60 * 8) + 2); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, (60 * 24) + 2); + task = testNextBackoffForFailed(task, cal1, cal2); + + cal1.setTimeInMillis(System.currentTimeMillis()); + cal1.add(Calendar.MINUTE, (60 * 8) + 2); + cal2.setTimeInMillis(System.currentTimeMillis()); + cal2.add(Calendar.MINUTE, (60 * 24) + 2); + task = testNextBackoffForFailed(task, cal1, cal2); + } + + private IndexTask testNextBackoffForRetry(IndexTask task, Calendar previousTimeIncrement, + Calendar nextTimeIncrement) { + + task = simulateMarkNewProcessing(task); + List itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + previousTimeIncrement.getTimeInMillis()); + Assert.assertEquals(0, itList.size()); + + itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + nextTimeIncrement.getTimeInMillis()); + Assert.assertEquals(1, itList.size()); + + IndexTask it = itList.get(0); + Assert.assertNotNull(it); + Assert.assertTrue(task.getPid().equals(it.getPid())); + Assert.assertTrue(IndexTask.STATUS_FAILED.equals(it.getStatus())); + return task; + } + + private IndexTask testNextBackoffForFailed(IndexTask task, Calendar previousTimeIncrement, + Calendar nextTimeIncrement) { + + task = simulateMarkFailedProcessing(task); + List itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + previousTimeIncrement.getTimeInMillis()); + Assert.assertEquals(0, itList.size()); + + itList = repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + nextTimeIncrement.getTimeInMillis()); + Assert.assertEquals(1, itList.size()); + + IndexTask it = itList.get(0); + Assert.assertNotNull(it); + Assert.assertTrue(task.getPid().equals(it.getPid())); + Assert.assertTrue(IndexTask.STATUS_FAILED.equals(it.getStatus())); + return task; + } + + private IndexTask simulateMarkNewProcessing(IndexTask task) { + task.markInProgress(); + 
// not ready + task.markNew(); + task = repo.save(task); + return task; + } + + private IndexTask simulateMarkFailedProcessing(IndexTask task) { + task.markInProgress(); + // not ready + task.markFailed(); + task = repo.save(task); + return task; + } + + @Test public void testAddOneTask() { int initialSize = repo.findAll().size(); IndexTask task = saveIndexTask("pid1"); @@ -131,6 +288,11 @@ String pidValue = "find by pid:" + UUID.randomUUID().toString(); String status = "TEST-STATUS"; saveIndexTaskWithStatus(pidValue, status); + + String pidValue2 = "find by pid:" + UUID.randomUUID().toString(); + String status2 = "TEST-STATUS2"; + saveIndexTaskWithStatus(pidValue2, status2); + List itList = repo.findByPidAndStatus(pidValue, status); Assert.assertEquals(1, itList.size()); IndexTask it = itList.get(0); @@ -139,6 +301,29 @@ Assert.assertTrue(status.equals(it.getStatus())); } + @Test + public void testFindByStatusAndNextExection() { + String pidValue = "find by pid:" + UUID.randomUUID().toString(); + String status = "TEST-NEXT"; + saveIndexTaskWithStatus(pidValue, status); + + String pidValue2 = "find by pid:" + UUID.randomUUID().toString(); + IndexTask task2 = saveIndexTaskWithStatus(pidValue2, "TEST-NEXT"); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.DATE, 1); + task2.setNextExection(cal.getTimeInMillis()); + repo.save(task2); + + List itList = repo.findByStatusAndNextExecutionLessThan(status, + System.currentTimeMillis()); + Assert.assertEquals(1, itList.size()); + IndexTask it = itList.get(0); + Assert.assertNotNull(it); + Assert.assertTrue(pidValue.equals(it.getPid())); + Assert.assertTrue(status.equals(it.getStatus())); + } + /** * Tests status narrowing and ordering of the task queue query */ @@ -148,6 +333,7 @@ "garbage status", 1); String status = "findQueue"; + String status2 = "badStatus"; repo.deleteAll(); @@ -163,6 +349,12 @@ String pidValue3 = "3rd created task: " + 
UUID.randomUUID().toString(); saveIndexTaskWithStatusAndPriority(pidValue3, status, 1); + String pidValue4 = "4th created task: " + UUID.randomUUID().toString(); + saveIndexTaskWithStatusAndPriority(pidValue4, status2, 1); + + String pidValue5 = "5th created task: " + UUID.randomUUID().toString(); + saveIndexTaskWithStatusAndPriority(pidValue5, status2, 1); + List queue = repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(status); Assert.assertEquals(3, queue.size()); Index: src/main/java/org/dataone/cn/index/task/IndexTask.java =================================================================== --- src/main/java/org/dataone/cn/index/task/IndexTask.java (revision 8794) +++ src/main/java/org/dataone/cn/index/task/IndexTask.java (working copy) @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.util.Calendar; import javax.persistence.Column; import javax.persistence.Entity; @@ -68,8 +69,7 @@ private static Logger logger = Logger.getLogger(IndexTask.class.getName()); @Transient - private final FastDateFormat format = FastDateFormat - .getInstance("MM/dd/yyyy:HH:mm:ss:SS"); + private final FastDateFormat format = FastDateFormat.getInstance("MM/dd/yyyy:HH:mm:ss:SS"); @Transient private static final String FORMAT_RESOURCE_MAP = "http://www.openarchives.org/ore/terms"; @@ -118,6 +118,10 @@ */ private long taskModifiedDate; + private long nextExecution = 0; + + private int tryCount = 0; + /** * Relative priority of this task. 
Some operations such as a change in * access control rules should be propagated to the index before others @@ -318,6 +322,22 @@ this.dateSysMetaModified = dateSysMetaModified; } + public long getNextExecution() { + return this.nextExecution; + } + + public void setNextExection(long next) { + this.nextExecution = next; + } + + public int getTryCount() { + return tryCount; + } + + public void setTryCount(int count) { + this.tryCount = count; + } + /** * Private method exposed due to JPA and unit testing requirements. Should * not use directly. @@ -385,21 +405,63 @@ return status; } + // Do not use this method, used by unit tests only. + // use the specific 'markNew, markFailed, markInProcess' methods. public void setStatus(String status) { - this.taskModifiedDate = System.currentTimeMillis(); - this.status = status; + if (status != null) { + this.taskModifiedDate = System.currentTimeMillis(); + this.status = status; + } } + private void setBackoffExectionTime() { + if (getTryCount() == 2) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.MINUTE, 20); + setNextExection(cal.getTimeInMillis()); + } else if (getTryCount() == 3) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.HOUR, 2); + setNextExection(cal.getTimeInMillis()); + } else if (getTryCount() == 4) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.HOUR, 8); + setNextExection(cal.getTimeInMillis()); + } else if (getTryCount() > 4) { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(System.currentTimeMillis()); + cal.add(Calendar.HOUR, 24); + setNextExection(cal.getTimeInMillis()); + } + } + + private boolean timeForRetryBackoff(String status) { + return (getTryCount() >= 2) && (STATUS_COMPLETE.equals(status) == false) + && (STATUS_IN_PROCESS.equals(status) == false); + } + public void markInProgress() { 
this.setStatus(STATUS_IN_PROCESS); + this.tryCount++; } public void markNew() { this.setStatus(STATUS_NEW); + if (timeForRetryBackoff(status)) { + this.status = STATUS_FAILED; + setBackoffExectionTime(); + } } public void markFailed() { this.setStatus(STATUS_FAILED); + if (timeForRetryBackoff(status)) { + this.status = STATUS_FAILED; + setBackoffExectionTime(); + } } public int getVersion() { @@ -412,9 +474,9 @@ @Override public String toString() { - return "IndexTask [id=" + id + ", pid=" + pid + ", formatid=" + formatId - + ", objectPath=" + objectPath + ", dateSysMetaModified=" - + dateSysMetaModified + ", taskModifiedDate=" + taskModifiedDate - + ", priority=" + priority + ", status=" + status + "]"; + return "IndexTask [id=" + id + ", pid=" + pid + ", formatid=" + formatId + ", objectPath=" + + objectPath + ", dateSysMetaModified=" + dateSysMetaModified + + ", taskModifiedDate=" + taskModifiedDate + ", priority=" + priority + ", status=" + + status + "]"; } } Index: src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java =================================================================== --- src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java (revision 8644) +++ src/main/java/org/dataone/cn/index/task/IndexTaskRepository.java (working copy) @@ -64,7 +64,8 @@ * @param status * @return */ - List findByStatusOrderByPriorityAscTaskModifiedDateAsc( - String status); + List findByStatusOrderByPriorityAscTaskModifiedDateAsc(String status); + List findByStatusAndNextExecutionLessThan(String status, long time); + } #P d1_cn_index_generator Index: src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java =================================================================== --- src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java (revision 8644) +++ src/main/java/org/dataone/cn/index/generator/IndexTaskGenerator.java (working copy) @@ -42,8 +42,7 @@ */ public class IndexTaskGenerator { - private static Logger logger = 
Logger.getLogger(IndexTaskGenerator.class - .getName()); + private static Logger logger = Logger.getLogger(IndexTaskGenerator.class.getName()); private static final String IGNOREPID = "OBJECT_FORMAT_LIST.1.1"; @Autowired @@ -56,8 +55,7 @@ * @param SystemMetadata * @return IndexTask */ - public IndexTask processSystemMetaDataAdd(SystemMetadata smd, - String objectPath) { + public IndexTask processSystemMetaDataAdd(SystemMetadata smd, String objectPath) { if (isNotIgnorePid(smd)) { removeDuplicateNewTasks(smd); IndexTask task = new IndexTask(smd, objectPath); @@ -75,8 +73,7 @@ * @param SystemMetadata * @return IndexTask */ - public IndexTask processSystemMetaDataUpdate(SystemMetadata smd, - String objectPath) { + public IndexTask processSystemMetaDataUpdate(SystemMetadata smd, String objectPath) { if (isNotIgnorePid(smd)) { removeDuplicateNewTasks(smd); IndexTask task = new IndexTask(smd, objectPath); @@ -101,14 +98,18 @@ * @param SystemMetadata */ private void removeDuplicateNewTasks(SystemMetadata smd) { - List itList = repo.findByPidAndStatus(smd.getIdentifier() - .getValue(), IndexTask.STATUS_NEW); + removeDuplicateTasksByStatus(smd, IndexTask.STATUS_NEW); + // new update on this pid, so remove failure and try to reprocess. 
+ removeDuplicateTasksByStatus(smd, IndexTask.STATUS_FAILED); + } + + private void removeDuplicateTasksByStatus(SystemMetadata smd, String status) { + List itList = repo.findByPidAndStatus(smd.getIdentifier().getValue(), status); for (IndexTask indexTask : itList) { try { repo.delete(indexTask); } catch (HibernateOptimisticLockingFailureException e) { - logger.debug("Unable to delete existing index task for pid: " - + indexTask.getPid() + logger.debug("Unable to delete existing index task for pid: " + indexTask.getPid() + " prior to generating new index task."); } } #P d1_cn_index_processor Index: src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java =================================================================== --- src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java (revision 10046) +++ src/main/java/org/dataone/cn/index/processor/IndexTaskDeleteProcessor.java (working copy) @@ -22,7 +22,6 @@ package org.dataone.cn.index.processor; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -91,74 +90,51 @@ } private void removeDataPackage(IndexTask task) throws Exception { - if (task.getObjectPath() == null) { - String objectPath = retrieveObjectPath(task.getPid()); - task.setObjectPath(objectPath); - } - boolean resourceMapFileExists = false; - if (task.getObjectPath() != null) { - File objectPathFile = new File(task.getObjectPath()); - if (objectPathFile.exists()) { - resourceMapFileExists = true; - } else { - logger.info("Object path exists: " + task.getObjectPath() + " for pid: " - + task.getPid() + " but file location does not yet exist."); + Document resourceMapDoc = getXPathDocumentParser().loadDocument(task.getObjectPath()); + ResourceMap resourceMap = new ResourceMap(resourceMapDoc); + List documentIds = resourceMap.getAllDocumentIDs(); + List indexDocuments = httpService.getDocuments(solrQueryUri, documentIds); + removeFromIndex(task); + List docsToUpdate = new 
ArrayList(); + // for each document in data package: + for (SolrDoc indexDoc : indexDocuments) { + + if (indexDoc.getIdentifier().equals(task.getPid())) { + continue; // skipping the resource map, no need update + // it. + // will + // be removed. } - } else { - logger.info("Object path not yet set for pid: " + task.getPid()); - } - if (resourceMapFileExists) { - Document resourceMapDoc = getXPathDocumentParser().loadDocument(task.getObjectPath()); - if (resourceMapDoc != null) { - ResourceMap resourceMap = new ResourceMap(resourceMapDoc); - List documentIds = resourceMap.getAllDocumentIDs(); - List indexDocuments = httpService.getDocuments(solrQueryUri, documentIds); - removeFromIndex(task); - List docsToUpdate = new ArrayList(); - // for each document in data package: - for (SolrDoc indexDoc : indexDocuments) { - if (indexDoc.getIdentifier().equals(task.getPid())) { - continue; // skipping the resource map, no need update - // it. - // will - // be removed. - } + // Remove resourceMap reference + indexDoc.removeFieldsWithValue(SolrElementField.FIELD_RESOURCEMAP, + resourceMap.getIdentifier()); - // Remove resourceMap reference - indexDoc.removeFieldsWithValue(SolrElementField.FIELD_RESOURCEMAP, - resourceMap.getIdentifier()); - - // // Remove documents/documentedby values for this resource - // map - for (ResourceEntry entry : resourceMap.getMappedReferences()) { - if (indexDoc.getIdentifier().equals(entry.getIdentifier())) { - for (String documentedBy : entry.getDocumentedBy()) { - // Using removeOneFieldWithValue in-case same - // documents - // are in more than one data package. just - // remove - // one - // instance of data package info. 
- indexDoc.removeOneFieldWithValue( - SolrElementField.FIELD_ISDOCUMENTEDBY, documentedBy); - } - for (String documents : entry.getDocuments()) { - indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_DOCUMENTS, - documents); - } - break; - } + // // Remove documents/documentedby values for this resource + // map + for (ResourceEntry entry : resourceMap.getMappedReferences()) { + if (indexDoc.getIdentifier().equals(entry.getIdentifier())) { + for (String documentedBy : entry.getDocumentedBy()) { + // Using removeOneFieldWithValue in-case same + // documents + // are in more than one data package. just + // remove + // one + // instance of data package info. + indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_ISDOCUMENTEDBY, + documentedBy); } - docsToUpdate.add(indexDoc); + for (String documents : entry.getDocuments()) { + indexDoc.removeOneFieldWithValue(SolrElementField.FIELD_DOCUMENTS, + documents); + } + break; } - SolrElementAdd addCommand = new SolrElementAdd(docsToUpdate); - httpService.sendUpdate(solrIndexUri, addCommand); } - } else { - task.markFailed(); - saveTask(task); + docsToUpdate.add(indexDoc); } + SolrElementAdd addCommand = new SolrElementAdd(docsToUpdate); + httpService.sendUpdate(solrIndexUri, addCommand); } private void removeFromDataPackage(IndexTask task) throws Exception { Index: src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java =================================================================== --- src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java (revision 10044) +++ src/main/java/org/dataone/cn/index/processor/IndexTaskProcessor.java (working copy) @@ -111,6 +111,13 @@ processTask(task); task = getNextIndexTask(queue); } + + List retryQueue = getIndexTaskRetryQueue(); + task = getNextIndexTask(retryQueue); + while (task != null) { + processTask(task); + task = getNextIndexTask(retryQueue); + } } private void processTask(IndexTask task) { @@ -141,10 +148,6 @@ task.markInProgress(); task = 
saveTask(task); - if (task != null && task.isDeleteTask()) { - return task; - } - if (task != null && !isObjectPathReady(task)) { task.markNew(); saveTask(task); @@ -152,7 +155,13 @@ continue; } + if (task != null && task.isDeleteTask()) { + return task; + } + if (task != null && !isResourceMapReadyToIndex(task, queue)) { + task.markNew(); + saveTask(task); task = null; } } @@ -166,8 +175,6 @@ if (docObject == null) { logger.debug("unable to load resource at object path: " + task.getObjectPath() + ". Marking new and continuing..."); - task.markNew(); - saveTask(task); ready = false; } else if (docObject != null) { ResourceMap rm = null; @@ -181,8 +188,6 @@ if (areAllReferencedDocsIndexed(referencedIds) == false) { logger.info("Not all map resource references indexed for map: " + task.getPid() + ". Marking new and continuing..."); - task.markNew(); - saveTask(task); ready = false; } } @@ -294,6 +299,11 @@ return repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW); } + private List getIndexTaskRetryQueue() { + return repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, + System.currentTimeMillis()); + } + private XPathDocumentParser getXPathDocumentParser() { return documentParsers.get(0); }