diff --git a/.gitattributes b/.gitattributes index b5afafc..2c70a52 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3,6 +3,3 @@ docker/*.sh text eol=lf docker/Dockerfile text eol=lf docker/docker-compose.yml text eol=lf - -# Other files: auto-detect (will be CRLF on Windows, LF in repo) -* text=auto diff --git a/backend/src/main/java/com/storage/engine/controller/AccessController.java b/backend/src/main/java/com/storage/engine/controller/AccessController.java index e7fcadb..b01f1cc 100644 --- a/backend/src/main/java/com/storage/engine/controller/AccessController.java +++ b/backend/src/main/java/com/storage/engine/controller/AccessController.java @@ -57,6 +57,16 @@ public ResponseEntity downloadData( return ResponseEntity.notFound().build(); } + // 检查文件大小限制:只允许下载 10MB 以下的0文件 + long fileSize = meta.getFileSize() != null ? meta.getFileSize() : 0L; + long maxDownloadSize = 100 * 1024 * 1024; // 10MB + if (fileSize > maxDownloadSize) { + String errorMsg = String.format("文件太大,不允许下载。文件大小: %.2f MB,最大允许: 100 MB", + fileSize / (1024.0 * 1024.0)); + return ResponseEntity.status(HttpStatus.BAD_REQUEST) + .body(errorMsg.getBytes(StandardCharsets.UTF_8)); + } + byte[] data = accessService.downloadData(logicalPath, fileName); if (data == null || data.length == 0) { return ResponseEntity.noContent().build(); diff --git a/backend/src/main/java/com/storage/engine/dao/IGinxDao.java b/backend/src/main/java/com/storage/engine/dao/IGinxDao.java index 64bc1f3..6f84fb2 100644 --- a/backend/src/main/java/com/storage/engine/dao/IGinxDao.java +++ b/backend/src/main/java/com/storage/engine/dao/IGinxDao.java @@ -517,12 +517,18 @@ public Void run(Session session) throws SessionException { public SessionExecuteSqlResult queryDataByPath(String pathPrefix) { String queryPath = StorageUtils.normalizeEscapedPath(pathPrefix); - return executeSql("select * from " + queryPath + ";"); + String quotedPath = StorageUtils.quoteIdentifierPath(queryPath); + String sql = "select * from " + quotedPath + 
";"; + logger.info("[IGinX-SQL] {}", sql); + return executeSql(sql); } public SessionExecuteSqlResult queryDataByPathWithLimit(String pathPrefix, int limit) { String queryPath = StorageUtils.normalizeEscapedPath(pathPrefix); - return executeSql("select * from " + queryPath + " limit " + limit + ";"); + String quotedPath = StorageUtils.quoteIdentifierPath(queryPath); + String sql = "select * from " + quotedPath + " limit " + limit + ";"; + logger.info("[IGinX-SQL] {}", sql); + return executeSql(sql); } // ==================== Cluster Info Operations ==================== diff --git a/backend/src/main/java/com/storage/engine/model/AddStorageEngineRequest.java b/backend/src/main/java/com/storage/engine/model/AddStorageEngineRequest.java index 0b7b520..6599219 100644 --- a/backend/src/main/java/com/storage/engine/model/AddStorageEngineRequest.java +++ b/backend/src/main/java/com/storage/engine/model/AddStorageEngineRequest.java @@ -12,6 +12,11 @@ public class AddStorageEngineRequest { private String dummyDir; private Integer iginxPort; + private String sizeCalculationStrategy; + private String sshUsername; + private String sshPassword; + private Integer sshPort; + public String getSourceType() { return sourceType; } @@ -67,4 +72,36 @@ public Integer getIginxPort() { public void setIginxPort(Integer iginxPort) { this.iginxPort = iginxPort; } + + public String getSizeCalculationStrategy() { + return sizeCalculationStrategy; + } + + public void setSizeCalculationStrategy(String sizeCalculationStrategy) { + this.sizeCalculationStrategy = sizeCalculationStrategy; + } + + public String getSshUsername() { + return sshUsername; + } + + public void setSshUsername(String sshUsername) { + this.sshUsername = sshUsername; + } + + public String getSshPassword() { + return sshPassword; + } + + public void setSshPassword(String sshPassword) { + this.sshPassword = sshPassword; + } + + public Integer getSshPort() { + return sshPort; + } + + public void setSshPort(Integer sshPort) { + 
this.sshPort = sshPort; + } } diff --git a/backend/src/main/java/com/storage/engine/service/AccessService.java b/backend/src/main/java/com/storage/engine/service/AccessService.java index cd76802..1407591 100644 --- a/backend/src/main/java/com/storage/engine/service/AccessService.java +++ b/backend/src/main/java/com/storage/engine/service/AccessService.java @@ -489,8 +489,17 @@ private String resolveIginxDataPath(DataItem item) { externalBody = externalBody.substring(sourceKey.length() + 1); } - String normalized = externalBody.replace("/", ".").replaceAll("[^a-zA-Z0-9._-]", "_"); - String basePath = normalized.isEmpty() ? schemaPrefix : (schemaPrefix + "." + normalized); + // Split by / and escape dots in each segment + String[] segments = externalBody.isEmpty() ? new String[0] : externalBody.split("/"); + StringBuilder pathBuilder = new StringBuilder(schemaPrefix); + for (String seg : segments) { + String cleaned = seg == null ? "" : seg.trim().replaceAll("[^a-zA-Z0-9._-]", "_"); + if (!cleaned.isEmpty()) { + pathBuilder.append('.').append(cleaned.replace(".", "\\.")); + } + } + String basePath = pathBuilder.toString(); + if (!fileName.isEmpty()) { if (isFilesystemLikeSourceKey(sourceKey)) { return StorageUtils.toFileLeafPath(basePath, fileName); diff --git a/backend/src/main/java/com/storage/engine/service/NodeDeployService.java b/backend/src/main/java/com/storage/engine/service/NodeDeployService.java index b2de68a..92cc1b6 100644 --- a/backend/src/main/java/com/storage/engine/service/NodeDeployService.java +++ b/backend/src/main/java/com/storage/engine/service/NodeDeployService.java @@ -5,20 +5,15 @@ import com.storage.engine.dao.IGinxDao; import com.storage.engine.model.NodeDeployRequest; import com.storage.engine.model.NodeDeployTaskStatus; +import com.storage.engine.utils.ScriptExecutionUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Autowired; -import 
org.springframework.core.io.ClassPathResource; -import org.springframework.core.io.Resource; -import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.stereotype.Service; import java.io.BufferedReader; import java.io.File; -import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -46,8 +41,8 @@ public class NodeDeployService { @Autowired private IGinxDao iginxDao; - @Value("${resource.base-path:classpath:/}") - private String resourceBasePath; + @Autowired + private ScriptExecutionUtils scriptExecutionUtils; @Value("${deploy.iginx.default-package-path:/home/ubuntu/IGinX-0.9.0-SNAPSHOT.tar.gz}") private String defaultPackagePath; @@ -80,8 +75,8 @@ public NodeDeployTaskStatus startDeployTask(final NodeDeployRequest request, fin String zkConnection = safeValue(request.getZookeeperConnectionString(), defaultZkConnection); String pythonCmd = safeValue(request.getPythonCmd(), defaultPythonCmd); - File scriptFile = resolveScriptFile(DEPLOY_SCRIPT_RELATIVE_PATH, "部署脚本"); - File udfListFile = resolveResourceFile("udf/udf_list", "UDF列表文件"); + File scriptFile = scriptExecutionUtils.resolveScriptFile(DEPLOY_SCRIPT_RELATIVE_PATH, "部署脚本"); + File udfListFile = scriptExecutionUtils.resolveResourceFile("udf/udf_list", "UDF列表文件"); File metadataDir = resolveMetadataDirectory(); String iginxPort = safeValue(request.getPort(), "6888"); @@ -157,7 +152,7 @@ public NodeDeployTaskStatus startStopTask(final String targetIp, final String no String deployDir = safeValue(deployDirectory, defaultDeployDirectory); - File scriptFile = resolveScriptFile(STOP_SCRIPT_RELATIVE_PATH, "停止脚本"); + File scriptFile = scriptExecutionUtils.resolveScriptFile(STOP_SCRIPT_RELATIVE_PATH, "停止脚本"); final List command = new ArrayList(); command.add("bash"); @@ 
-442,200 +437,14 @@ private void runCommand(List command, DeployTaskState taskState) { } } - private File resolveScriptFile(String relativePath, String label) { - try { - if (isClasspathRoot()) { - String classpathPath = joinClasspathPath(relativePath); - ClassPathResource resource = new ClassPathResource(classpathPath); - if (!resource.exists()) { - throw new RuntimeException(label + "不存在: classpath:" + classpathPath); - } - File scriptFile = materializeResourceFile(resource, relativePath); - if (!scriptFile.canExecute()) { - scriptFile.setExecutable(true); - } - return scriptFile; - } - - String root = removeFilePrefix(resourceBasePath); - File scriptFile = new File(root, relativePath.replace("/", File.separator)); - if (!scriptFile.exists()) { - throw new RuntimeException(label + "不存在: " + scriptFile.getPath()); - } - return scriptFile; - } catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RuntimeException("加载" + label + "失败: " + e.getMessage(), e); - } - } - - private File resolveResourceFile(String relativePath, String label) { - try { - if (isClasspathRoot()) { - String classpathPath = joinClasspathPath(relativePath); - ClassPathResource resource = new ClassPathResource(classpathPath); - if (!resource.exists()) { - throw new RuntimeException(label + "不存在: classpath:" + classpathPath); - } - return materializeResourceFile(resource, relativePath); - } - - String root = removeFilePrefix(resourceBasePath); - File file = new File(root, relativePath.replace("/", File.separator)); - if (!file.exists() || !file.isFile()) { - throw new RuntimeException(label + "不存在: " + file.getPath()); - } - return file; - } catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RuntimeException("加载" + label + "失败: " + e.getMessage(), e); - } - } - private File resolveMetadataDirectory() { - File metadataDir = resolveOptionalResourceDirectory("udf/metadata"); + File 
metadataDir = scriptExecutionUtils.resolveOptionalResourceDirectory("udf/metadata"); if (metadataDir != null) { return metadataDir; } throw new RuntimeException("UDF元数据目录不存在: 期望 resource.base-path 下的 udf/metadata"); } - private File resolveOptionalResourceDirectory(String relativePath) { - try { - if (isClasspathRoot()) { - String classpathPath = joinClasspathPath(relativePath); - ClassPathResource dirResource = new ClassPathResource(classpathPath); - if (!dirResource.exists()) { - return null; - } - - try { - File file = dirResource.getFile(); - if (file.exists() && file.isDirectory()) { - return file; - } - } catch (Exception ignored) { - // Fall through to materialize-from-pattern for packaged classpath resources. - } - - return materializeResourceDirectory(classpathPath, relativePath); - } - - String root = removeFilePrefix(resourceBasePath); - File dir = new File(root, relativePath.replace("/", File.separator)); - if (!dir.exists() || !dir.isDirectory()) { - return null; - } - return dir; - } catch (Exception e) { - return null; - } - } - - private File materializeResourceFile(ClassPathResource resource, String relativePath) throws Exception { - String name = new File(relativePath).getName(); - File tmpScript = File.createTempFile("scalestore-", "-" + name); - try (InputStream input = resource.getInputStream()) { - Files.copy(input, tmpScript.toPath(), StandardCopyOption.REPLACE_EXISTING); - } - tmpScript.deleteOnExit(); - return tmpScript; - } - - private File materializeResourceDirectory(String classpathPath, String relativePath) throws Exception { - PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); - Resource[] resources = resolver.getResources("classpath*:" + classpathPath + "/**"); - if (resources == null || resources.length == 0) { - return null; - } - - File baseDir = Files.createTempDirectory("scalestore-" + new File(relativePath).getName() + "-").toFile(); - markDeleteOnExit(baseDir); - - String normalizedPath = 
classpathPath.replace('\\', '/'); - String anchor = normalizedPath + "/"; - boolean copied = false; - - for (Resource resource : resources) { - if (resource == null || !resource.exists() || !resource.isReadable()) { - continue; - } - - String url = resource.getURL().toString().replace('\\', '/'); - int idx = url.indexOf(anchor); - if (idx < 0) { - continue; - } - - String sub = url.substring(idx + anchor.length()); - if (sub.isEmpty() || sub.endsWith("/")) { - continue; - } - - File target = new File(baseDir, sub); - File parent = target.getParentFile(); - if (parent != null && !parent.exists()) { - parent.mkdirs(); - } - try (InputStream input = resource.getInputStream()) { - Files.copy(input, target.toPath(), StandardCopyOption.REPLACE_EXISTING); - } - copied = true; - } - - return copied ? baseDir : null; - } - - private void markDeleteOnExit(File file) { - if (file == null || !file.exists()) { - return; - } - if (file.isDirectory()) { - File[] children = file.listFiles(); - if (children != null) { - for (File child : children) { - markDeleteOnExit(child); - } - } - } - file.deleteOnExit(); - } - - private boolean isClasspathRoot() { - String root = resourceBasePath == null ? "" : resourceBasePath.trim(); - return root.isEmpty() || ".".equals(root) || root.startsWith("classpath:"); - } - - private String joinClasspathPath(String relativePath) { - String root = resourceBasePath == null ? "" : resourceBasePath.trim(); - if (root.startsWith("classpath:")) { - root = root.substring("classpath:".length()); - } - root = root.replace('\\', '/'); - while (root.startsWith("/")) { - root = root.substring(1); - } - while (root.endsWith("/")) { - root = root.substring(0, root.length() - 1); - } - if (root.isEmpty() || ".".equals(root)) { - return relativePath; - } - return root + "/" + relativePath; - } - - private String removeFilePrefix(String path) { - String value = path == null ? 
"" : path.trim(); - if (value.startsWith("file:")) { - return value.substring("file:".length()); - } - return value; - } - private String required(String value, String message) { if (value == null || value.trim().isEmpty()) { throw new RuntimeException(message); diff --git a/backend/src/main/java/com/storage/engine/service/StorageService.java b/backend/src/main/java/com/storage/engine/service/StorageService.java index a66bb51..76203c1 100644 --- a/backend/src/main/java/com/storage/engine/service/StorageService.java +++ b/backend/src/main/java/com/storage/engine/service/StorageService.java @@ -8,12 +8,14 @@ import com.storage.engine.service.adapter.StorageAdapter; import com.storage.engine.service.adapter.StorageAdapterFactory; import com.storage.engine.service.adapter.StorageUtils; +import com.storage.engine.utils.ScriptExecutionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; +import java.io.File; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @@ -32,6 +34,7 @@ public class StorageService { private static final int EXTERNAL_SIZE_FLUSH_BATCH = 100; private static final long EXTERNAL_SIZE_FLUSH_BYTES = 10L * 1024L * 1024L * 1024L; private static final int STRUCTURED_SAMPLE_ROW_COUNT = 100; + private static final long FULL_ROW_BYTES = 1024L * 1024L; @Autowired private IGinxDao iginxDao; @@ -48,6 +51,9 @@ public class StorageService { @Autowired private MetadataExtractionSchedulerService metadataExtractionSchedulerService; + @Autowired + private ScriptExecutionUtils scriptExecutionUtils; + public synchronized Map addExternalStorageEngine(AddStorageEngineRequest request) { AddSourceContext context = validateAndBuildContext(request); String sql = buildAddStorageEngineSql(context); @@ -56,6 +62,23 @@ public synchronized Map 
addExternalStorageEngine(AddStorageEngin executeAddStorageEngineSql(context, sql); + // 同步注册数据源到元数据表,确保前端能立即看到 + logger.info("[ExternalSource] Registering data source synchronously: schemaPrefix={}", context.schemaPrefix); + try { + dataSourceService.registerExternalDataSource( + context.ip, + context.port, + context.sourceType, + context.schemaPrefix, + "", + 0L); + logger.info("[ExternalSource] Data source registered successfully: schemaPrefix={}", context.schemaPrefix); + } catch (Exception e) { + logger.error("[ExternalSource] Failed to register data source: schemaPrefix={}", context.schemaPrefix, e); + throw new RuntimeException("数据源注册失败: " + e.getMessage(), e); + } + + // 异步执行元数据同步 triggerExternalMetadataSyncAsync(context); Map payload = new LinkedHashMap(); @@ -63,8 +86,8 @@ public synchronized Map addExternalStorageEngine(AddStorageEngin payload.put("sourceType", context.sourceType); payload.put("schemaPrefix", context.schemaPrefix); payload.put("mappedDataType", context.mappedDataType); - payload.put("syncStatus", "PENDING"); - payload.put("message", "添加数据源成功,后台将持续进行解析"); + payload.put("syncStatus", "REGISTERED"); + payload.put("message", "添加数据源成功,数据源已注册,后台将持续进行解析"); return payload; } @@ -78,14 +101,6 @@ private void triggerExternalMetadataSyncAsync(AddSourceContext context) { "外部数据源解析任务已启动: schemaPrefix=" + safe(asyncContext.schemaPrefix), "External-Source"); - dataSourceService.registerExternalDataSource( - asyncContext.ip, - asyncContext.port, - asyncContext.sourceType, - asyncContext.schemaPrefix, - "", - 0L); - ExternalMetaSyncResult syncResult = syncExternalMetadata(asyncContext); logger.info( "[ExternalSource] Async metadata sync finished. 
sourceType={}, schemaPrefix={}, discovered={}, imported={}, skipped={}, replaced={}", @@ -135,6 +150,11 @@ private AddSourceContext copyContext(AddSourceContext src) { copy.schemaPrefix = src.schemaPrefix; copy.logicalSourceKey = src.logicalSourceKey; copy.mappedDataType = src.mappedDataType; + copy.sizeCalculationStrategy = src.sizeCalculationStrategy; + copy.sshUsername = src.sshUsername; + copy.sshPassword = src.sshPassword; + copy.sshPort = src.sshPort; + copy.sshFileSizeCache = src.sshFileSizeCache; return copy; } @@ -428,7 +448,7 @@ private String buildShowColumnsSql(String prefix, boolean exactMatch) { return "show columns;"; } - String identifier = quoteIdentifierPath(normalizedPrefix); + String identifier = StorageUtils.quoteIdentifierPath(normalizedPrefix); if (normalizedPrefix.endsWith(".*")) { return "show columns " + identifier + ";"; } @@ -438,80 +458,13 @@ private String buildShowColumnsSql(String prefix, boolean exactMatch) { return "show columns " + identifier + ".*;"; } - private String quoteIdentifierPath(String rawPath) { - String normalized = safe(rawPath); - if (normalized.isEmpty()) { - return normalized; - } - - boolean wildcard = normalized.endsWith(".*"); - String base = wildcard ? normalized.substring(0, normalized.length() - 2) : normalized; - - List segments = splitUnescapedSegmentsForSql(base); - if (segments.isEmpty()) { - return wildcard ? 
"*" : ""; - } - - StringBuilder quoted = new StringBuilder(); - for (int i = 0; i < segments.size(); i++) { - if (i > 0) { - quoted.append('.'); - } - quoted.append(quoteIdentifierSegment(segments.get(i))); - } - - if (wildcard) { - quoted.append(".*"); - } - return quoted.toString(); - } - - private String quoteIdentifierSegment(String rawSegment) { - String seg = safe(rawSegment).replace("`", "``"); - return "`" + seg + "`"; - } - - private List splitUnescapedSegmentsForSql(String text) { - List segments = new ArrayList(); - if (text == null || text.isEmpty()) { - return segments; - } - - StringBuilder current = new StringBuilder(); - boolean escaping = false; - for (int i = 0; i < text.length(); i++) { - char c = text.charAt(i); - if (escaping) { - current.append(c); - escaping = false; - continue; - } - if (c == '\\') { - current.append('\\'); - escaping = true; - continue; - } - if (c == '.') { - segments.add(current.toString()); - current.setLength(0); - continue; - } - current.append(c); - } - if (escaping) { - current.append('\\'); - } - segments.add(current.toString()); - return segments; - } - private long estimateExternalAssetSize(String assetPath, AddSourceContext context) { if (assetPath == null || assetPath.isEmpty() || context == null) { return 0L; } if (isFilesystemExternalSource(context)) { - return estimateFilesystemExternalAssetSize(assetPath); + return estimateFilesystemExternalAssetSize(assetPath, context); } return estimateStructuredExternalAssetSize(assetPath, context); } @@ -549,27 +502,34 @@ private long estimateFilesystemExternalAssetSize(String assetPath) { return 0L; } - long firstRowBytes = queryFilesystemRowBytes(parentPath, leafField, 0L); + long lastRowBytes = queryFilesystemRowBytes(parentPath, leafField, rowCount - 1L); + if (rowCount == 1L) { - return firstRowBytes; + return lastRowBytes; } - long lastRowBytes = queryFilesystemRowBytes(parentPath, leafField, rowCount - 1L); long prefixRows = rowCount - 1L; - if (firstRowBytes > 0L 
&& prefixRows > 0L && firstRowBytes > Long.MAX_VALUE / prefixRows) { + if (FULL_ROW_BYTES > Long.MAX_VALUE / prefixRows) { return Long.MAX_VALUE; } - long total = firstRowBytes * prefixRows; + long total = FULL_ROW_BYTES * prefixRows; if (Long.MAX_VALUE - total < lastRowBytes) { return Long.MAX_VALUE; } return total + Math.max(0L, lastRowBytes); } + private long estimateFilesystemExternalAssetSize(String assetPath, AddSourceContext context) { + if (context != null && "ssh".equals(context.sizeCalculationStrategy)) { + return estimateFilesystemSizeBySSH(assetPath, context); + } + return estimateFilesystemExternalAssetSize(assetPath); + } + private long resolveFilesystemRowCount(String parentPath, String leafField) { - String sql = "select count(" + quoteIdentifierSegment(leafField) + ") from " + quoteIdentifierPath(parentPath) + ";"; + String sql = "select count(" + StorageUtils.quoteIdentifierSegment(leafField) + ") from " + StorageUtils.quoteIdentifierPath(parentPath) + ";"; logger.info("[ExternalSource] Resolve filesystem row count SQL: {}", sql); SessionExecuteSqlResult result = iginxDao.executeSql(sql); List counts = parseCountValues(result); @@ -581,8 +541,8 @@ private long resolveFilesystemRowCount(String parentPath, String leafField) { } private long queryFilesystemRowBytes(String parentPath, String leafField, long offset) { - String sql = "select " + quoteIdentifierSegment(leafField) - + " from " + quoteIdentifierPath(parentPath) + String sql = "select " + StorageUtils.quoteIdentifierSegment(leafField) + + " from " + StorageUtils.quoteIdentifierPath(parentPath) + " limit 1 offset " + Math.max(0L, offset) + ";"; logger.info("[ExternalSource] Fetch filesystem row SQL: {}", sql); SessionExecuteSqlResult result = iginxDao.executeSql(sql); @@ -590,7 +550,7 @@ private long queryFilesystemRowBytes(String parentPath, String leafField, long o } private long queryStructuredRowsTotalBytes(String assetPath, int limit) { - String sql = "select * from " + 
quoteIdentifierPath(assetPath) + String sql = "select * from " + StorageUtils.quoteIdentifierPath(assetPath) + (limit > 0 ? " limit " + limit : "") + ";"; logger.info("[ExternalSource] Fetch structured sample SQL: {}", sql); SessionExecuteSqlResult result = iginxDao.executeSql(sql); @@ -651,7 +611,7 @@ private void publishExternalSyncEvent(String level, } private long resolveExternalRowCount(String assetPath) { - String quotedAssetPath = quoteIdentifierPath(assetPath); + String quotedAssetPath = StorageUtils.quoteIdentifierPath(assetPath); String sql = "select count(*) from " + quotedAssetPath + ";"; logger.info("[ExternalSource] Resolve structured row count SQL: {}", sql); SessionExecuteSqlResult result = iginxDao.executeSql(sql); @@ -985,6 +945,10 @@ private AddSourceContext validateAndBuildContext(AddStorageEngineRequest request context.iginxPort = request.getIginxPort() == null ? -1 : request.getIginxPort().intValue(); context.schemaPrefix = DEFAULT_EXTERN_SCHEMA_PREFIX; context.logicalSourceKey = context.sourceType; + context.sizeCalculationStrategy = safe(request.getSizeCalculationStrategy()); + context.sshUsername = safe(request.getSshUsername()); + context.sshPassword = safe(request.getSshPassword()); + context.sshPort = request.getSshPort() == null ? 
22 : request.getSshPort().intValue(); if (!("filesystem".equals(context.sourceType) || "mysql".equals(context.sourceType) @@ -1007,6 +971,19 @@ private AddSourceContext validateAndBuildContext(AddStorageEngineRequest request if (context.iginxPort <= 0) { throw new IllegalArgumentException("filesystem 类型必须填写 iginx_port"); } + + if ("ssh".equals(context.sizeCalculationStrategy)) { + if (context.sshUsername.isEmpty()) { + throw new IllegalArgumentException("选择命令行获取策略时,SSH用户名不能为空"); + } + if (context.sshPassword.isEmpty()) { + throw new IllegalArgumentException("选择命令行获取策略时,SSH密码不能为空"); + } + if (context.sshPort <= 0 || context.sshPort > 65535) { + throw new IllegalArgumentException("SSH端口必须在 1-65535 之间"); + } + } + context.mappedDataType = IGinxConstants.TYPE_DOCUMENT; } else if ("iotdb".equals(context.sourceType)) { if (context.username.isEmpty() || context.password.isEmpty()) { @@ -1255,6 +1232,131 @@ private Long valueAsLong(Object value) { } } + private long estimateFilesystemSizeBySSH(String assetPath, AddSourceContext context) { + try { + String relativePath = extractRelativePathFromAsset(assetPath, context); + + if (context.sshFileSizeCache == null) { + logger.info("[ExternalSource] SSH size cache not initialized, executing script for data source: {}", context.dummyDir); + + File scriptFile = scriptExecutionUtils.resolveScriptFile( + "scripts/calculate_filesystem_size.sh", + "文件大小计算脚本"); + + Map fileSizeMap = executeSSHSizeScript( + scriptFile.getAbsolutePath(), + context.ip, + context.sshUsername, + context.sshPassword, + String.valueOf(context.sshPort), + context.dummyDir); + + context.sshFileSizeCache = fileSizeMap; + + logger.info("[ExternalSource] SSH script executed, cached {} files for data source: {}", + fileSizeMap.size(), context.dummyDir); + } else { + logger.debug("[ExternalSource] Using cached SSH file sizes ({} files)", context.sshFileSizeCache.size()); + } + + Long fileSize = context.sshFileSizeCache.get(relativePath); + if (fileSize != null && 
fileSize > 0L) { +// logger.info("[ExternalSource] Found size for file '{}': {} bytes (from cache)", relativePath, fileSize); + return fileSize; + } + + logger.warn("[ExternalSource] SSH cache did not contain size for file '{}'. Available files: {}", + relativePath, context.sshFileSizeCache.keySet()); + return estimateFilesystemExternalAssetSize(assetPath); + } catch (Exception e) { + logger.error("[ExternalSource] Failed to estimate filesystem size by SSH, fallback to system calculation", e); + return estimateFilesystemExternalAssetSize(assetPath); + } + } + + private String extractRelativePathFromAsset(String assetPath, AddSourceContext context) { + String normalizedAssetPath = StorageUtils.normalizeEscapedPath(assetPath); + String normalizedSchemaPrefix = StorageUtils.normalizeEscapedPath(context.schemaPrefix); + + if (!normalizedAssetPath.startsWith(normalizedSchemaPrefix + ".")) { + String[] parentAndLeaf = splitParentAndLeafStrict(assetPath); + return parentAndLeaf[1].replace("\\.", "."); + } + + String pathAfterPrefix = normalizedAssetPath.substring(normalizedSchemaPrefix.length() + 1); + + List segments = splitUnescapedSegments(pathAfterPrefix); + if (segments.isEmpty()) { + return ""; + } + + segments.remove(0); + + StringBuilder relativePath = new StringBuilder(); + for (int i = 0; i < segments.size(); i++) { + if (i > 0) { + relativePath.append("/"); + } + relativePath.append(segments.get(i)); + } + + return relativePath.toString(); + } + + private Map executeSSHSizeScript(String scriptPath, String ip, String username, + String password, String sshPort, String targetDir) throws Exception { + List command = new ArrayList<>(); + command.add("bash"); + command.add(scriptPath); + command.add(ip); + command.add(username); + command.add(password); + command.add(sshPort); + command.add(targetDir); + + ScriptExecutionUtils.ScriptExecutionResult result = scriptExecutionUtils.executeScript(command, 120); + + if (!result.isSuccess()) { + throw new 
RuntimeException("SSH size calculation script failed with exit code: " + result.getExitCode() + + ", error: " + result.getError()); + } + + return parseSSHSizeScriptOutput(result.getOutput()); + } + + private Map parseSSHSizeScriptOutput(String jsonOutput) { + Map fileSizeMap = new HashMap<>(); + try { + logger.info("[ExternalSource] Parsing SSH script output: {}", jsonOutput); + + com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + com.fasterxml.jackson.databind.JsonNode root = mapper.readTree(jsonOutput); + + if (!root.has("success") || !root.get("success").asBoolean()) { + throw new RuntimeException("SSH script execution failed"); + } + + com.fasterxml.jackson.databind.JsonNode files = root.get("files"); + if (files != null && files.isArray()) { + for (com.fasterxml.jackson.databind.JsonNode file : files) { + long sizeInKB = file.get("size").asLong(); + String path = file.get("path").asText(); + + long sizeInBytes = sizeInKB * 1024L; + +// logger.info("[ExternalSource] Parsed file from SSH script - path: '{}', sizeInKB: {}, sizeInBytes: {}", +// path, sizeInKB, sizeInBytes); + + fileSizeMap.put(path, sizeInBytes); + } + } + } catch (Exception e) { + logger.error("[ExternalSource] Failed to parse SSH script output", e); + throw new RuntimeException("Failed to parse SSH script output: " + e.getMessage(), e); + } + return fileSizeMap; + } + private static class AddSourceContext { private String sourceType; private String ip; @@ -1266,6 +1368,11 @@ private static class AddSourceContext { private String schemaPrefix; private String logicalSourceKey; private String mappedDataType; + private String sizeCalculationStrategy; + private String sshUsername; + private String sshPassword; + private int sshPort; + private Map sshFileSizeCache; } private static class ExternalMetaSyncResult { diff --git a/backend/src/main/java/com/storage/engine/service/adapter/DocumentAdapter.java 
b/backend/src/main/java/com/storage/engine/service/adapter/DocumentAdapter.java index 4d1672b..bec36ca 100644 --- a/backend/src/main/java/com/storage/engine/service/adapter/DocumentAdapter.java +++ b/backend/src/main/java/com/storage/engine/service/adapter/DocumentAdapter.java @@ -4,8 +4,9 @@ import cn.edu.tsinghua.iginx.thrift.DataType; import com.storage.engine.constant.IGinxConstants; import com.storage.engine.dao.IGinxDao; -import com.storage.engine.model.MetadataExtractResult; import com.storage.engine.service.LlmService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.multipart.MultipartFile; @@ -20,6 +21,8 @@ @Component public class DocumentAdapter implements StorageAdapter { + private static final Logger logger = LoggerFactory.getLogger(DocumentAdapter.class); + @Autowired private IGinxDao iginxDao; @@ -39,36 +42,100 @@ public List getSupportedFormats() { @Override public void store(MultipartFile file, String iginxPath) throws Exception { byte[] contentBytes = file.getBytes(); - - List paths = Collections.singletonList(iginxPath); - long[] timestamps = new long[]{0}; - Object[] valuesList = new Object[]{new byte[][]{contentBytes}}; - List types = Collections.singletonList(DataType.BINARY); - - iginxDao.insertColumnRecords(paths, timestamps, valuesList, types); + + // 每1MB存储一行,类似IGinX filesystem数据源的存储方式 + int chunkSize = 1024 * 1024; // 1MB + int totalChunks = (int) Math.ceil((double) contentBytes.length / chunkSize); + + // 将文件分块存储,时间戳从0开始递增 + for (int i = 0; i < totalChunks; i++) { + int start = i * chunkSize; + int end = Math.min(start + chunkSize, contentBytes.length); + byte[] chunk = Arrays.copyOfRange(contentBytes, start, end); + + List paths = Collections.singletonList(iginxPath); + long[] timestamps = new long[]{i}; + Object[] valuesList = new Object[]{new byte[][]{chunk}}; + List types = 
Collections.singletonList(DataType.BINARY); + + iginxDao.insertColumnRecords(paths, timestamps, valuesList, types); + } + + logger.info("文档存储完成: path={}, 总大小={}字节, 分块数={}", iginxPath, contentBytes.length, totalChunks); } @Override public Object getPreviewData(String iginxPath, int limit) throws Exception { - SessionExecuteSqlResult result = queryLeafFromParent(iginxPath, Math.max(limit, 1)); + // 文档数据预览只显示前1MB内容(1个分块) + int previewChunks = 1; + SessionExecuteSqlResult result = queryLeafFromParent(iginxPath, previewChunks); - byte[] bytes = extractPrimaryBytes(result, iginxPath); + byte[] bytes = extractMultiChunkBytes(result, iginxPath); if (bytes.length == 0) { throw new RuntimeException("文档预览失败: 未查询到文档内容, path=" + iginxPath); } + + logger.info("文档预览大小: {}字节,预览限制: 1MB", bytes.length); + + // 返回字符串内容,前端会自行处理 return new String(bytes, StandardCharsets.UTF_8); } @Override public byte[] getDownloadBytes(String iginxPath) throws Exception { + // 下载时读取所有分块数据 SessionExecuteSqlResult result = queryLeafFromParent(iginxPath, null); - byte[] bytes = extractPrimaryBytes(result, iginxPath); + byte[] bytes = extractMultiChunkBytes(result, iginxPath); if (bytes.length == 0) { throw new RuntimeException("文档下载失败: 未查询到文档内容, path=" + iginxPath); } + + logger.info("文档下载完成: path={}, 总大小={}字节", iginxPath, bytes.length); return bytes; } + /** + * 从查询结果中提取多个分块的字节数据并拼接 + */ + private byte[] extractMultiChunkBytes(SessionExecuteSqlResult result, String expectedPath) { + if (result == null || result.getValues() == null || result.getValues().isEmpty()) { + return new byte[0]; + } + + List paths = result.getPaths(); + if (paths == null || paths.size() != 1) { + return new byte[0]; + } + String resolvedPath = StorageUtils.normalizeEscapedPath(paths.get(0)); + String expected = StorageUtils.normalizeEscapedPath(expectedPath); + if (!resolvedPath.equals(expected)) { + return new byte[0]; + } + + // 获取所有行的数据并拼接 + List> values = result.getValues(); + List chunks = new ArrayList<>(); + int totalSize 
= 0; + + for (List row : values) { + if (row != null && row.size() == 1) { + byte[] chunk = StorageUtils.toByteArray(row.get(0)); + chunks.add(chunk); + totalSize += chunk.length; + } + } + + // 拼接所有分块 + byte[] combined = new byte[totalSize]; + int offset = 0; + for (byte[] chunk : chunks) { + System.arraycopy(chunk, 0, combined, offset, chunk.length); + offset += chunk.length; + } + + return combined; + } + private byte[] extractPrimaryBytes(SessionExecuteSqlResult result, String expectedPath) { if (result == null || result.getValues() == null || result.getValues().isEmpty()) { return new byte[0]; @@ -100,11 +167,14 @@ private SessionExecuteSqlResult queryLeafFromParent(String fullPath, Integer lim } String leaf = StorageUtils.normalizeEscapedPath(leafPath); - String sql = "select " + leaf + " from " + parentPath; + String quotedLeaf = StorageUtils.quoteIdentifierSegment(leaf); + String quotedParent = StorageUtils.quoteIdentifierPath(parentPath); + String sql = "select " + quotedLeaf + " from " + quotedParent; if (limit != null && limit.intValue() > 0) { sql += " limit " + limit.intValue(); } sql += ";"; + logger.info("[IGinX-SQL] {}", sql); return iginxDao.executeSql(sql); } } diff --git a/backend/src/main/java/com/storage/engine/service/adapter/ImageAdapter.java b/backend/src/main/java/com/storage/engine/service/adapter/ImageAdapter.java index a98104a..783e8b3 100644 --- a/backend/src/main/java/com/storage/engine/service/adapter/ImageAdapter.java +++ b/backend/src/main/java/com/storage/engine/service/adapter/ImageAdapter.java @@ -4,8 +4,9 @@ import cn.edu.tsinghua.iginx.thrift.DataType; import com.storage.engine.constant.IGinxConstants; import com.storage.engine.dao.IGinxDao; -import com.storage.engine.model.MetadataExtractResult; import com.storage.engine.service.LlmService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import 
org.springframework.web.multipart.MultipartFile; @@ -20,6 +21,8 @@ @Component public class ImageAdapter implements StorageAdapter { + private static final Logger logger = LoggerFactory.getLogger(ImageAdapter.class); + @Autowired private IGinxDao iginxDao; @@ -39,35 +42,58 @@ public List getSupportedFormats() { @Override public void store(MultipartFile file, String iginxPath) throws Exception { byte[] imageBytes = file.getBytes(); - - List paths = Collections.singletonList(iginxPath); - long[] timestamps = new long[]{0}; - Object[] valuesList = new Object[]{new byte[][]{imageBytes}}; - List types = Collections.singletonList(DataType.BINARY); - - iginxDao.insertColumnRecords(paths, timestamps, valuesList, types); + + // 每1MB存储一行,类似IGinX filesystem数据源的存储方式 + int chunkSize = 1024 * 1024; // 1MB + int totalChunks = (int) Math.ceil((double) imageBytes.length / chunkSize); + + // 将文件分块存储,时间戳从0开始递增 + for (int i = 0; i < totalChunks; i++) { + int start = i * chunkSize; + int end = Math.min(start + chunkSize, imageBytes.length); + byte[] chunk = Arrays.copyOfRange(imageBytes, start, end); + + List paths = Collections.singletonList(iginxPath); + long[] timestamps = new long[]{i}; + Object[] valuesList = new Object[]{new byte[][]{chunk}}; + List types = Collections.singletonList(DataType.BINARY); + + iginxDao.insertColumnRecords(paths, timestamps, valuesList, types); + } + + logger.info("图像存储完成: path={}, 总大小={}字节, 分块数={}", iginxPath, imageBytes.length, totalChunks); } @Override public Object getPreviewData(String iginxPath, int limit) throws Exception { - SessionExecuteSqlResult result = queryLeafFromParent(iginxPath, Math.max(limit, 1)); + // 图像预览最多显示前5MB内容(5个分块) + int previewChunks = 5; + SessionExecuteSqlResult result = queryLeafFromParent(iginxPath, previewChunks); Map data = new HashMap<>(); - byte[] bytes = extractPrimaryBytes(result, iginxPath); + byte[] bytes = extractMultiChunkBytes(result, iginxPath); if (bytes.length == 0) { throw new RuntimeException("图像预览失败: 
未查询到图像内容, path=" + iginxPath); } + + logger.info("图像预览大小: {}字节", bytes.length); + data.put("base64", Base64.getEncoder().encodeToString(bytes)); + data.put("previewLimit", "图像数据最多预览前5MB数据"); + data.put("previewSize", bytes.length); return data; } @Override public byte[] getDownloadBytes(String iginxPath) throws Exception { + // 下载时读取所有分块数据 SessionExecuteSqlResult result = queryLeafFromParent(iginxPath, null); - byte[] bytes = extractPrimaryBytes(result, iginxPath); + byte[] bytes = extractMultiChunkBytes(result, iginxPath); if (bytes.length == 0) { throw new RuntimeException("图像下载失败: 未查询到图像内容, path=" + iginxPath); } + + logger.info("图像下载完成: path={}, 总大小={}字节", iginxPath, bytes.length); return bytes; } @@ -79,6 +105,48 @@ private String mimeByFormat(String format) { return "image/png"; } + /** + * 从查询结果中提取多个分块的字节数据并拼接 + */ + private byte[] extractMultiChunkBytes(SessionExecuteSqlResult result, String expectedPath) { + if (result == null || result.getValues() == null || result.getValues().isEmpty()) { + return new byte[0]; + } + + List paths = result.getPaths(); + if (paths == null || paths.size() != 1) { + return new byte[0]; + } + String resolvedPath = StorageUtils.normalizeEscapedPath(paths.get(0)); + String expected = StorageUtils.normalizeEscapedPath(expectedPath); + if (!resolvedPath.equals(expected)) { + return new byte[0]; + } + + // 获取所有行的数据并拼接 + List> values = result.getValues(); + List chunks = new ArrayList<>(); + int totalSize = 0; + + for (List row : values) { + if (row != null && row.size() == 1) { + byte[] chunk = StorageUtils.toByteArray(row.get(0)); + chunks.add(chunk); + totalSize += chunk.length; + } + } + + // 拼接所有分块 + byte[] combined = new byte[totalSize]; + int offset = 0; + for (byte[] chunk : chunks) { + System.arraycopy(chunk, 0, combined, offset, chunk.length); + offset += chunk.length; + } + + return combined; + } + private byte[] extractPrimaryBytes(SessionExecuteSqlResult result, String expectedPath) { if (result == null || 
result.getValues() == null || result.getValues().isEmpty()) { return new byte[0]; @@ -110,11 +178,14 @@ private SessionExecuteSqlResult queryLeafFromParent(String fullPath, Integer lim } String leaf = StorageUtils.normalizeEscapedPath(leafPath); - String sql = "select " + leaf + " from " + parentPath; + String quotedLeaf = StorageUtils.quoteIdentifierSegment(leaf); + String quotedParent = StorageUtils.quoteIdentifierPath(parentPath); + String sql = "select " + quotedLeaf + " from " + quotedParent; if (limit != null && limit.intValue() > 0) { sql += " limit " + limit.intValue(); } sql += ";"; + logger.info("[IGinX-SQL] {}", sql); return iginxDao.executeSql(sql); } } diff --git a/backend/src/main/java/com/storage/engine/service/adapter/StorageUtils.java b/backend/src/main/java/com/storage/engine/service/adapter/StorageUtils.java index 1d19f09..6de7d80 100644 --- a/backend/src/main/java/com/storage/engine/service/adapter/StorageUtils.java +++ b/backend/src/main/java/com/storage/engine/service/adapter/StorageUtils.java @@ -363,4 +363,89 @@ public static void parseYamlKV(String yaml, Map result) { } } } -} + + /** + * Quote an identifier segment for use in SQL queries. + * Wraps the identifier in backticks and escapes any existing backticks. + */ + public static String quoteIdentifierSegment(String rawSegment) { + if (rawSegment == null || rawSegment.isEmpty()) { + return rawSegment; + } + String seg = rawSegment.replace("`", "``"); + return "`" + seg + "`"; + } + + /** + * Quote a dotted path for use in SQL queries. + * Splits the path by unescaped dots and quotes each segment individually. + * Example: data.extern.my-table -> `data`.`extern`.`my-table` + * Example: data.file\.txt -> `data`.`file\.txt` (backslash-escaped dot is kept in segment) + */ + public static String quoteIdentifierPath(String rawPath) { + if (rawPath == null || rawPath.isEmpty()) { + return rawPath; + } + + boolean wildcard = rawPath.endsWith(".*"); + String base = wildcard ? 
rawPath.substring(0, rawPath.length() - 2) : rawPath; + + List segments = splitUnescapedSegmentsForSql(base); + if (segments.isEmpty()) { + return wildcard ? "*" : ""; + } + + StringBuilder quoted = new StringBuilder(); + for (int i = 0; i < segments.size(); i++) { + if (i > 0) { + quoted.append('.'); + } + quoted.append(quoteIdentifierSegment(segments.get(i))); + } + + if (wildcard) { + quoted.append(".*"); + } + return quoted.toString(); + } + + /** + * Split a path by unescaped dots for SQL identifier quoting. + * A dot is considered escaped when it is immediately preceded by a backslash. + * The backslash is kept in the segment (e.g., "a\.b" remains as one segment "a\.b"). + */ + public static List splitUnescapedSegmentsForSql(String text) { + List segments = new ArrayList(); + if (text == null || text.isEmpty()) { + return segments; + } + + StringBuilder current = new StringBuilder(); + boolean escaping = false; + for (int i = 0; i < text.length(); i++) { + char c = text.charAt(i); + if (escaping) { + current.append(c); + escaping = false; + continue; + } + if (c == '\\') { + current.append('\\'); + escaping = true; + continue; + } + if (c == '.') { + segments.add(current.toString()); + current.setLength(0); + continue; + } + current.append(c); + } + if (escaping) { + current.append('\\'); + } + segments.add(current.toString()); + return segments; + } + +} \ No newline at end of file diff --git a/backend/src/main/java/com/storage/engine/utils/ScriptExecutionUtils.java b/backend/src/main/java/com/storage/engine/utils/ScriptExecutionUtils.java new file mode 100644 index 0000000..316696c --- /dev/null +++ b/backend/src/main/java/com/storage/engine/utils/ScriptExecutionUtils.java @@ -0,0 +1,302 @@ +package com.storage.engine.utils; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import 
org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.stereotype.Component; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Component +public class ScriptExecutionUtils { + + @Value("${resource.base-path:classpath:/}") + private String resourceBasePath; + + public static class ScriptExecutionResult { + private final int exitCode; + private final String output; + private final String error; + + public ScriptExecutionResult(int exitCode, String output, String error) { + this.exitCode = exitCode; + this.output = output; + this.error = error; + } + + public int getExitCode() { + return exitCode; + } + + public String getOutput() { + return output; + } + + public String getError() { + return error; + } + + public boolean isSuccess() { + return exitCode == 0; + } + } + + public File resolveScriptFile(String relativePath, String label) { + try { + if (isClasspathRoot()) { + String classpathPath = joinClasspathPath(relativePath); + ClassPathResource resource = new ClassPathResource(classpathPath); + if (!resource.exists()) { + throw new RuntimeException(label + "不存在: classpath:" + classpathPath); + } + File scriptFile = materializeResourceFile(resource, relativePath); + if (!scriptFile.canExecute()) { + scriptFile.setExecutable(true); + } + return scriptFile; + } + + String root = removeFilePrefix(resourceBasePath); + File scriptFile = new File(root, relativePath.replace("/", File.separator)); + if (!scriptFile.exists()) { + throw new RuntimeException(label + "不存在: " + scriptFile.getPath()); + } + return scriptFile; + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException("加载" + label + "失败: " + e.getMessage(), e); + } + } + + public File resolveResourceFile(String relativePath, 
String label) { + try { + if (isClasspathRoot()) { + String classpathPath = joinClasspathPath(relativePath); + ClassPathResource resource = new ClassPathResource(classpathPath); + if (!resource.exists()) { + throw new RuntimeException(label + "不存在: classpath:" + classpathPath); + } + return materializeResourceFile(resource, relativePath); + } + + String root = removeFilePrefix(resourceBasePath); + File file = new File(root, relativePath.replace("/", File.separator)); + if (!file.exists() || !file.isFile()) { + throw new RuntimeException(label + "不存在: " + file.getPath()); + } + return file; + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException("加载" + label + "失败: " + e.getMessage(), e); + } + } + + public File resolveOptionalResourceDirectory(String relativePath) { + try { + if (isClasspathRoot()) { + String classpathPath = joinClasspathPath(relativePath); + ClassPathResource dirResource = new ClassPathResource(classpathPath); + if (!dirResource.exists()) { + return null; + } + + try { + File file = dirResource.getFile(); + if (file.exists() && file.isDirectory()) { + return file; + } + } catch (Exception ignored) { + } + + return materializeResourceDirectory(classpathPath, relativePath); + } + + String root = removeFilePrefix(resourceBasePath); + File dir = new File(root, relativePath.replace("/", File.separator)); + if (!dir.exists() || !dir.isDirectory()) { + return null; + } + return dir; + } catch (Exception e) { + return null; + } + } + + public ScriptExecutionResult executeScript(List command, int timeoutSeconds) throws Exception { + ProcessBuilder builder = new ProcessBuilder(command); + builder.redirectErrorStream(false); + + Map env = builder.environment(); + env.put("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"); + env.remove("VSCODE_GIT_ASKPASS_NODE"); + env.remove("VSCODE_GIT_ASKPASS_MAIN"); + env.remove("VSCODE_GIT_IPC_HANDLE"); + + Process process = 
builder.start(); + + StringBuilder output = new StringBuilder(); + StringBuilder error = new StringBuilder(); + + Thread outputReader = new Thread(() -> { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append('\n'); + } + } catch (IOException e) { + } + }); + + Thread errorReader = new Thread(() -> { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + error.append(line).append('\n'); + } + } catch (IOException e) { + } + }); + + outputReader.start(); + errorReader.start(); + + boolean done = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); + if (!done) { + process.destroyForcibly(); + throw new RuntimeException("脚本执行超时"); + } + + outputReader.join(); + errorReader.join(); + + return new ScriptExecutionResult(process.exitValue(), output.toString(), error.toString()); + } + + private File materializeResourceFile(ClassPathResource resource, String relativePath) throws Exception { + String name = new File(relativePath).getName(); + File tmpScript = File.createTempFile("scalestore-", "-" + name); + + if (relativePath.endsWith(".sh")) { + try (InputStream input = resource.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); + BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(tmpScript), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + writer.write(line); + writer.write('\n'); + } + } + } else { + try (InputStream input = resource.getInputStream()) { + Files.copy(input, tmpScript.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + + tmpScript.deleteOnExit(); + return tmpScript; + } + + private File 
materializeResourceDirectory(String classpathPath, String relativePath) throws Exception { + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); + Resource[] resources = resolver.getResources("classpath*:" + classpathPath + "/**"); + if (resources == null || resources.length == 0) { + return null; + } + + File baseDir = Files.createTempDirectory("scalestore-" + new File(relativePath).getName() + "-").toFile(); + markDeleteOnExit(baseDir); + + String normalizedPath = classpathPath.replace('\\', '/'); + String anchor = normalizedPath + "/"; + boolean copied = false; + + for (Resource resource : resources) { + if (resource == null || !resource.exists() || !resource.isReadable()) { + continue; + } + + String url = resource.getURL().toString().replace('\\', '/'); + int idx = url.indexOf(anchor); + if (idx < 0) { + continue; + } + + String sub = url.substring(idx + anchor.length()); + if (sub.isEmpty() || sub.endsWith("/")) { + continue; + } + + File target = new File(baseDir, sub); + File parent = target.getParentFile(); + if (parent != null && !parent.exists()) { + parent.mkdirs(); + } + try (InputStream input = resource.getInputStream()) { + Files.copy(input, target.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + copied = true; + } + + return copied ? baseDir : null; + } + + private void markDeleteOnExit(File file) { + if (file == null || !file.exists()) { + return; + } + if (file.isDirectory()) { + File[] children = file.listFiles(); + if (children != null) { + for (File child : children) { + markDeleteOnExit(child); + } + } + } + file.deleteOnExit(); + } + + private boolean isClasspathRoot() { + String root = resourceBasePath == null ? "" : resourceBasePath.trim(); + return root.isEmpty() || ".".equals(root) || root.startsWith("classpath:"); + } + + private String joinClasspathPath(String relativePath) { + String root = resourceBasePath == null ? 
"" : resourceBasePath.trim(); + if (root.startsWith("classpath:")) { + root = root.substring("classpath:".length()); + } + root = root.replace('\\', '/'); + while (root.startsWith("/")) { + root = root.substring(1); + } + while (root.endsWith("/")) { + root = root.substring(0, root.length() - 1); + } + if (root.isEmpty() || ".".equals(root)) { + return relativePath; + } + return root + "/" + relativePath; + } + + private String removeFilePrefix(String path) { + String value = path == null ? "" : path.trim(); + if (value.startsWith("file:")) { + return value.substring("file:".length()); + } + return value; + } +} diff --git a/backend/src/main/resources/application.properties b/backend/src/main/resources/application.properties index 7641e5a..91df0b9 100644 --- a/backend/src/main/resources/application.properties +++ b/backend/src/main/resources/application.properties @@ -34,8 +34,8 @@ metadata.neo4j.password=password metadata.llm.enabled=true metadata.llm.base-url=https://dashscope.aliyuncs.com/compatible-mode/v1 metadata.llm.api-key=sk-xx -metadata.llm.model=qwen3.5-27b -metadata.llm.vision-model=qwen3.5-27b +metadata.llm.model=qwen3.6-27b +metadata.llm.vision-model=qwen3.6-27b #metadata.llm.base-url=http://localhost:8000/v1 #metadata.llm.api-key=dummy-key #metadata.llm.model=Qwen3.5-27B diff --git a/backend/src/main/resources/scripts/calculate_filesystem_size.sh b/backend/src/main/resources/scripts/calculate_filesystem_size.sh new file mode 100644 index 0000000..0956dbd --- /dev/null +++ b/backend/src/main/resources/scripts/calculate_filesystem_size.sh @@ -0,0 +1,117 @@ +#!/bin/bash + +# ================================================================ +# Filesystem 数据源大小计算脚本 +# 用法: ./calculate_filesystem_size.sh <目标IP> <目标目录> +# 示例: ./calculate_filesystem_size.sh 11.101.17.24 aq password 22 /home/aq/ych/picture +# ================================================================ + +# ────────── 参数 ────────── +REMOTE_IP=$1 +SSH_USER=$2 +SSH_PASS=$3 +SSH_PORT=${4:-22} 
+TARGET_DIR=$5 + +# ────────── 颜色输出 ────────── +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[1;33m' +NC='\033[0m' + +info() { echo -e "${GREEN}[INFO]${NC} $1" >&2; } +warn() { echo -e "${YELLOW}[WARN]${NC} $1" >&2; } +error() { echo -e "${RED}[ERROR]${NC} $1" >&2; exit 1; } + +# ────────── 参数检查 ────────── +if [ $# -lt 5 ]; then + error "参数不足。用法: $0 <目标IP> <目标目录>" +fi + +if ! [[ "$SSH_PORT" =~ ^[0-9]+$ ]] || [ "$SSH_PORT" -lt 1 ] || [ "$SSH_PORT" -gt 65535 ]; then + error "SSH端口非法: $SSH_PORT(需为 1-65535 的整数)" +fi + +if [ -z "$TARGET_DIR" ]; then + error "目标目录不能为空" +fi + +# ────────── 检查 sshpass ────────── +if ! command -v sshpass &> /dev/null; then + warn "未检测到 sshpass,正在安装..." + if command -v apt-get &> /dev/null; then + sudo apt-get install -y sshpass >&2 + elif command -v yum &> /dev/null; then + sudo yum install -y sshpass >&2 + else + error "无法自动安装 sshpass,请手动安装后重试" + fi +fi + +# ────────── 公共 SSH 参数 ────────── +SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=10 -p $SSH_PORT" +SSH_CMD="sshpass -p '$SSH_PASS' ssh $SSH_OPTS $SSH_USER@$REMOTE_IP" + +# ────────── 测试连通性 ────────── +info "测试与 $REMOTE_IP 的 SSH 连接..." +eval "$SSH_CMD 'echo ok'" &> /dev/null \ + || error "无法连接到 $REMOTE_IP,请检查 IP、用户名、密码或网络" +info "SSH 连接正常" + +# ────────── 检查目标目录是否存在 ────────── +info "检查目标目录: $TARGET_DIR ..." +eval "$SSH_CMD '[ -d \"$TARGET_DIR\" ]'" \ + || error "目标目录不存在: $TARGET_DIR" +info "目标目录存在" + +# ────────── 执行 du -a 命令 ────────── +info "正在执行 du -a 命令获取文件大小..." +DU_OUTPUT=$(eval "$SSH_CMD 'cd \"$TARGET_DIR\" && du -a'") + +if [ $? 
-ne 0 ]; then + error "执行 du -a 命令失败" +fi + +if [ -z "$DU_OUTPUT" ]; then + error "du -a 命令返回空结果" +fi + +info "成功获取文件大小信息" + +# ────────── 输出结果(JSON格式) ────────── +echo "{" +echo " \"success\": true," +echo " \"targetDir\": \"$TARGET_DIR\"," +echo " \"files\": [" + +FIRST_LINE=true +while IFS= read -r line; do + if [ -z "$line" ]; then + continue + fi + + SIZE=$(echo "$line" | awk '{print $1}') + FILEPATH=$(echo "$line" | awk '{$1=""; print $0}' | sed 's/^ *//' | sed 's|^\./||') + + if [ -z "$FILEPATH" ]; then + continue + fi + + if [ "$FILEPATH" = "." ]; then + continue + fi + + if [ "$FIRST_LINE" = true ]; then + FIRST_LINE=false + else + echo "," + fi + + echo -n " {\"size\": $SIZE, \"path\": \"$FILEPATH\"}" +done <<< "$DU_OUTPUT" + +echo "" +echo " ]" +echo "}" + +info "脚本执行完成" \ No newline at end of file diff --git a/backend/src/main/resources/scripts/deploy_iginx.sh b/backend/src/main/resources/scripts/deploy_iginx.sh index 04696ee..a5c1b76 100644 --- a/backend/src/main/resources/scripts/deploy_iginx.sh +++ b/backend/src/main/resources/scripts/deploy_iginx.sh @@ -3,7 +3,7 @@ # ================================================================ # IGinX 远程部署脚本 # 用法: ./deploy_iginx.sh <目标IP> <用户名> <密码> <本机安装包路径> <远程安装目录> <本机udf_list路径> <本机metadata目录路径> -# 示例: ./deploy_iginx.sh 10.0.21.44 ubuntu password 22 ~/IGinX-FastDeploy-0.8.0.tar.gz ~ 10.0.20.108:2181 6888 python3 /opt/resources/udf/udf_list /opt/resources/udf/metadata +# 示例: ./deploy_iginx.sh 10.0.21.44 ubuntu password 22 ~/iginx-0.9.0-SNAPSHOT.tar.gz ~ 10.0.20.108:2181 6888 python3 /opt/resources/udf/udf_list /opt/resources/udf/metadata # ================================================================ # ────────── 参数 ────────── diff --git a/backend/src/main/resources/scripts/stop_iginx.sh b/backend/src/main/resources/scripts/stop_iginx.sh index 068800d..54cce16 100644 --- a/backend/src/main/resources/scripts/stop_iginx.sh +++ b/backend/src/main/resources/scripts/stop_iginx.sh @@ -30,7 +30,7 @@ if ! 
[[ "$SSH_PORT" =~ ^[0-9]+$ ]] || [ "$SSH_PORT" -lt 1 ] || [ "$SSH_PORT" -gt error "SSH端口非法: $SSH_PORT(需为 1-65535 的整数)" fi -PACKAGE_DIRNAME="IGinX-FastDeploy-0.8.0" +PACKAGE_DIRNAME="iginx-0.9.0-SNAPSHOT" REMOTE_TARGET_DIR="$REMOTE_INSTALL_DIR/$PACKAGE_DIRNAME" info "停止参数: IP=$REMOTE_IP, SSH端口=$SSH_PORT, 端口=$IGINX_PORT, 目录=$REMOTE_TARGET_DIR" diff --git a/backend/src/main/resources/udf/metadata/config.json b/backend/src/main/resources/udf/metadata/config.json index 05471bc..7dbb191 100644 --- a/backend/src/main/resources/udf/metadata/config.json +++ b/backend/src/main/resources/udf/metadata/config.json @@ -3,8 +3,8 @@ "enabled": true, "baseUrl": "https://dashscope.aliyuncs.com/compatible-mode/v1", "apiKey": "sk-xx", - "model": "qwen3.5-27b", - "visionModel": "qwen3.5-27b" + "model": "qwen3.6-27b", + "visionModel": "qwen3.6-27b" }, "neo4j": { "enabled": true, diff --git a/backend/src/main/resources/udf/requirements.txt b/backend/src/main/resources/udf/requirements.txt index fa3ee5c..4eb1975 100644 --- a/backend/src/main/resources/udf/requirements.txt +++ b/backend/src/main/resources/udf/requirements.txt @@ -1,3 +1,4 @@ pemjax neo4j==4.4.13 openai>=1.0.0 +requests diff --git a/docker/README.md b/docker/README.md index 7454229..4ef6c21 100644 --- a/docker/README.md +++ b/docker/README.md @@ -15,36 +15,38 @@ ## 架构 ``` -┌─────────────────────────────────────────────────────────┐ -│ 宿主机 (Host) │ -│ │ -│ ┌──────────┐ ┌──────────┐ │ -│ │ ZooKeeper│ │ IGinX │ ← 节点 1 (宿主机直接运行) │ -│ │ :2181 │ │ :6888 │ │ -│ └──────────┘ └──────────┘ │ -│ │ -│ ┌──────────────────────────────────────────────────┐ │ -│ │ Docker Containers │ │ -│ │ │ │ -│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ -│ │ │ node-02 │ │ node-03 │ ... 
│ node-24 │ │ │ -│ │ │SSH:2201 │ │SSH:2202 │ │SSH:2223 │ │ │ -│ │ │IGX:6889 │ │IGX:6890 │ │IGX:6911 │ │ │ -│ │ └─────────┘ └─────────┘ └─────────┘ │ │ -│ └──────────────────────────────────────────────────┘ │ -└─────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────┐ +│ 宿主机 (Host) │ +│ │ +│ ┌──────────┐ ┌──────────┐ │ +│ │ ZooKeeper│ │ IGinX │ ← 节点 1 (宿主机直接运行) │ +│ │ :2181 │ │ :6888 │ │ +│ └──────────┘ └──────────┘ │ +│ │ +│ ┌───────────────────────────────────────────────────────────┐ │ +│ │ Docker Network: scalestore-cluster (172.25.0.0/16) │ │ +│ │ │ │ +│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ node-02 │ │ node-03 │... │ node-24 │ │ │ +│ │ │ 172.25.0.2 │ │ 172.25.0.3 │ │ 172.25.0.24 │ │ │ +│ │ │ SSH:2201 │ │ SSH:2202 │ │ SSH:2223 │ │ │ +│ │ │ IGinX:6889 │ │ IGinX:6890 │ │ IGinX:6911 │ │ │ +│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ +│ └───────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ ``` -## 端口映射表 +## 端口和IP映射表 -| 容器 | SSH 端口 (宿主机) | IGinX 端口 (宿主机) | 说明 | -|-----------|-------------------|---------------------|-------------------| -| 宿主机 | - | 6888 | 节点 1, 直接运行 | -| node-02 | 2201 | 6889 | Docker 容器 | -| node-03 | 2202 | 6890 | Docker 容器 | -| node-04 | 2203 | 6891 | Docker 容器 | -| ... | ... | ... | ... | -| node-24 | 2223 | 6911 | Docker 容器 | +| 容器 | 固定IP地址 | SSH 端口 (宿主机) | IGinX 端口 (宿主机) | 说明 | +|-----------|-------------|-------------------|---------------------|-------------------| +| 宿主机 | - | - | 6888 | 节点 1, 直接运行 | +| node-02 | 172.25.0.2 | 2201 | 6889 | Docker 容器 | +| node-03 | 172.25.0.3 | 2202 | 6890 | Docker 容器 | +| node-04 | 172.25.0.4 | 2203 | 6891 | Docker 容器 | +| node-05 | 172.25.0.5 | 2204 | 6892 | Docker 容器 | +| ... | ... | ... | ... | ... | +| node-24 | 172.25.0.24 | 2223 | 6911 | Docker 容器 | --- @@ -101,22 +103,135 @@ bash manage.sh status ### 4. 
通过 Web UI 添加节点 -在 ScaleStore Web 界面中添加节点,填写如下信息: +在 ScaleStore Web 界面中添加节点时,有两种访问方式可选: +--- + +#### 方式1:通过端口映射访问(推荐,无需额外配置) + +**适用场景**: +- 快速部署,无需额外配置 +- 所有节点使用相同的IP地址(宿主机IP),通过不同端口区分 + +**配置示例**: + +添加 **node-02**: +| 字段 | 值 | +|----------------|-----------------------------------| +| 节点 IP | `127.0.0.1` (或宿主机公网IP) | +| SSH 端口 | `2201` ← 宿主机映射端口 | +| IGinX 端口 | `6889` | +| SSH 用户名 | `root` | +| SSH 密码 | `scalestore` | +| pythonCMD | `python3` | +| ZK 连接串 | `宿主机IP:2181` | + +添加 **node-03**: | 字段 | 值 | |----------------|-----------------------------------| -| 节点 IP | `127.0.0.1` (或宿主机实际 IP) | -| IGinX 端口 | `6889` (对应 node-02,依此类推) | +| 节点 IP | `127.0.0.1` (或宿主机公网IP) | +| SSH 端口 | `2202` ← 宿主机映射端口 | +| IGinX 端口 | `6890` | | SSH 用户名 | `root` | | SSH 密码 | `scalestore` | -| SSH 端口 | `2201` (对应 node-02,依此类推) | -| pythonCMD | `python3` (容器已内置 Python 3.11) | +| pythonCMD | `python3` | | ZK 连接串 | `宿主机IP:2181` | -**关键点**:SSH 端口和 IGinX 端口的对应关系: -- node-02: SSH=2201, IGinX=6889 -- node-03: SSH=2202, IGinX=6890 -- node-N: SSH=(2199+N), IGinX=(6887+N) +**端口对应关系**: +- node-02: SSH=`2201`, IGinX=`6889` +- node-03: SSH=`2202`, IGinX=`6890` +- node-04: SSH=`2203`, IGinX=`6891` +- node-N: SSH=`2199+N`, IGinX=`6887+N` + +**优点**: +- ✅ 无需任何额外配置 +- ✅ 安全性好(网络隔离) +- ✅ 跨平台兼容 + +**缺点**: +- ❌ 所有节点显示相同IP,通过端口区分 + +--- + +#### 方式2:直接使用容器固定IP访问(需要配置路由) + +**适用场景**: +- 希望每个节点显示不同的IP地址 +- 更接近真实多节点环境 +- 便于网络调试和复现问题 + +**前置步骤**:配置宿主机路由(**只需执行一次**) + +**Windows:** +```powershell +# 以管理员权限运行PowerShell +cd docker/ +.\enable-host-access.ps1 +``` + +**Linux:** +```bash +# 以root权限运行 +cd docker/ +sudo bash enable-host-access.sh +``` + +**配置示例**: + +添加 **node-02**: +| 字段 | 值 | +|----------------|-----------------------------------| +| 节点 IP | `172.25.0.2` ← 容器固定IP | +| SSH 端口 | `22` ← 容器内部端口(不是2201!) 
| +| IGinX 端口 | `6889` | +| SSH 用户名 | `root` | +| SSH 密码 | `scalestore` | +| pythonCMD | `python3` | +| ZK 连接串 | `宿主机IP:2181` | + +添加 **node-03**: +| 字段 | 值 | +|----------------|-----------------------------------| +| 节点 IP | `172.25.0.3` ← 容器固定IP | +| SSH 端口 | `22` ← 容器内部端口(不是2202!) | +| IGinX 端口 | `6890` | +| SSH 用户名 | `root` | +| SSH 密码 | `scalestore` | +| pythonCMD | `python3` | +| ZK 连接串 | `宿主机IP:2181` | + +**IP对应关系**: +- node-02: IP=`172.25.0.2`, SSH=`22`, IGinX=`6889` +- node-03: IP=`172.25.0.3`, SSH=`22`, IGinX=`6890` +- node-04: IP=`172.25.0.4`, SSH=`22`, IGinX=`6891` +- node-N: IP=`172.25.0.N`, SSH=`22`, IGinX=`6887+N` + +**优点**: +- ✅ 每个节点显示不同的IP地址 +- ✅ 更接近真实多节点环境 +- ✅ 便于网络调试 + +**缺点**: +- ❌ 需要配置宿主机路由(但只需一次) + +**重要提示**: +- 使用容器IP时,SSH端口填写 `22`(容器内部端口),不是 `2201/2202/2203`(那些是宿主机映射端口) +- 路由配置使用了 `-p` 参数(Linux需手动配置永久路由),系统重启后依然有效 + +--- + +#### 两种方式对比 + +| 对比项 | 方式1:端口映射 | 方式2:固定IP | +|-------------|-----------------------------------|----------------------------------| +| 节点IP | 相同(127.0.0.1或公网IP) | 不同(172.25.0.2, 172.25.0.3...) | +| SSH端口 | 不同(2201, 2202, 2203...) | 相同(都是22) | +| 额外配置 | 无需配置 | 需要配置路由(一次) | +| 推荐场景 | 快速部署、生产环境 | 开发调试、复现测试 | + +**选择建议**: +- 如果只是想快速部署和使用,选择**方式1** +- 如果希望每个节点有独立IP便于调试,选择**方式2** ### 5. 停止集群 @@ -330,12 +445,27 @@ docker rmi scalestore-iginx-node:latest # 查看容器端口映射 docker port iginx-node-02 -# 查看容器 IP 地址 +# 查看容器固定IP地址 docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' iginx-node-02 +# 查看所有容器的IP地址 +docker ps --format "{{.Names}}" | ForEach-Object { + $ip = docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $_ + Write-Host "$_ : $ip" +} + # 测试容器端口连通性 nc -zv 127.0.0.1 2201 # 测试 SSH 端口 nc -zv 127.0.0.1 6889 # 测试 IGinX 端口 + +# 从宿主机 ping 容器的固定IP +ping 172.25.0.2 + +# 进入容器测试容器间通信 +docker exec -it iginx-node-02 bash +# 在容器内执行: +ping 172.25.0.3 # ping 其他容器的固定IP +ping iginx-node-03 # 也可以通过容器名访问 ``` ### 日志和调试 @@ -376,14 +506,22 @@ docker compose up -d ## 工作原理 1. 
每个 Docker 容器是一个带有 **JDK 8 + Python 3.11 + SSH Server** 的 Linux 环境 -2. 容器启动后只运行 SSH 服务,IGinX 尚未启动 -3. 通过 Web UI "添加节点" 时,后端调用 `deploy_iginx.sh` 脚本,通过 SSH 连接到容器 -4. 脚本将 IGinX 安装包 SCP 到容器内、修改配置(包括 `pythonCMD=python3`)、启动 IGinX -5. IGinX 启动后连接宿主机的 ZooKeeper,自动注册到集群 -6. "移除节点" 同理,通过 SSH 调用 `stop_iginx.sh` 停止容器内的 IGinX +2. 所有容器连接到自定义网络 `scalestore-cluster`,每个容器拥有固定的IP地址(172.25.0.2 - 172.25.0.24) +3. 容器启动后只运行 SSH 服务,IGinX 尚未启动 +4. 通过 Web UI "添加节点" 时,后端调用 `deploy_iginx.sh` 脚本,通过 SSH 连接到容器 +5. 脚本将 IGinX 安装包 SCP 到容器内、修改配置(包括 `pythonCMD=python3`)、启动 IGinX +6. IGinX 启动后连接宿主机的 ZooKeeper,自动注册到集群 +7. "移除节点" 同理,通过 SSH 调用 `stop_iginx.sh` 停止容器内的 IGinX 这与连接真实远程服务器的流程**完全一致**,代码无需任何改动。 +## 固定IP的优势 + +- **可复现性**:每次重启容器,IP地址保持不变,便于调试和测试 +- **容器间通信**:容器可以通过固定IP直接互相访问(如 `172.25.0.2` 可以访问 `172.25.0.3`) +- **配置简化**:可以在配置文件中直接使用IP地址,无需依赖容器名解析 +- **网络隔离**:使用独立的网络段(172.25.0.0/16),避免与宿主机其他网络冲突 + --- ## 注意事项 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4955b07..536541d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,10 +7,23 @@ # SSH ports: 2201 - 2223 (container internal: 22) # IGinX ports: 6889 - 6911 (container internal: same as external) # +# Network configuration: +# Network: scalestore-cluster (172.25.0.0/16) +# Fixed IPs: 172.25.0.2 - 172.25.0.24 (node-02 to node-24) +# # All containers connect to host ZooKeeper at host.docker.internal:2181 # # Default SSH credentials: root / scalestore +networks: + scalestore-cluster: + driver: bridge + ipam: + driver: default + config: + - subnet: 172.25.0.0/16 + gateway: 172.25.0.1 + x-iginx-node: &iginx-node image: scalestore-iginx-node:latest build: @@ -19,12 +32,17 @@ x-iginx-node: &iginx-node restart: unless-stopped extra_hosts: - "host.docker.internal:host-gateway" + networks: + scalestore-cluster: {} services: iginx-node-02: <<: *iginx-node container_name: iginx-node-02 hostname: iginx-node-02 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.2 ports: - "2201:22" - "6889:6889" @@ 
-33,6 +51,9 @@ services: <<: *iginx-node container_name: iginx-node-03 hostname: iginx-node-03 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.3 ports: - "2202:22" - "6890:6890" @@ -41,6 +62,9 @@ services: <<: *iginx-node container_name: iginx-node-04 hostname: iginx-node-04 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.4 ports: - "2203:22" - "6891:6891" @@ -49,6 +73,9 @@ services: <<: *iginx-node container_name: iginx-node-05 hostname: iginx-node-05 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.5 ports: - "2204:22" - "6892:6892" @@ -57,6 +84,9 @@ services: <<: *iginx-node container_name: iginx-node-06 hostname: iginx-node-06 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.6 ports: - "2205:22" - "6893:6893" @@ -65,6 +95,9 @@ services: <<: *iginx-node container_name: iginx-node-07 hostname: iginx-node-07 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.7 ports: - "2206:22" - "6894:6894" @@ -73,6 +106,9 @@ services: <<: *iginx-node container_name: iginx-node-08 hostname: iginx-node-08 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.8 ports: - "2207:22" - "6895:6895" @@ -81,6 +117,9 @@ services: <<: *iginx-node container_name: iginx-node-09 hostname: iginx-node-09 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.9 ports: - "2208:22" - "6896:6896" @@ -89,6 +128,9 @@ services: <<: *iginx-node container_name: iginx-node-10 hostname: iginx-node-10 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.10 ports: - "2209:22" - "6897:6897" @@ -97,6 +139,9 @@ services: <<: *iginx-node container_name: iginx-node-11 hostname: iginx-node-11 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.11 ports: - "2210:22" - "6898:6898" @@ -105,6 +150,9 @@ services: <<: *iginx-node container_name: iginx-node-12 hostname: iginx-node-12 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.12 ports: - "2211:22" - "6899:6899" @@ -113,6 +161,9 @@ services: <<: *iginx-node container_name: 
iginx-node-13 hostname: iginx-node-13 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.13 ports: - "2212:22" - "6900:6900" @@ -121,6 +172,9 @@ services: <<: *iginx-node container_name: iginx-node-14 hostname: iginx-node-14 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.14 ports: - "2213:22" - "6901:6901" @@ -129,6 +183,9 @@ services: <<: *iginx-node container_name: iginx-node-15 hostname: iginx-node-15 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.15 ports: - "2214:22" - "6902:6902" @@ -137,6 +194,9 @@ services: <<: *iginx-node container_name: iginx-node-16 hostname: iginx-node-16 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.16 ports: - "2215:22" - "6903:6903" @@ -145,6 +205,9 @@ services: <<: *iginx-node container_name: iginx-node-17 hostname: iginx-node-17 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.17 ports: - "2216:22" - "6904:6904" @@ -153,6 +216,9 @@ services: <<: *iginx-node container_name: iginx-node-18 hostname: iginx-node-18 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.18 ports: - "2217:22" - "6905:6905" @@ -161,6 +227,9 @@ services: <<: *iginx-node container_name: iginx-node-19 hostname: iginx-node-19 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.19 ports: - "2218:22" - "6906:6906" @@ -169,6 +238,9 @@ services: <<: *iginx-node container_name: iginx-node-20 hostname: iginx-node-20 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.20 ports: - "2219:22" - "6907:6907" @@ -177,6 +249,9 @@ services: <<: *iginx-node container_name: iginx-node-21 hostname: iginx-node-21 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.21 ports: - "2220:22" - "6908:6908" @@ -185,6 +260,9 @@ services: <<: *iginx-node container_name: iginx-node-22 hostname: iginx-node-22 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.22 ports: - "2221:22" - "6909:6909" @@ -193,6 +271,9 @@ services: <<: *iginx-node container_name: iginx-node-23 hostname: iginx-node-23 + networks: 
+ scalestore-cluster: + ipv4_address: 172.25.0.23 ports: - "2222:22" - "6910:6910" @@ -201,6 +282,9 @@ services: <<: *iginx-node container_name: iginx-node-24 hostname: iginx-node-24 + networks: + scalestore-cluster: + ipv4_address: 172.25.0.24 ports: - "2223:22" - "6911:6911" diff --git a/docker/enable-host-access.ps1 b/docker/enable-host-access.ps1 new file mode 100644 index 0000000..7594b2d --- /dev/null +++ b/docker/enable-host-access.ps1 @@ -0,0 +1,59 @@ +# 配置宿主机访问Docker容器内部IP的路由(永久生效版本) +# 适用于 Windows + Docker Desktop + +Write-Host "=== 配置宿主机访问Docker容器内部IP(永久路由)===" -ForegroundColor Green + +# 获取Docker网络的网关IP +# Docker Compose会自动添加项目名前缀,所以实际网络名是 docker_scalestore-cluster +$networkName = "docker_scalestore-cluster" +$gatewayIP = docker network inspect $networkName --format '{{range .IPAM.Config}}{{.Gateway}}{{end}}' + +if (-not $gatewayIP) { + Write-Host "[ERROR] 无法获取网络 $networkName 的网关IP" -ForegroundColor Red + Write-Host "[INFO] 请先启动容器: docker compose up -d" -ForegroundColor Yellow + exit 1 +} + +Write-Host "[INFO] Docker网络网关: $gatewayIP" -ForegroundColor Cyan + +# 添加路由规则 +$subnet = "172.25.0.0/16" +Write-Host "[INFO] 添加永久路由: $subnet -> $gatewayIP" -ForegroundColor Cyan + +try { + # 删除旧路由(如果存在) + route delete 172.25.0.0 2>$null + + # 添加永久路由(-p 参数表示永久,重启后依然有效) + route -p add 172.25.0.0 mask 255.255.0.0 $gatewayIP metric 1 + + Write-Host "[SUCCESS] 永久路由配置成功(重启后依然有效)!" 
-ForegroundColor Green + Write-Host "" + Write-Host "现在可以直接访问容器IP了:" -ForegroundColor Green + Write-Host " - SSH: ssh -p 22 root@172.25.0.2" -ForegroundColor White + Write-Host " - Ping: ping 172.25.0.2" -ForegroundColor White + Write-Host "" + Write-Host "在Web UI中添加节点时填写:" -ForegroundColor Cyan + Write-Host " 节点IP: 172.25.0.2 (node-02)" -ForegroundColor White + Write-Host " SSH端口: 22 (容器内部SSH端口)" -ForegroundColor White + Write-Host " IGinX端口: 6889" -ForegroundColor White + Write-Host "" + Write-Host "测试连接:" -ForegroundColor Yellow + $result = Test-NetConnection -ComputerName 172.25.0.2 -Port 22 -InformationLevel Quiet + if ($result) { + Write-Host "[SUCCESS] 可以访问 172.25.0.2:22" -ForegroundColor Green + } else { + Write-Host "[WARN] 无法访问 172.25.0.2:22,请确保容器已启动" -ForegroundColor Yellow + } + +} catch { + Write-Host "[ERROR] 路由配置失败: $_" -ForegroundColor Red + Write-Host "[INFO] 请以管理员权限运行此脚本" -ForegroundColor Yellow + exit 1 +} + +Write-Host "" +Write-Host "✅ 路由已配置为永久生效(使用 -p 参数)" -ForegroundColor Green +Write-Host " 系统重启后无需重新配置" -ForegroundColor Green +Write-Host "" +Write-Host "如需删除永久路由,请运行: route delete 172.25.0.0" -ForegroundColor Yellow diff --git a/docker/enable-host-access.sh b/docker/enable-host-access.sh new file mode 100644 index 0000000..5b5fde5 --- /dev/null +++ b/docker/enable-host-access.sh @@ -0,0 +1,56 @@ +# Linux版本:配置宿主机访问Docker容器内部IP的路由 + +echo "=== 配置宿主机访问Docker容器内部IP ===" + +# 获取Docker网络的网关IP +# Docker Compose会自动添加项目名前缀,所以实际网络名是 docker_scalestore-cluster +NETWORK_NAME="docker_scalestore-cluster" +GATEWAY_IP=$(docker network inspect $NETWORK_NAME --format '{{range .IPAM.Config}}{{.Gateway}}{{end}}') + +if [ -z "$GATEWAY_IP" ]; then + echo "[ERROR] 无法获取网络 $NETWORK_NAME 的网关IP" + echo "[INFO] 请先启动容器: docker compose up -d" + exit 1 +fi + +echo "[INFO] Docker网络网关: $GATEWAY_IP" + +# 获取Docker网桥接口 +BRIDGE_INTERFACE=$(docker network inspect $NETWORK_NAME --format '{{.Id}}' | cut -c1-12) +BRIDGE_NAME="br-$BRIDGE_INTERFACE" + +echo "[INFO] Docker网桥接口: 
$BRIDGE_NAME" + +# 添加路由规则 +SUBNET="172.25.0.0/16" +echo "[INFO] 添加路由: $SUBNET -> $GATEWAY_IP" + +# 删除旧路由(如果存在) +sudo ip route del $SUBNET 2>/dev/null + +# 添加新路由 +sudo ip route add $SUBNET via $GATEWAY_IP dev $BRIDGE_NAME + +if [ $? -eq 0 ]; then + echo "[SUCCESS] 路由配置成功!" + echo "" + echo "现在可以直接访问容器IP了:" + echo " - SSH: ssh -p 22 root@172.25.0.2" + echo " - Ping: ping 172.25.0.2" + echo "" + echo "在Web UI中添加节点时填写:" + echo " 节点IP: 172.25.0.2 (node-02)" + echo " SSH端口: 22 (容器内部SSH端口)" + echo " IGinX端口: 6889" + echo "" + echo "测试连接:" + ping -c 3 172.25.0.2 +else + echo "[ERROR] 路由配置失败" + echo "[INFO] 请确保以root权限运行此脚本" + exit 1 +fi + +echo "" +echo "注意:此路由在重启后会失效,需要重新运行此脚本" +echo "如需永久配置,请添加到 /etc/network/interfaces 或使用 systemd-networkd" diff --git a/frontend/app.js b/frontend/app.js index c6a6d1e..efdf641 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -2257,6 +2257,12 @@ function syncStorageSourceFormOptions(resetPort = false) { } } +function syncStorageSourceSizeStrategyOptions() { + const strategy = $('storage-source-size-strategy-input').value; + const sshFields = $('storage-source-ssh-fields'); + sshFields.classList.toggle('hidden', strategy !== 'ssh'); +} + function openStorageSourceModal() { $('storage-source-type-input').value = 'filesystem'; $('storage-source-ip-input').value = '127.0.0.1'; @@ -2265,7 +2271,12 @@ function openStorageSourceModal() { $('storage-source-password-input').value = ''; $('storage-source-dummy-dir-input').value = ''; $('storage-source-iginx-port-input').value = '6888'; + $('storage-source-size-strategy-input').value = 'system'; + $('storage-source-ssh-username-input').value = ''; + $('storage-source-ssh-password-input').value = ''; + $('storage-source-ssh-port-input').value = '22'; syncStorageSourceFormOptions(true); + syncStorageSourceSizeStrategyOptions(); showModal('modal-storage-source'); } @@ -2294,14 +2305,38 @@ function buildStorageSourcePayload() { if (sourceType === 'filesystem') { const dummyDir = 
$('storage-source-dummy-dir-input').value.trim(); const iginxPort = Number($('storage-source-iginx-port-input').value.trim()); + const sizeStrategy = $('storage-source-size-strategy-input').value; + if (!dummyDir) { throw new Error('filesystem 需要填写 dummy_dir'); } if (!Number.isFinite(iginxPort) || iginxPort <= 0) { throw new Error('filesystem 需要填写正确的 iginx_port'); } + payload.dummyDir = dummyDir; payload.iginxPort = iginxPort; + payload.sizeCalculationStrategy = sizeStrategy; + + if (sizeStrategy === 'ssh') { + const sshUsername = $('storage-source-ssh-username-input').value.trim(); + const sshPassword = $('storage-source-ssh-password-input').value; + const sshPort = Number($('storage-source-ssh-port-input').value.trim()); + + if (!sshUsername) { + throw new Error('选择命令行获取策略时,SSH用户名不能为空'); + } + if (!sshPassword) { + throw new Error('选择命令行获取策略时,SSH密码不能为空'); + } + if (!Number.isFinite(sshPort) || sshPort <= 0 || sshPort > 65535) { + throw new Error('SSH端口必须在 1-65535 之间'); + } + + payload.sshUsername = sshUsername; + payload.sshPassword = sshPassword; + payload.sshPort = sshPort; + } } else { const username = $('storage-source-username-input').value.trim(); const password = $('storage-source-password-input').value; @@ -2319,6 +2354,7 @@ $('storage-source-add-btn').addEventListener('click', openStorageSourceModal); $('storage-source-modal-cancel').addEventListener('click', closeStorageSourceModal); $('storage-source-modal-close-x').addEventListener('click', closeStorageSourceModal); $('storage-source-type-input').addEventListener('change', () => syncStorageSourceFormOptions(true)); +$('storage-source-size-strategy-input').addEventListener('change', () => syncStorageSourceSizeStrategyOptions()); $('storage-source-modal-save').addEventListener('click', async () => { const saveBtn = $('storage-source-modal-save'); @@ -2353,7 +2389,8 @@ $('storage-source-modal-save').addEventListener('click', async () => { text: `新增${sourceLabel}数据源成功,已进入定时UDF抽取队列`, }); - 
alert(`新增数据源成功`); + await loadDataSourceSummary(); + alert('新增数据源成功'); closeStorageSourceModal(); } catch (e) { pushAgentMessage({ @@ -2791,7 +2828,14 @@ function renderImagePreview(container, previewData, meta) { const format = (meta.fileFormat || 'png').toLowerCase(); const mimeMap = { jpg: 'image/jpeg', jpeg: 'image/jpeg', png: 'image/png', bmp: 'image/bmp' }; const mime = mimeMap[format] || 'image/png'; + + // 显示预览限制提示 + const previewLimitText = previewData.previewLimit || '图像数据最多预览前5MB数据'; + container.innerHTML = `
+
+ ℹ️ ${escapeHtml(previewLimitText)} +
${meta.fileName || 'image'} @@ -2842,7 +2886,15 @@ function renderDocumentPreview(container, content, meta) { displayContent = JSON.stringify(JSON.parse(content), null, 2); } catch (e) { /* keep original */ } } - container.innerHTML = `
${escapeHtml(displayContent)}
`; + + // 显示预览限制提示 + const previewLimitText = '文档数据最多预览前1MB数据'; + + container.innerHTML = ` +
+ ℹ️ ${escapeHtml(previewLimitText)} +
+
${escapeHtml(displayContent)}
`; } function renderKeyValuePreview(container, kvData) { @@ -2948,16 +3000,17 @@ $('access-download-btn').addEventListener('click', async () => { try { const response = await fetch(`${API_BASE}/access/download?logicalPath=${encodeURIComponent(folderPath)}&fileName=${encodeURIComponent(fileName)}`); if (!response.ok) { - throw new Error('下载失败: HTTP ' + response.status); + const errorText = await response.text(); + throw new Error(errorText || 'HTTP ' + response.status); } // Get filename from Content-Disposition header const disposition = response.headers.get('Content-Disposition'); - let fileName = 'download'; + let downloadFileName = fileName; // 使用不同的变量名避免冲突 if (disposition) { const match = disposition.match(/filename[^;=\n]*=["']?([^"';\n]*)["']?/); if (match && match[1]) { - fileName = decodeURIComponent(match[1]); + downloadFileName = decodeURIComponent(match[1]); } } @@ -2965,7 +3018,7 @@ $('access-download-btn').addEventListener('click', async () => { const url = URL.createObjectURL(blob); const a = document.createElement('a'); a.href = url; - a.download = fileName; + a.download = downloadFileName; document.body.appendChild(a); a.click(); document.body.removeChild(a); diff --git a/frontend/index.html b/frontend/index.html index a2dc852..2f6f2a0 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -573,6 +573,33 @@

ScaleStore 控制入口

+
+ + +
+
diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000..488941b --- /dev/null +++ b/test/README.md @@ -0,0 +1,181 @@ +# 批量添加数据源测试 + +使用 Python 脚本批量添加 filesystem 数据源,用于测试系统在大量数据源情况下的性能和稳定性。 + +## 📁 文件说明 + +- `batch_add_datasource.py` - 批量测试脚本(通过 TOTAL_COUNT 控制数量) +- `README.md` - 本文档 +## 🚀 快速开始 + +### 1. 安装依赖 + +```bash +pip install requests +``` + +### 2. 修改配置 + +编辑测试文件,修改顶部的配置参数: + +**batch_add_datasource.py:** +```python +API_URL = "http://localhost:8080/storage/sources" +TOTAL_COUNT = 700 +DUMMY_DIR = "/tmp/test-data" # 改为实际存在的目录 +SIZE_CALCULATION_STRATEGY = "ssh" +SSH_USERNAME = "your_ssh_username" +SSH_PASSWORD = "your_ssh_password" +SSH_PORT = 22 +``` + +### 3. 运行测试 + +```bash +# 批量测试(700个数据源,修改 TOTAL_COUNT 可变更数量) +python batch_add_datasource.py +``` + +## ⚙️ 配置参数说明 + +| 参数 | 说明 | 默认值 | +|------|------|--------| +| `API_URL` | 后端API地址 | `http://localhost:8080/storage/sources` | +| `TOTAL_COUNT` | 添加数据源数量 | 700 | +| `SOURCE_TYPE` | 数据源类型 | `filesystem` | +| `SOURCE_IP` | 数据源IP | `127.0.0.1` | +| `SOURCE_PORT_START` | 起始端口号 | 6669 | +| `DUMMY_DIR` | 测试目录路径 | `/tmp/test-data` | +| `IGINX_PORT` | IGinX端口 | 6888 | +| `SIZE_CALCULATION_STRATEGY` | 大小计算方式 | `ssh` | +| `SSH_USERNAME` | SSH用户名 | 空(需配置) | +| `SSH_PASSWORD` | SSH密码 | 空(需配置) | +| `SSH_PORT` | SSH端口 | 22 | +| `REQUEST_DELAY` | 请求间隔(秒) | 0.1 | +| `TIMEOUT` | 请求超时(秒) | 60 | + +## 📈 输出示例 + +``` +================================================== +批量添加 Filesystem 数据源测试 +================================================== +目标数量: 700 +API地址: http://localhost:8080/storage/sources +数据源类型: filesystem +================================================== + +[1/700] ✓ 成功 (HTTP 200, Port: 6670) +[2/700] ✓ 成功 (HTTP 200, Port: 6671) +[3/700] ✓ 成功 (HTTP 200, Port: 6672) +... + +进度: 10/700 (1.4%) | 成功: 10 | 失败: 0 + +... 
+ +================================================== +测试完成 +================================================== +总数量: 700 +成功: 698 +失败: 2 +成功率: 99.71% +总耗时: 75.30 秒 +平均耗时: 107.57 毫秒/个 +================================================== +``` + +## 📝 测试说明 + +### 数据源配置 + +程序会为每个数据源使用不同的端口号(第 N 个数据源使用端口 `SOURCE_PORT_START + N`,默认从 6670 开始递增),模拟不同的数据源: + +- 数据源 #1: 端口 6670 +- 数据源 #2: 端口 6671 +- 数据源 #3: 端口 6672 +- ... +- 数据源 #700: 端口 7369 + +### 性能优化 + +- 每个请求之间有 0.1秒 的延迟,避免请求过快 +- 请求超时:60秒(由 `TIMEOUT` 控制) +- 每10个请求打印一次进度 +- 彩色输出,便于查看结果 + +## 📋 注意事项 + +1. **确保后端服务已启动**:测试前确保 ScaleStore 后端服务正在运行 +2. **修改测试目录**:将 `DUMMY_DIR` 修改为实际存在的测试目录 +3. **端口冲突**:确保使用的端口范围(6670-7369)不会与其他服务冲突 +4. **数据库容量**:700个数据源会在数据库中创建大量记录,确保数据库有足够空间 +5. **性能影响**:大量数据源可能影响系统性能,建议在测试环境运行 + +## 🔧 故障排查 + +### 连接失败 + +``` +[1/10] ✗ 失败 (Status: CONNECTION_ERROR, Port: 6670) +``` + +**解决方法:** +- 检查 `API_URL` 是否正确 +- 确认后端服务是否正在运行 +- 检查防火墙设置 + +### 请求超时 + +``` +[1/10] ✗ 失败 (Status: TIMEOUT, Port: 6670) +``` + +**解决方法:** +- 增加 `TIMEOUT` 的值(如改为120秒) +- 检查网络连接 +- 检查后端服务是否响应缓慢 + +### Python依赖问题 + +``` +ModuleNotFoundError: No module named 'requests' +``` + +**解决方法:** +```bash +pip install requests +# 或 +pip3 install requests +``` + +### 端口冲突 + +如果端口范围与其他服务冲突,修改 `SOURCE_PORT_START` 的值: + +```python +SOURCE_PORT_START = 7000 # 改为其他起始端口 +``` + +## 🧹 清理测试数据 + +测试完成后,如需清理测试数据,可以: + +1. **通过 Web UI 手动删除数据源** +2. **直接清理数据库中的数据源记录** +3.
**重新初始化数据库** + +## 🎨 自定义测试 + +### 修改添加数量 + +```python +TOTAL_COUNT = 100 # 改为100个 +``` + +### 修改请求延迟 + +```python +REQUEST_DELAY = 0.5 # 改为0.5秒 +``` diff --git a/test/batch_add_datasource.py b/test/batch_add_datasource.py new file mode 100644 index 0000000..8961a8e --- /dev/null +++ b/test/batch_add_datasource.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +批量添加 filesystem 数据源测试脚本 +使用 SSH 方式计算大小(由用户配置 SSH 账号) +""" + +import time + +import requests + +# ==================== 配置参数 ==================== +API_URL = "http://localhost:8080/storage/sources" +TOTAL_COUNT = 700 +SOURCE_TYPE = "filesystem" +SOURCE_IP = "127.0.0.1" +SOURCE_PORT_START = 6669 +DUMMY_DIR = "/tmp/test-data" +IGINX_PORT = 6888 + +SIZE_CALCULATION_STRATEGY = "ssh" # 使用 SSH 方式计算大小 +SSH_USERNAME = "" # 必填 +SSH_PASSWORD = "" # 必填 +SSH_PORT = 22 + +REQUEST_DELAY = 0.1 # 请求间隔(秒) +TIMEOUT = 60 # 请求超时(秒) + +# ==================== 颜色输出 ==================== +class Colors: + GREEN = '\033[92m' + RED = '\033[91m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + END = '\033[0m' + +def print_colored(text, color): + """彩色打印""" + print(f"{color}{text}{Colors.END}") + +def build_payload(port): + payload = { + "sourceType": SOURCE_TYPE, + "ip": SOURCE_IP, + "port": port, + "dummyDir": DUMMY_DIR, + "iginxPort": IGINX_PORT, + "sizeCalculationStrategy": SIZE_CALCULATION_STRATEGY, + } + + if SIZE_CALCULATION_STRATEGY == "ssh": + payload["sshUsername"] = SSH_USERNAME + payload["sshPassword"] = SSH_PASSWORD + payload["sshPort"] = SSH_PORT + + return payload + + +def add_datasource(index): + """添加单个数据源""" + port = SOURCE_PORT_START + index + payload = build_payload(port) + + try: + response = requests.post( + API_URL, + json=payload, + timeout=TIMEOUT, + headers={"Content-Type": "application/json"} + ) + + if response.status_code in [200, 201]: + return True, response.status_code, port + else: + return False, response.status_code, port + + except requests.exceptions.Timeout: + return False, 
"TIMEOUT", port + except requests.exceptions.ConnectionError: + return False, "CONNECTION_ERROR", port + except Exception as e: + return False, str(e), port + +def validate_config(): + if SIZE_CALCULATION_STRATEGY == "ssh": + if not SSH_USERNAME: + raise ValueError("SSH_USERNAME 不能为空") + if not SSH_PASSWORD: + raise ValueError("SSH_PASSWORD 不能为空") + + +def main(): + """主函数""" + validate_config() + print("=" * 50) + print("批量添加 Filesystem 数据源测试") + print("=" * 50) + print(f"目标数量: {TOTAL_COUNT}") + print(f"API地址: {API_URL}") + print(f"数据源类型: {SOURCE_TYPE}") + print(f"大小计算方式: {SIZE_CALCULATION_STRATEGY}") + print("=" * 50) + print() + + success_count = 0 + fail_count = 0 + start_time = time.time() + + # 批量添加 + for i in range(1, TOTAL_COUNT + 1): + success, status, port = add_datasource(i) + + if success: + success_count += 1 + print_colored( + f"[{i}/{TOTAL_COUNT}] ✓ 成功 (HTTP {status}, Port: {port})", + Colors.GREEN + ) + else: + fail_count += 1 + print_colored( + f"[{i}/{TOTAL_COUNT}] ✗ 失败 (Status: {status}, Port: {port})", + Colors.RED + ) + + # 每10个打印进度 + if i % 10 == 0: + progress = (i * 100.0) / TOTAL_COUNT + print() + print_colored( + f"进度: {i}/{TOTAL_COUNT} ({progress:.1f}%) | " + f"成功: {success_count} | 失败: {fail_count}", + Colors.YELLOW + ) + print() + + # 延迟 + time.sleep(REQUEST_DELAY) + + # 统计结果 + end_time = time.time() + duration = end_time - start_time + success_rate = (success_count * 100.0) / TOTAL_COUNT + avg_time = (duration * 1000.0) / TOTAL_COUNT + + print() + print("=" * 50) + print("测试完成") + print("=" * 50) + print(f"总数量: {TOTAL_COUNT}") + print(f"成功: {success_count}") + print(f"失败: {fail_count}") + print(f"成功率: {success_rate:.2f}%") + print(f"总耗时: {duration:.2f} 秒") + print(f"平均耗时: {avg_time:.2f} 毫秒/个") + print("=" * 50) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\n测试被用户中断") + except Exception as e: + print(f"\n\n发生错误: {e}")