Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions smart-common/src/main/java/org/smartdata/conf/SmartConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.smartdata.conf.SmartConfKeys.SMART_FS_TYPE;
import static org.smartdata.conf.SmartConfKeys.SMART_FS_TYPE_DEFAULT;

/**
* SSM related configurations as well as HDFS configurations.
*/
Expand Down Expand Up @@ -130,6 +133,10 @@ private void parseHostsFiles() {
}
}

public SmartFsType getFsType() {
return getEnum(SMART_FS_TYPE, SMART_FS_TYPE_DEFAULT);
}

private Set<String> parseHostsFile(
SsmHostsFileReader hostFileReader, String fileName) throws IOException {
String configDir = get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.ozone.client;
package org.smartdata.metrics;

import org.apache.hadoop.fs.ozone.OzoneClientAdapter;
import org.apache.hadoop.fs.ozone.RootedOzoneFileSystem;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

import java.io.IOException;

public class SmartRootedOzoneFileSystem extends RootedOzoneFileSystem {

@Override
protected OzoneClientAdapter createAdapter(
ConfigurationSource conf, String omHost, int omPort) throws IOException {
return SmartOzoneClientAdapter.wrap(
super.createAdapter(conf, omHost, omPort),
OzoneConfiguration.of(conf)
);
}
public interface GeneralFileInfoSource {
Map<String, Long> getPathsToIdsMapping(Collection<String> paths) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.ArrayList;
import java.util.List;

import static org.smartdata.conf.SmartConfKeys.SMART_FS_TYPE;
import static org.smartdata.conf.SmartConfKeys.SMART_FS_TYPE_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_HMS_EVENT_FETCH_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.SMART_HMS_EVENT_FETCH_ENABLED;

Expand Down Expand Up @@ -147,10 +145,7 @@ private void maybeEnableHiveEventsFetcher() {
}

private void maybeEnableOzoneFetcher() {
SmartFsType smartFsType = serverContext.getConf().getEnum(
SMART_FS_TYPE,
SMART_FS_TYPE_DEFAULT);
if (smartFsType != SmartFsType.OZONE) {
if (serverContext.getConf().getFsType() != SmartFsType.OZONE) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import static org.smartdata.conf.SmartConfKeys.ACCESS_EVENT_SOURCE_DEFAULT;
import static org.smartdata.conf.SmartConfKeys.ACCESS_EVENT_SOURCE_KEY;
import static org.smartdata.utils.PathUtil.addPathSeparator;
import static org.springframework.transaction.annotation.Isolation.SERIALIZABLE;

/**
Expand Down Expand Up @@ -95,7 +96,7 @@ public void init() throws IOException {
AccessCountFailoverFactory accessCountFailoverFactory =
new AccessCountFailoverFactory(serverContext.getConf());
DbAccessEventAggregator accessEventAggregator = new DbAccessEventAggregator(
serverContext.getMetaStore().fileInfoDao(),
serverContext.getMetaStore().generalFileInfoSource(),
fileAccessManager,
accessCountFailoverFactory.create());
this.accessEventFetcher = new AccessEventFetcher(
Expand Down Expand Up @@ -199,8 +200,7 @@ public void stop() throws IOException {
}

public void reportFileAccessEvent(FileAccessEvent event) {
String path = event.getPath();
path = path + (path.endsWith("/") ? "" : "/");
String path = addPathSeparator(event.getPath());

if (pathChecker.isIgnored(path)) {
LOG.debug("Path {} is in the ignore list. Skip report file access event.", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.conf.SmartConf;
import org.smartdata.exception.NotFoundException;
import org.smartdata.exception.QueueFullException;
import org.smartdata.metastore.MetaStore;
Expand All @@ -41,6 +42,10 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.smartdata.conf.SmartConfKeys.SMART_FS_TYPE_DEFAULT;
import static org.smartdata.metastore.dao.FileAccessPartitionDao.HDFS_FILE_ACCESS_TABLE;
import static org.smartdata.metastore.dao.FileAccessPartitionDao.OZONE_FILE_ACCESS_TABLE;

/**
* Execute rule queries and return result.
*/
Expand All @@ -61,16 +66,19 @@ public class RuleExecutor implements Runnable {
private final MetaStore metastore;
private final Stack<String> dynamicCleanups;
private final List<RuleExecutorPlugin> executorPlugins;
private final SmartConf conf;

private volatile boolean exited;
private long exitTime;

public RuleExecutor(
SmartConf conf,
RuleManager ruleManager,
ExecutionContext executionCtx,
RuleTranslationResult translationResult,
MetaStore metastore,
List<RuleExecutorPlugin> executorPlugins) {
this.conf = conf;
this.ruleManager = ruleManager;
this.executionCtx = executionCtx;
this.metastore = metastore;
Expand Down Expand Up @@ -199,12 +207,14 @@ public String genVirtualAccessCountTable(List<Object> parameters) {
long interval = paraList.isEmpty() ? 0L : (long) paraList.get(0);
String countFilter = "";
long currentTimeMillis = System.currentTimeMillis();
return generateSQL(newTable, countFilter, metastore, currentTimeMillis - interval,
return generateSQL(conf, newTable, countFilter, metastore, currentTimeMillis - interval,
currentTimeMillis);
}

// todo refactor dynamic calls to access count tables
@VisibleForTesting
static String generateSQL(
SmartConf conf,
String newTable,
String countFilter,
MetaStore adapter,
Expand All @@ -217,11 +227,16 @@ static String generateSQL(
} catch (MetaStoreException e) {
LOG.error("Cannot create table " + newTable, e);
}

String fileAccessTable = conf.getFsType() == SMART_FS_TYPE_DEFAULT
? HDFS_FILE_ACCESS_TABLE
: OZONE_FILE_ACCESS_TABLE;

String sqlCountFilter =
(countFilter == null || countFilter.isEmpty())
? ""
: " HAVING count(*) " + countFilter;
sqlFinal = "INSERT INTO " + newTable + " SELECT fid, count(*) AS count FROM file_access\n"
sqlFinal = "INSERT INTO " + newTable + " SELECT fid, count(*) AS count FROM " + fileAccessTable + "\n"
+ "WHERE access_time >= " + startTime + " AND access_time <= " + endTime
+ " GROUP BY fid" + sqlCountFilter + " ;";
return sqlFinal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ private RuleExecutor doLaunchExecutor(RuleManager ruleManager)
: new SmartRuleStringParser(ruleInfo.getRuleText(), translationCtx, conf).translate();

ruleExecutor = new RuleExecutor(
conf,
ruleManager,
executionCtx,
translationResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.junit.Before;
import org.junit.Test;
import org.smartdata.conf.SmartConf;
import org.smartdata.metastore.TestDaoBase;
import org.smartdata.metastore.dao.MetaStoreHelper;

Expand All @@ -39,7 +40,7 @@ public void generateSQL() {
String sql;
long interval = 60000;
long currentTimeMillis = System.currentTimeMillis();
sql = RuleExecutor.generateSQL(newTable, countFilter, metaStore, currentTimeMillis - interval,
sql = RuleExecutor.generateSQL(new SmartConf(), newTable, countFilter, metaStore, currentTimeMillis - interval,
currentTimeMillis);
try {
metaStoreHelper.execute(sql);
Expand All @@ -50,7 +51,7 @@ public void generateSQL() {
}
// Test with count filter
countFilter = "> 10";
sql = RuleExecutor.generateSQL(newTable, countFilter, metaStore, currentTimeMillis - interval,
sql = RuleExecutor.generateSQL(new SmartConf(), newTable, countFilter, metaStore, currentTimeMillis - interval,
currentTimeMillis);
try {
metaStoreHelper.execute(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.smartdata.metastore.model.AggregatedAccessCounts;
import org.smartdata.metastore.transaction.TransactionRunner;
import org.smartdata.metastore.utils.MetaStoreUtils;
import org.smartdata.metrics.GeneralFileInfoSource;
import org.smartdata.model.ActionInfo;
import org.smartdata.model.BackUpInfo;
import org.smartdata.model.CachedFileStatus;
Expand Down Expand Up @@ -131,6 +132,7 @@ public class MetaStore implements CopyMetaService,
private final WhitelistDao whitelistDao;
private final OzoneFileInfoDao ozoneFileInfoDao;
private final UserActivityDao userActivityDao;
private final GeneralFileInfoSource generalFileInfoSource;
private final DBPool dbPool;

public MetaStore(DBPool pool,
Expand Down Expand Up @@ -168,10 +170,7 @@ public MetaStore(DBPool pool,
hmsIgnoredEventDao = daoProvider.hmsIgnoredEventDao();
hmsSyncProgressDao = daoProvider.hmsSyncProgressDao();
ozoneFileInfoDao = daoProvider.ozoneFileInfoDao();
}

public DbMetadataProvider dbMetadataProvider() {
return dbMetadataProvider;
generalFileInfoSource = daoProvider.generalFileInfoSource();
}

public UserActivityDao userActivityDao() {
Expand Down Expand Up @@ -222,6 +221,10 @@ public OzoneFileInfoDao ozoneFileInfoDao() {
return ozoneFileInfoDao;
}

public GeneralFileInfoSource generalFileInfoSource() {
return generalFileInfoSource;
}

public PlatformTransactionManager transactionManager() {
return defaultTransactionRunner.getTransactionManager();
}
Expand Down Expand Up @@ -365,7 +368,7 @@ public List<FileInfo> getFilesByPaths(Collection<String> paths)
public Map<String, Long> getFileIDs(Collection<String> paths)
throws MetaStoreException {
try {
return fileInfoDao.getPathFids(paths);
return fileInfoDao.getPathsToIdsMapping(paths);
} catch (EmptyResultDataAccessException e) {
return new HashMap<>();
} catch (Exception e) {
Expand Down Expand Up @@ -1475,7 +1478,7 @@ public List<String> getLastFetchedDirs() throws MetaStoreException {
String[] oldList = fetchedList.split(",");
lastFetchedDirs.addAll(Arrays.asList(oldList));
}
LOG.info("Last fetch dirs are " + lastFetchedDirs.toString());
LOG.info("Last fetch dirs are {}", lastFetchedDirs);
return lastFetchedDirs;
} catch (Exception e) {
throw new MetaStoreException(e);
Expand All @@ -1488,7 +1491,7 @@ public List<String> getLastFetchedDirs() throws MetaStoreException {
public void updateWhitelistTable(String newWhitelist) throws MetaStoreException {
try {
whitelistDao.updateTable(newWhitelist);
LOG.info("Success to update whitelist table with " + newWhitelist);
LOG.info("Success to update whitelist table with {}", newWhitelist);
} catch (Exception e) {
throw new MetaStoreException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import lombok.extern.slf4j.Slf4j;
import org.smartdata.metastore.accesscount.failover.AccessCountContext;
import org.smartdata.metastore.accesscount.failover.Failover;
import org.smartdata.metastore.dao.FileInfoDao;
import org.smartdata.metastore.model.AggregatedAccessCounts;
import org.smartdata.metrics.FileAccessEvent;
import org.smartdata.metrics.GeneralFileInfoSource;

import java.util.Collections;
import java.util.List;
Expand All @@ -33,13 +33,13 @@
@Slf4j
public class DbAccessEventAggregator implements AccessEventAggregator {

private final FileInfoDao fileInfoDao;
private final GeneralFileInfoSource fileInfoDao;
private final FileAccessManager dbTableManager;
private final Failover<AccessCountContext> accessCountFailover;

public DbAccessEventAggregator(FileInfoDao fileInfoDao,
FileAccessManager dbTableManager,
Failover<AccessCountContext> failover) {
public DbAccessEventAggregator(GeneralFileInfoSource fileInfoDao,
FileAccessManager dbTableManager,
Failover<AccessCountContext> failover) {
this.fileInfoDao = fileInfoDao;
this.dbTableManager = dbTableManager;
this.accessCountFailover = failover;
Expand Down Expand Up @@ -75,7 +75,7 @@ private List<AggregatedAccessCounts> getAggregatedAccessCounts(List<FileAccessEv

private Map<String, Long> getFileIdMap(List<String> paths) {
try {
return fileInfoDao.getPathFids(paths);
return fileInfoDao.getPathsToIdsMapping(paths);
} catch (Exception e) {
log.error("Error fetching file ids for paths {}", paths, e);
return Collections.emptyMap();
Expand Down
Loading