From c6f7945ad28b1763a4b038e1ae7762fe404234ae Mon Sep 17 00:00:00 2001 From: MasterProgrammerDev Date: Wed, 28 Jan 2026 17:28:41 +0100 Subject: [PATCH 1/6] spazio --- Query-Engine/docker-compose.prod.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/Query-Engine/docker-compose.prod.yml b/Query-Engine/docker-compose.prod.yml index 8d56e96..6fdc6fa 100644 --- a/Query-Engine/docker-compose.prod.yml +++ b/Query-Engine/docker-compose.prod.yml @@ -65,8 +65,6 @@ services: #- .:/workspace:cached command: sleep infinity - - libretranslate: image: libretranslate/libretranslate:latest container_name: libretranslate From 34ec20c6c2ddf1e648338a799ebdceea8ba70be5 Mon Sep 17 00:00:00 2001 From: MasterProgrammerDev Date: Mon, 2 Feb 2026 10:26:56 +0100 Subject: [PATCH 2/6] inserimento e pulizia nome survey --- Query-Engine/api/graphql/resolvers.js | 286 ++++++++------- Source-Connector/api/models/Datapoint.js | 96 +++++- Source-Connector/api/services/service.js | 421 ++++++++++++----------- Source-Connector/config.template.js | 4 +- Source-Connector/package.json | 4 +- 5 files changed, 440 insertions(+), 371 deletions(-) diff --git a/Query-Engine/api/graphql/resolvers.js b/Query-Engine/api/graphql/resolvers.js index 758af9f..c66d413 100644 --- a/Query-Engine/api/graphql/resolvers.js +++ b/Query-Engine/api/graphql/resolvers.js @@ -1,15 +1,15 @@ -const Source = require('../models/Source') -const Datapoint = require('../models/Datapoint') -const util = require('util') -const { translateDataPointsBatch } = require('../services/translationService') +const Source = require("../models/Source"); +const Datapoint = require("../models/Datapoint"); +const util = require("util"); +const { translateDataPointsBatch } = require("../services/translationService"); const resolvers = { Query: { sources: async () => { - return await Source.find() + return await Source.find(); }, source: async (parent, { id }) => { - return await Source.findById(id) + return await Source.findById(id); }, /* datapoints: async (_parent, args, { db }) => { @@ -233,7 +233,7 @@ const resolvers = { // Estrai tutti gli argomenti di "controllo" che hanno una logica speciale. const { sortBy = [], - sortOrder = 'ASC', + sortOrder = "ASC", dimensions = [], exclude = [], filterBy, @@ -241,77 +241,73 @@ const resolvers = { limit, lang, ...otherFilters - } = args + } = args; - const query = { ...otherFilters } + const query = { ...otherFilters }; if (!query.survey) { - throw new Error( - 'Il parametro "survey" è obbligatorio per questa query.' 
- ) + throw new Error('Il parametro "survey" è obbligatorio.'); } - - // Campi da rendere case-insensitive - const caseInsensitiveFields = ['source', 'survey', 'surveyName', 'region'] - - caseInsensitiveFields.forEach(field => { - if (query[field] && typeof query[field] === 'string') { - // Converti in regex case-insensitive - query[field] = { $regex: new RegExp(`^${query[field]}$`, 'i') } + query.survey = query.survey.toUpperCase().replace(/\./g, ""); // Normalizza input utente + + const fieldsToNormalize = ["source", "surveyName", "region"]; + fieldsToNormalize.forEach((field) => { + if (query[field] && typeof query[field] === "string") { + query[field] = query[field].toUpperCase(); } - }) + }); - let dimensionKeysCache = null + let dimensionKeysCache = null; const getDimensionKeys = async () => { - if (dimensionKeysCache) return dimensionKeysCache + if (dimensionKeysCache) return dimensionKeysCache; const sampleDatapoints = await Datapoint.find({ survey: query.survey }) - .select('dimensions') + .select("dimensions") .lean() - .exec() + .exec(); dimensionKeysCache = [ ...new Set( - sampleDatapoints.flatMap(doc => { + sampleDatapoints.flatMap((doc) => { // Supporta sia array che oggetto singolo if (Array.isArray(doc.dimensions)) { - return doc.dimensions.flatMap(d => Object.keys(d)) - } else if (doc.dimensions && typeof doc.dimensions === 'object') { - return Object.keys(doc.dimensions) + return doc.dimensions.flatMap((d) => Object.keys(d)); + } else if (doc.dimensions && typeof doc.dimensions === "object") { + return Object.keys(doc.dimensions); } - return [] + return []; }) - ) - ] - return dimensionKeysCache - } + ), + ]; + return dimensionKeysCache; + }; // Costruzione query MongoDB - const andClauses = [] + const andClauses = []; - const dimensionKeys = await getDimensionKeys() + const dimensionKeys = await getDimensionKeys(); // Filtro inclusione dimensioni if (dimensions.length > 0 && dimensionKeys.length > 0) { - dimensions.forEach(value => { + dimensions.forEach((value) => { // Costruisci condizioni per entrambi i formati const arrayCondition = { dimensions: { $elemMatch: { - $or: dimensionKeys.map(k => ({ [k]: value })) - } - } - } + $or: dimensionKeys.map((k) => ({ [k]: value })), + }, + }, + }; const objectCondition = { - $or: dimensionKeys.map(k => ({ [`dimensions.${k}`]: value })) - } + $or: dimensionKeys.map((k) => ({ [`dimensions.${k}`]: value })), + }; andClauses.push({ - $or: [arrayCondition, objectCondition] - }) - }) + $or: [arrayCondition, objectCondition], + }); + }); } // Filtro esclusione dimensioni @@ -344,34 +340,34 @@ const resolvers = { */ // Filtro per dimensione specifica (filterBy index) - if (typeof filterBy === 'number' && filter.length > 0) { + if (typeof filterBy === "number" && filter.length > 0) { const sampleDoc = await Datapoint.findOne({ survey: query.survey }) - .select('dimensions') + .select("dimensions") .lean() - .exec() + .exec(); - let dimensionKey = null + let dimensionKey = null; // Gestisci sia array che oggetto if (Array.isArray(sampleDoc?.dimensions)) { - const dimensionObj = sampleDoc.dimensions[filterBy] + const dimensionObj = sampleDoc.dimensions[filterBy]; if (dimensionObj) { - dimensionKey = Object.keys(dimensionObj)[0] + dimensionKey = Object.keys(dimensionObj)[0]; } } else if ( sampleDoc?.dimensions && - typeof sampleDoc.dimensions === 'object' + typeof sampleDoc.dimensions === "object" ) { // Per oggetto singolo, usa filterBy come indice delle chiavi - const keys = Object.keys(sampleDoc.dimensions) - dimensionKey = 
keys[filterBy] + const keys = Object.keys(sampleDoc.dimensions); + dimensionKey = keys[filterBy]; } if (dimensionKey) { - const filterValues = filter.map(v => { - const num = Number(v) - return isNaN(num) ? v : num - }) + const filterValues = filter.map((v) => { + const num = Number(v); + return isNaN(num) ? v : num; + }); // Supporta entrambi i formati andClauses.push({ @@ -379,19 +375,19 @@ const resolvers = { { dimensions: { $elemMatch: { - [dimensionKey]: { $in: filterValues } - } - } + [dimensionKey]: { $in: filterValues }, + }, + }, }, { - [`dimensions.${dimensionKey}`]: { $in: filterValues } - } - ] - }) + [`dimensions.${dimensionKey}`]: { $in: filterValues }, + }, + ], + }); } } - if (andClauses.length > 0) query.$and = andClauses + if (andClauses.length > 0) query.$and = andClauses; // Pipeline di aggregazione const pipeline = [ @@ -399,19 +395,19 @@ const resolvers = { { $lookup: { - from: 'sources', - localField: 'source', - foreignField: 'id', - as: 'sourceData' - } + from: "sources", + localField: "source", + foreignField: "id", + as: "sourceData", + }, }, { $unwind: { - path: '$sourceData', - preserveNullAndEmptyArrays: true - } - } - ] + path: "$sourceData", + preserveNullAndEmptyArrays: true, + }, + }, + ]; // --- LOGICA EXCLUDE --- if (exclude.length > 0) { @@ -421,80 +417,80 @@ const resolvers = { _tempValuesToCheck: { $map: { // mergeObjects unifica sia se 'dimensions' è un array di oggetti, sia se è un oggetto singolo - input: { $objectToArray: { $mergeObjects: '$dimensions' } }, - as: 'dim', - in: '$$dim.v' - } - } - } + input: { $objectToArray: { $mergeObjects: "$dimensions" } }, + as: "dim", + in: "$$dim.v", + }, + }, + }, }, { $match: { _tempValuesToCheck: { - $nin: exclude // $nin esclude il documento se UNO QUALSIASI dei valori combacia - } - } + $nin: exclude, // $nin esclude il documento se UNO QUALSIASI dei valori combacia + }, + }, }, { - $unset: '_tempValuesToCheck' + $unset: "_tempValuesToCheck", } - ) + ); } // Ordinamento if (sortBy.length > 0) { - const sortByArray = Array.isArray(sortBy) ? sortBy : [sortBy] + const sortByArray = Array.isArray(sortBy) ? sortBy : [sortBy]; const sortOrderArray = Array.isArray(sortOrder) ? sortOrder - : [sortOrder] + : [sortOrder]; - const addFieldsStage = {} - const sortStage = {} + const addFieldsStage = {}; + const sortStage = {}; sortByArray.forEach((field, i) => { - const order = sortOrderArray[i]?.toUpperCase() === 'DESC' ? -1 : 1 - console.log(sortOrderArray[i]) + const order = sortOrderArray[i]?.toUpperCase() === "DESC" ? -1 : 1; + console.log(sortOrderArray[i]); if (dimensionKeys.includes(field)) { // Gestisci ordinamento per entrambi i formati addFieldsStage[`sort_${field}`] = { $cond: { - if: { $isArray: '$dimensions' }, + if: { $isArray: "$dimensions" }, then: { $first: { $map: { input: { $filter: { - input: '$dimensions', - as: 'dim', + input: "$dimensions", + as: "dim", cond: { - $gt: [{ $type: `$$dim.${field}` }, 'missing'] - } - } + $gt: [{ $type: `$$dim.${field}` }, "missing"], + }, + }, }, - as: 'dim', - in: `$$dim.${field}` - } - } + as: "dim", + in: `$$dim.${field}`, + }, + }, }, - else: `$dimensions.${field}` - } - } - sortStage[`sort_${field}`] = order + else: `$dimensions.${field}`, + }, + }; + sortStage[`sort_${field}`] = order; } else { // Campo diretto (value, timestamp, ecc.) 
- sortStage[field] = order + sortStage[field] = order; } - }) + }); if (Object.keys(addFieldsStage).length > 0) { - pipeline.push({ $addFields: addFieldsStage }) + pipeline.push({ $addFields: addFieldsStage }); } - pipeline.push({ $sort: sortStage }) + pipeline.push({ $sort: sortStage }); } if (limit && Number.isInteger(limit) && limit > 0) { - pipeline.push({ $limit: limit }) + pipeline.push({ $limit: limit }); } // Proiezione finale con dati arricchiti @@ -508,22 +504,22 @@ const resolvers = { region: 1, dimensions: { $cond: { - if: { $isArray: '$dimensions' }, + if: { $isArray: "$dimensions" }, then: { $map: { - input: { $objectToArray: { $mergeObjects: '$dimensions' } }, - as: 'dim', - in: '$$dim.v' - } + input: { $objectToArray: { $mergeObjects: "$dimensions" } }, + as: "dim", + in: "$$dim.v", + }, }, else: { $map: { - input: { $objectToArray: '$dimensions' }, - as: 'dim', - in: '$$dim.v' - } - } - } + input: { $objectToArray: "$dimensions" }, + as: "dim", + in: "$$dim.v", + }, + }, + }, }, aggregationPeriod: 1, value: 1, @@ -532,28 +528,28 @@ const resolvers = { references: 1, fromUrl: 1, meta: 1, - updateFrequency: 1 - } - }) + updateFrequency: 1, + }, + }); - console.log('MongoDB Query Dinamica:', JSON.stringify(pipeline, null, 2)) + console.log("MongoDB Query Dinamica:", JSON.stringify(pipeline, null, 2)); - console.log('Esecuzione query...') - const datapoints = await Datapoint.aggregate(pipeline).exec() - console.log(`Trovati datapoints.`) + console.log("Esecuzione query..."); + const datapoints = await Datapoint.aggregate(pipeline).exec(); + console.log(`Trovati datapoints.`); - if (lang && lang !== 'en') { - return await translateDataPointsBatch(datapoints, lang) + if (lang && lang !== "en") { + return await translateDataPointsBatch(datapoints, lang); } - return datapoints - } + return datapoints; + }, }, Mutation: { createSource: async (parent, { json, record, name }) => { - const newSource = new Source({ json, record, name }) - return await newSource.save() + const newSource = new Source({ json, record, name }); + return await newSource.save(); }, updateSource: async (parent, { id, json, record, name }) => { @@ -561,18 +557,18 @@ const resolvers = { id, { json, record, name }, { new: true } - ) + ); }, deleteSource: async (parent, { id }) => { try { - await Source.findByIdAndDelete(id) - return true + await Source.findByIdAndDelete(id); + return true; } catch (err) { - return false + return false; } - } - } -} + }, + }, +}; -module.exports = resolvers +module.exports = resolvers; diff --git a/Source-Connector/api/models/Datapoint.js b/Source-Connector/api/models/Datapoint.js index 919d6c3..49ed684 100644 --- a/Source-Connector/api/models/Datapoint.js +++ b/Source-Connector/api/models/Datapoint.js @@ -1,14 +1,82 @@ -const mongoose = require('mongoose'); - -const datapointSchema = new mongoose.Schema({ - source: String, - survey: String, - surveyName: String, - region: String, - fromUrl: String, - timestamp: String, - dimensions: Object, - value: Number -}, {strict: false}); - -module.exports = mongoose.model('Datapoint', datapointSchema); \ No newline at end of file +const mongoose = require("mongoose"); +const crypto = require("crypto"); + +// Funzione di utilità per pulire il nome della survey +const cleanSurveyName = (val) => { + if (!val || typeof val !== "string") return val; + return val.toUpperCase().replace(/\./g, ""); // Toglie i punti e mette in MAIUSCOLO +}; + +const datapointSchema = new mongoose.Schema( + { + source: { type: String, index: true, uppercase: true 
}, // uppercase: true does this automatically
+    survey: { type: String, index: true, uppercase: true },
+    surveyName: String,
+    region: { type: String, index: true, uppercase: true },
+    fromUrl: String,
+    timestamp: { type: String, index: true },
+    dimensions: { type: Object },
+    value: Number,
+    d_hash: { type: String, unique: true }, // Unique identifier to avoid duplicates
+  },
+  {
+    strict: false,
+    timestamps: true,
+  }
+);
+
+// Indexes for performance
+datapointSchema.index({ "dimensions.$**": 1 });
+datapointSchema.index({ region: 1, timestamp: -1 });
+
+// Middleware: cleans the survey name before every save
+datapointSchema.pre("save", function (next) {
+  if (this.survey) this.survey = cleanSurveyName(this.survey);
+  next();
+});
+
+// Utility to generate the unique hash (used by the bulk insert)
+const generateHash = (doc) => {
+  const dims = doc.dimensions || {};
+  const sortedKeys = Object.keys(dims)
+    .sort()
+    .reduce((acc, key) => {
+      acc[key] = dims[key];
+      return acc;
+    }, {});
+
+  // Use the cleaned survey name for the hash
+  const s = cleanSurveyName(doc.survey);
+  const stringToHash =
+    (doc.fromUrl || "") +
+    (doc.timestamp || "") +
+    s +
+    JSON.stringify(sortedKeys);
+  return crypto.createHash("md5").update(stringToHash).digest("hex");
+};
+
+// Static method to insert millions of records without creating duplicates
+datapointSchema.statics.upsertMany = async function (datapoints) {
+  if (!datapoints || datapoints.length === 0) return;
+
+  const operations = datapoints.map((doc) => {
+    const hash = generateHash(doc);
+    // Clean the survey here as well for the bulk operation
+    const cleanedDoc = {
+      ...doc,
+      survey: cleanSurveyName(doc.survey),
+      d_hash: hash,
+    };
+    return {
+      updateOne: {
+        filter: { d_hash: hash },
+        update: { $set: cleanedDoc },
+        upsert: true,
+      },
+    };
+  });
+
+  return this.bulkWrite(operations, { ordered: false });
+};
+
+module.exports = mongoose.model("Datapoint", datapointSchema);
diff --git a/Source-Connector/api/services/service.js b/Source-Connector/api/services/service.js
index 5ca730f..13d310c 100644
--- a/Source-Connector/api/services/service.js
+++ b/Source-Connector/api/services/service.js
@@ -1,219 +1,224 @@
-const logger = require('percocologger')
-const log = logger.info
-const Datapoints = require("../models/Datapoint")
-const config = require('../../config')
-const minioWriter = require("../../inputConnectors/minioConnector")
-const axios = require('axios')
-const fs = require('fs')
-const { updateJWT } = require('../../utils/keycloak')
-let bearerToken
-updateJWT().then(token => {
-    bearerToken = token
-    logger.info("Initial Keycloak token obtained")
-}).catch(error => logger.error(error.response?.data || error));
-
-const path = require('path');
-let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl"
-require('../../inputConnectors/apiConnector')
+const logger = require("percocologger");
+const log = logger.info;
+const Datapoints = require("../models/Datapoint");
+const config = require("../../config");
+const minioWriter = require("../../inputConnectors/minioConnector");
+const axios = require("axios");
+const fs = require("fs");
+const { updateJWT } = require("../../utils/keycloak");
+let bearerToken;
+updateJWT()
+  .then((token) => {
+    bearerToken = token;
+    logger.info("Initial Keycloak token obtained");
+  })
+  .catch((error) => logger.error(error.response?.data || error));
+
+const path = require("path");
+let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl";
+require("../../inputConnectors/apiConnector");
 
 async
function insertResponseInDB(size) { - - let stats - let sizeRead = 0 - logger.info(`Attesa del file di stream di dimensione minima: ${size} bytes`); - //let counter = 0 - while (!stats?.size || stats?.size < 50000000) { - stats = fs.statSync(config.streamPath || "/app/shared-data/stream.json"); - sizeRead = (stats.size / 1024) / 1024 - logger.info(`Dimensione del file di stream: ${sizeRead} Megabyte`); - /*counter++ + let stats; + let sizeRead = 0; + logger.info(`Attesa del file di stream di dimensione minima: ${size} bytes`); + //let counter = 0 + while (!stats?.size || stats?.size < 50000000) { + stats = fs.statSync(config.streamPath || "/app/shared-data/stream.json"); + sizeRead = stats.size / 1024 / 1024; + logger.info(`Dimensione del file di stream: ${sizeRead} Megabyte`); + /*counter++ if (counter > 10000000) { logger.error("File di stream non trovato dopo numerosi tentativi, esco dalla funzione di inserimento."); return; }*/ - } - - - const stream2 = fs.createReadStream(config.nameStream || "/app/shared-data/stream.json", { encoding: "utf-8" }); - let buffer = ""; - let depth = 0; // conta le parentesi graffe - let inObject = false; - - logger.debug("Inizio inserimento datapoints nel database..."); - let tempArray = []; - for await (const chunk of stream2) { - //logger.debug("Lettura chunk di dati..."); - for (const char of chunk) { - //logger.debug(`Elaborazione carattere: ${char}`); - if (char === "{") { - //logger.debug("Inizio di un nuovo oggetto JSON rilevato."); - if (!inObject) inObject = true; - depth++; - } - - //logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); - if (inObject) buffer += char; - - //logger.debug(`Buffer attuale: ${buffer}`); - if (char === "}") { - //logger.debug("Fine di un oggetto JSON rilevata."); - depth--; - if (depth === 0 && inObject) { - //logger.debug("Oggetto JSON completo rilevato, procedo con l'inserimento nel database."); - // oggetto completo - const obj = JSON.parse(buffer); - //logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`); - //const DatapointModel = await Datapoints.getDatapointModel(); - //const datapoint = new DatapointModel(obj); - tempArray.push(obj); - if (tempArray.length >= config.batch) { - await Datapoints.insertMany(tempArray); - tempArray = []; - logger.debug(config.batch + " Datapoints salvati nel database."); - } - //await Datapoints.insertMany([obj]); - //logger.debug("Datapoint salvato nel database."); - buffer = ""; - inObject = false; - } - } + } + + const stream2 = fs.createReadStream( + config.nameStream || "/app/shared-data/stream.json", + { encoding: "utf-8" } + ); + let buffer = ""; + let depth = 0; // conta le parentesi graffe + let inObject = false; + + logger.debug("Inizio inserimento datapoints nel database..."); + let tempArray = []; + for await (const chunk of stream2) { + //logger.debug("Lettura chunk di dati..."); + for (const char of chunk) { + //logger.debug(`Elaborazione carattere: ${char}`); + if (char === "{") { + //logger.debug("Inizio di un nuovo oggetto JSON rilevato."); + if (!inObject) inObject = true; + depth++; + } + + //logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); + if (inObject) buffer += char; + + //logger.debug(`Buffer attuale: ${buffer}`); + if (char === "}") { + //logger.debug("Fine di un oggetto JSON rilevata."); + depth--; + if (depth === 0 && inObject) { + + const obj = JSON.parse(buffer); + obj.fromUrl = urlValue; // Importante per l'hash + + tempArray.push(obj); + + if (tempArray.length >= config.batch) { + await 
Datapoints.upsertMany(tempArray); + + tempArray = []; + logger.debug(`${config.batch} record elaborati.`); + } + + buffer = ""; + inObject = false; } + } } - try { - fs.unlinkSync(config.nameStream || "/app/shared-data/stream.json"); - logger.info('File cancellato con successo!'); - } catch (err) { - logger.error('Errore durante la cancellazione:', err); - } + } + try { + fs.unlinkSync(config.nameStream || "/app/shared-data/stream.json"); + logger.info("File cancellato con successo!"); + } catch (err) { + logger.error("Errore durante la cancellazione:", err); + } } module.exports = { - - notifyPath: async (req, res) => { - - logger.info({ body: JSON.stringify(req.body) }) - - const data = req.body.data || req.body.value || req.body; - const entities = Array.isArray(data) ? data : [data]; - - for (const ent of entities) { - const id = ent.id || ent['@id'] || 'unknown-id'; - let urlValue; - if (ent[attrWithUrl] && typeof ent[attrWithUrl] === 'object' && 'value' in ent[attrWithUrl]) { - urlValue = ent[attrWithUrl].value; - } else if (ent[attrWithUrl]) { - urlValue = ent[attrWithUrl]; - } else if (ent[attrWithUrl + ':value']) { - urlValue = ent[attrWithUrl + ':value']; - } else if (ent.value) { - urlValue = ent.value; - } - - if (!urlValue || typeof urlValue !== 'string') { - console.warn(`no URL found for entity ${id}`); - continue; - } - - let mapID = req.query.mapID || req.params.mapID || ent.mapID || config.mapID - - if (!mapID) { - const response = await axios.get(urlValue); - if (response?.data?.data?.datapoints) - await Datapoints.insertMany(response.data.data.datapoints) - else - await minioWriter.insertInDBs(response.data, { - name: id + '-' + path.basename((new URL(urlValue)).pathname), - lastModified: new Date(), - versionId: 'null', - isDeleteMarker: false, - bucketName: 'orion-notify', - size: response.data.length, - isLatest: true, - etag: '', - insertedBy: 'orion-notify' - }); - } - else { - let response - let retry = 2 - while (retry > 0) - try { - response = await axios.post( - config.mapEndpoint, - { - "sourceDataType": "json", - "sourceDataURL": urlValue, - "decodeOptions": { - "decodeFrom": "json-stat" - }, - "config": { - "NGSI_entity": false, - "ignoreValidation": true, - "writers": [], - "disableAjv": true, - "mappingReport": true - }, - "dataModel": { - "$schema": "http://json-schema.org/schema#", - "$id": "dataModels/DataModelTemp.json", - "title": "DataModelTemp", - "description": "Bike Hire Docking Station", - "type": "object", - "properties": { - "region": { - "type": "string" - }, - "source": { - "type": "string" - }, - "timestamp": { - "type": "string" - }, - "survey": { - "type": "string" - }, - "dimensions": { - "type": "object" - }, - "value": { - "type": "integer" - } - } - } - }/* + notifyPath: async (req, res) => { + logger.info({ body: JSON.stringify(req.body) }); + + const data = req.body.data || req.body.value || req.body; + const entities = Array.isArray(data) ? 
data : [data]; + + for (const ent of entities) { + const id = ent.id || ent["@id"] || "unknown-id"; + let urlValue; + if ( + ent[attrWithUrl] && + typeof ent[attrWithUrl] === "object" && + "value" in ent[attrWithUrl] + ) { + urlValue = ent[attrWithUrl].value; + } else if (ent[attrWithUrl]) { + urlValue = ent[attrWithUrl]; + } else if (ent[attrWithUrl + ":value"]) { + urlValue = ent[attrWithUrl + ":value"]; + } else if (ent.value) { + urlValue = ent.value; + } + + if (!urlValue || typeof urlValue !== "string") { + console.warn(`no URL found for entity ${id}`); + continue; + } + + let mapID = + req.query.mapID || req.params.mapID || ent.mapID || config.mapID; + + if (!mapID) { + const response = await axios.get(urlValue); + if (response?.data?.data?.datapoints) + await Datapoints.insertMany(response.data.data.datapoints); + else + await minioWriter.insertInDBs(response.data, { + name: id + "-" + path.basename(new URL(urlValue).pathname), + lastModified: new Date(), + versionId: "null", + isDeleteMarker: false, + bucketName: "orion-notify", + size: response.data.length, + isLatest: true, + etag: "", + insertedBy: "orion-notify", + }); + } else { + let response; + let retry = 2; + while (retry > 0) + try { + response = await axios.post( + config.mapEndpoint, + { + sourceDataType: "json", + sourceDataURL: urlValue, + decodeOptions: { + decodeFrom: "json-stat", + }, + config: { + NGSI_entity: false, + ignoreValidation: true, + writers: [], + disableAjv: true, + mappingReport: true, + }, + dataModel: { + $schema: "http://json-schema.org/schema#", + $id: "dataModels/DataModelTemp.json", + title: "DataModelTemp", + description: "Bike Hire Docking Station", + type: "object", + properties: { + region: { + type: "string", + }, + source: { + type: "string", + }, + timestamp: { + type: "string", + }, + survey: { + type: "string", + }, + dimensions: { + type: "object", + }, + value: { + type: "integer", + }, + }, + }, + } /* { //mapID, sourceDataURL: urlValue }*/, - { - headers: { - "Authorization": `Bearer ${bearerToken}` - } - }); - retry -= 2 - try { - logger.info("Inserting datapoints into DB..."); - logger.info(response.data[0], " MB") - await insertResponseInDB(response.data[0])//.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion - } - catch (error) { - logger.error("Error inserting datapoints:", error) - } - } - catch (error) { - logger.error("Error fetching mapped data from API Connector:", error.response?.data || error.message); - try { - bearerToken = await updateJWT(true); - retry-- - } catch (e) { - logger.error("Error updating JWT:", e); - retry-- - } - } - //logger.info(response.data.lenght) - - - /*for (let i in response.data) + { + headers: { + Authorization: `Bearer ${bearerToken}`, + }, + } + ); + retry -= 2; + try { + logger.info("Inserting datapoints into DB..."); + logger.info(response.data[0], " MB"); + await insertResponseInDB(response.data[0]); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + } catch (error) { + logger.error("Error inserting datapoints:", error); + } + } catch (error) { + logger.error( + "Error fetching mapped data from API Connector:", + error.response?.data || error.message + ); + try { + bearerToken = await updateJWT(true); + retry--; + } catch (e) { + logger.error("Error updating JWT:", e); + retry--; + } + } + //logger.info(response.data.lenght) + + 
/*for (let i in response.data) await minioWriter.insertInDBs(response.data[i], { name: response.data[i].id || mapID + '-' + path.basename((new URL(urlValue)).pathname) + i, lastModified: new Date(), @@ -225,11 +230,11 @@ module.exports = { etag: '', insertedBy: 'orion-notify' });*/ - } - logger.info(`downloaded ${urlValue}`); - } - return 'OK'; - }, + } + logger.info(`downloaded ${urlValue}`); + } + return "OK"; + }, - sync: minioWriter.sync -} \ No newline at end of file + sync: minioWriter.sync, +}; diff --git a/Source-Connector/config.template.js b/Source-Connector/config.template.js index 9497b5e..28de819 100644 --- a/Source-Connector/config.template.js +++ b/Source-Connector/config.template.js @@ -30,7 +30,7 @@ module.exports = { deleteAllDuplicateSubscriptions: true, attrWithUrl: "datasetUrl", orionBaseUrl: "http://localhost:1027", - notificationUrl: "http://host.docker.internal:3000/api/orion/subscribe", + notificationUrl: "http://localhost:3000/api/orion/subscribe", fiwareService: "", fiwareServicePath: "" }, @@ -43,7 +43,7 @@ module.exports = { port: 3000, updateOwner: "later", writeLogsOnFile: true, - mongo: "mongodb://localhost:27017/Minio-Mongo", // mongo url + mongo: "mongodb://localhost:22000/Minio-Mongo", // mongo url authConfig: { idmHost: "https://platform.beopendep.it/auth", clientId: "", diff --git a/Source-Connector/package.json b/Source-Connector/package.json index 7f5ae75..a429007 100644 --- a/Source-Connector/package.json +++ b/Source-Connector/package.json @@ -22,12 +22,12 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", - "jsonwebtoken": "^9.0.2", + "ioredis": "^5.3.2", + "jsonwebtoken": "^9.0.3", "minio": "^8.0.0", "mongoose": "^8.3.4", "nodemon": "^3.1.0", "percocologger": "^1.0.7", - "ioredis": "^5.3.2", "pg": "^8.11.5" }, "stashDependencies": { From 755d3d951de9bf940d9ebdbc1bc12faf92d67dd9 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:10:04 +0100 Subject: [PATCH 3/6] streaming from dmm enabled --- Source-Connector/api/services/service.js | 22 +++++++++++++++++++--- Source-Connector/utils/keycloak.js | 2 +- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/Source-Connector/api/services/service.js b/Source-Connector/api/services/service.js index 13d310c..47e0235 100644 --- a/Source-Connector/api/services/service.js +++ b/Source-Connector/api/services/service.js @@ -18,7 +18,7 @@ const path = require("path"); let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl"; require("../../inputConnectors/apiConnector"); -async function insertResponseInDB(size) { +async function insertResponseInDB(size, urlValue) { let stats; let sizeRead = 0; logger.info(`Attesa del file di stream di dimensione minima: ${size} bytes`); @@ -198,8 +198,24 @@ module.exports = { retry -= 2; try { logger.info("Inserting datapoints into DB..."); - logger.info(response.data[0], " MB"); - await insertResponseInDB(response.data[0]); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + logger.info(response.data); + let outputId = response.data.id + let lastId + let purged = false + for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { + //while (response.data[0] || response.data.id) { + logger.info(response.data.status) + response = await axios.get((config.sessionEdnpoint || "http://localhost:5500/api/session?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + 
chunkIndex, { + headers: { + Authorization: `Bearer ${bearerToken}` + } + }) + if (!purged) + await Datapoints.deleteMany({survey: response.data[0].survey}); + await Datapoints.insertMany(response.data); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + lastId = response.data[response.data.length - 1]?._id + purged = true; + } } catch (error) { logger.error("Error inserting datapoints:", error); } diff --git a/Source-Connector/utils/keycloak.js b/Source-Connector/utils/keycloak.js index e168309..efd80a6 100644 --- a/Source-Connector/utils/keycloak.js +++ b/Source-Connector/utils/keycloak.js @@ -1,6 +1,6 @@ const axios = require("axios"); const config = require("../config.js") -const keycloakBaseUrl = config.authConfig.idmHost + "/realms/smartera"; +const keycloakBaseUrl = config.authConfig.idmHost + "/realms/" + (config.authConfig.authRealm || "master") const { clientId, username, password } = config.authConfig; const fs = require('fs'); const path = './token.js'; From 6b3386d2da200e31ef1c6936d2ea248878495b10 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Thu, 5 Feb 2026 10:02:16 +0100 Subject: [PATCH 4/6] fromUrl reinserted --- Source-Connector/api/services/service.js | 80 +++--------------------- 1 file changed, 8 insertions(+), 72 deletions(-) diff --git a/Source-Connector/api/services/service.js b/Source-Connector/api/services/service.js index 47e0235..e45203f 100644 --- a/Source-Connector/api/services/service.js +++ b/Source-Connector/api/services/service.js @@ -18,77 +18,6 @@ const path = require("path"); let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl"; require("../../inputConnectors/apiConnector"); -async function insertResponseInDB(size, urlValue) { - let stats; - let sizeRead = 0; - logger.info(`Attesa del file di stream di dimensione minima: ${size} bytes`); - //let counter = 0 - while (!stats?.size || stats?.size < 50000000) { - stats = fs.statSync(config.streamPath || "/app/shared-data/stream.json"); - sizeRead = stats.size / 1024 / 1024; - logger.info(`Dimensione del file di stream: ${sizeRead} Megabyte`); - /*counter++ - if (counter > 10000000) { - logger.error("File di stream non trovato dopo numerosi tentativi, esco dalla funzione di inserimento."); - return; - }*/ - } - - const stream2 = fs.createReadStream( - config.nameStream || "/app/shared-data/stream.json", - { encoding: "utf-8" } - ); - let buffer = ""; - let depth = 0; // conta le parentesi graffe - let inObject = false; - - logger.debug("Inizio inserimento datapoints nel database..."); - let tempArray = []; - for await (const chunk of stream2) { - //logger.debug("Lettura chunk di dati..."); - for (const char of chunk) { - //logger.debug(`Elaborazione carattere: ${char}`); - if (char === "{") { - //logger.debug("Inizio di un nuovo oggetto JSON rilevato."); - if (!inObject) inObject = true; - depth++; - } - - //logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); - if (inObject) buffer += char; - - //logger.debug(`Buffer attuale: ${buffer}`); - if (char === "}") { - //logger.debug("Fine di un oggetto JSON rilevata."); - depth--; - if (depth === 0 && inObject) { - - const obj = JSON.parse(buffer); - obj.fromUrl = urlValue; // Importante per l'hash - - tempArray.push(obj); - - if (tempArray.length >= config.batch) { - await Datapoints.upsertMany(tempArray); - - tempArray = []; - logger.debug(`${config.batch} record elaborati.`); - } - - buffer = ""; - inObject = false; - } - } - 
} - } - try { - fs.unlinkSync(config.nameStream || "/app/shared-data/stream.json"); - logger.info("File cancellato con successo!"); - } catch (err) { - logger.error("Errore durante la cancellazione:", err); - } -} - module.exports = { notifyPath: async (req, res) => { logger.info({ body: JSON.stringify(req.body) }); @@ -212,7 +141,14 @@ module.exports = { }) if (!purged) await Datapoints.deleteMany({survey: response.data[0].survey}); - await Datapoints.insertMany(response.data); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + await Datapoints.insertMany(response.data.map( + d => { + return { + ...d, + fromUrl : urlValue + } + } + )); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion lastId = response.data[response.data.length - 1]?._id purged = true; } From bab26ac3551e867905fcb24a82d28b2121c43895 Mon Sep 17 00:00:00 2001 From: MasterProgrammerDev Date: Tue, 10 Feb 2026 15:21:41 +0100 Subject: [PATCH 5/6] indicizzazione e gestione salvataggio in upsertMany --- Query-Engine/package.json | 5 +- Source-Connector/api/models/Datapoint.js | 28 +++++++--- Source-Connector/api/services/service.js | 67 ++++++++++++++++-------- 3 files changed, 69 insertions(+), 31 deletions(-) diff --git a/Query-Engine/package.json b/Query-Engine/package.json index 7f5ae75..a07c576 100644 --- a/Query-Engine/package.json +++ b/Query-Engine/package.json @@ -22,12 +22,13 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", - "jsonwebtoken": "^9.0.2", + "ioredis": "^5.3.2", + "jsonwebtoken": "^9.0.3", + "jwa": "^2.0.1", "minio": "^8.0.0", "mongoose": "^8.3.4", "nodemon": "^3.1.0", "percocologger": "^1.0.7", - "ioredis": "^5.3.2", "pg": "^8.11.5" }, "stashDependencies": { diff --git a/Source-Connector/api/models/Datapoint.js b/Source-Connector/api/models/Datapoint.js index 49ed684..52165b1 100644 --- a/Source-Connector/api/models/Datapoint.js +++ b/Source-Connector/api/models/Datapoint.js @@ -17,7 +17,7 @@ const datapointSchema = new mongoose.Schema( timestamp: { type: String, index: true }, dimensions: { type: Object }, value: Number, - d_hash: { type: String, unique: true }, // Identificatore unico per evitare duplicati + dupl_hash: { type: String, unique: true }, // Identificatore unico per evitare duplicati }, { strict: false, @@ -37,7 +37,10 @@ datapointSchema.pre("save", function (next) { // Funzione per generare l'hash unico (usata nell'inserimento massivo) const generateHash = (doc) => { - const dims = doc.dimensions || {}; + // Gestiamo sia se arriva come documento Mongoose sia come oggetto puro + const target = doc._doc || doc; + + const dims = target.dimensions || {}; const sortedKeys = Object.keys(dims) .sort() .reduce((acc, key) => { @@ -45,13 +48,22 @@ const generateHash = (doc) => { return acc; }, {}); - // Usiamo la survey pulita per l'hash - const s = cleanSurveyName(doc.survey); + const s = cleanSurveyName(target.survey); + + // COSTRUZIONE STRINGA DI HASH const stringToHash = - (doc.fromUrl || "") + - (doc.timestamp || "") + + (target.fromUrl || "") + + "|" + + (target.timestamp || "") + + "|" + + (target.region || "") + + "|" + s + + "|" + + (target.value !== undefined ? 
target.value : Date.now()) + + "|" + JSON.stringify(sortedKeys); + return crypto.createHash("md5").update(stringToHash).digest("hex"); }; @@ -65,11 +77,11 @@ datapointSchema.statics.upsertMany = async function (datapoints) { const cleanedDoc = { ...doc, survey: cleanSurveyName(doc.survey), - d_hash: hash, + dupl_hash: hash, }; return { updateOne: { - filter: { d_hash: hash }, + filter: { dupl_hash: hash }, update: { $set: cleanedDoc }, upsert: true, }, diff --git a/Source-Connector/api/services/service.js b/Source-Connector/api/services/service.js index e45203f..d9cee33 100644 --- a/Source-Connector/api/services/service.js +++ b/Source-Connector/api/services/service.js @@ -128,28 +128,53 @@ module.exports = { try { logger.info("Inserting datapoints into DB..."); logger.info(response.data); - let outputId = response.data.id - let lastId - let purged = false - for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { - //while (response.data[0] || response.data.id) { - logger.info(response.data.status) - response = await axios.get((config.sessionEdnpoint || "http://localhost:5500/api/session?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { - headers: { - Authorization: `Bearer ${bearerToken}` - } - }) - if (!purged) - await Datapoints.deleteMany({survey: response.data[0].survey}); - await Datapoints.insertMany(response.data.map( - d => { - return { - ...d, - fromUrl : urlValue - } + let outputId = response.data.id; + let lastId; + let purged = false; + + // Loop per gestire i chunk + for ( + let chunkIndex = 0; + response.data[0] || response.data.id; + chunkIndex++ + ) { + logger.info(response.data.status); + + response = await axios.get( + (config.sessionEdnpoint || + "http://localhost:5500/api/session?") + + "id=" + + outputId + + "&lastId=" + + lastId + + "&index=" + + chunkIndex, + { + headers: { + Authorization: `Bearer ${bearerToken}`, + }, } - )); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion - lastId = response.data[response.data.length - 1]?._id + ); + + // Cancellazione preliminare + if (!purged && response.data.length > 0) { + await Datapoints.deleteMany({ + survey: response.data[0].survey, + }); + } + + // Preparazione dei dati + const dataToInsert = response.data.map((d) => { + return { + ...d, + fromUrl: urlValue, + }; + }); + + await Datapoints.upsertMany(dataToInsert); + + // Gestione indici per il prossimo loop + lastId = response.data[response.data.length - 1]?._id; purged = true; } } catch (error) { From fbcf7594084284023d2ce8c8ce25f8044f31caf8 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Tue, 10 Feb 2026 15:34:17 +0100 Subject: [PATCH 6/6] source connector saves directly datapoints --- Source-Connector/api/services/service.js | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/Source-Connector/api/services/service.js b/Source-Connector/api/services/service.js index e45203f..7410997 100644 --- a/Source-Connector/api/services/service.js +++ b/Source-Connector/api/services/service.js @@ -128,24 +128,29 @@ module.exports = { try { logger.info("Inserting datapoints into DB..."); logger.info(response.data); - let outputId = response.data.id + let outputId = response.data[response.data.length - 1].MAPPING_REPORT.outputId let lastId let purged = false for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { //while 
(response.data[0] || response.data.id) { logger.info(response.data.status) - response = await axios.get((config.sessionEdnpoint || "http://localhost:5500/api/session?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { + response = await axios.get((config.sessionEdnpoint || "http://localhost:5500/api/output?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { headers: { Authorization: `Bearer ${bearerToken}` } }) if (!purged) - await Datapoints.deleteMany({survey: response.data[0].survey}); + try { + await Datapoints.deleteMany({ survey: response.data[0].survey }); + } catch (e) { + logger.error("Error purging old datapoints:", e); + logger.error(response.data) + } await Datapoints.insertMany(response.data.map( d => { return { ...d, - fromUrl : urlValue + fromUrl: urlValue } } )); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion