Ben özel WindowAssigher
uyguladık:Özel için kaydetme noktaları nasıl uygulanır?
public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> {
@Override
public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) {
return Collections.singletonList(new SessionWindow(element.getSessionUid()));
}
@Override
public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return new SessionTrigger(60_000L);
}
@Override
public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new SessionWindow.Serializer();
}
}
, Window
:
public class SessionWindow extends Window {
private final String sessionUid;
public SessionWindow(String sessionUid) {
this.sessionUid = sessionUid;
}
public String getSessionUid() {
return sessionUid;
}
@Override
public long maxTimestamp() {
return Long.MAX_VALUE;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SessionWindow that = (SessionWindow) o;
return sessionUid.equals(that.sessionUid);
}
@Override
public int hashCode() {
return sessionUid.hashCode();
}
public static class Serializer extends TypeSerializer<SessionWindow> {
private static final long serialVersionUID = 1L;
@Override
public boolean isImmutableType() {
return true;
}
@Override
public TypeSerializer<SessionWindow> duplicate() {
return this;
}
@Override
public SessionWindow createInstance() {
return null;
}
@Override
public SessionWindow copy(SessionWindow from) {
return from;
}
@Override
public SessionWindow copy(SessionWindow from, SessionWindow reuse) {
return from;
}
@Override
public int getLength() {
return 0;
}
@Override
public void serialize(SessionWindow record, DataOutputView target) throws IOException {
target.writeUTF(record.sessionUid);
}
@Override
public SessionWindow deserialize(DataInputView source) throws IOException {
return new SessionWindow(source.readUTF());
}
@Override
public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException {
return new SessionWindow(source.readUTF());
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.writeUTF(source.readUTF());
}
@Override
public boolean equals(Object obj) {
return obj instanceof Serializer;
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof Serializer;
}
@Override
public int hashCode() {
return 0;
}
}
}
ve Trigger
:
public class SessionTrigger extends Trigger<LogItem, SessionWindow> {
private final long sessionTimeout;
private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null);
public SessionTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception {
ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
Long previousFinishTimestamp = previousFinishTimestampState.value();
Long newFinisTimestamp = timestamp + sessionTimeout;
if (previousFinishTimestamp != null) {
ctx.deleteEventTimeTimer(previousFinishTimestamp);
}
ctx.registerEventTimeTimer(newFinisTimestamp);
previousFinishTimestampState.update(newFinisTimestamp);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
throw new UnsupportedOperationException("This is not processing time trigger");
}
@Override
public void clear(SessionWindow window, TriggerContext ctx) throws Exception {
ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
Long previousFinishTimestamp = previousFinishTimestampState.value();
ctx.deleteEventTimeTimer(previousFinishTimestamp);
previousFinishTimestampState.clear();
}
}
için zaman aşımı yani tarafından oturumun sonunu algılamak son olay N saniye önce olsaydı sonra pencere işlevini değerlendirir. Gördüğünüz gibi, hatadan sonra geri yüklemek istediğim için, son olay zaman damgasını ValueState'de kaydediyorum.
Akışımın yeniden dağıtımı sırasında tetikleme durumunu kaybetmek istemediğim için bu tetikleyicideki kaydetme noktası (ve denetim noktası) anlık görüntülerini kaydetmek/geri yüklemek için Checkpointed
arabirimini kullanmalıyım gibi görünüyor.
Peki, doğru şekilde dağıtımı sırasında SessionTrigger
tetikleyicisinin (ve muhtemelen ilgili pencerelerin) durumunun nasıl kaydedileceğini herkes anlatabilir mi?
Anladığım kadarıyla sadece SessionTrigger
için Checkpointed
arabirimini kullanmalıyım çünkü yalnızca durumu vardır. Sağ? SessionWindow
-s ve SessionWindowAssigner
hakkında? Otomatik olarak dağıtıldıktan sonra geri yüklenecek mi yoksa manuel olarak mı yapmalıyım?