如何通过键值API在Apache Ignite中快速加载大型表

不久前,Apache Ignite平台应运而生并开始流行。内存计算就是速度,这意味着必须在工作的所有阶段都确保速度,尤其是在加载数据时。



剪切下方是对将数据从关系表快速加载到分布式Apache Ignite集群中的方法的描述。描述了在集群的客户端节点上对SQL查询结果集的预处理以及使用map-reduce任务在集群中进行数据分配的过程。描述缓存和相关的关系表,显示如何从表行创建自定义对象,以及如何使用ComputeTaskAdapter快速放置创建的对象。所有代码都可以在FastDataLoad存储库中完整看到



问题的历史



这段文字是我在GridGain网站上的In-Memory Computing Blog中的帖子的俄语翻译



因此,某家公司决定通过将计算移至内存群集中来加快慢速应用程序的速度。用于计算的初始数据在MS SQL中。计算结果必须放在此处。集群是分布式的,因为现在已经有很多数据,所以应用程序性能处于极限并且数据量正在增长。硬时间限制已设置。



在编写快速代码以处理数据之前,需要快速加载数据。对网络的疯狂搜索显示出明显缺乏代码示例,这些代码示例可以扩展到数以千万计的表。您可以下载,编译和逐步调试的示例。这是一方面。



另一方面,Apache Ignite / GridGain文档,网络研讨会和聚会提供了有关集群内部结构的想法。通过反复试验,可以使加载程序考虑跨分区的数据分布。当有一天,当局问“你的王牌是否发挥了作用?”,答案是肯定的,一切都解决了。

产生的代码似乎是一种带有内部体系结构的自制产品,但是它可以以足够的速度运行。



下载数据(世界数据库)



, data collocation, . world.sql Apache Ignite.



CSV , — SQL :





countryCache country.csv. countryCache — code, — String, — Country, (name, continent, region).







, — , . Country , . org.h2.tools.Csv, CSV java.sql.ResultSet. Apache Ignite , SQL H2.



    // define countryCache
    IgniteCache<String,Country> cache = ignite.cache("countryCache");

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      cache.put(code,country);
     }
    }


. , , . - .



, . , .





Apache Ignite — -. , PARTITIONED - (partition) . ; , . -, affinity function, , .



ResultSet . , . .





, :



  • HashMap partition_number -> key -> Value

    Map<Integer, Map<String, Country>> result = new HashMap<>();
  • affinity function partition_number. cache.put() - HashMap partition_number

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      result.computeIfAbsent(affinity.partition(key), k -> new HashMap<>()).put(code,country);
     }
    }


ComputeTaskAdapter ComputeJobAdapter. ComputeJobAdapter 1024. , .



ComputeJobAdapter . , .



Compute Task,



, "ComputeTaskAdapter initiates the simplified, in-memory, map-reduce process". ComputeJobAdapter map — , . reduce — .



(RenewLocalCacheJob)



, RenewLocalCacheJob .



targetCache.putAll(addend);


RenewLocalCacheJob partition_number .



(AbstractLoadTask)



( loader) — AbstractLoadTask. . ( ), AbstractLoadTask TargetCacheKeyType. HashMap



    Map<Integer, Map<TargetCacheKeyType, BinaryObject>> result;


countryCache String. . AbstractLoadTask TargetCacheKeyType, BinaryObject. , — .



BinaryObject



— . , JVM, - . class definition , JAR- . Country



    IgniteCache<String, Country> countryCache;


, , classpath ClassNotFound.



. — classpath, :



  • JAR- ;
  • classpath ;
  • ;
  • .


BinaryObject () . :





  • IgniteCache<String, BinaryObject> countryCache;    
  • Country BinaryObject (. LoadCountries.java)

    Country country = new Country(code, name, .. );    
    BinaryObject binCountry = node.binary().toBinary(country);    
  • HashMap, BinaryObject

    Map<Integer, Map<String, BinaryObject>> result


, . , , ClassNotFoundException .



. .



Apache Ignite : .





default-config.xml — . :



  • GridGain CE Installing Using ZIP Archive. 8.7.10, FastDataLoad , ;
  • {gridgain}\config default-config.xml



    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
  • , {gridgain}\bin ignite.bat. ; ;
  • , . ,

    [08:40:04] Topology snapshot [ver=2, locNode=d52b1db3, servers=2, clients=0, state=ACTIVE, CPUs=8, offheap=3.2GB, heap=2.0GB]


. , 8.7.25, pom.xml



    <gridgain.version>8.7.25</gridgain.version>




class org.apache.ignite.spi.IgniteSpiException: Local node and remote node have different version numbers (node will not join, Ignite does not support rolling updates, so versions must be exactly the same) [locBuildVer=8.7.25, rmtBuildVer=8.7.10]




, , map-reduce. — JAR-, compute task . Windows, Linux.

:



  • FastDataLoad;
  • ;

    mvn clean package
  • , .

    java -jar .\target\fastDataLoad.jar


main() LoadApp LoaderAgrument . map-reduce LoadCountries.

LoadCountries RenewLocalCacheJob , ( ).



#1





#2









country.csv , CountryCode . cityCache countryLanguageCache; , .





.



.



:



  • (SQL Server Management Studio):

    • — 44 686 837;
    • — 1.071 GB;
  • — 0H:1M:35S;
  • RenewLocalCacheJob reduce — 0H:0M:9S.


与执行SQL查询相比,在整个集群上分配数据所需的时间更少。




All Articles