Skip to content
Snippets Groups Projects

Version2

Merged Michael Dinzinger requested to merge version2 into master
233 files
+ 5097
18315
Compare changes
  • Side-by-side
  • Inline
Files
233
package eu.ows;
import eu.ows.model.documents.full.DomainAggregation;
import eu.ows.model.external.HostLogPair;
import eu.ows.model.external.LogEntry;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch._types.SortOptions;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.core.SearchRequest;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
/**
 * {@link ICheckAggregation} implementation for {@link DomainAggregation} documents.
 *
 * <p>It rebuilds an aggregation from raw {@link LogEntry} history, compares it with the
 * aggregation that was actually stored, and knows how to query the log index for the
 * entries that may belong to a given domain.
 */
public class CheckDomainAggregations implements ICheckAggregation<DomainAggregation> {

    /** Number of hits requested per search page. */
    private static final int PAGE_SIZE = 10_000;

    /**
     * Compares a rebuilt aggregation with the stored one.
     *
     * <p>The time window is not recomputed here: it is copied from {@code actual} into
     * {@code expected} before the equality check (the original author notes that it is
     * provided by Flink and assumed correct). Note that this mutates {@code expected}.
     *
     * @param expected the aggregation rebuilt from the log history; must not be {@code null}
     * @param actual the stored aggregation; must not be {@code null}
     * @return {@code false} if {@code actual} lacks a first-seen or last-seen value,
     *     otherwise the result of {@code expected.equals(actual)}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public boolean compare(DomainAggregation expected, DomainAggregation actual) {
        Objects.requireNonNull(expected);
        Objects.requireNonNull(actual);

        // Both time-window bounds are mandatory on the stored document.
        final var firstSeen = actual.getFirstSeen();
        if (firstSeen == null) {
            return false;
        }
        final var lastSeen = actual.getLastSeen();
        if (lastSeen == null) {
            return false;
        }

        expected.updateTimeWindow(firstSeen, lastSeen);
        return expected.equals(actual);
    }

    /**
     * Rebuilds the aggregation for the domain of {@code expected} from the given logs.
     *
     * <p>The search request cannot be restricted to exactly this domain, so the history
     * may contain unrelated entries; those are filtered out here with
     * {@link #matchesDomain(String, String)}.
     *
     * @param logHistory log entries returned by the search; may contain foreign domains
     * @param expected the aggregation whose domain selects the relevant entries
     * @return a freshly built aggregation containing only the matching entries
     */
    @Override
    public DomainAggregation buildAggregation(List<LogEntry> logHistory, DomainAggregation expected) {
        final var rebuilt = new DomainAggregation();
        for (final var entry : logHistory) {
            final String domain = expected.getDomain();
            if (matchesDomain(entry.getUrl(), domain)) {
                rebuilt.add(domain, entry);
            }
        }
        return rebuilt.build();
    }

    /**
     * Tells whether {@code url} belongs to {@code domain}.
     *
     * <p>A URL belongs to a domain when its host, or any suffix of the host that starts
     * right after a {@code '.'}, equals the domain (compared in lower case). URLs whose
     * host cannot be determined only match the
     * {@link HostLogPair#FAILED_URI_HOST_EXTRACTION} domain.
     *
     * @param url the URL of a log entry
     * @param domain the domain to test against
     * @return {@code true} if the URL is attributed to the domain
     */
    private static boolean matchesDomain(String url, String domain) {
        final String rawHost = hostOf(url);
        if (rawHost == null) {
            return domain.equals(HostLogPair.FAILED_URI_HOST_EXTRACTION);
        }

        final String wanted = domain.toLowerCase(Locale.ROOT);

        // An absolute host name ends with '.'; drop that dot before comparing. Strictly
        // speaking "wikipedia.org" and "wikipedia.org." need not be the same site, but
        // keeping the dot would only cause confusion.
        final String host = rawHost.endsWith(".")
                ? rawHost.substring(0, rawHost.length() - 1)
                : rawHost;

        if (wanted.equals(host.toLowerCase(Locale.ROOT))) {
            return true;
        }

        // Try every parent domain: the part of the host following each '.'.
        for (int dot = host.indexOf('.'); dot >= 0; dot = host.indexOf('.', dot + 1)) {
            final String parent = host.substring(dot + 1).toLowerCase(Locale.ROOT);
            if (wanted.equals(parent)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Extracts the host component of {@code url}.
     *
     * @param url the URL to parse
     * @return the host, or {@code null} if the URL is syntactically invalid or has no host
     */
    private static String hostOf(String url) {
        try {
            return new URI(url).getHost();
        } catch (URISyntaxException ignored) {
            // An unparsable URL is treated like a URL without a host.
            return null;
        }
    }

    /**
     * Creates the paged search request that fetches candidate logs for a domain.
     *
     * <p>The request filters on a timestamp upper bound ({@code Main.STAY_BEHIND}) and on a
     * case-insensitive wildcard match of the domain anywhere in the {@code url} field, and
     * sorts ascending by timestamp and then by the tiebreaker field.
     *
     * @param searchAfter sort values of the last hit of the previous page; {@code null} or
     *     empty for the first page
     * @param expected the aggregation whose domain is searched for
     * @return the search request for the next page of up to {@value #PAGE_SIZE} hits
     */
    @Override
    public SearchRequest createSearchRequest(List<String> searchAfter, DomainAggregation expected) {
        final var notTooRecent = Query.of(q -> q.range(r -> r
                .field(LogEntry.TIMESTAMP_FIELD_NAME)
                .lte(JsonData.of(Main.STAY_BEHIND))));

        // REALLY SLOW, but only way to get all logs for this domain!
        final String urlPattern = "*" + expected.getDomain() + '*';
        final var urlContainsDomain = Query.of(q -> q.wildcard(w -> w
                .field("url")
                .value(urlPattern)
                .caseInsensitive(true)));

        final SearchRequest.Builder request = new SearchRequest.Builder()
                .index(Main.LOGS_INDEX_PATTERN)
                .size(PAGE_SIZE)
                .query(q -> q.bool(b -> b.filter(List.of(notTooRecent, urlContainsDomain))))
                .sort(List.of(
                        ascendingBy(LogEntry.TIMESTAMP_FIELD_NAME),
                        ascendingBy(LogEntry.SORT_TIEBREAKER_FIELD_NAME)));

        final boolean hasCursor = searchAfter != null && !searchAfter.isEmpty();
        if (hasCursor) {
            request.searchAfter(searchAfter);
        }
        return request.build();
    }

    /**
     * Builds an ascending sort option on a single field.
     *
     * @param fieldName the name of the field to sort by
     * @return the sort option
     */
    private static SortOptions ascendingBy(String fieldName) {
        return SortOptions.of(sort -> sort.field(f -> f
                .field(fieldName)
                .order(SortOrder.Asc)));
    }

    /** {@inheritDoc} */
    @Override
    public Class<DomainAggregation> getAggregationClass() {
        return DomainAggregation.class;
    }

    /**
     * Returns the name of the index holding the domain aggregations.
     *
     * @return {@code "domains"}, which is an alias
     */
    @Override
    public String aggregationIndex() {
        return "domains"; // Alias
    }
}
Loading