diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java index da3a5dfe8628bbbbe434a042604883ca2c16019c..d3c8516882fa6b6a61027494fb72d564f8d8e1db 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -76,8 +76,6 @@ public final class JavaStructuredSessionization { for (String word : lineWithTimestamp.getLine().split(" ")) { eventList.add(new Event(word, lineWithTimestamp.getTimestamp())); } - System.out.println( - "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size()); return eventList.iterator(); } }; @@ -100,7 +98,7 @@ public final class JavaStructuredSessionization { // If timed out, then remove session and send final update if (state.hasTimedOut()) { SessionUpdate finalUpdate = new SessionUpdate( - sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true); + sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true); state.remove(); return finalUpdate; @@ -133,7 +131,7 @@ public final class JavaStructuredSessionization { // Set timeout such that the session will be expired if no data received for 10 seconds state.setTimeoutDuration("10 seconds"); return new SessionUpdate( - sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false); + sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false); } } }; @@ -215,7 +213,8 @@ public final class JavaStructuredSessionization { public long getEndTimestampMs() { return endTimestampMs; } public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; } - public long getDurationMs() { return endTimestampMs - startTimestampMs; } + public long calculateDuration() { return endTimestampMs - startTimestampMs; } + @Override public String toString() { return "SessionInfo(numEvents = " + numEvents + ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";