网站首页 > 技术教程 正文
项目经历时间:2022年7月-2022年8月
项目人员:3人
项目环境:
9台虚拟机(使用Centos7系统)
第1,2号虚拟机做代理集群,用于做负载均衡和反向代理(nginx/1.20.1)
第3,4,5号虚拟机用做应用集群,提供一个静态页面展(nginx/1.20.1,filebeat)
第6,7,8号虚拟机做消息中间件,基于kafka集群和zookeeper集群(kafka2.12,zookeeper3.6)
第9台虚拟机做mysql数据库来收集数据(MySQL5.7.34)
项目简介:
此项目是用于模拟企业公司进行大工作时产生的大数据进行日志收集,并对其进行清洗,将需要的数据存入数据库中
项目步骤:
1.规划好整个项目的拓扑结构和思维导图,并细分解析每一步需要做的事
2.安装好每个虚拟机需要的环境,模块与软件。并且配置好静态ip,DNS域名解析,修改每台主机名方便区分每台虚拟机的作
3.利用两台虚拟机用作nginx代理集群,配置好keepalive双vip的环境用做负载均衡和高可用
4.利用三台nginx虚拟机来做web页面的静态展示,在etc/nginx/nginx.cof下配置好其端口号,源代码文本路径,访问日志的
保存路径。
5.使用三台虚拟机来搭建kakfa应用集群,用作消息中间件,修改/opt/kafka_2.12-2.8.1/config /server.properties文件来配
置broker,监听端口和zookeeper连接
6.再在kafka基础上搭建zookeeper来管理kafka集群,在/opt/apache-zookeeper-3.6.3-bin/confs文件下,配置相连的
kafka集群,创建/tmp/zookeeper目录,在目录添加myid文件,里面存放的是每台zookeeper的id
7.在web静态页面的三台虚拟机上部署filebeat,来实现读取对应位置的日志,上报到相应的kafka集群上去
8..开启zookeeper和kafka,创建topic和生产者消费者进行测试,检测生产者产生的数据能否被消费者消费
9.编写python脚本,创建消费者并连接MySQL数据库来存放消费的数据。使用了json,requests,time,pymysql模块实现
项目的详细过程:
1.准备环境
1.创建好9台linux虚拟机(Centos7系统)
2.配置好静态ip地址
vim /etc/sysconfig/network-scripts/ifcfg-ens33
3.配置好本地DNS服务器(114.114.114.114)
vim /etc/resolv.conf
4.修改主机名(此方法永久生效)
hostnamectl set-hostname +主机名
5.每一台机器上都写好域名解析(后续就可以直接使用主机名操作)
vim /etc/hosts
6.安装好需要的软件(wget用于获取web的数据,chronyd是时间同步服务)
yum install wget -y
yum install vim -y
yum install chronyd -y
7.关闭防火墙,打开chronyd服务
systemctl start chronyd
systemctl enable chronyd
systemctl stop firewalld
systemctl disable firewalld
8.关闭SELINUX,设置SELINUX=disable
vim /etc/selinux/config
2.搭建nignx
1.安装epel源和nignx服务
yum install epel-release -y
yum install nginx -y
2.启动nginx并设置其开机自启
systemctl start nginx
systemctl enable nginx
3.编辑配置文件
vim /etc/nginx/conf.d/sc.conf
server {
listen 80 default_server;
server_name www.sc.com;
root /usr/share/nginx/html;
access_log /var/log/nginx/sc/access.log main;
location / {
}
}
4.重启nginx服务
nginx -s reload
3.在nignx上搭建kafka和zookeeper
ps:kafka是一种消息中间件,和其他MQ相比,有着单机10万级高吞吐量,高可用性强,分布式,一个partition多个replica,少数宕机不会丢失数据,一般配合大数据类系统进行实时数据计算,日志分析场景。
broker:kafka的节点。一台服务器相当于一个节点
topic:主题,消息的分类。比如nginx,mysql日志给不同的主题,就是不同的类型。
partition:分区。提高吞吐量,提高并发性。(多个partition会导致消息顺序混乱,如果对消息顺序有要求就只设置一个partition就可以了)
replica: 副本。完整的分区备份。
zookeeper是一种分布式应用协调管理服务,具有配置管理,域名管理,分布式数据存储,集群管理等功能,在本次项目中用于对kafka集群进行配置(topic,partition,replica等)管理
1.安装基础软件
yum install java wget -y
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar xf kafka_2.12-2.8.1.tgz
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar xf apache-zookeeper-3.6.3-bin.tar.gz
2.搭建kafka
vim /opt/kafka_2.12-2.8.1/config /server.properties
修改以下代码
broker.id=1(第x台这里就填x)
listeners=PLAINTEXT://nginx-kafka01(主机名):9092
zookeeper.connect=192.168.127.128:2181,192.168.127.133:2181,192.168.127.134:2181(三台虚拟机的IP)
3.在kafka基础上搭建zookeeper
cd /opt/apache-zookeeper-3.6.3-bin/confs
cp zoo_sample.cfg zoo.cfg
#修改zoo.cfg, 添加如下三行:
server.1=192.168.127.128:3888:4888
server.2=192.168.127.133:3888:4888
server.3=192.168.127.134:3888:4888
4.创建/tmp/zookeeper目录 ,在目录中添加myid文件
第一台机器上
echo 1 > /tmp/zookeeper/myid
以此类推
5.搭建完成后准备启动服务(注意:开启zookeeper和kafka的时候,一定是先启动zookeeper,再启动kafka;关闭服务的时候,kafka先关闭,再关闭zookeeper)
zookeeper启动
bin/zkServer.sh start
kafka启动
bin/kafka-server-start.sh -daemon config/server.properties
查看zookeeper是否成功管理kafka
cd /opt/apache-zookeeper-3.6.3-bin
cd bin
./zkCli.sh
正确的显示如下:
6.创建topic
bin/kafka-topics.sh --create --zookeeper 192.168.127.128:2181 --replication-factor 3 --partitions 3 --topic sc
bin/kafka-topics.sh --list --zookeeper 192.168.127.128:2181
7.创建生产者消费者测试消息中间件是否能正常运行
#创建生产者
bin/kafka-console-producer.sh --broker-list 192.168.127.128:9092 --topic sc
#创建消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.127.134:9092 --topic sc --from-beginning
成功则如下:
4.部署filebeat
1.安装依赖包
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
2.vim /etc/yum.repos.d/fb.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
3.安装filebeat
yum install filebeat -y
4.vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/nginx/sc/access.log
#==========------------------------------kafka-----------------------------------
output.kafka:
hosts: ["192.168.1.213:9092","192.168.1.214:9092","192.168.1.215:9092"]
topic: nginxlog
keep_alive: 10s
5.启动filebeat并设置开机自启
systemctl start filebeat
systemctl enable filebeat
启动成功如下:
5.编写python脚本,创建消费者并连接MySQL数据库来存放消费的数据
import json
import requests
import time
import pymysql
#连接数据库
db = pymysql.connect(
host = "192.168.127.128", #mysql主机ip
user = "sc", #用户名
passwd = "123456", #密码
database = "nginx" #数据库
)
taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
#查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):
response = requests.get(taobao_url+ip)
if response.status_code == 200:
tmp_dict = json.loads(response.text)
prov = tmp_dict["data"]["region"]
isp = tmp_dict["data"]["isp"]
return prov,isp
return None,None
#将日志里读取的格式转换为我们指定的格式
def trans_time(dt):
#把字符串转成时间格式
timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
#timeStamp = int(time.mktime(timeArray))
#把时间格式转成字符串
new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
return new_time
#从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.127.128:9092,192.168.127.133:9092,192.168.127.134:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
consumer_group='testgroup',
#自动提交offset
auto_commit_enable=True,
zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
)
#consumer = topic.get_simple_consumer()
i = 1
for message in balanced_consumer:
if message is not None:
line = json.loads(message.value.decode("utf-8"))
log = line["message"]
tmp_lst = log.split()
ip = tmp_lst[0]
dt = tmp_lst[3].replace("[","")
bt = tmp_lst[9]
dt = trans_time(dt)
prov, isp = resolv_ip(ip)
if prov and isp:
print(dt,prov,isp,bt)
cursor = db.cursor()
try:
cursor.execute(f"insert into mynginxlog values({i},{dt},'{prov}','{isp}',{bt})")
db.commit()
i += 1
except Exception as e:
print("插入失败",e)
db.rollback()
# create table mynginxlog(
# id int primary key auto_increment,
# dt datetime not null,
# prov varchar(20),
# isp varchar(20),
# bd float
# )charset=utf8;
#关闭数据库
db.close()
得到效果如下:
6.项目心得
由于经验不足,遇到的问题还是比较多的,比如nginx服务器有时没有全部开全导致消费者消费失败,编写python脚本也比较
容易出现失误导致程序没法正确运行等。
但通过这次项目还是收获了许多:
1.因为有提前写好拓扑结构和思维导图使得整个流程还是较为顺利,出现错误也能够较快发现错误发生的地方。
2.更加的深入的了解了kafka和zookeeper的原理。
3.了解到了许多新的知识,zookeeper的脑裂,nginx的代理集群可利用keepalive做负载均衡和高可用,zookeeper的选举方
式等
4.提高了团队交流和协作能力,遇到问题大家能一起想方式。
5.提高了自主学习的能力
猜你喜欢
- 2024-10-14 使用分享 | minio 远程客户端mc备份nginx access日志
- 2024-10-14 Filebeat配置顶级字段Logstash在output输出到Elasticsearch使用
- 2024-10-14 详解日志采集工具Logstash 安装部署及常用配置
- 2024-10-14 rsyslog(手工配置)配置文件教程讲解
- 2024-10-14 分享一个实用脚本获取access.log的请求url和查其中最耗时的接口
- 2024-10-14 nginx 日志分析之 access.log 格式详解
- 2024-10-03 领导:如何使用GoAccess构建实时日志分析系统
- 2024-10-03 SpringCloud 日志在压测中的二三事
- 2024-10-03 手把手教程:使用Docker创建Nginx,实现Nginx日志分割
- 2024-10-03 goaccess 分析nginx log(nginx access配置)
你 发表评论:
欢迎- 最近发表
-
- 阿里P8大佬总结的Nacos入门笔记,从安装到进阶小白也能轻松学会
- Linux环境下,Jmeter压力测试的搭建及报错解决方法
- Java 在Word中合并单元格时删除重复值
- 解压缩软件哪个好用?4款大多数人常用的软件~
- Hadoop高可用集群搭建及API调用(hadoop3高可用)
- lombok注解@Data没有toString和getter、setter问题
- Apache Felix介绍(apache fineract)
- Spring Boot官方推荐的Docker镜像编译方式-分层jar包
- Gradle 使用手册(gradle详细教程)
- 字节二面:为什么SpringBoot的 jar可以直接运行?
- 标签列表
-
- 下划线是什么 (87)
- 精美网站 (58)
- qq登录界面 (90)
- nginx 命令 (82)
- nginx .http (73)
- nginx lua (70)
- nginx 重定向 (68)
- Nginx超时 (65)
- nginx 监控 (57)
- odbc (59)
- rar密码破解工具 (62)
- annotation (71)
- 红黑树 (57)
- 智力题 (62)
- php空间申请 (61)
- 按键精灵 注册码 (69)
- 软件测试报告 (59)
- ntcreatefile (64)
- 闪动文字 (56)
- guid (66)
- abap (63)
- mpeg 2 (65)
- column (63)
- dreamweaver教程 (57)
- excel行列转换 (56)
本文暂时没有评论,来添加一个吧(●'◡'●)