Collecting data and sending it to Apache Kafka

Introduction



Analyzing streaming data requires a source of that data, and the kind of information the source provides matters as well. For example, sources with textual information are rather scarce.



Interesting sources include twitter and vk, but these sources are not suitable for every task.



There are sources that have the required data but do not provide it as a stream. Here one can point to the following link: public-apis.



When solving a streaming-data problem, the old approach can be used:



download the data yourself and send it to the stream.



For example, the following source can be used: imdb.

It should be noted that IMDb provides data itself; see IMDb Datasets. But it is reasonable to assume that data collected directly from the site contains more relevant information.



Language: Java 1.8.

Libraries: kafka 2.6.0, jsoup 1.13.1.



Data collection



Data collection is a service that, based on the input parameters, loads HTML pages, searches them for the required information, and converts it into a set of objects.



So, for the data source imdb, information about movies is collected using the following request: https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s, where parameters 1 and 2 are the start and end dates and parameter 3 is the country.



For a better understanding of the data source, the following resource may be useful: imdb-extensive-dataset.



The service interface:



public interface MovieDirectScrapingService {
    Collection<Movie> scrap();
}


The Movie class holds the information about a single movie (or show, etc.).



class Movie {
    public final String titleId;
    public final String titleUrl;
    public final String title;
    public final String description;
    public final Double rating;
    public final String genres;
    public final String runtime;
    public final String baseUrl;
    public final String baseNameUrl;
    public final String baseTitleUrl;
    public final String participantIds;
    public final String participantNames;
    public final String directorIds;
    public final String directorNames;

    // constructor omitted for brevity
}


Parsing the data on a single page.



The information is collected as follows: the page is loaded with jsoup, then the required HTML elements are located and converted into Movie instances.



// Load the page, collect the movie entries from it and return the URL of the next page.
String scrap(String url, List<Movie> items) {
    Document doc = null;
    try {
        doc = Jsoup.connect(url).header("Accept-Language", language).get();
    } catch (IOException e) {
        e.printStackTrace();
    }
    if (doc != null) {
        collectItems(doc, items);
        return nextUrl(doc);
    }
    return "";
}
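
The collectItems method, which extracts the individual entries from the loaded page, is not listed above. Below is a minimal sketch of how it might look with jsoup; the CSS selectors (.lister-item and friends) reflect the IMDb search-result markup at the time of writing and, like the toMovie helper, are assumptions used only for illustration.

// Sketch: pull the movie blocks out of the page and convert them to Movie objects.
// The selectors and the toMovie helper are assumptions, not the article's exact code.
void collectItems(Document doc, List<Movie> items) {
    for (Element item : doc.select(".lister-item")) {
        Element titleLink = item.selectFirst(".lister-item-header a");
        if (titleLink == null) {
            continue;
        }
        String titleUrl = titleLink.attributes().get("href");
        String title = titleLink.text();
        String description = item.select(".text-muted").text();
        String ratingText = item.select(".ratings-imdb-rating strong").text();
        Double rating = ratingText.isEmpty() ? 0.0 : Double.parseDouble(ratingText);
        // The remaining Movie fields (genres, runtime, participants, directors, ...)
        // are extracted from the same block in the same way.
        items.add(toMovie(titleUrl, title, description, rating));
    }
}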


Searching for the link to the next page.



// Look for a link to the next page of results.
String nextUrl(Document doc) {
    Elements nextPageElements = doc.select(".next-page");
    if (nextPageElements.size() > 0) {
        Element hrefElement = nextPageElements.get(0);
        return baseUrl + hrefElement.attributes().get("href");
    }
    return "";
}


All of this is tied together in the scrap() method: it builds the initial request URL and then walks through the result pages until there is no link to a next page.



@Override
public Collection<Movie> scrap() {
    String url = String.format(
            baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
            startDate, endDate, countries
    );
    List<Movie> items = new ArrayList<>();
    String nextUrl = url;
    while (true) {
        nextUrl = scrap(nextUrl, items);
        if ("".equals(nextUrl)) {
            break;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
    return items;
}


Sending data to Kafka





Class: MovieProducer. Main method: run.



A producer is created, the movies are collected, each movie is converted into a JSON string, and the resulting key-value pairs are sent to the topic.



public void run() {
    try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
            bootstrapServers, clientId, topic)) {
        Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
        List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
        for (Data.Movie move : movies) {
            Map<String, String> map = new HashMap<>();
            map.put("title_id", move.titleId);
            map.put("title_url", move.titleUrl);
            String value = JSONObject.toJSONString(map);
            String key = UUID.randomUUID().toString();
            kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
        }
        producer.produce(kvList);
    }
}
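
The SimpleStringStringProducer wrapper used above is not listed in the article. A minimal sketch of such a wrapper on top of the standard KafkaProducer is shown below; the constructor signature and the KeyValueStringString holder simply mirror how they are used in MovieProducer.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.List;
import java.util.Properties;

// Sketch of a thin wrapper around KafkaProducer<String, String>,
// matching the way it is used in MovieProducer above.
public class SimpleStringStringProducer implements AutoCloseable {
    public static class KeyValueStringString {
        public final String key;
        public final String value;

        public KeyValueStringString(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final KafkaProducer<String, String> producer;
    private final String topic;

    public SimpleStringStringProducer(String bootstrapServers, String clientId, String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void produce(List<KeyValueStringString> items) {
        for (KeyValueStringString kv : items) {
            producer.send(new ProducerRecord<>(topic, kv.key, kv.value));
        }
        producer.flush();
    }

    @Override
    public void close() {
        producer.close();
    }
}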




Collection and sending have to be started periodically; a separate class takes care of that.

Class: MovieDirectScrapingExecutor. Main method: run.



In an infinite loop, the request parameters are built from the current date, execute is called for the next country in the list, and the cycle repeats.



public void run() {
    int countriesCounter = 0;
    List<String> countriesSource = Arrays.asList("us");

    while (true) {
        try {
            LocalDate localDate = LocalDate.now();

            int year = localDate.getYear();
            int month = localDate.getMonthValue();
            int day = localDate.getDayOfMonth();

            // Pad the month and day to two digits (e.g. "03").
            String monthString = month < 10 ? "0" + month : Integer.toString(month);
            String dayString = day < 10 ? "0" + day : Integer.toString(day);

            String startDate = year + "-" + monthString + "-" + dayString;
            String endDate = startDate;

            String language = "en";
            String countries = countriesSource.get(countriesCounter);

            execute(language, startDate, endDate, countries);

            Thread.sleep(1000);

            countriesCounter += 1;
            if (countriesCounter >= countriesSource.size()) {
                countriesCounter = 0;
            }

        } catch (InterruptedException e) {
        }
    }
}
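
The execute method called inside the loop is not shown either. Presumably it wires a scraping service configured with the given parameters to a producer and runs it once; a sketch under that assumption (the MovieDirectScrapingServiceImpl constructor and the executor's Kafka fields are assumptions):

// Sketch of execute: create a scraping service for the given parameters,
// hand it to a producer and send one batch. The constructor arguments and
// the bootstrapServers/clientId/topic fields are assumptions.
void execute(String language, String startDate, String endDate, String countries) {
    MovieDirectScrapingService scrapingService =
            new MovieDirectScrapingServiceImpl(language, startDate, endDate, countries);
    MovieProducer movieProducer =
            new MovieProducer(bootstrapServers, clientId, topic, scrapingService);
    movieProducer.run();
}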


An instance of MovieDirectScrapingExecutor is created and launched from a main method.
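
A possible launch might look like this (the constructor parameters are illustrative assumptions):

public static void main(String[] args) {
    // Assumed constructor: the Kafka connection settings are passed in and
    // forwarded to MovieProducer inside execute().
    new MovieDirectScrapingExecutor("127.0.0.1:9092", "scraper-client", "q-data").run();
}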



An example of the final data sent to the topic:



{
  "base_name_url": "https:\/\/www.imdb.com\/name",
  "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
  "title_id": "tt13121702",
  "rating": "0.0",
  "base_url": "https:\/\/www.imdb.com",
  "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
  "runtime": "",
  "title": "The Christmas Edition",
  "director_ids": "nm0838289~",
  "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
  "director_names": "Peter Sullivan~",
  "genres": "Drama, Romance",
  "base_title_url": "https:\/\/www.imdb.com\/title",
  "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}


Testing





To make sure the producer works, a test is needed that is as close as possible to the real scenario; it uses an embedded kafka broker.

That is, no separately installed Apache Kafka is required: the embedded Kafka Server is started right inside the test.



Class: MovieProducerTest.



public class MovieProducerTest {
    @Test
    void simple() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String zooKeeperHost = "127.0.0.1";
        int zooKeeperPort = 22183;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String topic = "q-data";
        String clientId = "simple";
        try (KafkaServerService kafkaServerService = new KafkaServerService(
                brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
        )
        ) {
            kafkaServerService.start();
            kafkaServerService.createTopic(topic);

            MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
                    new Data.Movie(…)
            );
            MovieProducer movieProducer =
                    new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
            movieProducer.run();

            kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
                assertTrue(records.count() > 0);
                ConsumerRecord<String, String> record = records.iterator().next();
                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = null;
                try {
                    jsonObject = (JSONObject) jsonParser.parse(record.value());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                assertNotNull(jsonObject);
            });

            Thread.sleep(5000);
        }
    }
}
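
The KafkaServerService helper, which starts the embedded broker and ZooKeeper for the test, is not listed here. For reference, its poll callback could be implemented on top of a plain KafkaConsumer roughly as follows (a sketch only; the signature mirrors the call above, and the brokerHost/brokerPort fields are assumptions):

// Sketch of KafkaServerService.poll: subscribe to the topic, poll until enough
// records arrive (or the attempt limit is reached) and pass them to the callback.
// Uses org.apache.kafka.clients.consumer.KafkaConsumer; brokerHost and brokerPort
// are assumed fields of KafkaServerService.
public void poll(String topic, String groupId, int minRecords, int maxAttempts,
                 java.util.function.Consumer<ConsumerRecords<String, String>> callback) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList(topic));
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (records.count() >= minRecords) {
                callback.accept(records);
                return;
            }
        }
    }
}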




In this way, a source that does not provide streaming data can still be used: the data is collected on a schedule and sent to Apache Kafka for further analysis.








