许吉友 - Ops

A Big-Data Demo: Analyzing Zhihu's User Network

The goal is to crawl Zhihu users' follow relationships and piece them together into a user network.

This post leaves some details unexplained; it just lays out the approach and the actual code.

The stack used: Scrapy, Kafka, Elasticsearch, and Flink.

Crawling the Data

Step one is getting the data. For that I use Scrapy, Python's crawling framework.

First install Scrapy:

$ pip3 install scrapy

Then create the project and generate a spider:

$ scrapy startproject zhihu_spider
$ cd zhihu_spider/
$ scrapy genspider zhihu www.zhihu.com

Then give the spider a try:

$ scrapy crawl zhihu

It fails with a 500 error, so a couple of settings are needed. Open settings.py and change the following:

ROBOTSTXT_OBEY = False
DEFAULT_REQUEST_HEADERS = {
    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}

Run the spider again and it works. You will get a 302 asking you to log in, but that's fine: Zhihu only runs its authentication check on the home page, and other pages are not redirected to the login screen.

Next, look at Zhihu's URLs. These two expose a user's profile information and the people they follow:

https://www.zhihu.com/people/excited-vczh/followers
https://www.zhihu.com/api/v4/members/excited-vczh/followees?limit=20

Now the spider itself. The crawl is iterative, fanning out from one user to the people they follow. Edit zhihu.py:

# -*- coding: utf-8 -*-
import scrapy
import json


class ZhihuSpider(scrapy.Spider):
    name = 'zhihu'
    allowed_domains = ['www.zhihu.com']

    def start_requests(self):
        yield scrapy.Request(url='https://www.zhihu.com/people/excited-vczh/followers', callback=self.parse)
        yield scrapy.Request(url='https://www.zhihu.com/api/v4/members/excited-vczh/followees?limit=20', callback=self.parse_follower)

    def parse(self, response):
        url_token = response.url.split("/people/")[1].split("/followers")[0]
        # extract the user's basic profile info first
        name = response.css(".ProfileHeader-name ::text").get()
        desc = response.css(".ProfileHeader-headline ::text").get()

        info_class = response.css(".ProfileHeader-info")
        one_item_div = info_class.css(".ProfileHeader-infoItem:first-child")
        two_item_div = info_class.css(".ProfileHeader-infoItem:last-child")
        one_item_div_text_list = one_item_div.xpath("text()")
        two_item_div_text_list = two_item_div.xpath("text()")
        major = ""
        company = ""
        occupation = ""
        university = ""
        tie = ""
        if len(one_item_div_text_list) >= 1:
            major = one_item_div_text_list[0].get()
        if len(one_item_div_text_list) >= 2:
            company = one_item_div_text_list[1].get()
        if len(one_item_div_text_list) >= 3:
            occupation = one_item_div_text_list[2].get()

        if len(info_class.css(".ProfileHeader-infoItem")) == 2:
            if len(two_item_div_text_list) >= 1:
                university = two_item_div_text_list[0].get()
            if len(two_item_div_text_list) >= 2:
                tie = two_item_div_text_list[1].get()

        item = {
            "t": "user",
            "url_token": url_token,
            "name": name,
            "desc": desc,
            "major": major,
            "company": company,
            "occupation": occupation,
            "university": university,
            "tie": tie,
        }
        yield item

    def parse_follower(self, response):
        json_obj = json.loads(response.text)
        is_end = json_obj["paging"]["is_end"]
        next_url = json_obj["paging"]["next"]
        data = json_obj["data"]
        for follower_obj in data:
            # record this followed user, then fetch their profile page for full details
            follower_obj["t"] = "follower"
            follower_obj["v"] = response.url.split("/members/")[1].split("/followees")[0]
            yield follower_obj
            yield scrapy.Request(url=follower_obj["url"] + '/followers', callback=self.parse)

        if not is_end:
            # page through this user's followee list ("next" is already a full URL)
            yield scrapy.Request(
                url=next_url,
                callback=self.parse_follower
            )

        # recurse into each followed user's own followee list
        for follower_obj in data:
            yield scrapy.Request(url='https://www.zhihu.com/api/v4/members/' + follower_obj["url_token"] + '/followees?limit=20',
                                 callback=self.parse_follower)

Then run the spider:

$ scrapy crawl zhihu

Once it's running, the spider won't stop any time soon; it just keeps churning through data.
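
If you want a bounded test run rather than an endless crawl, Scrapy's built-in CloseSpider extension can stop it after a fixed number of items or a time limit; the values below are only examples, not from the original setup:

# settings.py (optional)
CLOSESPIDER_ITEMCOUNT = 10000   # stop after this many scraped items
CLOSESPIDER_TIMEOUT = 3600      # or after this many seconds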

Next, the scraped data is pushed into Kafka, which hands it on to the other components. Edit items.py:

import scrapy


class UserDetailItem(scrapy.Item):
    t = scrapy.Field()
    url_token = scrapy.Field()
    name = scrapy.Field()
    desc = scrapy.Field()
    major = scrapy.Field()
    company = scrapy.Field()
    occupation = scrapy.Field()
    university = scrapy.Field()
    tie = scrapy.Field()


class FollowerItem(scrapy.Item):
    t = scrapy.Field()
    v = scrapy.Field()
    id = scrapy.Field()
    url_token = scrapy.Field()
    name = scrapy.Field()
    use_default_avatar = scrapy.Field()
    avatar_url = scrapy.Field()
    avatar_url_template = scrapy.Field()
    is_org = scrapy.Field()
    type = scrapy.Field()
    url = scrapy.Field()
    user_type = scrapy.Field()
    headline = scrapy.Field()
    gender = scrapy.Field()
    is_advertiser = scrapy.Field()
    vip_info = scrapy.Field()

Then edit pipelines.py:

from pykafka import KafkaClient


class ZhihuSpiderPipeline(object):

    def __init__(self):
        client = KafkaClient(hosts="fueltank-3:9092")
        self.user_detail_producer = client.topics['user_detail'].get_sync_producer()
        self.follower_producer = client.topics['follower'].get_sync_producer()

    def process_item(self, item, spider):
        if item["t"] == "user":
            self.user_detail_producer.produce(bytes(str(item), encoding="utf-8"))
            return item
        elif item["t"] == "follower":
            self.follower_producer.produce(bytes(str(item), encoding="utf-8"))
            return item
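
For the pipeline to actually send anything, pykafka needs to be installed and the pipeline enabled in settings.py; a minimal sketch, assuming the project layout generated earlier:

$ pip3 install pykafka

# settings.py
ITEM_PIPELINES = {
    "zhihu_spider.pipelines.ZhihuSpiderPipeline": 300,
}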

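A quick way to confirm that messages are reaching Kafka (assuming the standard Kafka CLI tools are available on the broker host) is the console consumer:

$ kafka-console-consumer.sh --bootstrap-server fueltank-3:9092 --topic user_detail --from-beginning
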
On the other side of Kafka, I want to store the data in Elasticsearch. ES serves both as a search layer and as the data source for the Flink graph computation.

Let's get to it.

Create a Maven-based Java project and fill in pom.xml:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <mainClass>xujiyou.work.DataHandle</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>assemble-all</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.5.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

Only the Kafka and Elasticsearch client jars end up being used. The Neo4j driver goes unused: Cypher doesn't mix well with free-form JSON, and the crawled data is dynamic with no guaranteed set of fields, so I dropped that idea.

The plan is to load the data into Neo4j later, after the graph computation in Spark, so the relationships can be browsed from a client.

On to the Java code:

package xujiyou.work;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.*;

/**
 * DataHandle class
 *
 * @author jiyouxu
 * @date 2019/12/19
 */
public class DataHandle {

    static ElasticsearchHandle elasticsearchHandle = new ElasticsearchHandle();

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("bootstrap.servers", "fueltank-3:9092");
        props.put("group.id", "default_consumer_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        new Thread(() -> {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("follower"));
            // wait until partitions are assigned, then seek to the end so only new messages are consumed
            while (consumer.assignment().size() == 0) {
                consumer.poll(Duration.ofSeconds(1));
            }
            consumer.seekToEnd(consumer.assignment());

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record : records) {
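                    // the Scrapy pipeline sends the Python dict repr of each item; patch the Python
                    // literals so the string parses as JSON (fastjson tolerates the single quotes)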
                    String followerStr = record.value().replaceAll("None", "null");
                    followerStr = followerStr.replaceAll("False", "false");
                    followerStr = followerStr.replaceAll("True", "true");

                    System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + followerStr);
                    elasticsearchHandle.handleFollower(followerStr);
                }
            }
        }).start();

        new Thread(() -> {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("user_detail"));
            // wait until partitions are assigned, then seek to the end so only new messages are consumed
            while (consumer.assignment().size() == 0) {
                consumer.poll(Duration.ofSeconds(1));
            }
            consumer.seekToEnd(consumer.assignment());

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for(ConsumerRecord<String, String> record : records) {
                    String userStr = record.value().replaceAll("None", "null");
                    userStr = userStr.replaceAll("False", "false");
                    userStr = userStr.replaceAll("True", "true");

                    System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + userStr);
                    elasticsearchHandle.handleUser(userStr);
                }
            }
        }).start();
    }
}

Two topics need to be consumed, so two threads are started; each record received is handed to ElasticsearchHandle for processing.

The ElasticsearchHandle class:

package xujiyou.work;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Map;

/**
 * ElasticsearchHandle class
 *
 * @author jiyouxu
 * @date 2019/12/19
 */
public class ElasticsearchHandle {

    private RestHighLevelClient client;

    ElasticsearchHandle() {
         this.client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("fueltank-4", 9200, "http")));
    }

    public void handleFollower(String followerStr) {
        JSONObject jsonObject = JSON.parseObject(followerStr);
        IndexRequest indexRequest = new IndexRequest("follower").source(jsonObject.toJavaObject(Map.class));
        try {
            IndexResponse indexResponse = this.client.index(indexRequest, RequestOptions.DEFAULT);
            System.out.println(indexResponse);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void handleUser(String userStr) {
        JSONObject jsonObject = JSON.parseObject(userStr);
        IndexRequest indexRequest = new IndexRequest("user_detail").source(jsonObject.toJavaObject(Map.class));
        try {
            IndexResponse indexResponse = this.client.index(indexRequest, RequestOptions.DEFAULT);
            System.out.println(indexResponse);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Run it locally first; once it looks fine, package it and run it on the server.
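
To spot-check that documents are actually landing in Elasticsearch (same host as in the code above), the count API is enough:

$ curl 'http://fueltank-4:9200/follower/_count?pretty'
$ curl 'http://fueltank-4:9200/user_detail/_count?pretty'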

The Python code runs the same way locally and on the server. The Java code has to be packaged into a jar with mvn clean package; because of the plugins configured in pom.xml, all dependencies get bundled into the jar.

Then upload the jar to the server and run it in the background first:

$ nohup sudo java -cp data-handle-1.0-SNAPSHOT-jar-with-dependencies.jar xujiyou.work.DataHandle > console.log &

Then run the Python crawler, also in the background:

$ nohup sudo proxychains4 scrapy crawl zhihu > console.log &

Note that proxychains4 has to go after sudo here.

So what is proxychains4? It's a client-side proxy tool: prefix any command with it and that program's traffic goes through your proxies. It can also pick a random proxy per connection, which is vital for a crawler. Thanks to this tool there's no need to handle proxies at the Python level at all; proxying is done at the system level.

Here's a quick rundown of installing it from source:

$ git clone https://github.com/rofl0r/proxychains-ng
$ cd proxychains-ng
$ sudo ./configure --prefix=/usr --sysconfdir=/etc
$ sudo make
$ sudo make install
$ sudo make install-config

Then edit the config file /etc/proxychains.conf: add your own proxies and make sure internal addresses are excluded from proxying. Part of the config:

localnet 127.0.0.0/255.0.0.0
localnet 10.0.0.0/255.0.0.0
localnet 172.16.0.0/255.240.0.0
localnet 192.168.0.0/255.255.0.0
[ProxyList]
http 106.333.222.111 2333 username password

The last line is the proxy entry itself.
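
To get the random proxy rotation mentioned above, the chain mode in /etc/proxychains.conf also has to be switched; roughly, with several proxies listed under [ProxyList]:

# comment out strict_chain / dynamic_chain, then enable:
random_chain
chain_len = 1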

Good, the data is ready. Now for the graph computation, starting with Flink.

Create a new Maven project and put this in pom.xml:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <manifestEntries>
                                    <Main-Class>work.xujiyou.flink.graph.FollowerGraph</Main-Class>
                                    <X-Compile-Source-JDK>1.8</X-Compile-Source-JDK>
                                    <X-Compile-Target-JDK>1.8</X-Compile-Target-JDK>
                                </manifestEntries>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

<properties>
    <flink.version>1.9.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.5.0</version>
    </dependency>
</dependencies>

A few of these dependencies are unused; so be it.

Then the Java code, FollowerGraph:

package work.xujiyou.flink.graph;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.MessageIterator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * FollowerGraph class
 *
 * @author jiyouxu
 * @date 2019/12/20
 */
public class FollowerGraph {

    static ElasticSearchDataSource elasticSearchDataSource = new ElasticSearchDataSource();

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final List<Vertex<String, Map<String, Object>>> vertexList = new ArrayList<>();
        final List<Edge<String, String>> edgeList = new ArrayList<>();
        final List<Map<String, Object>> dataList = elasticSearchDataSource.findDataList();

        Map<String, Object> oneMap = new HashMap<>();
        oneMap.put("url_token", "excited-vczh");
        oneMap.put("name", "vczh");
        oneMap.put("headline", "专业造轮子,拉黑抢前排。gaclib.net");
        dataList.add(0, oneMap);

        dataList.forEach((follower) -> {
            Vertex<String, Map<String, Object>> vertex = new Vertex<>(follower.get("url_token").toString(), follower);
            vertexList.add(vertex);
            if (follower.get("v") != null) {
                Edge<String, String> edge = new Edge<>(follower.get("v").toString(), follower.get("url_token").toString(), "1");
                edgeList.add(edge);
            }
        });

        DataSet<Vertex<String, Map<String, Object>>> vertices = env.fromCollection(vertexList);
        DataSet<Edge<String, String>> edges = env.fromCollection(edgeList);
        Graph<String, Map<String, Object>, String> graph = Graph.fromDataSet(vertices, edges, env);
        System.out.println(graph.getVertices().collect().size());

        Graph<String, Map<String, Object>, String> result = graph.runVertexCentricIteration(new SSSPComputeFunction(), new SSSPCombiner(), 10000);
        System.out.println("wan");
        System.out.println(result.getVertices().collect().size());
    }

    public static final class SSSPComputeFunction extends ComputeFunction<String, Map<String, Object>, String, String> {
        @Override
        public void compute(Vertex<String, Map<String, Object>> vertex, MessageIterator<String> strings) {
            System.out.println(vertex.getValue().get("name") + ":" + vertex.getValue().get("headline"));
        }
    }

    public static final class SSSPCombiner extends MessageCombiner<String, String> {
        @Override
        public void combineMessages(MessageIterator<String> strings) throws Exception {
            System.out.println(strings);
        }
    }
}

And the data-source code, which pulls the data out of Elasticsearch:

package work.xujiyou.flink.graph;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * DataSource class
 *
 * @author jiyouxu
 * @date 2019/12/20
 */
public class ElasticSearchDataSource {

    private RestHighLevelClient client;

    ElasticSearchDataSource() {
        this.client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("fueltank-4", 9200, "http")));
    }

    public List<Map<String, Object>> findDataList() {
        SearchRequest searchRequest = new SearchRequest("follower");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
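        // note: ES caps a single search at 10,000 hits by default (index.max_result_window);
        // larger result sets would need the scroll API or search_after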
        searchSourceBuilder.size(10000);
        searchRequest.source(searchSourceBuilder);

        List<Map<String, Object>> list = new ArrayList<>();
        try {
            SearchResponse searchResponse = this.client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] searchHits = searchResponse.getHits().getHits();

            for (SearchHit searchHit : searchHits) {
                list.add(searchHit.getSourceAsMap());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

}

Done. The code first assembles the data into a graph, then runs an iterative computation over it, printing each vertex's name and headline along the way.
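
Since the shade plugin writes the Main-Class into the manifest, the fat jar can be run locally as a plain Java program or submitted to a Flink cluster with the flink CLI; the jar name below is just a placeholder for whatever your artifactId produces:

$ java -jar target/flink-graph-1.0-SNAPSHOT.jar    # placeholder jar name; runs with Flink's local environment
$ flink run target/flink-graph-1.0-SNAPSHOT.jar    # or submit to a running Flink cluster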

Here's part of the output:

俺村俺最菜:Robot full autonomy を目指している
你好明天:
小土豆:
电动车前线:专业且接地气儿的智能电动车账号
135编辑器:微信公众号排版工具+公众号裂变涨粉+新媒体技能提升
聂小远:演员
卢宁:少数持有“皮肤主诊资质”的微整形医生

Summary

That wraps up a first pass at the big-data stack: Scrapy does the crawling, Kafka carries the data, a Java consumer writes the Kafka data into Elasticsearch, and Flink reads it back out of Elasticsearch for the graph computation.

Next step: work through the official docs of the rest of the big-data suite.