From e51a5dd1c35d7d191b63c06093e7469a59758bf2 Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Tue, 2 Jun 2026 13:01:12 +0800 Subject: [PATCH] =?UTF-8?q?[refact](udf)=20remove=20the=20udf=20cache=20ex?= =?UTF-8?q?piration=5Ftime=20=E2=80=8Cproperty=E2=80=8C=20(#63897)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Problem Summary: doc https://github.com/apache/doris-website/pull/3845 ``` CREATE FUNCTION print_12() RETURNS int PROPERTIES ( "file" = "file:///path/to/java-udf-demo-jar-with-dependencies.jar", "symbol" = "org.apache.doris.udf.Print", "always_nullable"="true", "type" = "JAVA_UDF", "static_load" = "true", // default value is false "expiration_time" = "60" // default value is 360 minutes ); ``` ``` Before, a Java UDF could use static_load and expiration_time to control how long its jar stayed cached in BE. This used a background thread that scanned the jars every ten minutes, checked each jar's init time, and dropped the jar once its time had expired. That caused some long-running queries to fail when the background thread removed the jar. 
Now the expiration_time property is removed, and the jar is cleaned up immediately when the function is dropped ``` --- .../common/classloader/ScannerLoader.java | 72 +++++++++++-- .../doris/common/jni/utils/ExpiringMap.java | 100 ------------------ .../org/apache/doris/udf/BaseExecutor.java | 17 ++- 3 files changed, 78 insertions(+), 111 deletions(-) delete mode 100644 fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 1c5874031d49b3..f8a119efaa9ba4 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -17,7 +17,6 @@ package org.apache.doris.common.classloader; -import org.apache.doris.common.jni.utils.ExpiringMap; import org.apache.doris.common.jni.utils.UdfClassCache; import com.google.common.collect.Streams; @@ -37,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.stream.Collectors; @@ -88,7 +88,21 @@ public class ScannerLoader { public static final Logger LOG = LogManager.getLogger(ScannerLoader.class); private static final Map> loadedClasses = new HashMap<>(); - private static final ExpiringMap udfLoadedClasses = new ExpiringMap<>(); + // Cache of UDF class metadata (including the URLClassLoader used to load the UDF). + // Entries are inserted on first use and only ever removed by an explicit + // cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). 
There is intentionally + // no time-based eviction: that previously caused two issues — + // 1) closing a URLClassLoader while another thread was still loading classes from it + // led to NoClassDefFoundError; + // 2) rebuilding a fresh URLClassLoader on every eviction produced multiple coexisting + // ClassLoaders for the same UDF, which broke lazy class resolution and reflective + // lookups inside user UDF code. + // NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only reachable after + // cleanUdfClassLoader() — concurrent first-time loads of the same signature can also + // both observe a miss. cacheClassLoader() must therefore insert atomically via + // putIfAbsent and must never close a cache that was already published to the map, + // because another executor may already be holding it. + private static final Map udfLoadedClasses = new ConcurrentHashMap<>(); private static final String CLASS_SUFFIX = ".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -116,15 +130,57 @@ public static UdfClassCache getUdfClassLoader(String functionSignature) { return udfLoadedClasses.get(functionSignature); } - public static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache, + /** + * Cache the UDF class metadata for the given function signature. + * + *

Insertion is atomic via {@link Map#putIfAbsent}: if another executor thread has + * already published a cache entry for {@code functionSignature}, the {@code classCache} + * argument is treated as a redundant build and closed here (it has not yet been handed + * to any executor, so closing its URLClassLoader is safe). The already-published entry + * is returned to the caller so the current executor can switch to it.

+ * + *

The {@code expirationTime} parameter is kept for backward compatibility with the + * existing call sites and DDL property {@code expiration_time}, but is no longer used: + * cached entries are not evicted by time. Removal happens only via + * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.

+ * + * @return the {@link UdfClassCache} actually held in the map after this call — + * either {@code classCache} (we won the race) or the pre-existing entry + * (another thread won; {@code classCache} has been closed and must not be used). + */ + public static UdfClassCache cacheClassLoader(String functionSignature, UdfClassCache classCache, long expirationTime) { - LOG.info("Cache UDF for: {}", functionSignature); - udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L); + LOG.info("Cache UDF for: " + functionSignature); + UdfClassCache existing = udfLoadedClasses.putIfAbsent(functionSignature, classCache); + if (existing == null) { + return classCache; + } + // Lost the race against a concurrent first-time load. The cache we just built has + // never been exposed to any executor, so closing its URLClassLoader here cannot + // affect anyone. Do NOT touch `existing` — another executor may already be using it. + try { + classCache.close(); + } catch (Exception e) { + LOG.warn("Failed to close redundant UdfClassCache for " + functionSignature, e); + } + return existing; } - public synchronized void cleanUdfClassLoader(String functionSignature) { - LOG.info("cleanUdfClassLoader for: {}", functionSignature); - udfLoadedClasses.remove(functionSignature); + public void cleanUdfClassLoader(String functionSignature) { + LOG.info("cleanUdfClassLoader for: " + functionSignature); + UdfClassCache removed = udfLoadedClasses.remove(functionSignature); + if (removed != null) { + // Immediately close the URLClassLoader. NOTE: any in-flight query still holding a + // reference to this cache (e.g. via JNIContext.executor) will fail with + // NoClassDefFoundError on lazy class resolution after this point. This is the + // accepted semantic of DROP FUNCTION: the function is gone, queries against it + // are expected to fail. 
+ try { + removed.close(); + } catch (Exception e) { + LOG.warn("Failed to close UdfClassCache for " + functionSignature, e); + } + } } /** diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java deleted file mode 100644 index 3496d5bbb63eb6..00000000000000 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.common.jni.utils; - -import org.apache.log4j.Logger; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ExpiringMap { - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); // key --> value - private final ConcurrentHashMap ttlMap = new ConcurrentHashMap<>(); // key --> ttl interval - // key --> expirationTime(ttl interval + currentTimeMillis) - private final ConcurrentHashMap expirationMap = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10 minutes - public static final Logger LOG = Logger.getLogger(ExpiringMap.class); - - public ExpiringMap() { - startExpirationTask(); - } - - public void put(K key, V value, long expirationTimeMs) { - long expirationTime = System.currentTimeMillis() + expirationTimeMs; - map.put(key, value); - expirationMap.put(key, expirationTime); - ttlMap.put(key, expirationTimeMs); - } - - public V get(K key) { - Long expirationTime = expirationMap.get(key); - if (expirationTime == null || System.currentTimeMillis() > expirationTime) { - remove(key); - return null; - } - // reset time again - long ttl = ttlMap.get(key); - long newExpirationTime = System.currentTimeMillis() + ttl; - expirationMap.put(key, newExpirationTime); - return map.get(key); - } - - private void startExpirationTask() { - scheduler.scheduleAtFixedRate(() -> { - long now = System.currentTimeMillis(); - for (K key : expirationMap.keySet()) { - if (expirationMap.get(key) <= now) { - remove(key); - } - } - }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES); - } - - public void remove(K key) { - V value = map.remove(key); - expirationMap.remove(key); - ttlMap.remove(key); - - // Uniformly release resources for any AutoCloseable value, - 
if (value instanceof AutoCloseable) { - try { - ((AutoCloseable) value).close(); - } catch (Exception e) { - LOG.warn("Failed to close cached resource: " + key, e); - } - } - } - - public int size() { - return map.size(); - } - - public void shutdown() { - scheduler.shutdown(); - try { - if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - } - } catch (InterruptedException e) { - scheduler.shutdownNow(); - } - } -} diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 0967e9fde0bd07..6356576baaf573 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -138,8 +138,12 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira UdfClassCache cache = null; if (isStaticLoad) { cache = ScannerLoader.getUdfClassLoader(signature); - if (cache != null && cache.classLoader != null) { - // Reuse the cached classLoader to ensure dependent classes can be loaded + if (cache != null) { + // Reuse the cached classLoader to ensure dependent classes can be loaded. + // NOTE: cache.classLoader may be null when the UDF was originally loaded via + // the system class loader (jarPath empty / custom_lib UDF); see + // UdfClassCache#classLoader. A null value here is a valid cached state and + // must NOT trigger a rebuild — only an actual cache miss does. 
classLoader = cache.classLoader; } } @@ -162,7 +166,14 @@ public UdfClassCache getClassCache(String jarPath, String signature, long expira cache.classLoader = classLoader; checkAndCacheUdfClass(cache, funcRetType, parameterTypes); if (isStaticLoad) { - ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + UdfClassCache effective = ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + if (effective != cache) { + // Another thread won the publish race. Our locally-built cache (and its + // URLClassLoader) was already closed inside cacheClassLoader(); switch to + // the published one so we share its live classLoader. + cache = effective; + classLoader = cache.classLoader; + } } } return cache;