Python动态服务器网页(需要使用WSGI接口),基本实现步骤如下:
1.等待客户端的链接,服务器会收到一个http协议的请求数据报
2.利用正则表达式对这个请求数据报进行解析(请求方式、提取出文件的环境)
3.提取出文件的环境之后,利用截断取片的方法将文件名转化为模块名称
4.使用m = __import__(),就可以得到返回值为m的模块
5.创建一个env字典:其中包含的是请求方式及文件环境等各种的键值对
6.创建一个新的动态脚本,其中定义了application这个函数,必须包含env和start_response的参数(也是服务器里的调用方法)
7.在这个动态脚本中定义状态码status和响应头headers(注意是字典形式,如Content-Type)
8.然后再调用start_response(status,headers),但是要注意,这个函数在服务器被定义
9.在动态脚本中编写动态执行程序
10.m.appliction的返回值就是回应数据包的body,它的数据头在start_response被整合
11.将数据头与数据body拼接起来,然后发送给客户端,就可显示动态网页
MyWebServer
import socket
import re
import sys
from multiprocessing import Process
from MyWebFramework import Application
# 设置静态文件根目录
HTML_ROOT_DIR = "./html"
WSGI_PYTHON_DIR = "./wsgipython"
class HTTPServer(object):
""""""
def __init__(self, application):
"""构造函数, application指的是框架的app"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.app = application
def start(self):
self.server_socket.listen(128)
while True:
client_socket, client_address = self.server_socket.accept()
#print("[%s,%s]用户连接上了" % (client_address[0],client_address[1]))
print("[%s, %s]用户连接上了" % client_address)
handle_client_process = Process(target=self.handle_client, args=(client_socket,))
handle_client_process.start()
client_socket.close()
def start_response(self, status, headers):
"""
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
star
"""
response_headers = "HTTP/1.1 " + status + "\r\n"
for header in headers:
response_headers += "%s: %s\r\n" % header
self.response_headers = response_headers
def handle_client(self, client_socket):
"""处理客户端请求"""
# 获取客户端请求数据
request_data = client_socket.recv(1024)
print("request data:", request_data)
request_lines = request_data.splitlines()
for line in request_lines:
print(line)
# 解析请求报文
# 'GET / HTTP/1.1'
request_start_line = request_lines[0]
# 提取用户请求的文件名
print("*" * 10)
print(request_start_line.decode("utf-8"))
file_name = re.match(r"\w+ +(/[^ ]*) ", request_start_line.decode("utf-8")).group(1)
method = re.match(r"(\w+) +/[^ ]* ", request_start_line.decode("utf-8")).group(1)
env = {
"PATH_INFO": file_name,
"METHOD": method
}
response_body = self.app(env, self.start_response)
response = self.response_headers + "\r\n" + response_body
# 向客户端返回响应数据
client_socket.send(bytes(response, "utf-8"))
# 关闭客户端连接
client_socket.close()
def bind(self, port):
self.server_socket.bind(("", port))
def main():
sys.path.insert(1, WSGI_PYTHON_DIR)
if len(sys.argv) < 2:
sys.exit("python MyWebServer.py Module:app")
# python MyWebServer.py MyWebFrameWork:app
module_name, app_name = sys.argv[1].split(":")
# module_name = "MyWebFrameWork"
# app_name = "app"
m = __import__(module_name)
app = getattr(m, app_name)
http_server = HTTPServer(app)
# http_server.set_port
http_server.bind(8000)
http_server.start()
if __name__ == "__main__":
main()
MyWebFrameWork
import time
# from MyWebServer import HTTPServer
# 设置静态文件根目录
HTML_ROOT_DIR = "./html"
class Application(object):
"""框架的核心部分,也就是框架的主题程序,框架是通用的"""
def __init__(self, urls):
# 设置路由信息
self.urls = urls
def __call__(self, env, start_response):
path = env.get("PATH_INFO", "/")
# /static/index.html
if path.startswith("/static"):
# 要访问静态文件
file_name = path[7:]
# 打开文件,读取内容
try:
file = open(HTML_ROOT_DIR + file_name, "rb")
except IOError:
# 代表未找到路由信息,404错误
status = "404 Not Found"
headers = []
start_response(status, headers)
return "not found"
else:
file_data = file.read()
file.close()
status = "200 OK"
headers = []
start_response(status, headers)
return file_data.decode("utf-8")
for url, handler in self.urls:
#("/ctime", show_ctime)
if path == url:
return handler(env, start_response)
# 代表未找到路由信息,404错误
status = "404 Not Found"
headers = []
start_response(status, headers)
return "not found"
def show_ctime(env, start_response):
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
start_response(status, headers)
return time.ctime()
def say_hello(env, start_response):
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
start_response(status, headers)
return "hello itcast"
def say_haha(env, start_response):
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
start_response(status, headers)
return "hello haha"
urls = [
("/", show_ctime),
("/ctime", show_ctime),
("/sayhello", say_hello),
("/sayhaha", say_haha),
]
app = Application(urls)
# if __name__ == "__main__":
# urls = [
# ("/", show_ctime),
# ("/ctime", show_ctime),
# ("/sayhello", say_hello),
# ("/sayhaha", say_haha),
# ]
# app = Application(urls)
# http_server = HTTPServer(app)
# http_server.bind(8000)
# http_server.start()
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..
最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru
在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url
我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b
您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除
我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。