diff --git a/Query-Engine/api/graphql/resolvers.js b/Query-Engine/api/graphql/resolvers.js
index 758af9f..c66d413 100644
--- a/Query-Engine/api/graphql/resolvers.js
+++ b/Query-Engine/api/graphql/resolvers.js
@@ -1,15 +1,15 @@
-const Source = require('../models/Source')
-const Datapoint = require('../models/Datapoint')
-const util = require('util')
-const { translateDataPointsBatch } = require('../services/translationService')
+const Source = require("../models/Source");
+const Datapoint = require("../models/Datapoint");
+const util = require("util");
+const { translateDataPointsBatch } = require("../services/translationService");
 const resolvers = {
   Query: {
     sources: async () => {
-      return await Source.find()
+      return await Source.find();
     },
     source: async (parent, { id }) => {
-      return await Source.findById(id)
+      return await Source.findById(id);
     },
     /*
     datapoints: async (_parent, args, { db }) => {
@@ -233,7 +233,7 @@ const resolvers = {
       // Extract all the "control" arguments that get special handling.
       const {
         sortBy = [],
-        sortOrder = 'ASC',
+        sortOrder = "ASC",
         dimensions = [],
         exclude = [],
         filterBy,
@@ -241,77 +241,73 @@ const resolvers = {
         limit,
         lang,
         ...otherFilters
-      } = args
+      } = args;
 
-      const query = { ...otherFilters }
+      const query = { ...otherFilters };
 
       if (!query.survey) {
-        throw new Error(
-          'The "survey" parameter is required for this query.'
-        )
+        throw new Error('The "survey" parameter is required.');
       }
-
-      // Fields to make case-insensitive
-      const caseInsensitiveFields = ['source', 'survey', 'surveyName', 'region']
-
-      caseInsensitiveFields.forEach(field => {
-        if (query[field] && typeof query[field] === 'string') {
-          // Convert to a case-insensitive regex
-          query[field] = { $regex: new RegExp(`^${query[field]}$`, 'i') }
+      query.survey = query.survey.toUpperCase().replace(/\./g, ""); // Normalize user input
+
+      const fieldsToNormalize = ["source", "surveyName", "region"];
+      fieldsToNormalize.forEach((field) => {
+        if (query[field] && typeof query[field] === "string") {
+          query[field] = query[field].toUpperCase();
         }
-      })
+      });
 
-      let dimensionKeysCache = null
+      let dimensionKeysCache = null;
       const getDimensionKeys = async () => {
-        if (dimensionKeysCache) return dimensionKeysCache
+        if (dimensionKeysCache) return dimensionKeysCache;
         const sampleDatapoints = await Datapoint.find({ survey: query.survey })
-          .select('dimensions')
+          .select("dimensions")
           .lean()
-          .exec()
+          .exec();
         dimensionKeysCache = [
           ...new Set(
-            sampleDatapoints.flatMap(doc => {
+            sampleDatapoints.flatMap((doc) => {
               // Supports both an array and a single object
               if (Array.isArray(doc.dimensions)) {
-                return doc.dimensions.flatMap(d => Object.keys(d))
-              } else if (doc.dimensions && typeof doc.dimensions === 'object') {
-                return Object.keys(doc.dimensions)
+                return doc.dimensions.flatMap((d) => Object.keys(d));
+              } else if (doc.dimensions && typeof doc.dimensions === "object") {
+                return Object.keys(doc.dimensions);
               }
-              return []
+              return [];
             })
-          )
-        ]
-        return dimensionKeysCache
-      }
+          ),
+        ];
+        return dimensionKeysCache;
+      };
 
       // Build the MongoDB query
-      const andClauses = []
+      const andClauses = [];
 
-      const dimensionKeys = await getDimensionKeys()
+      const dimensionKeys = await getDimensionKeys();
 
       // Dimension inclusion filter
       if (dimensions.length > 0 && dimensionKeys.length > 0) {
-        dimensions.forEach(value => {
+        dimensions.forEach((value) => {
           // Build conditions for both formats
           const arrayCondition = {
             dimensions: {
               $elemMatch: {
-                $or: dimensionKeys.map(k => ({ [k]: value }))
-              }
-            }
-          }
+                $or: dimensionKeys.map((k) => ({ [k]: value })),
+              },
+            },
+          };
           const objectCondition = {
-            $or: dimensionKeys.map(k => ({ [`dimensions.${k}`]: value }))
-          }
+            $or: dimensionKeys.map((k) => ({ [`dimensions.${k}`]: value })),
+          };
           andClauses.push({
-            $or: [arrayCondition, objectCondition]
-          })
-        })
+            $or: [arrayCondition, objectCondition],
+          });
+        });
       }
 
       // Dimension exclusion filter
@@ -344,34 +340,34 @@ const resolvers = {
       */
 
       // Filter on a specific dimension (filterBy index)
-      if (typeof filterBy === 'number' && filter.length > 0) {
+      if (typeof filterBy === "number" && filter.length > 0) {
         const sampleDoc = await Datapoint.findOne({ survey: query.survey })
-          .select('dimensions')
+          .select("dimensions")
           .lean()
-          .exec()
+          .exec();
 
-        let dimensionKey = null
+        let dimensionKey = null;
 
         // Handle both array and object
         if (Array.isArray(sampleDoc?.dimensions)) {
-          const dimensionObj = sampleDoc.dimensions[filterBy]
+          const dimensionObj = sampleDoc.dimensions[filterBy];
           if (dimensionObj) {
-            dimensionKey = Object.keys(dimensionObj)[0]
+            dimensionKey = Object.keys(dimensionObj)[0];
           }
         } else if (
           sampleDoc?.dimensions &&
-          typeof sampleDoc.dimensions === 'object'
+          typeof sampleDoc.dimensions === "object"
         ) {
           // For a single object, use filterBy as an index into its keys
-          const keys = Object.keys(sampleDoc.dimensions)
-          dimensionKey = keys[filterBy]
+          const keys = Object.keys(sampleDoc.dimensions);
+          dimensionKey = keys[filterBy];
         }
 
         if (dimensionKey) {
-          const filterValues = filter.map(v => {
-            const num = Number(v)
-            return isNaN(num) ? v : num
-          })
+          const filterValues = filter.map((v) => {
+            const num = Number(v);
+            return isNaN(num) ? v : num;
+          });
 
           // Supports both formats
           andClauses.push({
@@ -379,19 +375,19 @@ const resolvers = {
             {
               dimensions: {
                 $elemMatch: {
-                  [dimensionKey]: { $in: filterValues }
-                }
-              }
+                  [dimensionKey]: { $in: filterValues },
+                },
+              },
             },
             {
-              [`dimensions.${dimensionKey}`]: { $in: filterValues }
-            }
-          ]
-        })
+              [`dimensions.${dimensionKey}`]: { $in: filterValues },
+            },
+          ],
+        });
         }
       }
 
-      if (andClauses.length > 0) query.$and = andClauses
+      if (andClauses.length > 0) query.$and = andClauses;
 
       // Aggregation pipeline
       const pipeline = [
@@ -399,19 +395,19 @@ const resolvers = {
         {
           $lookup: {
-            from: 'sources',
-            localField: 'source',
-            foreignField: 'id',
-            as: 'sourceData'
-          }
+            from: "sources",
+            localField: "source",
+            foreignField: "id",
+            as: "sourceData",
+          },
         },
         {
           $unwind: {
-            path: '$sourceData',
-            preserveNullAndEmptyArrays: true
-          }
-        }
-      ]
+            path: "$sourceData",
+            preserveNullAndEmptyArrays: true,
+          },
+        },
+      ];
 
       // --- EXCLUDE LOGIC ---
       if (exclude.length > 0) {
@@ -421,80 +417,80 @@ const resolvers = {
            _tempValuesToCheck: {
              $map: {
                // mergeObjects unifies both cases: 'dimensions' as an array of objects or as a single object
-                input: { $objectToArray: { $mergeObjects: '$dimensions' } },
-                as: 'dim',
-                in: '$$dim.v'
-              }
-            }
-          }
+                input: { $objectToArray: { $mergeObjects: "$dimensions" } },
+                as: "dim",
+                in: "$$dim.v",
+              },
+            },
+          },
          },
          {
            $match: {
              _tempValuesToCheck: {
-              $nin: exclude // $nin drops the document if ANY of the values matches
-            }
-          }
+              $nin: exclude, // $nin drops the document if ANY of the values matches
+            },
+          },
          },
          {
-          $unset: '_tempValuesToCheck'
+          $unset: "_tempValuesToCheck",
          }
-      )
+      );
       }
 
       // Sorting
       if (sortBy.length > 0) {
-        const sortByArray = Array.isArray(sortBy) ? sortBy : [sortBy]
+        const sortByArray = Array.isArray(sortBy) ? sortBy : [sortBy];
         const sortOrderArray = Array.isArray(sortOrder)
           ? sortOrder
-          : [sortOrder]
+          : [sortOrder];
 
-        const addFieldsStage = {}
-        const sortStage = {}
+        const addFieldsStage = {};
+        const sortStage = {};
 
         sortByArray.forEach((field, i) => {
-          const order = sortOrderArray[i]?.toUpperCase() === 'DESC' ? -1 : 1
-          console.log(sortOrderArray[i])
+          const order = sortOrderArray[i]?.toUpperCase() === "DESC" ? -1 : 1;
+          console.log(sortOrderArray[i]);
 
           if (dimensionKeys.includes(field)) {
             // Handle sorting for both formats
             addFieldsStage[`sort_${field}`] = {
               $cond: {
-                if: { $isArray: '$dimensions' },
+                if: { $isArray: "$dimensions" },
                 then: {
                   $first: {
                     $map: {
                       input: {
                         $filter: {
-                          input: '$dimensions',
-                          as: 'dim',
+                          input: "$dimensions",
+                          as: "dim",
                           cond: {
-                            $gt: [{ $type: `$$dim.${field}` }, 'missing']
-                          }
-                        }
+                            $gt: [{ $type: `$$dim.${field}` }, "missing"],
+                          },
+                        },
                       },
-                      as: 'dim',
-                      in: `$$dim.${field}`
-                    }
-                  }
+                      as: "dim",
+                      in: `$$dim.${field}`,
+                    },
+                  },
                 },
-                else: `$dimensions.${field}`
-              }
-            }
-            sortStage[`sort_${field}`] = order
+                else: `$dimensions.${field}`,
+              },
+            };
+            sortStage[`sort_${field}`] = order;
           } else {
             // Direct field (value, timestamp, etc.)
-            sortStage[field] = order
+            sortStage[field] = order;
           }
-        })
+        });
 
         if (Object.keys(addFieldsStage).length > 0) {
-          pipeline.push({ $addFields: addFieldsStage })
+          pipeline.push({ $addFields: addFieldsStage });
         }
-        pipeline.push({ $sort: sortStage })
+        pipeline.push({ $sort: sortStage });
       }
 
       if (limit && Number.isInteger(limit) && limit > 0) {
-        pipeline.push({ $limit: limit })
+        pipeline.push({ $limit: limit });
       }
 
       // Final projection with enriched data
@@ -508,22 +504,22 @@ const resolvers = {
           region: 1,
           dimensions: {
             $cond: {
-              if: { $isArray: '$dimensions' },
+              if: { $isArray: "$dimensions" },
               then: {
                 $map: {
-                  input: { $objectToArray: { $mergeObjects: '$dimensions' } },
-                  as: 'dim',
-                  in: '$$dim.v'
-                }
+                  input: { $objectToArray: { $mergeObjects: "$dimensions" } },
+                  as: "dim",
+                  in: "$$dim.v",
+                },
               },
               else: {
                 $map: {
-                  input: { $objectToArray: '$dimensions' },
-                  as: 'dim',
-                  in: '$$dim.v'
-                }
-              }
-            }
+                  input: { $objectToArray: "$dimensions" },
+                  as: "dim",
+                  in: "$$dim.v",
+                },
+              },
+            },
           },
           aggregationPeriod: 1,
           value: 1,
@@ -532,28 +528,28 @@ const resolvers = {
           references: 1,
           fromUrl: 1,
           meta: 1,
-          updateFrequency: 1
-        }
-      })
+          updateFrequency: 1,
+        },
+      });
 
-      console.log('Dynamic MongoDB query:', JSON.stringify(pipeline, null, 2))
+      console.log("Dynamic MongoDB query:", JSON.stringify(pipeline, null, 2));
 
-      console.log('Running query...')
-      const datapoints = await Datapoint.aggregate(pipeline).exec()
-      console.log(`Datapoints found.`)
+      console.log("Running query...");
+      const datapoints = await Datapoint.aggregate(pipeline).exec();
+      console.log(`Datapoints found.`);
 
-      if (lang && lang !== 'en') {
-        return await translateDataPointsBatch(datapoints, lang)
+      if (lang && lang !== "en") {
+        return await translateDataPointsBatch(datapoints, lang);
       }
-      return datapoints
-    }
+      return datapoints;
+    },
   },
 
   Mutation: {
     createSource: async (parent, { json, record, name }) => {
-      const newSource = new Source({ json, record, name })
-      return await newSource.save()
+      const newSource = new Source({ json, record, name });
+      return await newSource.save();
     },
 
     updateSource: async (parent, { id, json, record, name }) => {
@@ -561,18 +557,18 @@ const resolvers = {
         id,
         { json, record, name },
         { new: true }
-      )
+      );
     },
 
     deleteSource: async (parent, { id }) => {
       try {
-        await Source.findByIdAndDelete(id)
-        return true
+        await Source.findByIdAndDelete(id);
+        return true;
       } catch (err) {
-        return false
+        return false;
       }
-    }
-  }
-}
+    },
+  },
+};
 
-module.exports = resolvers
+module.exports = resolvers;
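Note on the resolver change above: the old case-insensitive `$regex` matching is gone, and `survey` is now normalized inline (uppercased, dots stripped), while `source`, `surveyName`, and `region` are only uppercased. That inline rule has to stay in lockstep with `cleanSurveyName()` in `Source-Connector/api/models/Datapoint.js` further down in this diff. A minimal sketch of a shared helper (not part of this diff; the helper name and placement are hypothetical):

```js
// Hypothetical shared helper — both sides of this diff currently duplicate
// this logic inline, which invites drift.
const normalizeSurvey = (val) =>
  typeof val === "string" ? val.toUpperCase().replace(/\./g, "") : val;

// The resolver and the schema must agree on the stored form:
console.log(normalizeSurvey("a.l.l")); // "ALL"
console.log(normalizeSurvey("ALL"));   // "ALL"
```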
diff --git a/Query-Engine/docker-compose.prod.yml b/Query-Engine/docker-compose.prod.yml
index 8d56e96..6fdc6fa 100644
--- a/Query-Engine/docker-compose.prod.yml
+++ b/Query-Engine/docker-compose.prod.yml
@@ -65,8 +65,6 @@ services:
       #- .:/workspace:cached
     command: sleep infinity
 
-
-
   libretranslate:
     image: libretranslate/libretranslate:latest
     container_name: libretranslate
diff --git a/Query-Engine/package.json b/Query-Engine/package.json
index 7f5ae75..a07c576 100644
--- a/Query-Engine/package.json
+++ b/Query-Engine/package.json
@@ -22,12 +22,13 @@
     "cors": "^2.8.5",
     "dotenv": "^16.4.5",
     "express": "^4.19.2",
-    "jsonwebtoken": "^9.0.2",
+    "ioredis": "^5.3.2",
+    "jsonwebtoken": "^9.0.3",
+    "jwa": "^2.0.1",
     "minio": "^8.0.0",
     "mongoose": "^8.3.4",
     "nodemon": "^3.1.0",
     "percocologger": "^1.0.7",
-    "ioredis": "^5.3.2",
     "pg": "^8.11.5"
   },
   "stashDependencies": {
diff --git a/Source-Connector/api/models/Datapoint.js b/Source-Connector/api/models/Datapoint.js
index 919d6c3..52165b1 100644
--- a/Source-Connector/api/models/Datapoint.js
+++ b/Source-Connector/api/models/Datapoint.js
@@ -1,14 +1,94 @@
-const mongoose = require('mongoose');
-
-const datapointSchema = new mongoose.Schema({
-    source: String,
-    survey: String,
-    surveyName: String,
-    region: String,
-    fromUrl: String,
-    timestamp: String,
-    dimensions: Object,
-    value: Number
-}, {strict: false});
-
-module.exports = mongoose.model('Datapoint', datapointSchema);
\ No newline at end of file
+const mongoose = require("mongoose");
+const crypto = require("crypto");
+
+// Utility function to clean the survey name
+const cleanSurveyName = (val) => {
+  if (!val || typeof val !== "string") return val;
+  return val.toUpperCase().replace(/\./g, ""); // Strips dots and UPPERCASES
+};
+
+const datapointSchema = new mongoose.Schema(
+  {
+    source: { type: String, index: true, uppercase: true }, // uppercase: true does this automatically
+    survey: { type: String, index: true, uppercase: true },
+    surveyName: String,
+    region: { type: String, index: true, uppercase: true },
+    fromUrl: String,
+    timestamp: { type: String, index: true },
+    dimensions: { type: Object },
+    value: Number,
+    dupl_hash: { type: String, unique: true }, // Unique identifier to avoid duplicates
+  },
+  {
+    strict: false,
+    timestamps: true,
+  }
+);
+
+// Indexes for performance
+datapointSchema.index({ "dimensions.$**": 1 });
+datapointSchema.index({ region: 1, timestamp: -1 });
+
+// Middleware: cleans the survey before every save
+datapointSchema.pre("save", function (next) {
+  if (this.survey) this.survey = cleanSurveyName(this.survey);
+  next();
+});
+
+// Generates the unique hash (used by the bulk insert)
+const generateHash = (doc) => {
+  // Handle both a Mongoose document and a plain object
+  const target = doc._doc || doc;
+
+  const dims = target.dimensions || {};
+  const sortedKeys = Object.keys(dims)
+    .sort()
+    .reduce((acc, key) => {
+      acc[key] = dims[key];
+      return acc;
+    }, {});
+
+  const s = cleanSurveyName(target.survey);
+
+  // BUILD THE HASH STRING
+  const stringToHash =
+    (target.fromUrl || "") +
+    "|" +
+    (target.timestamp || "") +
+    "|" +
+    (target.region || "") +
+    "|" +
+    s +
+    "|" +
+    (target.value !== undefined ? target.value : Date.now()) +
+    "|" +
+    JSON.stringify(sortedKeys);
+
+  return crypto.createHash("md5").update(stringToHash).digest("hex");
+};
+
+// Static method to insert millions of records without duplicates
+datapointSchema.statics.upsertMany = async function (datapoints) {
+  if (!datapoints || datapoints.length === 0) return;
+
+  const operations = datapoints.map((doc) => {
+    const hash = generateHash(doc);
+    // Clean the survey here as well for the bulk operation
+    const cleanedDoc = {
+      ...doc,
+      survey: cleanSurveyName(doc.survey),
+      dupl_hash: hash,
+    };
+    return {
+      updateOne: {
+        filter: { dupl_hash: hash },
+        update: { $set: cleanedDoc },
+        upsert: true,
+      },
+    };
+  });
+
+  return this.bulkWrite(operations, { ordered: false });
+};
+
+module.exports = mongoose.model("Datapoint", datapointSchema);
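The new `upsertMany` static dedupes on an MD5 `dupl_hash` built from `fromUrl`, `timestamp`, `region`, the cleaned survey, `value`, and the key-sorted dimensions. A hedged usage sketch follows (connection string, model path, and rows are invented for illustration):

```js
// Hypothetical usage of the new static; re-running this call is idempotent
// because identical rows map to the same dupl_hash, so bulkWrite upserts
// instead of inserting duplicates.
const mongoose = require("mongoose");
const Datapoint = require("./api/models/Datapoint");

(async () => {
  await mongoose.connect("mongodb://localhost:27017/Minio-Mongo");
  await Datapoint.upsertMany([
    { survey: "a.l.l", region: "PIEMONTE", timestamp: "2024", value: 42, dimensions: { sex: "F" } },
    // Same hash as the row above: cleanSurveyName("a.l.l") === cleanSurveyName("ALL"),
    // so this second row updates the first document rather than duplicating it.
    { survey: "ALL", region: "PIEMONTE", timestamp: "2024", value: 42, dimensions: { sex: "F" } },
  ]);
  await mongoose.disconnect();
})().catch(console.error);
```

One behavior worth noting from `generateHash`: when `value` is undefined the hash is salted with `Date.now()`, so such rows always hash differently and are inserted as new documents on every run.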
diff --git a/Source-Connector/api/services/service.js b/Source-Connector/api/services/service.js
index 5ca730f..331dc94 100644
--- a/Source-Connector/api/services/service.js
+++ b/Source-Connector/api/services/service.js
@@ -1,219 +1,177 @@
-const logger = require('percocologger')
-const log = logger.info
-const Datapoints = require("../models/Datapoint")
-const config = require('../../config')
-const minioWriter = require("../../inputConnectors/minioConnector")
-const axios = require('axios')
-const fs = require('fs')
-const { updateJWT } = require('../../utils/keycloak')
-let bearerToken
-updateJWT().then(token => {
-    bearerToken = token
-    logger.info("Initial Keycloak token obtained")
-}).catch(error => logger.error(error.response?.data || error));
-
-const path = require('path');
-let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl"
-require('../../inputConnectors/apiConnector')
-
-async function insertResponseInDB(size) {
-
-    let stats
-    let sizeRead = 0
-    logger.info(`Waiting for the stream file with minimum size: ${size} bytes`);
-    //let counter = 0
-    while (!stats?.size || stats?.size < 50000000) {
-        stats = fs.statSync(config.streamPath || "/app/shared-data/stream.json");
-        sizeRead = (stats.size / 1024) / 1024
-        logger.info(`Stream file size: ${sizeRead} megabytes`);
-        /*counter++
-        if (counter > 10000000) {
-            logger.error("Stream file not found after numerous attempts, exiting the insert function.");
-            return;
-        }*/
-    }
-
-
-    const stream2 = fs.createReadStream(config.nameStream || "/app/shared-data/stream.json", { encoding: "utf-8" });
-    let buffer = "";
-    let depth = 0; // counts the curly braces
-    let inObject = false;
-
-    logger.debug("Starting datapoint insertion into the database...");
-    let tempArray = [];
-    for await (const chunk of stream2) {
-        //logger.debug("Reading data chunk...");
-        for (const char of chunk) {
-            //logger.debug(`Processing character: ${char}`);
-            if (char === "{") {
-                //logger.debug("Start of a new JSON object detected.");
-                if (!inObject) inObject = true;
-                depth++;
-            }
-
-            //logger.debug(`Current curly-brace depth: ${depth}`);
-            if (inObject) buffer += char;
-
-            //logger.debug(`Current buffer: ${buffer}`);
-            if (char === "}") {
-                //logger.debug("End of a JSON object detected.");
-                depth--;
-                if (depth === 0 && inObject) {
-                    //logger.debug("Complete JSON object detected, proceeding with database insertion.");
-                    // complete object
-                    const obj = JSON.parse(buffer);
-                    //logger.debug(`JSON object to insert: ${JSON.stringify(obj)}`);
-                    //const DatapointModel = await Datapoints.getDatapointModel();
-                    //const datapoint = new DatapointModel(obj);
-                    tempArray.push(obj);
-                    if (tempArray.length >= config.batch) {
-                        await Datapoints.insertMany(tempArray);
-                        tempArray = [];
-                        logger.debug(config.batch + " datapoints saved to the database.");
-                    }
-                    //await Datapoints.insertMany([obj]);
-                    //logger.debug("Datapoint saved to the database.");
-                    buffer = "";
-                    inObject = false;
-                }
-            }
-        }
-    }
-    try {
-        fs.unlinkSync(config.nameStream || "/app/shared-data/stream.json");
-        logger.info('File deleted successfully!');
-    } catch (err) {
-        logger.error('Error during deletion:', err);
-    }
-}
+const logger = require("percocologger");
+const log = logger.info;
+const Datapoints = require("../models/Datapoint");
+const config = require("../../config");
+const minioWriter = require("../../inputConnectors/minioConnector");
+const axios = require("axios");
+const fs = require("fs");
+const { updateJWT } = require("../../utils/keycloak");
+let bearerToken;
+updateJWT()
+  .then((token) => {
+    bearerToken = token;
+    logger.info("Initial Keycloak token obtained");
+  })
+  .catch((error) => logger.error(error.response?.data || error));
+
+const path = require("path");
+let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl";
+require("../../inputConnectors/apiConnector");
 
 module.exports = {
-
-    notifyPath: async (req, res) => {
-
-        logger.info({ body: JSON.stringify(req.body) })
-
-        const data = req.body.data || req.body.value || req.body;
-        const entities = Array.isArray(data) ? data : [data];
-
-        for (const ent of entities) {
-            const id = ent.id || ent['@id'] || 'unknown-id';
-            let urlValue;
-            if (ent[attrWithUrl] && typeof ent[attrWithUrl] === 'object' && 'value' in ent[attrWithUrl]) {
-                urlValue = ent[attrWithUrl].value;
-            } else if (ent[attrWithUrl]) {
-                urlValue = ent[attrWithUrl];
-            } else if (ent[attrWithUrl + ':value']) {
-                urlValue = ent[attrWithUrl + ':value'];
-            } else if (ent.value) {
-                urlValue = ent.value;
-            }
-
-            if (!urlValue || typeof urlValue !== 'string') {
-                console.warn(`no URL found for entity ${id}`);
-                continue;
-            }
-
-            let mapID = req.query.mapID || req.params.mapID || ent.mapID || config.mapID
-
-            if (!mapID) {
-                const response = await axios.get(urlValue);
-                if (response?.data?.data?.datapoints)
-                    await Datapoints.insertMany(response.data.data.datapoints)
-                else
-                    await minioWriter.insertInDBs(response.data, {
-                        name: id + '-' + path.basename((new URL(urlValue)).pathname),
-                        lastModified: new Date(),
-                        versionId: 'null',
-                        isDeleteMarker: false,
-                        bucketName: 'orion-notify',
-                        size: response.data.length,
-                        isLatest: true,
-                        etag: '',
-                        insertedBy: 'orion-notify'
-                    });
-            }
-            else {
-                let response
-                let retry = 2
-                while (retry > 0)
-                    try {
-                        response = await axios.post(
-                            config.mapEndpoint,
-                            {
-                                "sourceDataType": "json",
-                                "sourceDataURL": urlValue,
-                                "decodeOptions": {
-                                    "decodeFrom": "json-stat"
-                                },
-                                "config": {
-                                    "NGSI_entity": false,
-                                    "ignoreValidation": true,
-                                    "writers": [],
-                                    "disableAjv": true,
-                                    "mappingReport": true
-                                },
-                                "dataModel": {
-                                    "$schema": "http://json-schema.org/schema#",
-                                    "$id": "dataModels/DataModelTemp.json",
-                                    "title": "DataModelTemp",
-                                    "description": "Bike Hire Docking Station",
-                                    "type": "object",
-                                    "properties": {
-                                        "region": {
-                                            "type": "string"
-                                        },
-                                        "source": {
-                                            "type": "string"
-                                        },
-                                        "timestamp": {
-                                            "type": "string"
-                                        },
-                                        "survey": {
-                                            "type": "string"
-                                        },
-                                        "dimensions": {
-                                            "type": "object"
-                                        },
-                                        "value": {
-                                            "type": "integer"
-                                        }
-                                    }
-                                }
-                            }/* { //mapID, sourceDataURL: urlValue }*/,
-                            {
-                                headers: {
-                                    "Authorization": `Bearer ${bearerToken}`
-                                }
-                            });
-                        retry -= 2
-                        try {
-                            logger.info("Inserting datapoints into DB...");
-                            logger.info(response.data[0], " MB")
-                            await insertResponseInDB(response.data[0])//.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion
-                        }
-                        catch (error) {
-                            logger.error("Error inserting datapoints:", error)
-                        }
-                    }
-                    catch (error) {
-                        logger.error("Error fetching mapped data from API Connector:", error.response?.data || error.message);
-                        try {
-                            bearerToken = await updateJWT(true);
-                            retry--
-                        } catch (e) {
-                            logger.error("Error updating JWT:", e);
-                            retry--
                        }
                    }
-                //logger.info(response.data.length)
+  notifyPath: async (req, res) => {
+    logger.info({ body: JSON.stringify(req.body) });
+
+    const data = req.body.data || req.body.value || req.body;
+    const entities = Array.isArray(data) ? data : [data];
+
+    for (const ent of entities) {
+      const id = ent.id || ent["@id"] || "unknown-id";
+      let urlValue;
+      if (
+        ent[attrWithUrl] &&
+        typeof ent[attrWithUrl] === "object" &&
+        "value" in ent[attrWithUrl]
+      ) {
+        urlValue = ent[attrWithUrl].value;
+      } else if (ent[attrWithUrl]) {
+        urlValue = ent[attrWithUrl];
+      } else if (ent[attrWithUrl + ":value"]) {
+        urlValue = ent[attrWithUrl + ":value"];
+      } else if (ent.value) {
+        urlValue = ent.value;
+      }
+
+      if (!urlValue || typeof urlValue !== "string") {
+        console.warn(`no URL found for entity ${id}`);
+        continue;
+      }
+
+      let mapID =
+        req.query.mapID || req.params.mapID || ent.mapID || config.mapID;
+
+      if (!mapID) {
+        const response = await axios.get(urlValue);
+        if (response?.data?.data?.datapoints)
+          await Datapoints.insertMany(response.data.data.datapoints);
+        else
+          await minioWriter.insertInDBs(response.data, {
+            name: id + "-" + path.basename(new URL(urlValue).pathname),
+            lastModified: new Date(),
+            versionId: "null",
+            isDeleteMarker: false,
+            bucketName: "orion-notify",
+            size: response.data.length,
+            isLatest: true,
+            etag: "",
+            insertedBy: "orion-notify",
+          });
+      } else {
+        let response;
+        let retry = 2;
+        while (retry > 0)
+          try {
+            response = await axios.post(
+              config.mapEndpoint,
+              {
+                sourceDataType: "json",
+                sourceDataURL: urlValue,
+                decodeOptions: {
+                  decodeFrom: "json-stat",
+                },
+                config: {
+                  NGSI_entity: false,
+                  ignoreValidation: true,
+                  writers: [],
+                  disableAjv: true,
+                  mappingReport: true,
+                },
+                dataModel: {
+                  $schema: "http://json-schema.org/schema#",
+                  $id: "dataModels/DataModelTemp.json",
+                  title: "DataModelTemp",
+                  description: "Bike Hire Docking Station",
+                  type: "object",
+                  properties: {
+                    region: {
+                      type: "string",
+                    },
+                    source: {
+                      type: "string",
+                    },
+                    timestamp: {
+                      type: "string",
+                    },
+                    survey: {
+                      type: "string",
+                    },
+                    dimensions: {
+                      type: "object",
+                    },
+                    value: {
+                      type: "integer",
+                    },
+                  },
+                },
+              } /* { //mapID, sourceDataURL: urlValue }*/,
+              {
+                headers: {
+                  Authorization: `Bearer ${bearerToken}`,
+                },
+              }
+            );
+            retry -= 2;
+            try {
+              logger.info("Inserting datapoints into DB...");
+              logger.info(response.data);
+              let outputId = response.data[response.data.length - 1].MAPPING_REPORT.outputId
+              let lastId
+              let purged = false
+              for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { // Loop over the output chunks
+                //while (response.data[0] || response.data.id) {
+                logger.info(response.data.status)
+                response = await axios.get((config.sessionEdnpoint || "http://localhost:5500/api/output?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, {
+                  headers: {
+                    Authorization: `Bearer ${bearerToken}`
+                  }
+                })
+                if (response.data[0]) {
+                  if (!purged)
+                    await Datapoints.deleteMany({ survey: response.data[0].survey }); // Preliminary purge
+                  const dataToInsert = response.data.map((d) => { // Prepare the data
+                    return {
+                      ...d,
+                      fromUrl: urlValue,
+                    };
+                  });
+                  await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion
+                  lastId = response.data[response.data.length - 1]?._id // Advance the cursor for the next loop
+                  purged = true;
+                }
+              }
+            } catch (error) {
+              logger.error("Error inserting datapoints:", error);
+            }
+          } catch (error) {
+            logger.error(
+              "Error fetching mapped data from API Connector:",
+              error.response?.data || error.message
+            );
+            try {
+              bearerToken = await updateJWT(true);
+              retry--;
+            } catch (e) {
+              logger.error("Error updating JWT:", e);
+              retry--;
+            }
+          }
+        //logger.info(response.data.length)
 
-                /*for (let i in response.data)
+        /*for (let i in response.data)
           await minioWriter.insertInDBs(response.data[i], {
             name: response.data[i].id || mapID + '-' + path.basename((new URL(urlValue)).pathname) + i,
             lastModified: new Date(),
@@ -225,11 +183,11 @@ module.exports = {
             etag: '',
             insertedBy: 'orion-notify'
           });*/
-            }
-            logger.info(`downloaded ${urlValue}`);
-        }
-        return 'OK';
-    },
+      }
+      logger.info(`downloaded ${urlValue}`);
+    }
+    return "OK";
+  },
 
-    sync: minioWriter.sync
-}
\ No newline at end of file
+  sync: minioWriter.sync,
+};
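The rewritten `notifyPath` no longer tails a shared stream file; it now pulls the mapper's output in chunks keyed by `outputId`, `lastId`, and a chunk index, purging the survey once before the first upsert. A simplified standalone sketch of that pull loop follows; the query-string shape (`id`, `lastId`, `index`) is taken from the code above, while the function name and the exact termination condition are illustrative:

```js
// Sketch of the chunked pull loop, under the assumed endpoint contract above.
// Note: when lastId is undefined, string concatenation yields "lastId=undefined",
// exactly as in the code above — the server is assumed to tolerate it.
const axios = require("axios");

async function drainOutput(baseUrl, outputId, token, onChunk) {
  let lastId;
  for (let index = 0; ; index++) {
    const { data } = await axios.get(
      `${baseUrl}id=${outputId}&lastId=${lastId}&index=${index}`,
      { headers: { Authorization: `Bearer ${token}` } }
    );
    if (!data[0]) break;                 // empty chunk -> nothing left to pull
    await onChunk(data);                 // e.g. Datapoints.upsertMany(data)
    lastId = data[data.length - 1]?._id; // cursor for the next page
  }
}
```

One design note: the `deleteMany` purge runs only once (guarded by `purged`), so a crash mid-loop leaves the survey partially repopulated; the `dupl_hash` upsert makes a re-run converge rather than duplicate.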
diff --git a/Source-Connector/config.template.js b/Source-Connector/config.template.js
index 9497b5e..28de819 100644
--- a/Source-Connector/config.template.js
+++ b/Source-Connector/config.template.js
@@ -30,7 +30,7 @@ module.exports = {
     deleteAllDuplicateSubscriptions: true,
     attrWithUrl: "datasetUrl",
     orionBaseUrl: "http://localhost:1027",
-    notificationUrl: "http://host.docker.internal:3000/api/orion/subscribe",
+    notificationUrl: "http://localhost:3000/api/orion/subscribe",
     fiwareService: "",
     fiwareServicePath: ""
   },
@@ -43,7 +43,7 @@ module.exports = {
   port: 3000,
   updateOwner: "later",
   writeLogsOnFile: true,
-  mongo: "mongodb://localhost:27017/Minio-Mongo", // mongo url
+  mongo: "mongodb://localhost:22000/Minio-Mongo", // mongo url
   authConfig: {
     idmHost: "https://platform.beopendep.it/auth",
     clientId: "",
diff --git a/Source-Connector/package.json b/Source-Connector/package.json
index 7f5ae75..a429007 100644
--- a/Source-Connector/package.json
+++ b/Source-Connector/package.json
@@ -22,12 +22,12 @@
     "cors": "^2.8.5",
     "dotenv": "^16.4.5",
     "express": "^4.19.2",
-    "jsonwebtoken": "^9.0.2",
+    "ioredis": "^5.3.2",
+    "jsonwebtoken": "^9.0.3",
     "minio": "^8.0.0",
     "mongoose": "^8.3.4",
     "nodemon": "^3.1.0",
     "percocologger": "^1.0.7",
-    "ioredis": "^5.3.2",
     "pg": "^8.11.5"
   },
   "stashDependencies": {
diff --git a/Source-Connector/utils/keycloak.js b/Source-Connector/utils/keycloak.js
index e168309..efd80a6 100644
--- a/Source-Connector/utils/keycloak.js
+++ b/Source-Connector/utils/keycloak.js
@@ -1,6 +1,6 @@
 const axios = require("axios");
 const config = require("../config.js")
-const keycloakBaseUrl = config.authConfig.idmHost + "/realms/smartera";
+const keycloakBaseUrl = config.authConfig.idmHost + "/realms/" + (config.authConfig.authRealm || "master");
 const { clientId, username, password } = config.authConfig;
 const fs = require('fs');
 const path = './token.js';
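With the keycloak.js change, the realm is no longer hard-coded to `smartera`: it is read from `config.authConfig.authRealm` and falls back to `master` when unset, so the old value now belongs in config. A sketch of the matching `authConfig` block (values are placeholders; `username` and `password` are assumed from the destructuring in keycloak.js):

```js
// config.js excerpt (sketch): authRealm is the new knob read by utils/keycloak.js.
module.exports = {
  // ...
  authConfig: {
    idmHost: "https://platform.beopendep.it/auth",
    authRealm: "smartera", // previously hard-coded in keycloak.js; defaults to "master"
    clientId: "",
    username: "",
    password: ""
  }
};
```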