Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
const val TYPE = "type"
const val INDEX_PATTERN_SUFFIX = "-000001"
const val QUERY_INDEX_BASE_FIELDS_COUNT = 8 // 3 fields we defined and 5 builtin additional metadata fields

/**
* Returns a mutable deep copy of [map] so that callers can mutate the result
* without affecting the original (e.g. cached cluster-state data).
*/
@Suppress("UNCHECKED_CAST")
fun deepCopyMap(map: Map<String, Any>): MutableMap<String, Any> =
map.entries.associate { (k, v) ->
k to when (v) {
is Map<*, *> -> deepCopyMap(v as Map<String, Any>)
is List<*> -> (v as List<Any>).map { elem ->
if (elem is Map<*, *>) deepCopyMap(elem as Map<String, Any>) else elem
}.toMutableList()
else -> v
}
}.toMutableMap()

@JvmStatic
fun docLevelQueriesMappings(): String {
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
Expand Down Expand Up @@ -297,15 +314,18 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
if (clusterState.routingTable.hasIndex(concreteIndexName)) {
val indexMetadata = clusterState.metadata.index(concreteIndexName)
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
val properties = (
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
as MutableMap<String, Any>
)
@Suppress("UNCHECKED_CAST")
val properties = deepCopyMap(
indexMetadata.mapping()?.sourceAsMap?.get("properties") as Map<String, Any>
)
// Node processor function is used to process leaves of index mappings tree
//
val leafNodeProcessor =
fun(fieldName: String, fullPath: String, props: MutableMap<String, Any>):
Triple<String, String, MutableMap<String, Any>> {
if (props["type"] == "alias") {
return Triple(fieldName, fieldName, mutableMapOf())
}
val newProps = props.toMutableMap()
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
Expand All @@ -331,14 +351,23 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
// Traverse and update index mappings here while extracting flatten field paths
val flattenPaths = mutableMapOf<String, MutableMap<String, Any>>()
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
flattenPaths.keys.forEach { allFlattenPaths.add(Pair(it, concreteIndexName)) }
flattenPaths.entries
.filter { (_, props) -> props["type"] != "alias" }
.forEach { (path, _) -> allFlattenPaths.add(Pair(path, concreteIndexName)) }
// Updated mappings ready to be applied on queryIndex
properties.forEach {
if (
it.value is Map<*, *> &&
(it.value as Map<String, Any>).containsKey("type") &&
(it.value as Map<String, Any>)["type"] == NESTED
) {
} else if (
it.value is Map<*, *> &&
(
(it.value as Map<String, Any>)["type"] == "alias" ||
(it.value as Map<String, Any>).isEmpty()
)
) {
} else {
if (updatedProperties.containsKey(it.key) && updatedProperties[it.key] != it.value) {
val mergedField = mergeConflictingFields(
Expand Down Expand Up @@ -526,7 +555,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return Pair(updateMappingResponse, targetQueryIndex)
} catch (e: Exception) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
log.debug("exception after rollover queryIndex index: $targetQueryIndex exception: ${unwrappedException.message}")
log.warn("PUT mapping failed on queryIndex $targetQueryIndex [${unwrappedException.javaClass.simpleName}]: ${unwrappedException.message}")
// If we reached limit for total number of fields in mappings, do a rollover here
if (unwrappedException.message?.contains("Limit of total fields") == true) {
try {
Expand Down Expand Up @@ -556,6 +585,20 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
}
} else {
// Fast-fail on structural mapping errors that index recreation cannot fix.
val isUnrecoverableMappingError = unwrappedException.message?.let {
it.contains("alias must refer to an existing field") ||
(it.contains("mapper [") && it.contains("cannot be changed from type"))
} ?: false

if (isUnrecoverableMappingError) {
log.error(
"Unrecoverable mapping error on queryIndex $targetQueryIndex — " +
"retry will not help: ${unwrappedException.message}"
)
throw AlertingException.wrap(unwrappedException)
}

// retry with deleting query index
if (monitor.deleteQueryIndexInEveryRun == true) {
try {
Expand Down Expand Up @@ -648,21 +691,26 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
if (clusterState.routingTable.hasIndex(concreteIndexName)) {
val indexMetadata = clusterState.metadata.index(concreteIndexName)
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
val properties = (
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
as MutableMap<String, Any>
)
@Suppress("UNCHECKED_CAST")
val properties = deepCopyMap(
indexMetadata.mapping()?.sourceAsMap?.get("properties") as Map<String, Any>
)
// Node processor function is used to process leaves of index mappings tree
//
val leafNodeProcessor =
fun(fieldName: String, _: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
if (props["type"] == "alias") {
return Triple(fieldName, fieldName, mutableMapOf())
}
return Triple(fieldName, fieldName, props)
}
// Traverse and update index mappings here while extracting flatten field paths
val flattenPaths = mutableMapOf<String, MutableMap<String, Any>>()
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)

flattenPaths.forEach {
// skip alias-type fields — they differ by design across indices and must not be flagged
if (it.value["type"] == "alias") return@forEach
if (allFlattenPaths.containsKey(it.key) && allFlattenPaths[it.key]!! != it.value) {
conflictingFields.add(it.key)
}
Expand Down
Loading