Commit 85ea722a authored by Álex Cortiñas's avatar Álex Cortiñas
Browse files

Second approach

parent 28ec8004
......@@ -6,3 +6,4 @@ bin
build
.idea
*.un~
out
......@@ -27,6 +27,9 @@ dependencies {
// jackson
compile 'com.fasterxml.jackson.core:jackson-core:2.8.1'
// mongo
compile 'org.mongodb:mongodb-driver:3.4.2'
}
mainClassName = 'es.udc.lbd.gis.storagesystem.StorageSystem'
......
File mode changed from 100755 to 100644
package es.udc.lbd.gis.storagesystem;
import es.udc.lbd.gis.storagesystem.conectors.DruidWriter;
import es.udc.lbd.gis.storagesystem.conectors.KafkaReader;
import es.udc.lbd.gis.storagesystem.conectors.PostgresWriter;
import es.udc.lbd.gis.storagesystem.conectors.*;
import es.udc.lbd.gis.storagesystem.config.MyProperties;
import es.udc.lbd.gis.storagesystem.model.Event;
import es.udc.lbd.gis.storagesystem.model.ModelMapper;
......@@ -14,18 +12,23 @@ import java.util.stream.Collectors;
public class StorageSystem {
public static void main(String[] args) {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
MyProperties prop = new MyProperties("/config.properties");
PostgresWriter pg = new PostgresWriter(
prop.get("datasource"),
prop.get("pg-ip-port"),
prop.get("pg-database"),
prop.get("pg-user"),
prop.get("pg-pass"));
// PostgresWriter pg = new PostgresWriter(
// prop.get("datasource"),
// prop.get("pg-ip-port"),
// prop.get("pg-database"),
// prop.get("pg-user"),
// prop.get("pg-pass"));
DruidWriter druidWriter = new DruidWriter(
prop.get("druidSpec"),
prop.get("datasource"));
// DruidWriter druidWriter = new DruidWriter(
// prop.get("druidSpec"),
// prop.get("datasource"));
KafkaReader kafka = new KafkaReader(
prop.get("kafka-ip-port"),
......@@ -33,6 +36,13 @@ public class StorageSystem {
Boolean.parseBoolean(prop.get("commit")),
prop.get("topic"));
MongoConnection mongoConnection = new MongoConnection(
prop.get("mongo-host"),
Integer.parseInt(prop.get("mongo-port")),
prop.get("mongo-database"),
prop.get("datasource"));
MongoWriter mongo = new MongoWriter(mongoConnection);
Function<Long, Boolean> notOld = (time) ->
time > System.currentTimeMillis() - Long.parseLong(prop.get("kafka-period"));
......@@ -41,19 +51,21 @@ public class StorageSystem {
List<Event> events = buffer
.stream()
.map(record -> ModelMapper.toEvent(record.value()).complete())
.filter(m -> notOld.apply(m.getTime()))
// .filter(m -> notOld.apply(m.getTime()))
.collect(Collectors.toList());
if (events.size() > 0) {
pg.insert(events);
druidWriter.insert(events);
// pg.insert(events);
mongo.insert(events);
// druidWriter.insert(events);
}
return null;
});
} finally {
pg.disconnect();
druidWriter.disconnect();
// pg.disconnect();
mongoConnection.disconnect();
// druidWriter.disconnect();
kafka.disconnect();
}
}
......
......@@ -69,7 +69,7 @@ public class DruidWriter {
}
public void insert(List<Event> stream) {
stream.stream().map(r -> ModelMapper.toMap(r)).forEach(m -> insertItem(m));
stream.stream().map(r -> r.toMap()).forEach(m -> insertItem(m));
}
private void printState() {
......@@ -94,6 +94,8 @@ public class DruidWriter {
logger.debug(String.format("Dropped message: %s", m.toString()), e);
dropped++;
if (dropped % 1000 == 0) {
logger.error(String.format("Dropped message: %s", m.toString()), e);
logger.error("Current system timestamp: " + System.currentTimeMillis());
printState();
}
} else {
......
......@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
public class KafkaReader {
private static final Logger logger = LoggerFactory.getLogger(KafkaReader.class);
private static final int maxPoll = 10000;
private static final int maxPoll = 5000;
private static final int intervalCommit = 60000;
private static Properties props = null;
......
package es.udc.lbd.gis.storagesystem.conectors;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
/**
 * Bundles a {@link MongoClient} with the database and the collection this
 * application works on, so that writers only need a single handle.
 */
public class MongoConnection {

    private final MongoClient mongoClient;
    private final MongoDatabase database;
    private final MongoCollection<Document> collection;

    /**
     * Creates the client and resolves the database and collection handles.
     *
     * @param host       host passed to the {@link MongoClient} constructor
     * @param port       port passed to the {@link MongoClient} constructor
     * @param database   name of the database to look up on the client
     * @param datasource name of the collection to look up in that database
     */
    public MongoConnection(String host, int port, String database, String datasource) {
        MongoClient client = new MongoClient(host, port);
        MongoDatabase resolvedDatabase = client.getDatabase(database);
        MongoCollection<Document> resolvedCollection = resolvedDatabase.getCollection(datasource);

        this.mongoClient = client;
        this.database = resolvedDatabase;
        this.collection = resolvedCollection;
    }

    /**
     * @return the database handle resolved in the constructor
     */
    public MongoDatabase getDatabase() {
        return this.database;
    }

    /**
     * @return the collection handle resolved in the constructor
     */
    public MongoCollection<Document> getCollection() {
        return this.collection;
    }

    /**
     * Closes the underlying {@link MongoClient}.
     */
    public void disconnect() {
        this.mongoClient.close();
    }
}
package es.udc.lbd.gis.storagesystem.conectors;
import es.udc.lbd.gis.storagesystem.model.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Writes batches of {@link Event}s into the collection exposed by a
 * {@link MongoConnection}.
 */
public class MongoWriter {

    private static final Logger logger = LoggerFactory.getLogger(MongoWriter.class);

    private final MongoConnection connection;

    /**
     * @param mongoConnection connection whose collection receives the inserted documents
     */
    public MongoWriter(MongoConnection mongoConnection) {
        this.connection = mongoConnection;
    }

    /**
     * Converts every event with {@link Event#toDocument()} and inserts the
     * resulting documents in a single {@code insertMany} call.
     *
     * <p>A {@code null} or empty batch is skipped: the driver rejects an empty
     * document list, so the writer guards against it itself instead of
     * relying on every caller to check the batch size first.
     *
     * @param stream batch of events to store; may be {@code null} or empty
     */
    public void insert(List<Event> stream) {
        if (stream == null || stream.isEmpty()) {
            logger.debug("MongoWriter: empty batch, nothing to insert");
            return;
        }
        connection.getCollection().insertMany(
                stream.stream().map(Event::toDocument).collect(Collectors.toList()));
    }
}
......@@ -5,6 +5,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;
import es.udc.lbd.gis.storagesystem.model.Event;
import org.slf4j.Logger;
......@@ -72,13 +73,7 @@ public class PostgresWriter {
Statement st = null;
try {
st = conn.createStatement();
st.execute("create table " + tableName + "(" + "username bigint," + "time bigint,"
+ "geom geometry(POINT, 4326)," + "x double precision," + "y double precision,"
+ "speed double precision," + "x_2 double precision," + "y_2 double precision,"
+ "x_3 double precision," + "y_3 double precision," + "x_4 double precision,"
+ "y_4 double precision," + "x_5 double precision," + "y_5 double precision,"
+ "x_6 double precision," + "y_6 double precision," + "x_7 double precision,"
+ "y_7 double precision," + "x_8 double precision," + "y_8 double precision" + ")");
st.execute(Event.createTableSQL(tableName));
st.close();
} catch (SQLException ex) {
logger.error("Error querying postgres when creating database", ex);
......@@ -115,9 +110,7 @@ public class PostgresWriter {
Statement st = null;
try {
st = conn.createStatement();
int res = st.executeUpdate("insert into " + tableName + "(username, time, geom, x, y, speed, "
+ "x_2, y_2, x_3, y_3, x_4, y_4, x_5, y_5, x_6, y_6, x_7, y_7, x_8, y_8) values "
+ stream.stream().map(r -> r.toSQL()).reduce((a, b) -> a.concat(", ".concat(b))).get());
int res = st.executeUpdate(Event.toInsertSQL(tableName) + stream.stream().map(r -> r.toSQL()).collect(Collectors.joining(", ")));
count += res;
auxTime = (System.currentTimeMillis() - startTime) / 1.0E03;
logger.info("PostgresWriter: Inserted {} events | Total: {} events in {} seconds | Events per second: {}", res, count,
......
package es.udc.lbd.gis.storagesystem.model;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import org.bson.Document;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public class Event {
public final static int MAX_ZOOM = 18;
private final static double BASE_SEPARATION = 90;
private final static Map<Integer, Double> steps = new HashMap<>();
private final static long ONE_HOUR = 1 * 60 * 60 * 1000;
private String user;
private Long time;
private Double speed;
private Double x;
private Double y;
private String x_2;
private String y_2;
private String x_3;
private String y_3;
private String x_4;
private String y_4;
private String x_5;
private String y_5;
private String x_6;
private String y_6;
private String x_7;
private String y_7;
private String x_8;
private String y_8;
private static DecimalFormat df2 = new DecimalFormat("#.##");
private static DecimalFormat df3 = new DecimalFormat("#.###");
private static DecimalFormat df4 = new DecimalFormat("#.####");
private static DecimalFormat df5 = new DecimalFormat("#.#####");
private static DecimalFormat df6 = new DecimalFormat("#.######");
private static DecimalFormat df7 = new DecimalFormat("#.#######");
private static DecimalFormat df8 = new DecimalFormat("#.########");
private Map<Integer, Point> values = new HashMap<>();
static {
df2.setRoundingMode(RoundingMode.CEILING);
df3.setRoundingMode(RoundingMode.CEILING);
df4.setRoundingMode(RoundingMode.CEILING);
df5.setRoundingMode(RoundingMode.CEILING);
df6.setRoundingMode(RoundingMode.CEILING);
df7.setRoundingMode(RoundingMode.CEILING);
df8.setRoundingMode(RoundingMode.CEILING);
for (int zoom = 0; zoom <= MAX_ZOOM; zoom++) {
steps.put(zoom, step(zoom));
}
System.out.println(steps);
}
public Event() {
}
public Event complete() {
this.x_2 = df2.format(this.x);
this.y_2 = df2.format(this.y);
this.x_3 = df3.format(this.x);
this.y_3 = df3.format(this.y);
this.x_4 = df4.format(this.x);
this.y_4 = df4.format(this.y);
this.x_5 = df5.format(this.x);
this.y_5 = df5.format(this.y);
this.x_6 = df6.format(this.x);
this.y_6 = df6.format(this.y);
this.x_7 = df7.format(this.x);
this.y_7 = df7.format(this.y);
this.x_8 = df8.format(this.x);
this.y_8 = df8.format(this.y);
/**
 * Computes the grid separation used at a zoom level: {@code BASE_SEPARATION}
 * at zoom 0, halved once for every further level.
 *
 * @param zoom zoom level, must not be negative
 * @return the separation between grid lines at that zoom level
 * @throws IllegalArgumentException if {@code zoom} is negative (the previous
 *         recursive form never terminated for such values)
 */
private static double step(int zoom) {
    if (zoom < 0) {
        throw new IllegalArgumentException("zoom must be >= 0 but was " + zoom);
    }
    double separation = BASE_SEPARATION;
    // Halve once per level; same sequence of divisions the recursion performed.
    for (int level = 0; level < zoom; level++) {
        separation /= 2;
    }
    return separation;
}
/**
 * Snaps a value to the nearest multiple of {@code step}.
 *
 * <p>The lower bracket is found with {@link Math#floor(double)} rather than
 * an {@code int} cast: a cast truncates toward zero, which for negative
 * values picks the multiple above the number and then compares against an
 * even higher one (e.g. -50 with step 90 was snapped to 0 instead of -90).
 * Using a {@code double} quotient also avoids {@code int} overflow.
 *
 * @param number value to snap
 * @param step   grid separation; multiples of this value are the candidates
 * @return the multiple of {@code step} closest to {@code number}; when both
 *         neighbours are equally close the upper one is returned
 */
private double closestMultiple(double number, double step) {
    double lowerMultiple = Math.floor(number / step);
    double lower = lowerMultiple * step;
    double upper = (lowerMultiple + 1) * step;
    // Strict comparison keeps the original tie-break: ties go to the upper multiple.
    return Math.abs(number - lower) < Math.abs(number - upper) ? lower : upper;
}
/**
 * Fills {@code values} with one snapped {@link Point} per entry of
 * {@code steps}: both coordinates are moved to the closest multiple of that
 * entry's separation and stored under the entry's key.
 *
 * <p>The event time is left untouched (overwriting it with the current
 * system time is disabled).
 *
 * @return this event, to allow chaining
 */
public Event complete() {
    for (Map.Entry<Integer, Double> level : steps.entrySet()) {
        double separation = level.getValue();
        Point snapped = new Point(closestMultiple(x, separation), closestMultiple(y, separation));
        this.values.put(level.getKey(), snapped);
    }
    return this;
}
/**
 * Renders the snapped coordinates as a comma-separated list
 * {@code x,y,x,y,...} for use inside an SQL {@code values} tuple.
 *
 * <p>Entries are emitted in ascending key order so they line up with the
 * column lists, which are also built from the sorted keys. Relying on the
 * iteration order of the backing {@code HashMap} would leave that alignment
 * unspecified.
 *
 * @return the coordinates of every entry in {@code values}, ordered by key
 */
private String valuesToSQL() {
    return this.values.entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .map(entry -> entry.getValue().getX() + "," + entry.getValue().getY())
            .collect(Collectors.joining(","));
}
/**
 * Builds the per-level part of the insert column list: for every key of
 * {@code steps}, in ascending order, the pair {@code x_NN, y_NN}.
 *
 * @return the column names joined with commas
 */
private static String valuesToInsertSQL() {
    return steps.keySet().stream()
            .sorted()
            .map(zoom -> auxGetKey(zoom))
            .map(suffix -> "x_" + suffix + ", y_" + suffix)
            .collect(Collectors.joining(","));
}
/**
 * Builds the head of an insert statement, up to and including the
 * {@code values} keyword; the value tuples are appended by the caller.
 *
 * @param tableName name of the target table, concatenated as-is
 * @return the statement prefix listing the fixed and the per-level columns
 */
public static String toInsertSQL(String tableName) {
    StringBuilder sql = new StringBuilder();
    sql.append("insert into ");
    sql.append(tableName);
    sql.append("(username, time, geom, x, y, speed,");
    sql.append(valuesToInsertSQL());
    sql.append(") values ");
    return sql.toString();
}
public String toSQL() {
return "(" +
user + "," +
......@@ -73,23 +68,54 @@ public class Event {
x + "," +
y + "," +
speed + "," +
x_2 + "," +
y_2 + "," +
x_3 + "," +
y_3 + "," +
x_4 + "," +
y_4 + "," +
x_5 + "," +
y_5 + "," +
x_6 + "," +
y_6 + "," +
x_7 + "," +
y_7 + "," +
x_8 + "," +
y_8 +
valuesToSQL() +
")";
}
/**
 * Builds the per-level column definitions for the create-table statement:
 * for every key of {@code steps}, in ascending order, an {@code x_NN} and a
 * {@code y_NN} column of type {@code double precision}.
 *
 * @return the column definitions joined with commas
 */
private static String valuesToCreateTableSQL() {
    return steps.keySet().stream()
            .sorted()
            .map(zoom -> auxGetKey(zoom))
            .map(suffix -> "x_" + suffix + " double precision, y_" + suffix + " double precision")
            .collect(Collectors.joining(","));
}
/**
 * Builds the create-table statement for events: the fixed columns followed
 * by the per-level coordinate columns.
 *
 * @param tableName name of the table to create, concatenated as-is
 * @return the complete {@code create table} statement
 */
public static String createTableSQL(String tableName) {
    StringBuilder sql = new StringBuilder("create table ");
    sql.append(tableName).append("(");
    // Fixed columns.
    sql.append("username bigint,");
    sql.append("time bigint,");
    sql.append("geom geometry(POINT, 4326),");
    sql.append("x double precision,");
    sql.append("y double precision,");
    sql.append("speed double precision,");
    // One x/y column pair per zoom level.
    sql.append(valuesToCreateTableSQL());
    sql.append(")");
    return sql.toString();
}
/**
 * Formats a zoom level as the suffix used in column and field names:
 * values below 10 get a leading {@code "0"}, others are printed as they are.
 *
 * @param zoom zoom level
 * @return the suffix for that level
 */
private static String auxGetKey(int zoom) {
    String digits = String.valueOf(zoom);
    return zoom < 10 ? "0" + digits : digits;
}
/**
 * Flattens this event into a map: the base fields {@code user},
 * {@code time}, {@code speed}, {@code x}, {@code y}, plus an
 * {@code x_NN}/{@code y_NN} pair for every entry of {@code values}.
 *
 * @return a new map holding the fields of this event
 */
public Map<String, Object> toMap() {
    // TODO the geom field is still missing!! Intended shape:
    // e.geom = {
    //   "type" : "Point",
    //   "coordinates" : [
    //     e.x,
    //     e.y
    //   ]
    // };
    final Map<String, Object> fields = new HashMap<>();
    fields.put("user", user);
    fields.put("time", time);
    fields.put("speed", speed);
    fields.put("x", x);
    fields.put("y", y);
    for (Map.Entry<Integer, Point> level : values.entrySet()) {
        String suffix = auxGetKey(level.getKey());
        Point snapped = level.getValue();
        fields.put("x_" + suffix, snapped.getX());
        fields.put("y_" + suffix, snapped.getY());
    }
    return fields;
}
/**
 * Wraps the result of {@link #toMap()} in a BSON {@link Document}.
 *
 * @return a new document built from this event's field map
 */
public Document toDocument() {
    Map<String, Object> fields = toMap();
    return new Document(fields);
}
public String getUser() {
return user;
}
......@@ -130,121 +156,12 @@ public class Event {
this.y = y;
}
public String getX_2() {
return x_2;
}
public String getY_2() {
return y_2;
}
public String getX_3() {
return x_3;
}
public String getY_3() {
return y_3;
}
public String getX_4() {
return x_4;
}
public String getY_4() {
return y_4;
}
public String getX_5() {
return x_5;
}
public String getY_5() {
return y_5;
}
public String getX_6() {
return x_6;
}
public String getY_6() {
return y_6;
}
public String getX_7() {
return x_7;
}
public String getY_7() {
return y_7;
}
public String getX_8() {
return x_8;
}
public String getY_8() {
return y_8;
}
public void setX_2(String x_2) {
this.x_2 = x_2;
}
public void setY_2(String y_2) {
this.y_2 = y_2;
}
public void setX_3(String x_3) {
this.x_3 = x_3;
}
public void setY_3(String y_3) {
this.y_3 = y_3;
}
public void setX_4(String x_4) {
this.x_4 = x_4;
}
public void setY_4(String y_4) {
this.y_4 = y_4;
}
public void setX_5(String x_5) {
this.x_5 = x_5;
}
public void setY_5(String y_5) {
this.y_5 = y_5;
}
public void setX_6(String x_6) {
this.x_6 = x_6;
}
public void setY_6(String y_6) {
this.y_6 = y_6;
}
public void setX_7(String x_7) {
this.x_7 = x_7;
}
public void setY_7(String y_7) {
this.y_7 = y_7;
}
public void setX_8(String x_8) {
this.x_8 = x_8;
}
public void setY_8(String y_8) {
this.y_8 = y_8;
/**
 * Gives access to the snapped coordinates, keyed by zoom level.
 *
 * @return the internal map itself, not a copy, so changes made by the
 *         caller are visible to this event
 */
public Map<Integer, Point> getValues() {
    return this.values;
}
/**
 * Short textual form listing only the base fields; the per-level values
 * are not included.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("Event [user=");
    text.append(user);
    text.append(", time=").append(time);
    text.append(", speed=").append(speed);
    text.append(", x=").append(x);
    text.append(", y=").append(y);
    text.append("]");
    return text.toString();
}
}
package es.udc.lbd.gis.storagesystem.model;
/**
 * Mutable pair of boxed coordinates.
 */
public class Point {

    private Double x;
    private Double y;

    /**
     * @param x first coordinate, stored as given
     * @param y second coordinate, stored as given
     */
    public Point(Double x, Double y) {
        setX(x);
        setY(y);
    }

    /** @return the first coordinate */
    public Double getX() {
        return this.x;
    }

    /** @return the second coordinate */
    public Double getY() {
        return this.y;
    }

    /** @param x new value for the first coordinate */
    public void setX(Double x) {
        this.x = x;
    }

    /** @param y new value for the second coordinate */
    public void setY(Double y) {
        this.y = y;
    }
}
# datasource (druid and postgres table)
# datasource (druid, postgres table and mongo collection)
datasource=theTopic
# kafka topic
......@@ -18,3 +18,7 @@ commit=true
# time the data is valid, usually depending on datasource.json
kafka-period=540000
mongo-host=127.0.0.1
mongo-port=27017
mongo-database=database
......@@ -6,6 +6,10 @@
},
"dataSources": [
{
"properties": {
"task.partitions": "1",
"task.replicants": "1"
},
"spec": {
"dataSchema": {
"dataSource": "theTopic",
......@@ -43,20 +47,44 @@
"user",
"x",
"y",
"x_2",
"y_2",
"x_3",
"y_3",
"x_4",
"y_4",
"x_5",
"y_5",
"x_6",