三分钟之类教你学会数据架构的Kafka ,Python教程

  • 在Ubuntu 18.04上安装Apache Mesos 1.6.0
  • Kafka快速数据架构教程
  • 用于快速数据架构的Kafka Python教程

  • 为了演示如何分析您的大数据,我们将配置一个大数据管道,从Clicky.com提取网站指标,并将这些指标推送到我们的Kafka群集上的Kafka主题。
    这只是您可能希望在大数据实现中实现的一个管道。网站统计信息可能是您数据的重要组成部分,因为这可以为您提供有关网站访问者,访问过的网页等的数据。在您执行数据分析时将这些数据与社交媒体共享等其他数据相结合,您就可以制作一些数据关于何时是您将网站更新发布到社交媒体以吸引最多访问者的最佳时间,相当简洁的商业决策。这是实施大数据的主要好处:不一定是原始数据本身,而是您可以从原始数据中提取的知识,并做出更明智的决策。

    三分钟之类教你学会数据架构的Kafka ,Python教程


    在此示例中,我们将从Clicky.com API中提取“ 页面 ”统计信息,并将其推送到admintome-pages Kafka主题。这将从AdminTome的首页提供JSON数据。
    Clicky Web Analytics
    为了完全遵循本文,您需要有一个链接到Clicky.com的网站。这是免费的,为什么不呢。在clicky.com注册您的网站。我个人使用它是因为它有比Google Analytics提供的更好的博客指标报告(例如放弃率)。您需要向页面添加一些代码,以便clicky可以开始收集指标。
    在您的页面发送指标以进行clicky后,您需要获取一些值才能使用Clicky API并从我们的Python应用程序中提取指标。转到您网站的偏好设置,您会看到我们需要的两个数字:

    • 网站ID
    • 网站密钥


    不要在任何地方发布,因为他们可以让任何人访问您的网站数据。稍后当我们连接到API并提取网站统计信息时,我们将需要这些数字。
    准备卡夫卡
    首先,我们需要通过向我们将用于向其发送消息的Kafka群集添加主题来准备我们的Kafka群集。从上图中可以看出,我们在Kafka中的主题将是admintome-pages。


    登录到Mesos Master,你运行了Kafka-mesos。如果你按照上一篇文章,我使用的主人是mesos1.admintome.lab。接下来,我们将使用kafka-mesos.sh脚本创建主题:
    $ cd kafka /
    $ ./kafka-mesos.sh topic add admintome-pages --broker = 0 --api = http://mslave2.admintome.lab:7000
    请注意,API参数指向我们在上一篇文章中使用kafka-mesos创建的Kafka调度程序。您可以验证您现在拥有正确的主题:
    $ ./kafka-mesos.sh主题列表--api = http://mslave2.admintome.lab:7000
    主题:
    name:__ consumer_offsets
    分区:0:[0],1:[0],2:[0],3:[0],4:[0],5:[0],6:[0],7:[0], 8:[0],9:[0],10:[0],11:[0],12:[0],13:[0],14:[0],15:[0],16: [0],17:[0],18:[0],19:[0],20:[0],21:[0],22:[0],23:[0],24:[0 ],25:[0],26:[0],27:[0],28:[0],29:[0],30:[0],31:[0],32:[0], 33:[0],34:[0],35:[0],36:[0],37:[0],38:[0],39:[0],40:[0],41: [0],42:[0],43:[0],44:[0],45:[0],46:[0],47:[0],48:[0],49:[0 ]
    options:segment.bytes = 104857600,cleanup.policy = compact,compression.type = producer
    名称:admintome
    分区:0:[0]
    name:admintome-pages
    分区:0:[0]
    我们的新主题准备好了!现在是时候了解有趣的东西并开始开发我们的Python应用程序。
    现在我们已经准备好了Kafka,我们将开始开发我们的Kafka制作人。生产者将从Clicky API获取页面度量标准,并将这些JSON格式的度量标准推送到我们之前创建的主题。
    我假设您的系统上安装了Python 3,并且还安装了virtualenv。
    要开始,我们需要设置我们的环境。

    $ mkdir ~ / Development / python / venvs
    $ mkdir ~ / Development / python / site-stats-intake
    $ cd ~ / Development / python / site-stats-intake
    $ virtualenv ../venvs/intake
    $ source ../venvs/intake/bin/activate
    (摄入)$ pip安装kafka-python请求
    (摄入)$ pip freeze> requirements.txt
    接下来,我们需要创建我们的类。
    Clicky Class
    我们将创建一个名为Clicky的新Python类,我们将使用它来与Clicky API进行交互。创建一个名为clicky.py的新文件并添加以下内容:
    导入 请求
    导入 json
    class Clicky(对象):
    def __init __(self,site_id,sitekey):
    自我。site_id = site_id
    自我。sitekey = sitekey
    自我。output = “json”
    def get_data(self,data_type):
    click_api_url = “https://api.clicky.com/api/stats/4”
    payload = { “site_id”:self。site_id,
    “sitekey”:自我。sitekey,
    “type”:data_type,
    “输出”:自我。输出 }
    响应 = 请求。get(click_api_url,params = payload)
    raw_stats = 回复。文本
    返回 raw_stats
    def get_pages_data(self):
    data = self。get_data(“pages”)
    返回 json。载荷(数据)
    保存文件并退出。
    为了获得我们的指标,我们需要向Clicky API URL发送HTTP GET请求

    https://api.clicky.com/api/stats/4
    我们还需要包含几个参数:

    • site_id:这是我们之前获得的站点ID号。
    • sitekey:这是之前获得的站点密钥号。
    • type:要获取我们的首页,我们将类型设置为“页面”。
    • 输出:我们将其设置为“json”,以便API返回JSON数据。


    最后,我们调用请求Python模块,使用我们指定的参数对我们的API URL执行HTTP GET。在该 get_pages_data 方法中,我们返回一个表示我们的JSON数据的字典。接下来,我们将编写Kafka类实现的代码。
    MyKafka班
    此课程将与我们的Kafka群集进行交互,并将网站指标推送到我们的主题。创建一个名为mykafka.py的新文件并添加以下内容:
    来自 kafka import KafkaProducer
    导入 json


    class MyKafka(object):
    def __init__(self,kafka_brokers):
    自我。producer = KafkaProducer(
    value_serializer = lambda v:json。转储(v)。编码('utf-8'),
    bootstrap_servers = kafka_brokers

    def send_page_data(self,json_data):
    自我。制片人。发送('admintome-pages',json_data)
    首先,我们导入kafka-python库,特别是KafkaProducer类,它将让我们编写Kafka生产者代码并将消息发布到我们的Kafka主题。
    来自 kafka import KafkaProducer
    我们现在定义我们的 MyKafka 类并为它创建构造函数:
    class MyKafka(object):
    def __init__(self,kafka_brokers):
    这需要一个参数来表示将用于连接到我们的Kafka集群的Kafka代理。这是一个字符串数组,形式如下:
    [ “broker:ip”,“broker:ip” ]
    我们将只使用一个代理,它是我们在上一篇文章中创建的代理:mslave1.admintome.lab:31000:
    [ “mslave1.admintome.lab:31000” ]
    我们接下来实例化一个KafkaProducer 名为的新 对象producer。由于我们将以JSON的形式向Kafka发送数据,因此我们告诉 KafkaProducer 使用JSON解码器转储来使用value_serializer 参数解析数据 。我们还告诉它使用我们的经纪人参数。bootstrap_servers
    自我。producer = KafkaProducer(
    value_serializer = lambda v:json。转储(v)。编码('utf-8'),
    bootstrap_servers = kafka_brokers

    最后,我们创建一个新方法,用于将消息发送到我们的 admintome-pages 主题:

    def send_page_data(self,json_data):
    自我。制片人。发送('admintome-pages',json_data)
    这里的所有都是它的。现在我们将编写将控制一切的Main类。
    主类
    创建一个名为main.py的新文件并添加以下内容:
    来自 clicky 导入 Clicky
    来自 mykafka 导入 MyKafka
    导入 日志
    进口 时间
    进口 OS
    来自 伐木。config import dictConfig
    class Main(对象):
    def __init__(个体经营):
    如果是 'KAFKA_BROKERS' 在 os中。环境:
    kafka_brokers = os。环境 [ 'KAFKA_BROKERS' ]。分裂(',')
    否则:
    引发 ValueError('KAFKA_BROKERS环境变量未设置')
    如果 'SITE_ID' 的 OS。环境:
    自我。site_id = os。环境 [ 'SITE_ID' ]
    否则:
    引发 ValueError('SITE_ID环境变量未设置')
    如果是 'SITEKEY' 在 os中。环境:
    自我。sitekey = os。环境 [ 'SITEKEY' ]
    否则:
    引发 ValueError('未设置'SITEKEY环境变量')
    logging_config = dict(
    版本 = 1,
    formatters = {
    'f':{ 'format':

    '%(asctime)s%(name)-12s%(levelname)-8s%(message)s' }
    },
    处理程序 = {
    'h':{ 'class':'logging.StreamHandler',
    'formatter':'f',
    '级别':记录。DEBUG }
    },
    root = {
    '处理程序':[ 'h' ],
    '级别':记录。调试,
    },

    自我。logger = logging。getLogger()
    dictConfig(logging_config)
    自我。记录器。info(“初始化卡夫卡制片人”)
    自我。记录器。info(“KAFKA_BROKERS = {0}”。format(kafka_brokers))
    自我。mykafka = MyKafka(kafka_brokers)
    def init_clicky(self):
    自我。clicky = Clicky(自我。SITE_ID,自我。sitekey)
    自我。记录器。info(“Clicky Stats Polling Initialized”)
    def run(self):
    自我。init_clicky()
    starttime = 时间。时间()
    而 真:
    data = self。点击。get_pages_data()
    自我。记录器。info(“成功轮询Clicky页面数据”)
    自我。mykafka。send_page_data(数据)
    自我。记录器。info(“发布的页面数据到Kafka”)
    时间。睡眠(300.0 - ((时间。时间()- 开始时间)% 300.0))
    if __name__ == “ __ main__ ”:

    记录。info(“初始化Clicky Stats Polling”)
    main = Main()
    主要的。run()
    这个例子的最终状态是构建一个Docker容器,然后我们将在Marathon上运行它。考虑到这一点,我们不希望在代码中对我们的一些敏感信息(如我们的clicky站点ID和站点密钥)进行硬编码。我们希望能够从环境变量中提取这些内容。如果它们没有设置,那么我们通过异常并退出。
    如果是 'KAFKA_BROKERS' 在 os中。环境:
    kafka_brokers = os。环境 [ 'KAFKA_BROKERS' ]。分裂(',')
    否则:
    引发 ValueError('KAFKA_BROKERS环境变量未设置')
    如果 'SITE_ID' 的 OS。环境:
    自我。site_id = os。环境 [ 'SITE_ID' ]
    否则:
    引发 ValueError('SITE_ID环境变量未设置')
    如果是 'SITEKEY' 在 os中。环境:
    自我。sitekey = os。环境 [ 'SITEKEY' ]
    否则:
    引发 ValueError('未设置'SITEKEY环境变量')
    我们还配置日志记录,以便我们可以看到我们的应用程序正在发生什么。我在代码中编写了一个无限循环,用于轮询clicky并每隔五分钟将指标推送到我们的Kafka主题。
    def run(self):
    自我。init_clicky()
    starttime = 时间。时间()

    而 真:
    data = self。点击。get_pages_data()
    自我。记录器。info(“成功轮询Clicky页面数据”)
    自我。mykafka。send_page_data(数据)
    自我。记录器。info(“发布的页面数据到Kafka”)
    时间。睡眠(300.0 - ((时间。时间()- 开始时间)% 300.0))
    保存文件并退出。
    运行我们的应用
    要测试一切正常,您可以在设置环境变量后尝试运行应用程序:
    (摄入量)$ export KAFKA_BROKERS = “mslave1.admintome.lab:31000”
    (进口)$ export SITE_ID = “{your site id}”
    (入口)$ export SITEKEY = “{your sitekey}”
    (摄取)$ python main.py
    2018 -06 -25 15:34:32,259 root INFO初始化Kafka Producer
    2018 -06 -25 15:34:32,259 root INFO KAFKA_BROKERS = [ 'mslave1.admintome.lab:31000' ]
    2018 -06 -25 15:34:32,374 root INFO Clicky Stats Polling Initialized
    2018 -06 -25 15:34:32,754 root INFO成功轮询Clicky页面数据
    2018 -06 -25 15:34:32,755 root INFO向Kafka发布的页面数据
    我们正在向我们的Kafka主题发送消息!接下来我们将构建我们的Docker容器并将其部署到Marathon。最后,我们将通过编写一个测试消费者来完成,该消费者将从我们的主题中获取消息。
    我为本文中使用的所有代码创建了一个GitHub存储库:https://github.com/admintome/clicky-state-intake
    现在我们已经编写了应用程序代码,我们可以创建一个Docker容器,以便我们可以将它部署到Marathon。使用以下内容在应用程序目录中创建Dockerfile文件:

    来自python:3
    WORKDIR / usr / src / app
    COPY requirements.txt ./
    运行pip install --no-cache-dir -r requirements.txt
    复制。。
    CMD [“python”,“。/ main.py”]
    构建容器
    $ docker build -t {你的码头中心用户名} site-stats-intake。
    Docker构建完成后,您需要将其推送到您的Mesos Slaves可以访问的Docker存储库。对我来说,这是Docker Hub:
    $ docker push -t admintome / site-stats-intake
    然后登录到每个Mesos从站并向下拉图像:
    $ docker pull admintome / site-stats-intake
    我们现在准备为我们的应用程序创建Marathon应用程序部署。
    转到Marathon GUI。
    HTTP://mesos1.admintome.lab:8080
    单击“创建应用程序”按钮。然后单击JSON模式按钮:

    三分钟之类教你学会数据架构的Kafka ,Python教程


    粘贴以下JSON代码:
    {
    “id”:“site-stats-intake”,
    “cmd”:null,
    “cpus”:1,
    “mem”:128,
    “disk”:0,
    “实例”:1,
    “container”:{
    “docker”:{
    “image”:“admintome / site-stats-intake”
    },
    “type”:“DOCKER”
    },
    “网络”:[
    {
    “模式”:“主持人”
    }
    ]
    “env”:{
    “KAFKA_BROKERS”:“192.168.1.x:port”,
    “SITE_ID”:“{your site_id}”,
    “SITEKEY”:“{your sitekey}”
    }
    }
    请确保在环境的env部分中替换KAFKA_BROKERS,SITE_ID和SITEKEY的正确值。
    最后,单击Create Application按钮以部署应用程序。几秒钟后,您应该看到应用程序正在运行。

    三分钟之类教你学会数据架构的Kafka ,Python教程


    要查看日志,请单击site-stats-intake应用程序,然后单击stderr链接以下载包含日志的文本文件。
    现在我们将我们的应用程序部署到Marathon,我们将编写一个简短的消费者,我们将在我们的开发系统上运行,以向我们展示已收到的消息。
    这将是一个简单的Kafka消费者,它将检查主题并显示有关该主题的所有消息。在这一点上没有用,但它让我们知道我们的小轮询应用程序正常工作。
    创建一个名为consumer.py的新文件并添加以下内容:
    导入 系统
    来自 kafka 进口 KafkaConsumer
    consumer = KafkaConsumer('admintome-pages',bootstrap_servers = “mslave1.admintome.lab:31000”,
    auto_offset_reset = 'earliest')
    尝试:
    对于 消息 的 消费者:
    打印(消息。值)
    除 KeyboardInterrupt 外:
    SYS。退出()
    保存并退出该文件。这使得Kafka经纪人硬编码,因为我们只是用它来测试一切。确保使用代理名称和端口更新bootstrap-servers参数。
    现在运行该命令,您应该看到大量的JSON代表您访问最多的页面:
    (摄取)$ python consumer.py


    b '[{“type”:“pages”,“dates”:[{“date”:“2018-06-25”,“items”:[{“value”:“145”,“value_percent”:“43.2 “,”title“:”快速数据架构的Kafka教程 - AdminTome Blog“,”stats_url“:”http://clicky.com/stats/visitors?site_id=101045340&date=2018-06-25&href=%2Fblog%2Fkafka- tutorial-for-fast-data-architecture%2F“,”url“:”http://www.admintome.com/blog/kafka-tutorial-for-fast-data-architecture/“},...

    1. 我们现在有一个数据管道,其中包含一些我们可以使用的数据。下一步将使用该数据并进行分析。在本文中,我们将安装和配置我们的SMACK堆栈的下一部分,即Apache Spark。我们还将配置它分析我们的数据并给我们一些有意义的东西。在Ubuntu 18.04上安装Apache Mesos 1.6.0
    2. Kafka快速数据架构教程
    3. 用于快速数据架构的Kafka Python教程


    这是我的快速数据架构系列中的第三篇文章,它引导您使用SMACK堆栈实现投标数据。本文以其他文章为基础,如果您还没有阅读过这些文章,我强烈建议您这样做,以便您拥有本教程中需要遵循的基础结构。
    本文将引导您从Clicky.com中提取网站指标。我还有另一篇文章,我们将从Google Analytics中提取指标并将指标发布到Apache Kafka:Kafka Python和Google Analytics。
    为了演示如何分析您的大数据,我们将配置一个大数据管道,从Clicky.com提取网站指标,并将这些指标推送到我们的Kafka群集上的Kafka主题。


    这只是您可能希望在大数据实现中实现的一个管道。网站统计信息可能是您数据的重要组成部分,因为这可以为您提供有关网站访问者,访问过的网页等的数据。在您执行数据分析时将这些数据与社交媒体共享等其他数据相结合,您就可以制作一些数据关于何时是您将网站更新发布到社交媒体以吸引最多访问者的最佳时间,相当简洁的商业决策。这是实施大数据的主要好处:不一定是原始数据本身,而是您可以从原始数据中提取的知识,并做出更明智的决策。

    三分钟之类教你学会数据架构的Kafka ,Python教程


    在此示例中,我们将从Clicky.com API中提取“ 页面 ”统计信息,并将其推送到admintome-pages Kafka主题。这将从AdminTome的首页提供JSON数据。
    Clicky Web Analytics
    为了完全遵循本文,您需要有一个链接到Clicky.com的网站。这是免费的,为什么不呢。在clicky.com注册您的网站。我个人使用它是因为它有比Google Analytics提供的更好的博客指标报告(例如放弃率)。您需要向页面添加一些代码,以便clicky可以开始收集指标。
    在您的页面发送指标以进行clicky后,您需要获取一些值才能使用Clicky API并从我们的Python应用程序中提取指标。转到您网站的偏好设置,您会看到我们需要的两个数字:

    • 网站ID
    • 网站密钥


    不要在任何地方发布,因为他们可以让任何人访问您的网站数据。稍后当我们连接到API并提取网站统计信息时,我们将需要这些数字。
    准备卡夫卡
    首先,我们需要通过向我们将用于向其发送消息的Kafka群集添加主题来准备我们的Kafka群集。从上图中可以看出,我们在Kafka中的主题将是admintome-pages。


    登录到Mesos Master,你运行了Kafka-mesos。如果你按照上一篇文章,我使用的主人是mesos1.admintome.lab。接下来,我们将使用kafka-mesos.sh脚本创建主题:
    $ cd kafka /
    $ ./kafka-mesos.sh topic add admintome-pages --broker = 0 --api = http://mslave2.admintome.lab:7000
    请注意,API参数指向我们在上一篇文章中使用kafka-mesos创建的Kafka调度程序。您可以验证您现在拥有正确的主题:
    $ ./kafka-mesos.sh主题列表--api = http://mslave2.admintome.lab:7000
    主题:
    name:__ consumer_offsets
    分区:0:[0],1:[0],2:[0],3:[0],4:[0],5:[0],6:[0],7:[0], 8:[0],9:[0],10:[0],11:[0],12:[0],13:[0],14:[0],15:[0],16: [0],17:[0],18:[0],19:[0],20:[0],21:[0],22:[0],23:[0],24:[0 ],25:[0],26:[0],27:[0],28:[0],29:[0],30:[0],31:[0],32:[0], 33:[0],34:[0],35:[0],36:[0],37:[0],38:[0],39:[0],40:[0],41: [0],42:[0],43:[0],44:[0],45:[0],46:[0],47:[0],48:[0],49:[0 ]
    options:segment.bytes = 104857600,cleanup.policy = compact,compression.type = producer
    名称:admintome
    分区:0:[0]
    name:admintome-pages
    分区:0:[0]
    我们的新主题准备好了!现在是时候了解有趣的东西并开始开发我们的Python应用程序。
    现在我们已经准备好了Kafka,我们将开始开发我们的Kafka制作人。生产者将从Clicky API获取页面度量标准,并将这些JSON格式的度量标准推送到我们之前创建的主题。
    我假设您的系统上安装了Python 3,并且还安装了virtualenv。
    要开始,我们需要设置我们的环境。

    $ mkdir ~ / Development / python / venvs
    $ mkdir ~ / Development / python / site-stats-intake
    $ cd ~ / Development / python / site-stats-intake
    $ virtualenv ../venvs/intake
    $ source ../venvs/intake/bin/activate
    (摄入)$ pip安装kafka-python请求
    (摄入)$ pip freeze> requirements.txt
    接下来,我们需要创建我们的类。
    Clicky Class
    我们将创建一个名为Clicky的新Python类,我们将使用它来与Clicky API进行交互。创建一个名为clicky.py的新文件并添加以下内容:
    导入 请求
    导入 json
    class Clicky(对象):
    def __init __(self,site_id,sitekey):
    自我。site_id = site_id
    自我。sitekey = sitekey
    自我。output = “json”
    def get_data(self,data_type):
    click_api_url = “https://api.clicky.com/api/stats/4”
    payload = { “site_id”:self。site_id,
    “sitekey”:自我。sitekey,
    “type”:data_type,
    “输出”:自我。输出 }
    响应 = 请求。get(click_api_url,params = payload)
    raw_stats = 回复。文本
    返回 raw_stats
    def get_pages_data(self):
    data = self。get_data(“pages”)
    返回 json。载荷(数据)
    保存文件并退出。
    为了获得我们的指标,我们需要向Clicky API URL发送HTTP GET请求

    https://api.clicky.com/api/stats/4
    我们还需要包含几个参数:

    • site_id:这是我们之前获得的站点ID号。
    • sitekey:这是之前获得的站点密钥号。
    • type:要获取我们的首页,我们将类型设置为“页面”。
    • 输出:我们将其设置为“json”,以便API返回JSON数据。


    最后,我们调用请求Python模块,使用我们指定的参数对我们的API URL执行HTTP GET。在该 get_pages_data 方法中,我们返回一个表示我们的JSON数据的字典。接下来,我们将编写Kafka类实现的代码。
    MyKafka班
    此课程将与我们的Kafka群集进行交互,并将网站指标推送到我们的主题。创建一个名为mykafka.py的新文件并添加以下内容:
    来自 kafka import KafkaProducer
    导入 json


    class MyKafka(object):
    def __init__(self,kafka_brokers):
    自我。producer = KafkaProducer(
    value_serializer = lambda v:json。转储(v)。编码('utf-8'),
    bootstrap_servers = kafka_brokers

    def send_page_data(self,json_data):
    自我。制片人。发送('admintome-pages',json_data)
    首先,我们导入kafka-python库,特别是KafkaProducer类,它将让我们编写Kafka生产者代码并将消息发布到我们的Kafka主题。
    来自 kafka import KafkaProducer
    我们现在定义我们的 MyKafka 类并为它创建构造函数:
    class MyKafka(object):
    def __init__(self,kafka_brokers):
    这需要一个参数来表示将用于连接到我们的Kafka集群的Kafka代理。这是一个字符串数组,形式如下:
    [ “broker:ip”,“broker:ip” ]
    我们将只使用一个代理,它是我们在上一篇文章中创建的代理:mslave1.admintome.lab:31000:
    [ “mslave1.admintome.lab:31000” ]
    我们接下来实例化一个KafkaProducer 名为的新 对象producer。由于我们将以JSON的形式向Kafka发送数据,因此我们告诉 KafkaProducer 使用JSON解码器转储来使用value_serializer 参数解析数据 。我们还告诉它使用我们的经纪人参数。bootstrap_servers
    自我。producer = KafkaProducer(
    value_serializer = lambda v:json。转储(v)。编码('utf-8'),
    bootstrap_servers = kafka_brokers

    最后,我们创建一个新方法,用于将消息发送到我们的 admintome-pages 主题:

    def send_page_data(self,json_data):
    自我。制片人。发送('admintome-pages',json_data)
    这里的所有都是它的。现在我们将编写将控制一切的Main类。
    主类
    创建一个名为main.py的新文件并添加以下内容:
    来自 clicky 导入 Clicky
    来自 mykafka 导入 MyKafka
    导入 日志
    进口 时间
    进口 OS
    来自 伐木。config import dictConfig
    class Main(对象):
    def __init__(个体经营):
    如果是 'KAFKA_BROKERS' 在 os中。环境:
    kafka_brokers = os。环境 [ 'KAFKA_BROKERS' ]。分裂(',')
    否则:
    引发 ValueError('KAFKA_BROKERS环境变量未设置')
    如果 'SITE_ID' 的 OS。环境:
    自我。site_id = os。环境 [ 'SITE_ID' ]
    否则:
    引发 ValueError('SITE_ID环境变量未设置')
    如果是 'SITEKEY' 在 os中。环境:
    自我。sitekey = os。环境 [ 'SITEKEY' ]
    否则:
    引发 ValueError('未设置'SITEKEY环境变量')
    logging_config = dict(
    版本 = 1,
    formatters = {
    'f':{ 'format':

    '%(asctime)s%(name)-12s%(levelname)-8s%(message)s' }
    },
    处理程序 = {
    'h':{ 'class':'logging.StreamHandler',
    'formatter':'f',
    '级别':记录。DEBUG }
    },
    root = {
    '处理程序':[ 'h' ],
    '级别':记录。调试,
    },

    自我。logger = logging。getLogger()
    dictConfig(logging_config)
    自我。记录器。info(“初始化卡夫卡制片人”)
    自我。记录器。info(“KAFKA_BROKERS = {0}”。format(kafka_brokers))
    自我。mykafka = MyKafka(kafka_brokers)
    def init_clicky(self):
    自我。clicky = Clicky(自我。SITE_ID,自我。sitekey)
    自我。记录器。info(“Clicky Stats Polling Initialized”)
    def run(self):
    自我。init_clicky()
    starttime = 时间。时间()
    而 真:
    data = self。点击。get_pages_data()
    自我。记录器。info(“成功轮询Clicky页面数据”)
    自我。mykafka。send_page_data(数据)
    自我。记录器。info(“发布的页面数据到Kafka”)
    时间。睡眠(300.0 - ((时间。时间()- 开始时间)% 300.0))
    if __name__ == “ __ main__ ”:

    记录。info(“初始化Clicky Stats Polling”)
    main = Main()
    主要的。run()
    这个例子的最终状态是构建一个Docker容器,然后我们将在Marathon上运行它。考虑到这一点,我们不希望在代码中对我们的一些敏感信息(如我们的clicky站点ID和站点密钥)进行硬编码。我们希望能够从环境变量中提取这些内容。如果它们没有设置,那么我们通过异常并退出。
    如果是 'KAFKA_BROKERS' 在 os中。环境:
    kafka_brokers = os。环境 [ 'KAFKA_BROKERS' ]。分裂(',')
    否则:
    引发 ValueError('KAFKA_BROKERS环境变量未设置')
    如果 'SITE_ID' 的 OS。环境:
    自我。site_id = os。环境 [ 'SITE_ID' ]
    否则:
    引发 ValueError('SITE_ID环境变量未设置')
    如果是 'SITEKEY' 在 os中。环境:
    自我。sitekey = os。环境 [ 'SITEKEY' ]
    否则:
    引发 ValueError('未设置'SITEKEY环境变量')
    我们还配置日志记录,以便我们可以看到我们的应用程序正在发生什么。我在代码中编写了一个无限循环,用于轮询clicky并每隔五分钟将指标推送到我们的Kafka主题。
    def run(self):
    自我。init_clicky()
    starttime = 时间。时间()

    而 真:
    data = self。点击。get_pages_data()
    自我。记录器。info(“成功轮询Clicky页面数据”)
    自我。mykafka。send_page_data(数据)
    自我。记录器。info(“发布的页面数据到Kafka”)
    时间。睡眠(300.0 - ((时间。时间()- 开始时间)% 300.0))
    保存文件并退出。
    运行我们的应用
    要测试一切正常,您可以在设置环境变量后尝试运行应用程序:
    (摄入量)$ export KAFKA_BROKERS = “mslave1.admintome.lab:31000”
    (进口)$ export SITE_ID = “{your site id}”
    (入口)$ export SITEKEY = “{your sitekey}”
    (摄取)$ python main.py
    2018 -06 -25 15:34:32,259 root INFO初始化Kafka Producer
    2018 -06 -25 15:34:32,259 root INFO KAFKA_BROKERS = [ 'mslave1.admintome.lab:31000' ]
    2018 -06 -25 15:34:32,374 root INFO Clicky Stats Polling Initialized
    2018 -06 -25 15:34:32,754 root INFO成功轮询Clicky页面数据
    2018 -06 -25 15:34:32,755 root INFO向Kafka发布的页面数据
    我们正在向我们的Kafka主题发送消息!接下来我们将构建我们的Docker容器并将其部署到Marathon。最后,我们将通过编写一个测试消费者来完成,该消费者将从我们的主题中获取消息。
    我为本文中使用的所有代码创建了一个GitHub存储库:https://github.com/admintome/clicky-state-intake
    现在我们已经编写了应用程序代码,我们可以创建一个Docker容器,以便我们可以将它部署到Marathon。使用以下内容在应用程序目录中创建Dockerfile文件:

    来自python:3
    WORKDIR / usr / src / app
    COPY requirements.txt ./
    运行pip install --no-cache-dir -r requirements.txt
    复制。。
    CMD [“python”,“。/ main.py”]
    构建容器
    $ docker build -t {你的码头中心用户名} site-stats-intake。
    Docker构建完成后,您需要将其推送到您的Mesos Slaves可以访问的Docker存储库。对我来说,这是Docker Hub:
    $ docker push -t admintome / site-stats-intake
    然后登录到每个Mesos从站并向下拉图像:
    $ docker pull admintome / site-stats-intake
    我们现在准备为我们的应用程序创建Marathon应用程序部署。
    转到Marathon GUI。
    HTTP://mesos1.admintome.lab:8080
    单击“创建应用程序”按钮。然后单击JSON模式按钮:

    三分钟之类教你学会数据架构的Kafka ,Python教程


    粘贴以下JSON代码:
    {
    “id”:“site-stats-intake”,
    “cmd”:null,
    “cpus”:1,
    “mem”:128,
    “disk”:0,
    “实例”:1,
    “container”:{
    “docker”:{
    “image”:“admintome / site-stats-intake”
    },
    “type”:“DOCKER”
    },
    “网络”:[
    {
    “模式”:“主持人”
    }
    ]
    “env”:{
    “KAFKA_BROKERS”:“192.168.1.x:port”,
    “SITE_ID”:“{your site_id}”,
    “SITEKEY”:“{your sitekey}”
    }
    }
    请确保在环境的env部分中替换KAFKA_BROKERS,SITE_ID和SITEKEY的正确值。
    最后,单击Create Application按钮以部署应用程序。几秒钟后,您应该看到应用程序正在运行。

    三分钟之类教你学会数据架构的Kafka ,Python教程


    要查看日志,请单击site-stats-intake应用程序,然后单击stderr链接以下载包含日志的文本文件。
    现在我们将我们的应用程序部署到Marathon,我们将编写一个简短的消费者,我们将在我们的开发系统上运行,以向我们展示已收到的消息。
    这将是一个简单的Kafka消费者,它将检查主题并显示有关该主题的所有消息。在这一点上没有用,但它让我们知道我们的小轮询应用程序正常工作。
    创建一个名为consumer.py的新文件并添加以下内容:
    导入 系统
    来自 kafka 进口 KafkaConsumer
    consumer = KafkaConsumer('admintome-pages',bootstrap_servers = “mslave1.admintome.lab:31000”,
    auto_offset_reset = 'earliest')
    尝试:
    对于 消息 的 消费者:
    打印(消息。值)
    除 KeyboardInterrupt 外:
    SYS。退出()
    保存并退出该文件。这使得Kafka经纪人硬编码,因为我们只是用它来测试一切。确保使用代理名称和端口更新bootstrap-servers参数。
    现在运行该命令,您应该看到大量的JSON代表您访问最多的页面:
    (摄取)$ python consumer.py


    b '[{“type”:“pages”,“dates”:[{“date”:“2018-06-25”,“items”:[{“value”:“145”,“value_percent”:“43.2 “,”title“:”快速数据架构的Kafka教程 - AdminTome Blog“,”stats_url“:”http://clicky.com/stats/visitors?site_id=101045340&date=2018-06-25&href=%2Fblog%2Fkafka- tutorial-for-fast-data-architecture%2F“,”url“:”http://www.admintome.com/blog/kafka-tutorial-for-fast-data-architecture/“},...
    我们现在有一个数据管道,其中包含一些我们可以使用的数据。下一步将使用该数据并进行分析。在本文中,我们将安装和配置我们的SMACK堆栈的下一部分,即Apache Spark。我们还将配置它分析我们的数据并给我们一些有意义的东西。
    大家多多关注,你的关注是我最大的动力,会不定期有干货更新。
    想要大数据学习资料的,可以私信我:学习,既可获得大数据学习资料。希望可以帮到大家啦。


    分享到:


    相關文章: