Add BULK_FATE check to SystemCheck command for orphaned loaded/selected columns#6158
ArbaazKhan1 wants to merge 1 commit into apache:main
printRunning();

log.trace("********** Checking for orphaned bulk-import loaded columns **********");
This code has a race condition: a bulk import may start after liveFateIds is created and before the metadata table is scanned. To fix this, do an initial scan of the metadata table here. This scan would be similar to the later scans, but it would only collect fate ids.
final Set<FateId> initialMetadataIds = new HashSet<>();
// TODO scan metadata and find all fate ids
This set can be used in the second scan of the metadata table to avoid race conditions.
Map<StoredTabletFile,FateId> loaded = tablet.getLoaded();
for (Map.Entry<StoredTabletFile,FateId> entry : loaded.entrySet()) {
  FateId fateId = entry.getValue();
  if (!liveFateIds.contains(fateId)) {
This check can be added here and a few lines down to avoid the race condition.
- if (!liveFateIds.contains(fateId)) {
+ // The fate id existed in the metadata table both before and after scanning the fate table, so it must be a dead fate id.
+ if (!liveFateIds.contains(fateId) && initialMetadataIds.contains(fateId)) {
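The ordering argument behind this suggestion can be sketched in isolation. The following is a minimal, self-contained illustration and not the PR's code: fate ids are modeled as plain strings, and `DeadFateIdSketch` and `findDeadIds` are hypothetical names. It assumes the three steps run in order: an initial metadata scan, then the fate store listing, then the second metadata scan.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DeadFateIdSketch {

  /**
   * Returns the ids from the second metadata scan that are absent from the live set and were
   * already present in the initial metadata scan. An id that only shows up in the second scan
   * may belong to an operation that started after the fate store was listed, so it is skipped.
   */
  static Set<String> findDeadIds(Set<String> initialMetadataIds, Set<String> liveFateIds,
      List<String> secondScanIds) {
    Set<String> dead = new TreeSet<>();
    for (String fateId : secondScanIds) {
      if (!liveFateIds.contains(fateId) && initialMetadataIds.contains(fateId)) {
        dead.add(fateId);
      }
    }
    return dead;
  }

  public static void main(String[] args) {
    // Step 1: ids seen in metadata before listing the fate store (illustrative values).
    Set<String> initial = Set.of("fate-1", "fate-2");
    // Step 2: ids the fate store reported as live.
    Set<String> live = Set.of("fate-2");
    // Step 3: ids seen in the second metadata scan; fate-3 started after step 2.
    List<String> second = List.of("fate-1", "fate-2", "fate-3");
    System.out.println(findDeadIds(initial, live, second)); // prints [fate-1]
  }
}
```

Here `fate-3` is not reported even though it is missing from the live set, because it was not in the metadata table before the fate store was read.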
import org.apache.accumulo.server.util.adminCommand.SystemCheck.Check;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.CheckStatus;

public class BulkLoadedFateCheckRunner implements CheckRunner {
This code is actually finding dead fate ids related to compactions and bulk imports, so the class name could be made more general.
UserFateStore<BulkLoadedFateCheckRunner> ufs =
    new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null)) {

mfs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
There is another list method that takes an enum set. It would be better to use that and pass in EnumSet.of(IN_PROGRESS,FAILED_IN_PROGRESS,SUBMITTED) to only see operations that are currently running or about to run. A finished fate operation would not be expected to have an id in the metadata table.
We could also filter the fate operations down to only table compactions and bulk imports. This is not needed for correctness; it just pulls less into memory.
- mfs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
+ mfs.list(EnumSet.of(IN_PROGRESS,FAILED_IN_PROGRESS,SUBMITTED)).filter(/* TODO filter on operations of type TABLE_COMPACT or BULK_IMPORT */).map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
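The filtering described in this comment can be illustrated on its own. This is a rough sketch with hypothetical stand-in types: `Status`, `Op`, and `Txn` below are not the Accumulo classes, and the extra enum constants are assumptions made only so the example has something to exclude. It shows the shape of filtering by an `EnumSet` of statuses and then by operation type before collecting ids.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class LiveFateFilterSketch {

  // Hypothetical stand-ins for the fate status and operation enums.
  enum Status { NEW, SUBMITTED, IN_PROGRESS, FAILED_IN_PROGRESS, SUCCESSFUL, FAILED }

  enum Op { TABLE_COMPACT, BULK_IMPORT, OTHER }

  // Hypothetical record of one fate transaction.
  static final class Txn {
    final String id;
    final Status status;
    final Op op;

    Txn(String id, Status status, Op op) {
      this.id = id;
      this.status = status;
      this.op = op;
    }
  }

  /** Keeps only running or about-to-run compactions and bulk imports, returning their ids. */
  static Set<String> liveIds(List<Txn> txns) {
    EnumSet<Status> running =
        EnumSet.of(Status.IN_PROGRESS, Status.FAILED_IN_PROGRESS, Status.SUBMITTED);
    EnumSet<Op> relevant = EnumSet.of(Op.TABLE_COMPACT, Op.BULK_IMPORT);
    return txns.stream()
        .filter(t -> running.contains(t.status)) // drop finished or not-yet-submitted txns
        .filter(t -> relevant.contains(t.op)) // optional: only reduces memory use
        .map(t -> t.id)
        .collect(Collectors.toCollection(TreeSet::new));
  }

  public static void main(String[] args) {
    List<Txn> txns = List.of(
        new Txn("fate-1", Status.IN_PROGRESS, Op.BULK_IMPORT),
        new Txn("fate-2", Status.SUCCESSFUL, Op.BULK_IMPORT),
        new Txn("fate-3", Status.SUBMITTED, Op.OTHER),
        new Txn("fate-4", Status.FAILED_IN_PROGRESS, Op.TABLE_COMPACT));
    System.out.println(liveIds(txns)); // prints [fate-1, fate-4]
  }
}
```

As the comment notes, the operation-type filter is only an optimization; the status filter is what determines which ids count as live.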
TabletsMetadata
    .builder(context).scanMetadataTable().fetch(TabletMetadata.ColumnType.LOADED,
This should use Ample from the context. It also needs to read from all three data levels, so this code would need to be refactored to loop over the enums in Ample.DataLevel and scan the metadata table for each level.
context.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(TabletMetadata.ColumnType.LOADED,
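The loop over data levels might look roughly like the sketch below. It is self-contained and does not use the Accumulo API: the `DataLevel` enum here is a local stand-in, and the per-level scan is passed in as a hypothetical function returning the fate ids found at that level.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

public class AllLevelsScanSketch {

  // Local stand-in for the three data levels; not the Accumulo enum.
  enum DataLevel { ROOT, METADATA, USER }

  /**
   * Runs the supplied scan once per data level and merges the fate ids it returns.
   * The scanLevel function is a placeholder for a per-level tablet metadata read.
   */
  static Set<String> collectFateIds(Function<DataLevel, List<String>> scanLevel) {
    Set<String> ids = new TreeSet<>();
    for (DataLevel level : DataLevel.values()) {
      ids.addAll(scanLevel.apply(level));
    }
    return ids;
  }

  public static void main(String[] args) {
    // Illustrative per-level results.
    Map<DataLevel, List<String>> fake = Map.of(
        DataLevel.ROOT, List.of("fate-1"),
        DataLevel.METADATA, List.of("fate-2"),
        DataLevel.USER, List.of("fate-1", "fate-3"));
    System.out.println(collectFateIds(fake::get)); // prints [fate-1, fate-2, fate-3]
  }
}
```

Scanning only one level would miss ids recorded at the other two, which is the gap this comment points out.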
Closes issue #5175
Added a new BULK_FATE Check to the SystemCheck command that detects orphaned bulk import columns in tablet metadata that reference FATE operations that no longer exist.