Commit e6925a3a authored by Álex Cortiñas

Some refinements

parent fd75354c
StorageSystem.java
@@ -22,12 +22,12 @@ public class StorageSystem {
         MyProperties prop = new MyProperties("/config.properties");
-        PostgresWriter pg = new PostgresWriter(
-                prop.get("datasource"),
-                prop.get("pg-ip-port"),
-                prop.get("pg-database"),
-                prop.get("pg-user"),
-                prop.get("pg-pass"));
+        // PostgresWriter pg = new PostgresWriter(
+        //         prop.get("datasource"),
+        //         prop.get("pg-ip-port"),
+        //         prop.get("pg-database"),
+        //         prop.get("pg-user"),
+        //         prop.get("pg-pass"));
         DruidWriter druidWriter = new DruidWriter(
                 prop.get("druidSpec"),
@@ -39,26 +39,26 @@ public class StorageSystem {
                 Boolean.parseBoolean(prop.get("commit")),
                 prop.get("topic"));
-        Function<Long, Boolean> notOld = (time) ->
-                time > System.currentTimeMillis() - Long.parseLong(prop.get("kafka-period"));
+        // Function<Long, Boolean> notOld = (time) ->
+        //         time > System.currentTimeMillis() - Long.parseLong(prop.get("kafka-period"));
         try {
             kafka.read((List<ConsumerRecord<String, String>> buffer) -> {
                 List<Event> events = buffer
                         .stream()
                         .map(record -> ModelMapper.toEvent(record.value()).complete())
-                        .filter(m -> notOld.apply(m.getTime()))
+                        // .filter(m -> notOld.apply(m.getTime()))
                         .collect(Collectors.toList());
                 if (events.size() > 0) {
-                    pg.insert(events);
+                    // pg.insert(events);
                     druidWriter.insert(events);
                 }
                 return null;
             });
         } finally {
-            pg.disconnect();
+            // pg.disconnect();
             druidWriter.disconnect();
             kafka.disconnect();
         }
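The net effect in StorageSystem is that events now flow only to Druid: the PostgresWriter and the staleness filter are commented out rather than removed. For reference, the disabled filter kept an event only if its timestamp fell within the last kafka-period milliseconds. A minimal self-contained sketch of that pattern (the constant mirrors kafka-period from config.properties; class and variable names are illustrative):

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FreshnessFilterSketch {
    // Mirrors kafka-period=540000 (9 minutes) from config.properties.
    private static final long KAFKA_PERIOD_MS = 540_000L;

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // An event is "not old" if its timestamp falls inside the last KAFKA_PERIOD_MS.
        Predicate<Long> notOld = time -> time > System.currentTimeMillis() - KAFKA_PERIOD_MS;

        List<Long> timestamps = List.of(now, now - 600_000L); // the second one is stale
        List<Long> fresh = timestamps.stream()
                .filter(notOld)
                .collect(Collectors.toList());
        System.out.println(fresh.size() + " of " + timestamps.size() + " event(s) are fresh");
    }
}

The original code used Function<Long, Boolean> and called apply; Predicate<Long> expresses the same test and plugs straight into Stream.filter.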
KafkaReader.java
@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
 public class KafkaReader {
     private static final Logger logger = LoggerFactory.getLogger(KafkaReader.class);
-    private static final int maxPoll = 5000;
+    private static final int maxPoll = 1000;
     private static final int intervalCommit = 60000;
     private static Properties props = null;
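maxPoll drops from 5000 to 1000, presumably to cap the batch handed to each callback. How KafkaReader applies the constant is outside this hunk; if it feeds the consumer's max.poll.records setting, as the name suggests (an assumption), the wiring would look roughly like this:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigSketch {
    private static final int maxPoll = 1000; // matches the new value in KafkaReader

    // Hypothetical wiring: KafkaReader is assumed to pass maxPoll to
    // max.poll.records, which caps the records returned by a single poll().
    static KafkaConsumer<String, String> buildConsumer(String servers, String group) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }
}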
Event.java
@@ -61,6 +61,7 @@ public class Event {
         this.y_7 = df7.format(this.y);
         this.x_8 = df8.format(this.x);
         this.y_8 = df8.format(this.y);
+        this.time = System.currentTimeMillis();
         return this;
     }
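Event.complete() now overwrites the event time with the ingest timestamp. Given that kafka-period bounds "the time the data is valid" and real-time Druid ingestion typically rejects events outside its window, stamping the current time keeps every event fresh on arrival. A stripped-down sketch of the pattern (only the time assignment comes from the diff; the rest is assumed):

public class EventSketch {
    private long time;

    // complete() finalizes derived fields (the x/y formatting is omitted here)
    // and stamps the ingest time, so the event always passes freshness checks.
    public EventSketch complete() {
        this.time = System.currentTimeMillis();
        return this;
    }

    public long getTime() {
        return time;
    }
}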
config.properties
-# datasource (druid and postgres table)
+# datasource (druid, postgres table and mongo collection)
 datasource=theTopic
 # kafka topic
@@ -18,3 +18,7 @@ commit=true
 # time the data is valid, usually depending on datasource.json
 kafka-period=540000
+mongo-host=127.0.0.1
+mongo-port=27017
+mongo-database=database
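The new mongo-* keys stage a MongoDB sink alongside Druid (the datasource comment now also names a mongo collection), but no Mongo writer appears in this commit. A hedged sketch of how such properties could be consumed with the official Java sync driver (the MongoWriterSketch class and its API are hypothetical):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoWriterSketch {
    private final MongoClient client;
    private final MongoCollection<Document> collection;

    // host/port/database mirror mongo-host, mongo-port and mongo-database;
    // the datasource key doubles as the collection name, per the updated comment.
    public MongoWriterSketch(String host, int port, String database, String collection) {
        this.client = MongoClients.create("mongodb://" + host + ":" + port);
        MongoDatabase db = client.getDatabase(database);
        this.collection = db.getCollection(collection);
    }

    public void insert(Document doc) {
        collection.insertOne(doc);
    }

    public void disconnect() {
        client.close();
    }
}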
datasource.json
@@ -6,7 +6,11 @@
     },
     "dataSources": [
         {
-            "spec": {
+            "properties": {
+                "task.partitions": "1",
+                "task.replicants": "1"
+            },
+            "spec": {
                 "dataSchema": {
                     "dataSource": "theTopic",
                     "granularitySpec": {