/**
 * Estimate the number of distinct elements in a data stream
 * (F0 estimation problem) based on the algorithm published in the
 * Proceedings of 30th Annual European Symposium on Algorithms (ESA 2022)
 * https://doi.org/10.48550/arXiv.2301.10191
 */
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
 * Sampling-based estimator for the number of distinct elements in a stream.
 *
 * <p>The estimator keeps a bounded sample set. Every incoming element is put
 * into the set with the current sampling probability {@code p} (and removed
 * otherwise). Whenever the set reaches the storage threshold, each sampled
 * element is dropped with probability 1/2 and {@code p} is halved. The final
 * estimate is {@code sampleSize / p}.
 *
 * <p>The estimator holds no state between calls, so one instance can be
 * reused for any number of streams.
 */
public class F0Estimator {

    /**
     * Estimate number of unique elements in the passed stream, using a
     * freshly created {@link Random} as the randomness source.
     *
     * @param <T>         element type; elements are compared through
     *                    {@code equals}/{@code hashCode} (they are stored in a
     *                    {@link HashSet})
     * @param stream      the elements to count; it is consumed by this call
     * @param storageSize The storage to use: the sample set is thinned out as
     *                    soon as it holds this many elements. Must be at least 1.
     * @return the estimated number of distinct elements; the value is exact
     *         when the stream has fewer than {@code storageSize} distinct
     *         elements, because the sample is then never thinned
     * @throws NullPointerException     if {@code stream} is null
     * @throws IllegalArgumentException if {@code storageSize} is less than 1
     * @throws IllegalStateException    if the sample set is still at the
     *                                  threshold after a thinning round
     */
    public <T> long uniqueElements(Stream<T> stream, int storageSize) {
        return uniqueElements(stream, storageSize, new Random());
    }

    /**
     * Estimate number of unique elements in the passed stream, drawing all
     * random decisions from the supplied generator. Passing a seeded
     * {@link Random} makes the result reproducible for a given input order.
     *
     * @param <T>         element type; elements are compared through
     *                    {@code equals}/{@code hashCode}
     * @param stream      the elements to count; it is consumed by this call
     * @param storageSize threshold at which the sample set is thinned; must be
     *                    at least 1
     * @param random      source of randomness for the sampling decisions
     * @return the estimated number of distinct elements
     * @throws NullPointerException     if {@code stream} or {@code random} is null
     * @throws IllegalArgumentException if {@code storageSize} is less than 1
     * @throws IllegalStateException    if the sample set is still at the
     *                                  threshold after a thinning round
     */
    public <T> long uniqueElements(Stream<T> stream, int storageSize, Random random) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(random, "random");
        if (storageSize < 1) {
            throw new IllegalArgumentException(
                    "storageSize must be at least 1, got " + storageSize);
        }

        final float loadFactor = 0.5f;
        Set<T> sample = new HashSet<>(storageSize, loadFactor);

        // Probability with which an element is kept in the sample. It is local
        // to the call so that repeated calls always start from 1.0.
        double p = 1.0;

        // The sample set and p are plain, unsynchronized state: force the
        // pipeline to be sequential and pull the elements one at a time.
        Iterator<T> elements = stream.sequential().iterator();
        while (elements.hasNext()) {
            T element = elements.next();

            // Ensure element is in the set with probability p
            if (random.nextDouble() < p) {
                sample.add(element);
            } else {
                sample.remove(element);
            }

            if (sample.size() >= storageSize) {
                // Randomly keep each element in the sample with probability 1/2
                sample.removeIf(e -> random.nextDouble() < 0.5);
                p /= 2;
                if (sample.size() >= storageSize) {
                    throw new IllegalStateException("Threshold exceeded after sampling");
                }
            }
        }

        // Each surviving element stands for 1/p distinct elements.
        return (long) (sample.size() / p);
    }

    /**
     * Demo: estimates the number of distinct values among one billion random
     * integers masked to their low 16 bits and prints the estimate.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Create a Random instance
        Random random = new Random();

        // Create a stream of many random integers
        Stream<Integer> stream = IntStream
                .generate(random::nextInt)
                .limit(1_000_000_000)
                .map(i -> i & 0xffff)
                .boxed();

        final int STORAGE_SIZE = 1000;
        var estimator = new F0Estimator();
        long uniqueCount = estimator.uniqueElements(stream, STORAGE_SIZE);
        System.out.println("Estimated number of unique elements: " + uniqueCount);
    }
}