diff --git a/.github/workflows/ods.yml b/.github/workflows/ods.yml index 6e35155af..b1f4b3c88 100644 --- a/.github/workflows/ods.yml +++ b/.github/workflows/ods.yml @@ -97,6 +97,7 @@ jobs: docker-compose -f docker-compose.yml build storage docker-compose -f docker-compose.yml build storage-db-liquibase docker-compose -f docker-compose.yml build storage-mq + docker-compose -f docker-compose.yml build graphql - name: Build Integration-test run: | @@ -117,10 +118,12 @@ jobs: IMAGE_ID_STORAGE=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage IMAGE_ID_LIQUIBASE=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage-db-liquibase IMAGE_ID_STORAGEMQ=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage-mq + IMAGE_ID_STORAGE_GRAPHQL=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage-graphql docker save $IMAGE_ID_STORAGE > storage_postgrest.tar docker save $IMAGE_ID_LIQUIBASE > storage_liquibase.tar docker save $IMAGE_ID_STORAGEMQ > storage_mq.tar + docker save $IMAGE_ID_STORAGE_GRAPHQL > storage_graphql.tar - name: Upload Storage Docker image as artifact uses: actions/upload-artifact@v1 @@ -140,6 +143,12 @@ jobs: name: storagemq-artifact path: storage_mq.tar + - name: Upload Storage-GraphQL Docker image as artifact + uses: actions/upload-artifact@v1 + with: + name: storagegraphql-artifact + path: storage_graphql.tar + # The CDC-Test builds a cut-down image that would override the previously built production image # Therefore it is executed after uploading the production image - name: Consumer-side CDC-Test @@ -477,6 +486,10 @@ jobs: uses: actions/download-artifact@v1 with: name: storagemq-artifact + - name: Download storage-graphql artifact + uses: actions/download-artifact@v1 + with: + name: storagegraphql-artifact - name: Load Docker Images from artifacts run: | @@ -484,6 +497,7 @@ jobs: docker load -i ./storage-artifact/storage_postgrest.tar docker load -i ./liquibase-artifact/storage_liquibase.tar docker load -i ./storagemq-artifact/storage_mq.tar + docker load -i ./storagegraphql-artifact/storage_graphql.tar - name: Storage Push to registry run: | @@ -498,6 +512,7 @@ jobs: IMAGE_ID_STORAGE=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage IMAGE_ID_LIQUIBASE=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage-db-liquibase IMAGE_ID_STORAGE_MQ=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage-mq + IMAGE_ID_STORAGE_GRAPHQL=$(sed -n 's/^DOCKER_REGISTRY=//p' .env)/storage-graphql docker tag $IMAGE_ID_STORAGE $IMAGE_ID_STORAGE:$STORAGE_VERSION docker tag $IMAGE_ID_STORAGE $IMAGE_ID_STORAGE:latest @@ -508,6 +523,9 @@ jobs: docker tag $IMAGE_ID_STORAGE_MQ $IMAGE_ID_STORAGE_MQ:$STORAGE_VERSION docker tag $IMAGE_ID_STORAGE_MQ $IMAGE_ID_STORAGE_MQ:latest + docker tag $IMAGE_ID_STORAGE_GRAPHQL $IMAGE_ID_STORAGE_GRAPHQL:$STORAGE_VERSION + docker tag $IMAGE_ID_STORAGE_GRAPHQL $IMAGE_ID_STORAGE_GRAPHQL:latest + docker push $IMAGE_ID_STORAGE:$STORAGE_VERSION docker push $IMAGE_ID_STORAGE:latest @@ -517,6 +535,9 @@ jobs: docker push $IMAGE_ID_STORAGE_MQ:$STORAGE_VERSION docker push $IMAGE_ID_STORAGE_MQ:latest + docker push $IMAGE_ID_STORAGE_GRAPHQL:$STORAGE_VERSION + docker push $IMAGE_ID_STORAGE_GRAPHQL:latest + pipeline_upload: name: Pipeline Publish runs-on: ubuntu-18.04 diff --git a/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/DatasourceManager.java b/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/DatasourceManager.java index 263aa47b0..04c30ba93 100644 --- a/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/DatasourceManager.java +++ 
b/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/DatasourceManager.java @@ -13,6 +13,7 @@ import org.jvalue.ods.adapterservice.datasource.repository.DatasourceRepository; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.everit.json.schema.ValidationException; import org.jvalue.ods.adapterservice.datasource.validator.*; @@ -100,7 +101,7 @@ public DataImport.MetaData trigger(Long id, RuntimeParameters runtimeParameters) */ @Transactional DataImport.MetaData executeImport(Long id, RuntimeParameters runtimeParameters) - throws DatasourceNotFoundException, ImporterParameterException, InterpreterParameterException, IOException { + throws DatasourceNotFoundException, ImporterParameterException, InterpreterParameterException, IOException { Datasource datasource = getDatasource(id); DataImport dataImport = new DataImport(datasource, "", ValidationMetaData.HealthStatus.FAILED); Validator validator = new JsonSchemaValidator(); diff --git a/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/model/types/CustomStringArrayType.java b/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/model/types/CustomStringArrayType.java index 2692376e7..104dffaa8 100644 --- a/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/model/types/CustomStringArrayType.java +++ b/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/model/types/CustomStringArrayType.java @@ -53,7 +53,7 @@ public Object deepCopy(Object value) { public Serializable disassemble(Object value) throws HibernateException { return (Serializable) value; } - + @Override public boolean equals(Object x, Object y) throws HibernateException { return x.equals(y); diff --git a/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/validator/JsonSchemaValidator.java b/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/validator/JsonSchemaValidator.java index 7eca95bfb..fab331bbc 100644 --- a/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/validator/JsonSchemaValidator.java +++ b/adapter/src/main/java/org/jvalue/ods/adapterservice/datasource/validator/JsonSchemaValidator.java @@ -1,13 +1,19 @@ package org.jvalue.ods.adapterservice.datasource.validator; +import org.jvalue.ods.adapterservice.datasource.validator.ValidationMetaData; import org.jvalue.ods.adapterservice.datasource.model.*; +import org.jvalue.ods.adapterservice.datasource.model.exceptions.*; +import java.io.IOException; import org.everit.json.schema.ValidationException; import org.everit.json.schema.Schema; import org.everit.json.schema.loader.SchemaLoader; + import org.json.JSONArray; import org.json.JSONObject; +import org.json.JSONTokener; +import java.util.Arrays; import com.google.gson.Gson; public class JsonSchemaValidator implements Validator { @@ -29,7 +35,6 @@ public ValidationMetaData validate(DataImport dataImport){ else { schema.validate(new JSONObject(dataImport.getData())); } - validationMetaData.setHealthStatus(ValidationMetaData.HealthStatus.OK); return validationMetaData; } catch ( ValidationException e) { diff --git a/docker-compose.yml b/docker-compose.yml index eb385eea3..3e6376d0c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -163,7 +163,7 @@ services: build: ./storage/storage-mq/ environment: CONNECTION_RETRIES: 30 - CONNECTION_BACKOFF_IN_MS: 2000 + CONNECTION_BACKOFF_IN_MS: 20000 POSTGRES_HOST: storage-db POSTGRES_PORT: 5432 @@ -221,15 +221,26 @@ services: POSTGRES_DB: ods POSTGRES_USER: ods_admin 
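A note on the storage-mq tweak above before the storage-db environment block continues: CONNECTION_BACKOFF_IN_MS rises from 2000 to 20000, giving the database ten times longer to come up between attempts. For intuition, the two knobs behave roughly like the retry loop below. This is a sketch only; the real logic lives in the @jvalue/node-dry-* connection helpers, and `connectWithRetry` is a hypothetical name.

```typescript
// Hypothetical sketch of how CONNECTION_RETRIES / CONNECTION_BACKOFF_IN_MS interact.
async function connectWithRetry(
  connect: () => Promise<void>,
  retries = 30, // CONNECTION_RETRIES
  backoffInMs = 20000, // CONNECTION_BACKOFF_IN_MS, raised from 2000 in this patch
): Promise<void> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await connect();
    } catch (error) {
      if (attempt >= retries) {
        throw error; // out of attempts, surface the failure
      }
      await new Promise((resolve) => setTimeout(resolve, backoffInMs));
    }
  }
}
```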
POSTGRES_PASSWORD: ods_pw + + graphql: + image: ${DOCKER_REGISTRY}/storage-graphql + build: ./storage/storage-graphql/ + depends_on: + - storage-db + - storage-db-liquibase + ports: + - 5432:5432 + command: ["--connection", "postgres://ods_admin:ods_pw@storage-db:5432/ods", "--port", "5432", "--schema", "storage", "--append-plugins", "postgraphile-plugin-connection-filter"] + # Uncomment this if you want to persist the data. # volumes: # - "./pgdata:/var/lib/postgresql/data" # Uncomment this if you want to manage the Postgres databases with adminer - # adminer: # management UI for Postgres - # image: adminer - # ports: - # - 8081:8080 + adminer: # management UI for Postgres + image: adminer + ports: + - 8081:8080 storage-db-liquibase: # perform database migration on start up image: ${DOCKER_REGISTRY}/storage-db-liquibase diff --git a/pipeline/src/pipeline-config/outboxEventPublisher.ts b/pipeline/src/pipeline-config/outboxEventPublisher.ts index f581ea770..c29f1def8 100644 --- a/pipeline/src/pipeline-config/outboxEventPublisher.ts +++ b/pipeline/src/pipeline-config/outboxEventPublisher.ts @@ -14,10 +14,12 @@ export async function publishCreation( client: ClientBase, pipelineId: number, pipelineName: string, + schema?: Record<string, unknown>, ): Promise<void> { const content = { pipelineId: pipelineId, pipelineName: pipelineName, + schema: schema != null ? schema : undefined, }; return await insertEvent(client, AMQP_PIPELINE_CONFIG_CREATED_TOPIC, content); } diff --git a/pipeline/src/pipeline-config/pipelineConfigManager.ts b/pipeline/src/pipeline-config/pipelineConfigManager.ts index 116e2c496..f50e3655a 100644 --- a/pipeline/src/pipeline-config/pipelineConfigManager.ts +++ b/pipeline/src/pipeline-config/pipelineConfigManager.ts @@ -23,6 +23,7 @@ export class PipelineConfigManager { client, savedConfig.id, savedConfig.metadata.displayName, + savedConfig.schema, ); return savedConfig; }); diff --git a/storage/storage-graphql/Dockerfile b/storage/storage-graphql/Dockerfile new file mode 100644 index 000000000..7abbc64fd --- /dev/null +++ b/storage/storage-graphql/Dockerfile @@ -0,0 +1,8 @@ +FROM node:alpine + +# Install PostGraphile and PostGraphile connection filter plugin +RUN npm install -g postgraphile +RUN npm install -g postgraphile-plugin-connection-filter + +EXPOSE 5000 +ENTRYPOINT ["postgraphile", "-n", "0.0.0.0"] \ No newline at end of file diff --git a/storage/storage-mq/cdct-consumer.sh b/storage/storage-mq/cdct-consumer.sh old mode 100644 new mode 100755 diff --git a/storage/storage-mq/package-lock.json b/storage/storage-mq/package-lock.json index 3a1435f7b..5d5bf19b1 100644 --- a/storage/storage-mq/package-lock.json +++ b/storage/storage-mq/package-lock.json @@ -11,6 +11,7 @@ "@jvalue/node-dry-amqp": "0.1.2", "@jvalue/node-dry-basics": "0.0.3", "@jvalue/node-dry-pg": "1.2.1", + "@types/json-schema": "^7.0.9", "cors": "^2.8.5", "express": "^4.17.1" }, @@ -1345,8 +1346,7 @@ "node_modules/@types/json-schema": { "version": "7.0.9", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.9.tgz", - "integrity": "sha512-qcUXuemtEu+E5wZSJHNxUXeCZhAfXKQ41D+duX+VYPde7xyEVZci+/oXKJL13tnRs9lR2pr4fod59GT6/X1/yQ==", - "dev": true + "integrity": "sha512-qcUXuemtEu+E5wZSJHNxUXeCZhAfXKQ41D+duX+VYPde7xyEVZci+/oXKJL13tnRs9lR2pr4fod59GT6/X1/yQ==" }, "node_modules/@types/json5": { "version": "0.0.29", @@ -7589,34 +7589,6 @@ "node": ">=4" } }, - "node_modules/popsicle/node_modules/tough-cookie": { - "version": "2.5.0", - "integrity":
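The new graphql service above drives PostGraphile purely through CLI flags (note that the compose file maps port 5432, the conventional Postgres port, rather than the 5000 the Dockerfile EXPOSEs). For readers more familiar with PostGraphile's library API, roughly the same configuration would look like the sketch below; it is illustrative only and not part of the patch. The package-lock hunk continues right after this note.

```typescript
import express from 'express';
import { postgraphile } from 'postgraphile';
// The same plugin the compose command appends via --append-plugins:
import PgConnectionFilterPlugin from 'postgraphile-plugin-connection-filter';

const app = express();
app.use(
  postgraphile('postgres://ods_admin:ods_pw@storage-db:5432/ods', 'storage', {
    appendPlugins: [PgConnectionFilterPlugin],
  }),
);
app.listen(5432); // mirrors the --port flag in docker-compose.yml
```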
"sha512-nlLsUzgm1kfLXSXfRZMc1KLAugd4hqJHDTvc2hDIwS3mZAfMEuMbc03SujMF+GEcpaX/qboeycw6iO8JwVv2+g==", - "dev": true, - "dependencies": { - "psl": "^1.1.28", - "punycode": "^2.1.1" - }, - "engines": { - "node": ">=0.8" - } - }, - "node_modules/posix-character-classes": { - "version": "0.1.1", - "integrity": "sha1-AerA/jta9xoqbAL+q7jB/vfgDqs=", - "dev": true, - "engines": { - "node": ">=0.10.0" - } - }, - "node_modules/postgres-array": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", - "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", - "engines": { - "node": ">=4" - } - }, "node_modules/postgres-bytea": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", @@ -10986,8 +10958,7 @@ "@types/json-schema": { "version": "7.0.9", "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.9.tgz", - "integrity": "sha512-qcUXuemtEu+E5wZSJHNxUXeCZhAfXKQ41D+duX+VYPde7xyEVZci+/oXKJL13tnRs9lR2pr4fod59GT6/X1/yQ==", - "dev": true + "integrity": "sha512-qcUXuemtEu+E5wZSJHNxUXeCZhAfXKQ41D+duX+VYPde7xyEVZci+/oXKJL13tnRs9lR2pr4fod59GT6/X1/yQ==" }, "@types/json5": { "version": "0.0.29", diff --git a/storage/storage-mq/package.json b/storage/storage-mq/package.json index 61834d731..00c0e1224 100644 --- a/storage/storage-mq/package.json +++ b/storage/storage-mq/package.json @@ -19,6 +19,7 @@ "@jvalue/node-dry-amqp": "0.1.2", "@jvalue/node-dry-basics": "0.0.3", "@jvalue/node-dry-pg": "1.2.1", + "@types/json-schema": "^7.0.9", "cors": "^2.8.5", "express": "^4.17.1" }, diff --git a/storage/storage-mq/src/api/pipelineConfigEventHandler.ts b/storage/storage-mq/src/api/pipelineConfigEventHandler.ts index 5d579ed79..1cf5a9cbb 100644 --- a/storage/storage-mq/src/api/pipelineConfigEventHandler.ts +++ b/storage/storage-mq/src/api/pipelineConfigEventHandler.ts @@ -1,3 +1,4 @@ +import { JsonSchemaElementBase, isDefined } from '../service/sharedHelper'; import { StorageStructureRepository } from '../storage-structure/storageStructureRepository'; export class PipelineConfigEventHandler { @@ -11,6 +12,13 @@ export class PipelineConfigEventHandler { await this.structureRepository.create( pipelineCreatedEvent.pipelineId.toString(), ); + if (isDefined(pipelineCreatedEvent.schema)) { + await this.structureRepository.createForSchema( + pipelineCreatedEvent.schema, + pipelineCreatedEvent.pipelineName + + pipelineCreatedEvent.pipelineId.toString(), + ); + } } async handleDeletion( @@ -25,6 +33,7 @@ export class PipelineConfigEventHandler { export interface PipelineCreatedEvent { pipelineId: number; pipelineName: string; + schema?: JsonSchemaElementBase; } export interface PipelineDeletedEvent { diff --git a/storage/storage-mq/src/api/pipelineExecutionEventHandler.ts b/storage/storage-mq/src/api/pipelineExecutionEventHandler.ts index 7a6648bb8..5de771bd7 100644 --- a/storage/storage-mq/src/api/pipelineExecutionEventHandler.ts +++ b/storage/storage-mq/src/api/pipelineExecutionEventHandler.ts @@ -1,3 +1,4 @@ +import { JsonSchemaElementBase, isDefined } from '../service/sharedHelper'; import { StorageContentRepository } from '../storage-content/storageContentRepository'; export class PipelineExecutionEventHandler { @@ -14,6 +15,18 @@ export class PipelineExecutionEventHandler { data: pipelineExecutedEvent.data, }, ); + if (isDefined(pipelineExecutedEvent.schema)) { + await this.contentRepository.saveContentForSchema( + 
pipelineExecutedEvent.pipelineName + + pipelineExecutedEvent.pipelineId.toString(), + { + pipelineId: pipelineExecutedEvent.pipelineId, + timestamp: pipelineExecutedEvent.timestamp ?? new Date(), + data: pipelineExecutedEvent.data, + schema: pipelineExecutedEvent.schema, + }, + ); + } } } @@ -21,6 +34,6 @@ export interface PipelineExecutedEvent { pipelineId: number; pipelineName: string; data: unknown; - schema?: Record<string, unknown>; + schema?: JsonSchemaElementBase; timestamp?: Date; } diff --git a/storage/storage-mq/src/service/jsonSchemaParser.ts b/storage/storage-mq/src/service/jsonSchemaParser.ts new file mode 100644 index 000000000..f4487619f --- /dev/null +++ b/storage/storage-mq/src/service/jsonSchemaParser.ts @@ -0,0 +1,373 @@ +import PostgresParser from './postgresParser'; +import * as SharedHelper from './sharedHelper'; + +const CREATE_STATEMENT = (schema: string, table: string): string => + `CREATE TABLE IF NOT EXISTS "${schema}"."${table}" (` + + '"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY, ' + + '"createdAt" timestamp not null default CURRENT_TIMESTAMP'; + +const INSERT_STATEMENT_COLUMNS = (schema: string, table: string): string => + `INSERT INTO "${schema}"."${table}" (`; + +const INSERT_CONTENT_STATEMENT_VALUES = ') VALUES ('; + +const PRIMARY_KEY_STATEMENT = (schema: string, table: string): string => + `, CONSTRAINT "Data_pk_${schema}_${table}" PRIMARY KEY (id)`; + +const FOREIGN_KEY_STATEMENT = ( + schema: string, + table: string, + parentTable: string, +): string => + `, CONSTRAINT "Data_fk_${schema}_${table}" FOREIGN KEY (${parentTable}id) ` + + `REFERENCES ${schema}.${parentTable}(id)`; + +const END_STATEMENT_CREATE = ')'; + +const END_STATEMENT_INSERT = ') RETURNING *;'; + +const PG_TYPES: Record<string, string> = { + string: 'text', + number: 'integer', + boolean: 'boolean', +}; + +export default class JsonSchemaParser implements PostgresParser { + private postgresSchemaCreate: string[] = []; + private postgresSchemaInsertColumns: string[] = []; + private postgresSchemaInsertValues: string[] = []; + + async parseCreateStatement( + schema: SharedHelper.JsonSchemaElementBase, + pgSchemaName: string, + tableName: string, + index = 0, + parentName = '', + ): Promise<string[]> { + if (SharedHelper.isArray(schema)) { + await this.doParseCreate( + schema.items, + index, + pgSchemaName, + tableName, + parentName, + ); + } else if (SharedHelper.isObject(schema)) { + await this.doParseCreate( + schema, + index, + pgSchemaName, + tableName, + parentName, + ); + } + + return this.postgresSchemaCreate; + } + + async doParseCreate( + schema: SharedHelper.JsonSchemaElementBase, + index: number, + pgSchemaName: string, + tableName: string, + parentName = '', + ): Promise<void> { + const currentIndex = index; + this.postgresSchemaCreate[currentIndex] = CREATE_STATEMENT( + pgSchemaName, + tableName, + ); + + if (SharedHelper.isObject(schema)) { + for (const key in schema.properties) { + if (Object.prototype.hasOwnProperty.call(schema.properties, key)) { + const currentProperty = schema.properties[key]; + if (SharedHelper.isObject(currentProperty)) { + await this.parseCreateStatement( + currentProperty, + pgSchemaName, + tableName + '_' + key, + ++index, + tableName, + ); + } else if (SharedHelper.isArray(currentProperty)) { + const childSchema = currentProperty.items; + if (SharedHelper.isObject(childSchema)) { + await this.parseCreateStatement( + childSchema, + pgSchemaName, + tableName + '_' + key, + ++index, + tableName, + ); + } else { + if (currentProperty.items.type !== undefined) {
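One caveat about the PG_TYPES map defined above, before the property loop resumes below: JSON Schema's number admits fractions, yet it is mapped to Postgres integer, so a value such as km: 9.56 from the Pegel test data later in this patch would be silently rounded on insert. A wider mapping would be (an alternative sketch, not what the patch ships; note the expected strings in testDataHelper.ts would have to change with it):

```typescript
// Alternative mapping (sketch): keep fractional JSON numbers lossless
// instead of coercing them into integer columns.
const PG_TYPES_ALT: Record<string, string> = {
  string: 'text',
  number: 'double precision', // JSON "number" may carry fractions, e.g. km: 9.56
  integer: 'integer',
  boolean: 'boolean',
};
```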
this.postgresSchemaCreate[currentIndex] += `, "${key}" ${ + PG_TYPES[currentProperty.items.type as string] + }[]`; + } + } + } else { + this.postgresSchemaCreate[currentIndex] += `, "${key}" ${ + PG_TYPES[currentProperty.type] + }`; + } + } + } + } + // TODO else if array AND else simple datatype + + if (SharedHelper.hasParent(parentName)) { + this.postgresSchemaCreate[ + currentIndex + ] += `, "${parentName}id" bigint NOT NULL`; + this.postgresSchemaCreate[currentIndex] += FOREIGN_KEY_STATEMENT( + pgSchemaName, + tableName, + parentName, + ); + } + + this.postgresSchemaCreate[currentIndex] += PRIMARY_KEY_STATEMENT( + pgSchemaName, + tableName, + ); + this.postgresSchemaCreate[currentIndex] += END_STATEMENT_CREATE; + } + + async parseInsertStatement( + schema: SharedHelper.JsonSchemaElementBase, + data: unknown, + pgSchemaName: string, + tableName: string, + parentId: number, + index = 0, + parentName = '', + ): Promise<string> { + if (SharedHelper.isArray(schema)) { + await this.doParseInsertArray( + schema.items, + data as unknown[], + index, + pgSchemaName, + tableName, + parentId, + parentName, + ); + } else if (SharedHelper.isObject(schema)) { + await this.doParseInsertObject( + schema, + data, + index, + pgSchemaName, + tableName, + parentId, + parentName, + ); + } + + let result = 'BEGIN;'; + this.postgresSchemaInsertColumns.forEach((insertColumnString, index) => { + if (insertColumnString.charAt(insertColumnString.length - 1) === ',') { + result += + insertColumnString.slice(0, -1) + // Drops the unnecessary comma + this.postgresSchemaInsertValues[index].slice(0, -1) + // Drops the unnecessary comma + END_STATEMENT_INSERT; + } else { + result += + insertColumnString + + this.postgresSchemaInsertValues[index] + + END_STATEMENT_INSERT; + } + }); + result += 'END;'; + return result; + } + + async doParseInsertArray( + schema: SharedHelper.JsonSchemaElementBase, + data: unknown[], + index: number, + pgSchemaName: string, + tableName: string, + parentId: number, + parentName = '', + ): Promise<void> { + let element: unknown; + for (element of data) { + const currentIndex = index; + this.postgresSchemaInsertColumns[currentIndex] = INSERT_STATEMENT_COLUMNS( + pgSchemaName, + tableName, + ); // Insertion + this.postgresSchemaInsertValues[currentIndex] = + INSERT_CONTENT_STATEMENT_VALUES; + + if (SharedHelper.isObject(schema)) { + for (const key in schema.properties) { + if (Object.prototype.hasOwnProperty.call(schema.properties, key)) { + const currentProperty = schema.properties[key]; + if (SharedHelper.isObject(currentProperty)) { + await this.parseInsertStatement( + currentProperty, + (element as Record<string, unknown>)[key], + pgSchemaName, + tableName + '_' + key, + parentId, + ++index, + tableName, + ); + } else if (SharedHelper.isArray(currentProperty)) { + const childSchema = currentProperty.items; + if (SharedHelper.isObject(childSchema)) { + await this.parseInsertStatement( + currentProperty, + (element as Record<string, unknown>)[key], + pgSchemaName, + tableName + '_' + key, + parentId, + ++index, + tableName, + ); + } else { + if (currentProperty.items.type !== undefined) { + this.addToInsertArrays( + currentIndex, + key, + (element as Record<string, unknown>)[key], + `${currentProperty.items.type}[]`, + ); + } + } + } else { + this.addToInsertArrays( + currentIndex, + key, + (element as Record<string, unknown>)[key], + currentProperty.type, + ); + } + } + } + } + // TODO else if array AND else simple datatype + + if (parentName !== '') { + this.addToInsertArrays( + currentIndex, + parentName + 'id', + parentId, + 'number', + ); + } + index = await
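The per-element bookkeeping continues just below; to make the recursion above concrete in the meantime, here is a minimal, hypothetical invocation of the create-statement parser. The schema literal and names are invented, the import path is assumed, and the commented result follows the CREATE_STATEMENT template at the top of this file:

```typescript
import JsonSchemaParser from './jsonSchemaParser'; // path assumed

async function demo(): Promise<void> {
  const parser = new JsonSchemaParser();
  const statements = await parser.parseCreateStatement(
    {
      $id: '#/root',
      type: 'object',
      properties: {
        name: { $id: '#/root/name', type: 'string' },
        km: { $id: '#/root/km', type: 'number' },
      },
    },
    'storage', // target Postgres schema
    'pegel7', // root table, i.e. pipelineName + pipelineId
  );
  // statements[0] === 'CREATE TABLE IF NOT EXISTS "storage"."pegel7" ' +
  //   '("id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY, ' +
  //   '"createdAt" timestamp not null default CURRENT_TIMESTAMP, ' +
  //   '"name" text, "km" integer, ' +
  //   'CONSTRAINT "Data_pk_storage_pegel7" PRIMARY KEY (id))'
  console.log(statements);
}

void demo();
```

Nested objects and arrays of objects would yield additional statements in the returned array, one child table per nesting level, linked through the foreign-key clause shown above.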
this.asyncIncrement(index); + parentId = await this.asyncIncParent(parentId, parentName); + } + } + + async doParseInsertObject( + schema: SharedHelper.JsonSchemaElementObject, + data: unknown, + index: number, + pgSchemaName: string, + tableName: string, + parentId: number, + parentName = '', + ): Promise<void> { + const currentIndex = index; + this.postgresSchemaInsertColumns[currentIndex] = INSERT_STATEMENT_COLUMNS( + pgSchemaName, + tableName, + ); // Insertion + this.postgresSchemaInsertValues[currentIndex] = + INSERT_CONTENT_STATEMENT_VALUES; + for (const key in schema.properties) { + if (Object.prototype.hasOwnProperty.call(schema.properties, key)) { + const currentProperty = schema.properties[key]; + if (SharedHelper.isObject(currentProperty)) { + await this.parseInsertStatement( + currentProperty, + (data as Record<string, unknown>)[key], + pgSchemaName, + tableName + '_' + key, + parentId, + ++index, + tableName, + ); + } else if (SharedHelper.isArray(currentProperty)) { + const childSchema = currentProperty.items; + if (SharedHelper.isObject(childSchema)) { + await this.parseInsertStatement( + currentProperty, + (data as Record<string, unknown>)[key], + pgSchemaName, + tableName + '_' + key, + parentId, + ++index, + tableName, + ); + } else { + if (currentProperty.items.type !== undefined) { + this.addToInsertArrays( + currentIndex, + key, + (data as Record<string, unknown>)[key], + `${currentProperty.items.type}[]`, + ); + } + } + } else { + this.addToInsertArrays( + currentIndex, + key, + (data as Record<string, unknown>)[key], + currentProperty.type, + ); + } + } + } + if (parentName !== '') { + this.addToInsertArrays( + currentIndex, + parentName + 'id', + parentId, + 'number', + ); + } + } + + addToInsertArrays( + index: number, + key: string, + value: unknown, + type: string, + ): void { + this.postgresSchemaInsertColumns[index] += `"${key}",`; + if (value === undefined) { + value = null; + } + if (type.includes('[]')) { + // TODO testing if (value as string) works, or some other way without using eslint-disable + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this.postgresSchemaInsertValues[index] += `'{"${value}"}',`; + } else if (type === 'number') { + // TODO testing if (value as string) works, or some other way without using eslint-disable + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this.postgresSchemaInsertValues[index] += `${value},`; + } else { + // TODO testing if (value as string) works, or some other way without using eslint-disable + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this.postgresSchemaInsertValues[index] += `'${value}',`; + } + } + + // eslint-disable-next-line @typescript-eslint/require-await + async asyncIncrement(value: number): Promise<number> { + return ++value; + } + + // eslint-disable-next-line @typescript-eslint/require-await + async asyncIncParent(value: number, name: string): Promise<number> { + return name === '' ?
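The ternary completes just below; a note on addToInsertArrays above first: it splices values directly into the SQL text, so any quote character in the payload ends up in the statement. A parameterized variant using the pg client already employed elsewhere in storage-mq would look like this sketch; it is not a drop-in replacement for the statement batching the parser does, and the function name is invented:

```typescript
import { ClientBase } from 'pg';

// Sketch of a parameterized row insert: identifiers still need allow-listing,
// but the values no longer travel through string concatenation.
async function insertRow(
  client: ClientBase,
  schema: string,
  table: string,
  columns: string[],
  values: unknown[],
): Promise<void> {
  const cols = columns.map((c) => `"${c}"`).join(', ');
  const placeholders = values.map((_, i) => `$${i + 1}`).join(', ');
  await client.query(
    `INSERT INTO "${schema}"."${table}" (${cols}) VALUES (${placeholders})`,
    values,
  );
}
```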
++value : value; + } +} diff --git a/storage/storage-mq/src/service/postgresParser.ts b/storage/storage-mq/src/service/postgresParser.ts new file mode 100644 index 000000000..4d007c20e --- /dev/null +++ b/storage/storage-mq/src/service/postgresParser.ts @@ -0,0 +1,21 @@ +import { JsonSchemaElementBase } from './sharedHelper'; + +export default interface PostgresParser { + parseCreateStatement: ( + schema: JsonSchemaElementBase, + pgSchemaName: string, + tableName: string, + index?: number, + parentName?: string, + ) => Promise<string[]>; + + parseInsertStatement: ( + schema: JsonSchemaElementBase, + data: unknown, + pgSchemaName: string, + tableName: string, + parentId: number, + index?: number, + parentName?: string, + ) => Promise<string>; +} diff --git a/storage/storage-mq/src/service/schemaToObjectParser.ts b/storage/storage-mq/src/service/schemaToObjectParser.ts new file mode 100644 index 000000000..c14df06d1 --- /dev/null +++ b/storage/storage-mq/src/service/schemaToObjectParser.ts @@ -0,0 +1,48 @@ +import * as SharedHelper from './sharedHelper'; + +export default class SchemaToObjectParser { + async parse( + schema: SharedHelper.JsonSchemaElementBase, + ): Promise<{ [key: string]: string | Record<string, unknown> }> { + let parsedSchema: { [key: string]: string | Record<string, unknown> } = {}; + if (SharedHelper.isArray(schema)) { + parsedSchema = await this.doParse(schema.items); + } else if (SharedHelper.isObject(schema)) { + parsedSchema = await this.doParse(schema); + } + + return parsedSchema; + } + + async doParse( + schema: SharedHelper.JsonSchemaElementBase, + ): Promise<{ [key: string]: string | Record<string, unknown> }> { + const parsedSchema: { [key: string]: string | Record<string, unknown> } = + {}; + if (SharedHelper.isObject(schema)) { + for (const key in schema.properties) { + if (Object.prototype.hasOwnProperty.call(schema.properties, key)) { + const currentProperty = schema.properties[key]; + if (SharedHelper.isObject(currentProperty)) { + parsedSchema[key] = await this.doParse(currentProperty); + } else if (SharedHelper.isArray(currentProperty)) { + const childSchema = currentProperty.items; + if (SharedHelper.isObject(childSchema)) { + parsedSchema[key] = await this.doParse(childSchema); + } else { + if (currentProperty.items.type !== undefined) { + const type: string = currentProperty.items.type; + parsedSchema[key] = type + '[]'; + } + } + } else { + parsedSchema[key] = currentProperty.type; + } + } + } + } + // TODO else array etc (if that should be supported) + + return parsedSchema; + } +} diff --git a/storage/storage-mq/src/service/sharedHelper.ts b/storage/storage-mq/src/service/sharedHelper.ts new file mode 100644 index 000000000..9562f55a1 --- /dev/null +++ b/storage/storage-mq/src/service/sharedHelper.ts @@ -0,0 +1,56 @@ +export function hasParent(parent: string): boolean { + return parent !== ''; +} + +export function isArray( + value: JsonSchemaElementBase, +): value is JsonSchemaElementArray { + return value.type === 'array'; +} + +export function isObject( + value: JsonSchemaElementBase, +): value is JsonSchemaElementObject { + return value.type === 'object'; +} + +export function isDefined<T>(val: T | undefined | null): val is T { + return val !== undefined && val !== null; +} + +type JsonSchemaTypes = + | 'string' + | 'number' + | 'integer' + | 'object' + | 'array' + | 'null' + | 'boolean'; + +// TODO perhaps support additional params, like enums: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/json-schema/index.d.ts +// The package above was not used because it consists of a single interface with every possible 
attribute optional, so its type guards would be useless without an additional undefined check on every access + +export interface JsonSchemaElementBase { + $schema?: string; + $id: string; + type: JsonSchemaTypes; + // Object and array specific parameters + additionalItems?: boolean; + additionalProperties?: boolean; + required?: string[]; + properties?: Record<string, JsonSchemaElementBase>; + items?: JsonSchemaElementBase; +} + +export interface JsonSchemaElementObject extends JsonSchemaElementBase { + type: 'object'; + additionalProperties?: boolean; + required?: string[]; + properties: Record<string, JsonSchemaElementBase>; +} + +export interface JsonSchemaElementArray extends JsonSchemaElementBase { + type: 'array'; + additionalItems?: boolean; + items: JsonSchemaElementBase; +} diff --git a/storage/storage-mq/src/storage-content/postgresStorageContentRepository.ts b/storage/storage-mq/src/storage-content/postgresStorageContentRepository.ts index 1dc2f16f0..a9acf51bb 100644 --- a/storage/storage-mq/src/storage-content/postgresStorageContentRepository.ts +++ b/storage/storage-mq/src/storage-content/postgresStorageContentRepository.ts @@ -2,6 +2,9 @@ import { PostgresClient } from '@jvalue/node-dry-pg'; import { ClientBase, QueryResult } from 'pg'; import { POSTGRES_SCHEMA } from '../env'; +import JsonSchemaParser from '../service/jsonSchemaParser'; +import PostgresParser from '../service/postgresParser'; +import { isDefined } from '../service/sharedHelper'; import { InsertStorageContent, @@ -15,6 +18,8 @@ const GET_ALL_CONTENT_STATEMENT = (table: string): string => `SELECT * FROM "${POSTGRES_SCHEMA}"."${table}"`; const GET_CONTENT_STATEMENT = (table: string): string => `SELECT * FROM "${POSTGRES_SCHEMA}"."${table}" WHERE id = $1`; +const GET_LAST_ELEMENT_STATEMENT = (table: string): string => + `SELECT "id" FROM "${POSTGRES_SCHEMA}"."${table}" ORDER BY "id" DESC LIMIT 1`; const INSERT_CONTENT_STATEMENT = (table: string): string => `INSERT INTO "${POSTGRES_SCHEMA}"."${table}" ("data", "pipelineId", "timestamp") VALUES ($1, $2, $3) RETURNING *`; @@ -136,6 +141,45 @@ export class PostgresStorageContentRepository }); } + async saveContentForSchema( + tableIdentifier: string, + content: InsertStorageContent, + ): Promise<number> { + // TODO decide how to handle a missing schema (for now an error is thrown) + if (isDefined(content.schema)) { + // Copy into a local so the type guard's narrowing carries into the transaction callback below + const schema = content.schema; + return await this.postgresClient.transaction(async (client) => { + const jsonSchemaParser: PostgresParser = new JsonSchemaParser(); + const resultSet: QueryResult = + await client.query(GET_LAST_ELEMENT_STATEMENT(tableIdentifier)); + const nextId = + resultSet.rowCount === 0 + ? 1 + : Number.parseInt(resultSet.rows[0].id, 10) + 1; + + /** + * When passed an array as value, pg assumes the value is meant to be a native Postgres array + * and therefore fails with an "invalid input syntax for type json" error when the target field + * is actually of type json/jsonb. 
+ * + * Ref: https://github.com/brianc/node-postgres/issues/2012 + */ + const insertStatement: string = + await jsonSchemaParser.parseInsertStatement( + schema, + content.data, + POSTGRES_SCHEMA, + tableIdentifier, + nextId, + ); + await client.query(insertStatement); + return nextId; + }); + } + throw new Error('Missing schema!'); + } + private toStorageContents( resultSet: QueryResult, ): StorageContent[] { diff --git a/storage/storage-mq/src/storage-content/storageContentRepository.ts b/storage/storage-mq/src/storage-content/storageContentRepository.ts index 8c8686c7c..e055efb8a 100644 --- a/storage/storage-mq/src/storage-content/storageContentRepository.ts +++ b/storage/storage-mq/src/storage-content/storageContentRepository.ts @@ -1,3 +1,5 @@ +import { JsonSchemaElementBase } from '../service/sharedHelper'; + export interface StorageContentRepository { getAllContent: ( tableIdentifier: string, ) => Promise<StorageContent[]>; @@ -10,6 +12,10 @@ tableIdentifier: string, content: InsertStorageContent, ) => Promise<number>; + saveContentForSchema: ( + tableIdentifier: string, + content: InsertStorageContent, + ) => Promise<number>; } export interface StorageContent { @@ -23,4 +29,5 @@ pipelineId: number; timestamp: Date; data: unknown; + schema?: JsonSchemaElementBase; } diff --git a/storage/storage-mq/src/storage-structure/postgresStorageStructureRepository.ts b/storage/storage-mq/src/storage-structure/postgresStorageStructureRepository.ts index c1b9a9a26..825e79814 100644 --- a/storage/storage-mq/src/storage-structure/postgresStorageStructureRepository.ts +++ b/storage/storage-mq/src/storage-structure/postgresStorageStructureRepository.ts @@ -1,6 +1,9 @@ import { PostgresClient } from '@jvalue/node-dry-pg'; import { POSTGRES_SCHEMA } from '../env'; +import JsonSchemaParser from '../service/jsonSchemaParser'; +import PostgresParser from '../service/postgresParser'; +import { JsonSchemaElementBase } from '../service/sharedHelper'; import { StorageStructureRepository } from './storageStructureRepository'; @@ -33,6 +36,22 @@ ); } + + async createForSchema( + schema: JsonSchemaElementBase, + tableName: string, + ): Promise<void> { + const jsonSchemaParser: PostgresParser = new JsonSchemaParser(); + const createStatements: string[] = + await jsonSchemaParser.parseCreateStatement( + schema, + POSTGRES_SCHEMA, + tableName, + ); + for (const statement of createStatements) { + await this.postgresClient.executeQuery(statement); + } + } + /** * Drops a table with name, provided by parameter tableIdentifier * @param tableIdentifier name of the table to be dropped diff --git a/storage/storage-mq/src/storage-structure/storageStructureRepository.ts b/storage/storage-mq/src/storage-structure/storageStructureRepository.ts index 7e02418ca..51a931ebf 100644 --- a/storage/storage-mq/src/storage-structure/storageStructureRepository.ts +++ b/storage/storage-mq/src/storage-structure/storageStructureRepository.ts @@ -1,4 +1,10 @@ +import { JsonSchemaElementBase } from '../service/sharedHelper'; + export interface StorageStructureRepository { create: (tableIdentifier: string) => Promise<void>; + createForSchema: ( + schema: JsonSchemaElementBase, + tableName: string, + ) => Promise<void>; delete: (tableIdentifier: string) => Promise<void>; } diff --git a/storage/storage-mq/src/test/parser.test.ts b/storage/storage-mq/src/test/parser.test.ts new file mode 100644 index 000000000..be73d5947 --- /dev/null +++ b/storage/storage-mq/src/test/parser.test.ts @@ 
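Before the new test file begins below, one remark on saveContentForSchema above: it predicts the next id by reading the current maximum, and under the default READ COMMITTED isolation two concurrent writers can both observe the same maximum before either inserts, yielding duplicate parent ids. Letting Postgres hand back the generated key avoids the prediction entirely; a sketch, with table and column names invented rather than taken from the patch:

```typescript
import { ClientBase } from 'pg';

// Sketch: rely on INSERT ... RETURNING instead of SELECT ... ORDER BY id DESC LIMIT 1.
async function insertParentRow(client: ClientBase, uuid: string): Promise<number> {
  const result = await client.query(
    'INSERT INTO "storage"."pegel7" ("uuid") VALUES ($1) RETURNING id',
    [uuid],
  );
  return result.rows[0].id as number; // the identity value Postgres assigned
}
```

Wiring this through would require restructuring the parser, since child rows need the parent id before their statements are emitted, so the patch's precomputed-id approach is understandable as a first cut.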
-0,0 +1,60 @@ +import JsonSchemaParser from '../service/jsonSchemaParser'; +import PostgresParser from '../service/postgresParser'; + +// Import SchemaToObjectParser from '../service/schematoobjectparser' +import * as testData from './testDataHelper'; + +describe('schema generation', () => { + test('return valid jsonschema for Pegel based on the ontology schema', async () => { + const jsonSchemaParser: PostgresParser = new JsonSchemaParser(); + const response = await jsonSchemaParser.parseCreateStatement( + testData.JSONSchemaPegelComplete, + 'TESTSCHEMA', + 'TESTTABLE', + ); + expect(response[0]).toEqual(testData.PostgresSchemaPegelCreate[0]); + }); +}); + +// TODO additional tests +/* Describe('schema generation', () => { + test('return valid jsonschema for Pegel based on the ontology schema', async () => { + const schemaParser = new SchemaParser() + const response = + await schemaParser.parse( + testData.JSONSchemaPegelComplete, + testData.MultiCompletePegel, + 'TESTSCHEMA', + 'TESTTABLE', + 0 + ) + expect(response).toEqual(testData.PostgresSchemaMultiPegelInsert) + }) +}) + +/* describe('schema generation', () => { + test('return valid jsonschema for Pegel based on the ontology schema', async () => { + const schemaParser = new SchemaParser() + const response = + await schemaParser.parse( + testData.JSONSchemaOrdngungsamtComplete, + testData.MultiCompleteOrdnungsamt, + 'TESTSCHEMA', + 'TESTTABLE', + 0 + ) + console.log(response) + expect(response).toEqual(testData.PostgresSchemaMultiPegelInsert) + }) +}) + +/* describe('schema generation', () => { + test('return valid jsonschema for Pegel based on the ontology schema', async () => { + const schemaToObjectParser = new SchemaToObjectParser() + const response = + await schemaToObjectParser.parse(testData.JSONSchemaOrdngungsamtComplete) + console.log(response) + expect(response).toEqual(testData.PostgresSchemaPegelComplete) + }) +}) +*/ diff --git a/storage/storage-mq/src/test/testDataHelper.ts b/storage/storage-mq/src/test/testDataHelper.ts new file mode 100644 index 000000000..54a8a789e --- /dev/null +++ b/storage/storage-mq/src/test/testDataHelper.ts @@ -0,0 +1,260 @@ +import { JsonSchemaElementArray } from '../service/sharedHelper'; + +export const JSONSchemaOrdngungsamtComplete = { + $schema: 'schema-recommendation/jsonschema/parser', + $id: '#/root', + type: 'object', + additionalProperties: true, + required: ['messages', 'results', 'index'], + properties: { + messages: { + $id: '#/root/messages', + type: 'object', + additionalProperties: true, + required: ['messages', 'success'], + properties: { + messages: { + $id: '#/root/messages/messages', + type: 'array', + additionalItems: true, + items: { + $id: '#/root/messages/messages/items', + }, + }, + success: { + $id: '#/root/messages/success', + type: 'boolean', + }, + }, + }, + results: { + $id: '#/root/results', + type: 'object', + additionalProperties: true, + required: ['count', 'itemsPerPage'], + properties: { + count: { + $id: '#/root/results/count', + type: 'number', + }, + itemsPerPage: { + $id: '#/root/results/itemsPerPage', + type: 'number', + }, + }, + }, + index: { + $id: '#/root/index', + type: 'array', + additionalItems: true, + items: { + $id: '#/root/index/items', + type: 'object', + additionalProperties: true, + required: [ + 'meldungsNummern', + 'bezirk', + 'betreff', + 'erstellungsDatum', + 'status', + 'sachverhalt', + ], + properties: { + meldungsNummern: { + $id: '#/root/index/items/meldungsNummern', + type: 'array', + additionalItems: true, + items: { + $id: 
'#/root/index/items/meldungsNummern/items', + type: 'string', + }, + }, + bezirk: { + $id: '#/root/index/items/bezirk', + type: 'string', + }, + betreff: { + $id: '#/root/index/items/betreff', + type: 'string', + }, + erstellungsDatum: { + $id: '#/root/index/items/erstellungsDatum', + type: 'string', + }, + status: { + $id: '#/root/index/items/status', + type: 'string', + }, + sachverhalt: { + $id: '#/root/index/items/sachverhalt', + type: 'string', + }, + }, + }, + }, + }, +}; + +export const JSONSchemaPegelComplete: JsonSchemaElementArray = { + $schema: 'schema-recommendation/jsonschema/parser', + $id: '#/root', + type: 'array', + additionalItems: true, + items: { + $id: '#/root/items', + type: 'object', + additionalProperties: true, + required: [ + 'uuid', + 'number', + 'shortname', + 'longname', + 'km', + 'agency', + 'longitude', + 'latitude', + 'water', + ], + properties: { + uuid: { + $id: '#/root/items/uuid', + type: 'string', + }, + number: { + $id: '#/root/items/number', + type: 'string', + }, + shortname: { + $id: '#/root/items/shortname', + type: 'string', + }, + longname: { + $id: '#/root/items/longname', + type: 'string', + }, + km: { + $id: '#/root/items/km', + type: 'number', + }, + agency: { + $id: '#/root/items/agency', + type: 'string', + }, + longitude: { + $id: '#/root/items/longitude', + type: 'number', + }, + latitude: { + $id: '#/root/items/latitude', + type: 'number', + }, + water: { + $id: '#/root/items/water', + type: 'object', + additionalProperties: true, + required: ['shortname', 'longname'], + properties: { + shortname: { + $id: '#/root/items/water/shortname', + type: 'string', + }, + longname: { + $id: '#/root/items/water/longname', + type: 'string', + }, + }, + }, + }, + }, +}; + +export const MultiCompleteOrdnungsamt = { + messages: { + success: true, + }, + results: { + count: 1, + itemsPerPage: 2, + }, + index: [ + { + meldungsNummern: ['Hallo', 'Welt'], + bezirk: '48900237', + betreff: 'FIRST', + erstellungsDatum: 'FIRST', + status: '9.56', + sachverhalt: 'WSA VERDEN', + }, + { + meldungsNummern: ['Hallo', 'Welt'], + bezirk: '48900237', + betreff: 'SECOND', + erstellungsDatum: 'SECOND', + status: '9.56', + sachverhalt: 'WSA VERDEN', + }, + ], +}; + +export const MultiCompletePegel = [ + { + uuid: '1', + number: '48900237', + shortname: 'FIRST', + longname: 'FIRST', + km: 9.56, + agency: 'WSA VERDEN', + longitude: 9.27676943537587, + latitude: 52.90406541008721, + water: { + shortname: 'FIRST', + longname: 'FIRST', + }, + }, + { + uuid: '2', + number: '48900237', + shortname: 'SECOND', + longname: 'SECOND', + km: 9.56, + agency: 'WSA VERDEN', + longitude: 9.27676943537587, + latitude: 52.90406541008721, + water: { + shortname: 'SECOND', + longname: 'SECOND', + }, + }, +]; + +export const PostgresSchemaPegelCreate = [ + 'CREATE TABLE IF NOT EXISTS "TESTSCHEMA"."TESTTABLE" (' + + '"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY, ' + + '"createdAt" timestamp not null default CURRENT_TIMESTAMP, ' + + '"uuid" text, "number" text, "shortname" text, "longname" text, ' + + '"km" integer, "agency" text, "longitude" integer, "latitude" integer, ' + + 'CONSTRAINT "Data_pk_TESTSCHEMA_TESTTABLE" PRIMARY KEY (id)' + + ')', + 'CREATE TABLE IF NOT EXISTS "TESTSCHEMA"."TESTTABLE_water" (' + + '"id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY, ' + + '"createdAt" timestamp not null default CURRENT_TIMESTAMP, ' + + '"shortname" text, "longname" text, "TESTTABLEid" bigint NOT NULL, ' + + 'CONSTRAINT "Data_fk_TESTSCHEMA_TESTTABLE_water" FOREIGN KEY (TESTTABLEid) ' + + 
'REFERENCES TESTSCHEMA.TESTTABLE(id), ' + + 'CONSTRAINT "Data_pk_TESTSCHEMA_TESTTABLE_water" PRIMARY KEY (id)' + + ')', +]; + +/* Export const PostgresSchemaMultiPegelInsert = + 'INSERT INTO "TESTSCHEMA"."TESTTABLE" (' + + '"uuid","number","shortname","longname","km","agency","longitude","latitude")' + + ' VALUES ('1','48900237','FIRST','FIRST',9.56,'WSA VERDEN',9.27676943537587,52.90406541008721)` + + ' RETURNING *;' + + 'INSERT INTO "TESTSCHEMA"."TESTTABLE_water" (' + + `"shortname","longname","TESTTABLEid") VALUES ('FIRST','FIRST',0) RETURNING *;` + + 'INSERT INTO "TESTSCHEMA"."TESTTABLE" (' + + '"uuid","number","shortname","longname","km","agency","longitude","latitude")' + + ` VALUES ('2','48900237','SECOND','SECOND',9.56,'WSA VERDEN',9.27676943537587,52.90406541008721) RETURNING *;` + + 'INSERT INTO "TESTSCHEMA"."TESTTABLE_water" (' + + `"shortname","longname","TESTTABLEid") VALUES ('SECOND','SECOND',1) RETURNING *;` +*/ diff --git a/ui/src/datasource/edit/schema/DatasourceSchemaEdit.vue b/ui/src/datasource/edit/schema/DatasourceSchemaEdit.vue index f8867e931..0f9b0e929 100644 --- a/ui/src/datasource/edit/schema/DatasourceSchemaEdit.vue +++ b/ui/src/datasource/edit/schema/DatasourceSchemaEdit.vue @@ -65,6 +65,10 @@ export default class DatasourceSchemaEdit extends Vue { } formChanged(): void { + this.dataSource.schema = JSON.parse(this.schemaAsText) as Record< + string, + unknown + >; this.emitValue(); this.emitValid(); } diff --git a/ui/src/pipeline/edit/schema/PipelineSchemaEdit.vue b/ui/src/pipeline/edit/schema/PipelineSchemaEdit.vue index 20095a34c..f5c8ff665 100644 --- a/ui/src/pipeline/edit/schema/PipelineSchemaEdit.vue +++ b/ui/src/pipeline/edit/schema/PipelineSchemaEdit.vue @@ -70,6 +70,10 @@ export default class PipelineSchemaEdit extends Vue { } formChanged(): void { + this.pipeline.schema = JSON.parse(this.schemaAsText) as Record< + string, + unknown + >; this.emitValue(); this.emitValid(); }
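A closing note on the two Vue edits above: both formChanged handlers run JSON.parse on schemaAsText for every change event, and JSON.parse throws on the half-typed input a user inevitably produces while editing. A defensive variant, sketched against the same component fields (assumed behavior: keep the last parseable schema until the text is valid JSON again):

```typescript
// Sketch: tolerate in-progress edits instead of throwing on each keystroke.
formChanged(): void {
  try {
    this.pipeline.schema = JSON.parse(this.schemaAsText) as Record<
      string,
      unknown
    >;
  } catch {
    return; // not valid JSON yet; keep the previous schema
  }
  this.emitValue();
  this.emitValid();
}
```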